Allow sharing the same gRPC channel between clients

This change introduces a refactoring of the gRPC protocol subsystem that
allows the creation of a gRPC channel independently of the client, while
allowing multiple clients to share the same channel (e.g. as in Stratum
where we use 3 clients).

Moreover, we refactor the P4RuntimeClient API to support multiple
P4Runtime-internal device IDs using the same client, whereas before the
client was associated with only one such ID.

Finally, we provide an abstract implementation for gRPC-based driver
behaviors, reducing code duplication in P4Runtime, gNMI and gNOI drivers.

Change-Id: I1a46352bbbef1e0d24042f169ae8ba580202944f
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
index befa334..933c7ce 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
@@ -21,7 +21,6 @@
 import io.grpc.ManagedChannel;
 import io.grpc.StatusRuntimeException;
 import org.onosproject.grpc.api.GrpcClient;
-import org.onosproject.grpc.api.GrpcClientKey;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.device.DeviceAgentEvent;
 import org.slf4j.Logger;
@@ -49,25 +48,26 @@
     private final AtomicBoolean channelOpen = new AtomicBoolean(false);
 
     /**
-     * Creates an new client for the given key and channel. Setting persistent
-     * to true avoids the gRPC channel to stay IDLE. The controller instance is
-     * needed to propagate channel events.
+     * Creates a new client for the given device and channel. Setting
+     * persistent to true prevents the gRPC channel from going {@link
+     * ConnectivityState#IDLE}. The controller instance is needed to propagate
+     * channel events.
      *
-     * @param clientKey  client key
+     * @param deviceId   device ID
      * @param channel    channel
      * @param persistent true if the gRPC should never stay IDLE
      * @param controller controller
      */
-    protected AbstractGrpcClient(GrpcClientKey clientKey, ManagedChannel channel,
+    protected AbstractGrpcClient(DeviceId deviceId, ManagedChannel channel,
                                  boolean persistent, AbstractGrpcClientController controller) {
-        checkNotNull(clientKey);
+        checkNotNull(deviceId);
         checkNotNull(channel);
-        this.deviceId = clientKey.deviceId();
+        this.deviceId = deviceId;
         this.channel = channel;
         this.persistent = persistent;
         this.controller = controller;
 
-        setChannelCallback(clientKey.deviceId(), channel, ConnectivityState.CONNECTING);
+        setChannelCallback(ConnectivityState.CONNECTING);
     }
 
     @Override
@@ -94,7 +94,7 @@
                              "ignoring request to shutdown for {}...", deviceId);
             return;
         }
-        log.warn("Shutting down client for {}...", deviceId);
+        log.debug("Shutting down client for {}...", deviceId);
         cancellableContext.cancel(new InterruptedException(
                 "Requested client shutdown"));
     }
@@ -140,84 +140,70 @@
                          opDescription, deviceId), throwable);
     }
 
-    private void setChannelCallback(DeviceId deviceId, ManagedChannel channel,
-                                    ConnectivityState sourceState) {
+    private void setChannelCallback(ConnectivityState sourceState) {
         if (log.isTraceEnabled()) {
             log.trace("Setting channel callback for {} with source state {}...",
                       deviceId, sourceState);
         }
         channel.notifyWhenStateChanged(
-                sourceState, new ChannelConnectivityCallback(deviceId, channel));
+                sourceState, this::channelStateCallback);
     }
 
     /**
-     * Runnable task invoked at each change of the channel connectivity state.
-     * New callbacks are created as long as the channel is not shut down.
+     * Invoked at each change of the channel connectivity state. New callbacks
+     * are created as long as the channel is not shut down.
      */
-    private final class ChannelConnectivityCallback implements Runnable {
-
-        private final DeviceId deviceId;
-        private final ManagedChannel channel;
-
-        private ChannelConnectivityCallback(
-                DeviceId deviceId, ManagedChannel channel) {
-            this.deviceId = deviceId;
-            this.channel = channel;
+    private void channelStateCallback() {
+        final ConnectivityState newState = channel.getState(false);
+        final DeviceAgentEvent.Type eventType;
+        switch (newState) {
+            // On gRPC connectivity states:
+            // https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md
+            case READY:
+                eventType = DeviceAgentEvent.Type.CHANNEL_OPEN;
+                break;
+            case TRANSIENT_FAILURE:
+                eventType = DeviceAgentEvent.Type.CHANNEL_ERROR;
+                break;
+            case SHUTDOWN:
+                eventType = DeviceAgentEvent.Type.CHANNEL_CLOSED;
+                break;
+            case IDLE:
+                // IDLE and CONNECTING are transient states that will
+                // eventually move to READY or TRANSIENT_FAILURE. Do not
+                // generate an event for now.
+                if (persistent) {
+                    log.debug("Forcing channel for {} to exist state IDLE...", deviceId);
+                    channel.getState(true);
+                }
+                eventType = null;
+                break;
+            case CONNECTING:
+                eventType = null;
+                break;
+            default:
+                log.error("Unrecognized connectivity state {}", newState);
+                eventType = null;
         }
 
-        @Override
-        public void run() {
-            final ConnectivityState newState = channel.getState(false);
-            final DeviceAgentEvent.Type eventType;
-            switch (newState) {
-                // On gRPC connectivity states:
-                // https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md
-                case READY:
-                    eventType = DeviceAgentEvent.Type.CHANNEL_OPEN;
-                    break;
-                case TRANSIENT_FAILURE:
-                    eventType = DeviceAgentEvent.Type.CHANNEL_ERROR;
-                    break;
-                case SHUTDOWN:
-                    eventType = DeviceAgentEvent.Type.CHANNEL_CLOSED;
-                    break;
-                case IDLE:
-                    // IDLE and CONNECTING are transient states that will
-                    // eventually move to READY or TRANSIENT_FAILURE. Do not
-                    // generate an event for now.
-                    if (persistent) {
-                        log.debug("Forcing channel for {} to exist state IDLE...", deviceId);
-                        channel.getState(true);
-                    }
-                    eventType = null;
-                    break;
-                case CONNECTING:
-                    eventType = null;
-                    break;
-                default:
-                    log.error("Unrecognized connectivity state {}", newState);
-                    eventType = null;
-            }
+        if (log.isTraceEnabled()) {
+            log.trace("Detected channel connectivity change for {}, new state is {}",
+                      deviceId, newState);
+        }
 
-            if (log.isTraceEnabled()) {
-                log.trace("Detected channel connectivity change for {}, new state is {}",
-                          deviceId, newState);
+        if (eventType != null) {
+            // Avoid sending consecutive duplicate events.
+            final boolean present = eventType == DeviceAgentEvent.Type.CHANNEL_OPEN;
+            final boolean past = channelOpen.getAndSet(present);
+            if (present != past) {
+                log.debug("Notifying event {} for {}", eventType, deviceId);
+                controller.postEvent(new DeviceAgentEvent(eventType, deviceId));
             }
+        }
 
-            if (eventType != null) {
-                // Avoid sending consecutive duplicate events.
-                final boolean present = eventType == DeviceAgentEvent.Type.CHANNEL_OPEN;
-                final boolean past = channelOpen.getAndSet(present);
-                if (present != past) {
-                    log.debug("Notifying event {} for {}", eventType, deviceId);
-                    controller.postEvent(new DeviceAgentEvent(eventType, deviceId));
-                }
-            }
-
-            if (newState != ConnectivityState.SHUTDOWN) {
-                // Channels never leave SHUTDOWN state, no need for a new callback.
-                setChannelCallback(deviceId, channel, newState);
-            }
+        if (newState != ConnectivityState.SHUTDOWN) {
+            // Channels never leave SHUTDOWN state, no need for a new callback.
+            setChannelCallback(newState);
         }
     }
 }
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
index 3c38209..9fb3625 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
@@ -19,29 +19,19 @@
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.Striped;
 import io.grpc.ManagedChannel;
-import io.grpc.netty.GrpcSslContexts;
-import io.grpc.netty.NettyChannelBuilder;
-import io.netty.handler.ssl.SslContext;
-import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 import org.onosproject.event.AbstractListenerManager;
 import org.onosproject.event.Event;
 import org.onosproject.event.EventListener;
-import org.onosproject.grpc.api.GrpcChannelController;
-import org.onosproject.grpc.api.GrpcChannelId;
 import org.onosproject.grpc.api.GrpcClient;
 import org.onosproject.grpc.api.GrpcClientController;
-import org.onosproject.grpc.api.GrpcClientKey;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.device.DeviceAgentEvent;
 import org.onosproject.net.device.DeviceAgentListener;
 import org.onosproject.net.provider.ProviderId;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.slf4j.Logger;
 
-import javax.net.ssl.SSLException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.Lock;
@@ -52,40 +42,35 @@
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
- * Abstract class of a gRPC based client controller for specific gRPC client
- * which provides basic gRPC client management and thread safe mechanism.
+ * Abstract class of a controller for gRPC clients which provides means to
+ * create clients, associate device agent listeners to them and register other
+ * event listeners.
  *
  * @param <C> the gRPC client type
- * @param <K> the key type of the gRPC client
  * @param <E> the event type of the gRPC client
  * @param <L> the event listener of event {@link E}
  */
 public abstract class AbstractGrpcClientController
-        <K extends GrpcClientKey, C extends GrpcClient, E extends Event, L extends EventListener<E>>
+        <C extends GrpcClient, E extends Event, L extends EventListener<E>>
         extends AbstractListenerManager<E, L>
-        implements GrpcClientController<K, C> {
+        implements GrpcClientController<C> {
 
     /**
      * The default max inbound message size (MB).
      */
-    private static final int DEFAULT_MAX_INBOUND_MSG_SIZE = 256; // Megabytes.
-    private static final int MEGABYTES = 1024 * 1024;
     private static final int DEFAULT_DEVICE_LOCK_SIZE = 30;
 
     private final Logger log = getLogger(getClass());
-    private final Map<DeviceId, K> clientKeys = Maps.newHashMap();
-    private final Map<K, C> clients = Maps.newHashMap();
-    private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
+    private final Map<DeviceId, C> clients = Maps.newHashMap();
     private final ConcurrentMap<DeviceId, ConcurrentMap<ProviderId, DeviceAgentListener>>
             deviceAgentListeners = Maps.newConcurrentMap();
     private final Class<E> eventClass;
+    private final String serviceName;
     private final Striped<Lock> stripedLocks = Striped.lock(DEFAULT_DEVICE_LOCK_SIZE);
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected GrpcChannelController grpcChannelController;
-
-    public AbstractGrpcClientController(Class<E> eventClass) {
+    public AbstractGrpcClientController(Class<E> eventClass, String serviceName) {
         this.eventClass = eventClass;
+        this.serviceName = serviceName;
     }
 
     @Activate
@@ -97,162 +82,67 @@
     @Deactivate
     public void deactivate() {
         eventDispatcher.removeSink(eventClass);
-        clientKeys.keySet().forEach(this::removeClient);
-        clientKeys.clear();
         clients.clear();
-        channelIds.clear();
         deviceAgentListeners.clear();
         log.info("Stopped");
     }
 
     @Override
-    public boolean createClient(K clientKey) {
-        checkNotNull(clientKey);
-        return withDeviceLock(() -> doCreateClient(clientKey),
-                              clientKey.deviceId());
+    public boolean create(DeviceId deviceId, ManagedChannel channel) {
+        checkNotNull(deviceId);
+        checkNotNull(channel);
+        return withDeviceLock(() -> doCreateClient(deviceId, channel), deviceId);
     }
 
-    private boolean doCreateClient(K clientKey) {
-        DeviceId deviceId = clientKey.deviceId();
+    private boolean doCreateClient(DeviceId deviceId, ManagedChannel channel) {
 
-        if (clientKeys.containsKey(deviceId)) {
-            final GrpcClientKey existingKey = clientKeys.get(deviceId);
-            if (clientKey.equals(existingKey)) {
-                log.debug("Not creating {} as it already exists... (key={})",
-                          clientName(clientKey), clientKey);
-                return true;
-            } else {
-                throw new IllegalArgumentException(format(
-                        "A client already exists for device %s (%s)",
-                        deviceId, clientKey));
-            }
-        }
-
-        final String method = clientKey.requiresSecureChannel()
-                ? "TLS" : "plaintext TCP";
-
-        log.info("Connecting {} client for {} to server at {} using {}...",
-                 clientKey.serviceName(), deviceId, clientKey.serveUri(), method);
-
-        SslContext sslContext = null;
-        if (clientKey.requiresSecureChannel()) {
-            try {
-                // FIXME: Accept any server certificate; this is insecure and
-                //  should not be used in production
-                sslContext = GrpcSslContexts.forClient().trustManager(
-                        InsecureTrustManagerFactory.INSTANCE).build();
-            } catch (SSLException e) {
-                log.error("Failed to build SSL Context", e);
-                return false;
-            }
-        }
-
-        GrpcChannelId channelId = GrpcChannelId.of(clientKey.toString());
-        NettyChannelBuilder channelBuilder = NettyChannelBuilder
-                .forAddress(clientKey.serveUri().getHost(),
-                            clientKey.serveUri().getPort())
-                .maxInboundMessageSize(DEFAULT_MAX_INBOUND_MSG_SIZE * MEGABYTES);
-
-        if (sslContext != null) {
-            channelBuilder
-                    .sslContext(sslContext)
-                    .useTransportSecurity();
-        } else {
-            channelBuilder.usePlaintext();
-        }
-
-        final ManagedChannel channel;
-
-        try {
-            channel = grpcChannelController.connectChannel(channelId, channelBuilder);
-        } catch (Throwable e) {
-            log.warn("Failed to connect to {} ({}) using {}: {}",
-                     deviceId, clientKey.serveUri(), method, e.toString());
-            log.debug("gRPC client connection exception", e);
-            return false;
+        if (clients.containsKey(deviceId)) {
+            throw new IllegalArgumentException(format(
+                    "A %s client already exists for %s", serviceName, deviceId));
         }
 
         final C client;
         try {
-            client = createClientInstance(clientKey, channel);
+            client = createClientInstance(deviceId, channel);
         } catch (Throwable e) {
-            log.error("Exception while creating {}", clientName(clientKey), e);
-            grpcChannelController.disconnectChannel(channelId);
+            log.error("Exception while creating {}", clientName(deviceId), e);
             return false;
         }
 
         if (client == null) {
-            log.error("Unable to create {}, implementation returned null... (key={})",
-                      clientName(clientKey), clientKey);
-            grpcChannelController.disconnectChannel(channelId);
+            log.error("Unable to create {}, implementation returned null...",
+                      clientName(deviceId));
             return false;
         }
 
-        clientKeys.put(deviceId, clientKey);
-        clients.put(clientKey, client);
-        channelIds.put(deviceId, channelId);
-
+        clients.put(deviceId, client);
         return true;
     }
 
-    protected abstract C createClientInstance(K clientKey, ManagedChannel channel);
+    protected abstract C createClientInstance(DeviceId deviceId, ManagedChannel channel);
 
     @Override
-    public C getClient(DeviceId deviceId) {
+    public C get(DeviceId deviceId) {
         checkNotNull(deviceId);
-        return withDeviceLock(() -> doGetClient(deviceId), deviceId);
+        return withDeviceLock(() -> clients.get(deviceId), deviceId);
     }
 
-    private C doGetClient(DeviceId deviceId) {
-        if (!clientKeys.containsKey(deviceId)) {
+    @Override
+    public void remove(DeviceId deviceId) {
+        checkNotNull(deviceId);
+        withDeviceLock(() -> {
+            final C client = clients.remove(deviceId);
+            if (client != null) {
+                client.shutdown();
+            }
             return null;
-        }
-        return clients.get(clientKeys.get(deviceId));
-    }
-
-    @Override
-    public C getClient(K clientKey) {
-        checkNotNull(clientKey);
-        return clients.get(clientKey);
-    }
-
-    @Override
-    public void removeClient(DeviceId deviceId) {
-        checkNotNull(deviceId);
-        withDeviceLock(() -> doRemoveClient(deviceId), deviceId);
-    }
-
-    @Override
-    public void removeClient(K clientKey) {
-        checkNotNull(clientKey);
-        withDeviceLock(() -> doRemoveClient(clientKey), clientKey.deviceId());
-    }
-
-    private Void doRemoveClient(DeviceId deviceId) {
-        if (clientKeys.containsKey(deviceId)) {
-            doRemoveClient(clientKeys.get(deviceId));
-        }
-        return null;
-    }
-
-    private Void doRemoveClient(K clientKey) {
-        if (clients.containsKey(clientKey)) {
-            clients.get(clientKey).shutdown();
-        }
-        if (channelIds.containsKey(clientKey.deviceId())) {
-            grpcChannelController.disconnectChannel(
-                    channelIds.get(clientKey.deviceId()));
-        }
-        clientKeys.remove(clientKey.deviceId());
-        clients.remove(clientKey);
-        channelIds.remove(clientKey.deviceId());
-        return null;
+        }, deviceId);
     }
 
     @Override
     public void addDeviceAgentListener(DeviceId deviceId, ProviderId providerId, DeviceAgentListener listener) {
         checkNotNull(deviceId, "deviceId cannot be null");
-        checkNotNull(deviceId, "providerId cannot be null");
+        checkNotNull(providerId, "providerId cannot be null");
         checkNotNull(listener, "listener cannot be null");
         deviceAgentListeners.putIfAbsent(deviceId, Maps.newConcurrentMap());
         deviceAgentListeners.get(deviceId).put(providerId, listener);
@@ -277,7 +167,7 @@
         // We should have only one event delivery mechanism. We have two now
         // because we have two different types of events, DeviceAgentEvent and
         // controller/protocol specific ones (e.g. P4Runtime or gNMI).
-        // TODO: extend device agent event to allow delivery protocol-specific
+        // TODO: extend device agent event to allow delivery of protocol-specific
         //  events, e.g. packet-in
         checkNotNull(event);
         if (deviceAgentListeners.containsKey(event.subject())) {
@@ -296,7 +186,7 @@
         }
     }
 
-    private String clientName(GrpcClientKey key) {
-        return format("%s client for %s", key.serviceName(), key.deviceId());
+    private String clientName(DeviceId deviceId) {
+        return format("%s client for %s", serviceName, deviceId);
     }
 }
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
index 96a1671..3be7706 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
@@ -16,18 +16,16 @@
 
 package org.onosproject.grpc.ctl;
 
-import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Striped;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
-import io.grpc.Status;
-import io.grpc.StatusRuntimeException;
+import io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.NettyChannelBuilder;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.grpc.api.GrpcChannelController;
-import org.onosproject.grpc.api.GrpcChannelId;
-import org.onosproject.grpc.proto.dummy.Dummy;
-import org.onosproject.grpc.proto.dummy.DummyServiceGrpc;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -38,16 +36,19 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.net.ssl.SSLException;
+import java.net.URI;
 import java.util.Dictionary;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Strings.isNullOrEmpty;
 import static java.lang.String.format;
 import static org.onosproject.grpc.ctl.OsgiPropertyConstants.ENABLE_MESSAGE_LOG;
 import static org.onosproject.grpc.ctl.OsgiPropertyConstants.ENABLE_MESSAGE_LOG_DEFAULT;
@@ -61,6 +62,12 @@
         })
 public class GrpcChannelControllerImpl implements GrpcChannelController {
 
+    private static final String GRPC = "grpc";
+    private static final String GRPCS = "grpcs";
+
+    private static final int DEFAULT_MAX_INBOUND_MSG_SIZE = 256; // Megabytes.
+    private static final int MEGABYTES = 1024 * 1024;
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected ComponentConfigService componentConfigService;
 
@@ -72,8 +79,8 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private Map<GrpcChannelId, ManagedChannel> channels;
-    private Map<GrpcChannelId, GrpcLoggingInterceptor> interceptors;
+    private Map<URI, ManagedChannel> channels;
+    private Map<URI, GrpcLoggingInterceptor> interceptors;
 
     private final Striped<Lock> channelLocks = Striped.lock(30);
 
@@ -109,129 +116,109 @@
     }
 
     @Override
-    public ManagedChannel connectChannel(GrpcChannelId channelId,
-                                         ManagedChannelBuilder<?> channelBuilder) {
-        checkNotNull(channelId);
-        checkNotNull(channelBuilder);
-
-        Lock lock = channelLocks.get(channelId);
-        lock.lock();
-
-        try {
-            if (channels.containsKey(channelId)) {
-                throw new IllegalArgumentException(format(
-                        "A channel with ID '%s' already exists", channelId));
-            }
-
-            final GrpcLoggingInterceptor interceptor = new GrpcLoggingInterceptor(
-                    channelId, enableMessageLog);
-            channelBuilder.intercept(interceptor);
-
-            ManagedChannel channel = channelBuilder.build();
-            // Forced connection API is still experimental. Use workaround...
-            // channel.getState(true);
-            try {
-                doDummyMessage(channel);
-            } catch (StatusRuntimeException e) {
-                interceptor.close();
-                shutdownNowAndWait(channel, channelId);
-                throw e;
-            }
-            // If here, channel is open.
-            channels.put(channelId, channel);
-            interceptors.put(channelId, interceptor);
-            return channel;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    private void doDummyMessage(ManagedChannel channel) throws StatusRuntimeException {
-        DummyServiceGrpc.DummyServiceBlockingStub dummyStub = DummyServiceGrpc
-                .newBlockingStub(channel)
-                .withDeadlineAfter(CONNECTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
-        try {
-            //noinspection ResultOfMethodCallIgnored
-            dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse
-                                       .getDefaultInstance());
-        } catch (StatusRuntimeException e) {
-            if (!e.getStatus().equals(Status.UNIMPLEMENTED)) {
-                // UNIMPLEMENTED means that the server received our message but
-                // doesn't know how to handle it. Hence, channel is open.
-                throw e;
-            }
-        }
+    public ManagedChannel create(URI channelUri) {
+        return create(channelUri, makeChannelBuilder(channelUri));
     }
 
     @Override
-    public void disconnectChannel(GrpcChannelId channelId) {
-        checkNotNull(channelId);
+    public ManagedChannel create(URI channelUri, ManagedChannelBuilder<?> channelBuilder) {
+        checkNotNull(channelUri);
+        checkNotNull(channelBuilder);
 
-        Lock lock = channelLocks.get(channelId);
-        lock.lock();
+        channelLocks.get(channelUri).lock();
         try {
-            final ManagedChannel channel = channels.remove(channelId);
-            if (channel != null) {
-                shutdownNowAndWait(channel, channelId);
+            if (channels.containsKey(channelUri)) {
+                throw new IllegalArgumentException(format(
+                        "A channel with ID '%s' already exists", channelUri));
             }
-            final GrpcLoggingInterceptor interceptor = interceptors.remove(channelId);
+
+            log.info("Creating new gRPC channel {}...", channelUri);
+
+            final GrpcLoggingInterceptor interceptor = new GrpcLoggingInterceptor(
+                    channelUri, enableMessageLog);
+            channelBuilder.intercept(interceptor);
+
+            final ManagedChannel channel = channelBuilder.build();
+            // Track the same instance returned to the caller; do not build a second channel.
+            channels.put(channelUri, channel);
+            interceptors.put(channelUri, interceptor);
+
+            return channel;
+        } finally {
+            channelLocks.get(channelUri).unlock();
+        }
+    }
+
+    private NettyChannelBuilder makeChannelBuilder(URI channelUri) {
+
+        checkArgument(channelUri.getScheme().equals(GRPC)
+                              || channelUri.getScheme().equals(GRPCS),
+                      format("Server URI scheme must be %s or %s", GRPC, GRPCS));
+        checkArgument(!isNullOrEmpty(channelUri.getHost()),
+                      "Server host address should not be empty");
+        checkArgument(channelUri.getPort() > 0 && channelUri.getPort() <= 65535,
+                      "Invalid server port");
+
+        final boolean useTls = channelUri.getScheme().equals(GRPCS);
+
+        final NettyChannelBuilder channelBuilder = NettyChannelBuilder
+                .forAddress(channelUri.getHost(),
+                            channelUri.getPort())
+                .maxInboundMessageSize(DEFAULT_MAX_INBOUND_MSG_SIZE * MEGABYTES);
+
+        if (useTls) {
+            try {
+                // Accept any server certificate; this is insecure and
+                // should not be used in production.
+                final SslContext sslContext = GrpcSslContexts.forClient().trustManager(
+                        InsecureTrustManagerFactory.INSTANCE).build();
+                channelBuilder.sslContext(sslContext).useTransportSecurity();
+            } catch (SSLException e) {
+                log.error("Failed to build SSL context", e);
+                return null;
+            }
+        } else {
+            channelBuilder.usePlaintext();
+        }
+
+        return channelBuilder;
+    }
+
+    @Override
+    public void destroy(URI channelUri) {
+        checkNotNull(channelUri);
+
+        channelLocks.get(channelUri).lock();
+        try {
+            final ManagedChannel channel = channels.remove(channelUri);
+            if (channel != null) {
+                shutdownNowAndWait(channel, channelUri);
+            }
+            final GrpcLoggingInterceptor interceptor = interceptors.remove(channelUri);
             if (interceptor != null) {
                 interceptor.close();
             }
         } finally {
-            lock.unlock();
+            channelLocks.get(channelUri).unlock();
         }
     }
 
-    private void shutdownNowAndWait(ManagedChannel channel, GrpcChannelId channelId) {
+    private void shutdownNowAndWait(ManagedChannel channel, URI channelUri) {
         try {
             if (!channel.shutdownNow()
                     .awaitTermination(5, TimeUnit.SECONDS)) {
-                log.error("Channel '{}' didn't terminate, although we " +
-                                  "triggered a shutdown and waited",
-                          channelId);
+                log.error("Channel {} did not terminate properly",
+                          channelUri);
             }
         } catch (InterruptedException e) {
-            log.warn("Channel {} didn't shutdown in time", channelId);
+            log.warn("Channel {} didn't shutdown in time", channelUri);
             Thread.currentThread().interrupt();
         }
     }
 
     @Override
-    public Map<GrpcChannelId, ManagedChannel> getChannels() {
-        return ImmutableMap.copyOf(channels);
-    }
-
-    @Override
-    public Optional<ManagedChannel> getChannel(GrpcChannelId channelId) {
-        checkNotNull(channelId);
-
-        Lock lock = channelLocks.get(channelId);
-        lock.lock();
-        try {
-            return Optional.ofNullable(channels.get(channelId));
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    @Override
-    public CompletableFuture<Boolean> probeChannel(GrpcChannelId channelId) {
-        final ManagedChannel channel = channels.get(channelId);
-        if (channel == null) {
-            log.warn("Unable to find any channel with ID {}, cannot send probe",
-                     channelId);
-            return CompletableFuture.completedFuture(false);
-        }
-        return CompletableFuture.supplyAsync(() -> {
-            try {
-                doDummyMessage(channel);
-                return true;
-            } catch (StatusRuntimeException e) {
-                log.debug("Probe for {} failed", channelId);
-                log.debug("", e);
-                return false;
-            }
-        });
+    public Optional<ManagedChannel> get(URI channelUri) {
+        checkNotNull(channelUri);
+        return Optional.ofNullable(channels.get(channelUri));
     }
 }
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcLoggingInterceptor.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcLoggingInterceptor.java
index 45ec5ba..6f06c11 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcLoggingInterceptor.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcLoggingInterceptor.java
@@ -26,12 +26,12 @@
 import io.grpc.MethodDescriptor;
 import io.grpc.Status;
 import io.grpc.protobuf.lite.ProtoLiteUtils;
-import org.onosproject.grpc.api.GrpcChannelId;
 import org.slf4j.Logger;
 
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.net.URI;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.StringJoiner;
@@ -55,13 +55,13 @@
     private static final Logger log = getLogger(GrpcLoggingInterceptor.class);
 
     private final AtomicLong callIdGenerator = new AtomicLong();
-    private final GrpcChannelId channelId;
+    private final URI channelUri;
     private final AtomicBoolean enabled;
 
     private FileWriter writer;
 
-    GrpcLoggingInterceptor(GrpcChannelId channelId, AtomicBoolean enabled) {
-        this.channelId = channelId;
+    GrpcLoggingInterceptor(URI channelUri, AtomicBoolean enabled) {
+        this.channelUri = channelUri;
         this.enabled = enabled;
     }
 
@@ -69,14 +69,13 @@
         if (writer != null) {
             return true;
         }
-        final String safeChName = channelId.id()
-                .replaceAll("[^A-Za-z0-9]", "_");
-        final String fileName = format("grpc_%s_", safeChName).toLowerCase();
+        final String safeChName = channelUri.toString()
+                .replaceAll("[^A-Za-z0-9]", "_").toLowerCase();
         try {
-            final File tmpFile = File.createTempFile(fileName, ".log");
+            final File tmpFile = File.createTempFile(safeChName + "_", ".log");
             this.writer = new FileWriter(tmpFile);
             log.info("Created gRPC call log file for channel {}: {}",
-                     channelId, tmpFile.getAbsolutePath());
+                     channelUri, tmpFile.getAbsolutePath());
             return true;
         } catch (IOException e) {
             log.error("Unable to initialize gRPC call log writer", e);
@@ -90,7 +89,7 @@
                 return;
             }
             try {
-                log.info("Closing log writer for {}...", channelId);
+                log.info("Closing log writer for {}...", channelUri);
                 writer.close();
             } catch (IOException e) {
                 log.error("Unable to close gRPC call log writer", e);