blob: a9adf552395219729671220925046cf49d6ace4a [file] [log] [blame]
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Carmelo Casconef7aa3f92017-07-06 23:56:50 -04003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.p4runtime.ctl;
18
Carmelo Cascone8d99b172017-07-18 17:26:31 -040019import com.google.common.collect.ImmutableMap;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020020import com.google.common.collect.Maps;
Carmelo Cascone7f75be42017-09-07 14:37:02 +020021import com.google.common.collect.Sets;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040022import com.google.protobuf.ByteString;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040023import io.grpc.Context;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040024import io.grpc.ManagedChannel;
25import io.grpc.Status;
26import io.grpc.StatusRuntimeException;
27import io.grpc.stub.StreamObserver;
Andrea Campanella288b2732017-07-28 14:16:16 +020028import org.onlab.osgi.DefaultServiceDirectory;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040029import org.onosproject.net.DeviceId;
Andrea Campanella432f7182017-07-14 18:43:27 +020030import org.onosproject.net.pi.model.PiPipeconf;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020031import org.onosproject.net.pi.runtime.PiCounterCellData;
32import org.onosproject.net.pi.runtime.PiCounterCellId;
33import org.onosproject.net.pi.runtime.PiCounterId;
Carmelo Cascone7f75be42017-09-07 14:37:02 +020034import org.onosproject.net.pi.runtime.PiDirectCounterCellId;
35import org.onosproject.net.pi.runtime.PiIndirectCounterCellId;
Andrea Campanella432f7182017-07-14 18:43:27 +020036import org.onosproject.net.pi.runtime.PiPacketOperation;
Andrea Campanella288b2732017-07-28 14:16:16 +020037import org.onosproject.net.pi.runtime.PiPipeconfService;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040038import org.onosproject.net.pi.runtime.PiTableEntry;
39import org.onosproject.net.pi.runtime.PiTableId;
40import org.onosproject.p4runtime.api.P4RuntimeClient;
41import org.onosproject.p4runtime.api.P4RuntimeEvent;
42import org.slf4j.Logger;
43import p4.P4RuntimeGrpc;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040044import p4.P4RuntimeOuterClass.Entity;
45import p4.P4RuntimeOuterClass.ForwardingPipelineConfig;
46import p4.P4RuntimeOuterClass.MasterArbitrationUpdate;
47import p4.P4RuntimeOuterClass.PacketIn;
48import p4.P4RuntimeOuterClass.ReadRequest;
49import p4.P4RuntimeOuterClass.ReadResponse;
50import p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
51import p4.P4RuntimeOuterClass.StreamMessageRequest;
52import p4.P4RuntimeOuterClass.StreamMessageResponse;
53import p4.P4RuntimeOuterClass.TableEntry;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040054import p4.P4RuntimeOuterClass.Update;
55import p4.P4RuntimeOuterClass.WriteRequest;
Andrea Campanellafc1d34c2017-07-18 17:01:41 +020056import p4.config.P4InfoOuterClass.P4Info;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040057import p4.tmp.P4Config;
58
59import java.io.IOException;
60import java.io.InputStream;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040061import java.util.Collection;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040062import java.util.Collections;
63import java.util.Iterator;
64import java.util.List;
65import java.util.Map;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +020066import java.util.Set;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040067import java.util.concurrent.CompletableFuture;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040068import java.util.concurrent.Executor;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040069import java.util.concurrent.ExecutorService;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040070import java.util.concurrent.Executors;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040071import java.util.concurrent.TimeUnit;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040072import java.util.concurrent.locks.Lock;
73import java.util.concurrent.locks.ReentrantLock;
74import java.util.function.Supplier;
75import java.util.stream.Collectors;
76import java.util.stream.StreamSupport;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040077
Carmelo Cascone8d99b172017-07-18 17:26:31 -040078import static org.onlab.util.Tools.groupedThreads;
79import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040080import static org.slf4j.LoggerFactory.getLogger;
Carmelo Cascone8d99b172017-07-18 17:26:31 -040081import static p4.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
Andrea Campanellafc1d34c2017-07-18 17:01:41 +020082import static p4.P4RuntimeOuterClass.PacketOut;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040083import static p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
84
85/**
86 * Implementation of a P4Runtime client.
87 */
Carmelo Cascone8d99b172017-07-18 17:26:31 -040088public final class P4RuntimeClientImpl implements P4RuntimeClient {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040089
Carmelo Cascone8d99b172017-07-18 17:26:31 -040090 private static final Map<WriteOperationType, Update.Type> UPDATE_TYPES = ImmutableMap.of(
91 WriteOperationType.UNSPECIFIED, Update.Type.UNSPECIFIED,
92 WriteOperationType.INSERT, Update.Type.INSERT,
93 WriteOperationType.MODIFY, Update.Type.MODIFY,
94 WriteOperationType.DELETE, Update.Type.DELETE
95 );
96
Carmelo Casconef7aa3f92017-07-06 23:56:50 -040097 private final Logger log = getLogger(getClass());
98
99 private final DeviceId deviceId;
Carmelo Casconef423bec2017-08-30 01:56:25 +0200100 private final long p4DeviceId;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400101 private final P4RuntimeControllerImpl controller;
102 private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400103 private final Context.CancellableContext cancellableContext;
104 private final ExecutorService executorService;
105 private final Executor contextExecutor;
106 private final Lock writeLock = new ReentrantLock();
107 private final StreamObserver<StreamMessageRequest> streamRequestObserver;
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400108
109
Carmelo Casconef423bec2017-08-30 01:56:25 +0200110 P4RuntimeClientImpl(DeviceId deviceId, long p4DeviceId, ManagedChannel channel,
111 P4RuntimeControllerImpl controller) {
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400112 this.deviceId = deviceId;
113 this.p4DeviceId = p4DeviceId;
114 this.controller = controller;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400115 this.cancellableContext = Context.current().withCancellation();
Carmelo Casconea966c342017-07-30 01:56:30 -0400116 this.executorService = Executors.newFixedThreadPool(15, groupedThreads(
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400117 "onos/p4runtime-client-" + deviceId.toString(),
118 deviceId.toString() + "-%d"));
119 this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200120 //TODO Investigate deadline or timeout in supplyInContext Method
121 this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400122 P4RuntimeGrpc.P4RuntimeStub asyncStub = P4RuntimeGrpc.newStub(channel);
123 this.streamRequestObserver = asyncStub.streamChannel(new StreamChannelResponseObserver());
124 }
125
126 /**
127 * Executes the given task (supplier) in the gRPC context executor of this client, such that if the context is
128 * cancelled (e.g. client shutdown) the RPC is automatically cancelled.
129 * <p>
130 * Important: Tasks submitted in parallel by different threads are forced executed sequentially.
131 * <p>
132 */
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200133 private <U> CompletableFuture<U> supplyInContext(Supplier<U> supplier, String opDescription) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400134 return CompletableFuture.supplyAsync(() -> {
135 // TODO: explore a more relaxed locking strategy.
136 writeLock.lock();
137 try {
138 return supplier.get();
Carmelo Casconea966c342017-07-30 01:56:30 -0400139 } catch (Throwable ex) {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200140 if (ex instanceof StatusRuntimeException) {
141 log.warn("Unable to execute {} on {}: {}", opDescription, deviceId, ex.toString());
142 } else {
143 log.error("Exception in client of {}, executing {}", deviceId, opDescription, ex);
144 }
Carmelo Casconea966c342017-07-30 01:56:30 -0400145 throw ex;
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400146 } finally {
147 writeLock.unlock();
148 }
149 }, contextExecutor);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400150 }
151
152 @Override
153 public CompletableFuture<Boolean> initStreamChannel() {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200154 return supplyInContext(this::doInitStreamChannel, "initStreamChannel");
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400155 }
156
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400157 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400158 public CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ExtensionType targetConfigExtType) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200159 return supplyInContext(() -> doSetPipelineConfig(pipeconf, targetConfigExtType), "setPipelineConfig");
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400160 }
161
162 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400163 public CompletableFuture<Boolean> writeTableEntries(Collection<PiTableEntry> piTableEntries,
164 WriteOperationType opType, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200165 return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf),
166 "writeTableEntries-" + opType.name());
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400167 }
168
169 @Override
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400170 public CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200171 return supplyInContext(() -> doDumpTable(piTableId, pipeconf), "dumpTable-" + piTableId);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400172 }
173
174 @Override
Andrea Campanella432f7182017-07-14 18:43:27 +0200175 public CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200176 return supplyInContext(() -> doPacketOut(packet, pipeconf), "packetOut");
Andrea Campanella432f7182017-07-14 18:43:27 +0200177 }
178
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200179 @Override
180 public CompletableFuture<Collection<PiCounterCellData>> readCounterCells(Set<PiCounterCellId> cellIds,
181 PiPipeconf pipeconf) {
182 return supplyInContext(() -> doReadCounterCells(cellIds, pipeconf),
183 "readCounterCells-" + cellIds.hashCode());
184 }
185
186 @Override
187 public CompletableFuture<Collection<PiCounterCellData>> readAllCounterCells(Set<PiCounterId> counterIds,
188 PiPipeconf pipeconf) {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200189
190 /*
191 From p4runtime.proto, the scope of a ReadRequest is defined as follows:
192 CounterEntry:
193 - All counter cells for all meters if counter_id = 0 (default).
194 - All counter cells for given counter_id if index = 0 (default).
195 DirectCounterEntry:
196 - All counter cells for all meters if counter_id = 0 (default).
197 - All counter cells for given counter_id if table_entry.match is empty.
198 */
199
200 Set<PiCounterCellId> cellIds = Sets.newHashSet();
201
202 for (PiCounterId counterId : counterIds) {
203 switch (counterId.type()) {
204 case INDIRECT:
205 cellIds.add(PiIndirectCounterCellId.of(counterId, 0));
206 break;
207 case DIRECT:
208 cellIds.add(PiDirectCounterCellId.of(counterId, PiTableEntry.EMTPY));
209 break;
210 default:
211 log.warn("Unrecognized PI counter ID '{}'", counterId.type());
212 }
213 }
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200214
215 return supplyInContext(() -> doReadCounterCells(cellIds, pipeconf),
216 "readAllCounterCells-" + cellIds.hashCode());
217 }
218
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400219 /* Blocking method implementations below */
220
221 private boolean doInitStreamChannel() {
222 // To listen for packets and other events, we need to start the RPC.
223 // Here we do it by sending a master arbitration update.
224 log.info("initializing stream chanel on {}...", deviceId);
225 if (!doArbitrationUpdate()) {
226 log.warn("Unable to initialize stream channel for {}", deviceId);
227 return false;
228 } else {
229 return true;
230 }
231 }
232
233 private boolean doArbitrationUpdate() {
234 log.info("Sending arbitration update to {}...", deviceId);
235 StreamMessageRequest requestMsg = StreamMessageRequest.newBuilder()
236 .setArbitration(MasterArbitrationUpdate.newBuilder()
237 .setDeviceId(p4DeviceId)
238 .build())
239 .build();
240 try {
241 streamRequestObserver.onNext(requestMsg);
242 return true;
243 } catch (StatusRuntimeException e) {
244 log.warn("Arbitration update failed for {}: {}", deviceId, e);
245 return false;
246 }
247 }
248
249 private boolean doSetPipelineConfig(PiPipeconf pipeconf, ExtensionType targetConfigExtType) {
250
251 log.info("Setting pipeline config for {} to {} using {}...", deviceId, pipeconf.id(), targetConfigExtType);
252
253 P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
254 if (p4Info == null) {
255 // Problem logged by PipeconfHelper.
256 return false;
257 }
258
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400259
Andrea Campanella0288c872017-08-07 18:32:51 +0200260 ForwardingPipelineConfig.Builder pipelineConfigBuilder = ForwardingPipelineConfig
261 .newBuilder()
262 .setDeviceId(p4DeviceId)
263 .setP4Info(p4Info);
264
265 //if the target config extension is null we don't want to add the config.
266 if (targetConfigExtType != null) {
267 if (!pipeconf.extension(targetConfigExtType).isPresent()) {
268 log.warn("Missing extension {} in pipeconf {}", targetConfigExtType, pipeconf.id());
269 return false;
270 }
271 InputStream targetConfig = pipeconf.extension(targetConfigExtType).get();
272 P4Config.P4DeviceConfig p4DeviceConfigMsg;
273 try {
274 p4DeviceConfigMsg = P4Config.P4DeviceConfig
275 .newBuilder()
276 .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
277 .setReassign(true)
278 .setDeviceData(ByteString.readFrom(targetConfig))
279 .build();
280
281 pipelineConfigBuilder.setP4DeviceConfig(p4DeviceConfigMsg.toByteString());
282
283 } catch (IOException ex) {
284 log.warn("Unable to load target-specific config for {}: {}", deviceId, ex.getMessage());
285 return false;
286 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400287 }
288
289 SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
290 .newBuilder()
291 .setAction(VERIFY_AND_COMMIT)
Andrea Campanella0288c872017-08-07 18:32:51 +0200292 .addConfigs(pipelineConfigBuilder.build())
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400293 .build();
294
295 try {
296 this.blockingStub.setForwardingPipelineConfig(request);
297
298 } catch (StatusRuntimeException ex) {
299 log.warn("Unable to set pipeline config for {}: {}", deviceId, ex.getMessage());
300 return false;
301 }
302
303 return true;
304 }
305
306 private boolean doWriteTableEntries(Collection<PiTableEntry> piTableEntries, WriteOperationType opType,
307 PiPipeconf pipeconf) {
308
309 WriteRequest.Builder writeRequestBuilder = WriteRequest.newBuilder();
310
311 Collection<Update> updateMsgs = TableEntryEncoder.encode(piTableEntries, pipeconf)
312 .stream()
313 .map(tableEntryMsg ->
314 Update.newBuilder()
315 .setEntity(Entity.newBuilder()
316 .setTableEntry(tableEntryMsg)
317 .build())
318 .setType(UPDATE_TYPES.get(opType))
319 .build())
320 .collect(Collectors.toList());
321
322 if (updateMsgs.size() == 0) {
323 return true;
324 }
325
326 writeRequestBuilder
327 .setDeviceId(p4DeviceId)
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200328 /* PI ignores this ElectionId, commenting out for now.
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400329 .setElectionId(Uint128.newBuilder()
330 .setHigh(0)
331 .setLow(ELECTION_ID)
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200332 .build()) */
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400333 .addAllUpdates(updateMsgs)
334 .build();
335
336 try {
337 blockingStub.write(writeRequestBuilder.build());
338 return true;
339 } catch (StatusRuntimeException e) {
340 log.warn("Unable to write table entries ({}): {}", opType, e.getMessage());
341 return false;
342 }
343 }
344
345 private Collection<PiTableEntry> doDumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
346
Carmelo Cascone9f007702017-08-24 13:30:51 +0200347 log.debug("Dumping table {} from {} (pipeconf {})...", piTableId, deviceId, pipeconf.id());
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400348
349 P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
350 int tableId;
351 try {
352 tableId = browser.tables().getByName(piTableId.id()).getPreamble().getId();
353 } catch (P4InfoBrowser.NotFoundException e) {
354 log.warn("Unable to dump table: {}", e.getMessage());
355 return Collections.emptyList();
356 }
357
358 ReadRequest requestMsg = ReadRequest.newBuilder()
359 .setDeviceId(p4DeviceId)
360 .addEntities(Entity.newBuilder()
361 .setTableEntry(TableEntry.newBuilder()
362 .setTableId(tableId)
363 .build())
364 .build())
365 .build();
366
367 Iterator<ReadResponse> responses;
368 try {
369 responses = blockingStub.read(requestMsg);
370 } catch (StatusRuntimeException e) {
371 log.warn("Unable to dump table: {}", e.getMessage());
372 return Collections.emptyList();
373 }
374
375 Iterable<ReadResponse> responseIterable = () -> responses;
376 List<TableEntry> tableEntryMsgs = StreamSupport
377 .stream(responseIterable.spliterator(), false)
378 .map(ReadResponse::getEntitiesList)
379 .flatMap(List::stream)
380 .filter(entity -> entity.getEntityCase() == TABLE_ENTRY)
381 .map(Entity::getTableEntry)
382 .collect(Collectors.toList());
383
Carmelo Cascone9f007702017-08-24 13:30:51 +0200384 log.debug("Retrieved {} entries from table {} on {}...", tableEntryMsgs.size(), piTableId, deviceId);
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400385
386 return TableEntryEncoder.decode(tableEntryMsgs, pipeconf);
387 }
388
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200389 private boolean doPacketOut(PiPacketOperation packet, PiPipeconf pipeconf) {
390 try {
391 //encode the PiPacketOperation into a PacketOut
392 PacketOut packetOut = PacketIOCodec.encodePacketOut(packet, pipeconf);
393
394 //Build the request
395 StreamMessageRequest packetOutRequest = StreamMessageRequest
396 .newBuilder().setPacket(packetOut).build();
397
398 //Send the request
399 streamRequestObserver.onNext(packetOutRequest);
400
401 } catch (P4InfoBrowser.NotFoundException e) {
402 log.error("Cant find expected metadata in p4Info file. {}", e.getMessage());
403 log.debug("Exception", e);
404 return false;
405 }
406 return true;
407 }
408
Carmelo Casconea966c342017-07-30 01:56:30 -0400409 private void doPacketIn(PacketIn packetInMsg) {
410
411 // Retrieve the pipeconf for this client's device.
412 PiPipeconfService pipeconfService = DefaultServiceDirectory.getService(PiPipeconfService.class);
413 if (pipeconfService == null) {
414 throw new IllegalStateException("PiPipeconfService is null. Can't handle packet in.");
415 }
416 final PiPipeconf pipeconf;
417 if (pipeconfService.ofDevice(deviceId).isPresent() &&
418 pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).isPresent()) {
419 pipeconf = pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).get();
420 } else {
421 log.warn("Unable to get pipeconf of {}. Can't handle packet in", deviceId);
422 return;
423 }
424 // Decode packet message and post event.
Carmelo Cascone2cad9ef2017-08-01 21:52:07 +0200425 PiPacketOperation packetOperation = PacketIOCodec.decodePacketIn(packetInMsg, pipeconf);
426 DefaultPacketIn packetInEventSubject = new DefaultPacketIn(deviceId, packetOperation);
427 P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.PACKET_IN, packetInEventSubject);
Carmelo Casconea966c342017-07-30 01:56:30 -0400428 log.debug("Received packet in: {}", event);
429 controller.postEvent(event);
430 }
431
432 private void doArbitrationUpdateFromDevice(MasterArbitrationUpdate arbitrationMsg) {
433
434 log.warn("Received arbitration update from {} (NOT IMPLEMENTED YET): {}", deviceId, arbitrationMsg);
435 }
436
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200437 private Collection<PiCounterCellData> doReadCounterCells(Collection<PiCounterCellId> cellIds, PiPipeconf pipeconf) {
438
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200439 // We use this map to remember the original PI counter IDs of the returned response.
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200440 final Map<Integer, PiCounterId> counterIdMap = Maps.newHashMap();
441
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200442 final ReadRequest request = ReadRequest.newBuilder()
443 .setDeviceId(p4DeviceId)
444 .addAllEntities(CounterEntryCodec.encodePiCounterCellIds(cellIds, counterIdMap, pipeconf))
445 .build();
446
447 if (request.getEntitiesList().size() == 0) {
448 return Collections.emptyList();
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200449 }
450
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200451 final Iterable<ReadResponse> responses;
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200452 try {
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200453 responses = () -> blockingStub.read(request);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200454 } catch (StatusRuntimeException e) {
455 log.warn("Unable to read counters: {}", e.getMessage());
456 return Collections.emptyList();
457 }
458
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200459 List<Entity> entities = StreamSupport.stream(responses.spliterator(), false)
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200460 .map(ReadResponse::getEntitiesList)
461 .flatMap(List::stream)
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200462 .collect(Collectors.toList());
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200463
Carmelo Cascone7f75be42017-09-07 14:37:02 +0200464 return CounterEntryCodec.decodeCounterEntities(entities, counterIdMap, pipeconf);
Carmelo Casconeb045ddc2017-09-01 01:26:35 +0200465 }
466
Carmelo Casconeb2e3dba2017-07-27 12:07:09 -0400467 /**
468 * Returns the internal P4 device ID associated with this client.
469 *
470 * @return P4 device ID
471 */
Carmelo Casconef423bec2017-08-30 01:56:25 +0200472 public long p4DeviceId() {
Carmelo Casconeb2e3dba2017-07-27 12:07:09 -0400473 return p4DeviceId;
474 }
475
476 /**
477 * For testing purpose only. TODO: remove before release.
478 *
479 * @return blocking stub
480 */
481 public P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub() {
482 return this.blockingStub;
483 }
484
Andrea Campanellafc1d34c2017-07-18 17:01:41 +0200485
Andrea Campanella432f7182017-07-14 18:43:27 +0200486 @Override
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400487 public void shutdown() {
488
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400489 log.info("Shutting down client for {}...", deviceId);
490
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400491 writeLock.lock();
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400492 try {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400493 if (streamRequestObserver != null) {
494 streamRequestObserver.onCompleted();
495 cancellableContext.cancel(new InterruptedException("Requested client shutdown"));
496 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400497
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400498 this.executorService.shutdown();
499 try {
500 executorService.awaitTermination(5, TimeUnit.SECONDS);
501 } catch (InterruptedException e) {
502 log.warn("Executor service didn't shutdown in time.");
503 }
504 } finally {
505 writeLock.unlock();
506 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400507 }
508
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400509 /**
510 * Handles messages received from the device on the stream channel.
511 */
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400512 private class StreamChannelResponseObserver implements StreamObserver<StreamMessageResponse> {
513
514 @Override
515 public void onNext(StreamMessageResponse message) {
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400516 executorService.submit(() -> doNext(message));
517 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400518
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400519 private void doNext(StreamMessageResponse message) {
Carmelo Casconea966c342017-07-30 01:56:30 -0400520 try {
Andrea Campanella0288c872017-08-07 18:32:51 +0200521 log.debug("Received message on stream channel from {}: {}", deviceId, message.getUpdateCase());
Carmelo Casconea966c342017-07-30 01:56:30 -0400522 switch (message.getUpdateCase()) {
523 case PACKET:
524 // Packet-in
525 doPacketIn(message.getPacket());
Andrea Campanella288b2732017-07-28 14:16:16 +0200526 return;
Carmelo Casconea966c342017-07-30 01:56:30 -0400527 case ARBITRATION:
528 doArbitrationUpdateFromDevice(message.getArbitration());
529 return;
530 default:
531 log.warn("Unrecognized stream message from {}: {}", deviceId, message.getUpdateCase());
532 }
533 } catch (Throwable ex) {
534 log.error("Exception while processing stream channel message from {}", deviceId, ex);
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400535 }
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400536 }
537
538 @Override
539 public void onError(Throwable throwable) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400540 log.warn("Error on stream channel for {}: {}", deviceId, Status.fromThrowable(throwable));
541 // FIXME: we might want to recreate the channel.
542 // In general, we want to be robust against any transient error and, if the channel is open, make sure the
543 // stream channel is always on.
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400544 }
545
546 @Override
547 public void onCompleted() {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400548 log.warn("Stream channel for {} has completed", deviceId);
549 // FIXME: same concern as before.
Carmelo Casconef7aa3f92017-07-06 23:56:50 -0400550 }
551 }
Carmelo Cascone8d99b172017-07-18 17:26:31 -0400552}