Refactor channel and mastership handling in P4Runtime

This (big) change aims to solve the issues observed with mastership flapping
and device connection/disconnection with P4Runtime.

Channel handling is now based on the underlying gRPC channel state. Previously,
channel events (open/close/error) were generated as a consequence of P4Runtime
StreamChannel events, making device availability dependent on mastership. Now
StreamChannel events only affect mastership (MASTER/STANDBY, or NONE when the
StreamChannel RPC is not active).

Mastership handling has been refactored to generate P4Runtime election IDs that
are compatible with the mastership preference decided by the MastershipService.

GeneralDeviceProvider has been re-implemented to support in-order
device event processing and to reduce implementation complexity. Stats polling
has been moved to a separate component, and netcfg handling has been updated to
only depend on BasicDeviceConfig, which has been augmented with a pipeconf field
and re-uses the managementAddress field to set the gRPC server endpoints (e.g.
grpc://myswitch.local:50051). Previously, it depended on 3 different config
classes, making it hard to detect changes.

Finally, this change affects some core interfaces:
- Adds a method to DeviceProvider and DeviceHandshaker to check for device
availability, making the meaning of availability device-specific. This is needed
in cases where the device manager needs to change the availability state of a
device (as in change #20842).
- Supports device providers that are not capable of reconciling mastership role
responses with requests (like P4Runtime).
- Clarifies the meaning of "connection" in the DeviceConnect behavior.
- Allows driver-based providers to check devices for reachability and
availability without probing the device via the network.

Change-Id: I7ff30d29f5d02ad938e3171536e54ae2916629a2
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
index 0b21ca2..117c6e3 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
@@ -16,11 +16,14 @@
 
 package org.onosproject.grpc.ctl;
 
+import io.grpc.ConnectivityState;
 import io.grpc.Context;
+import io.grpc.ManagedChannel;
 import io.grpc.StatusRuntimeException;
 import org.onosproject.grpc.api.GrpcClient;
 import org.onosproject.grpc.api.GrpcClientKey;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceAgentEvent;
 import org.slf4j.Logger;
 
 import java.util.concurrent.CompletableFuture;
@@ -28,6 +31,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
@@ -54,13 +58,51 @@
 
     protected final ExecutorService executorService;
     protected final DeviceId deviceId;
+    protected final ManagedChannel channel;
+    private final boolean persistent;
+    private final AbstractGrpcClientController controller;
+    private final AtomicBoolean channelOpen = new AtomicBoolean(false); // Open state of the last channel event.
 
-    protected AbstractGrpcClient(GrpcClientKey clientKey) {
+    /**
+     * Creates a new client for the given key and channel. Setting persistent
+     * to true prevents the gRPC channel from staying IDLE. The controller
+     * instance is needed to propagate channel events.
+     *
+     * @param clientKey  client key
+     * @param channel    gRPC channel used by this client (must not be null)
+     * @param persistent true if the gRPC channel should never stay IDLE
+     * @param controller controller used to post channel events; NOTE(review): not null-checked
+     */
+    protected AbstractGrpcClient(GrpcClientKey clientKey, ManagedChannel channel,
+                                 boolean persistent, AbstractGrpcClientController controller) {
         checkNotNull(clientKey);
+        checkNotNull(channel);
         this.deviceId = clientKey.deviceId();
+        this.channel = channel;
+        this.persistent = persistent;
+        this.controller = controller;
         this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE, groupedThreads(
                 "onos-grpc-" + clientKey.serviceName() + "-client-" + deviceId.toString(), "%d"));
         this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
+
+        setChannelCallback(clientKey.deviceId(), channel, ConnectivityState.CONNECTING); // Start monitoring.
+    }
+
+    @Override
+    public boolean isServerReachable() {
+        final ConnectivityState state = channel.getState(false);
+        switch (state) {
+            case READY:
+            case IDLE:
+                return true;
+            case CONNECTING:
+            case TRANSIENT_FAILURE:
+            case SHUTDOWN:
+                return false;
+            default:
+                log.error("Unrecognized channel connectivity state {}", state);
+                return false;
+        }
     }
 
     @Override
@@ -171,4 +213,85 @@
             }
         }, executor);
     }
+
+    private void setChannelCallback(DeviceId deviceId, ManagedChannel channel,
+                                    ConnectivityState sourceState) {
+        if (log.isTraceEnabled()) {
+            log.trace("Setting channel callback for {} with source state {}...",
+                      deviceId, sourceState);
+        }
+        channel.notifyWhenStateChanged(
+                sourceState, new ChannelConnectivityCallback(deviceId, channel));
+    }
+
+    /**
+     * Runnable task invoked at each change of the channel connectivity state.
+     * New callbacks are created as long as the channel is not shut down.
+     */
+    private final class ChannelConnectivityCallback implements Runnable {
+
+        private final DeviceId deviceId;
+        private final ManagedChannel channel;
+
+        private ChannelConnectivityCallback(
+                DeviceId deviceId, ManagedChannel channel) {
+            this.deviceId = deviceId;
+            this.channel = channel;
+        }
+
+        @Override
+        public void run() {
+            final ConnectivityState newState = channel.getState(false);
+            final DeviceAgentEvent.Type eventType;
+            switch (newState) {
+                // On gRPC connectivity states:
+                // https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md
+                case READY:
+                    eventType = DeviceAgentEvent.Type.CHANNEL_OPEN;
+                    break;
+                case TRANSIENT_FAILURE:
+                    eventType = DeviceAgentEvent.Type.CHANNEL_ERROR;
+                    break;
+                case SHUTDOWN:
+                    eventType = DeviceAgentEvent.Type.CHANNEL_CLOSED;
+                    break;
+                case IDLE:
+                    // IDLE and CONNECTING are transient states that will
+                    // eventually move to READY or TRANSIENT_FAILURE. Do not
+                    // generate an event for now.
+                    if (persistent) {
+                        log.debug("Forcing channel for {} to exist state IDLE...", deviceId);
+                        channel.getState(true);
+                    }
+                    eventType = null;
+                    break;
+                case CONNECTING:
+                    eventType = null;
+                    break;
+                default:
+                    log.error("Unrecognized connectivity state {}", newState);
+                    eventType = null;
+            }
+
+            if (log.isTraceEnabled()) {
+                log.trace("Detected channel connectivity change for {}, new state is {}",
+                          deviceId, newState);
+            }
+
+            if (eventType != null) {
+                // Avoid sending consecutive duplicate events.
+                final boolean present = eventType == DeviceAgentEvent.Type.CHANNEL_OPEN;
+                final boolean past = channelOpen.getAndSet(present);
+                if (present != past) {
+                    log.debug("Notifying event {} for {}", eventType, deviceId);
+                    controller.postEvent(new DeviceAgentEvent(eventType, deviceId));
+                }
+            }
+
+            if (newState != ConnectivityState.SHUTDOWN) {
+                // Channels never leave SHUTDOWN state, no need for a new callback.
+                setChannelCallback(deviceId, channel, newState);
+            }
+        }
+    }
 }
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
index c0a3c6b..3c38209 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
@@ -21,7 +21,6 @@
 import io.grpc.ManagedChannel;
 import io.grpc.netty.GrpcSslContexts;
 import io.grpc.netty.NettyChannelBuilder;
-import io.netty.handler.ssl.NotSslRecordException;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 import org.onosproject.event.AbstractListenerManager;
@@ -33,6 +32,9 @@
 import org.onosproject.grpc.api.GrpcClientController;
 import org.onosproject.grpc.api.GrpcClientKey;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceAgentEvent;
+import org.onosproject.net.device.DeviceAgentListener;
+import org.onosproject.net.provider.ProviderId;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Reference;
@@ -41,11 +43,11 @@
 
 import javax.net.ssl.SSLException;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.Lock;
 import java.util.function.Supplier;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
 import static java.lang.String.format;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -74,40 +76,44 @@
     private final Map<DeviceId, K> clientKeys = Maps.newHashMap();
     private final Map<K, C> clients = Maps.newHashMap();
     private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
+    private final ConcurrentMap<DeviceId, ConcurrentMap<ProviderId, DeviceAgentListener>>
+            deviceAgentListeners = Maps.newConcurrentMap(); // Per-device listeners, keyed by provider.
+    private final Class<E> eventClass; // Event type registered with the dispatcher on activation.
     private final Striped<Lock> stripedLocks = Striped.lock(DEFAULT_DEVICE_LOCK_SIZE);
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected GrpcChannelController grpcChannelController;
 
+    public AbstractGrpcClientController(Class<E> eventClass) {
+        this.eventClass = checkNotNull(eventClass, "eventClass cannot be null");
+    }
+
     @Activate
     public void activate() {
+        eventDispatcher.addSink(eventClass, listenerRegistry); // Route eventClass events to listenerRegistry.
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
+        eventDispatcher.removeSink(eventClass);
-        clientKeys.keySet().forEach(this::removeClient);
+        Maps.newHashMap(clientKeys).keySet().forEach(this::removeClient); // copy: removal mutates clientKeys
         clientKeys.clear();
         clients.clear();
         channelIds.clear();
+        deviceAgentListeners.clear();
         log.info("Stopped");
     }
 
     @Override
     public boolean createClient(K clientKey) {
         checkNotNull(clientKey);
-        /*
-            FIXME we might want to move "useTls" and "fallback" to properties of the netcfg and clientKey
-                  For now, we will first try to connect with TLS (accepting any cert), then fall back to
-                  plaintext for every device
-         */
-        return withDeviceLock(() -> doCreateClient(clientKey, true, true), clientKey.deviceId());
+        final DeviceId deviceId = clientKey.deviceId();
+        return withDeviceLock(() -> doCreateClient(clientKey), deviceId);
     }
 
-    private boolean doCreateClient(K clientKey, boolean useTls, boolean fallbackToPlainText) {
+    private boolean doCreateClient(K clientKey) {
         DeviceId deviceId = clientKey.deviceId();
-        String serverAddr = clientKey.serverAddr();
-        int serverPort = clientKey.serverPort();
 
         if (clientKeys.containsKey(deviceId)) {
             final GrpcClientKey existingKey = clientKeys.get(deviceId);
@@ -116,19 +122,25 @@
                           clientName(clientKey), clientKey);
                 return true;
             } else {
-                log.info("Requested new {} with updated key, removing old client... (oldKey={})",
-                         clientName(clientKey), existingKey);
-                doRemoveClient(deviceId);
+                throw new IllegalArgumentException(format(
+                        "A client already exists for device %s (%s)",
+                        deviceId, clientKey));
             }
         }
 
-        log.info("Creating client for {} (server={}:{})...", deviceId, serverAddr, serverPort);
+        final String method = clientKey.requiresSecureChannel()
+                ? "TLS" : "plaintext TCP";
+
+        log.info("Connecting {} client for {} to server at {} using {}...",
+                 clientKey.serviceName(), deviceId, clientKey.serveUri(), method);
 
         SslContext sslContext = null;
-        if (useTls) {
+        if (clientKey.requiresSecureChannel()) {
             try {
-                // Accept any server certificate; this is insecure and should not be used in production
-                sslContext = GrpcSslContexts.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
+                // FIXME: Accept any server certificate; this is insecure and
+                //  should not be used in production
+                sslContext = GrpcSslContexts.forClient().trustManager(
+                        InsecureTrustManagerFactory.INSTANCE).build();
             } catch (SSLException e) {
                 log.error("Failed to build SSL Context", e);
                 return false;
@@ -137,17 +149,15 @@
 
         GrpcChannelId channelId = GrpcChannelId.of(clientKey.toString());
         NettyChannelBuilder channelBuilder = NettyChannelBuilder
-                .forAddress(serverAddr, serverPort)
+                .forAddress(clientKey.serveUri().getHost(),
+                            clientKey.serveUri().getPort())
                 .maxInboundMessageSize(DEFAULT_MAX_INBOUND_MSG_SIZE * MEGABYTES);
+
         if (sslContext != null) {
-            log.debug("Using SSL for gRPC connection to {}", deviceId);
             channelBuilder
                     .sslContext(sslContext)
                     .useTransportSecurity();
         } else {
-            checkState(!useTls,
-                    "Not authorized to use plaintext for gRPC connection to {}", deviceId);
-            log.debug("Using plaintext TCP for gRPC connection to {}", deviceId);
             channelBuilder.usePlaintext();
         }
 
@@ -156,24 +166,9 @@
         try {
             channel = grpcChannelController.connectChannel(channelId, channelBuilder);
         } catch (Throwable e) {
-            for (Throwable cause = e; cause != null; cause = cause.getCause()) {
-                if (useTls && cause instanceof NotSslRecordException) {
-                    // Likely root cause is that server is using plaintext
-                    log.info("Failed to connect to server (device={}) using TLS", deviceId);
-                    log.debug("TLS connection exception", e);
-                    if (fallbackToPlainText) {
-                        log.info("Falling back to plaintext for connection to {}", deviceId);
-                        return doCreateClient(clientKey, false, false);
-                    }
-                }
-                if (!useTls && "Connection reset by peer".equals(cause.getMessage())) {
-                    // Not a great signal, but could indicate the server is expected a TLS connection
-                    log.error("Failed to connect to server (device={}) using plaintext TCP; is the server using TLS?",
-                            deviceId);
-                    break;
-                }
-            }
-            log.warn("Unable to connect to gRPC server for {}", deviceId, e);
+            log.warn("Failed to connect to {} ({}) using {}: {}",
+                     deviceId, clientKey.serveUri(), method, e.toString());
+            log.debug("gRPC client connection exception", e);
             return false;
         }
 
@@ -216,37 +211,79 @@
     }
 
     @Override
+    public C getClient(K clientKey) {
+        // Client registered for exactly this key, or null if there is none.
+        return clients.get(checkNotNull(clientKey));
+    }
+
+    @Override
     public void removeClient(DeviceId deviceId) {
-        checkNotNull(deviceId);
-        withDeviceLock(() -> doRemoveClient(deviceId), deviceId);
+        final DeviceId target = checkNotNull(deviceId);
+        withDeviceLock(() -> doRemoveClient(target), target);
     }
 
+    @Override
+    public void removeClient(K clientKey) {
+        final DeviceId target = checkNotNull(clientKey).deviceId();
+        withDeviceLock(() -> doRemoveClient(clientKey), target);
+    }
+
     private Void doRemoveClient(DeviceId deviceId) {
-        if (clientKeys.containsKey(deviceId)) {
-            final K clientKey = clientKeys.get(deviceId);
-            clients.get(clientKey).shutdown();
-            grpcChannelController.disconnectChannel(channelIds.get(deviceId));
-            clientKeys.remove(deviceId);
-            clients.remove(clientKey);
-            channelIds.remove(deviceId);
-        }
-        return null;
+        final K clientKey = clientKeys.get(deviceId);
+        return clientKey == null ? null : doRemoveClient(clientKey);
     }
 
-    @Override
-    public boolean isReachable(DeviceId deviceId) {
-        checkNotNull(deviceId);
-        return withDeviceLock(() -> doIsReachable(deviceId), deviceId);
+    private Void doRemoveClient(K clientKey) {
+        if (!clientKey.equals(clientKeys.get(clientKey.deviceId()))) {
+            return null; // Stale/unknown key: keep the client registered for the device.
+        }
+        if (clients.containsKey(clientKey)) {
+            clients.get(clientKey).shutdown();
+        }
+        if (channelIds.containsKey(clientKey.deviceId())) {
+            grpcChannelController.disconnectChannel(channelIds.get(clientKey.deviceId()));
+        }
+        clientKeys.remove(clientKey.deviceId());
+        clients.remove(clientKey);
+        channelIds.remove(clientKey.deviceId());
+        return null;
     }
 
-    private boolean doIsReachable(DeviceId deviceId) {
-        // Default behaviour checks only the gRPC channel, should
-        // check according to different gRPC service
-        if (!clientKeys.containsKey(deviceId)) {
-            log.debug("Missing client for {}, cannot check for reachability", deviceId);
-            return false;
+    @Override
+    public void addDeviceAgentListener(DeviceId deviceId, ProviderId providerId, DeviceAgentListener listener) {
+        checkNotNull(deviceId, "deviceId cannot be null");
+        checkNotNull(providerId, "providerId cannot be null");
+        checkNotNull(listener, "listener cannot be null");
+        deviceAgentListeners.compute(deviceId, (did, listeners) -> { // atomic w.r.t. removals
+            final ConcurrentMap<ProviderId, DeviceAgentListener> map =
+                    listeners != null ? listeners : Maps.newConcurrentMap();
+            map.put(providerId, listener);
+            return map;
+        });
+    }
+
+    @Override
+    public void removeDeviceAgentListener(DeviceId deviceId, ProviderId providerId) {
+        checkNotNull(deviceId, "deviceId cannot be null");
+        checkNotNull(providerId, "providerId cannot be null");
+        deviceAgentListeners.computeIfPresent(deviceId, (did, listeners) -> {
+            listeners.remove(providerId);
+            return listeners.isEmpty() ? null : listeners;
+        });
+    }
+
+    public void postEvent(E event) {
+        checkNotNull(event);
+        post(event);
+    }
+
+    public void postEvent(DeviceAgentEvent event) {
+        // TODO: single delivery mechanism: extend DeviceAgentEvent to also carry the protocol-
+        //  specific events (e.g. P4Runtime/gNMI packet-in) that are now sent via postEvent(E).
+        final Map<ProviderId, DeviceAgentListener> listeners = deviceAgentListeners.get(checkNotNull(event).subject());
+        if (listeners != null) {
+            listeners.values().forEach(l -> l.event(event));
         }
-        return grpcChannelController.isChannelOpen(channelIds.get(deviceId));
     }
 
     private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
index ca0c8c3..87c9426 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
@@ -41,6 +41,7 @@
 import java.util.Dictionary;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -152,50 +153,24 @@
         }
     }
 
-    private boolean doDummyMessage(ManagedChannel channel) throws StatusRuntimeException {
+    private void doDummyMessage(ManagedChannel channel) throws StatusRuntimeException {
         DummyServiceGrpc.DummyServiceBlockingStub dummyStub = DummyServiceGrpc
                 .newBlockingStub(channel)
                 .withDeadlineAfter(CONNECTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
         try {
-            return dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse
-                                              .getDefaultInstance()) != null;
+            //noinspection ResultOfMethodCallIgnored
+            dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse
+                                       .getDefaultInstance());
         } catch (StatusRuntimeException e) {
-            if (e.getStatus().equals(Status.UNIMPLEMENTED)) {
-                // UNIMPLEMENTED means that the server received our message but
-                // doesn't know how to handle it. Hence, channel is open.
-                return true;
-            } else {
+            // UNIMPLEMENTED means that the server received our message but
+            // doesn't know how to handle it. Hence, channel is open.
+            if (e.getStatus().getCode() != Status.Code.UNIMPLEMENTED) { // compare codes, not Status objects
                 throw e;
             }
         }
     }
 
     @Override
-    public boolean isChannelOpen(GrpcChannelId channelId) {
-        checkNotNull(channelId);
-
-        Lock lock = channelLocks.get(channelId);
-        lock.lock();
-
-        try {
-            if (!channels.containsKey(channelId)) {
-                log.warn("Unknown channel ID '{}', can't check if channel is open",
-                         channelId);
-                return false;
-            }
-            try {
-                return doDummyMessage(channels.get(channelId));
-            } catch (StatusRuntimeException e) {
-                log.debug("Unable to send dummy message to {}: {}",
-                          channelId, e.toString());
-                return false;
-            }
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    @Override
     public void disconnectChannel(GrpcChannelId channelId) {
         checkNotNull(channelId);
 
@@ -240,7 +215,6 @@
 
         Lock lock = channelLocks.get(channelId);
         lock.lock();
-
         try {
             return Optional.ofNullable(channels.get(channelId));
         } finally {
@@ -248,4 +222,23 @@
         }
     }
 
+    @Override
+    public CompletableFuture<Boolean> probeChannel(GrpcChannelId channelId) {
+        checkNotNull(channelId);
+        final ManagedChannel channel = channels.get(channelId);
+        if (channel == null) {
+            log.warn("Unable to find any channel with ID {}, cannot send probe", channelId);
+            return CompletableFuture.completedFuture(false);
+        }
+        // NOTE(review): blocking RPC (bounded by the stub deadline) runs on the common pool.
+        return CompletableFuture.supplyAsync(() -> {
+            try {
+                doDummyMessage(channel);
+                return true;
+            } catch (StatusRuntimeException e) {
+                log.debug("Probe for {} failed", channelId, e);
+                return false;
+            }
+        });
+    }
 }