Refactor channel and mastership handling in P4Runtime

This (big) change aims at solving the issue observed with mastership flapping
and device connection/disconnection with P4Runtime.

Channel handling is now based on the underlying gRPC channel state. Before,
channel events (open/close/error) were generated as a consequence of P4Runtime
StreamChannel events, making device availability dependent on mastership. Now
Stream Channel events only affect mastership (MASTER/STANDBY or NONE when the
SteamChannel RPC is not active).

Mastership handling has been refactored to generate P4Runtime election IDs that
are compatible with the mastership preference decided by the MastershipService.

GeneralDeviceProvider has been re-implemented to support in-order
device event processing and to reduce implementation complexity. Stats polling
has been moved to a separate component, and netcfg handling updated to only
depend on BasicDeviceConfig, augmented with a pipeconf field, and re-using the
managementAddress field to set the gRPC server endpoints (e.g.
grpc://myswitch.local:50051). Before it was depending on 3 different config
classes, making hard to detect changes.

Finally, this change affects some core interfaces:
- Adds a method to DeviceProvider and DeviceHandshaker to check for device
availability, making the meaning of availability device-specific. This is needed
in cases where the device manager needs to change the availability state of a
device (as in change #20842)
- Support device providers not capable of reconciling mastership role responses
with requests (like P4Runtime).
- Clarify the meaning of "connection" in the DeviceConnect behavior.
- Allows driver-based providers to check devices for reachability and
availability without probing the device via the network.

Change-Id: I7ff30d29f5d02ad938e3171536e54ae2916629a2
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
index 0b21ca2..117c6e3 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
@@ -16,11 +16,14 @@
 
 package org.onosproject.grpc.ctl;
 
+import io.grpc.ConnectivityState;
 import io.grpc.Context;
+import io.grpc.ManagedChannel;
 import io.grpc.StatusRuntimeException;
 import org.onosproject.grpc.api.GrpcClient;
 import org.onosproject.grpc.api.GrpcClientKey;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceAgentEvent;
 import org.slf4j.Logger;
 
 import java.util.concurrent.CompletableFuture;
@@ -28,6 +31,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
@@ -54,13 +58,51 @@
 
     protected final ExecutorService executorService;
     protected final DeviceId deviceId;
+    protected final ManagedChannel channel;
+    private final boolean persistent;
+    private final AbstractGrpcClientController controller;
+    private final AtomicBoolean channelOpen = new AtomicBoolean(false);
 
-    protected AbstractGrpcClient(GrpcClientKey clientKey) {
+    /**
+     * Creates an new client for the given key and channel. Setting persistent
+     * to true avoids the gRPC channel to stay IDLE. The controller instance is
+     * needed to propagate channel events.
+     *
+     * @param clientKey  client key
+     * @param channel    channel
+     * @param persistent true if the gRPC should never stay IDLE
+     * @param controller controller
+     */
+    protected AbstractGrpcClient(GrpcClientKey clientKey, ManagedChannel channel,
+                                 boolean persistent, AbstractGrpcClientController controller) {
         checkNotNull(clientKey);
+        checkNotNull(channel);
         this.deviceId = clientKey.deviceId();
+        this.channel = channel;
+        this.persistent = persistent;
+        this.controller = controller;
         this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE, groupedThreads(
                 "onos-grpc-" + clientKey.serviceName() + "-client-" + deviceId.toString(), "%d"));
         this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
+
+        setChannelCallback(clientKey.deviceId(), channel, ConnectivityState.CONNECTING);
+    }
+
+    @Override
+    public boolean isServerReachable() {
+        final ConnectivityState state = channel.getState(false);
+        switch (state) {
+            case READY:
+            case IDLE:
+                return true;
+            case CONNECTING:
+            case TRANSIENT_FAILURE:
+            case SHUTDOWN:
+                return false;
+            default:
+                log.error("Unrecognized channel connectivity state {}", state);
+                return false;
+        }
     }
 
     @Override
@@ -171,4 +213,85 @@
             }
         }, executor);
     }
+
+    private void setChannelCallback(DeviceId deviceId, ManagedChannel channel,
+                                    ConnectivityState sourceState) {
+        if (log.isTraceEnabled()) {
+            log.trace("Setting channel callback for {} with source state {}...",
+                      deviceId, sourceState);
+        }
+        channel.notifyWhenStateChanged(
+                sourceState, new ChannelConnectivityCallback(deviceId, channel));
+    }
+
+    /**
+     * Runnable task invoked at each change of the channel connectivity state.
+     * New callbacks are created as long as the channel is not shut down.
+     */
+    private final class ChannelConnectivityCallback implements Runnable {
+
+        private final DeviceId deviceId;
+        private final ManagedChannel channel;
+
+        private ChannelConnectivityCallback(
+                DeviceId deviceId, ManagedChannel channel) {
+            this.deviceId = deviceId;
+            this.channel = channel;
+        }
+
+        @Override
+        public void run() {
+            final ConnectivityState newState = channel.getState(false);
+            final DeviceAgentEvent.Type eventType;
+            switch (newState) {
+                // On gRPC connectivity states:
+                // https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md
+                case READY:
+                    eventType = DeviceAgentEvent.Type.CHANNEL_OPEN;
+                    break;
+                case TRANSIENT_FAILURE:
+                    eventType = DeviceAgentEvent.Type.CHANNEL_ERROR;
+                    break;
+                case SHUTDOWN:
+                    eventType = DeviceAgentEvent.Type.CHANNEL_CLOSED;
+                    break;
+                case IDLE:
+                    // IDLE and CONNECTING are transient states that will
+                    // eventually move to READY or TRANSIENT_FAILURE. Do not
+                    // generate an event for now.
+                    if (persistent) {
+                        log.debug("Forcing channel for {} to exist state IDLE...", deviceId);
+                        channel.getState(true);
+                    }
+                    eventType = null;
+                    break;
+                case CONNECTING:
+                    eventType = null;
+                    break;
+                default:
+                    log.error("Unrecognized connectivity state {}", newState);
+                    eventType = null;
+            }
+
+            if (log.isTraceEnabled()) {
+                log.trace("Detected channel connectivity change for {}, new state is {}",
+                          deviceId, newState);
+            }
+
+            if (eventType != null) {
+                // Avoid sending consecutive duplicate events.
+                final boolean present = eventType == DeviceAgentEvent.Type.CHANNEL_OPEN;
+                final boolean past = channelOpen.getAndSet(present);
+                if (present != past) {
+                    log.debug("Notifying event {} for {}", eventType, deviceId);
+                    controller.postEvent(new DeviceAgentEvent(eventType, deviceId));
+                }
+            }
+
+            if (newState != ConnectivityState.SHUTDOWN) {
+                // Channels never leave SHUTDOWN state, no need for a new callback.
+                setChannelCallback(deviceId, channel, newState);
+            }
+        }
+    }
 }