Refactor channel and mastership handling in P4Runtime

This (big) change aims at solving the issue observed with mastership flapping
and device connection/disconnection with P4Runtime.

Channel handling is now based on the underlying gRPC channel state. Before,
channel events (open/close/error) were generated as a consequence of P4Runtime
StreamChannel events, making device availability dependent on mastership. Now
Stream Channel events only affect mastership (MASTER/STANDBY or NONE when the
SteamChannel RPC is not active).

Mastership handling has been refactored to generate P4Runtime election IDs that
are compatible with the mastership preference decided by the MastershipService.

GeneralDeviceProvider has been re-implemented to support in-order
device event processing and to reduce implementation complexity. Stats polling
has been moved to a separate component, and netcfg handling updated to only
depend on BasicDeviceConfig, augmented with a pipeconf field, and re-using the
managementAddress field to set the gRPC server endpoints (e.g.
grpc://myswitch.local:50051). Before it was depending on 3 different config
classes, making hard to detect changes.

Finally, this change affects some core interfaces:
- Adds a method to DeviceProvider and DeviceHandshaker to check for device
availability, making the meaning of availability device-specific. This is needed
in cases where the device manager needs to change the availability state of a
device (as in change #20842)
- Support device providers not capable of reconciling mastership role responses
with requests (like P4Runtime).
- Clarify the meaning of "connection" in the DeviceConnect behavior.
- Allows driver-based providers to check devices for reachability and
availability without probing the device via the network.

Change-Id: I7ff30d29f5d02ad938e3171536e54ae2916629a2
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
index 353d44e..d06e3fc 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
@@ -17,21 +17,25 @@
 package org.onosproject.p4runtime.ctl.client;
 
 import io.grpc.ManagedChannel;
+import io.grpc.Status;
 import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
 import org.onosproject.grpc.ctl.AbstractGrpcClient;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceAgentEvent;
 import org.onosproject.net.pi.model.PiPipeconf;
 import org.onosproject.net.pi.runtime.PiPacketOperation;
 import org.onosproject.net.pi.service.PiPipeconfService;
 import org.onosproject.p4runtime.api.P4RuntimeClient;
 import org.onosproject.p4runtime.api.P4RuntimeClientKey;
-import org.onosproject.p4runtime.api.P4RuntimeEvent;
-import org.onosproject.p4runtime.ctl.controller.BaseEventSubject;
-import org.onosproject.p4runtime.ctl.controller.ChannelEvent;
+import org.onosproject.p4runtime.ctl.controller.MasterElectionIdStore;
 import org.onosproject.p4runtime.ctl.controller.P4RuntimeControllerImpl;
 import p4.v1.P4RuntimeGrpc;
 import p4.v1.P4RuntimeOuterClass;
+import p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigRequest;
+import p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigResponse;
 
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -39,6 +43,7 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.lang.String.format;
+import static p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigRequest.ResponseType.COOKIE_ONLY;
 
 /**
  * Implementation of P4RuntimeClient.
@@ -59,7 +64,6 @@
     static final int LONG_TIMEOUT_SECONDS = 60;
 
     private final long p4DeviceId;
-    private final ManagedChannel channel;
     private final P4RuntimeControllerImpl controller;
     private final StreamClientImpl streamClient;
     private final PipelineConfigClientImpl pipelineConfigClient;
@@ -67,25 +71,27 @@
     /**
      * Instantiates a new client with the given arguments.
      *
-     * @param clientKey       client key
-     * @param channel         gRPC managed channel
-     * @param controller      P$Runtime controller instance
-     * @param pipeconfService pipeconf service instance
+     * @param clientKey             client key
+     * @param channel               gRPC managed channel
+     * @param controller            P$Runtime controller instance
+     * @param pipeconfService       pipeconf service instance
+     * @param masterElectionIdStore master election ID store
      */
     public P4RuntimeClientImpl(P4RuntimeClientKey clientKey,
                                ManagedChannel channel,
                                P4RuntimeControllerImpl controller,
-                               PiPipeconfService pipeconfService) {
-        super(clientKey);
+                               PiPipeconfService pipeconfService,
+                               MasterElectionIdStore masterElectionIdStore) {
+        super(clientKey, channel, true, controller);
         checkNotNull(channel);
         checkNotNull(controller);
         checkNotNull(pipeconfService);
+        checkNotNull(masterElectionIdStore);
 
         this.p4DeviceId = clientKey.p4DeviceId();
-        this.channel = channel;
         this.controller = controller;
         this.streamClient = new StreamClientImpl(
-                pipeconfService, this, controller);
+                pipeconfService, masterElectionIdStore, this, controller);
         this.pipelineConfigClient = new PipelineConfigClientImpl(this);
     }
 
@@ -108,13 +114,13 @@
     }
 
     @Override
-    public ReadRequest read(PiPipeconf pipeconf) {
-        return new ReadRequestImpl(this, pipeconf);
+    public CompletableFuture<Boolean> isAnyPipelineConfigSet() {
+        return pipelineConfigClient.isAnyPipelineConfigSet();
     }
 
     @Override
-    public void openSession() {
-        streamClient.openSession();
+    public ReadRequest read(PiPipeconf pipeconf) {
+        return new ReadRequestImpl(this, pipeconf);
     }
 
     @Override
@@ -128,8 +134,8 @@
     }
 
     @Override
-    public void runForMastership() {
-        streamClient.runForMastership();
+    public void setMastership(boolean master, BigInteger newElectionId) {
+        streamClient.setMastership(master, newElectionId);
     }
 
     @Override
@@ -147,6 +153,44 @@
         return new WriteRequestImpl(this, pipeconf);
     }
 
+    @Override
+    public CompletableFuture<Boolean> probeService() {
+        final CompletableFuture<Boolean> future = new CompletableFuture<>();
+        final StreamObserver<GetForwardingPipelineConfigResponse> responseObserver =
+                new StreamObserver<GetForwardingPipelineConfigResponse>() {
+                    @Override
+                    public void onNext(GetForwardingPipelineConfigResponse value) {
+                        future.complete(true);
+                    }
+
+                    @Override
+                    public void onError(Throwable t) {
+                        if (Status.fromThrowable(t).getCode() ==
+                                Status.Code.FAILED_PRECONDITION) {
+                            // Pipeline not set but service is available.
+                            future.complete(true);
+                        } else {
+                            log.debug("", t);
+                        }
+                        future.complete(false);
+                    }
+
+                    @Override
+                    public void onCompleted() {
+                        // Ignore, unary call.
+                    }
+                };
+        // Use long timeout as the device might return the full P4 blob
+        // (e.g. server does not support cookie), over a slow network.
+        execRpc(s -> s.getForwardingPipelineConfig(
+                GetForwardingPipelineConfigRequest.newBuilder()
+                        .setDeviceId(p4DeviceId)
+                        .setResponseType(COOKIE_ONLY)
+                        .build(), responseObserver),
+                SHORT_TIMEOUT_SECONDS);
+        return future;
+    }
+
     /**
      * Returns the P4Runtime-internal device ID associated with this client.
      *
@@ -184,7 +228,8 @@
      * @param stubConsumer P4Runtime stub consumer
      * @param timeout      timeout in seconds
      */
-    void execRpc(Consumer<P4RuntimeGrpc.P4RuntimeStub> stubConsumer, int timeout) {
+    void execRpc(Consumer<P4RuntimeGrpc.P4RuntimeStub> stubConsumer,
+                 int timeout) {
         if (log.isTraceEnabled()) {
             log.trace("Executing RPC with timeout {} seconds (context deadline {})...",
                       timeout, context().getDeadline());
@@ -235,21 +280,10 @@
     }
 
     private void checkGrpcException(StatusRuntimeException sre) {
-        switch (sre.getStatus().getCode()) {
-            case PERMISSION_DENIED:
-                // Notify upper layers that this node is not master.
-                controller.postEvent(new P4RuntimeEvent(
-                        P4RuntimeEvent.Type.PERMISSION_DENIED,
-                        new BaseEventSubject(deviceId)));
-                break;
-            case UNAVAILABLE:
-                // Channel might be closed.
-                controller.postEvent(new P4RuntimeEvent(
-                        P4RuntimeEvent.Type.CHANNEL_EVENT,
-                        new ChannelEvent(deviceId, ChannelEvent.Type.ERROR)));
-                break;
-            default:
-                break;
+        if (sre.getStatus().getCode() == Status.Code.PERMISSION_DENIED) {
+            // Notify upper layers that this node is not master.
+            controller.postEvent(new DeviceAgentEvent(
+                    DeviceAgentEvent.Type.NOT_MASTER, deviceId));
         }
     }
 }
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/PipelineConfigClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/PipelineConfigClientImpl.java
index c023f26..b0b6c57 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/PipelineConfigClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/PipelineConfigClientImpl.java
@@ -18,6 +18,7 @@
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.TextFormat;
+import io.grpc.Status;
 import io.grpc.stub.StreamObserver;
 import org.onosproject.net.pi.model.PiPipeconf;
 import org.onosproject.p4runtime.api.P4RuntimePipelineConfigClient;
@@ -32,11 +33,13 @@
 import p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigResponse;
 
 import java.nio.ByteBuffer;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.onosproject.p4runtime.ctl.client.P4RuntimeClientImpl.LONG_TIMEOUT_SECONDS;
+import static org.onosproject.p4runtime.ctl.client.P4RuntimeClientImpl.SHORT_TIMEOUT_SECONDS;
 import static org.slf4j.LoggerFactory.getLogger;
 import static p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigRequest.ResponseType.COOKIE_ONLY;
 import static p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
@@ -62,6 +65,12 @@
     public CompletableFuture<Boolean> setPipelineConfig(
             PiPipeconf pipeconf, ByteBuffer deviceData) {
 
+        if (!client.isSessionOpen()) {
+            log.warn("Dropping set pipeline config request for {}, session is CLOSED",
+                     client.deviceId());
+            return completedFuture(false);
+        }
+
         log.info("Setting pipeline config for {} to {}...",
                  client.deviceId(), pipeconf.id());
 
@@ -98,11 +107,13 @@
                         // All good, pipeline is set.
                         future.complete(true);
                     }
+
                     @Override
                     public void onError(Throwable t) {
                         client.handleRpcError(t, "SET-pipeline-config");
                         future.complete(false);
                     }
+
                     @Override
                     public void onCompleted() {
                         // Ignore, unary call.
@@ -154,6 +165,11 @@
                         pipeconf, expectedDeviceData, cfgFromDevice));
     }
 
+    @Override
+    public CompletableFuture<Boolean> isAnyPipelineConfigSet() {
+        return getPipelineCookieFromServer().thenApply(Objects::nonNull);
+    }
+
     private boolean comparePipelineConfig(
             PiPipeconf pipeconf, ByteBuffer expectedDeviceData,
             ForwardingPipelineConfig cfgFromDevice) {
@@ -209,17 +225,38 @@
                     public void onNext(GetForwardingPipelineConfigResponse value) {
                         if (value.hasConfig()) {
                             future.complete(value.getConfig());
+                            if (!value.getConfig().getP4DeviceConfig().isEmpty()) {
+                                log.warn("{} returned GetForwardingPipelineConfigResponse " +
+                                                 "with p4_device_config field set " +
+                                                 "({} bytes), but we requested COOKIE_ONLY",
+                                         client.deviceId(),
+                                         value.getConfig().getP4DeviceConfig().size());
+                            }
+                            if (value.getConfig().hasP4Info()) {
+                                log.warn("{} returned GetForwardingPipelineConfigResponse " +
+                                                 "with p4_info field set " +
+                                                 "({} bytes), but we requested COOKIE_ONLY",
+                                         client.deviceId(),
+                                         value.getConfig().getP4Info().getSerializedSize());
+                            }
                         } else {
+                            future.complete(null);
                             log.warn("{} returned {} with 'config' field unset",
                                      client.deviceId(), value.getClass().getSimpleName());
                         }
-                        future.complete(null);
                     }
 
                     @Override
                     public void onError(Throwable t) {
-                        client.handleRpcError(t, "GET-pipeline-config");
                         future.complete(null);
+                        if (Status.fromThrowable(t).getCode() ==
+                                Status.Code.FAILED_PRECONDITION) {
+                            // FAILED_PRECONDITION means that a pipeline
+                            // config was not set in the first place, don't
+                            // bother logging.
+                            return;
+                        }
+                        client.handleRpcError(t, "GET-pipeline-config");
                     }
 
                     @Override
@@ -231,7 +268,7 @@
         // (e.g. server does not support cookie), over a slow network.
         client.execRpc(
                 s -> s.getForwardingPipelineConfig(request, responseObserver),
-                LONG_TIMEOUT_SECONDS);
+                SHORT_TIMEOUT_SECONDS);
         return future;
     }
 }
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
index 7afd97b..1bf195c 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
@@ -22,14 +22,15 @@
 import io.grpc.stub.ClientCallStreamObserver;
 import io.grpc.stub.StreamObserver;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceAgentEvent;
 import org.onosproject.net.pi.model.PiPipeconf;
 import org.onosproject.net.pi.runtime.PiPacketOperation;
 import org.onosproject.net.pi.service.PiPipeconfService;
 import org.onosproject.p4runtime.api.P4RuntimeEvent;
 import org.onosproject.p4runtime.api.P4RuntimeStreamClient;
 import org.onosproject.p4runtime.ctl.codec.CodecException;
-import org.onosproject.p4runtime.ctl.controller.ArbitrationUpdateEvent;
-import org.onosproject.p4runtime.ctl.controller.ChannelEvent;
+import org.onosproject.p4runtime.ctl.controller.MasterElectionIdStore;
+import org.onosproject.p4runtime.ctl.controller.MasterElectionIdStore.MasterElectionIdListener;
 import org.onosproject.p4runtime.ctl.controller.P4RuntimeControllerImpl;
 import org.onosproject.p4runtime.ctl.controller.PacketInEvent;
 import org.slf4j.Logger;
@@ -42,6 +43,8 @@
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 import static java.lang.String.format;
 import static org.onosproject.p4runtime.ctl.codec.Codecs.CODECS;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -54,73 +57,95 @@
 
     private static final Logger log = getLogger(StreamClientImpl.class);
 
-    private static final BigInteger ONE_THOUSAND = BigInteger.valueOf(1000);
-
     private final P4RuntimeClientImpl client;
     private final DeviceId deviceId;
     private final long p4DeviceId;
     private final PiPipeconfService pipeconfService;
+    private final MasterElectionIdStore masterElectionIdStore;
     private final P4RuntimeControllerImpl controller;
+
     private final StreamChannelManager streamChannelManager = new StreamChannelManager();
+    private final MasterElectionIdListener masterElectionIdListener = new InternalMasterElectionIdListener();
 
-    private P4RuntimeOuterClass.Uint128 lastUsedElectionId = P4RuntimeOuterClass.Uint128
-            .newBuilder().setLow(1).build();
+    private final AtomicBoolean isMaster = new AtomicBoolean(false);
+    private final AtomicBoolean requestedToBeMaster = new AtomicBoolean(false);
 
-    private final AtomicBoolean isClientMaster = new AtomicBoolean(false);
+    private BigInteger pendingElectionId = null;
+    private BigInteger lastUsedElectionId = null;
 
     StreamClientImpl(
             PiPipeconfService pipeconfService,
+            MasterElectionIdStore masterElectionIdStore,
             P4RuntimeClientImpl client,
             P4RuntimeControllerImpl controller) {
         this.client = client;
         this.deviceId = client.deviceId();
         this.p4DeviceId = client.p4DeviceId();
         this.pipeconfService = pipeconfService;
+        this.masterElectionIdStore = masterElectionIdStore;
         this.controller = controller;
     }
 
     @Override
-    public void openSession() {
-        if (isSessionOpen()) {
-            log.debug("Dropping request to open session for {}, session is already open",
-                      deviceId);
-            return;
-        }
-        log.debug("Opening session for {}...", deviceId);
-        sendMasterArbitrationUpdate(controller.newMasterElectionId(deviceId));
-
-    }
-
-    @Override
     public boolean isSessionOpen() {
         return streamChannelManager.isOpen();
     }
 
     @Override
     public void closeSession() {
-        streamChannelManager.complete();
+        synchronized (requestedToBeMaster) {
+            this.masterElectionIdStore.unsetListener(deviceId);
+            streamChannelManager.teardown();
+            pendingElectionId = null;
+            requestedToBeMaster.set(false);
+            isMaster.set(false);
+        }
     }
 
     @Override
-    public void runForMastership() {
-        if (!isSessionOpen()) {
-            log.debug("Dropping mastership request for {}, session is closed",
-                      deviceId);
-            return;
+    public void setMastership(boolean master, BigInteger newElectionId) {
+        checkNotNull(newElectionId);
+        checkArgument(newElectionId.compareTo(BigInteger.ZERO) > 0,
+                      "newElectionId must be a non zero positive number");
+        synchronized (requestedToBeMaster) {
+            requestedToBeMaster.set(master);
+            pendingElectionId = newElectionId;
+            handlePendingElectionId(masterElectionIdStore.get(deviceId));
         }
-        // Becoming master is a race. Here we increase our chances of win, i.e.
-        // using the highest election ID, against other ONOS nodes in the
-        // cluster that are calling openSession() (which is used to start the
-        // stream RPC session, not to become master).
-        log.debug("Running for mastership on {}...", deviceId);
-        final BigInteger masterId = controller.newMasterElectionId(deviceId)
-                .add(ONE_THOUSAND);
-        sendMasterArbitrationUpdate(masterId);
+    }
+
+    private void handlePendingElectionId(BigInteger masterElectionId) {
+        synchronized (requestedToBeMaster) {
+            if (pendingElectionId == null) {
+                // No pending requests.
+                return;
+            }
+            if (!requestedToBeMaster.get() && masterElectionId != null
+                    && pendingElectionId.compareTo(masterElectionId) > 0) {
+                log.info("Deferring sending master arbitration update, master " +
+                                  "election ID of server ({}) is smaller than " +
+                                  "requested one ({}), but we do NOT want to be master...",
+                          masterElectionId, pendingElectionId);
+                // Will try again as soon as the server reports a new master
+                // election ID that is bigger than the pending non-master one.
+                masterElectionIdStore.setListener(deviceId, masterElectionIdListener);
+            } else {
+                // Send now.
+                log.info("Setting mastership on {}... " +
+                                  "master={}, newElectionId={}, masterElectionId={}",
+                          deviceId, requestedToBeMaster.get(),
+                          pendingElectionId, masterElectionId);
+                sendMasterArbitrationUpdate(pendingElectionId);
+                pendingElectionId = null;
+                // No need to listen for master election ID changes.
+                masterElectionIdStore.unsetListener(deviceId);
+            }
+        }
     }
 
     @Override
     public boolean isMaster() {
-        return streamChannelManager.isOpen() && isClientMaster.get();
+        return isMaster.get();
     }
 
     @Override
@@ -141,7 +166,7 @@
             final StreamMessageRequest packetOutRequest = StreamMessageRequest
                     .newBuilder().setPacket(packetOut).build();
             // Send.
-            streamChannelManager.sendIfOpen(packetOutRequest);
+            streamChannelManager.send(packetOutRequest);
         } catch (CodecException e) {
             log.error("Unable to send packet-out: {}", e.getMessage());
         }
@@ -160,7 +185,7 @@
                                         .setElectionId(idMsg)
                                         .build())
                         .build());
-        lastUsedElectionId = idMsg;
+        lastUsedElectionId = electionId;
     }
 
     private P4RuntimeOuterClass.Uint128 bigIntegerToUint128(BigInteger value) {
@@ -220,13 +245,22 @@
         if (!msg.hasElectionId() || !msg.hasStatus()) {
             return;
         }
-        final boolean isMaster = msg.getStatus().getCode() == Status.OK.getCode().value();
-        log.debug("Received arbitration update from {}: isMaster={}, electionId={}",
-                  deviceId, isMaster, uint128ToBigInteger(msg.getElectionId()));
-        controller.postEvent(new P4RuntimeEvent(
-                P4RuntimeEvent.Type.ARBITRATION_RESPONSE,
-                new ArbitrationUpdateEvent(deviceId, isMaster)));
-        isClientMaster.set(isMaster);
+        // Is this client master?
+        isMaster.set(msg.getStatus().getCode() == Status.OK.getCode().value());
+        // Notify new master election IDs to all nodes via distributed store.
+        // This is required for those nodes who do not have a Stream RPC open,
+        // and that otherwise would not be aware of changes, keeping their
+        // pending mastership operations forever.
+        final BigInteger masterElectionId = uint128ToBigInteger(msg.getElectionId());
+        masterElectionIdStore.set(deviceId, masterElectionId);
+
+        log.debug("Received arbitration update from {}: isMaster={}, masterElectionId={}",
+                  deviceId, isMaster.get(), masterElectionId);
+
+        // Post mastership event via controller.
+        controller.postEvent(new DeviceAgentEvent(
+                isMaster.get() ? DeviceAgentEvent.Type.ROLE_MASTER
+                        : DeviceAgentEvent.Type.ROLE_STANDBY, deviceId));
     }
 
     /**
@@ -236,7 +270,22 @@
      * @return election ID uint128 protobuf message
      */
     P4RuntimeOuterClass.Uint128 lastUsedElectionId() {
-        return lastUsedElectionId;
+        return lastUsedElectionId == null
+                ? P4RuntimeOuterClass.Uint128.getDefaultInstance()
+                : bigIntegerToUint128(lastUsedElectionId);
+    }
+
+    /**
+     * Handles updates of the master election ID by applying any pending
+     * mastership operation.
+     */
+    private class InternalMasterElectionIdListener
+            implements MasterElectionIdStore.MasterElectionIdListener {
+
+        @Override
+        public void updated(BigInteger masterElectionId) {
+            handlePendingElectionId(masterElectionId);
+        }
     }
 
     /**
@@ -258,23 +307,12 @@
             }
         }
 
-        void sendIfOpen(StreamMessageRequest value) {
-            // We do not lock here, but we ignore NPEs due to stream RPC not
-            // being active (null requestObserver). Good for frequent
-            // packet-outs.
-            try {
-                doSend(value);
-            } catch (NullPointerException e) {
-                if (requestObserver != null) {
-                    // Must be something else.
-                    throw e;
-                }
-            }
-        }
-
         private void doSend(StreamMessageRequest value) {
             try {
                 requestObserver.onNext(value);
+                // Optimistically set the session as open. In case of errors, it
+                // will be closed by the response stream observer.
+                streamChannelManager.signalOpen();
             } catch (Throwable ex) {
                 if (ex instanceof StatusRuntimeException) {
                     log.warn("Unable to send {} to {}: {}",
@@ -283,7 +321,7 @@
                     log.error("Exception while sending {} to {}: {}",
                               value.getUpdateCase().toString(), deviceId, ex);
                 }
-                complete();
+                teardown();
             }
         }
 
@@ -299,7 +337,7 @@
             }
         }
 
-        void complete() {
+        void teardown() {
             synchronized (this) {
                 signalClosed();
                 if (requestObserver != null) {
@@ -311,23 +349,16 @@
         }
 
         void signalOpen() {
-            synchronized (this) {
-                final boolean wasOpen = open.getAndSet(true);
-                if (!wasOpen) {
-                    controller.postEvent(new P4RuntimeEvent(
-                            P4RuntimeEvent.Type.CHANNEL_EVENT,
-                            new ChannelEvent(deviceId, ChannelEvent.Type.OPEN)));
-                }
-            }
+            open.set(true);
         }
 
         void signalClosed() {
             synchronized (this) {
                 final boolean wasOpen = open.getAndSet(false);
                 if (wasOpen) {
-                    controller.postEvent(new P4RuntimeEvent(
-                            P4RuntimeEvent.Type.CHANNEL_EVENT,
-                            new ChannelEvent(deviceId, ChannelEvent.Type.CLOSED)));
+                    // We lost any valid mastership role.
+                    controller.postEvent(new DeviceAgentEvent(
+                            DeviceAgentEvent.Type.ROLE_NONE, deviceId));
                 }
             }
         }
@@ -352,13 +383,11 @@
 
         @Override
         public void onNext(StreamMessageResponse message) {
-            streamChannelManager.signalOpen();
             try {
                 if (log.isTraceEnabled()) {
-                    log.trace(
-                            "Received {} from {}: {}",
-                            message.getUpdateCase(), deviceId,
-                            TextFormat.shortDebugString(message));
+                    log.trace("Received {} from {}: {}",
+                              message.getUpdateCase(), deviceId,
+                              TextFormat.shortDebugString(message));
                 }
                 switch (message.getUpdateCase()) {
                     case PACKET:
@@ -388,17 +417,18 @@
                     log.warn("Error on stream channel for {}: {}",
                              deviceId, throwable.getMessage());
                 }
+                log.debug("", throwable);
             } else {
                 log.error(format("Exception on stream channel for %s",
                                  deviceId), throwable);
             }
-            streamChannelManager.complete();
+            streamChannelManager.teardown();
         }
 
         @Override
         public void onCompleted() {
             log.warn("Stream channel for {} has completed", deviceId);
-            streamChannelManager.complete();
+            streamChannelManager.teardown();
         }
     }
 }
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/ChannelEvent.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/ChannelEvent.java
deleted file mode 100644
index 0a77e46..0000000
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/ChannelEvent.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright 2019-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.p4runtime.ctl.controller;
-
-import org.onosproject.net.DeviceId;
-import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
-
-/**
- * Channel event in P4Runtime.
- */
-public final class ChannelEvent implements P4RuntimeEventSubject {
-
-    public enum Type {
-        OPEN,
-        CLOSED,
-        ERROR
-    }
-
-    private DeviceId deviceId;
-    private Type type;
-
-    /**
-     * Creates channel event with given status and throwable.
-     *
-     * @param deviceId  the device
-     * @param type      error type
-     */
-    public ChannelEvent(DeviceId deviceId, Type type) {
-        this.deviceId = deviceId;
-        this.type = type;
-    }
-
-    /**
-     * Gets the type of this event.
-     *
-     * @return the error type
-     */
-    public Type type() {
-        return type;
-    }
-
-    @Override
-    public DeviceId deviceId() {
-        return deviceId;
-    }
-}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedElectionIdGenerator.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedElectionIdGenerator.java
deleted file mode 100644
index 980ab11..0000000
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedElectionIdGenerator.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Copyright 2019-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.p4runtime.ctl.controller;
-
-import org.onlab.util.KryoNamespace;
-import org.onosproject.net.DeviceId;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.AtomicCounterMap;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageService;
-import org.slf4j.Logger;
-
-import java.math.BigInteger;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Distributed implementation of a generator of P4Runtime election IDs.
- */
-final class DistributedElectionIdGenerator {
-
-    private final Logger log = getLogger(this.getClass());
-
-    // FIXME: counter map use long, but P4Runtime accepts 128bit election IDs
-    private AtomicCounterMap<DeviceId> electionIds;
-
-    /**
-     * Creates a new election ID generator using the given storage service.
-     *
-     * @param storageService storage service
-     */
-    DistributedElectionIdGenerator(StorageService storageService) {
-        KryoNamespace serializer = KryoNamespace.newBuilder()
-                .register(KryoNamespaces.API)
-                .build();
-        this.electionIds = storageService.<DeviceId>atomicCounterMapBuilder()
-                .withName("p4runtime-election-ids")
-                .withSerializer(Serializer.using(serializer))
-                .build();
-    }
-
-    /**
-     * Returns an election ID for the given device ID. The first election ID for
-     * a given device ID is always 1.
-     *
-     * @param deviceId device ID
-     * @return new election ID
-     */
-    BigInteger generate(DeviceId deviceId) {
-        if (electionIds == null) {
-            return null;
-        }
-        // Default value is 0 for AtomicCounterMap.
-        return BigInteger.valueOf(electionIds.incrementAndGet(deviceId));
-    }
-
-    /**
-     * Destroy the backing distributed primitive of this generator.
-     */
-    void destroy() {
-        try {
-            electionIds.destroy().get(10, TimeUnit.SECONDS);
-        } catch (InterruptedException | ExecutionException | TimeoutException e) {
-            log.error("Exception while destroying distributed counter map", e);
-        } finally {
-            electionIds = null;
-        }
-    }
-}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedMasterElectionIdStore.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedMasterElectionIdStore.java
new file mode 100644
index 0000000..dff2330
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedMasterElectionIdStore.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl.controller;
+
+import com.google.common.collect.Maps;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.net.DeviceId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.math.BigInteger;
+import java.util.concurrent.ConcurrentMap;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Distributed implementation of MasterElectionIdStore.
+ */
+@Component(immediate = true, service = MasterElectionIdStore.class)
+public class DistributedMasterElectionIdStore implements MasterElectionIdStore {
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    private static final KryoNamespace SERIALIZER = KryoNamespace.newBuilder()
+            .register(KryoNamespaces.API)
+            .register(BigInteger.class)
+            .build();
+
+    private final Logger log = getLogger(getClass());
+    private final EventuallyConsistentMapListener<DeviceId, BigInteger> mapListener =
+            new InternalMapListener();
+
+    private EventuallyConsistentMap<DeviceId, BigInteger> masterElectionIds;
+    private ConcurrentMap<DeviceId, MasterElectionIdListener> listeners =
+            Maps.newConcurrentMap();
+
+    @Activate
+    public void activate() {
+        this.listeners = Maps.newConcurrentMap();
+        this.masterElectionIds = storageService.<DeviceId, BigInteger>eventuallyConsistentMapBuilder()
+                .withName("p4runtime-master-election-ids")
+                .withSerializer(SERIALIZER)
+                .withTimestampProvider((k, v) -> new WallClockTimestamp())
+                .build();
+        this.masterElectionIds.addListener(mapListener);
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        this.masterElectionIds.removeListener(mapListener);
+        this.masterElectionIds.destroy();
+        this.masterElectionIds = null;
+        this.listeners.clear();
+        this.listeners = null;
+        log.info("Stopped");
+    }
+
+
+    @Override
+    public void set(DeviceId deviceId, BigInteger electionId) {
+        checkNotNull(deviceId);
+        checkNotNull(electionId);
+        this.masterElectionIds.put(deviceId, electionId);
+    }
+
+    @Override
+    public BigInteger get(DeviceId deviceId) {
+        checkNotNull(deviceId);
+        return this.masterElectionIds.get(deviceId);
+    }
+
+    @Override
+    public void remove(DeviceId deviceId) {
+        checkNotNull(deviceId);
+        this.masterElectionIds.remove(deviceId);
+    }
+
+    @Override
+    public void setListener(DeviceId deviceId, MasterElectionIdListener newListener) {
+        checkNotNull(deviceId);
+        checkNotNull(newListener);
+        listeners.compute(deviceId, (did, existingListener) -> {
+            if (existingListener == null || existingListener == newListener) {
+                return newListener;
+            } else {
+                log.error("Cannot add listener as one already exist for {}", deviceId);
+                return existingListener;
+            }
+        });
+    }
+
+    @Override
+    public void unsetListener(DeviceId deviceId) {
+        listeners.remove(deviceId);
+    }
+
+    private class InternalMapListener implements EventuallyConsistentMapListener<DeviceId, BigInteger> {
+        @Override
+        public void event(EventuallyConsistentMapEvent<DeviceId, BigInteger> event) {
+            final MasterElectionIdListener listener = listeners.get(event.key());
+            if (listener == null) {
+                return;
+            }
+            switch (event.type()) {
+                case PUT:
+                    listener.updated(event.value());
+                    break;
+                case REMOVE:
+                    listener.updated(null);
+                    break;
+                default:
+                    log.error("Unrecognized map event type {}", event.type());
+            }
+        }
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/MasterElectionIdStore.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/MasterElectionIdStore.java
new file mode 100644
index 0000000..f5393d1
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/MasterElectionIdStore.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl.controller;
+
+import org.onosproject.net.DeviceId;
+
+import java.math.BigInteger;
+
+/**
+ * Store that keeps track of master election IDs for each device.
+ */
+public interface MasterElectionIdStore {
+
+    /**
+     * Sets the master election ID for the given device.
+     *
+     * @param deviceId   device ID
+     * @param electionId election ID
+     */
+    void set(DeviceId deviceId, BigInteger electionId);
+
+    /**
+     * Returns the last known master election ID for the given device, or null.
+     *
+     * @param deviceId device ID
+     * @return election ID
+     */
+    BigInteger get(DeviceId deviceId);
+
+    /**
+     * Removes any state associated with the given device.
+     *
+     * @param deviceId device ID
+     */
+    void remove(DeviceId deviceId);
+
+    /**
+     * Sets a listener for the given device that will be invoked every time
+     * there will be changes to the master election ID.
+     *
+     * @param deviceId device ID
+     * @param listener listener
+     */
+    void setListener(DeviceId deviceId, MasterElectionIdListener listener);
+
+    /**
+     * Unset the listener for the given device.
+     *
+     * @param deviceId device ID
+     */
+    void unsetListener(DeviceId deviceId);
+
+    /**
+     * Listener of master election ID changes for a specific device.
+     */
+    interface MasterElectionIdListener {
+
+        /**
+         * Notifies that the master election ID has been updated to the given
+         * (nullable) value.
+         *
+         * @param masterElectionId new master election ID, or null
+         */
+        void updated(BigInteger masterElectionId);
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/P4RuntimeControllerImpl.java
index affbf7d..2256d5e 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/P4RuntimeControllerImpl.java
@@ -16,33 +16,19 @@
 
 package org.onosproject.p4runtime.ctl.controller;
 
-import com.google.common.collect.Maps;
 import io.grpc.ManagedChannel;
 import org.onosproject.grpc.ctl.AbstractGrpcClientController;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.DeviceAgentEvent;
-import org.onosproject.net.device.DeviceAgentListener;
 import org.onosproject.net.pi.service.PiPipeconfService;
-import org.onosproject.net.provider.ProviderId;
 import org.onosproject.p4runtime.api.P4RuntimeClient;
 import org.onosproject.p4runtime.api.P4RuntimeClientKey;
 import org.onosproject.p4runtime.api.P4RuntimeController;
 import org.onosproject.p4runtime.api.P4RuntimeEvent;
 import org.onosproject.p4runtime.api.P4RuntimeEventListener;
 import org.onosproject.p4runtime.ctl.client.P4RuntimeClientImpl;
-import org.onosproject.store.service.StorageService;
-import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.component.annotations.ReferenceCardinality;
-import org.slf4j.Logger;
-
-import java.math.BigInteger;
-import java.util.concurrent.ConcurrentMap;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * P4Runtime controller implementation.
@@ -53,122 +39,34 @@
         <P4RuntimeClientKey, P4RuntimeClient, P4RuntimeEvent, P4RuntimeEventListener>
         implements P4RuntimeController {
 
-    private final Logger log = getLogger(getClass());
-
-    private final ConcurrentMap<DeviceId, ConcurrentMap<ProviderId, DeviceAgentListener>>
-            deviceAgentListeners = Maps.newConcurrentMap();
-
-    private DistributedElectionIdGenerator electionIdGenerator;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    private StorageService storageService;
-
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     private PiPipeconfService pipeconfService;
 
-    @Activate
-    public void activate() {
-        super.activate();
-        eventDispatcher.addSink(P4RuntimeEvent.class, listenerRegistry);
-        electionIdGenerator = new DistributedElectionIdGenerator(storageService);
-        log.info("Started");
-    }
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    private MasterElectionIdStore masterElectionIdStore;
 
-    @Deactivate
-    public void deactivate() {
-        super.deactivate();
-        deviceAgentListeners.clear();
-        electionIdGenerator.destroy();
-        electionIdGenerator = null;
-        log.info("Stopped");
+    public P4RuntimeControllerImpl() {
+        super(P4RuntimeEvent.class);
     }
 
     @Override
-    protected P4RuntimeClient createClientInstance(P4RuntimeClientKey clientKey, ManagedChannel channel) {
-        return new P4RuntimeClientImpl(clientKey, channel, this, pipeconfService);
+    public void removeClient(DeviceId deviceId) {
+        super.removeClient(deviceId);
+        // Assuming that when a client is removed, it is done so by all nodes,
+        // this is the best place to clear master election ID state.
+        masterElectionIdStore.remove(deviceId);
     }
 
     @Override
-    public void addDeviceAgentListener(DeviceId deviceId, ProviderId providerId, DeviceAgentListener listener) {
-        checkNotNull(deviceId, "deviceId cannot be null");
-        checkNotNull(deviceId, "providerId cannot be null");
-        checkNotNull(listener, "listener cannot be null");
-        deviceAgentListeners.putIfAbsent(deviceId, Maps.newConcurrentMap());
-        deviceAgentListeners.get(deviceId).put(providerId, listener);
+    public void removeClient(P4RuntimeClientKey clientKey) {
+        super.removeClient(clientKey);
+        masterElectionIdStore.remove(clientKey.deviceId());
     }
 
     @Override
-    public void removeDeviceAgentListener(DeviceId deviceId, ProviderId providerId) {
-        checkNotNull(deviceId, "deviceId cannot be null");
-        checkNotNull(providerId, "listener cannot be null");
-        deviceAgentListeners.computeIfPresent(deviceId, (did, listeners) -> {
-            listeners.remove(providerId);
-            return listeners;
-        });
-    }
-
-    public BigInteger newMasterElectionId(DeviceId deviceId) {
-        return electionIdGenerator.generate(deviceId);
-    }
-
-    public void postEvent(P4RuntimeEvent event) {
-        switch (event.type()) {
-            case CHANNEL_EVENT:
-                handleChannelEvent(event);
-                break;
-            case ARBITRATION_RESPONSE:
-                handleArbitrationReply(event);
-                break;
-            case PERMISSION_DENIED:
-                handlePermissionDenied(event);
-                break;
-            default:
-                post(event);
-                break;
-        }
-    }
-
-    private void handlePermissionDenied(P4RuntimeEvent event) {
-        postDeviceAgentEvent(event.subject().deviceId(), new DeviceAgentEvent(
-                DeviceAgentEvent.Type.NOT_MASTER, event.subject().deviceId()));
-    }
-
-    private void handleChannelEvent(P4RuntimeEvent event) {
-        final ChannelEvent channelEvent = (ChannelEvent) event.subject();
-        final DeviceId deviceId = channelEvent.deviceId();
-        final DeviceAgentEvent.Type agentEventType;
-        switch (channelEvent.type()) {
-            case OPEN:
-                agentEventType = DeviceAgentEvent.Type.CHANNEL_OPEN;
-                break;
-            case CLOSED:
-                agentEventType = DeviceAgentEvent.Type.CHANNEL_CLOSED;
-                break;
-            case ERROR:
-                agentEventType = !isReachable(deviceId)
-                        ? DeviceAgentEvent.Type.CHANNEL_CLOSED
-                        : DeviceAgentEvent.Type.CHANNEL_ERROR;
-                break;
-            default:
-                log.warn("Unrecognized channel event type {}", channelEvent.type());
-                return;
-        }
-        postDeviceAgentEvent(deviceId, new DeviceAgentEvent(agentEventType, deviceId));
-    }
-
-    private void handleArbitrationReply(P4RuntimeEvent event) {
-        final DeviceId deviceId = event.subject().deviceId();
-        final ArbitrationUpdateEvent response = (ArbitrationUpdateEvent) event.subject();
-        final DeviceAgentEvent.Type roleType = response.isMaster()
-                ? DeviceAgentEvent.Type.ROLE_MASTER
-                : DeviceAgentEvent.Type.ROLE_STANDBY;
-        postDeviceAgentEvent(deviceId, new DeviceAgentEvent(
-                roleType, response.deviceId()));
-    }
-
-    private void postDeviceAgentEvent(DeviceId deviceId, DeviceAgentEvent event) {
-        if (deviceAgentListeners.containsKey(deviceId)) {
-            deviceAgentListeners.get(deviceId).values().forEach(l -> l.event(event));
-        }
+    protected P4RuntimeClient createClientInstance(
+            P4RuntimeClientKey clientKey, ManagedChannel channel) {
+        return new P4RuntimeClientImpl(clientKey, channel, this,
+                                       pipeconfService, masterElectionIdStore);
     }
 }
diff --git a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/MockMasterElectionIdStore.java b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/MockMasterElectionIdStore.java
new file mode 100644
index 0000000..595e07d
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/MockMasterElectionIdStore.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.p4runtime.ctl.controller.MasterElectionIdStore;
+
+import java.math.BigInteger;
+
+public class MockMasterElectionIdStore implements MasterElectionIdStore {
+
+    @Override
+    public void set(DeviceId deviceId, BigInteger electionId) {
+
+    }
+
+    @Override
+    public BigInteger get(DeviceId deviceId) {
+        return null;
+    }
+
+    @Override
+    public void remove(DeviceId deviceId) {
+
+    }
+
+    @Override
+    public void setListener(DeviceId deviceId, MasterElectionIdListener listener) {
+
+    }
+
+    @Override
+    public void unsetListener(DeviceId deviceId) {
+
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
index c6a43cb..4c4dcdf 100644
--- a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
+++ b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
@@ -55,6 +55,8 @@
 import p4.v1.P4RuntimeOuterClass.WriteRequest;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.Collection;
 import java.util.List;
@@ -62,6 +64,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static java.lang.String.format;
 import static org.easymock.EasyMock.niceMock;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -105,7 +108,7 @@
     private static final int SET_EGRESS_PORT_ID = 16794308;
     private static final String GRPC_SERVER_NAME = "P4RuntimeGroupTest";
     private static final long DEFAULT_TIMEOUT_TIME = 10;
-    private static final Uint128 DEFAULT_ELECTION_ID = Uint128.newBuilder().setLow(1).build();
+    private static final Uint128 DEFAULT_ELECTION_ID = Uint128.getDefaultInstance();
     private static final String P4R_IP = "127.0.0.1";
     private static final int P4R_PORT = 50010;
 
@@ -157,10 +160,13 @@
 
 
     @Before
-    public void setup() {
+    public void setup() throws URISyntaxException {
         controller = niceMock(org.onosproject.p4runtime.ctl.controller.P4RuntimeControllerImpl.class);
-        P4RuntimeClientKey clientKey = new P4RuntimeClientKey(DEVICE_ID, P4R_IP, P4R_PORT, P4_DEVICE_ID);
-        client = new P4RuntimeClientImpl(clientKey, grpcChannel, controller, new MockPipeconfService());
+        P4RuntimeClientKey clientKey = new P4RuntimeClientKey(DEVICE_ID, new URI(
+                format("grpc://%s:%d?device_id=%d", P4R_IP, P4R_PORT, P4_DEVICE_ID)));
+        client = new P4RuntimeClientImpl(
+                clientKey, grpcChannel, controller, new MockPipeconfService(),
+                new MockMasterElectionIdStore());
     }
 
     @Test