Refactor channel and mastership handling in P4Runtime

This (big) change aims at solving the issue observed with mastership flapping
and device connection/disconnection with P4Runtime.

Channel handling is now based on the underlying gRPC channel state. Before,
channel events (open/close/error) were generated as a consequence of P4Runtime
StreamChannel events, making device availability dependent on mastership. Now
StreamChannel events only affect mastership (MASTER/STANDBY or NONE when the
StreamChannel RPC is not active).

Mastership handling has been refactored to generate P4Runtime election IDs that
are compatible with the mastership preference decided by the MastershipService.

GeneralDeviceProvider has been re-implemented to support in-order
device event processing and to reduce implementation complexity. Stats polling
has been moved to a separate component, and netcfg handling updated to only
depend on BasicDeviceConfig, augmented with a pipeconf field, and re-using the
managementAddress field to set the gRPC server endpoints (e.g.
grpc://myswitch.local:50051). Previously, it depended on 3 different config
classes, making it hard to detect changes.

Finally, this change affects some core interfaces:
- Adds a method to DeviceProvider and DeviceHandshaker to check for device
availability, making the meaning of availability device-specific. This is needed
in cases where the device manager needs to change the availability state of a
device (as in change #20842).
- Supports device providers that cannot reconcile mastership role responses
with requests (like P4Runtime).
- Clarifies the meaning of "connection" in the DeviceConnect behavior.
- Allows driver-based providers to check devices for reachability and
availability without probing the device via the network.

Change-Id: I7ff30d29f5d02ad938e3171536e54ae2916629a2
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java
index 5b07254..ad4324b 100644
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java
@@ -23,6 +23,7 @@
 
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * Abstraction of a gRPC controller that stores and manages gRPC channels.
@@ -33,10 +34,10 @@
     int CONNECTION_TIMEOUT_SECONDS = 20;
 
     /**
-     * Creates a gRPC managed channel from the given builder and opens a
-     * connection to it. If the connection is successful returns the managed
-     * channel object and stores the channel internally, associated with the
-     * given channel ID.
+     * Creates a gRPC managed channel from the given builder and opens the
+     * connection. If the connection is successful, returns the managed channel
+     * object and stores the channel internally, associated with the given
+     * channel ID.
      * <p>
      * This method blocks until the channel is open or a timeout expires. By
      * default the timeout is {@link #CONNECTION_TIMEOUT_SECONDS} seconds. If
@@ -72,15 +73,6 @@
     Map<GrpcChannelId, ManagedChannel> getChannels();
 
     /**
-     * Returns true if the channel associated with the given identifier is open,
-     * i.e. the server is able to successfully reply to RPCs, false otherwise.
-     *
-     * @param channelId channel ID
-     * @return true if channel is open, false otherwise.
-     */
-    boolean isChannelOpen(GrpcChannelId channelId);
-
-    /**
      * If present, returns the channel associated with the given ID.
      *
      * @param channelId channel ID
@@ -88,4 +80,14 @@
      */
     Optional<ManagedChannel> getChannel(GrpcChannelId channelId);
 
+    /**
+     * Probes the server at the endpoint of the given channel. Returns true if
+     * the server responded to the probe, false otherwise or if the channel does
+     * not exist.
+     *
+     * @param channelId channel ID
+     * @return completable future eventually true if the gRPC server responded
+     * to the probe; false otherwise
+     */
+    CompletableFuture<Boolean> probeChannel(GrpcChannelId channelId);
 }
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java
index d040a23..d529529 100644
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java
@@ -17,11 +17,11 @@
 package org.onosproject.grpc.api;
 
 import com.google.common.annotations.Beta;
+
 import java.util.concurrent.CompletableFuture;
 
 /**
  * Abstraction of a gRPC client.
- *
  */
 @Beta
 public interface GrpcClient {
@@ -33,4 +33,30 @@
      * procedure
      */
     CompletableFuture<Void> shutdown();
+
+    /**
+     * This method provides a coarse modelling of gRPC channel {@link
+     * io.grpc.ConnectivityState}. The implementation does not make any attempt
+     * at probing the server by sending messages over the network, as such it
+     * does not block execution. It returns true if this client is expected to
+     * communicate with the server successfully. In other words, if any RPC
+     * would be executed immediately after this method is called and returns
+     * true, then it is expected, but NOT guaranteed, for that RPC message to
+     * reach the server and be processed. If false, it means the channel is in a
+     * failure state and communication with the server is unlikely to happen
+     * soon.
+     *
+     * @return true if server is deemed reachable, false otherwise
+     */
+    boolean isServerReachable();
+
+    /**
+     * Similar to {@link #isServerReachable()}, but might involve sending
+     * packets over the network. This checks whether the specific gRPC
+     * service(s) required by this client is available or not on the server.
+     *
+     * @return completable future eventually true if the gRPC service(s) on the
+     * server was available during the probe; false otherwise
+     */
+    CompletableFuture<Boolean> probeService();
 }
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientController.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientController.java
index 3cdbcd1..b47d499 100644
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientController.java
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientController.java
@@ -18,10 +18,11 @@
 
 import com.google.common.annotations.Beta;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceAgentListener;
+import org.onosproject.net.provider.ProviderId;
 
 /**
- * Abstraction of a gRPC controller which controls specific gRPC client {@link
- * C} with specific client key {@link K}.
+ * Abstraction of a controller that manages gRPC clients.
  *
  * @param <K> the gRPC client key
  * @param <C> the gRPC client type
@@ -34,20 +35,27 @@
      * given information. As a result of this method, a client can be later
      * obtained by invoking {@link #getClient(DeviceId)}.
      * <p>
+     * Upon creation, a connection to the server is automatically started, which
+     * blocks execution. If the connection is successful, the client is created
+     * and this method returns true, otherwise (e.g., socket error) any state
+     * associated with this client is destroyed and returns false.
+     * <p>
      * Only one client can exist for the same device ID. Calls to this method
      * are idempotent fot the same client key, i.e. returns true if such client
-     * already exists but a new one is not created. If there exists a client
-     * with same device ID but different address and port, removes old one and
-     * recreate new one.
+     * already exists. Otherwise, if a client for the same device ID but
+     * different client key already exists, throws an exception.
      *
      * @param clientKey the client key
      * @return true if the client was created and the channel to the server is
      * open; false otherwise
+     * @throws IllegalArgumentException if a client for the same device ID but
+     *                                  different client key already exists.
      */
     boolean createClient(K clientKey);
 
     /**
-     * Retrieves the gRPC client to operate on the given device.
+     * Returns the gRPC client previously created for the given device, or null
+     * if such client does not exist.
      *
      * @param deviceId the device identifier
      * @return the gRPC client of the device if exists; null otherwise
@@ -55,24 +63,47 @@
     C getClient(DeviceId deviceId);
 
     /**
-     * Removes the gRPC client for the given device. If no client exists for the
-     * given device, the result is a no-op.
+     * Returns the gRPC client previously created for the given client key, or
+     * null if such client does not exist.
+     *
+     * @param clientKey client key
+     * @return the gRPC client of the device if exists; null otherwise
+     */
+    C getClient(K clientKey);
+
+    /**
+     * Removes the gRPC client for the given device and any gRPC channel state
+     * associated to it. If no client exists for the given device, the result is
+     * a no-op.
      *
      * @param deviceId the device identifier
      */
     void removeClient(DeviceId deviceId);
 
     /**
-     * Check reachability of the gRPC server running on the given device.
-     * Reachability can be tested only if a client is previously created using
-     * {@link #createClient(GrpcClientKey)}. Note that this only checks the
-     * reachability instead of checking service availability, different
-     * service-specific gRPC clients might check service availability in a
-     * different way.
+     * Similar to {@link #removeClient(DeviceId)} but uses the client key to
+     * identify the client to remove.
      *
-     * @param deviceId the device identifier
-     * @return true if client was created and is able to contact the gNMI
-     * server; false otherwise
+     * @param clientKey the client key
      */
-    boolean isReachable(DeviceId deviceId);
+    void removeClient(K clientKey);
+
+    /**
+     * Adds a listener for device agent events for the given provider.
+     *
+     * @param deviceId device identifier
+     * @param providerId provider ID
+     * @param listener the device agent listener
+     */
+    void addDeviceAgentListener(DeviceId deviceId, ProviderId providerId,
+                                DeviceAgentListener listener);
+
+    /**
+     * Removes the listener for device agent events that was previously
+     * registered for the given provider.
+     *
+     * @param deviceId   device identifier
+     * @param providerId the provider ID
+     */
+    void removeDeviceAgentListener(DeviceId deviceId, ProviderId providerId);
 }
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientKey.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientKey.java
index 99ea23f..d1d0b0f 100644
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientKey.java
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientKey.java
@@ -20,8 +20,11 @@
 import com.google.common.base.Objects;
 import org.onosproject.net.DeviceId;
 
+import java.net.URI;
+
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Strings.isNullOrEmpty;
 import static java.lang.String.format;
 
 /**
@@ -29,32 +32,36 @@
  */
 @Beta
 public class GrpcClientKey {
+
+    private static final String GRPC = "grpc";
+    private static final String GRPCS = "grpcs";
+
     private final String serviceName;
     private final DeviceId deviceId;
-    private final String serverAddr;
-    private final int serverPort;
+    private final URI serverUri;
 
     /**
      * Creates a new client key.
      *
      * @param serviceName gRPC service name of the client
-     * @param deviceId ONOS device ID
-     * @param serverAddr gRPC server address
-     * @param serverPort gRPC server port
+     * @param deviceId    ONOS device ID
+     * @param serverUri   gRPC server URI
      */
-    public GrpcClientKey(String serviceName, DeviceId deviceId, String serverAddr, int serverPort) {
+    public GrpcClientKey(String serviceName, DeviceId deviceId, URI serverUri) {
         checkNotNull(serviceName);
         checkNotNull(deviceId);
-        checkNotNull(serverAddr);
+        checkNotNull(serverUri);
         checkArgument(!serviceName.isEmpty(),
-                "Service name can not be null");
-        checkArgument(!serverAddr.isEmpty(),
-                "Server address should not be empty");
-        checkArgument(serverPort > 0 && serverPort <= 65535, "Invalid server port");
+                      "Service name can not be empty");
+        checkArgument(GRPC.equals(serverUri.getScheme()) // getScheme() is null for scheme-less URIs
+                              || GRPCS.equals(serverUri.getScheme()),
+                      "Server URI scheme must be %s or %s", GRPC, GRPCS);
+        checkArgument(!isNullOrEmpty(serverUri.getHost()),
+                      "Server host address should not be empty");
+        checkArgument(serverUri.getPort() > 0 && serverUri.getPort() <= 65535, "Invalid server port");
         this.serviceName = serviceName;
         this.deviceId = deviceId;
-        this.serverAddr = serverAddr;
-        this.serverPort = serverPort;
+        this.serverUri = serverUri;
     }
 
     /**
@@ -76,21 +83,21 @@
     }
 
     /**
-     * Gets the gRPC server address.
+     * Returns the gRPC server URI.
      *
-     * @return the gRPC server address.
+     * @return the gRPC server URI.
      */
-    public String serverAddr() {
-        return serverAddr;
+    public URI serveUri() {
+        return serverUri;
     }
 
     /**
-     * Gets the gRPC server port.
+     * Returns true if the client requires TLS/SSL, false otherwise.
      *
-     * @return the gRPC server port.
+     * @return boolean
      */
-    public int serverPort() {
-        return serverPort;
+    public boolean requiresSecureChannel() {
+        return serverUri.getScheme().equals(GRPCS);
     }
 
     @Override
@@ -102,19 +109,18 @@
             return false;
         }
         GrpcClientKey that = (GrpcClientKey) o;
-        return serverPort == that.serverPort &&
-                Objects.equal(serviceName, that.serviceName) &&
+        return Objects.equal(serviceName, that.serviceName) &&
                 Objects.equal(deviceId, that.deviceId) &&
-                Objects.equal(serverAddr, that.serverAddr);
+                Objects.equal(serverUri, that.serverUri);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hashCode(serviceName, deviceId, serverAddr, serverPort);
+        return Objects.hashCode(serviceName, deviceId, serverUri);
     }
 
     @Override
     public String toString() {
-        return format("%s/%s@%s:%s", deviceId, serviceName, serverAddr, serverPort);
+        return format("%s/%s@%s", deviceId, serviceName, serverUri);
     }
 }
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
index 0b21ca2..117c6e3 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
@@ -16,11 +16,14 @@
 
 package org.onosproject.grpc.ctl;
 
+import io.grpc.ConnectivityState;
 import io.grpc.Context;
+import io.grpc.ManagedChannel;
 import io.grpc.StatusRuntimeException;
 import org.onosproject.grpc.api.GrpcClient;
 import org.onosproject.grpc.api.GrpcClientKey;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceAgentEvent;
 import org.slf4j.Logger;
 
 import java.util.concurrent.CompletableFuture;
@@ -28,6 +31,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
@@ -54,13 +58,51 @@
 
     protected final ExecutorService executorService;
     protected final DeviceId deviceId;
+    protected final ManagedChannel channel;
+    private final boolean persistent;
+    private final AbstractGrpcClientController controller;
+    private final AtomicBoolean channelOpen = new AtomicBoolean(false);
 
-    protected AbstractGrpcClient(GrpcClientKey clientKey) {
+    /**
+     * Creates a new client for the given key and channel. Setting persistent
+     * to true prevents the gRPC channel from staying IDLE. The controller
+     * instance is needed to propagate channel events.
+     *
+     * @param clientKey  client key
+     * @param channel    channel
+     * @param persistent true if the gRPC channel should never stay IDLE
+     * @param controller controller
+     */
+    protected AbstractGrpcClient(GrpcClientKey clientKey, ManagedChannel channel,
+                                 boolean persistent, AbstractGrpcClientController controller) {
         checkNotNull(clientKey);
+        checkNotNull(channel);
         this.deviceId = clientKey.deviceId();
+        this.channel = channel;
+        this.persistent = persistent;
+        this.controller = controller;
         this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE, groupedThreads(
                 "onos-grpc-" + clientKey.serviceName() + "-client-" + deviceId.toString(), "%d"));
         this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
+
+        setChannelCallback(clientKey.deviceId(), channel, ConnectivityState.CONNECTING);
+    }
+
+    @Override
+    public boolean isServerReachable() {
+        final ConnectivityState state = channel.getState(false);
+        switch (state) {
+            case READY:
+            case IDLE:
+                return true;
+            case CONNECTING:
+            case TRANSIENT_FAILURE:
+            case SHUTDOWN:
+                return false;
+            default:
+                log.error("Unrecognized channel connectivity state {}", state);
+                return false;
+        }
     }
 
     @Override
@@ -171,4 +213,85 @@
             }
         }, executor);
     }
+
+    private void setChannelCallback(DeviceId deviceId, ManagedChannel channel,
+                                    ConnectivityState sourceState) {
+        if (log.isTraceEnabled()) {
+            log.trace("Setting channel callback for {} with source state {}...",
+                      deviceId, sourceState);
+        }
+        channel.notifyWhenStateChanged(
+                sourceState, new ChannelConnectivityCallback(deviceId, channel));
+    }
+
+    /**
+     * Runnable task invoked at each change of the channel connectivity state.
+     * New callbacks are created as long as the channel is not shut down.
+     */
+    private final class ChannelConnectivityCallback implements Runnable {
+
+        private final DeviceId deviceId;
+        private final ManagedChannel channel;
+
+        private ChannelConnectivityCallback(
+                DeviceId deviceId, ManagedChannel channel) {
+            this.deviceId = deviceId;
+            this.channel = channel;
+        }
+
+        @Override
+        public void run() {
+            final ConnectivityState newState = channel.getState(false);
+            final DeviceAgentEvent.Type eventType;
+            switch (newState) {
+                // On gRPC connectivity states:
+                // https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md
+                case READY:
+                    eventType = DeviceAgentEvent.Type.CHANNEL_OPEN;
+                    break;
+                case TRANSIENT_FAILURE:
+                    eventType = DeviceAgentEvent.Type.CHANNEL_ERROR;
+                    break;
+                case SHUTDOWN:
+                    eventType = DeviceAgentEvent.Type.CHANNEL_CLOSED;
+                    break;
+                case IDLE:
+                    // IDLE and CONNECTING are transient states: no event is
+                    // generated. For persistent clients, getState(true) asks
+                    // gRPC to start connecting so the channel leaves IDLE.
+                    if (persistent) {
+                        log.debug("Forcing channel for {} to exit state IDLE...", deviceId);
+                        channel.getState(true);
+                    }
+                    eventType = null;
+                    break;
+                case CONNECTING:
+                    eventType = null;
+                    break;
+                default:
+                    log.error("Unrecognized connectivity state {}", newState);
+                    eventType = null;
+            }
+
+            if (log.isTraceEnabled()) {
+                log.trace("Detected channel connectivity change for {}, new state is {}",
+                          deviceId, newState);
+            }
+
+            if (eventType != null) {
+                // Post only on open <-> not-open transitions (drops consecutive duplicates).
+                final boolean present = eventType == DeviceAgentEvent.Type.CHANNEL_OPEN;
+                final boolean past = channelOpen.getAndSet(present);
+                if (present != past) {
+                    log.debug("Notifying event {} for {}", eventType, deviceId);
+                    controller.postEvent(new DeviceAgentEvent(eventType, deviceId));
+                }
+            }
+
+            if (newState != ConnectivityState.SHUTDOWN) {
+                // Channels never leave SHUTDOWN state, no need for a new callback.
+                setChannelCallback(deviceId, channel, newState);
+            }
+        }
+    }
 }
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
index c0a3c6b..3c38209 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
@@ -21,7 +21,6 @@
 import io.grpc.ManagedChannel;
 import io.grpc.netty.GrpcSslContexts;
 import io.grpc.netty.NettyChannelBuilder;
-import io.netty.handler.ssl.NotSslRecordException;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 import org.onosproject.event.AbstractListenerManager;
@@ -33,6 +32,9 @@
 import org.onosproject.grpc.api.GrpcClientController;
 import org.onosproject.grpc.api.GrpcClientKey;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceAgentEvent;
+import org.onosproject.net.device.DeviceAgentListener;
+import org.onosproject.net.provider.ProviderId;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Reference;
@@ -41,11 +43,11 @@
 
 import javax.net.ssl.SSLException;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.Lock;
 import java.util.function.Supplier;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
 import static java.lang.String.format;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -74,40 +76,44 @@
     private final Map<DeviceId, K> clientKeys = Maps.newHashMap();
     private final Map<K, C> clients = Maps.newHashMap();
     private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
+    private final ConcurrentMap<DeviceId, ConcurrentMap<ProviderId, DeviceAgentListener>>
+            deviceAgentListeners = Maps.newConcurrentMap();
+    private final Class<E> eventClass;
     private final Striped<Lock> stripedLocks = Striped.lock(DEFAULT_DEVICE_LOCK_SIZE);
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected GrpcChannelController grpcChannelController;
 
+    public AbstractGrpcClientController(Class<E> eventClass) {
+        this.eventClass = eventClass;
+    }
+
     @Activate
     public void activate() {
+        eventDispatcher.addSink(eventClass, listenerRegistry);
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
+        eventDispatcher.removeSink(eventClass);
         clientKeys.keySet().forEach(this::removeClient);
         clientKeys.clear();
         clients.clear();
         channelIds.clear();
+        deviceAgentListeners.clear();
         log.info("Stopped");
     }
 
     @Override
     public boolean createClient(K clientKey) {
         checkNotNull(clientKey);
-        /*
-            FIXME we might want to move "useTls" and "fallback" to properties of the netcfg and clientKey
-                  For now, we will first try to connect with TLS (accepting any cert), then fall back to
-                  plaintext for every device
-         */
-        return withDeviceLock(() -> doCreateClient(clientKey, true, true), clientKey.deviceId());
+        return withDeviceLock(() -> doCreateClient(clientKey),
+                              clientKey.deviceId());
     }
 
-    private boolean doCreateClient(K clientKey, boolean useTls, boolean fallbackToPlainText) {
+    private boolean doCreateClient(K clientKey) {
         DeviceId deviceId = clientKey.deviceId();
-        String serverAddr = clientKey.serverAddr();
-        int serverPort = clientKey.serverPort();
 
         if (clientKeys.containsKey(deviceId)) {
             final GrpcClientKey existingKey = clientKeys.get(deviceId);
@@ -116,19 +122,25 @@
                           clientName(clientKey), clientKey);
                 return true;
             } else {
-                log.info("Requested new {} with updated key, removing old client... (oldKey={})",
-                         clientName(clientKey), existingKey);
-                doRemoveClient(deviceId);
+                throw new IllegalArgumentException(format(
+                        "A client already exists for device %s (%s)",
+                        deviceId, clientKey));
             }
         }
 
-        log.info("Creating client for {} (server={}:{})...", deviceId, serverAddr, serverPort);
+        final String method = clientKey.requiresSecureChannel()
+                ? "TLS" : "plaintext TCP";
+
+        log.info("Connecting {} client for {} to server at {} using {}...",
+                 clientKey.serviceName(), deviceId, clientKey.serveUri(), method);
 
         SslContext sslContext = null;
-        if (useTls) {
+        if (clientKey.requiresSecureChannel()) {
             try {
-                // Accept any server certificate; this is insecure and should not be used in production
-                sslContext = GrpcSslContexts.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
+                // FIXME: Accept any server certificate; this is insecure and
+                //  should not be used in production
+                sslContext = GrpcSslContexts.forClient().trustManager(
+                        InsecureTrustManagerFactory.INSTANCE).build();
             } catch (SSLException e) {
                 log.error("Failed to build SSL Context", e);
                 return false;
@@ -137,17 +149,15 @@
 
         GrpcChannelId channelId = GrpcChannelId.of(clientKey.toString());
         NettyChannelBuilder channelBuilder = NettyChannelBuilder
-                .forAddress(serverAddr, serverPort)
+                .forAddress(clientKey.serveUri().getHost(),
+                            clientKey.serveUri().getPort())
                 .maxInboundMessageSize(DEFAULT_MAX_INBOUND_MSG_SIZE * MEGABYTES);
+
         if (sslContext != null) {
-            log.debug("Using SSL for gRPC connection to {}", deviceId);
             channelBuilder
                     .sslContext(sslContext)
                     .useTransportSecurity();
         } else {
-            checkState(!useTls,
-                    "Not authorized to use plaintext for gRPC connection to {}", deviceId);
-            log.debug("Using plaintext TCP for gRPC connection to {}", deviceId);
             channelBuilder.usePlaintext();
         }
 
@@ -156,24 +166,9 @@
         try {
             channel = grpcChannelController.connectChannel(channelId, channelBuilder);
         } catch (Throwable e) {
-            for (Throwable cause = e; cause != null; cause = cause.getCause()) {
-                if (useTls && cause instanceof NotSslRecordException) {
-                    // Likely root cause is that server is using plaintext
-                    log.info("Failed to connect to server (device={}) using TLS", deviceId);
-                    log.debug("TLS connection exception", e);
-                    if (fallbackToPlainText) {
-                        log.info("Falling back to plaintext for connection to {}", deviceId);
-                        return doCreateClient(clientKey, false, false);
-                    }
-                }
-                if (!useTls && "Connection reset by peer".equals(cause.getMessage())) {
-                    // Not a great signal, but could indicate the server is expected a TLS connection
-                    log.error("Failed to connect to server (device={}) using plaintext TCP; is the server using TLS?",
-                            deviceId);
-                    break;
-                }
-            }
-            log.warn("Unable to connect to gRPC server for {}", deviceId, e);
+            log.warn("Failed to connect to {} ({}) using {}: {}",
+                     deviceId, clientKey.serveUri(), method, e.toString());
+            log.debug("gRPC client connection exception", e);
             return false;
         }
 
@@ -216,37 +211,79 @@
     }
 
     @Override
+    public C getClient(K clientKey) {
+        checkNotNull(clientKey);
+        return clients.get(clientKey);
+    }
+
+    @Override
     public void removeClient(DeviceId deviceId) {
         checkNotNull(deviceId);
         withDeviceLock(() -> doRemoveClient(deviceId), deviceId);
     }
 
+    @Override
+    public void removeClient(K clientKey) {
+        checkNotNull(clientKey);
+        withDeviceLock(() -> doRemoveClient(clientKey), clientKey.deviceId());
+    }
+
     private Void doRemoveClient(DeviceId deviceId) {
         if (clientKeys.containsKey(deviceId)) {
-            final K clientKey = clientKeys.get(deviceId);
-            clients.get(clientKey).shutdown();
-            grpcChannelController.disconnectChannel(channelIds.get(deviceId));
-            clientKeys.remove(deviceId);
-            clients.remove(clientKey);
-            channelIds.remove(deviceId);
+            doRemoveClient(clientKeys.get(deviceId));
         }
         return null;
     }
 
-    @Override
-    public boolean isReachable(DeviceId deviceId) {
-        checkNotNull(deviceId);
-        return withDeviceLock(() -> doIsReachable(deviceId), deviceId);
+    private Void doRemoveClient(K clientKey) {
+        if (clients.containsKey(clientKey)) {
+            clients.get(clientKey).shutdown();
+        }
+        if (channelIds.containsKey(clientKey.deviceId())) {
+            grpcChannelController.disconnectChannel(
+                    channelIds.get(clientKey.deviceId()));
+        }
+        clientKeys.remove(clientKey.deviceId());
+        clients.remove(clientKey);
+        channelIds.remove(clientKey.deviceId());
+        return null;
     }
 
-    private boolean doIsReachable(DeviceId deviceId) {
-        // Default behaviour checks only the gRPC channel, should
-        // check according to different gRPC service
-        if (!clientKeys.containsKey(deviceId)) {
-            log.debug("Missing client for {}, cannot check for reachability", deviceId);
-            return false;
+    @Override
+    public void addDeviceAgentListener(DeviceId deviceId, ProviderId providerId, DeviceAgentListener listener) {
+        checkNotNull(deviceId, "deviceId cannot be null");
+        checkNotNull(providerId, "providerId cannot be null");
+        checkNotNull(listener, "listener cannot be null");
+        deviceAgentListeners.computeIfAbsent(deviceId, k -> Maps.newConcurrentMap())
+                .put(providerId, listener);
+    }
+
+    @Override
+    public void removeDeviceAgentListener(DeviceId deviceId, ProviderId providerId) {
+        checkNotNull(deviceId, "deviceId cannot be null");
+        checkNotNull(providerId, "providerId cannot be null");
+        deviceAgentListeners.computeIfPresent(deviceId, (did, listeners) -> {
+            listeners.remove(providerId);
+            return listeners.isEmpty() ? null : listeners;
+        });
+    }
+
+    public void postEvent(E event) {
+        checkNotNull(event);
+        post(event);
+    }
+
+    public void postEvent(DeviceAgentEvent event) {
+        // We should have only one event delivery mechanism. We have two now
+        // because we have two different types of events, DeviceAgentEvent and
+        // controller/protocol specific ones (e.g. P4Runtime or gNMI).
+        // TODO: extend device agent event to allow delivery protocol-specific
+        //  events, e.g. packet-in
+        checkNotNull(event);
+        final Map<ProviderId, DeviceAgentListener> listeners = deviceAgentListeners.get(event.subject());
+        if (listeners != null) { // looked up once: the entry may be removed concurrently
+            listeners.values().forEach(l -> l.event(event));
         }
-        return grpcChannelController.isChannelOpen(channelIds.get(deviceId));
     }
 
     private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
index ca0c8c3..87c9426 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
@@ -41,6 +41,7 @@
 import java.util.Dictionary;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -152,50 +153,24 @@
         }
     }
 
-    private boolean doDummyMessage(ManagedChannel channel) throws StatusRuntimeException {
+    /**
+     * Probes the given channel by issuing a blocking dummy RPC with a deadline
+     * of CONNECTION_TIMEOUT_SECONDS seconds. Returns normally if the call
+     * succeeds or if it fails with status code UNIMPLEMENTED.
+     *
+     * @param channel channel to probe
+     * @throws StatusRuntimeException if the call fails with any status code
+     *                                other than UNIMPLEMENTED
+     */
+    private void doDummyMessage(ManagedChannel channel) throws StatusRuntimeException {
         DummyServiceGrpc.DummyServiceBlockingStub dummyStub = DummyServiceGrpc
                 .newBlockingStub(channel)
                 .withDeadlineAfter(CONNECTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
         try {
-            return dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse
-                                              .getDefaultInstance()) != null;
+            //noinspection ResultOfMethodCallIgnored
+            dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse
+                                       .getDefaultInstance());
         } catch (StatusRuntimeException e) {
-            if (e.getStatus().equals(Status.UNIMPLEMENTED)) {
-                // UNIMPLEMENTED means that the server received our message but
-                // doesn't know how to handle it. Hence, channel is open.
-                return true;
-            } else {
+            // UNIMPLEMENTED means that the server received our message but
+            // doesn't know how to handle it. Hence, channel is open and the
+            // error is swallowed. Compare status codes rather than Status
+            // instances: Status equality is not well defined, and a status
+            // carrying a description does not equal Status.UNIMPLEMENTED.
+            if (e.getStatus().getCode() != Status.Code.UNIMPLEMENTED) {
                 throw e;
             }
         }
     }
 
     @Override
-    public boolean isChannelOpen(GrpcChannelId channelId) {
-        checkNotNull(channelId);
-
-        Lock lock = channelLocks.get(channelId);
-        lock.lock();
-
-        try {
-            if (!channels.containsKey(channelId)) {
-                log.warn("Unknown channel ID '{}', can't check if channel is open",
-                         channelId);
-                return false;
-            }
-            try {
-                return doDummyMessage(channels.get(channelId));
-            } catch (StatusRuntimeException e) {
-                log.debug("Unable to send dummy message to {}: {}",
-                          channelId, e.toString());
-                return false;
-            }
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    @Override
     public void disconnectChannel(GrpcChannelId channelId) {
         checkNotNull(channelId);
 
@@ -240,7 +215,6 @@
 
         Lock lock = channelLocks.get(channelId);
         lock.lock();
-
         try {
             return Optional.ofNullable(channels.get(channelId));
         } finally {
@@ -248,4 +222,23 @@
         }
     }
 
+    /**
+     * Asynchronously probes the channel with the given ID by sending a dummy
+     * message over it.
+     *
+     * @param channelId channel ID, must not be null
+     * @return future completed with true if the dummy message call returned
+     * without throwing, false if it threw a StatusRuntimeException or if no
+     * channel exists with the given ID
+     */
+    @Override
+    public CompletableFuture<Boolean> probeChannel(GrpcChannelId channelId) {
+        checkNotNull(channelId);
+        final ManagedChannel channel = channels.get(channelId);
+        if (channel == null) {
+            log.warn("Unable to find any channel with ID {}, cannot send probe",
+                     channelId);
+            return CompletableFuture.completedFuture(false);
+        }
+        // NOTE(review): with no explicit executor, supplyAsync() runs the
+        // blocking probe on ForkJoinPool.commonPool(); consider passing a
+        // dedicated executor.
+        return CompletableFuture.supplyAsync(() -> {
+            try {
+                doDummyMessage(channel);
+                return true;
+            } catch (StatusRuntimeException e) {
+                // A trailing throwable argument is logged together with the
+                // message, so a single record carries both.
+                log.debug("Probe for {} failed", channelId, e);
+                return false;
+            }
+        });
+    }
 }