ONOS-6561 BMv2 handshaker via P4Runtime
+ support for device-specific default pipeconf
+ improvements to P4Runtime and gRPC protocol stuff
Change-Id: I8986fce3959df564454ea3d31859860f61eabcae
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index e6603ea..35e9726 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -20,6 +20,7 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.ExtensionRegistry;
import com.google.protobuf.TextFormat;
+import io.grpc.Context;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
@@ -64,6 +65,7 @@
private final P4RuntimeGrpc.P4RuntimeStub asyncStub;
private ExecutorService executorService;
private StreamObserver<StreamMessageRequest> streamRequestObserver;
+ private Context.CancellableContext streamContext;
P4RuntimeClientImpl(DeviceId deviceId, int p4DeviceId, ManagedChannel channel, P4RuntimeControllerImpl controller,
@@ -74,8 +76,7 @@
this.executorService = executorService;
this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel)
.withDeadlineAfter(DEADLINE_SECONDS, TimeUnit.SECONDS);
- this.asyncStub = P4RuntimeGrpc.newStub(channel)
- .withDeadlineAfter(DEADLINE_SECONDS, TimeUnit.SECONDS);
+ this.asyncStub = P4RuntimeGrpc.newStub(channel);
}
@Override
@@ -85,21 +86,44 @@
private boolean doInitStreamChannel() {
if (this.streamRequestObserver == null) {
- this.streamRequestObserver = this.asyncStub.streamChannel(new StreamChannelResponseObserver());
+
+ streamContext = Context.current().withCancellation();
+ streamContext.run(
+ () -> streamRequestObserver = asyncStub.streamChannel(new StreamChannelResponseObserver()));
+
// To listen for packets and other events, we need to start the RPC.
- // Here we do it by sending an empty packet out.
- try {
- this.streamRequestObserver.onNext(StreamMessageRequest.newBuilder()
- .setPacket(PacketOut.getDefaultInstance())
- .build());
- } catch (StatusRuntimeException e) {
- log.warn("Unable to initialize stream channel for {}: {}", deviceId, e);
+ // Here we do it by sending a master arbitration update.
+ if (!doArbitrationUpdate()) {
+ log.warn("Unable to initialize stream channel for {}", deviceId);
return false;
}
}
return true;
}
+ private boolean doArbitrationUpdate() {
+
+ if (streamRequestObserver == null) {
+ log.error("Null request stream observer for {}", deviceId);
+ return false;
+ }
+
+ try {
+ StreamMessageRequest initRequest = StreamMessageRequest
+ .newBuilder()
+ .setArbitration(MasterArbitrationUpdate
+ .newBuilder()
+ .setDeviceId(p4DeviceId)
+ .build())
+ .build();
+ streamRequestObserver.onNext(initRequest);
+ return true;
+ } catch (StatusRuntimeException e) {
+ log.warn("Arbitration update failed for {}: {}", deviceId, e);
+ return false;
+ }
+ }
+
@Override
public CompletableFuture<Boolean> setPipelineConfig(InputStream p4info, InputStream targetConfig) {
return CompletableFuture.supplyAsync(() -> doSetPipelineConfig(p4info, targetConfig), executorService);
@@ -168,12 +192,15 @@
@Override
public void shutdown() {
- if (this.streamRequestObserver != null) {
- this.streamRequestObserver.onError(new StatusRuntimeException(Status.CANCELLED));
- this.streamRequestObserver.onCompleted();
+ log.info("Shutting down client for {}...", deviceId);
+
+ if (streamRequestObserver != null) {
+ streamRequestObserver.onCompleted();
+ streamContext.cancel(null);
+ streamContext = null;
}
- this.executorService.shutdownNow();
+ this.executorService.shutdown();
try {
executorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
@@ -216,12 +243,16 @@
@Override
public void onError(Throwable throwable) {
- log.warn("Error on stream channel for {}: {}", deviceId, throwable);
+ log.warn("Error on stream channel for {}: {}", deviceId, Status.fromThrowable(throwable));
+ // FIXME: we might want to recreate the channel.
+ // In general, we want to be robust against any transient error and, if the channel is open, make sure the
+ // stream channel is always on.
}
@Override
public void onCompleted() {
- // TODO: declare the device as disconnected?
+ log.warn("Stream channel for {} has completed", deviceId);
+ // FIXME: as for stream errors, we might want to recreate the stream channel here.
}
}