Refactor channel and mastership handling in P4Runtime

This (big) change aims at solving the issue observed with mastership flapping
and device connection/disconnection with P4Runtime.

Channel handling is now based on the underlying gRPC channel state. Before,
channel events (open/close/error) were generated as a consequence of P4Runtime
StreamChannel events, making device availability dependent on mastership. Now
StreamChannel events only affect mastership (MASTER/STANDBY, or NONE when the
StreamChannel RPC is not active).

Mastership handling has been refactored to generate P4Runtime election IDs that
are compatible with the mastership preference decided by the MastershipService.

GeneralDeviceProvider has been re-implemented to support in-order
device event processing and to reduce implementation complexity. Stats polling
has been moved to a separate component, and netcfg handling updated to only
depend on BasicDeviceConfig, augmented with a pipeconf field, and re-using the
managementAddress field to set the gRPC server endpoints (e.g.
grpc://myswitch.local:50051). Previously it depended on 3 different config
classes, making it hard to detect changes.

Finally, this change affects some core interfaces:
- Adds a method to DeviceProvider and DeviceHandshaker to check for device
availability, making the meaning of availability device-specific. This is needed
in cases where the device manager needs to change the availability state of a
device (as in change #20842)
- Supports device providers not capable of reconciling mastership role responses
with requests (like P4Runtime).
- Clarifies the meaning of "connection" in the DeviceConnect behavior.
- Allows driver-based providers to check devices for reachability and
availability without probing the device via the network.

Change-Id: I7ff30d29f5d02ad938e3171536e54ae2916629a2
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
index 353d44e..d06e3fc 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
@@ -17,21 +17,25 @@
 package org.onosproject.p4runtime.ctl.client;
 
 import io.grpc.ManagedChannel;
+import io.grpc.Status;
 import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
 import org.onosproject.grpc.ctl.AbstractGrpcClient;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceAgentEvent;
 import org.onosproject.net.pi.model.PiPipeconf;
 import org.onosproject.net.pi.runtime.PiPacketOperation;
 import org.onosproject.net.pi.service.PiPipeconfService;
 import org.onosproject.p4runtime.api.P4RuntimeClient;
 import org.onosproject.p4runtime.api.P4RuntimeClientKey;
-import org.onosproject.p4runtime.api.P4RuntimeEvent;
-import org.onosproject.p4runtime.ctl.controller.BaseEventSubject;
-import org.onosproject.p4runtime.ctl.controller.ChannelEvent;
+import org.onosproject.p4runtime.ctl.controller.MasterElectionIdStore;
 import org.onosproject.p4runtime.ctl.controller.P4RuntimeControllerImpl;
 import p4.v1.P4RuntimeGrpc;
 import p4.v1.P4RuntimeOuterClass;
+import p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigRequest;
+import p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigResponse;
 
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -39,6 +43,7 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.lang.String.format;
+import static p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigRequest.ResponseType.COOKIE_ONLY;
 
 /**
  * Implementation of P4RuntimeClient.
@@ -59,7 +64,6 @@
     static final int LONG_TIMEOUT_SECONDS = 60;
 
     private final long p4DeviceId;
-    private final ManagedChannel channel;
     private final P4RuntimeControllerImpl controller;
     private final StreamClientImpl streamClient;
     private final PipelineConfigClientImpl pipelineConfigClient;
@@ -67,25 +71,27 @@
     /**
      * Instantiates a new client with the given arguments.
      *
-     * @param clientKey       client key
-     * @param channel         gRPC managed channel
-     * @param controller      P$Runtime controller instance
-     * @param pipeconfService pipeconf service instance
+     * @param clientKey             client key
+     * @param channel               gRPC managed channel
+     * @param controller            P4Runtime controller instance
+     * @param pipeconfService       pipeconf service instance
+     * @param masterElectionIdStore master election ID store
      */
     public P4RuntimeClientImpl(P4RuntimeClientKey clientKey,
                                ManagedChannel channel,
                                P4RuntimeControllerImpl controller,
-                               PiPipeconfService pipeconfService) {
-        super(clientKey);
+                               PiPipeconfService pipeconfService,
+                               MasterElectionIdStore masterElectionIdStore) {
+        super(clientKey, channel, true, controller);
         checkNotNull(channel);
         checkNotNull(controller);
         checkNotNull(pipeconfService);
+        checkNotNull(masterElectionIdStore);
 
         this.p4DeviceId = clientKey.p4DeviceId();
-        this.channel = channel;
         this.controller = controller;
         this.streamClient = new StreamClientImpl(
-                pipeconfService, this, controller);
+                pipeconfService, masterElectionIdStore, this, controller);
         this.pipelineConfigClient = new PipelineConfigClientImpl(this);
     }
 
@@ -108,13 +114,13 @@
     }
 
     @Override
-    public ReadRequest read(PiPipeconf pipeconf) {
-        return new ReadRequestImpl(this, pipeconf);
+    public CompletableFuture<Boolean> isAnyPipelineConfigSet() {
+        return pipelineConfigClient.isAnyPipelineConfigSet();
     }
 
     @Override
-    public void openSession() {
-        streamClient.openSession();
+    public ReadRequest read(PiPipeconf pipeconf) {
+        return new ReadRequestImpl(this, pipeconf);
     }
 
     @Override
@@ -128,8 +134,8 @@
     }
 
     @Override
-    public void runForMastership() {
-        streamClient.runForMastership();
+    public void setMastership(boolean master, BigInteger newElectionId) {
+        streamClient.setMastership(master, newElectionId);
     }
 
     @Override
@@ -147,6 +153,44 @@
         return new WriteRequestImpl(this, pipeconf);
     }
 
+    @Override
+    public CompletableFuture<Boolean> probeService() {
+        // Probe with a cookie-only pipeline config read; true means the service answered.
+        final CompletableFuture<Boolean> future = new CompletableFuture<>();
+        final StreamObserver<GetForwardingPipelineConfigResponse> responseObserver =
+                new StreamObserver<GetForwardingPipelineConfigResponse>() {
+                    @Override
+                    public void onNext(GetForwardingPipelineConfigResponse value) {
+                        future.complete(true);
+                    }
+
+                    @Override
+                    public void onError(Throwable t) {
+                        if (Status.fromThrowable(t).getCode() == Status.Code.FAILED_PRECONDITION) {
+                            // Pipeline not set but service is available.
+                            future.complete(true);
+                        } else {
+                            log.debug("Service probe failed for {}", deviceId, t);
+                            future.complete(false);
+                        }
+                    }
+
+                    @Override
+                    public void onCompleted() {
+                        // Ignore, unary call.
+                    }
+                };
+        // NOTE(review): the SHORT timeout is used even though a server without cookie
+        // support might return the full P4 blob over a slow network; confirm intent.
+        execRpc(s -> s.getForwardingPipelineConfig(
+                GetForwardingPipelineConfigRequest.newBuilder()
+                        .setDeviceId(p4DeviceId)
+                        .setResponseType(COOKIE_ONLY)
+                        .build(), responseObserver),
+                SHORT_TIMEOUT_SECONDS);
+        return future;
+    }
+
     /**
      * Returns the P4Runtime-internal device ID associated with this client.
      *
@@ -184,7 +228,8 @@
      * @param stubConsumer P4Runtime stub consumer
      * @param timeout      timeout in seconds
      */
-    void execRpc(Consumer<P4RuntimeGrpc.P4RuntimeStub> stubConsumer, int timeout) {
+    void execRpc(Consumer<P4RuntimeGrpc.P4RuntimeStub> stubConsumer,
+                 int timeout) {
         if (log.isTraceEnabled()) {
             log.trace("Executing RPC with timeout {} seconds (context deadline {})...",
                       timeout, context().getDeadline());
@@ -235,21 +280,10 @@
     }
 
     private void checkGrpcException(StatusRuntimeException sre) {
-        switch (sre.getStatus().getCode()) {
-            case PERMISSION_DENIED:
-                // Notify upper layers that this node is not master.
-                controller.postEvent(new P4RuntimeEvent(
-                        P4RuntimeEvent.Type.PERMISSION_DENIED,
-                        new BaseEventSubject(deviceId)));
-                break;
-            case UNAVAILABLE:
-                // Channel might be closed.
-                controller.postEvent(new P4RuntimeEvent(
-                        P4RuntimeEvent.Type.CHANNEL_EVENT,
-                        new ChannelEvent(deviceId, ChannelEvent.Type.ERROR)));
-                break;
-            default:
-                break;
+        if (sre.getStatus().getCode() == Status.Code.PERMISSION_DENIED) {
+            // Notify upper layers that this node is not master.
+            controller.postEvent(new DeviceAgentEvent(
+                    DeviceAgentEvent.Type.NOT_MASTER, deviceId));
         }
     }
 }