blob: 3762eb1eee5b802ab6ff5acd8de6668b64ced709 [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.p4runtime.ctl;
18
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import com.google.protobuf.ExtensionRegistry;
import com.google.protobuf.TextFormat;
import io.grpc.Context;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import org.onlab.util.ImmutableByteSequence;
import org.onosproject.net.DeviceId;
import org.onosproject.net.pi.model.PiPipeconf;
import org.onosproject.net.pi.runtime.PiPacketOperation;
import org.onosproject.net.pi.runtime.PiTableEntry;
import org.onosproject.net.pi.runtime.PiTableId;
import org.onosproject.p4runtime.api.P4RuntimeClient;
import org.onosproject.p4runtime.api.P4RuntimeEvent;
import org.slf4j.Logger;
import p4.P4RuntimeGrpc;
import p4.tmp.P4Config;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import static org.onlab.util.ImmutableByteSequence.copyFrom;
import static org.slf4j.LoggerFactory.getLogger;
import static p4.P4RuntimeOuterClass.ForwardingPipelineConfig;
import static p4.P4RuntimeOuterClass.MasterArbitrationUpdate;
import static p4.P4RuntimeOuterClass.PacketIn;
import static p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
import static p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
import static p4.P4RuntimeOuterClass.StreamMessageRequest;
import static p4.P4RuntimeOuterClass.StreamMessageResponse;
import static p4.config.P4InfoOuterClass.P4Info;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040058
59/**
60 * Implementation of a P4Runtime client.
61 */
62public class P4RuntimeClientImpl implements P4RuntimeClient {
63
64 private static final int DEADLINE_SECONDS = 15;
65
66 private final Logger log = getLogger(getClass());
67
68 private final DeviceId deviceId;
69 private final int p4DeviceId;
70 private final P4RuntimeControllerImpl controller;
71 private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
72 private final P4RuntimeGrpc.P4RuntimeStub asyncStub;
73 private ExecutorService executorService;
74 private StreamObserver<StreamMessageRequest> streamRequestObserver;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040075 private Context.CancellableContext streamContext;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040076
77
78 P4RuntimeClientImpl(DeviceId deviceId, int p4DeviceId, ManagedChannel channel, P4RuntimeControllerImpl controller,
79 ExecutorService executorService) {
80 this.deviceId = deviceId;
81 this.p4DeviceId = p4DeviceId;
82 this.controller = controller;
83 this.executorService = executorService;
84 this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel)
85 .withDeadlineAfter(DEADLINE_SECONDS, TimeUnit.SECONDS);
Carmelo Cascone59f57de2017-07-11 19:55:09 -040086 this.asyncStub = P4RuntimeGrpc.newStub(channel);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040087 }
88
89 @Override
90 public CompletableFuture<Boolean> initStreamChannel() {
91 return CompletableFuture.supplyAsync(this::doInitStreamChannel, executorService);
92 }
93
94 private boolean doInitStreamChannel() {
95 if (this.streamRequestObserver == null) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -040096
97 streamContext = Context.current().withCancellation();
98 streamContext.run(
99 () -> streamRequestObserver = asyncStub.streamChannel(new StreamChannelResponseObserver()));
100
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400101 // To listen for packets and other events, we need to start the RPC.
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400102 // Here we do it by sending a master arbitration update.
103 if (!doArbitrationUpdate()) {
104 log.warn("Unable to initialize stream channel for {}", deviceId);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400105 return false;
106 }
107 }
108 return true;
109 }
110
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400111 private boolean doArbitrationUpdate() {
112
113 if (streamRequestObserver == null) {
114 log.error("Null request stream observer for {}", deviceId);
115 return false;
116 }
117
118 try {
119 StreamMessageRequest initRequest = StreamMessageRequest
120 .newBuilder()
121 .setArbitration(MasterArbitrationUpdate
Andrea Campanella432f7182017-07-14 18:43:27 +0200122 .newBuilder()
123 .setDeviceId(p4DeviceId)
124 .build())
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400125 .build();
126 streamRequestObserver.onNext(initRequest);
127 return true;
128 } catch (StatusRuntimeException e) {
129 log.warn("Arbitration update failed for {}: {}", deviceId, e);
130 return false;
131 }
132 }
133
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400134 @Override
135 public CompletableFuture<Boolean> setPipelineConfig(InputStream p4info, InputStream targetConfig) {
136 return CompletableFuture.supplyAsync(() -> doSetPipelineConfig(p4info, targetConfig), executorService);
137 }
138
139 private boolean doSetPipelineConfig(InputStream p4info, InputStream targetConfig) {
140
141 log.debug("Setting pipeline config for {}", deviceId);
142
Andrea Campanella432f7182017-07-14 18:43:27 +0200143 P4Info.Builder p4iInfoBuilder = P4Info.newBuilder();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400144
145 try {
146 TextFormat.getParser().merge(new InputStreamReader(p4info),
Andrea Campanella432f7182017-07-14 18:43:27 +0200147 ExtensionRegistry.getEmptyRegistry(),
148 p4iInfoBuilder);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400149 } catch (IOException ex) {
150 log.warn("Unable to load p4info for {}: {}", deviceId, ex.getMessage());
151 return false;
152 }
153
154 P4Config.P4DeviceConfig deviceIdConfig;
155 try {
156 deviceIdConfig = P4Config.P4DeviceConfig
157 .newBuilder()
158 .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
159 .setReassign(true)
160 .setDeviceData(ByteString.readFrom(targetConfig))
161 .build();
162 } catch (IOException ex) {
163 log.warn("Unable to load target-specific config for {}: {}", deviceId, ex.getMessage());
164 return false;
165 }
166
167 SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
168 .newBuilder()
169 .setAction(VERIFY_AND_COMMIT)
170 .addConfigs(ForwardingPipelineConfig
Andrea Campanella432f7182017-07-14 18:43:27 +0200171 .newBuilder()
172 .setDeviceId(p4DeviceId)
173 .setP4Info(p4iInfoBuilder.build())
174 .setP4DeviceConfig(deviceIdConfig.toByteString())
175 .build())
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400176 .build();
177 try {
178 this.blockingStub.setForwardingPipelineConfig(request);
179 } catch (StatusRuntimeException ex) {
180 log.warn("Unable to set pipeline config for {}: {}", deviceId, ex.getMessage());
181 return false;
182 }
183
184 return true;
185 }
186
187 @Override
188 public boolean writeTableEntries(Collection<PiTableEntry> entries, WriteOperationType opType) {
189
190 throw new UnsupportedOperationException("writeTableEntries not implemented.");
191 }
192
193 @Override
194 public CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId tableId) {
195
196 throw new UnsupportedOperationException("dumpTable not implemented.");
197 }
198
199 @Override
Andrea Campanella432f7182017-07-14 18:43:27 +0200200 public CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
201 CompletableFuture<Boolean> result = new CompletableFuture<>();
202// P4InfoBrowser browser = null; //PipeconfHelper.getP4InfoBrowser(pipeconf);
203// try {
204// ControllerPacketMetadata controllerPacketMetadata =
205// browser.controllerPacketMetadatas().getByName("packet_out");
206// PacketOut.Builder packetOutBuilder = PacketOut.newBuilder();
207// packetOutBuilder.addAllMetadata(packet.metadatas().stream().map(metadata -> {
208// //FIXME we are assuming that there is no more than one metadata per name.
209// int metadataId = controllerPacketMetadata.getMetadataList().stream().filter(metadataInfo -> {
210// return metadataInfo.getName().equals(metadata.id().name());
211// }).findFirst().get().getId();
212// return PacketMetadata.newBuilder()
213// .setMetadataId(metadataId)
214// .setValue(ByteString.copyFrom(metadata.value().asReadOnlyBuffer()))
215// .build();
216// }).filter(Objects::nonNull).collect(Collectors.toList()));
217// packetOutBuilder.setPayload(ByteString.copyFrom(packet.data().asReadOnlyBuffer()));
218// PacketOut packetOut = packetOutBuilder.build();
219// StreamMessageRequest packetOutRequest = StreamMessageRequest
220// .newBuilder().setPacket(packetOut).build();
221// streamRequestObserver.onNext(packetOutRequest);
222// result.complete(true);
223// } catch (P4InfoBrowser.NotFoundException e) {
224// log.error("Cant find metadata with name \"packet_out\" in p4Info file.");
225// result.complete(false);
226// }
227 return result;
228 }
229
230 @Override
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400231 public void shutdown() {
232
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400233 log.info("Shutting down client for {}...", deviceId);
234
235 if (streamRequestObserver != null) {
236 streamRequestObserver.onCompleted();
237 streamContext.cancel(null);
238 streamContext = null;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400239 }
240
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400241 this.executorService.shutdown();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400242 try {
243 executorService.awaitTermination(5, TimeUnit.SECONDS);
244 } catch (InterruptedException e) {
245 log.warn("Executor service didn't shutdown in time.");
246 }
247
248 // Prevent the execution of other tasks.
249 executorService = null;
250 }
251
252 private class StreamChannelResponseObserver implements StreamObserver<StreamMessageResponse> {
253
254 @Override
255 public void onNext(StreamMessageResponse message) {
256
257 P4RuntimeEvent event;
258
259 if (message.getPacket().isInitialized()) {
260 // Packet-in
261 PacketIn packetIn = message.getPacket();
262 ImmutableByteSequence data = copyFrom(packetIn.getPayload().asReadOnlyByteBuffer());
263 ImmutableList.Builder<ImmutableByteSequence> metadataBuilder = ImmutableList.builder();
264 packetIn.getMetadataList().stream()
265 .map(m -> m.getValue().asReadOnlyByteBuffer())
266 .map(ImmutableByteSequence::copyFrom)
267 .forEach(metadataBuilder::add);
268 event = new DefaultPacketInEvent(deviceId, data, metadataBuilder.build());
269
270 } else if (message.getArbitration().isInitialized()) {
271 // Arbitration.
272 throw new UnsupportedOperationException("Arbitration not implemented.");
273
274 } else {
275 log.warn("Unrecognized stream message from {}: {}", deviceId, message);
276 return;
277 }
278
279 controller.postEvent(event);
280 }
281
282 @Override
283 public void onError(Throwable throwable) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400284 log.warn("Error on stream channel for {}: {}", deviceId, Status.fromThrowable(throwable));
285 // FIXME: we might want to recreate the channel.
286 // In general, we want to be robust against any transient error and, if the channel is open, make sure the
287 // stream channel is always on.
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400288 }
289
290 @Override
291 public void onCompleted() {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400292 log.warn("Stream channel for {} has completed", deviceId);
293 // FIXME: same concern as before.
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400294 }
295 }
296
297}