Handling of table entry messages in P4Runtime
+ synchonized method execution in P4RuntimeClient
+ support for cancellable contexts (for client shutdown)
+ logging of sent/received messages in GrpcControllerImpl
+ minor refactorings
Change-Id: I43f0fcc263579e01957a02ef3392105aed476f33
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index 3762eb1..132689a 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -17,9 +17,8 @@
package org.onosproject.p4runtime.ctl;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
-import com.google.protobuf.ExtensionRegistry;
-import com.google.protobuf.TextFormat;
import io.grpc.Context;
import io.grpc.ManagedChannel;
import io.grpc.Status;
@@ -35,165 +34,130 @@
import org.onosproject.p4runtime.api.P4RuntimeEvent;
import org.slf4j.Logger;
import p4.P4RuntimeGrpc;
+import p4.P4RuntimeOuterClass.Entity;
+import p4.P4RuntimeOuterClass.ForwardingPipelineConfig;
+import p4.P4RuntimeOuterClass.MasterArbitrationUpdate;
+import p4.P4RuntimeOuterClass.PacketIn;
+import p4.P4RuntimeOuterClass.ReadRequest;
+import p4.P4RuntimeOuterClass.ReadResponse;
+import p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
+import p4.P4RuntimeOuterClass.StreamMessageRequest;
+import p4.P4RuntimeOuterClass.StreamMessageResponse;
+import p4.P4RuntimeOuterClass.TableEntry;
+import p4.P4RuntimeOuterClass.Uint128;
+import p4.P4RuntimeOuterClass.Update;
+import p4.P4RuntimeOuterClass.WriteRequest;
import p4.tmp.P4Config;
import java.io.IOException;
import java.io.InputStream;
-import java.io.InputStreamReader;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import static org.onlab.util.ImmutableByteSequence.copyFrom;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType;
import static org.slf4j.LoggerFactory.getLogger;
-import static p4.P4RuntimeOuterClass.ForwardingPipelineConfig;
-import static p4.P4RuntimeOuterClass.MasterArbitrationUpdate;
-import static p4.P4RuntimeOuterClass.PacketIn;
-import static p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
+import static p4.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
import static p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
-import static p4.P4RuntimeOuterClass.StreamMessageRequest;
-import static p4.P4RuntimeOuterClass.StreamMessageResponse;
import static p4.config.P4InfoOuterClass.P4Info;
/**
* Implementation of a P4Runtime client.
*/
-public class P4RuntimeClientImpl implements P4RuntimeClient {
+public final class P4RuntimeClientImpl implements P4RuntimeClient {
private static final int DEADLINE_SECONDS = 15;
+ // FIXME: use static election ID, since mastership arbitration is not yet support on BMv2 or Tofino.
+ private static final int ELECTION_ID = 1;
+
+ private static final Map<WriteOperationType, Update.Type> UPDATE_TYPES = ImmutableMap.of(
+ WriteOperationType.UNSPECIFIED, Update.Type.UNSPECIFIED,
+ WriteOperationType.INSERT, Update.Type.INSERT,
+ WriteOperationType.MODIFY, Update.Type.MODIFY,
+ WriteOperationType.DELETE, Update.Type.DELETE
+ );
+
private final Logger log = getLogger(getClass());
private final DeviceId deviceId;
private final int p4DeviceId;
private final P4RuntimeControllerImpl controller;
private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
- private final P4RuntimeGrpc.P4RuntimeStub asyncStub;
- private ExecutorService executorService;
- private StreamObserver<StreamMessageRequest> streamRequestObserver;
- private Context.CancellableContext streamContext;
+ private final Context.CancellableContext cancellableContext;
+ private final ExecutorService executorService;
+ private final Executor contextExecutor;
+ private final Lock writeLock = new ReentrantLock();
+ private final StreamObserver<StreamMessageRequest> streamRequestObserver;
- P4RuntimeClientImpl(DeviceId deviceId, int p4DeviceId, ManagedChannel channel, P4RuntimeControllerImpl controller,
- ExecutorService executorService) {
+ P4RuntimeClientImpl(DeviceId deviceId, int p4DeviceId, ManagedChannel channel, P4RuntimeControllerImpl controller) {
this.deviceId = deviceId;
this.p4DeviceId = p4DeviceId;
this.controller = controller;
- this.executorService = executorService;
+ this.cancellableContext = Context.current().withCancellation();
+ this.executorService = Executors.newFixedThreadPool(5, groupedThreads(
+ "onos/p4runtime-client-" + deviceId.toString(),
+ deviceId.toString() + "-%d"));
+ this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel)
.withDeadlineAfter(DEADLINE_SECONDS, TimeUnit.SECONDS);
- this.asyncStub = P4RuntimeGrpc.newStub(channel);
+ P4RuntimeGrpc.P4RuntimeStub asyncStub = P4RuntimeGrpc.newStub(channel);
+ this.streamRequestObserver = asyncStub.streamChannel(new StreamChannelResponseObserver());
+ }
+
+ /**
+ * Executes the given task (supplier) in the gRPC context executor of this client, such that if the context is
+ * cancelled (e.g. client shutdown) the RPC is automatically cancelled.
+ * <p>
+ * Important: Tasks submitted in parallel by different threads are forced executed sequentially.
+ * <p>
+ */
+ private <U> CompletableFuture<U> supplyInContext(Supplier<U> supplier) {
+ return CompletableFuture.supplyAsync(() -> {
+ // TODO: explore a more relaxed locking strategy.
+ writeLock.lock();
+ try {
+ return supplier.get();
+ } finally {
+ writeLock.unlock();
+ }
+ }, contextExecutor);
}
@Override
public CompletableFuture<Boolean> initStreamChannel() {
- return CompletableFuture.supplyAsync(this::doInitStreamChannel, executorService);
- }
-
- private boolean doInitStreamChannel() {
- if (this.streamRequestObserver == null) {
-
- streamContext = Context.current().withCancellation();
- streamContext.run(
- () -> streamRequestObserver = asyncStub.streamChannel(new StreamChannelResponseObserver()));
-
- // To listen for packets and other events, we need to start the RPC.
- // Here we do it by sending a master arbitration update.
- if (!doArbitrationUpdate()) {
- log.warn("Unable to initialize stream channel for {}", deviceId);
- return false;
- }
- }
- return true;
- }
-
- private boolean doArbitrationUpdate() {
-
- if (streamRequestObserver == null) {
- log.error("Null request stream observer for {}", deviceId);
- return false;
- }
-
- try {
- StreamMessageRequest initRequest = StreamMessageRequest
- .newBuilder()
- .setArbitration(MasterArbitrationUpdate
- .newBuilder()
- .setDeviceId(p4DeviceId)
- .build())
- .build();
- streamRequestObserver.onNext(initRequest);
- return true;
- } catch (StatusRuntimeException e) {
- log.warn("Arbitration update failed for {}: {}", deviceId, e);
- return false;
- }
+ return supplyInContext(this::doInitStreamChannel);
}
@Override
- public CompletableFuture<Boolean> setPipelineConfig(InputStream p4info, InputStream targetConfig) {
- return CompletableFuture.supplyAsync(() -> doSetPipelineConfig(p4info, targetConfig), executorService);
- }
-
- private boolean doSetPipelineConfig(InputStream p4info, InputStream targetConfig) {
-
- log.debug("Setting pipeline config for {}", deviceId);
-
- P4Info.Builder p4iInfoBuilder = P4Info.newBuilder();
-
- try {
- TextFormat.getParser().merge(new InputStreamReader(p4info),
- ExtensionRegistry.getEmptyRegistry(),
- p4iInfoBuilder);
- } catch (IOException ex) {
- log.warn("Unable to load p4info for {}: {}", deviceId, ex.getMessage());
- return false;
- }
-
- P4Config.P4DeviceConfig deviceIdConfig;
- try {
- deviceIdConfig = P4Config.P4DeviceConfig
- .newBuilder()
- .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
- .setReassign(true)
- .setDeviceData(ByteString.readFrom(targetConfig))
- .build();
- } catch (IOException ex) {
- log.warn("Unable to load target-specific config for {}: {}", deviceId, ex.getMessage());
- return false;
- }
-
- SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
- .newBuilder()
- .setAction(VERIFY_AND_COMMIT)
- .addConfigs(ForwardingPipelineConfig
- .newBuilder()
- .setDeviceId(p4DeviceId)
- .setP4Info(p4iInfoBuilder.build())
- .setP4DeviceConfig(deviceIdConfig.toByteString())
- .build())
- .build();
- try {
- this.blockingStub.setForwardingPipelineConfig(request);
- } catch (StatusRuntimeException ex) {
- log.warn("Unable to set pipeline config for {}: {}", deviceId, ex.getMessage());
- return false;
- }
-
- return true;
+ public CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ExtensionType targetConfigExtType) {
+ return supplyInContext(() -> doSetPipelineConfig(pipeconf, targetConfigExtType));
}
@Override
- public boolean writeTableEntries(Collection<PiTableEntry> entries, WriteOperationType opType) {
-
- throw new UnsupportedOperationException("writeTableEntries not implemented.");
+ public CompletableFuture<Boolean> writeTableEntries(Collection<PiTableEntry> piTableEntries,
+ WriteOperationType opType, PiPipeconf pipeconf) {
+ return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf));
}
@Override
- public CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId tableId) {
-
- throw new UnsupportedOperationException("dumpTable not implemented.");
+ public CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
+ return supplyInContext(() -> doDumpTable(piTableId, pipeconf));
}
@Override
@@ -227,56 +191,224 @@
return result;
}
+ /* Blocking method implementations below */
+
+ private boolean doInitStreamChannel() {
+ // To listen for packets and other events, we need to start the RPC.
+ // Here we do it by sending a master arbitration update.
+ log.info("initializing stream chanel on {}...", deviceId);
+ if (!doArbitrationUpdate()) {
+ log.warn("Unable to initialize stream channel for {}", deviceId);
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ private boolean doArbitrationUpdate() {
+ log.info("Sending arbitration update to {}...", deviceId);
+ StreamMessageRequest requestMsg = StreamMessageRequest.newBuilder()
+ .setArbitration(MasterArbitrationUpdate.newBuilder()
+ .setDeviceId(p4DeviceId)
+ .build())
+ .build();
+ try {
+ streamRequestObserver.onNext(requestMsg);
+ return true;
+ } catch (StatusRuntimeException e) {
+ log.warn("Arbitration update failed for {}: {}", deviceId, e);
+ return false;
+ }
+ }
+
+ private boolean doSetPipelineConfig(PiPipeconf pipeconf, ExtensionType targetConfigExtType) {
+
+ log.info("Setting pipeline config for {} to {} using {}...", deviceId, pipeconf.id(), targetConfigExtType);
+
+ P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
+ if (p4Info == null) {
+ // Problem logged by PipeconfHelper.
+ return false;
+ }
+
+ if (!pipeconf.extension(targetConfigExtType).isPresent()) {
+ log.warn("Missing extension {} in pipeconf {}", targetConfigExtType, pipeconf.id());
+ return false;
+ }
+
+ InputStream targetConfig = pipeconf.extension(targetConfigExtType).get();
+ P4Config.P4DeviceConfig p4DeviceConfigMsg;
+ try {
+ p4DeviceConfigMsg = P4Config.P4DeviceConfig
+ .newBuilder()
+ .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
+ .setReassign(true)
+ .setDeviceData(ByteString.readFrom(targetConfig))
+ .build();
+ } catch (IOException ex) {
+ log.warn("Unable to load target-specific config for {}: {}", deviceId, ex.getMessage());
+ return false;
+ }
+
+ SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
+ .newBuilder()
+ .setAction(VERIFY_AND_COMMIT)
+ .addConfigs(ForwardingPipelineConfig
+ .newBuilder()
+ .setDeviceId(p4DeviceId)
+ .setP4Info(p4Info)
+ .setP4DeviceConfig(p4DeviceConfigMsg.toByteString())
+ .build())
+ .build();
+
+ try {
+ this.blockingStub.setForwardingPipelineConfig(request);
+
+ } catch (StatusRuntimeException ex) {
+ log.warn("Unable to set pipeline config for {}: {}", deviceId, ex.getMessage());
+ return false;
+ }
+
+ return true;
+ }
+
+ private boolean doWriteTableEntries(Collection<PiTableEntry> piTableEntries, WriteOperationType opType,
+ PiPipeconf pipeconf) {
+
+ WriteRequest.Builder writeRequestBuilder = WriteRequest.newBuilder();
+
+ Collection<Update> updateMsgs = TableEntryEncoder.encode(piTableEntries, pipeconf)
+ .stream()
+ .map(tableEntryMsg ->
+ Update.newBuilder()
+ .setEntity(Entity.newBuilder()
+ .setTableEntry(tableEntryMsg)
+ .build())
+ .setType(UPDATE_TYPES.get(opType))
+ .build())
+ .collect(Collectors.toList());
+
+ if (updateMsgs.size() == 0) {
+ return true;
+ }
+
+ writeRequestBuilder
+ .setDeviceId(p4DeviceId)
+ .setElectionId(Uint128.newBuilder()
+ .setHigh(0)
+ .setLow(ELECTION_ID)
+ .build())
+ .addAllUpdates(updateMsgs)
+ .build();
+
+ try {
+ blockingStub.write(writeRequestBuilder.build());
+ return true;
+ } catch (StatusRuntimeException e) {
+ log.warn("Unable to write table entries ({}): {}", opType, e.getMessage());
+ return false;
+ }
+ }
+
+ private Collection<PiTableEntry> doDumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
+
+ log.info("Dumping table {} from {} (pipeconf {})...", piTableId, deviceId, pipeconf.id());
+
+ P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
+ int tableId;
+ try {
+ tableId = browser.tables().getByName(piTableId.id()).getPreamble().getId();
+ } catch (P4InfoBrowser.NotFoundException e) {
+ log.warn("Unable to dump table: {}", e.getMessage());
+ return Collections.emptyList();
+ }
+
+ ReadRequest requestMsg = ReadRequest.newBuilder()
+ .setDeviceId(p4DeviceId)
+ .addEntities(Entity.newBuilder()
+ .setTableEntry(TableEntry.newBuilder()
+ .setTableId(tableId)
+ .build())
+ .build())
+ .build();
+
+ Iterator<ReadResponse> responses;
+ try {
+ responses = blockingStub.read(requestMsg);
+ } catch (StatusRuntimeException e) {
+ log.warn("Unable to dump table: {}", e.getMessage());
+ return Collections.emptyList();
+ }
+
+ Iterable<ReadResponse> responseIterable = () -> responses;
+ List<TableEntry> tableEntryMsgs = StreamSupport
+ .stream(responseIterable.spliterator(), false)
+ .map(ReadResponse::getEntitiesList)
+ .flatMap(List::stream)
+ .filter(entity -> entity.getEntityCase() == TABLE_ENTRY)
+ .map(Entity::getTableEntry)
+ .collect(Collectors.toList());
+
+ log.info("Retrieved {} entries from table {} on {}...", tableEntryMsgs.size(), piTableId, deviceId);
+
+ return TableEntryEncoder.decode(tableEntryMsgs, pipeconf);
+ }
+
@Override
public void shutdown() {
log.info("Shutting down client for {}...", deviceId);
- if (streamRequestObserver != null) {
- streamRequestObserver.onCompleted();
- streamContext.cancel(null);
- streamContext = null;
- }
-
- this.executorService.shutdown();
+ writeLock.lock();
try {
- executorService.awaitTermination(5, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- log.warn("Executor service didn't shutdown in time.");
- }
+ if (streamRequestObserver != null) {
+ streamRequestObserver.onCompleted();
+ cancellableContext.cancel(new InterruptedException("Requested client shutdown"));
+ }
- // Prevent the execution of other tasks.
- executorService = null;
+ this.executorService.shutdown();
+ try {
+ executorService.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.warn("Executor service didn't shutdown in time.");
+ }
+ } finally {
+ writeLock.unlock();
+ }
}
+ /**
+ * Handles messages received from the device on the stream channel.
+ */
private class StreamChannelResponseObserver implements StreamObserver<StreamMessageResponse> {
@Override
public void onNext(StreamMessageResponse message) {
+ executorService.submit(() -> doNext(message));
+ }
- P4RuntimeEvent event;
+ private void doNext(StreamMessageResponse message) {
+ log.info("Received message on stream channel from {}: {}", deviceId, message.getUpdateCase());
+ switch (message.getUpdateCase()) {
+ case PACKET:
+ // Packet-in
+ PacketIn packetIn = message.getPacket();
+ ImmutableByteSequence data = copyFrom(packetIn.getPayload().asReadOnlyByteBuffer());
+ ImmutableList.Builder<ImmutableByteSequence> metadataBuilder = ImmutableList.builder();
+ packetIn.getMetadataList().stream()
+ .map(m -> m.getValue().asReadOnlyByteBuffer())
+ .map(ImmutableByteSequence::copyFrom)
+ .forEach(metadataBuilder::add);
+ P4RuntimeEvent event = new DefaultPacketInEvent(deviceId, data, metadataBuilder.build());
+ controller.postEvent(event);
+ return;
- if (message.getPacket().isInitialized()) {
- // Packet-in
- PacketIn packetIn = message.getPacket();
- ImmutableByteSequence data = copyFrom(packetIn.getPayload().asReadOnlyByteBuffer());
- ImmutableList.Builder<ImmutableByteSequence> metadataBuilder = ImmutableList.builder();
- packetIn.getMetadataList().stream()
- .map(m -> m.getValue().asReadOnlyByteBuffer())
- .map(ImmutableByteSequence::copyFrom)
- .forEach(metadataBuilder::add);
- event = new DefaultPacketInEvent(deviceId, data, metadataBuilder.build());
+ case ARBITRATION:
+ throw new UnsupportedOperationException("Arbitration not implemented.");
- } else if (message.getArbitration().isInitialized()) {
- // Arbitration.
- throw new UnsupportedOperationException("Arbitration not implemented.");
-
- } else {
- log.warn("Unrecognized stream message from {}: {}", deviceId, message);
- return;
+ default:
+ log.warn("Unrecognized stream message from {}: {}", deviceId, message.getUpdateCase());
}
-
- controller.postEvent(event);
}
@Override
@@ -293,5 +425,4 @@
// FIXME: same concern as before.
}
}
-
-}
+}
\ No newline at end of file