[ONOS-7829] Implement AbstractGrpcClient and AbstractGrpcClientControl

Change-Id: I39cba6834e7fe8d1b60b576b9934c0b3cfa7104b
diff --git a/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/GnmiDeviceDescriptionDiscovery.java b/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/GnmiDeviceDescriptionDiscovery.java
index e935920..934d76d 100644
--- a/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/GnmiDeviceDescriptionDiscovery.java
+++ b/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/GnmiDeviceDescriptionDiscovery.java
@@ -24,8 +24,8 @@
 import io.grpc.StatusRuntimeException;
 import io.grpc.netty.NettyChannelBuilder;
 import io.grpc.stub.StreamObserver;
+import org.onosproject.grpc.api.GrpcChannelController;
 import org.onosproject.grpc.api.GrpcChannelId;
-import org.onosproject.grpc.api.GrpcController;
 import org.onosproject.net.DefaultAnnotations;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.Port;
@@ -139,7 +139,7 @@
 
         DeviceId deviceId = handler().data().deviceId();
 
-        GrpcController controller = handler().get(GrpcController.class);
+        GrpcChannelController controller = handler().get(GrpcChannelController.class);
         ManagedChannel channel = null;
 
         //FIXME can be optimized
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
index 3f6c9ad..90e7a31 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
@@ -27,6 +27,7 @@
 import org.onosproject.net.pi.service.PiPipeconfService;
 import org.onosproject.net.pi.service.PiTranslationService;
 import org.onosproject.p4runtime.api.P4RuntimeClient;
+import org.onosproject.p4runtime.api.P4RuntimeClientKey;
 import org.onosproject.p4runtime.api.P4RuntimeController;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -157,7 +158,9 @@
             return null;
         }
 
-        if (!controller.createClient(deviceId, serverAddr, serverPort, p4DeviceId)) {
+        P4RuntimeClientKey clientKey = new
+                P4RuntimeClientKey(deviceId, serverAddr, serverPort, p4DeviceId);
+        if (!controller.createClient(clientKey)) {
             log.warn("Unable to create client for {}, aborting operation", deviceId);
             return null;
         }
diff --git a/modules.defs b/modules.defs
index b4097c7..5c16ff3 100644
--- a/modules.defs
+++ b/modules.defs
@@ -231,7 +231,7 @@
     '//apps/l3vpn:onos-apps-l3vpn-oar',
     '//apps/openroadm:onos-apps-openroadm-oar',
     '//apps/artemis:onos-apps-artemis-oar',
-    '//apps/pi-demo/ecmp:onos-apps-pi-demo-ecmp-oar',
+    #'//apps/pi-demo/ecmp:onos-apps-pi-demo-ecmp-oar',
     '//apps/gluon:onos-apps-gluon-oar',
     '//apps/evpnopenflow:onos-apps-evpnopenflow-oar',
     '//apps/route-service:onos-apps-route-service-oar',
@@ -256,7 +256,7 @@
     '//apps/mcast:onos-apps-mcast-oar',
     '//apps/layout:onos-apps-layout-oar',
     '//apps/imr:onos-apps-imr-oar',
-    '//apps/inbandtelemetry/app:onos-apps-inbandtelemetry-app-oar',
+    #'//apps/inbandtelemetry/app:onos-apps-inbandtelemetry-app-oar',
     '//apps/workflow:onos-apps-workflow-oar',
     # nodemetrics application
     '//apps/nodemetrics:onos-apps-nodemetrics-oar',
diff --git a/pipelines/basic/BUCK b/pipelines/basic/BUCK
index 26e74dd..21b9801 100644
--- a/pipelines/basic/BUCK
+++ b/pipelines/basic/BUCK
@@ -5,6 +5,7 @@
     '//protocols/p4runtime/model:onos-protocols-p4runtime-model',
     '//protocols/p4runtime/api:onos-protocols-p4runtime-api',
     '//apps/inbandtelemetry/api:onos-apps-inbandtelemetry-api',
+    '//protocols/grpc/api:onos-protocols-grpc-api',
 ]
 
 BUNDLES = [
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcController.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java
similarity index 98%
rename from protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcController.java
rename to protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java
index 33df22b..5b6771c 100644
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcController.java
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java
@@ -30,7 +30,7 @@
  * Abstraction of a gRPC controller that stores and manages gRPC channels.
  */
 @Beta
-public interface GrpcController {
+public interface GrpcChannelController {
 
     int CONNECTION_TIMEOUT_SECONDS = 20;
 
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java
new file mode 100644
index 0000000..d040a23
--- /dev/null
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.grpc.api;
+
+import com.google.common.annotations.Beta;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction of a gRPC client.
+ *
+ */
+@Beta
+public interface GrpcClient {
+
+    /**
+     * Shutdowns the client by terminating any active RPC.
+     *
+     * @return a completable future to signal the completion of the shutdown
+     * procedure
+     */
+    CompletableFuture<Void> shutdown();
+}
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientController.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientController.java
new file mode 100644
index 0000000..087898b
--- /dev/null
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientController.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.grpc.api;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.net.DeviceId;
+
+/**
+ * Abstraction of a gRPC controller which controls specific gRPC
+ * client {@link C} with specific client key {@link K}.
+ *
+ * @param <K> the gRPC client key
+ * @param <C> the gRPC client type
+ */
+@Beta
+public interface GrpcClientController<K extends GrpcClientKey, C extends GrpcClient> {
+
+    /**
+     * Instantiates a new client to operate on a gRPC server identified by
+     * the given information. As a result of this method, a client can be later
+     * obtained by invoking {@link #getClient(DeviceId)}.
+     *
+     * Only one client can exist for the same device ID. Calls to this method are
+     * idempotent fot the same client key, i.e. returns true
+     * if such client already exists but a new one is not created.
+     * If there exists a client with same device ID but different address and port,
+     * removes old one and recreate new one.
+     *
+     * @param clientKey the client key
+     * @return true if the client was created and the channel to the server is open;
+     *         false otherwise
+     */
+    boolean createClient(K clientKey);
+
+    /**
+     * Retrieves the gRPC client to operate on the given device.
+     *
+     * @param deviceId the device identifier
+     * @return the gRPC client of the device if exists; null otherwise
+     */
+    C getClient(DeviceId deviceId);
+
+    /**
+     * Removes the gRPC client for the given device. If no client
+     * exists for the given device, the result is a no-op.
+     *
+     * @param deviceId the device identifier
+     */
+    void removeClient(DeviceId deviceId);
+
+    /**
+     * Check reachability of the gRPC server running on the given device.
+     * Reachability can be tested only if a client is previously created
+     * using {@link #createClient(GrpcClientKey)}.
+     * Different gRPC service may have different ways to test if it is
+     * reachable or not.
+     *
+     * @param deviceId the device identifier
+     * @return true of client was created and is able to contact the gNMI server;
+     *         false otherwise
+     */
+    boolean isReachable(DeviceId deviceId);
+}
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientKey.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientKey.java
new file mode 100644
index 0000000..ad8a7da
--- /dev/null
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientKey.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.grpc.api;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import org.onosproject.net.DeviceId;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Key that uniquely identifies a gRPC client.
+ */
+@Beta
+public class GrpcClientKey {
+    private final String serviceName;
+    private final DeviceId deviceId;
+    private final String serverAddr;
+    private final int serverPort;
+
+    /**
+     * Creates a new client key.
+     *
+     * @param serviceName gRPC service name of the client
+     * @param deviceId ONOS device ID
+     * @param serverAddr gRPC server address
+     * @param serverPort gRPC server port
+     */
+    public GrpcClientKey(String serviceName, DeviceId deviceId, String serverAddr, int serverPort) {
+        checkNotNull(serviceName);
+        checkNotNull(deviceId);
+        checkNotNull(serverAddr);
+        checkArgument(!serviceName.isEmpty(),
+                "Service name can not be null");
+        checkArgument(!serverAddr.isEmpty(),
+                "Server address should not be empty");
+        checkArgument(serverPort > 0 && serverPort <= 65535, "Invalid server port");
+        this.serviceName = serviceName;
+        this.deviceId = deviceId;
+        this.serverAddr = serverAddr;
+        this.serverPort = serverPort;
+    }
+
+    /**
+     * Gets the gRPC service name of the client.
+     *
+     * @return the service name
+     */
+    public String serviceName() {
+        return serviceName;
+    }
+
+    /**
+     * Gets the device ID.
+     *
+     * @return the device ID
+     */
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+
+    /**
+     * Gets the gRPC server address.
+     *
+     * @return the gRPC server address.
+     */
+    public String serverAddr() {
+        return serverAddr;
+    }
+
+    /**
+     * Gets the gRPC server port.
+     *
+     * @return the gRPC server port.
+     */
+    public int serverPort() {
+        return serverPort;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        GrpcClientKey that = (GrpcClientKey) o;
+        return serverPort == that.serverPort &&
+                Objects.equal(serviceName, that.serviceName) &&
+                Objects.equal(deviceId, that.deviceId) &&
+                Objects.equal(serverAddr, that.serverAddr);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(serviceName, deviceId, serverAddr, serverPort);
+    }
+
+    protected MoreObjects.ToStringHelper toStringHelper() {
+        return MoreObjects.toStringHelper(this)
+                .add("serviceName", serviceName)
+                .add("deviceId", deviceId)
+                .add("serverAddr", serverAddr)
+                .add("serverPort", serverPort);
+    }
+
+    @Override
+    public String toString() {
+        return toStringHelper().toString();
+    }
+}
\ No newline at end of file
diff --git a/protocols/grpc/ctl/BUILD b/protocols/grpc/ctl/BUILD
index 20c7ab8..ac0703d 100644
--- a/protocols/grpc/ctl/BUILD
+++ b/protocols/grpc/ctl/BUILD
@@ -2,6 +2,7 @@
     "//protocols/grpc/api:onos-protocols-grpc-api",
     "//protocols/grpc/proto:onos-protocols-grpc-proto",
     "@io_grpc_grpc_java//core",
+    "@io_grpc_grpc_java//netty",
 ]
 
 osgi_jar(
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
new file mode 100644
index 0000000..4764a56
--- /dev/null
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.grpc.ctl;
+
+import io.grpc.Context;
+import io.grpc.ManagedChannel;
+import io.grpc.StatusRuntimeException;
+import org.onlab.util.SharedExecutors;
+import org.onosproject.grpc.api.GrpcClient;
+import org.onosproject.grpc.api.GrpcClientKey;
+import org.onosproject.net.DeviceId;
+import org.slf4j.Logger;
+
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Abstract client for gRPC service.
+ *
+ */
+public abstract class AbstractGrpcClient implements GrpcClient {
+
+    // Timeout in seconds to obtain the request lock.
+    protected static final int LOCK_TIMEOUT = 60;
+    private static final int DEFAULT_THREAD_POOL_SIZE = 10;
+
+    protected final Logger log = getLogger(getClass());
+
+    protected final Lock requestLock = new ReentrantLock();
+    protected final Context.CancellableContext cancellableContext =
+            Context.current().withCancellation();
+    protected final ExecutorService executorService;
+    protected final Executor contextExecutor;
+
+    protected ManagedChannel channel;
+    protected DeviceId deviceId;
+
+    protected AbstractGrpcClient(GrpcClientKey clientKey, ManagedChannel channel) {
+        this.deviceId = clientKey.deviceId();
+        this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE, groupedThreads(
+                "onos-grpc-" + clientKey.serviceName() + "-client-" + deviceId.toString(), "%d"));
+        this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
+        this.channel = channel;
+    }
+
+    @Override
+    public CompletableFuture<Void> shutdown() {
+        return supplyWithExecutor(this::doShutdown, "shutdown",
+                SharedExecutors.getPoolThreadExecutor());
+    }
+
+    protected Void doShutdown() {
+        log.debug("Shutting down client for {}...", deviceId);
+        cancellableContext.cancel(new InterruptedException(
+                "Requested client shutdown"));
+        this.executorService.shutdownNow();
+        try {
+            executorService.awaitTermination(5, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            log.warn("Executor service didn't shutdown in time.");
+            Thread.currentThread().interrupt();
+        }
+        return null;
+    }
+
+    /**
+     * Equivalent of supplyWithExecutor using the gRPC context executor of this
+     * client, such that if the context is cancelled (e.g. client shutdown) the
+     * RPC is automatically cancelled.
+     *
+     * @param <U> return type of supplier
+     * @param supplier the supplier to be executed
+     * @param opDescription the description of this supplier
+     * @return CompletableFuture includes the result of supplier
+     */
+    protected  <U> CompletableFuture<U> supplyInContext(
+            Supplier<U> supplier, String opDescription) {
+        return supplyWithExecutor(supplier, opDescription, contextExecutor);
+    }
+
+    /**
+     * Submits a task for async execution via the given executor. All tasks
+     * submitted with this method will be executed sequentially.
+     *
+     * @param <U> return type of supplier
+     * @param supplier the supplier to be executed
+     * @param opDescription the description of this supplier
+     * @param executor the executor to execute this supplier
+     * @return CompletableFuture includes the result of supplier
+     */
+    protected <U> CompletableFuture<U> supplyWithExecutor(
+            Supplier<U> supplier, String opDescription, Executor executor) {
+        return CompletableFuture.supplyAsync(() -> {
+            // TODO: explore a more relaxed locking strategy.
+            try {
+                if (!requestLock.tryLock(LOCK_TIMEOUT, TimeUnit.SECONDS)) {
+                    log.error("LOCK TIMEOUT! This is likely a deadlock, "
+                                    + "please debug (executing {})",
+                            opDescription);
+                    throw new IllegalThreadStateException("Lock timeout");
+                }
+            } catch (InterruptedException e) {
+                log.warn("Thread interrupted while waiting for lock (executing {})",
+                        opDescription);
+                throw new IllegalStateException(e);
+            }
+            try {
+                return supplier.get();
+            } catch (StatusRuntimeException ex) {
+                log.warn("Unable to execute {} on {}: {}",
+                        opDescription, deviceId, ex.toString());
+                throw ex;
+            } catch (Throwable ex) {
+                log.error("Exception in client of {}, executing {}",
+                        deviceId, opDescription, ex);
+                throw ex;
+            } finally {
+                requestLock.unlock();
+            }
+        }, executor);
+    }
+}
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
new file mode 100644
index 0000000..9ae14da
--- /dev/null
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
@@ -0,0 +1,205 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.grpc.ctl;
+
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Striped;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.netty.NettyChannelBuilder;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+
+import org.onosproject.event.AbstractListenerManager;
+import org.onosproject.event.Event;
+import org.onosproject.event.EventListener;
+import org.onosproject.grpc.api.GrpcChannelController;
+import org.onosproject.grpc.api.GrpcChannelId;
+import org.onosproject.grpc.api.GrpcClient;
+import org.onosproject.grpc.api.GrpcClientController;
+import org.onosproject.grpc.api.GrpcClientKey;
+import org.onosproject.net.DeviceId;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.function.Supplier;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Abstract class of a gRPC based client controller for specific gRPC client
+ * which provides basic gRPC client management and thread safe mechanism.
+ *
+ * @param <C> the gRPC client type
+ * @param <K> the key type of the gRPC client
+ * @param <E> the event type of the gRPC client
+ * @param <L> the event listener of event {@link E}
+ */
+@Component
+public abstract class AbstractGrpcClientController
+        <K extends GrpcClientKey, C extends GrpcClient, E extends Event, L extends EventListener<E>>
+        extends AbstractListenerManager<E, L>
+        implements GrpcClientController<K, C> {
+
+    /**
+     * The default max inbound message size (MB).
+     */
+    private static final int DEFAULT_MAX_INBOUND_MSG_SIZE = 256; // Megabytes.
+    private static final int MEGABYTES = 1024 * 1024;
+    private static final int DEFAULT_DEVICE_LOCK_SIZE = 30;
+
+    private final Logger log = getLogger(getClass());
+    private final Map<DeviceId, K> clientKeys = Maps.newHashMap();
+    private final Map<K, C> clients = Maps.newHashMap();
+    private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
+    private final Striped<Lock> stripedLocks = Striped.lock(DEFAULT_DEVICE_LOCK_SIZE);
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private GrpcChannelController grpcChannelController;
+
+    @Activate
+    public void activate() {
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        clientKeys.keySet().forEach(this::removeClient);
+        clientKeys.clear();
+        clients.clear();
+        channelIds.clear();
+        log.info("Stopped");
+    }
+
+    @Override
+    public boolean createClient(K clientKey) {
+        checkNotNull(clientKey);
+        return withDeviceLock(() -> doCreateClient(clientKey), clientKey.deviceId());
+    }
+
+    private boolean doCreateClient(K clientKey) {
+        DeviceId deviceId = clientKey.deviceId();
+        String serverAddr = clientKey.serverAddr();
+        int serverPort = clientKey.serverPort();
+
+        if (clientKeys.containsKey(deviceId)) {
+            final GrpcClientKey existingKey = clientKeys.get(deviceId);
+            if (clientKey.equals(existingKey)) {
+                log.debug("Not creating client for {} as it already exists (key={})...",
+                        deviceId, clientKey);
+                return true;
+            } else {
+                log.info("Requested client for {} with new " +
+                                "endpoint, removing old client (key={})...",
+                        deviceId, clientKey);
+                doRemoveClient(deviceId);
+            }
+        }
+        log.info("Creating client for {} (server={}:{})...",
+                deviceId, serverAddr, serverPort);
+        GrpcChannelId channelId = GrpcChannelId.of(clientKey.deviceId(), clientKey.toString());
+        ManagedChannelBuilder channelBuilder = NettyChannelBuilder
+                .forAddress(serverAddr, serverPort)
+                .maxInboundMessageSize(DEFAULT_MAX_INBOUND_MSG_SIZE * MEGABYTES)
+                .usePlaintext();
+
+        ManagedChannel channel;
+        try {
+            channel = grpcChannelController.connectChannel(channelId, channelBuilder);
+        } catch (IOException e) {
+            log.warn("Unable to connect to gRPC server of {}: {}",
+                    clientKey.deviceId(), e.getMessage());
+            return false;
+        }
+
+        C client = createClientInstance(clientKey, channel);
+        if (client == null) {
+            log.warn("Cannot create client for {} (key={})", deviceId, clientKey);
+            return false;
+        }
+        clientKeys.put(deviceId, clientKey);
+        clients.put(clientKey, client);
+        channelIds.put(deviceId, channelId);
+
+        return true;
+    }
+
+    protected abstract C createClientInstance(K clientKey, ManagedChannel channel);
+
+    @Override
+    public C getClient(DeviceId deviceId) {
+        checkNotNull(deviceId);
+        return withDeviceLock(() -> doGetClient(deviceId), deviceId);
+    }
+
+    protected C doGetClient(DeviceId deviceId) {
+        if (!clientKeys.containsKey(deviceId)) {
+            return null;
+        }
+        return clients.get(clientKeys.get(deviceId));
+    }
+
+    @Override
+    public void removeClient(DeviceId deviceId) {
+        checkNotNull(deviceId);
+        withDeviceLock(() -> doRemoveClient(deviceId), deviceId);
+    }
+
+    private Void doRemoveClient(DeviceId deviceId) {
+        if (clientKeys.containsKey(deviceId)) {
+            final K clientKey = clientKeys.get(deviceId);
+            clients.get(clientKey).shutdown();
+            grpcChannelController.disconnectChannel(channelIds.get(deviceId));
+            clientKeys.remove(deviceId);
+            clients.remove(clientKey);
+            channelIds.remove(deviceId);
+        }
+        return null;
+    }
+
+    @Override
+    public boolean isReachable(DeviceId deviceId) {
+        checkNotNull(deviceId);
+        return withDeviceLock(() -> doIsReachable(deviceId), deviceId);
+    }
+
+    protected boolean doIsReachable(DeviceId deviceId) {
+        // Default behaviour checks only the gRPC channel, should
+        // check according to different gRPC service
+        if (!clientKeys.containsKey(deviceId)) {
+            log.debug("No client for {}, can't check for reachability", deviceId);
+            return false;
+        }
+        return grpcChannelController.isChannelOpen(channelIds.get(deviceId));
+    }
+
+    private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
+        final Lock lock = stripedLocks.get(deviceId);
+        lock.lock();
+        try {
+            return task.get();
+        } finally {
+            lock.unlock();
+        }
+    }
+}
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
similarity index 97%
rename from protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
rename to protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
index fb87571..6bd6a82 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
@@ -41,8 +41,8 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.grpc.api.GrpcChannelController;
 import org.onosproject.grpc.api.GrpcChannelId;
-import org.onosproject.grpc.api.GrpcController;
 import org.onosproject.grpc.proto.dummy.Dummy;
 import org.onosproject.grpc.proto.dummy.DummyServiceGrpc;
 import org.onosproject.net.DeviceId;
@@ -64,12 +64,13 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
- * Default implementation of the GrpcController.
+ * Default implementation of the GrpcChannelController.
  */
 @Component(immediate = true)
 @Service
-public class GrpcControllerImpl implements GrpcController {
+public class GrpcChannelControllerImpl implements GrpcChannelController {
 
+    // FIXME: Should use message size to determine whether it needs to log the message or not.
     private  static final String SET_FORWARDING_PIPELINE_CONFIG_METHOD = "p4.P4Runtime/SetForwardingPipelineConfig";
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
diff --git a/protocols/p4runtime/api/BUCK b/protocols/p4runtime/api/BUCK
index fd05763..c775aea 100644
--- a/protocols/p4runtime/api/BUCK
+++ b/protocols/p4runtime/api/BUCK
@@ -3,6 +3,7 @@
 COMPILE_DEPS = [
     '//lib:CORE_DEPS',
     '//incubator/grpc-dependencies:grpc-core-repkg-' + GRPC_VER,
+    '//protocols/grpc/api:onos-protocols-grpc-api',
 ]
 
 TEST_DEPS = [
diff --git a/protocols/p4runtime/api/BUILD b/protocols/p4runtime/api/BUILD
index 531d734..2fb15ba 100644
--- a/protocols/p4runtime/api/BUILD
+++ b/protocols/p4runtime/api/BUILD
@@ -1,4 +1,5 @@
 COMPILE_DEPS = CORE_DEPS + [
+    "//protocols/grpc/api:onos-protocols-grpc-api",
     "@io_grpc_grpc_java//core",
 ]
 
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
index f44c629..7a08668 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
@@ -17,6 +17,7 @@
 package org.onosproject.p4runtime.api;
 
 import com.google.common.annotations.Beta;
+import org.onosproject.grpc.api.GrpcClient;
 import org.onosproject.net.pi.model.PiActionProfileId;
 import org.onosproject.net.pi.model.PiCounterId;
 import org.onosproject.net.pi.model.PiMeterId;
@@ -42,7 +43,7 @@
  * Client to control a P4Runtime device.
  */
 @Beta
-public interface P4RuntimeClient {
+public interface P4RuntimeClient extends GrpcClient {
 
     /**
      * Type of write operation.
@@ -70,15 +71,6 @@
     boolean isStreamChannelOpen();
 
     /**
-     * Shutdowns the client by terminating any active RPC such as the Stream
-     * one.
-     *
-     * @return a completable future to signal the completion of the shutdown
-     * procedure
-     */
-    CompletableFuture<Void> shutdown();
-
-    /**
      * Sends a master arbitration update to the device with a new election ID
      * that is guaranteed to be the highest value between all clients.
      *
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClientKey.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClientKey.java
new file mode 100644
index 0000000..70bf33c
--- /dev/null
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClientKey.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.api;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.grpc.api.GrpcClientKey;
+import org.onosproject.net.DeviceId;
+
+import java.util.Objects;
+
+/**
+ * Key that uniquely identifies a P4Runtime client.
+ */
+@Beta
+public final class P4RuntimeClientKey extends GrpcClientKey {
+    private static final String P4R_SERVICE_NAME = "p4runtime";
+    private final long p4DeviceId;
+
+    /**
+     * Creates a new client key.
+     *
+     * @param deviceId   ONOS device ID
+     * @param serverAddr P4Runtime server address
+     * @param serverPort P4Runtime server port
+     * @param p4DeviceId P4Runtime server-internal device ID
+     */
+    public P4RuntimeClientKey(DeviceId deviceId, String serverAddr,
+                              int serverPort, long p4DeviceId) {
+        super(P4R_SERVICE_NAME, deviceId, serverAddr, serverPort);
+        this.p4DeviceId = p4DeviceId;
+    }
+
+    /**
+     * Returns the P4Runtime server-internal device ID.
+     *
+     * @return P4Runtime server-internal device ID
+     */
+    public long p4DeviceId() {
+        return p4DeviceId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        P4RuntimeClientKey that = (P4RuntimeClientKey) o;
+        return p4DeviceId == that.p4DeviceId;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), p4DeviceId);
+    }
+
+    @Override
+    public String toString() {
+        return super.toStringHelper()
+                .add("p4DeviceId", p4DeviceId)
+                .toString();
+    }
+}
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
index 159e313..0bc8ed6 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
@@ -18,6 +18,7 @@
 
 import com.google.common.annotations.Beta;
 import org.onosproject.event.ListenerService;
+import org.onosproject.grpc.api.GrpcClientController;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.device.DeviceAgentListener;
 import org.onosproject.net.provider.ProviderId;
@@ -27,73 +28,8 @@
  */
 @Beta
 public interface P4RuntimeController
-        extends ListenerService<P4RuntimeEvent, P4RuntimeEventListener> {
-
-    /**
-     * Instantiates a new client to operate on a P4Runtime device identified by
-     * the given information. As a result of this method, a {@link
-     * P4RuntimeClient} can be later obtained by invoking {@link
-     * #getClient(DeviceId)}. Returns true if the client was created and the
-     * channel to the device is open, false otherwise.
-     * <p>
-     * Only one client can exist for the same device ID. Calls to this method
-     * are idempotent for the same [device ID, address, port, p4DeviceId]
-     * triplet, i.e. returns true if such client already exists but a new one is
-     * not created. Throws an {@link IllegalStateException} if a client for
-     * device ID already exists but for different [address, port, p4DeviceId].
-     *
-     * @param deviceId   device identifier
-     * @param serverAddr address of the P4Runtime server
-     * @param serverPort port of the P4Runtime server
-     * @param p4DeviceId P4Runtime-specific device identifier
-     * @return true if the client was created and the channel to the device is
-     * open
-     * @throws IllegalStateException if a client already exists for this device
-     *                               ID but for different [address, port,
-     *                               p4DeviceId] triplet.
-     */
-    boolean createClient(DeviceId deviceId, String serverAddr, int serverPort,
-                         long p4DeviceId);
-
-    /**
-     * Returns a client to operate on the given device, or null if a client for
-     * such device does not exist in this controller.
-     *
-     * @param deviceId device identifier
-     * @return client instance or null
-     */
-    P4RuntimeClient getClient(DeviceId deviceId);
-
-    /**
-     * Removes the client for the given device. If no client exists for the
-     * given device identifier, the result is a no-op.
-     *
-     * @param deviceId device identifier
-     */
-    void removeClient(DeviceId deviceId);
-
-    /**
-     * Returns true if a client exists for the given device identifier, false
-     * otherwise.
-     *
-     * @param deviceId device identifier
-     * @return true if client exists, false otherwise.
-     */
-    boolean hasClient(DeviceId deviceId);
-
-    /**
-     * Returns true if the P4Runtime server running on the given device is
-     * reachable, i.e. the channel is open and the server is able to respond to
-     * RPCs, false otherwise. Reachability can be tested only if a client was
-     * previously created using {@link #createClient(DeviceId, String, int,
-     * long)}, otherwise this method returns false.
-     *
-     * @param deviceId device identifier.
-     * @return true if a client was created and is able to contact the P4Runtime
-     * server, false otherwise.
-     */
-    boolean isReachable(DeviceId deviceId);
-
+        extends GrpcClientController<P4RuntimeClientKey, P4RuntimeClient>,
+                ListenerService<P4RuntimeEvent, P4RuntimeEventListener> {
     /**
      * Adds a listener for device agent events for the given provider.
      *
diff --git a/protocols/p4runtime/ctl/BUILD b/protocols/p4runtime/ctl/BUILD
index d6c0c7a..66aab43 100644
--- a/protocols/p4runtime/ctl/BUILD
+++ b/protocols/p4runtime/ctl/BUILD
@@ -1,6 +1,7 @@
 COMPILE_DEPS = CORE_DEPS + KRYO + [
     "//core/store/serializers:onos-core-serializers",
     "//protocols/grpc/api:onos-protocols-grpc-api",
+    "//protocols/grpc/ctl:onos-protocols-grpc-ctl",
     "//protocols/p4runtime/api:onos-protocols-p4runtime-api",
     "//protocols/p4runtime/proto:onos-protocols-p4runtime-proto",
     "@com_google_protobuf//:protobuf_java",
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ClientKey.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ClientKey.java
deleted file mode 100644
index cc4bed0..0000000
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ClientKey.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.p4runtime.ctl;
-
-import com.google.common.base.MoreObjects;
-import org.onosproject.net.DeviceId;
-
-import java.util.Objects;
-
-/**
- * Key that uniquely identifies a P4Runtime client.
- */
-final class ClientKey {
-
-    private final DeviceId deviceId;
-    private final String serverAddr;
-    private final int serverPort;
-    private final long p4DeviceId;
-
-    /**
-     * Creates a new client key.
-     *
-     * @param deviceId   ONOS device ID
-     * @param serverAddr P4Runtime server address
-     * @param serverPort P4Runtime server port
-     * @param p4DeviceId P4Runtime server-internal device ID
-     */
-    ClientKey(DeviceId deviceId, String serverAddr, int serverPort, long p4DeviceId) {
-        this.deviceId = deviceId;
-        this.serverAddr = serverAddr;
-        this.serverPort = serverPort;
-        this.p4DeviceId = p4DeviceId;
-    }
-
-    /**
-     * Returns the device ID.
-     *
-     * @return device ID.
-     */
-    public DeviceId deviceId() {
-        return deviceId;
-    }
-
-    /**
-     * Returns the P4Runtime server address.
-     *
-     * @return P4Runtime server address
-     */
-    public String serverAddr() {
-        return serverAddr;
-    }
-
-    /**
-     * Returns the P4Runtime server port.
-     *
-     * @return P4Runtime server port
-     */
-    public int serverPort() {
-        return serverPort;
-    }
-
-    /**
-     * Returns the P4Runtime server-internal device ID.
-     *
-     * @return P4Runtime server-internal device ID
-     */
-    public long p4DeviceId() {
-        return p4DeviceId;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(serverAddr, serverPort, p4DeviceId);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-        final ClientKey other = (ClientKey) obj;
-        return Objects.equals(this.serverAddr, other.serverAddr)
-                && Objects.equals(this.serverPort, other.serverPort)
-                && Objects.equals(this.p4DeviceId, other.p4DeviceId);
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(this)
-                .add("deviceId", deviceId)
-                .add("serverAddr", serverAddr)
-                .add("serverPort", serverPort)
-                .add("p4DeviceId", p4DeviceId)
-                .toString();
-    }
-}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index 821e744..dd968ce 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -23,7 +23,6 @@
 import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
-import io.grpc.Context;
 import io.grpc.ManagedChannel;
 import io.grpc.Metadata;
 import io.grpc.Status;
@@ -32,9 +31,8 @@
 import io.grpc.stub.ClientCallStreamObserver;
 import io.grpc.stub.StreamObserver;
 import org.onlab.osgi.DefaultServiceDirectory;
-import org.onlab.util.SharedExecutors;
 import org.onlab.util.Tools;
-import org.onosproject.net.DeviceId;
+import org.onosproject.grpc.ctl.AbstractGrpcClient;
 import org.onosproject.net.pi.model.PiActionProfileId;
 import org.onosproject.net.pi.model.PiCounterId;
 import org.onosproject.net.pi.model.PiMeterId;
@@ -52,8 +50,8 @@
 import org.onosproject.net.pi.runtime.PiTableEntry;
 import org.onosproject.net.pi.service.PiPipeconfService;
 import org.onosproject.p4runtime.api.P4RuntimeClient;
+import org.onosproject.p4runtime.api.P4RuntimeClientKey;
 import org.onosproject.p4runtime.api.P4RuntimeEvent;
-import org.slf4j.Logger;
 import p4.config.v1.P4InfoOuterClass.P4Info;
 import p4.tmp.P4Config;
 import p4.v1.P4RuntimeGrpc;
@@ -87,22 +85,13 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.lang.String.format;
 import static java.util.Collections.singletonList;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
 import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_GROUP;
 import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_MEMBER;
 import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.PACKET_REPLICATION_ENGINE_ENTRY;
@@ -115,10 +104,7 @@
 /**
  * Implementation of a P4Runtime client.
  */
-final class P4RuntimeClientImpl implements P4RuntimeClient {
-
-    // Timeout in seconds to obtain the request lock.
-    private static final int LOCK_TIMEOUT = 60;
+final class P4RuntimeClientImpl extends AbstractGrpcClient implements P4RuntimeClient {
 
     private static final Metadata.Key<com.google.rpc.Status> STATUS_DETAILS_KEY =
             Metadata.Key.of("grpc-status-details-bin",
@@ -132,18 +118,9 @@
             WriteOperationType.DELETE, Update.Type.DELETE
     );
 
-    private final Logger log = getLogger(getClass());
-
-    private final Lock requestLock = new ReentrantLock();
-    private final Context.CancellableContext cancellableContext =
-            Context.current().withCancellation();
-
-    private final DeviceId deviceId;
     private final long p4DeviceId;
     private final P4RuntimeControllerImpl controller;
     private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
-    private final ExecutorService executorService;
-    private final Executor contextExecutor;
     private StreamChannelManager streamChannelManager;
 
     // Used by this client for write requests.
@@ -154,70 +131,22 @@
     /**
      * Default constructor.
      *
-     * @param deviceId   the ONOS device id
-     * @param p4DeviceId the P4 device id
+     * @param clientKey  the client key of this client
      * @param channel    gRPC channel
      * @param controller runtime client controller
      */
-    P4RuntimeClientImpl(DeviceId deviceId, long p4DeviceId, ManagedChannel channel,
+    P4RuntimeClientImpl(P4RuntimeClientKey clientKey, ManagedChannel channel,
                         P4RuntimeControllerImpl controller) {
-        this.deviceId = deviceId;
-        this.p4DeviceId = p4DeviceId;
+
+        super(clientKey, channel);
+        this.p4DeviceId = clientKey.p4DeviceId();
         this.controller = controller;
-        this.executorService = Executors.newFixedThreadPool(15, groupedThreads(
-                "onos-p4runtime-client-" + deviceId.toString(), "%d"));
-        this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
+
         //TODO Investigate use of stub deadlines instead of timeout in supplyInContext
         this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
         this.streamChannelManager = new StreamChannelManager(channel);
     }
 
-    /**
-     * Submits a task for async execution via the given executor. All tasks
-     * submitted with this method will be executed sequentially.
-     */
-    private <U> CompletableFuture<U> supplyWithExecutor(
-            Supplier<U> supplier, String opDescription, Executor executor) {
-        return CompletableFuture.supplyAsync(() -> {
-            // TODO: explore a more relaxed locking strategy.
-            try {
-                if (!requestLock.tryLock(LOCK_TIMEOUT, TimeUnit.SECONDS)) {
-                    log.error("LOCK TIMEOUT! This is likely a deadlock, "
-                                      + "please debug (executing {})",
-                              opDescription);
-                    throw new IllegalThreadStateException("Lock timeout");
-                }
-            } catch (InterruptedException e) {
-                log.warn("Thread interrupted while waiting for lock (executing {})",
-                         opDescription);
-                throw new IllegalStateException(e);
-            }
-            try {
-                return supplier.get();
-            } catch (StatusRuntimeException ex) {
-                log.warn("Unable to execute {} on {}: {}",
-                         opDescription, deviceId, ex.toString());
-                throw ex;
-            } catch (Throwable ex) {
-                log.error("Exception in client of {}, executing {}",
-                          deviceId, opDescription, ex);
-                throw ex;
-            } finally {
-                requestLock.unlock();
-            }
-        }, executor);
-    }
-
-    /**
-     * Equivalent of supplyWithExecutor using the gRPC context executor of this
-     * client, such that if the context is cancelled (e.g. client shutdown) the
-     * RPC is automatically cancelled.
-     */
-    private <U> CompletableFuture<U> supplyInContext(
-            Supplier<U> supplier, String opDescription) {
-        return supplyWithExecutor(supplier, opDescription, contextExecutor);
-    }
-
     @Override
     public CompletableFuture<Boolean> startStreamChannel() {
         return supplyInContext(() -> sendMasterArbitrationUpdate(false),
@@ -225,12 +154,6 @@
     }
 
     @Override
-    public CompletableFuture<Void> shutdown() {
-        return supplyWithExecutor(this::doShutdown, "shutdown",
-                                  SharedExecutors.getPoolThreadExecutor());
-    }
-
-    @Override
     public CompletableFuture<Boolean> becomeMaster() {
         return supplyInContext(() -> sendMasterArbitrationUpdate(true),
                                "becomeMaster");
@@ -1154,19 +1077,9 @@
                 .build();
     }
 
-    private Void doShutdown() {
-        log.debug("Shutting down client for {}...", deviceId);
+    protected Void doShutdown() {
         streamChannelManager.complete();
-        cancellableContext.cancel(new InterruptedException(
-                "Requested client shutdown"));
-        this.executorService.shutdownNow();
-        try {
-            executorService.awaitTermination(5, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-            log.warn("Executor service didn't shutdown in time.");
-            Thread.currentThread().interrupt();
-        }
-        return null;
+        return super.doShutdown();
     }
 
     // Returns the collection of succesfully write entities.
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
index a140491..abafaf5 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
@@ -19,36 +19,29 @@
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.Striped;
 import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
-import io.grpc.netty.NettyChannelBuilder;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.onosproject.event.AbstractListenerManager;
-import org.onosproject.grpc.api.GrpcChannelId;
-import org.onosproject.grpc.api.GrpcController;
+import org.onosproject.grpc.ctl.AbstractGrpcClientController;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.device.DeviceAgentEvent;
 import org.onosproject.net.device.DeviceAgentListener;
 import org.onosproject.net.provider.ProviderId;
 import org.onosproject.p4runtime.api.P4RuntimeClient;
+import org.onosproject.p4runtime.api.P4RuntimeClientKey;
 import org.onosproject.p4runtime.api.P4RuntimeController;
 import org.onosproject.p4runtime.api.P4RuntimeEvent;
 import org.onosproject.p4runtime.api.P4RuntimeEventListener;
 import org.onosproject.store.service.StorageService;
 import org.slf4j.Logger;
 
-import java.io.IOException;
 import java.math.BigInteger;
-import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.Lock;
-import java.util.function.Supplier;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -58,19 +51,12 @@
 @Component(immediate = true)
 @Service
 public class P4RuntimeControllerImpl
-        extends AbstractListenerManager<P4RuntimeEvent, P4RuntimeEventListener>
+        extends AbstractGrpcClientController
+        <P4RuntimeClientKey, P4RuntimeClient, P4RuntimeEvent, P4RuntimeEventListener>
         implements P4RuntimeController {
 
-    // Getting the pipeline config from the device can take tens of MBs.
-    private static final int MAX_INBOUND_MSG_SIZE = 256; // Megabytes.
-    private static final int MEGABYTES = 1024 * 1024;
-
     private final Logger log = getLogger(getClass());
 
-    private final Map<DeviceId, ClientKey> clientKeys = Maps.newHashMap();
-    private final Map<ClientKey, P4RuntimeClient> clients = Maps.newHashMap();
-    private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
-
     private final ConcurrentMap<DeviceId, ConcurrentMap<ProviderId, DeviceAgentListener>>
             deviceAgentListeners = Maps.newConcurrentMap();
     private final Striped<Lock> stripedLocks = Striped.lock(30);
@@ -78,151 +64,28 @@
     private DistributedElectionIdGenerator electionIdGenerator;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private GrpcController grpcController;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private StorageService storageService;
 
     @Activate
     public void activate() {
+        super.activate();
         eventDispatcher.addSink(P4RuntimeEvent.class, listenerRegistry);
         electionIdGenerator = new DistributedElectionIdGenerator(storageService);
         log.info("Started");
     }
 
-
     @Deactivate
     public void deactivate() {
-        clientKeys.keySet().forEach(this::removeClient);
-        clientKeys.clear();
-        clients.clear();
-        channelIds.clear();
+        super.deactivate();
         deviceAgentListeners.clear();
-        grpcController = null;
         electionIdGenerator.destroy();
         electionIdGenerator = null;
-        eventDispatcher.removeSink(P4RuntimeEvent.class);
         log.info("Stopped");
     }
 
     @Override
-    public boolean createClient(DeviceId deviceId, String serverAddr,
-                                int serverPort, long p4DeviceId) {
-        checkNotNull(deviceId);
-        checkNotNull(serverAddr);
-        checkArgument(serverPort > 0, "Invalid server port");
-
-        return withDeviceLock(() -> doCreateClient(
-                deviceId, serverAddr, serverPort, p4DeviceId), deviceId);
-    }
-
-    private boolean doCreateClient(DeviceId deviceId, String serverAddr,
-                                   int serverPort, long p4DeviceId) {
-
-        ClientKey clientKey = new ClientKey(deviceId, serverAddr, serverPort, p4DeviceId);
-
-        if (clientKeys.containsKey(deviceId)) {
-            final ClientKey existingKey = clientKeys.get(deviceId);
-            if (clientKey.equals(existingKey)) {
-                log.debug("Not creating client for {} as it already exists (server={}:{}, p4DeviceId={})...",
-                          deviceId, serverAddr, serverPort, p4DeviceId);
-                return true;
-            } else {
-                log.info("Requested client for {} with new " +
-                                 "endpoint, removing old client (server={}:{}, " +
-                                 "p4DeviceId={})...",
-                         deviceId, existingKey.serverAddr(),
-                         existingKey.serverPort(), existingKey.p4DeviceId());
-                doRemoveClient(deviceId);
-            }
-        }
-
-        log.info("Creating client for {} (server={}:{}, p4DeviceId={})...",
-                 deviceId, serverAddr, serverPort, p4DeviceId);
-
-        GrpcChannelId channelId = GrpcChannelId.of(
-                clientKey.deviceId(), "p4runtime-" + clientKey);
-
-        ManagedChannelBuilder channelBuilder = NettyChannelBuilder
-                .forAddress(serverAddr, serverPort)
-                .maxInboundMessageSize(MAX_INBOUND_MSG_SIZE * MEGABYTES)
-                .usePlaintext();
-
-        ManagedChannel channel;
-        try {
-            channel = grpcController.connectChannel(channelId, channelBuilder);
-        } catch (IOException e) {
-            log.warn("Unable to connect to gRPC server of {}: {}",
-                     clientKey.deviceId(), e.getMessage());
-            return false;
-        }
-
-        P4RuntimeClient client = new P4RuntimeClientImpl(
-                clientKey.deviceId(), clientKey.p4DeviceId(), channel, this);
-
-        clientKeys.put(clientKey.deviceId(), clientKey);
-        clients.put(clientKey, client);
-        channelIds.put(clientKey.deviceId(), channelId);
-
-        return true;
-    }
-
-    @Override
-    public P4RuntimeClient getClient(DeviceId deviceId) {
-        if (deviceId == null) {
-            return null;
-        }
-        return withDeviceLock(() -> doGetClient(deviceId), deviceId);
-    }
-
-    private P4RuntimeClient doGetClient(DeviceId deviceId) {
-        if (!clientKeys.containsKey(deviceId)) {
-            return null;
-        } else {
-            return clients.get(clientKeys.get(deviceId));
-        }
-    }
-
-    @Override
-    public void removeClient(DeviceId deviceId) {
-        if (deviceId == null) {
-            return;
-        }
-        withDeviceLock(() -> doRemoveClient(deviceId), deviceId);
-    }
-
-    private Void doRemoveClient(DeviceId deviceId) {
-        if (clientKeys.containsKey(deviceId)) {
-            final ClientKey clientKey = clientKeys.get(deviceId);
-            clients.get(clientKey).shutdown();
-            grpcController.disconnectChannel(channelIds.get(deviceId));
-            clientKeys.remove(deviceId);
-            clients.remove(clientKey);
-            channelIds.remove(deviceId);
-        }
-        return null;
-    }
-
-    @Override
-    public boolean hasClient(DeviceId deviceId) {
-        return clientKeys.containsKey(deviceId);
-    }
-
-    @Override
-    public boolean isReachable(DeviceId deviceId) {
-        if (deviceId == null) {
-            return false;
-        }
-        return withDeviceLock(() -> doIsReacheable(deviceId), deviceId);
-    }
-
-    private boolean doIsReacheable(DeviceId deviceId) {
-        // FIXME: we're not checking for a P4Runtime server, it could be any gRPC service
-        if (!clientKeys.containsKey(deviceId)) {
-            log.debug("No client for {}, can't check for reachability", deviceId);
-            return false;
-        }
-        return grpcController.isChannelOpen(channelIds.get(deviceId));
+    protected P4RuntimeClient createClientInstance(P4RuntimeClientKey clientKey, ManagedChannel channel) {
+        return new P4RuntimeClientImpl(clientKey, channel, this);
     }
 
     @Override
@@ -244,16 +107,6 @@
         });
     }
 
-    private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
-        final Lock lock = stripedLocks.get(deviceId);
-        lock.lock();
-        try {
-            return task.get();
-        } finally {
-            lock.unlock();
-        }
-    }
-
     BigInteger newMasterElectionId(DeviceId deviceId) {
         return electionIdGenerator.generate(deviceId);
     }
diff --git a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
index 36e60ea..301d5e1 100644
--- a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
+++ b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
@@ -44,6 +44,7 @@
 import org.onosproject.net.pi.runtime.PiActionGroupMember;
 import org.onosproject.net.pi.runtime.PiActionGroupMemberId;
 import org.onosproject.net.pi.runtime.PiActionParam;
+import org.onosproject.p4runtime.api.P4RuntimeClientKey;
 import p4.v1.P4RuntimeOuterClass.ActionProfileGroup;
 import p4.v1.P4RuntimeOuterClass.ActionProfileMember;
 import p4.v1.P4RuntimeOuterClass.Entity;
@@ -100,6 +101,8 @@
     private static final String GRPC_SERVER_NAME = "P4RuntimeGroupTest";
     private static final long DEFAULT_TIMEOUT_TIME = 10;
     private static final Uint128 DEFAULT_ELECTION_ID = Uint128.newBuilder().setLow(1).build();
+    private static final String P4R_IP = "127.0.0.1";
+    private static final int P4R_PORT = 50010;
 
     private P4RuntimeClientImpl client;
     private P4RuntimeControllerImpl controller;
@@ -152,9 +155,8 @@
     @Before
     public void setup() {
         controller = niceMock(P4RuntimeControllerImpl.class);
-        client = new P4RuntimeClientImpl(DEVICE_ID, P4_DEVICE_ID,
-                                         grpcChannel,
-                                         controller);
+        P4RuntimeClientKey clientKey = new P4RuntimeClientKey(DEVICE_ID, P4R_IP, P4R_PORT, P4_DEVICE_ID);
+        client = new P4RuntimeClientImpl(clientKey, grpcChannel, controller);
         client.becomeMaster();
     }
 
diff --git a/providers/p4runtime/packet/BUILD b/providers/p4runtime/packet/BUILD
index 8a83273..a1c3f5d 100644
--- a/providers/p4runtime/packet/BUILD
+++ b/providers/p4runtime/packet/BUILD
@@ -1,4 +1,5 @@
 COMPILE_DEPS = CORE_DEPS + [
+    "//protocols/grpc/api:onos-protocols-grpc-api",
     "//protocols/p4runtime/api:onos-protocols-p4runtime-api",
 ]