blob: 3ae9464ae94be1bb7f8395a30920cd62ad29d2d0 [file] [log] [blame]
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.p4runtime.ctl;
18
Carmelo Cascone8d99b172017-07-18 17:26:31 -040019import com.google.common.collect.ImmutableMap;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040020import com.google.protobuf.ByteString;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040021import io.grpc.Context;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040022import io.grpc.ManagedChannel;
23import io.grpc.Status;
24import io.grpc.StatusRuntimeException;
25import io.grpc.stub.StreamObserver;
Andrea Campanella288b2732017-07-28 14:16:16 +020026import org.onlab.osgi.DefaultServiceDirectory;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040027import org.onosproject.net.DeviceId;
Andrea Campanella432f7182017-07-14 18:43:27 +020028import org.onosproject.net.pi.model.PiPipeconf;
29import org.onosproject.net.pi.runtime.PiPacketOperation;
Andrea Campanella288b2732017-07-28 14:16:16 +020030import org.onosproject.net.pi.runtime.PiPipeconfService;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040031import org.onosproject.net.pi.runtime.PiTableEntry;
32import org.onosproject.net.pi.runtime.PiTableId;
33import org.onosproject.p4runtime.api.P4RuntimeClient;
34import org.onosproject.p4runtime.api.P4RuntimeEvent;
35import org.slf4j.Logger;
36import p4.P4RuntimeGrpc;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040037import p4.P4RuntimeOuterClass.Entity;
38import p4.P4RuntimeOuterClass.ForwardingPipelineConfig;
39import p4.P4RuntimeOuterClass.MasterArbitrationUpdate;
40import p4.P4RuntimeOuterClass.PacketIn;
41import p4.P4RuntimeOuterClass.ReadRequest;
42import p4.P4RuntimeOuterClass.ReadResponse;
43import p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
44import p4.P4RuntimeOuterClass.StreamMessageRequest;
45import p4.P4RuntimeOuterClass.StreamMessageResponse;
46import p4.P4RuntimeOuterClass.TableEntry;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040047import p4.P4RuntimeOuterClass.Update;
48import p4.P4RuntimeOuterClass.WriteRequest;
Andrea Campanellafc1d34c2017-07-18 17:01:41 +020049import p4.config.P4InfoOuterClass.P4Info;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040050import p4.tmp.P4Config;
51
52import java.io.IOException;
53import java.io.InputStream;
Andrea Campanellabf1301d2017-08-07 18:33:52 +020054import java.nio.ByteBuffer;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040055import java.util.Collection;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040056import java.util.Collections;
57import java.util.Iterator;
58import java.util.List;
59import java.util.Map;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040060import java.util.concurrent.CompletableFuture;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040061import java.util.concurrent.Executor;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040062import java.util.concurrent.ExecutorService;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040063import java.util.concurrent.Executors;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040064import java.util.concurrent.TimeUnit;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040065import java.util.concurrent.locks.Lock;
66import java.util.concurrent.locks.ReentrantLock;
67import java.util.function.Supplier;
68import java.util.stream.Collectors;
69import java.util.stream.StreamSupport;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040070
Carmelo Cascone8d99b172017-07-18 17:26:31 -040071import static org.onlab.util.Tools.groupedThreads;
72import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040073import static org.slf4j.LoggerFactory.getLogger;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040074import static p4.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
Andrea Campanellafc1d34c2017-07-18 17:01:41 +020075import static p4.P4RuntimeOuterClass.PacketOut;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040076import static p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
77
78/**
79 * Implementation of a P4Runtime client.
80 */
Carmelo Cascone8d99b172017-07-18 17:26:31 -040081public final class P4RuntimeClientImpl implements P4RuntimeClient {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040082
83 private static final int DEADLINE_SECONDS = 15;
84
Carmelo Cascone8d99b172017-07-18 17:26:31 -040085 // FIXME: use static election ID, since mastership arbitration is not yet support on BMv2 or Tofino.
86 private static final int ELECTION_ID = 1;
87
88 private static final Map<WriteOperationType, Update.Type> UPDATE_TYPES = ImmutableMap.of(
89 WriteOperationType.UNSPECIFIED, Update.Type.UNSPECIFIED,
90 WriteOperationType.INSERT, Update.Type.INSERT,
91 WriteOperationType.MODIFY, Update.Type.MODIFY,
92 WriteOperationType.DELETE, Update.Type.DELETE
93 );
94
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040095 private final Logger log = getLogger(getClass());
96
97 private final DeviceId deviceId;
Carmelo Casconef423bec2017-08-30 01:56:25 +020098 private final long p4DeviceId;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040099 private final P4RuntimeControllerImpl controller;
100 private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400101 private final Context.CancellableContext cancellableContext;
102 private final ExecutorService executorService;
103 private final Executor contextExecutor;
104 private final Lock writeLock = new ReentrantLock();
105 private final StreamObserver<StreamMessageRequest> streamRequestObserver;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400106
107
Carmelo Casconef423bec2017-08-30 01:56:25 +0200108 P4RuntimeClientImpl(DeviceId deviceId, long p4DeviceId, ManagedChannel channel,
109 P4RuntimeControllerImpl controller) {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400110 this.deviceId = deviceId;
111 this.p4DeviceId = p4DeviceId;
112 this.controller = controller;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400113 this.cancellableContext = Context.current().withCancellation();
Carmelo Casconea966c342017-07-30 01:56:30 -0400114 this.executorService = Executors.newFixedThreadPool(15, groupedThreads(
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400115 "onos/p4runtime-client-" + deviceId.toString(),
116 deviceId.toString() + "-%d"));
117 this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200118 //TODO Investigate deadline or timeout in supplyInContext Method
119 this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400120 P4RuntimeGrpc.P4RuntimeStub asyncStub = P4RuntimeGrpc.newStub(channel);
121 this.streamRequestObserver = asyncStub.streamChannel(new StreamChannelResponseObserver());
122 }
123
124 /**
125 * Executes the given task (supplier) in the gRPC context executor of this client, such that if the context is
126 * cancelled (e.g. client shutdown) the RPC is automatically cancelled.
127 * <p>
128 * Important: Tasks submitted in parallel by different threads are forced executed sequentially.
129 * <p>
130 */
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200131 private <U> CompletableFuture<U> supplyInContext(Supplier<U> supplier, String opDescription) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400132 return CompletableFuture.supplyAsync(() -> {
133 // TODO: explore a more relaxed locking strategy.
134 writeLock.lock();
135 try {
136 return supplier.get();
Carmelo Casconea966c342017-07-30 01:56:30 -0400137 } catch (Throwable ex) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200138 log.error("Exception in P4Runtime client of {}, executing {}", deviceId, opDescription, ex);
Carmelo Casconea966c342017-07-30 01:56:30 -0400139 throw ex;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400140 } finally {
141 writeLock.unlock();
142 }
143 }, contextExecutor);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400144 }
145
146 @Override
147 public CompletableFuture<Boolean> initStreamChannel() {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200148 return supplyInContext(this::doInitStreamChannel, "initStreamChannel");
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400149 }
150
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400151 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400152 public CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ExtensionType targetConfigExtType) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200153 return supplyInContext(() -> doSetPipelineConfig(pipeconf, targetConfigExtType), "setPipelineConfig");
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400154 }
155
156 @Override
Andrea Campanellabf1301d2017-08-07 18:33:52 +0200157 public CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ByteBuffer byteBuffer) {
158 return supplyInContext(() -> doSetPipelineConfig(pipeconf, byteBuffer), "setPipelineConfigByteString");
159 }
160
161 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400162 public CompletableFuture<Boolean> writeTableEntries(Collection<PiTableEntry> piTableEntries,
163 WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200164 return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf),
Andrea Campanellabf1301d2017-08-07 18:33:52 +0200165 "writeTableEntries-" + opType.name());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400166 }
167
168 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400169 public CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200170 return supplyInContext(() -> doDumpTable(piTableId, pipeconf), "dumpTable-" + piTableId);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400171 }
172
173 @Override
Andrea Campanella432f7182017-07-14 18:43:27 +0200174 public CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200175 return supplyInContext(() -> doPacketOut(packet, pipeconf), "packetOut");
Andrea Campanella432f7182017-07-14 18:43:27 +0200176 }
177
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400178 /* Blocking method implementations below */
179
180 private boolean doInitStreamChannel() {
181 // To listen for packets and other events, we need to start the RPC.
182 // Here we do it by sending a master arbitration update.
183 log.info("initializing stream chanel on {}...", deviceId);
184 if (!doArbitrationUpdate()) {
185 log.warn("Unable to initialize stream channel for {}", deviceId);
186 return false;
187 } else {
188 return true;
189 }
190 }
191
192 private boolean doArbitrationUpdate() {
193 log.info("Sending arbitration update to {}...", deviceId);
194 StreamMessageRequest requestMsg = StreamMessageRequest.newBuilder()
195 .setArbitration(MasterArbitrationUpdate.newBuilder()
Andrea Campanellabf1301d2017-08-07 18:33:52 +0200196 .setDeviceId(p4DeviceId)
197 .build())
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400198 .build();
199 try {
200 streamRequestObserver.onNext(requestMsg);
201 return true;
202 } catch (StatusRuntimeException e) {
203 log.warn("Arbitration update failed for {}: {}", deviceId, e);
204 return false;
205 }
206 }
207
208 private boolean doSetPipelineConfig(PiPipeconf pipeconf, ExtensionType targetConfigExtType) {
209
210 log.info("Setting pipeline config for {} to {} using {}...", deviceId, pipeconf.id(), targetConfigExtType);
211
212 P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
213 if (p4Info == null) {
214 // Problem logged by PipeconfHelper.
215 return false;
216 }
217
Andrea Campanellabf1301d2017-08-07 18:33:52 +0200218 if (!pipeconf.extension(targetConfigExtType).isPresent()) {
219 log.warn("Missing extension {} in pipeconf {}", targetConfigExtType, pipeconf.id());
220 return false;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400221 }
222
Andrea Campanellabf1301d2017-08-07 18:33:52 +0200223 InputStream targetConfig = pipeconf.extension(targetConfigExtType).get();
224 try {
225 return sendPipelineConfig(p4Info, ByteString.readFrom(targetConfig));
226 } catch (IOException ex) {
227 log.warn("Unable to load target-specific config for {}: {}", deviceId, ex.getMessage());
228 return false;
229 }
230 }
231
232 private boolean doSetPipelineConfig(PiPipeconf pipeconf, ByteBuffer byteBuffer) {
233
234 log.info("Setting pipeline config for {} to {} using bytebuffer...", deviceId, pipeconf.id());
235
236 P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
237 return p4Info != null && byteBuffer != null
238 && sendPipelineConfig(p4Info, ByteString.copyFrom(byteBuffer));
239 }
240
241 private boolean sendPipelineConfig(P4Info p4Info, ByteString deviceData) {
242
243 //p4 device config
244 P4Config.P4DeviceConfig p4DeviceConfigMsg = P4Config.P4DeviceConfig
245 .newBuilder()
246 .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
247 .setReassign(true)
248 .setDeviceData(deviceData)
249 .build();
250 //pipeline config message
251 ForwardingPipelineConfig pipelineConfig = ForwardingPipelineConfig
252 .newBuilder()
253 .setDeviceId(p4DeviceId)
254 .setP4Info(p4Info)
255 .setP4DeviceConfig(p4DeviceConfigMsg.toByteString())
256 .build();
257
258 //set forwarding pipeline config request
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400259 SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
260 .newBuilder()
261 .setAction(VERIFY_AND_COMMIT)
Andrea Campanellabf1301d2017-08-07 18:33:52 +0200262 .addConfigs(pipelineConfig)
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400263 .build();
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400264 try {
265 this.blockingStub.setForwardingPipelineConfig(request);
266
267 } catch (StatusRuntimeException ex) {
268 log.warn("Unable to set pipeline config for {}: {}", deviceId, ex.getMessage());
269 return false;
270 }
271
272 return true;
273 }
274
Andrea Campanellabf1301d2017-08-07 18:33:52 +0200275
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400276 private boolean doWriteTableEntries(Collection<PiTableEntry> piTableEntries, WriteOperationType opType,
277 PiPipeconf pipeconf) {
278
279 WriteRequest.Builder writeRequestBuilder = WriteRequest.newBuilder();
280
281 Collection<Update> updateMsgs = TableEntryEncoder.encode(piTableEntries, pipeconf)
282 .stream()
283 .map(tableEntryMsg ->
Andrea Campanellabf1301d2017-08-07 18:33:52 +0200284 Update.newBuilder()
285 .setEntity(Entity.newBuilder()
286 .setTableEntry(tableEntryMsg)
287 .build())
288 .setType(UPDATE_TYPES.get(opType))
289 .build())
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400290 .collect(Collectors.toList());
291
292 if (updateMsgs.size() == 0) {
293 return true;
294 }
295
296 writeRequestBuilder
297 .setDeviceId(p4DeviceId)
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200298 /* PI ignores this ElectionId, commenting out for now.
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400299 .setElectionId(Uint128.newBuilder()
300 .setHigh(0)
301 .setLow(ELECTION_ID)
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200302 .build()) */
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400303 .addAllUpdates(updateMsgs)
304 .build();
305
306 try {
307 blockingStub.write(writeRequestBuilder.build());
308 return true;
309 } catch (StatusRuntimeException e) {
310 log.warn("Unable to write table entries ({}): {}", opType, e.getMessage());
311 return false;
312 }
313 }
314
315 private Collection<PiTableEntry> doDumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
316
Carmelo Cascone9f007702017-08-24 13:30:51 +0200317 log.debug("Dumping table {} from {} (pipeconf {})...", piTableId, deviceId, pipeconf.id());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400318
319 P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
320 int tableId;
321 try {
322 tableId = browser.tables().getByName(piTableId.id()).getPreamble().getId();
323 } catch (P4InfoBrowser.NotFoundException e) {
324 log.warn("Unable to dump table: {}", e.getMessage());
325 return Collections.emptyList();
326 }
327
328 ReadRequest requestMsg = ReadRequest.newBuilder()
329 .setDeviceId(p4DeviceId)
330 .addEntities(Entity.newBuilder()
Andrea Campanellabf1301d2017-08-07 18:33:52 +0200331 .setTableEntry(TableEntry.newBuilder()
332 .setTableId(tableId)
333 .build())
334 .build())
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400335 .build();
336
337 Iterator<ReadResponse> responses;
338 try {
339 responses = blockingStub.read(requestMsg);
340 } catch (StatusRuntimeException e) {
341 log.warn("Unable to dump table: {}", e.getMessage());
342 return Collections.emptyList();
343 }
344
345 Iterable<ReadResponse> responseIterable = () -> responses;
346 List<TableEntry> tableEntryMsgs = StreamSupport
347 .stream(responseIterable.spliterator(), false)
348 .map(ReadResponse::getEntitiesList)
349 .flatMap(List::stream)
350 .filter(entity -> entity.getEntityCase() == TABLE_ENTRY)
351 .map(Entity::getTableEntry)
352 .collect(Collectors.toList());
353
Carmelo Cascone9f007702017-08-24 13:30:51 +0200354 log.debug("Retrieved {} entries from table {} on {}...", tableEntryMsgs.size(), piTableId, deviceId);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400355
356 return TableEntryEncoder.decode(tableEntryMsgs, pipeconf);
357 }
358
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200359 private boolean doPacketOut(PiPacketOperation packet, PiPipeconf pipeconf) {
360 try {
361 //encode the PiPacketOperation into a PacketOut
362 PacketOut packetOut = PacketIOCodec.encodePacketOut(packet, pipeconf);
363
364 //Build the request
365 StreamMessageRequest packetOutRequest = StreamMessageRequest
366 .newBuilder().setPacket(packetOut).build();
367
368 //Send the request
369 streamRequestObserver.onNext(packetOutRequest);
370
371 } catch (P4InfoBrowser.NotFoundException e) {
372 log.error("Cant find expected metadata in p4Info file. {}", e.getMessage());
373 log.debug("Exception", e);
374 return false;
375 }
376 return true;
377 }
378
Carmelo Casconea966c342017-07-30 01:56:30 -0400379 private void doPacketIn(PacketIn packetInMsg) {
380
381 // Retrieve the pipeconf for this client's device.
382 PiPipeconfService pipeconfService = DefaultServiceDirectory.getService(PiPipeconfService.class);
383 if (pipeconfService == null) {
384 throw new IllegalStateException("PiPipeconfService is null. Can't handle packet in.");
385 }
386 final PiPipeconf pipeconf;
387 if (pipeconfService.ofDevice(deviceId).isPresent() &&
388 pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).isPresent()) {
389 pipeconf = pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).get();
390 } else {
391 log.warn("Unable to get pipeconf of {}. Can't handle packet in", deviceId);
392 return;
393 }
394 // Decode packet message and post event.
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200395 PiPacketOperation packetOperation = PacketIOCodec.decodePacketIn(packetInMsg, pipeconf);
396 DefaultPacketIn packetInEventSubject = new DefaultPacketIn(deviceId, packetOperation);
397 P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.PACKET_IN, packetInEventSubject);
Carmelo Casconea966c342017-07-30 01:56:30 -0400398 log.debug("Received packet in: {}", event);
399 controller.postEvent(event);
400 }
401
402 private void doArbitrationUpdateFromDevice(MasterArbitrationUpdate arbitrationMsg) {
403
404 log.warn("Received arbitration update from {} (NOT IMPLEMENTED YET): {}", deviceId, arbitrationMsg);
405 }
406
Carmelo Casconeb2e3dba2017-07-27 12:07:09 -0400407 /**
408 * Returns the internal P4 device ID associated with this client.
409 *
410 * @return P4 device ID
411 */
Carmelo Casconef423bec2017-08-30 01:56:25 +0200412 public long p4DeviceId() {
Carmelo Casconeb2e3dba2017-07-27 12:07:09 -0400413 return p4DeviceId;
414 }
415
416 /**
417 * For testing purpose only. TODO: remove before release.
418 *
419 * @return blocking stub
420 */
421 public P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub() {
422 return this.blockingStub;
423 }
424
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200425
Andrea Campanella432f7182017-07-14 18:43:27 +0200426 @Override
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400427 public void shutdown() {
428
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400429 log.info("Shutting down client for {}...", deviceId);
430
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400431 writeLock.lock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400432 try {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400433 if (streamRequestObserver != null) {
434 streamRequestObserver.onCompleted();
435 cancellableContext.cancel(new InterruptedException("Requested client shutdown"));
436 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400437
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400438 this.executorService.shutdown();
439 try {
440 executorService.awaitTermination(5, TimeUnit.SECONDS);
441 } catch (InterruptedException e) {
442 log.warn("Executor service didn't shutdown in time.");
443 }
444 } finally {
445 writeLock.unlock();
446 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400447 }
448
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400449 /**
450 * Handles messages received from the device on the stream channel.
451 */
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400452 private class StreamChannelResponseObserver implements StreamObserver<StreamMessageResponse> {
453
454 @Override
455 public void onNext(StreamMessageResponse message) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400456 executorService.submit(() -> doNext(message));
457 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400458
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400459 private void doNext(StreamMessageResponse message) {
Carmelo Casconea966c342017-07-30 01:56:30 -0400460 try {
Andrea Campanella0288c872017-08-07 18:32:51 +0200461 log.debug("Received message on stream channel from {}: {}", deviceId, message.getUpdateCase());
Carmelo Casconea966c342017-07-30 01:56:30 -0400462 switch (message.getUpdateCase()) {
463 case PACKET:
464 // Packet-in
465 doPacketIn(message.getPacket());
Andrea Campanella288b2732017-07-28 14:16:16 +0200466 return;
Carmelo Casconea966c342017-07-30 01:56:30 -0400467 case ARBITRATION:
468 doArbitrationUpdateFromDevice(message.getArbitration());
469 return;
470 default:
471 log.warn("Unrecognized stream message from {}: {}", deviceId, message.getUpdateCase());
472 }
473 } catch (Throwable ex) {
474 log.error("Exception while processing stream channel message from {}", deviceId, ex);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400475 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400476 }
477
478 @Override
479 public void onError(Throwable throwable) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400480 log.warn("Error on stream channel for {}: {}", deviceId, Status.fromThrowable(throwable));
481 // FIXME: we might want to recreate the channel.
482 // In general, we want to be robust against any transient error and, if the channel is open, make sure the
483 // stream channel is always on.
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400484 }
485
486 @Override
487 public void onCompleted() {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400488 log.warn("Stream channel for {} has completed", deviceId);
489 // FIXME: same concern as before.
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400490 }
491 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400492}