Allow sharing the same gRPC channel between clients

This change introduces a refactoring of the gRPC protocol subsystem that
allows the creation of a gRPC channel independently of the client, while
allowing multiple clients to share the same channel (e.g. as in Stratum,
where we use 3 clients).

Moreover, we refactor the P4RuntimeClient API to support multiple
P4Runtime-internal device IDs using the same client, whereas before
each client was associated with only one such ID.

Finally, we provide an abstract implementation for gRPC-based driver
behaviors, reducing code duplication in P4Runtime, gNMI and gNOI drivers.

Change-Id: I1a46352bbbef1e0d24042f169ae8ba580202944f
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
index d54ef97..653fcfd 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
@@ -16,6 +16,8 @@
 
 package org.onosproject.p4runtime.ctl.client;
 
+import com.google.common.collect.Maps;
+import io.grpc.ConnectivityState;
 import io.grpc.ManagedChannel;
 import io.grpc.Status;
 import io.grpc.StatusRuntimeException;
@@ -27,7 +29,6 @@
 import org.onosproject.net.pi.runtime.PiPacketOperation;
 import org.onosproject.net.pi.service.PiPipeconfService;
 import org.onosproject.p4runtime.api.P4RuntimeClient;
-import org.onosproject.p4runtime.api.P4RuntimeClientKey;
 import org.onosproject.p4runtime.ctl.controller.MasterElectionIdStore;
 import org.onosproject.p4runtime.ctl.controller.P4RuntimeControllerImpl;
 import p4.v1.P4RuntimeGrpc;
@@ -38,6 +39,7 @@
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
@@ -50,6 +52,8 @@
 public final class P4RuntimeClientImpl
         extends AbstractGrpcClient implements P4RuntimeClient {
 
+    private static final long DEFAULT_P4_DEVICE_ID = 1;
+
     // TODO: consider making timeouts configurable per-device via netcfg
     /**
      * Timeout in seconds for short/fast RPCs.
@@ -62,94 +66,103 @@
      */
     static final int LONG_TIMEOUT_SECONDS = 60;
 
-    private final long p4DeviceId;
     private final P4RuntimeControllerImpl controller;
-    private final StreamClientImpl streamClient;
     private final PipelineConfigClientImpl pipelineConfigClient;
+    private final PiPipeconfService pipeconfService;
+    private final MasterElectionIdStore masterElectionIdStore;
+    private final ConcurrentMap<Long, StreamClientImpl> streamClients = Maps.newConcurrentMap();
 
     /**
      * Instantiates a new client with the given arguments.
      *
-     * @param clientKey             client key
+     * @param deviceId              device ID
      * @param channel               gRPC managed channel
-     * @param controller            P$Runtime controller instance
+     * @param controller            P4Runtime controller instance
      * @param pipeconfService       pipeconf service instance
      * @param masterElectionIdStore master election ID store
      */
-    public P4RuntimeClientImpl(P4RuntimeClientKey clientKey,
+    public P4RuntimeClientImpl(DeviceId deviceId,
                                ManagedChannel channel,
                                P4RuntimeControllerImpl controller,
                                PiPipeconfService pipeconfService,
                                MasterElectionIdStore masterElectionIdStore) {
-        super(clientKey, channel, true, controller);
+        super(deviceId, channel, true, controller);
         checkNotNull(channel);
         checkNotNull(controller);
         checkNotNull(pipeconfService);
         checkNotNull(masterElectionIdStore);
 
-        this.p4DeviceId = clientKey.p4DeviceId();
         this.controller = controller;
-        this.streamClient = new StreamClientImpl(
-                pipeconfService, masterElectionIdStore, this, controller);
+        this.pipeconfService = pipeconfService;
+        this.masterElectionIdStore = masterElectionIdStore;
         this.pipelineConfigClient = new PipelineConfigClientImpl(this);
     }
 
     @Override
     public void shutdown() {
-        streamClient.closeSession();
+        streamClients.forEach((p4DeviceId, streamClient) ->
+                                      streamClient.closeSession(p4DeviceId));
         super.shutdown();
     }
 
     @Override
     public CompletableFuture<Boolean> setPipelineConfig(
-            PiPipeconf pipeconf, ByteBuffer deviceData) {
-        return pipelineConfigClient.setPipelineConfig(pipeconf, deviceData);
+            long p4DeviceId, PiPipeconf pipeconf, ByteBuffer deviceData) {
+        return pipelineConfigClient.setPipelineConfig(p4DeviceId, pipeconf, deviceData);
     }
 
     @Override
     public CompletableFuture<Boolean> isPipelineConfigSet(
-            PiPipeconf pipeconf, ByteBuffer deviceData) {
-        return pipelineConfigClient.isPipelineConfigSet(pipeconf, deviceData);
+            long p4DeviceId, PiPipeconf pipeconf, ByteBuffer deviceData) {
+        return pipelineConfigClient.isPipelineConfigSet(p4DeviceId, pipeconf, deviceData);
     }
 
     @Override
-    public CompletableFuture<Boolean> isAnyPipelineConfigSet() {
-        return pipelineConfigClient.isAnyPipelineConfigSet();
+    public CompletableFuture<Boolean> isAnyPipelineConfigSet(long p4DeviceId) {
+        return pipelineConfigClient.isAnyPipelineConfigSet(p4DeviceId);
     }
 
     @Override
-    public ReadRequest read(PiPipeconf pipeconf) {
-        return new ReadRequestImpl(this, pipeconf);
+    public ReadRequest read(long p4DeviceId, PiPipeconf pipeconf) {
+        return new ReadRequestImpl(this, p4DeviceId, pipeconf);
     }
 
     @Override
-    public boolean isSessionOpen() {
-        return streamClient.isSessionOpen();
+    public boolean isSessionOpen(long p4DeviceId) {
+        return streamClients.containsKey(p4DeviceId) &&
+                streamClients.get(p4DeviceId).isSessionOpen(p4DeviceId);
     }
 
     @Override
-    public void closeSession() {
-        streamClient.closeSession();
+    public void closeSession(long p4DeviceId) {
+        if (streamClients.containsKey(p4DeviceId)) {
+            streamClients.get(p4DeviceId).closeSession(p4DeviceId);
+        }
     }
 
     @Override
-    public void setMastership(boolean master, BigInteger newElectionId) {
-        streamClient.setMastership(master, newElectionId);
+    public void setMastership(long p4DeviceId, boolean master, BigInteger newElectionId) {
+        streamClients.putIfAbsent(p4DeviceId, new StreamClientImpl(
+                pipeconfService, masterElectionIdStore, this, p4DeviceId, controller));
+        streamClients.get(p4DeviceId).setMastership(p4DeviceId, master, newElectionId);
     }
 
     @Override
-    public boolean isMaster() {
-        return streamClient.isMaster();
+    public boolean isMaster(long p4DeviceId) {
+        return streamClients.containsKey(p4DeviceId) &&
+                streamClients.get(p4DeviceId).isMaster(p4DeviceId);
     }
 
     @Override
-    public void packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
-        streamClient.packetOut(packet, pipeconf);
+    public void packetOut(long p4DeviceId, PiPacketOperation packet, PiPipeconf pipeconf) {
+        if (streamClients.containsKey(p4DeviceId)) {
+            streamClients.get(p4DeviceId).packetOut(p4DeviceId, packet, pipeconf);
+        }
     }
 
     @Override
-    public WriteRequest write(PiPipeconf pipeconf) {
-        return new WriteRequestImpl(this, pipeconf);
+    public WriteRequest write(long p4DeviceId, PiPipeconf pipeconf) {
+        return new WriteRequestImpl(this, p4DeviceId, pipeconf);
     }
 
     @Override
@@ -164,14 +177,14 @@
 
                     @Override
                     public void onError(Throwable t) {
-                        if (Status.fromThrowable(t).getCode() ==
-                                Status.Code.FAILED_PRECONDITION) {
-                            // Pipeline not set but service is available.
-                            future.complete(true);
-                        } else {
-                            log.debug("", t);
-                        }
-                        future.complete(false);
+                        log.debug("", t);
+                        // FIXME: The P4Runtime spec is not explicit about error
+                        //  codes when a pipeline config is not set, which would
+                        //  be useful here as it's an indication that the
+                        //  service is available. As a workaround, we simply
+                        //  check the channel state.
+                        future.complete(ConnectivityState.READY.equals(
+                                channel.getState(false)));
                     }
 
                     @Override
@@ -179,6 +192,9 @@
                         // Ignore, unary call.
                     }
                 };
+        // Get any p4DeviceId under the control of this client or a default one.
+        final long p4DeviceId = streamClients.isEmpty() ? DEFAULT_P4_DEVICE_ID
+                : streamClients.keySet().iterator().next();
         // Use long timeout as the device might return the full P4 blob
         // (e.g. server does not support cookie), over a slow network.
         execRpc(s -> s.getForwardingPipelineConfig(
@@ -207,15 +223,6 @@
     }
 
     /**
-     * Returns the P4Runtime-internal device ID associated with this client.
-     *
-     * @return P4Runtime-internal device ID
-     */
-    long p4DeviceId() {
-        return this.p4DeviceId;
-    }
-
-    /**
      * Returns the ONOS device ID associated with this client.
      *
      * @return ONOS device ID
@@ -226,14 +233,20 @@
 
     /**
      * Returns the election ID last used in a MasterArbitrationUpdate message
-     * sent by the client to the server. No guarantees are given that this is
-     * the current election ID associated to the session, nor that the server
-     * has acknowledged this value as valid.
+     * sent by the client to the server for the given P4Runtime-internal device
+     * ID. No guarantees are given that this is the current election ID
+     * associated to the session, nor that the server has acknowledged this
+     * value as valid.
      *
+     * @param p4DeviceId P4Runtime-internal device ID
      * @return election ID uint128 protobuf message
      */
-    P4RuntimeOuterClass.Uint128 lastUsedElectionId() {
-        return streamClient.lastUsedElectionId();
+    P4RuntimeOuterClass.Uint128 lastUsedElectionId(long p4DeviceId) {
+        if (streamClients.containsKey(p4DeviceId)) {
+            return streamClients.get(p4DeviceId).lastUsedElectionId();
+        } else {
+            return P4RuntimeOuterClass.Uint128.getDefaultInstance();
+        }
     }
 
     /**
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/PipelineConfigClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/PipelineConfigClientImpl.java
index b0b6c57..1dffc9c 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/PipelineConfigClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/PipelineConfigClientImpl.java
@@ -63,9 +63,9 @@
 
     @Override
     public CompletableFuture<Boolean> setPipelineConfig(
-            PiPipeconf pipeconf, ByteBuffer deviceData) {
+            long p4DeviceId, PiPipeconf pipeconf, ByteBuffer deviceData) {
 
-        if (!client.isSessionOpen()) {
+        if (!client.isSessionOpen(p4DeviceId)) {
             log.warn("Dropping set pipeline config request for {}, session is CLOSED",
                      client.deviceId());
             return completedFuture(false);
@@ -86,8 +86,8 @@
         final SetForwardingPipelineConfigRequest requestMsg =
                 SetForwardingPipelineConfigRequest
                         .newBuilder()
-                        .setDeviceId(client.p4DeviceId())
-                        .setElectionId(client.lastUsedElectionId())
+                        .setDeviceId(p4DeviceId)
+                        .setElectionId(client.lastUsedElectionId(p4DeviceId))
                         .setAction(VERIFY_AND_COMMIT)
                         .setConfig(pipelineConfigMsg)
                         .build();
@@ -159,15 +159,15 @@
 
     @Override
     public CompletableFuture<Boolean> isPipelineConfigSet(
-            PiPipeconf pipeconf, ByteBuffer expectedDeviceData) {
-        return getPipelineCookieFromServer()
+            long p4DeviceId, PiPipeconf pipeconf, ByteBuffer expectedDeviceData) {
+        return getPipelineCookieFromServer(p4DeviceId)
                 .thenApply(cfgFromDevice -> comparePipelineConfig(
                         pipeconf, expectedDeviceData, cfgFromDevice));
     }
 
     @Override
-    public CompletableFuture<Boolean> isAnyPipelineConfigSet() {
-        return getPipelineCookieFromServer().thenApply(Objects::nonNull);
+    public CompletableFuture<Boolean> isAnyPipelineConfigSet(long p4DeviceId) {
+        return getPipelineCookieFromServer(p4DeviceId).thenApply(Objects::nonNull);
     }
 
     private boolean comparePipelineConfig(
@@ -211,11 +211,12 @@
                 .equals(expectedCfg.getP4Info());
     }
 
-    private CompletableFuture<ForwardingPipelineConfig> getPipelineCookieFromServer() {
+    private CompletableFuture<ForwardingPipelineConfig> getPipelineCookieFromServer(
+            long p4DeviceId) {
         final GetForwardingPipelineConfigRequest request =
                 GetForwardingPipelineConfigRequest
                         .newBuilder()
-                        .setDeviceId(client.p4DeviceId())
+                        .setDeviceId(p4DeviceId)
                         .setResponseType(COOKIE_ONLY)
                         .build();
         final CompletableFuture<ForwardingPipelineConfig> future = new CompletableFuture<>();
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/ReadRequestImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/ReadRequestImpl.java
index c85c7e8..29a7169 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/ReadRequestImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/ReadRequestImpl.java
@@ -51,11 +51,11 @@
     private final PiPipeconf pipeconf;
     private final P4RuntimeOuterClass.ReadRequest.Builder requestMsg;
 
-    ReadRequestImpl(P4RuntimeClientImpl client, PiPipeconf pipeconf) {
+    ReadRequestImpl(P4RuntimeClientImpl client, long p4DeviceId, PiPipeconf pipeconf) {
         this.client = client;
         this.pipeconf = pipeconf;
         this.requestMsg = P4RuntimeOuterClass.ReadRequest.newBuilder()
-                .setDeviceId(client.p4DeviceId());
+                .setDeviceId(p4DeviceId);
     }
 
     @Override
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
index 11e91aa..a14f960 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
@@ -55,7 +55,8 @@
 
 /**
  * Implementation of P4RuntimeStreamClient. Handles P4Runtime StreamChannel RPC
- * operations, such as arbitration update and packet-in/out.
+ * operations, such as arbitration update and packet-in/out, for a given
+ * P4Runtime-internal device ID.
  */
 public final class StreamClientImpl implements P4RuntimeStreamClient {
 
@@ -87,24 +88,27 @@
             PiPipeconfService pipeconfService,
             MasterElectionIdStore masterElectionIdStore,
             P4RuntimeClientImpl client,
+            long p4DeviceId,
             P4RuntimeControllerImpl controller) {
         this.client = client;
         this.deviceId = client.deviceId();
-        this.p4DeviceId = client.p4DeviceId();
+        this.p4DeviceId = p4DeviceId;
         this.pipeconfService = pipeconfService;
         this.masterElectionIdStore = masterElectionIdStore;
         this.controller = controller;
     }
 
     @Override
-    public boolean isSessionOpen() {
+    public boolean isSessionOpen(long p4DeviceId) {
+        checkArgument(this.p4DeviceId == p4DeviceId);
         return streamChannelManager.isOpen();
     }
 
     @Override
-    public void closeSession() {
+    public void closeSession(long p4DeviceId) {
+        checkArgument(this.p4DeviceId == p4DeviceId);
         synchronized (requestedToBeMaster) {
-            this.masterElectionIdStore.unsetListener(deviceId);
+            this.masterElectionIdStore.unsetListener(deviceId, p4DeviceId);
             streamChannelManager.teardown();
             pendingElectionId = null;
             requestedToBeMaster.set(false);
@@ -113,14 +117,17 @@
     }
 
     @Override
-    public void setMastership(boolean master, BigInteger newElectionId) {
+    public void setMastership(long p4DeviceId, boolean master,
+                              BigInteger newElectionId) {
+        checkArgument(this.p4DeviceId == p4DeviceId);
         checkNotNull(newElectionId);
         checkArgument(newElectionId.compareTo(BigInteger.ZERO) > 0,
                       "newElectionId must be a non zero positive number");
         synchronized (requestedToBeMaster) {
             requestedToBeMaster.set(master);
             pendingElectionId = newElectionId;
-            handlePendingElectionId(masterElectionIdStore.get(deviceId));
+            handlePendingElectionId(masterElectionIdStore.get(
+                    deviceId, p4DeviceId));
         }
     }
 
@@ -151,7 +158,8 @@
                         > ARBITRATION_TIMEOUT_SECONDS * 1000;
             }
             if (timeoutExpired) {
-                log.warn("{} arbitration timeout expired! Will send pending election ID now...",
+                log.warn("Arbitration timeout expired for {}! " +
+                                 "Will send pending election ID now...",
                          deviceId);
             }
             if (!timeoutExpired &&
@@ -163,35 +171,41 @@
                          deviceId, masterElectionId, pendingElectionId);
                 // Will try again as soon as the master election ID store is
                 // updated...
-                masterElectionIdStore.setListener(deviceId, masterElectionIdListener);
+                masterElectionIdStore.setListener(
+                        deviceId, p4DeviceId, masterElectionIdListener);
                 // ..or in ARBITRATION_RETRY_SECONDS at the latest (if we missed
                 // the store event).
                 pendingElectionIdRetryTask = SharedScheduledExecutors.newTimeout(
-                        () -> handlePendingElectionId(masterElectionIdStore.get(deviceId)),
+                        () -> handlePendingElectionId(
+                                masterElectionIdStore.get(deviceId, p4DeviceId)),
                         ARBITRATION_RETRY_SECONDS, TimeUnit.SECONDS);
             } else {
                 // Send now.
                 log.info("Setting mastership on {}... " +
-                                 "master={}, newElectionId={}, masterElectionId={}",
+                                 "master={}, newElectionId={}, " +
+                                 "masterElectionId={}, sessionOpen={}",
                          deviceId, requestedToBeMaster.get(),
-                         pendingElectionId, masterElectionId);
+                         pendingElectionId, masterElectionId,
+                         streamChannelManager.isOpen());
                 sendMasterArbitrationUpdate(pendingElectionId);
                 pendingElectionId = null;
                 pendingElectionIdTimestamp = 0;
                 // No need to listen for master election ID changes.
-                masterElectionIdStore.unsetListener(deviceId);
+                masterElectionIdStore.unsetListener(deviceId, p4DeviceId);
             }
         }
     }
 
     @Override
-    public boolean isMaster() {
+    public boolean isMaster(long p4DeviceId) {
+        checkArgument(this.p4DeviceId == p4DeviceId);
         return isMaster.get();
     }
 
     @Override
-    public void packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
-        if (!isSessionOpen()) {
+    public void packetOut(long p4DeviceId, PiPacketOperation packet, PiPipeconf pipeconf) {
+        checkArgument(this.p4DeviceId == p4DeviceId);
+        if (!isSessionOpen(p4DeviceId)) {
             log.warn("Dropping packet-out request for {}, session is closed",
                      deviceId);
             return;
@@ -293,7 +307,7 @@
         // and that otherwise would not be aware of changes, keeping their
         // pending mastership operations forever.
         final BigInteger masterElectionId = uint128ToBigInteger(msg.getElectionId());
-        masterElectionIdStore.set(deviceId, masterElectionId);
+        masterElectionIdStore.set(deviceId, p4DeviceId, masterElectionId);
 
         log.debug("Received arbitration update from {}: isMaster={}, masterElectionId={}",
                   deviceId, isMaster.get(), masterElectionId);
@@ -332,7 +346,7 @@
     }
 
     /**
-     * A manager for the P4Runtime stream channel that opportunistically creates
+     * A manager for the P4Runtime StreamChannel RPC that opportunistically creates
      * new stream RCP stubs (e.g. when one fails because of errors) and posts
      * channel events via the P4Runtime controller.
      */
@@ -355,7 +369,7 @@
 
         private void initIfRequired() {
             if (requestObserver == null) {
-                log.debug("Creating new stream channel for {}...", deviceId);
+                log.debug("Starting new StreamChannel RPC for {}...", deviceId);
                 open.set(false);
                 client.execRpcNoTimeout(
                         s -> requestObserver =
@@ -397,7 +411,7 @@
     }
 
     /**
-     * Handles messages received from the device on the stream channel.
+     * Handles messages received from the device on the StreamChannel RPC.
      */
     private final class InternalStreamResponseObserver
             implements StreamObserver<StreamMessageResponse> {
@@ -429,7 +443,7 @@
                                  deviceId, message.getUpdateCase());
                 }
             } catch (Throwable ex) {
-                log.error("Exception while processing stream message from {}",
+                log.error("Exception while processing StreamMessageResponse from {}",
                           deviceId, ex);
             }
         }
@@ -442,12 +456,12 @@
                     log.warn("{} is unreachable ({})",
                              deviceId, sre.getCause().getMessage());
                 } else {
-                    log.warn("Error on stream channel for {}: {}",
+                    log.warn("Error on StreamChannel RPC for {}: {}",
                              deviceId, throwable.getMessage());
                 }
                 log.debug("", throwable);
             } else {
-                log.error(format("Exception on stream channel for %s",
+                log.error(format("Exception on StreamChannel RPC for %s",
                                  deviceId), throwable);
             }
             streamChannelManager.teardown();
@@ -455,7 +469,7 @@
 
         @Override
         public void onCompleted() {
-            log.warn("Stream channel for {} has completed", deviceId);
+            log.warn("StreamChannel RPC for {} has completed", deviceId);
             streamChannelManager.teardown();
         }
     }
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java
index fb3a5fb..481cf04 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java
@@ -60,11 +60,11 @@
     // set once we receive a response from the device.
     private final WriteResponseImpl.Builder responseBuilder;
 
-    WriteRequestImpl(P4RuntimeClientImpl client, PiPipeconf pipeconf) {
+    WriteRequestImpl(P4RuntimeClientImpl client, long p4DeviceId, PiPipeconf pipeconf) {
         this.client = checkNotNull(client);
         this.pipeconf = checkNotNull(pipeconf);
         this.requestMsg = P4RuntimeOuterClass.WriteRequest.newBuilder()
-                .setDeviceId(client.p4DeviceId());
+                .setDeviceId(p4DeviceId);
         this.responseBuilder = WriteResponseImpl.builder(client.deviceId());
     }
 
@@ -160,7 +160,8 @@
         checkState(!submitted.getAndSet(true),
                    "Request has already been submitted, cannot submit again");
         final P4RuntimeOuterClass.WriteRequest writeRequest = requestMsg
-                .setElectionId(client.lastUsedElectionId())
+                .setElectionId(client.lastUsedElectionId(
+                        requestMsg.getDeviceId()))
                 .build();
         log.debug("Sending write request to {} with {} updates...",
                   client.deviceId(), writeRequest.getUpdatesCount());
@@ -186,11 +187,13 @@
                             future.complete(responseBuilder.setSuccessAllAndBuild());
                         }
                     }
+
                     @Override
                     public void onError(Throwable t) {
                         client.handleRpcError(t, "WRITE");
                         future.complete(responseBuilder.setErrorsAndBuild(t));
                     }
+
                     @Override
                     public void onCompleted() {
                         // Nothing to do, unary call.
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedMasterElectionIdStore.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedMasterElectionIdStore.java
index dff2330..04e39f7 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedMasterElectionIdStore.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedMasterElectionIdStore.java
@@ -17,6 +17,7 @@
 package org.onosproject.p4runtime.ctl.controller;
 
 import com.google.common.collect.Maps;
+import org.apache.commons.lang3.tuple.Pair;
 import org.onlab.util.KryoNamespace;
 import org.onosproject.net.DeviceId;
 import org.onosproject.store.serializers.KryoNamespaces;
@@ -49,64 +50,77 @@
 
     private static final KryoNamespace SERIALIZER = KryoNamespace.newBuilder()
             .register(KryoNamespaces.API)
+            .register(Pair.class)
+            .register(Long.class)
             .register(BigInteger.class)
             .build();
 
     private final Logger log = getLogger(getClass());
-    private final EventuallyConsistentMapListener<DeviceId, BigInteger> mapListener =
+    private final EventuallyConsistentMapListener<Pair<DeviceId, Long>, BigInteger> mapListener =
             new InternalMapListener();
 
-    private EventuallyConsistentMap<DeviceId, BigInteger> masterElectionIds;
-    private ConcurrentMap<DeviceId, MasterElectionIdListener> listeners =
+    private EventuallyConsistentMap<Pair<DeviceId, Long>, BigInteger> masterElectionIds;
+    private ConcurrentMap<Pair<DeviceId, Long>, MasterElectionIdListener> listeners =
             Maps.newConcurrentMap();
 
     @Activate
     public void activate() {
-        this.listeners = Maps.newConcurrentMap();
-        this.masterElectionIds = storageService.<DeviceId, BigInteger>eventuallyConsistentMapBuilder()
+        listeners = Maps.newConcurrentMap();
+        masterElectionIds = storageService.<Pair<DeviceId, Long>,
+                BigInteger>eventuallyConsistentMapBuilder()
                 .withName("p4runtime-master-election-ids")
                 .withSerializer(SERIALIZER)
                 .withTimestampProvider((k, v) -> new WallClockTimestamp())
                 .build();
-        this.masterElectionIds.addListener(mapListener);
+        masterElectionIds.addListener(mapListener);
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
-        this.masterElectionIds.removeListener(mapListener);
-        this.masterElectionIds.destroy();
-        this.masterElectionIds = null;
-        this.listeners.clear();
-        this.listeners = null;
+        masterElectionIds.removeListener(mapListener);
+        masterElectionIds.destroy();
+        masterElectionIds = null;
+        listeners.clear();
+        listeners = null;
         log.info("Stopped");
     }
 
 
     @Override
-    public void set(DeviceId deviceId, BigInteger electionId) {
+    public void set(DeviceId deviceId, long p4DeviceId, BigInteger electionId) {
         checkNotNull(deviceId);
         checkNotNull(electionId);
-        this.masterElectionIds.put(deviceId, electionId);
+        masterElectionIds.put(Pair.of(deviceId, p4DeviceId), electionId);
     }
 
     @Override
-    public BigInteger get(DeviceId deviceId) {
+    public BigInteger get(DeviceId deviceId, long p4DeviceId) {
         checkNotNull(deviceId);
-        return this.masterElectionIds.get(deviceId);
+        return masterElectionIds.get(Pair.of(deviceId, p4DeviceId));
     }
 
     @Override
-    public void remove(DeviceId deviceId) {
+    public void remove(DeviceId deviceId, long p4DeviceId) {
         checkNotNull(deviceId);
-        this.masterElectionIds.remove(deviceId);
+        masterElectionIds.remove(Pair.of(deviceId, p4DeviceId));
     }
 
     @Override
-    public void setListener(DeviceId deviceId, MasterElectionIdListener newListener) {
+    public void removeAll(DeviceId deviceId) {
+        masterElectionIds.keySet().forEach(k -> {
+            if (k.getLeft().equals(deviceId)) {
+                masterElectionIds.remove(k);
+            }
+        });
+    }
+
+    @Override
+    public void setListener(DeviceId deviceId, long p4DeviceId,
+                            MasterElectionIdListener newListener) {
         checkNotNull(deviceId);
         checkNotNull(newListener);
-        listeners.compute(deviceId, (did, existingListener) -> {
+        listeners.compute(Pair.of(deviceId, p4DeviceId), (x, existingListener) -> {
             if (existingListener == null || existingListener == newListener) {
                 return newListener;
             } else {
@@ -117,13 +131,13 @@
     }
 
     @Override
-    public void unsetListener(DeviceId deviceId) {
-        listeners.remove(deviceId);
+    public void unsetListener(DeviceId deviceId, long p4DeviceId) {
+        listeners.remove(Pair.of(deviceId, p4DeviceId));
     }
 
-    private class InternalMapListener implements EventuallyConsistentMapListener<DeviceId, BigInteger> {
+    private class InternalMapListener implements EventuallyConsistentMapListener<Pair<DeviceId, Long>, BigInteger> {
         @Override
-        public void event(EventuallyConsistentMapEvent<DeviceId, BigInteger> event) {
+        public void event(EventuallyConsistentMapEvent<Pair<DeviceId, Long>, BigInteger> event) {
             final MasterElectionIdListener listener = listeners.get(event.key());
             if (listener == null) {
                 return;
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/MasterElectionIdStore.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/MasterElectionIdStore.java
index f5393d1..7e5a49f 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/MasterElectionIdStore.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/MasterElectionIdStore.java
@@ -21,51 +21,71 @@
 import java.math.BigInteger;
 
 /**
- * Store that keeps track of master election IDs for each device.
+ * Store that keeps track of master election IDs for each server (device) and
+ * P4Runtime-internal device ID.
  */
 public interface MasterElectionIdStore {
 
     /**
-     * Sets the master election ID for the given device.
+     * Sets the master election ID for the given device and P4Runtime-internal
+     * device ID.
      *
      * @param deviceId   device ID
+     * @param p4DeviceId P4Runtime-internal device ID
      * @param electionId election ID
      */
-    void set(DeviceId deviceId, BigInteger electionId);
+    void set(DeviceId deviceId, long p4DeviceId, BigInteger electionId);
 
     /**
-     * Returns the last known master election ID for the given device, or null.
+     * Returns the last known master election ID for the given device and
+     * P4Runtime-internal device ID, or null.
      *
-     * @param deviceId device ID
+     * @param deviceId   device ID
+     * @param p4DeviceId P4Runtime-internal device ID
      * @return election ID
      */
-    BigInteger get(DeviceId deviceId);
+    BigInteger get(DeviceId deviceId, long p4DeviceId);
 
     /**
-     * Removes any state associated with the given device.
+     * Removes any state associated with the given device and P4Runtime-internal
+     * device ID.
+     *
+     * @param deviceId   device ID
+     * @param p4DeviceId P4Runtime-internal device ID
+     */
+    void remove(DeviceId deviceId, long p4DeviceId);
+
+    /**
+     * Removes all state associated with the given device.
      *
      * @param deviceId device ID
      */
-    void remove(DeviceId deviceId);
+    void removeAll(DeviceId deviceId);
 
     /**
-     * Sets a listener for the given device that will be invoked every time
-     * there will be changes to the master election ID.
+     * Sets a listener for the given device and P4Runtime-internal device ID
+     * that will be invoked every time the master election ID for that pair
+     * changes.
      *
-     * @param deviceId device ID
-     * @param listener listener
+     * @param deviceId   device ID
+     * @param p4DeviceId P4Runtime-internal device ID
+     * @param listener   listener
      */
-    void setListener(DeviceId deviceId, MasterElectionIdListener listener);
+    void setListener(DeviceId deviceId, long p4DeviceId,
+                     MasterElectionIdListener listener);
 
     /**
-     * Unset the listener for the given device.
+     * Unsets the listener for the given device and P4Runtime-internal device
+     * ID.
      *
-     * @param deviceId device ID
+     * @param deviceId   device ID
+     * @param p4DeviceId P4Runtime-internal device ID
      */
-    void unsetListener(DeviceId deviceId);
+    void unsetListener(DeviceId deviceId, long p4DeviceId);
 
     /**
-     * Listener of master election ID changes for a specific device.
+     * Listener of master election ID changes for a specific device and
+     * P4Runtime-internal device ID.
      */
     interface MasterElectionIdListener {
 
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/P4RuntimeControllerImpl.java
index 2256d5e..c188ee4 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/P4RuntimeControllerImpl.java
@@ -21,7 +21,6 @@
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.pi.service.PiPipeconfService;
 import org.onosproject.p4runtime.api.P4RuntimeClient;
-import org.onosproject.p4runtime.api.P4RuntimeClientKey;
 import org.onosproject.p4runtime.api.P4RuntimeController;
 import org.onosproject.p4runtime.api.P4RuntimeEvent;
 import org.onosproject.p4runtime.api.P4RuntimeEventListener;
@@ -36,7 +35,7 @@
 @Component(immediate = true, service = P4RuntimeController.class)
 public class P4RuntimeControllerImpl
         extends AbstractGrpcClientController
-        <P4RuntimeClientKey, P4RuntimeClient, P4RuntimeEvent, P4RuntimeEventListener>
+        <P4RuntimeClient, P4RuntimeEvent, P4RuntimeEventListener>
         implements P4RuntimeController {
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -46,27 +45,21 @@
     private MasterElectionIdStore masterElectionIdStore;
 
     public P4RuntimeControllerImpl() {
-        super(P4RuntimeEvent.class);
+        super(P4RuntimeEvent.class, "P4Runtime");
     }
 
     @Override
-    public void removeClient(DeviceId deviceId) {
-        super.removeClient(deviceId);
+    public void remove(DeviceId deviceId) {
+        super.remove(deviceId);
         // Assuming that when a client is removed, it is done so by all nodes,
         // this is the best place to clear master election ID state.
-        masterElectionIdStore.remove(deviceId);
-    }
-
-    @Override
-    public void removeClient(P4RuntimeClientKey clientKey) {
-        super.removeClient(clientKey);
-        masterElectionIdStore.remove(clientKey.deviceId());
+        masterElectionIdStore.removeAll(deviceId);
     }
 
     @Override
     protected P4RuntimeClient createClientInstance(
-            P4RuntimeClientKey clientKey, ManagedChannel channel) {
-        return new P4RuntimeClientImpl(clientKey, channel, this,
+            DeviceId deviceId, ManagedChannel channel) {
+        return new P4RuntimeClientImpl(deviceId, channel, this,
                                        pipeconfService, masterElectionIdStore);
     }
 }
diff --git a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/MockMasterElectionIdStore.java b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/MockMasterElectionIdStore.java
index 595e07d..596546b 100644
--- a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/MockMasterElectionIdStore.java
+++ b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/MockMasterElectionIdStore.java
@@ -24,27 +24,32 @@
 public class MockMasterElectionIdStore implements MasterElectionIdStore {
 
     @Override
-    public void set(DeviceId deviceId, BigInteger electionId) {
+    public void set(DeviceId deviceId, long p4DeviceId, BigInteger electionId) {
 
     }
 
     @Override
-    public BigInteger get(DeviceId deviceId) {
+    public BigInteger get(DeviceId deviceId, long p4DeviceId) {
         return null;
     }
 
     @Override
-    public void remove(DeviceId deviceId) {
+    public void remove(DeviceId deviceId, long p4DeviceId) {
 
     }
 
     @Override
-    public void setListener(DeviceId deviceId, MasterElectionIdListener listener) {
+    public void removeAll(DeviceId deviceId) {
 
     }
 
     @Override
-    public void unsetListener(DeviceId deviceId) {
+    public void setListener(DeviceId deviceId, long p4DeviceId, MasterElectionIdListener listener) {
+
+    }
+
+    @Override
+    public void unsetListener(DeviceId deviceId, long p4DeviceId) {
 
     }
 }
diff --git a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
index 4c4dcdf..146f102 100644
--- a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
+++ b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
@@ -25,6 +25,7 @@
 import io.grpc.inprocess.InProcessServerBuilder;
 import io.grpc.internal.AbstractServerImplBuilder;
 import org.easymock.EasyMock;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -44,7 +45,6 @@
 import org.onosproject.net.pi.runtime.PiActionProfileGroupId;
 import org.onosproject.net.pi.runtime.PiActionProfileMember;
 import org.onosproject.net.pi.runtime.PiActionProfileMemberId;
-import org.onosproject.p4runtime.api.P4RuntimeClientKey;
 import org.onosproject.p4runtime.ctl.client.P4RuntimeClientImpl;
 import org.onosproject.p4runtime.ctl.controller.P4RuntimeControllerImpl;
 import p4.v1.P4RuntimeOuterClass.ActionProfileGroup;
@@ -55,8 +55,6 @@
 import p4.v1.P4RuntimeOuterClass.WriteRequest;
 
 import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.Collection;
 import java.util.List;
@@ -64,7 +62,6 @@
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import static java.lang.String.format;
 import static org.easymock.EasyMock.niceMock;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -109,8 +106,6 @@
     private static final String GRPC_SERVER_NAME = "P4RuntimeGroupTest";
     private static final long DEFAULT_TIMEOUT_TIME = 10;
     private static final Uint128 DEFAULT_ELECTION_ID = Uint128.getDefaultInstance();
-    private static final String P4R_IP = "127.0.0.1";
-    private static final int P4R_PORT = 50010;
 
     private org.onosproject.p4runtime.ctl.client.P4RuntimeClientImpl client;
     private P4RuntimeControllerImpl controller;
@@ -160,20 +155,23 @@
 
 
     @Before
-    public void setup() throws URISyntaxException {
+    public void setup() {
         controller = niceMock(org.onosproject.p4runtime.ctl.controller.P4RuntimeControllerImpl.class);
-        P4RuntimeClientKey clientKey = new P4RuntimeClientKey(DEVICE_ID, new URI(
-                format("grpc://%s:%d?device_id=%d", P4R_IP, P4R_PORT, P4_DEVICE_ID)));
         client = new P4RuntimeClientImpl(
-                clientKey, grpcChannel, controller, new MockPipeconfService(),
+                DEVICE_ID, grpcChannel, controller, new MockPipeconfService(),
                 new MockMasterElectionIdStore());
     }
 
+    @After
+    public void teardown() {
+        client.shutdown();
+    }
+
     @Test
     public void testInsertPiActionProfileGroup() throws Exception {
         CompletableFuture<Void> complete = p4RuntimeServerImpl.expectRequests(1);
-        client.write(PIPECONF).insert(GROUP).submitSync();
-        assertTrue(client.write(PIPECONF).insert(GROUP).submitSync().isSuccess());
+        client.write(P4_DEVICE_ID, PIPECONF).insert(GROUP).submitSync();
+        assertTrue(client.write(P4_DEVICE_ID, PIPECONF).insert(GROUP).submitSync().isSuccess());
         complete.get(DEFAULT_TIMEOUT_TIME, TimeUnit.SECONDS);
         WriteRequest result = p4RuntimeServerImpl.getWriteReqs().get(0);
         assertEquals(1, result.getDeviceId());
@@ -202,7 +200,7 @@
     @Test
     public void testInsertPiActionMembers() throws Exception {
         CompletableFuture<Void> complete = p4RuntimeServerImpl.expectRequests(1);
-        assertTrue(client.write(PIPECONF).insert(GROUP_MEMBER_INSTANCES)
+        assertTrue(client.write(P4_DEVICE_ID, PIPECONF).insert(GROUP_MEMBER_INSTANCES)
                            .submitSync().isSuccess());
         complete.get(DEFAULT_TIMEOUT_TIME, TimeUnit.SECONDS);
         WriteRequest result = p4RuntimeServerImpl.getWriteReqs().get(0);
@@ -251,7 +249,7 @@
 
         p4RuntimeServerImpl.willReturnReadResult(responses);
         CompletableFuture<Void> complete = p4RuntimeServerImpl.expectRequests(1);
-        Collection<PiActionProfileGroup> groups = client.read(PIPECONF)
+        Collection<PiActionProfileGroup> groups = client.read(P4_DEVICE_ID, PIPECONF)
                 .actionProfileGroups(ACT_PROF_ID)
                 .submitSync().all(PiActionProfileGroup.class);
         complete.get(DEFAULT_TIMEOUT_TIME, TimeUnit.SECONDS);
@@ -300,7 +298,7 @@
 
         p4RuntimeServerImpl.willReturnReadResult(responses);
         CompletableFuture<Void> complete = p4RuntimeServerImpl.expectRequests(1);
-        Collection<PiActionProfileMember> piMembers = client.read(PIPECONF)
+        Collection<PiActionProfileMember> piMembers = client.read(P4_DEVICE_ID, PIPECONF)
                 .actionProfileMembers(ACT_PROF_ID).submitSync()
                 .all(PiActionProfileMember.class);
         complete.get(DEFAULT_TIMEOUT_TIME, TimeUnit.SECONDS);