Improve scalability of P4Runtime subsystem

The P4Runtime client was hanging (deadlock) on a master arbitration
request. As such, all other requests (e.g. table write) were waiting
for the client's request lock to become available.

Apart from fixing those deadlocks, this patch brings a number of
improvements that all together allow to run networks of 100+ P4Runtime
devices on a single ONOS instance (before only ~20 devices)

Includes:
- Asynchrounous mastership handling in DevicHandshaker (as defined in
the P4Runtime and OpenFlow spec)
- Refactored arbitration handling in the P4RuntimeClient
to be consistent with the P4Runtime spec
- Report suspect deadlocks in P4RuntimeClientImpl
- Exploit write errors in P4RuntimeClient to quickly report
channel/mastership errors to upper layers
- Complete all futures with deadlines in P4Runtime driver
- Dump all tables in one request
- Re-purposed ChannelEvent to DeviceAgentEvent to carry also mastership
response events
- Fixed IntelliJ warnings
- Various code and log clean-ups

Change-Id: I9376793a9fe69d8eddf7e8ac2ef0ee4c14fbd198
diff --git a/core/api/src/main/java/org/onosproject/net/device/ChannelEvent.java b/core/api/src/main/java/org/onosproject/net/device/ChannelEvent.java
deleted file mode 100644
index 98ea5cb..0000000
--- a/core/api/src/main/java/org/onosproject/net/device/ChannelEvent.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.net.device;
-
-import org.onlab.util.Tools;
-import org.onosproject.event.AbstractEvent;
-import org.onosproject.net.DeviceId;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-
-/**
- * Describes event related to a channel established by ONOS with a device.
- */
-public class ChannelEvent extends AbstractEvent<ChannelEvent.Type, DeviceId> {
-
-    private final Throwable throwable;
-
-    /**
-     * Type of device events.
-     */
-    public enum Type {
-        /**
-         * Signifies that the channel has properly connected.
-         */
-        CHANNEL_CONNECTED,
-
-        /**
-         * Signifies that the channel has disconnected.
-         */
-        CHANNEL_DISCONNECTED,
-
-        /**
-         * Signifies that an error happened on the channel with the given device.
-         */
-        CHANNEL_ERROR
-
-    }
-
-    /**
-     * Creates an event of a given type and for the specified device.
-     *
-     * @param type     device event type
-     * @param deviceId event device subject
-     */
-    public ChannelEvent(Type type, DeviceId deviceId) {
-        this(type, deviceId, null);
-    }
-
-    /**
-     * Creates an event of a given type and for the specified device, given a certain throwable.
-     *
-     * @param type      device event type
-     * @param deviceId  event device subject
-     * @param throwable exception happened on the channel
-     */
-    public ChannelEvent(Type type, DeviceId deviceId, Throwable throwable) {
-        super(type, deviceId);
-        this.throwable = throwable;
-    }
-
-    /**
-     * Creates an event of a given type and for the specified device and the current time.
-     *
-     * @param type      device event type
-     * @param deviceId  event device subject
-     * @param throwable exception happened on the channel
-     * @param time      occurrence time
-     */
-    public ChannelEvent(Type type, DeviceId deviceId, Throwable throwable, long time) {
-        super(type, deviceId, time);
-        this.throwable = throwable;
-    }
-
-    /**
-     * Returns the exception that happened on the channel.
-     *
-     * @return a throwable if associated to the event, otherwise null.
-     */
-    public Throwable throwable() {
-        return throwable;
-    }
-
-    @Override
-    public String toString() {
-        if (throwable == null) {
-            return super.toString();
-        }
-        return toStringHelper(this)
-                .add("time", Tools.defaultOffsetDataTime(time()))
-                .add("type", type())
-                .add("subject", subject())
-                .add("throwable", throwable)
-                .toString();
-    }
-}
diff --git a/core/api/src/main/java/org/onosproject/net/device/DeviceAgentEvent.java b/core/api/src/main/java/org/onosproject/net/device/DeviceAgentEvent.java
new file mode 100644
index 0000000..22eab99
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/device/DeviceAgentEvent.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.device;
+
+import org.onosproject.event.AbstractEvent;
+import org.onosproject.net.DeviceId;
+
+/**
+ * Describes and event related to a protocol agent used to interact with an
+ * infrastructure device.
+ */
+public final class DeviceAgentEvent
+        extends AbstractEvent<DeviceAgentEvent.Type, DeviceId> {
+
+    /**
+     * Type of device events.
+     */
+    public enum Type {
+        /**
+         * Signifies that a channel between the agent and the device is open and
+         * the two can communicate.
+         */
+        CHANNEL_OPEN,
+
+        /**
+         * Signifies that a channel between the agent and the device is closed
+         * and the two cannot communicate.
+         */
+        CHANNEL_CLOSED,
+
+        /**
+         * Signifies that a channel error has been detected. Further
+         * investigation should be performed to check if the channel is still
+         * open or closed.
+         */
+        CHANNEL_ERROR,
+
+        /**
+         * Signifies that the agent has acquired master role.
+         */
+        ROLE_MASTER,
+
+        /**
+         * Signifies that the agent has standby/slave mastership role.
+         */
+        ROLE_STANDBY,
+
+        /**
+         * Signifies that the agent cannot acquire any valid mastership role for
+         * the device.
+         */
+        ROLE_NONE,
+
+    }
+
+    /**
+     * Creates a new device agent event for the given type and device ID.
+     *
+     * @param type     event type
+     * @param deviceId device ID
+     */
+    public DeviceAgentEvent(Type type, DeviceId deviceId) {
+        super(type, deviceId);
+    }
+
+    /**
+     * Creates a new device agent event for the given type, device ID and time.
+     *
+     * @param type     event type
+     * @param deviceId device ID
+     * @param time     occurrence time
+     */
+    public DeviceAgentEvent(Type type, DeviceId deviceId, long time) {
+        super(type, deviceId, time);
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/net/device/ChannelListener.java b/core/api/src/main/java/org/onosproject/net/device/DeviceAgentListener.java
similarity index 79%
rename from core/api/src/main/java/org/onosproject/net/device/ChannelListener.java
rename to core/api/src/main/java/org/onosproject/net/device/DeviceAgentListener.java
index cdc90bf70..c034dc2 100644
--- a/core/api/src/main/java/org/onosproject/net/device/ChannelListener.java
+++ b/core/api/src/main/java/org/onosproject/net/device/DeviceAgentListener.java
@@ -18,7 +18,8 @@
 import org.onosproject.event.EventListener;
 
 /**
- * A listener to receive events related to a transport channel established with a device.
+ * A listener to receive events related to a protocol agent controlling an
+ * infrastructure device.
  */
-public interface ChannelListener extends EventListener<ChannelEvent> {
+public interface DeviceAgentListener extends EventListener<DeviceAgentEvent> {
 }
diff --git a/core/api/src/main/java/org/onosproject/net/device/DeviceHandshaker.java b/core/api/src/main/java/org/onosproject/net/device/DeviceHandshaker.java
index 606248a..bc870d3 100644
--- a/core/api/src/main/java/org/onosproject/net/device/DeviceHandshaker.java
+++ b/core/api/src/main/java/org/onosproject/net/device/DeviceHandshaker.java
@@ -30,38 +30,41 @@
 public interface DeviceHandshaker extends DeviceConnect {
 
     /**
-     * Checks the reachability (connectivity) of a device.
-     * Reachability, unlike availability, denotes whether THIS particular node
-     * can send messages and receive replies from the specified device.
+     * Checks the reachability (connectivity) of a device. Reachability, unlike
+     * availability, denotes whether THIS particular node can send messages and
+     * receive replies from the specified device.
      *
      * @return CompletableFuture eventually true if reachable, false otherwise
      */
     CompletableFuture<Boolean> isReachable();
 
     /**
-     * Applies on the device a mastership role change as decided by the core.
+     * Notifies the device a mastership role change as decided by the core. The
+     * implementation of this method should trigger a {@link DeviceAgentEvent}
+     * signaling the mastership role accepted by the device.
      *
-     * @param newRole newly determined mastership role
-     * @return CompletableFuture with the mastership role accepted from the device
+     * @param newRole new mastership role
      */
-    CompletableFuture<MastershipRole> roleChanged(MastershipRole newRole);
+    void roleChanged(MastershipRole newRole);
 
     /**
-     * Applies a listener to a channel established with the device.
+     * Adds a device agent listener.
      *
-     * @param listener the channel listener
+     * @param listener device agent listener
      */
-    default void addChannelListener(ChannelListener listener) {
-        throw new UnsupportedOperationException("Listener Registration not supported");
+    default void addDeviceAgentListener(DeviceAgentListener listener) {
+        throw new UnsupportedOperationException(
+                "Device agent listener registration not supported");
     }
 
     /**
-     * Removes a listener to a channel established with the device.
+     * Removes a device agent listener.
      *
-     * @param listener the channel listener
+     * @param listener device agent listener
      */
-    default void removeChannelListener(ChannelListener listener) {
-        throw new UnsupportedOperationException("Listener Removal not supported");
+    default void removeDeviceAgentListener(DeviceAgentListener listener) {
+        throw new UnsupportedOperationException(
+                "Device agent listener removal not supported");
     }
 
 }
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index 414db4a..09d95b4 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -798,7 +798,8 @@
                 }
             } else {
                 // we didn't get back what we asked for. Reelect someone else.
-                log.warn("Failed to assert role [{}] onto Device {}", response, deviceId);
+                log.warn("Failed to assert role onto device {}. requested={}, response={}",
+                         deviceId, requested, response);
                 if (requested == MastershipRole.MASTER) {
                     mastershipService.relinquishMastership(deviceId);
                     // TODO: Shouldn't we be triggering event?
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiFlowRuleTranslatorImpl.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiFlowRuleTranslatorImpl.java
index 5741e65..38efdb9 100644
--- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiFlowRuleTranslatorImpl.java
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiFlowRuleTranslatorImpl.java
@@ -146,7 +146,7 @@
             if (tableModel.supportsAging()) {
                 tableEntryBuilder.withTimeout((double) rule.timeout());
             } else {
-                log.warn("Flow rule is temporary, but table '{}' doesn't support " +
+                log.debug("Flow rule is temporary, but table '{}' doesn't support " +
                                  "aging, translating to permanent.", tableModel.id());
             }
 
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
index 212cf1c..55a4a11 100644
--- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
@@ -139,7 +139,6 @@
 
     @Override
     public void register(PiPipeconf pipeconf) throws IllegalStateException {
-        log.warn("Currently using local maps, needs to be moved to a distributed store");
         if (piPipeconfs.containsKey(pipeconf.id())) {
             throw new IllegalStateException(format("Pipeconf %s is already registered", pipeconf.id()));
         }
@@ -262,7 +261,7 @@
                 cfgService.getConfig(deviceId, PiPipeconfConfig.class);
         PiPipeconfId id = pipeconfConfig.piPipeconfId();
         if (id.id().equals("")) {
-            log.warn("Not adding empty pipeconfId for device {}", deviceId);
+            log.debug("Ignoring empty pipeconf ID for device {}", deviceId);
         } else {
             pipeconfMappingStore.createOrUpdateBinding(deviceId, id);
         }
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
index 8e2b54d..ac4856a 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
@@ -16,11 +16,13 @@
 
 package org.onosproject.drivers.p4runtime;
 
+import io.grpc.StatusRuntimeException;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.driver.AbstractHandlerBehaviour;
 import org.onosproject.net.pi.model.PiPipeconf;
+import org.onosproject.net.pi.model.PiPipeconfId;
 import org.onosproject.net.pi.service.PiPipeconfService;
 import org.onosproject.net.pi.service.PiTranslationService;
 import org.onosproject.p4runtime.api.P4RuntimeClient;
@@ -28,6 +30,11 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
 import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
@@ -35,6 +42,10 @@
  */
 public class AbstractP4RuntimeHandlerBehaviour extends AbstractHandlerBehaviour {
 
+    // Timeout in seconds for device operations.
+    // TODO make configurable via driver properties
+    private static final int DEVICE_OP_TIMEOUT = 5;
+
     public static final String P4RUNTIME_SERVER_ADDR_KEY = "p4runtime_ip";
     public static final String P4RUNTIME_SERVER_PORT_KEY = "p4runtime_port";
     public static final String P4RUNTIME_DEVICE_ID_KEY = "p4runtime_deviceId";
@@ -76,12 +87,18 @@
         client = controller.getClient(deviceId);
 
         PiPipeconfService piPipeconfService = handler().get(PiPipeconfService.class);
-        if (!piPipeconfService.ofDevice(deviceId).isPresent() ||
-                !piPipeconfService.getPipeconf(piPipeconfService.ofDevice(deviceId).get()).isPresent()) {
-            log.warn("Unable to get the pipeconf of {}, aborting operation", deviceId);
+        if (!piPipeconfService.ofDevice(deviceId).isPresent()) {
+            log.warn("Unable to get assigned pipeconf for {} (mapping " +
+                             "missing in PiPipeconfService), aborting operation",
+                     deviceId);
             return false;
         }
-        pipeconf = piPipeconfService.getPipeconf(piPipeconfService.ofDevice(deviceId).get()).get();
+        PiPipeconfId pipeconfId = piPipeconfService.ofDevice(deviceId).get();
+        if (!piPipeconfService.getPipeconf(pipeconfId).isPresent()) {
+            log.warn("Cannot find any pipeconf with ID '{}' ({}), aborting operation", pipeconfId, deviceId);
+            return false;
+        }
+        pipeconf = piPipeconfService.getPipeconf(pipeconfId).get();
 
         piTranslationService = handler().get(PiTranslationService.class);
 
@@ -89,12 +106,12 @@
     }
 
     /**
-     * Create a P4Runtime client for this device. Returns true if the operation
-     * was successful, false otherwise.
+     * Returns a P4Runtime client for this device, null if such client cannot
+     * be created.
      *
-     * @return true if successful, false otherwise
+     * @return client or null
      */
-    protected boolean createClient() {
+     P4RuntimeClient createClient() {
         deviceId = handler().data().deviceId();
         controller = handler().get(P4RuntimeController.class);
 
@@ -105,24 +122,38 @@
         if (serverAddr == null || serverPortString == null || p4DeviceIdString == null) {
             log.warn("Unable to create client for {}, missing driver data key (required is {}, {}, and {})",
                      deviceId, P4RUNTIME_SERVER_ADDR_KEY, P4RUNTIME_SERVER_PORT_KEY, P4RUNTIME_DEVICE_ID_KEY);
-            return false;
+            return null;
         }
 
-        if (!controller.createClient(deviceId, serverAddr,
-                                     Integer.parseUnsignedInt(serverPortString),
-                                     Long.parseUnsignedLong(p4DeviceIdString))) {
+         final int serverPort;
+         final long p4DeviceId;
+
+         try {
+            serverPort = Integer.parseUnsignedInt(serverPortString);
+        } catch (NumberFormatException e) {
+            log.error("{} is not a valid P4Runtime port number", serverPortString);
+            return null;
+        }
+         try {
+             p4DeviceId = Long.parseUnsignedLong(p4DeviceIdString);
+         } catch (NumberFormatException e) {
+             log.error("{} is not a valid P4Runtime-internal device ID", p4DeviceIdString);
+             return null;
+         }
+
+        if (!controller.createClient(deviceId, serverAddr, serverPort, p4DeviceId)) {
             log.warn("Unable to create client for {}, aborting operation", deviceId);
-            return false;
+            return null;
         }
 
-        return true;
+        return controller.getClient(deviceId);
     }
 
     /**
-     * Returns the value of the given driver property, if present,
-     * otherwise returns the given default value.
+     * Returns the value of the given driver property, if present, otherwise
+     * returns the given default value.
      *
-     * @param propName property name
+     * @param propName   property name
      * @param defaultVal default value
      * @return boolean
      */
@@ -134,4 +165,36 @@
             return Boolean.parseBoolean(handler().driver().getProperty(propName));
         }
     }
+
+    /**
+     * Convenience method to get the result of a completable future while
+     * setting a timeout and checking for exceptions.
+     *
+     * @param future        completable future
+     * @param opDescription operation description to use in log messages. Should
+     *                      be a sentence starting with a verb ending in -ing,
+     *                      e.g. "reading...", "writing...", etc.
+     * @param defaultValue  value to return if operation fails
+     * @param <U>           type of returned value
+     * @return future result or default value
+     */
+    <U> U getFutureWithDeadline(CompletableFuture<U> future, String opDescription,
+                                U defaultValue) {
+        try {
+            return future.get(DEVICE_OP_TIMEOUT, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            log.error("Exception while {} on {}", opDescription, deviceId);
+        } catch (ExecutionException e) {
+            final Throwable cause = e.getCause();
+            if (cause instanceof StatusRuntimeException) {
+                final StatusRuntimeException grpcError = (StatusRuntimeException) cause;
+                log.warn("Error while {} on {}: {}", opDescription, deviceId, grpcError.getMessage());
+            } else {
+                log.error("Exception while {} on {}", opDescription, deviceId, e.getCause());
+            }
+        } catch (TimeoutException e) {
+            log.error("Operation TIMEOUT while {} on {}", opDescription, deviceId);
+        }
+        return defaultValue;
+    }
 }
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java
index 6a6c2e9..cfe717d 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java
@@ -18,9 +18,8 @@
 
 import org.onlab.util.SharedExecutors;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.driver.AbstractHandlerBehaviour;
-import org.onosproject.net.pi.model.PiPipeconf;
 import org.onosproject.net.behaviour.PiPipelineProgrammable;
+import org.onosproject.net.pi.model.PiPipeconf;
 import org.onosproject.p4runtime.api.P4RuntimeClient;
 import org.onosproject.p4runtime.api.P4RuntimeController;
 import org.slf4j.Logger;
@@ -28,14 +27,14 @@
 import java.nio.ByteBuffer;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * Abstract implementation of the PiPipelineProgrammable behaviours for a P4Runtime device.
  */
-public abstract class AbstractP4RuntimePipelineProgrammable extends AbstractHandlerBehaviour
+public abstract class AbstractP4RuntimePipelineProgrammable
+        extends AbstractP4RuntimeHandlerBehaviour
         implements PiPipelineProgrammable {
 
     protected final Logger log = getLogger(getClass());
@@ -72,25 +71,31 @@
             return false;
         }
 
-        try {
-            if (!client.initStreamChannel().get()) {
-                log.warn("Unable to init stream channel to {}.", deviceId);
-                return false;
-            }
-        } catch (InterruptedException | ExecutionException e) {
-            log.error("Exception while initializing stream channel on {}", deviceId, e);
+        // We need to be master to perform write RPC to the device.
+        // FIXME: properly perform mastership handling in the device provider
+        // This would probably mean deploying the pipeline after the device as
+        // been notified to the core.
+        final Boolean masterSuccess = getFutureWithDeadline(
+                client.becomeMaster(),
+                "becoming master ", null);
+        if (masterSuccess == null) {
+            // Error already logged by getFutureWithDeadline()
+            return false;
+        } else if (!masterSuccess) {
+            log.warn("Unable to become master for {}, aborting pipeconf deploy", deviceId);
             return false;
         }
 
-        try {
-            if (!client.setPipelineConfig(pipeconf, deviceDataBuffer).get()) {
-                log.warn("Unable to deploy pipeconf {} to {}", pipeconf.id(), deviceId);
-                return false;
-            }
-        } catch (InterruptedException | ExecutionException e) {
-            log.error("Exception while deploying pipeconf to {}", deviceId, e);
+        final Boolean deploySuccess = getFutureWithDeadline(
+                client.setPipelineConfig(pipeconf, deviceDataBuffer),
+                "deploying pipeconf", null);
+        if (deploySuccess == null) {
+            return false;
+        } else if (!deploySuccess) {
+            log.warn("Unable to deploy pipeconf {} to {}", pipeconf.id(), deviceId);
             return false;
         }
+
         return true;
     }
 
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
index 00fe9380..2b941cb 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
@@ -21,7 +21,6 @@
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-import io.grpc.StatusRuntimeException;
 import org.onlab.util.SharedExecutors;
 import org.onosproject.drivers.p4runtime.mirror.P4RuntimeTableMirror;
 import org.onosproject.drivers.p4runtime.mirror.TimedEntry;
@@ -32,7 +31,6 @@
 import org.onosproject.net.pi.model.PiPipelineInterpreter;
 import org.onosproject.net.pi.model.PiPipelineModel;
 import org.onosproject.net.pi.model.PiTableId;
-import org.onosproject.net.pi.model.PiTableModel;
 import org.onosproject.net.pi.runtime.PiCounterCellData;
 import org.onosproject.net.pi.runtime.PiCounterCellId;
 import org.onosproject.net.pi.runtime.PiTableEntry;
@@ -48,7 +46,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -110,7 +108,6 @@
                 }
             });
     private PiPipelineModel pipelineModel;
-    private PiPipelineInterpreter interpreter;
     private P4RuntimeTableMirror tableMirror;
     private PiFlowRuleTranslator translator;
 
@@ -125,7 +122,6 @@
             log.warn("Unable to get interpreter of {}", deviceId);
             return false;
         }
-        interpreter = device.as(PiPipelineInterpreter.class);
         pipelineModel = pipeconf.pipelineModel();
         tableMirror = handler().get(P4RuntimeTableMirror.class);
         translator = piTranslationService.flowRuleTranslator();
@@ -146,49 +142,35 @@
         final ImmutableList.Builder<FlowEntry> result = ImmutableList.builder();
         final List<PiTableEntry> inconsistentEntries = Lists.newArrayList();
 
-        for (PiTableModel tableModel : pipelineModel.tables()) {
+        // Read table entries.
+        final Collection<PiTableEntry> installedEntries;
+        // TODO: ONOS-7596 read counters with table entries
+        installedEntries = getFutureWithDeadline(client.dumpAllTables(pipeconf),
+                                                 "dumping tables", Collections.emptyList());
 
-            final PiTableId piTableId = tableModel.id();
+        if (installedEntries.isEmpty()) {
+            return Collections.emptyList();
+        }
 
-            // Read table entries.
-            final Collection<PiTableEntry> installedEntries;
-            try {
-                // TODO: optimize by dumping entries and counters in parallel
-                // From ALL tables with the same request.
-                installedEntries = client.dumpTable(piTableId, pipeconf).get();
-            } catch (InterruptedException | ExecutionException e) {
-                if (!(e.getCause() instanceof StatusRuntimeException)) {
-                    // gRPC errors are logged in the client.
-                    log.error("Exception while dumping table {} of {}",
-                              piTableId, deviceId, e);
-                }
-                continue; // next table
-            }
+        // Read table direct counters (if any).
+        final Map<PiTableEntry, PiCounterCellData> counterCellMap =
+                readEntryCounters(installedEntries);
 
-            if (installedEntries.size() == 0) {
-                continue; // next table
-            }
+        // Forge flow entries with counter values.
+        for (PiTableEntry installedEntry : installedEntries) {
 
-            // Read table direct counters (if any).
-            final Map<PiTableEntry, PiCounterCellData> counterCellMap =
-                    readEntryCounters(installedEntries);
+            final FlowEntry flowEntry = forgeFlowEntry(
+                    installedEntry, counterCellMap.get(installedEntry));
 
-            // Forge flow entries with counter values.
-            for (PiTableEntry installedEntry : installedEntries) {
-
-                final FlowEntry flowEntry = forgeFlowEntry(
-                        installedEntry, counterCellMap.get(installedEntry));
-
-                if (flowEntry == null) {
-                    // Entry is on device but unknown to translation service or
-                    // device mirror. Inconsistent. Mark for removal.
-                    // TODO: make this behaviour configurable
-                    // In some cases it's fine for the device to have rules
-                    // that were not installed by us.
-                    inconsistentEntries.add(installedEntry);
-                } else {
-                    result.add(flowEntry);
-                }
+            if (flowEntry == null) {
+                // Entry is on device but unknown to translation service or
+                // device mirror. Inconsistent. Mark for removal.
+                // TODO: make this behaviour configurable
+                // In some cases it's fine for the device to have rules
+                // that were not installed by us.
+                inconsistentEntries.add(installedEntry);
+            } else {
+                result.add(flowEntry);
             }
         }
 
@@ -354,19 +336,19 @@
      */
     private boolean writeEntry(PiTableEntry entry,
                                WriteOperationType p4Operation) {
-        try {
-            if (client.writeTableEntries(
-                    newArrayList(entry), p4Operation, pipeconf).get()) {
-                return true;
-            } else {
-                log.warn("Unable to {} table entry in {}: {}",
-                         p4Operation.name(), deviceId, entry);
-            }
-        } catch (InterruptedException | ExecutionException e) {
-            log.warn("Exception while performing {} table entry operation:",
-                     p4Operation, e);
+        final CompletableFuture<Boolean> future = client.writeTableEntries(
+                newArrayList(entry), p4Operation, pipeconf);
+        final Boolean success = getFutureWithDeadline(
+                future, "performing table " + p4Operation.name(), null);
+        if (success == null) {
+            // Error logged by getFutureWithDeadline();
+            return false;
         }
-        return false;
+        if (!success) {
+            log.warn("Unable to {} table entry in {}: {}",
+                     p4Operation.name(), deviceId, entry);
+        }
+        return success;
     }
 
     private void updateStores(PiTableEntryHandle handle,
@@ -392,36 +374,28 @@
     private Map<PiTableEntry, PiCounterCellData> readEntryCounters(
             Collection<PiTableEntry> tableEntries) {
         if (!driverBoolProperty(SUPPORT_TABLE_COUNTERS,
-                                DEFAULT_SUPPORT_TABLE_COUNTERS)) {
+                                DEFAULT_SUPPORT_TABLE_COUNTERS)
+                || tableEntries.isEmpty()) {
             return Collections.emptyMap();
         }
 
         Collection<PiCounterCellData> cellDatas;
-        try {
-            if (driverBoolProperty(READ_ALL_DIRECT_COUNTERS,
-                                   DEFAULT_READ_ALL_DIRECT_COUNTERS)) {
-                // FIXME: re-implement reading all counters ONOS-7595, or
-                // (even better) read counters when dumping table entries ONOS-7596
-                // cellDatas = client.readAllCounterCells(
-                //        singleton(counterId), pipeconf).get();
-                cellDatas = Collections.emptyList();
-            } else {
-                Set<PiCounterCellId> cellIds = tableEntries.stream()
-                        .filter(e -> tableHasCounter(e.table()))
-                        .map(PiCounterCellId::ofDirect)
-                        .collect(Collectors.toSet());
-                cellDatas = client.readCounterCells(cellIds, pipeconf).get();
-            }
-            return cellDatas.stream()
-                    .collect(Collectors.toMap(c -> c.cellId().tableEntry(), c -> c));
-        } catch (InterruptedException | ExecutionException e) {
-            if (!(e.getCause() instanceof StatusRuntimeException)) {
-                // gRPC errors are logged in the client.
-                log.error("Exception while reading table counters from {}: {}",
-                          deviceId, e);
-            }
-            return Collections.emptyMap();
+
+        if (driverBoolProperty(READ_ALL_DIRECT_COUNTERS,
+                               DEFAULT_READ_ALL_DIRECT_COUNTERS)) {
+            // FIXME: read counters when dumping table entries ONOS-7596
+            cellDatas = Collections.emptyList();
+        } else {
+            Set<PiCounterCellId> cellIds = tableEntries.stream()
+                    .filter(e -> tableHasCounter(e.table()))
+                    .map(PiCounterCellId::ofDirect)
+                    .collect(Collectors.toSet());
+            cellDatas = getFutureWithDeadline(client.readCounterCells(cellIds, pipeconf),
+                                              "reading table counters", Collections.emptyList());
         }
+        return cellDatas.stream()
+                .collect(Collectors.toMap(c -> c.cellId().tableEntry(), c -> c));
+
     }
 
     private boolean tableHasCounter(PiTableId tableId) {
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java
index dc2ec69..02c0fa3 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java
@@ -42,12 +42,12 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static java.lang.String.format;
 import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.DELETE;
 import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.INSERT;
 import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.MODIFY;
@@ -226,7 +226,7 @@
 
         if (!membersToRemove.isEmpty() &&
                 !completeFuture(client.writeActionGroupMembers(groupProfileId, membersToRemove, DELETE, pipeconf),
-                               ACT_GRP_MEMS_STR, DELETE_STR)) {
+                                ACT_GRP_MEMS_STR, DELETE_STR)) {
             // add what we removed
             completeFuture(client.writeActionGroupMembers(groupProfileId, membersToRemove, INSERT, pipeconf),
                            ACT_GRP_MEMS_STR, DELETE_STR);
@@ -271,31 +271,18 @@
 
     private boolean completeFuture(CompletableFuture<Boolean> completableFuture,
                                    String topic, String action) {
-        try {
-            if (completableFuture.get()) {
-                return true;
-            } else {
-                log.warn("Unable to {} {}", action, topic);
-                return false;
-            }
-        } catch (InterruptedException | ExecutionException e) {
-            log.warn("Exception while performing {} {}: {}", action, topic, e.getMessage());
-            log.debug("Exception", e);
-            return false;
-        }
+        return getFutureWithDeadline(
+                completableFuture, format("performing %s %s", action, topic), false);
     }
 
     private Stream<Group> streamGroupsFromDevice(PiActionProfileId actProfId) {
-        try {
-            // Read PI groups and return original PD one.
-            return client.dumpGroups(actProfId, pipeconf).get().stream()
-                    .map(this::forgeGroupEntry)
-                    .filter(Objects::nonNull);
-        } catch (ExecutionException | InterruptedException e) {
-            log.error("Exception while dumping groups from action profile '{}' on {}: {}",
-                      actProfId.id(), deviceId, e);
-            return Stream.empty();
-        }
+        // Read PI groups and return original PD one.
+        Collection<PiActionGroup> groups = getFutureWithDeadline(
+                client.dumpGroups(actProfId, pipeconf),
+                "dumping groups", Collections.emptyList());
+        return groups.stream()
+                .map(this::forgeGroupEntry)
+                .filter(Objects::nonNull);
     }
 
     private Group forgeGroupEntry(PiActionGroup piGroup) {
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
index 58cab8a..23fcf1f 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
@@ -18,7 +18,7 @@
 
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.MastershipRole;
-import org.onosproject.net.device.ChannelListener;
+import org.onosproject.net.device.DeviceAgentListener;
 import org.onosproject.net.device.DeviceHandshaker;
 import org.onosproject.p4runtime.api.P4RuntimeController;
 
@@ -29,72 +29,60 @@
  */
 public class P4RuntimeHandshaker extends AbstractP4RuntimeHandlerBehaviour implements DeviceHandshaker {
 
-    // TODO: consider abstract class with empty connect method and  implementation into a protected one for reusability.
-
     @Override
     public CompletableFuture<Boolean> connect() {
-        return CompletableFuture.supplyAsync(this::doConnect);
-    }
-
-    private boolean doConnect() {
-        return super.createClient();
+        return CompletableFuture
+                .supplyAsync(super::createClient)
+                .thenCompose(client -> {
+                    if (client == null) {
+                        return CompletableFuture.completedFuture(false);
+                    }
+                    return client.start();
+                });
     }
 
     @Override
     public CompletableFuture<Boolean> disconnect() {
-        return CompletableFuture.supplyAsync(() -> {
-            P4RuntimeController controller = handler().get(P4RuntimeController.class);
-            DeviceId deviceId = handler().data().deviceId();
-            controller.removeClient(deviceId);
-            return true;
-        });
+        final P4RuntimeController controller = handler().get(P4RuntimeController.class);
+        final DeviceId deviceId = handler().data().deviceId();
+        if (!controller.hasClient(deviceId)) {
+            return CompletableFuture.completedFuture(true);
+        } else {
+            return controller.getClient(deviceId).shutdown()
+                    .thenApplyAsync(v -> {
+                        controller.removeClient(deviceId);
+                        return true;
+                    });
+        }
     }
 
     @Override
     public CompletableFuture<Boolean> isReachable() {
-        return CompletableFuture.supplyAsync(() -> {
-            P4RuntimeController controller = handler().get(P4RuntimeController.class);
-            DeviceId deviceId = handler().data().deviceId();
-            return controller.isReacheable(deviceId);
-        });
+        return CompletableFuture.supplyAsync(() -> handler()
+                .get(P4RuntimeController.class)
+                .isReacheable(handler().data().deviceId())
+        );
     }
 
     @Override
-    public CompletableFuture<MastershipRole> roleChanged(MastershipRole newRole) {
-        deviceId = handler().data().deviceId();
-        controller = handler().get(P4RuntimeController.class);
-        CompletableFuture<MastershipRole> result = new CompletableFuture<>();
-
-        client = controller.getClient(deviceId);
-        if (client == null || !controller.isReacheable(deviceId)) {
-            result.complete(MastershipRole.NONE);
-            return result;
-        }
-        if (newRole.equals(MastershipRole.MASTER)) {
-            client.sendMasterArbitrationUpdate().thenAcceptAsync(success -> {
-                if (!success) {
-                    log.warn("Device {} arbitration failed", deviceId);
-                    result.complete(MastershipRole.STANDBY);
-                } else {
-                    result.complete(MastershipRole.MASTER);
+    public void roleChanged(MastershipRole newRole) {
+        if (setupBehaviour() && newRole.equals(MastershipRole.MASTER)) {
+            client.becomeMaster().thenAcceptAsync(result -> {
+                if (!result) {
+                    log.error("Unable to notify mastership role {} to {}",
+                              newRole, deviceId);
                 }
             });
-        } else {
-            // Since we don't need to do anything, we can complete it directly
-            // Spec: The client with the highest election id is referred to as the
-            // "master", while all other clients are referred to as "slaves".
-            result.complete(newRole);
         }
-        return result;
     }
 
     @Override
-    public void addChannelListener(ChannelListener listener) {
-        controller.addChannelListener(deviceId, listener);
+    public void addDeviceAgentListener(DeviceAgentListener listener) {
+        controller.addDeviceAgentListener(deviceId, listener);
     }
 
     @Override
-    public void removeChannelListener(ChannelListener listener) {
-        controller.removeChannelListener(deviceId, listener);
+    public void removeDeviceAgentListener(DeviceAgentListener listener) {
+        controller.removeDeviceAgentListener(deviceId, listener);
     }
 }
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMeterProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMeterProgrammable.java
index d7a204e..39ed84d 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMeterProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeMeterProgrammable.java
@@ -16,9 +16,9 @@
 
 package org.onosproject.drivers.p4runtime;
 
-import com.google.common.cache.LoadingCache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import org.onosproject.drivers.p4runtime.mirror.P4RuntimeMeterMirror;
 import org.onosproject.net.meter.Band;
 import org.onosproject.net.meter.DefaultBand;
@@ -37,11 +37,11 @@
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Set;
 import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
@@ -108,18 +108,13 @@
 
         final PiMeterHandle handle = PiMeterHandle.of(deviceId, piMeterCellConfig);
         ENTRY_LOCKS.getUnchecked(handle).lock();
-        boolean result = false;
-        try {
-            if (client.writeMeterCells(newArrayList(piMeterCellConfig), pipeconf).get()) {
-                meterMirror.put(handle, piMeterCellConfig);
-                result = true;
-            }
-
-        } catch (InterruptedException | ExecutionException e) {
-            log.warn("Exception while modify meter entry:", e);
-        } finally {
-            ENTRY_LOCKS.getUnchecked(handle).unlock();
+        final boolean result = getFutureWithDeadline(
+                client.writeMeterCells(newArrayList(piMeterCellConfig), pipeconf),
+                "writing meter cells", false);
+        if (result) {
+            meterMirror.put(handle, piMeterCellConfig);
         }
+        ENTRY_LOCKS.getUnchecked(handle).unlock();
 
         return result;
     }
@@ -162,4 +157,4 @@
 
         return CompletableFuture.completedFuture(meters);
     }
-}
\ No newline at end of file
+}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java
index 6c61148..cc79cc1 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java
@@ -26,7 +26,9 @@
 /**
  * Implementation of PacketProgrammable behaviour for P4Runtime.
  */
-public class P4RuntimePacketProgrammable extends AbstractP4RuntimeHandlerBehaviour implements PacketProgrammable {
+public class P4RuntimePacketProgrammable
+        extends AbstractP4RuntimeHandlerBehaviour
+        implements PacketProgrammable {
 
     @Override
     public void emit(OutboundPacket packet) {
@@ -38,7 +40,8 @@
         final PiPipelineInterpreter interpreter = device.is(PiPipelineInterpreter.class)
                 ? device.as(PiPipelineInterpreter.class) : null;
         if (interpreter == null) {
-            log.warn("Device {} with pipeconf {} has no interpreter, aborting emit operation", deviceId, pipeconf.id());
+            log.warn("Unable to get interpreter for {} with pipeconf {}, aborting emit operation",
+                     deviceId, pipeconf.id());
             return;
         }
 
@@ -46,11 +49,13 @@
             Collection<PiPacketOperation> operations = interpreter.mapOutboundPacket(packet);
             operations.forEach(piPacketOperation -> {
                 log.debug("Doing PiPacketOperation {}", piPacketOperation);
-                client.packetOut(piPacketOperation, pipeconf);
+                getFutureWithDeadline(
+                        client.packetOut(piPacketOperation, pipeconf),
+                        "sending packet-out", false);
             });
         } catch (PiPipelineInterpreter.PiInterpreterException e) {
-            log.error("Interpreter of pipeconf {} was unable to translate outbound packet: {}",
-                    pipeconf.id(), e.getMessage());
+            log.error("Unable to translate outbound packet for {} with pipeconf {}: {}",
+                      deviceId, pipeconf.id(), e.getMessage());
         }
     }
 }
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
index 0bf61f2..a9a6e08 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
@@ -53,99 +53,135 @@
     }
 
     /**
-     * Sets the device pipeline according to the given pipeconf, and for the given byte buffer representing the
-     * target-specific data to be used in the P4Runtime's SetPipelineConfig message. This method should be called
+     * Starts the client by starting the Stream RPC with the device.
+     *
+     * @return completable future containing true if the operation was
+     * successful, false otherwise.
+     */
+    CompletableFuture<Boolean> start();
+
+    /**
+     * Shutdowns the client by terminating any active RPC such as the Stream
+     * one.
+     *
+     * @return a completable future to signal the completion of the shutdown
+     * procedure
+     */
+    CompletableFuture<Void> shutdown();
+
+    /**
+     * Sends a master arbitration update to the device with a new election ID
+     * that is guaranteed to be the highest value between all clients.
+     *
+     * @return completable future containing true if the operation was
+     * successful; false otherwise
+     */
+    CompletableFuture<Boolean> becomeMaster();
+
+    /**
+     * Sets the device pipeline according to the given pipeconf, and for the
+     * given byte buffer representing the target-specific data to be used in the
+     * P4Runtime's SetPipelineConfig message. This method should be called
      * before any other method of this client.
      *
      * @param pipeconf   pipeconf
      * @param deviceData target-specific data
-     * @return a completable future of a boolean, true if the operations was successful, false otherwise.
+     * @return a completable future of a boolean, true if the operations was
+     * successful, false otherwise.
      */
-    CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData);
+    CompletableFuture<Boolean> setPipelineConfig(
+            PiPipeconf pipeconf, ByteBuffer deviceData);
 
     /**
-     * Initializes the stream channel, after which all messages received from the device will be notified using the
-     * {@link P4RuntimeController} event listener.
-     *
-     * @return a completable future of a boolean, true if the operations was successful, false otherwise.
-     */
-    CompletableFuture<Boolean> initStreamChannel();
-
-    /**
-     * Performs the given write operation for the given table entries and pipeconf.
+     * Performs the given write operation for the given table entries and
+     * pipeconf.
      *
      * @param entries  table entries
      * @param opType   operation type
      * @param pipeconf pipeconf currently deployed on the device
      * @return true if the operation was successful, false otherwise.
      */
-    CompletableFuture<Boolean> writeTableEntries(Collection<PiTableEntry> entries, WriteOperationType opType,
-                                                 PiPipeconf pipeconf);
+    CompletableFuture<Boolean> writeTableEntries(
+            Collection<PiTableEntry> entries, WriteOperationType opType,
+            PiPipeconf pipeconf);
 
     /**
-     * Dumps all entries currently installed in the given table, for the given pipeconf.
+     * Dumps all entries currently installed in the given table, for the given
+     * pipeconf.
      *
      * @param tableId  table identifier
      * @param pipeconf pipeconf currently deployed on the device
      * @return completable future of a collection of table entries
      */
-    CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId tableId, PiPipeconf pipeconf);
+    CompletableFuture<Collection<PiTableEntry>> dumpTable(
+            PiTableId tableId, PiPipeconf pipeconf);
+
+    /**
+     * Dumps entries from all tables, for the given pipeconf.
+     *
+     * @param pipeconf pipeconf currently deployed on the device
+     * @return completable future of a collection of table entries
+     */
+    CompletableFuture<Collection<PiTableEntry>> dumpAllTables(PiPipeconf pipeconf);
 
     /**
      * Executes a packet-out operation for the given pipeconf.
      *
      * @param packet   packet-out operation to be performed by the device
      * @param pipeconf pipeconf currently deployed on the device
-     * @return a completable future of a boolean, true if the operations was successful, false otherwise.
+     * @return a completable future of a boolean, true if the operations was
+     * successful, false otherwise.
      */
-    CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf);
+    CompletableFuture<Boolean> packetOut(
+            PiPacketOperation packet, PiPipeconf pipeconf);
 
     /**
-     * Returns the value of all counter cells for the given set of counter identifiers and pipeconf.
+     * Returns the value of all counter cells for the given set of counter
+     * identifiers and pipeconf.
      *
      * @param counterIds counter identifiers
      * @param pipeconf   pipeconf
      * @return collection of counter data
      */
-    CompletableFuture<Collection<PiCounterCellData>> readAllCounterCells(Set<PiCounterId> counterIds,
-                                                                         PiPipeconf pipeconf);
+    CompletableFuture<Collection<PiCounterCellData>> readAllCounterCells(
+            Set<PiCounterId> counterIds, PiPipeconf pipeconf);
 
     /**
-     * Returns a collection of counter data corresponding to the given set of counter cell identifiers, for the given
-     * pipeconf.
+     * Returns a collection of counter data corresponding to the given set of
+     * counter cell identifiers, for the given pipeconf.
      *
      * @param cellIds  set of counter cell identifiers
      * @param pipeconf pipeconf
      * @return collection of counter data
      */
-    CompletableFuture<Collection<PiCounterCellData>> readCounterCells(Set<PiCounterCellId> cellIds,
-                                                                      PiPipeconf pipeconf);
+    CompletableFuture<Collection<PiCounterCellData>> readCounterCells(
+            Set<PiCounterCellId> cellIds, PiPipeconf pipeconf);
 
     /**
-     * Performs the given write operation for the given action group members and pipeconf.
+     * Performs the given write operation for the given action group members and
+     * pipeconf.
      *
-     * @param profileId  action group profile ID
-     * @param members    action group members
-     * @param opType   write operation type
-     * @param pipeconf the pipeconf currently deployed on the device
+     * @param profileId action group profile ID
+     * @param members   action group members
+     * @param opType    write operation type
+     * @param pipeconf  the pipeconf currently deployed on the device
      * @return true if the operation was successful, false otherwise
      */
-    CompletableFuture<Boolean> writeActionGroupMembers(PiActionProfileId profileId,
-                                                       Collection<PiActionGroupMember> members,
-                                                       WriteOperationType opType,
-                                                       PiPipeconf pipeconf);
+    CompletableFuture<Boolean> writeActionGroupMembers(
+            PiActionProfileId profileId, Collection<PiActionGroupMember> members,
+            WriteOperationType opType, PiPipeconf pipeconf);
 
     /**
-     * Performs the given write operation for the given action group and pipeconf.
+     * Performs the given write operation for the given action group and
+     * pipeconf.
      *
      * @param group    the action group
      * @param opType   write operation type
      * @param pipeconf the pipeconf currently deployed on the device
      * @return true if the operation was successful, false otherwise
      */
-    CompletableFuture<Boolean> writeActionGroup(PiActionGroup group,
-                                                WriteOperationType opType,
-                                                PiPipeconf pipeconf);
+    CompletableFuture<Boolean> writeActionGroup(
+            PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf);
 
     /**
      * Dumps all groups currently installed for the given action profile.
@@ -154,50 +190,39 @@
      * @param pipeconf        the pipeconf currently deployed on the device
      * @return completable future of a collection of groups
      */
-    CompletableFuture<Collection<PiActionGroup>> dumpGroups(PiActionProfileId actionProfileId,
-                                                            PiPipeconf pipeconf);
+    CompletableFuture<Collection<PiActionGroup>> dumpGroups(
+            PiActionProfileId actionProfileId, PiPipeconf pipeconf);
 
     /**
-     * Returns the configuration of all meter cells for the given set of meter identifiers and pipeconf.
+     * Returns the configuration of all meter cells for the given set of meter
+     * identifiers and pipeconf.
      *
-     * @param meterIds   meter identifiers
-     * @param pipeconf   pipeconf
+     * @param meterIds meter identifiers
+     * @param pipeconf pipeconf
      * @return collection of meter configurations
      */
-    CompletableFuture<Collection<PiMeterCellConfig>> readAllMeterCells(Set<PiMeterId> meterIds,
-                                                                       PiPipeconf pipeconf);
+    CompletableFuture<Collection<PiMeterCellConfig>> readAllMeterCells(
+            Set<PiMeterId> meterIds, PiPipeconf pipeconf);
 
     /**
-     * Returns a collection of meter configurations corresponding to the given set of meter cell identifiers,
-     * for the given pipeconf.
+     * Returns a collection of meter configurations corresponding to the given
+     * set of meter cell identifiers, for the given pipeconf.
      *
-     * @param cellIds    set of meter cell identifiers
-     * @param pipeconf   pipeconf
+     * @param cellIds  set of meter cell identifiers
+     * @param pipeconf pipeconf
      * @return collection of meter configrations
      */
-    CompletableFuture<Collection<PiMeterCellConfig>> readMeterCells(Set<PiMeterCellId> cellIds,
-                                                                    PiPipeconf pipeconf);
+    CompletableFuture<Collection<PiMeterCellConfig>> readMeterCells(
+            Set<PiMeterCellId> cellIds, PiPipeconf pipeconf);
 
     /**
-     * Performs a write operation for the given meter configurations and pipeconf.
+     * Performs a write operation for the given meter configurations and
+     * pipeconf.
      *
-     * @param cellConfigs  meter cell configurations
-     * @param pipeconf     pipeconf currently deployed on the device
+     * @param cellConfigs meter cell configurations
+     * @param pipeconf    pipeconf currently deployed on the device
      * @return true if the operation was successful, false otherwise.
      */
-    CompletableFuture<Boolean> writeMeterCells(Collection<PiMeterCellConfig> cellConfigs, PiPipeconf pipeconf);
-
-    /**
-     * Shutdown the client by terminating any active RPC such as the stream channel.
-     */
-    void shutdown();
-
-    /**
-     * Sends a master arbitration update to the device.
-     *
-     * @return a completable future containing true if the operation was successful; false otherwise
-     */
-    CompletableFuture<Boolean> sendMasterArbitrationUpdate();
-
-    // TODO: work in progress.
+    CompletableFuture<Boolean> writeMeterCells(
+            Collection<PiMeterCellConfig> cellConfigs, PiPipeconf pipeconf);
 }
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
index 3cd81f2..fc76a23 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
@@ -19,13 +19,14 @@
 import com.google.common.annotations.Beta;
 import org.onosproject.event.ListenerService;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.ChannelListener;
+import org.onosproject.net.device.DeviceAgentListener;
 
 /**
  * Controller of P4Runtime devices.
  */
 @Beta
-public interface P4RuntimeController extends ListenerService<P4RuntimeEvent, P4RuntimeEventListener> {
+public interface P4RuntimeController
+        extends ListenerService<P4RuntimeEvent, P4RuntimeEventListener> {
 
     /**
      * Instantiates a new client to operate on a P4Runtime device identified by
@@ -58,20 +59,22 @@
      *
      * @param deviceId device identifier
      * @return client instance
-     * @throws IllegalStateException if no client exists for the given device identifier
+     * @throws IllegalStateException if no client exists for the given device
+     *                               identifier
      */
     P4RuntimeClient getClient(DeviceId deviceId);
 
     /**
-     * Removes the client for the given device. If no client exists for the given device identifier, the
-     * result is a no-op.
+     * Removes the client for the given device. If no client exists for the
+     * given device identifier, the result is a no-op.
      *
      * @param deviceId device identifier
      */
     void removeClient(DeviceId deviceId);
 
     /**
-     * Returns true if a client exists for the given device identifier, false otherwise.
+     * Returns true if a client exists for the given device identifier, false
+     * otherwise.
      *
      * @param deviceId device identifier
      * @return true if client exists, false otherwise.
@@ -79,37 +82,31 @@
     boolean hasClient(DeviceId deviceId);
 
     /**
-     * Returns true if the P4Runtime server running on the given device is reachable, i.e. the channel is open and the
-     * server is able to respond to RPCs, false otherwise. Reachability can be tested only if a client was previously
-     * created using {@link #createClient(DeviceId, String, int, long)}, otherwise this method returns false.
+     * Returns true if the P4Runtime server running on the given device is
+     * reachable, i.e. the channel is open and the server is able to respond to
+     * RPCs, false otherwise. Reachability can be tested only if a client was
+     * previously created using {@link #createClient(DeviceId, String, int,
+     * long)}, otherwise this method returns false.
      *
      * @param deviceId device identifier.
-     * @return true if a client was created and is able to contact the P4Runtime server, false otherwise.
+     * @return true if a client was created and is able to contact the P4Runtime
+     * server, false otherwise.
      */
     boolean isReacheable(DeviceId deviceId);
 
     /**
-     * Gets new election id for device arbitration request.
-     *
-     * @return the election id
-     */
-    long getNewMasterElectionId();
-
-    /**
-     * Adds a listener for P4Runtime client-server channel events.
-     * If the channel for the device is not present and/or established the listener will get notified
-     * only after channel setup.
+     * Adds a listener for device agent events.
      *
      * @param deviceId device identifier
-     * @param listener the channel listener
+     * @param listener the device agent listener
      */
-    void addChannelListener(DeviceId deviceId, ChannelListener listener);
+    void addDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener);
 
     /**
-     * Removes the listener for P4Runtime client-server channel events.
+     * Removes the listener for device agent events.
      *
      * @param deviceId device identifier
-     * @param listener the channel listener
+     * @param listener the device agent listener
      */
-    void removeChannelListener(DeviceId deviceId, ChannelListener listener);
+    void removeDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener);
 }
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java
index ae29097..0316ed4 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java
@@ -37,7 +37,7 @@
         /**
          * Arbitration reply.
          */
-        ARBITRATION,
+        ARBITRATION_RESPONSE,
 
         /**
          * Channel Event.
diff --git a/protocols/p4runtime/ctl/BUCK b/protocols/p4runtime/ctl/BUCK
index cc62acb..46540b3 100644
--- a/protocols/p4runtime/ctl/BUCK
+++ b/protocols/p4runtime/ctl/BUCK
@@ -3,6 +3,7 @@
 
 COMPILE_DEPS = [
     '//lib:CORE_DEPS',
+    '//lib:KRYO',
     '//protocols/grpc/api:onos-protocols-grpc-api',
     '//protocols/p4runtime/api:onos-protocols-p4runtime-api',
     '//protocols/p4runtime/proto:onos-protocols-p4runtime-proto',
@@ -10,6 +11,7 @@
     '//lib:grpc-stub-' + GRPC_VER,
     '//lib:grpc-netty-' + GRPC_VER,
     '//lib:protobuf-java-' + PROTOBUF_VER,
+    '//core/store/serializers:onos-core-serializers',
 ]
 
 TEST_DEPS = [
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ActionProfileMemberEncoder.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ActionProfileMemberEncoder.java
index ba31e69..4dbcac3 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ActionProfileMemberEncoder.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ActionProfileMemberEncoder.java
@@ -31,7 +31,7 @@
 /**
  * Encoder/Decoder of action profile member.
  */
-public final class ActionProfileMemberEncoder {
+final class ActionProfileMemberEncoder {
     private ActionProfileMemberEncoder() {
         // Hide default constructor
     }
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ArbitrationResponse.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ArbitrationResponse.java
new file mode 100644
index 0000000..ea3c24f
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ArbitrationResponse.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
+
+/**
+ * Default implementation of arbitration in P4Runtime.
+ */
+class ArbitrationResponse implements P4RuntimeEventSubject {
+
+    private DeviceId deviceId;
+    private boolean isMaster;
+
+    /**
+     * Creates arbitration with given role and master flag.
+     *
+     * @param deviceId the device
+     * @param isMaster true if arbitration response signals master status
+     */
+    ArbitrationResponse(DeviceId deviceId, boolean isMaster) {
+        this.deviceId = deviceId;
+        this.isMaster = isMaster;
+    }
+
+    /**
+     * Returns true if arbitration response signals master status, false
+     * otherwise.
+     *
+     * @return boolean flag
+     */
+    boolean isMaster() {
+        return isMaster;
+    }
+
+    @Override
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ChannelEvent.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ChannelEvent.java
new file mode 100644
index 0000000..6e33514
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/ChannelEvent.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
+
+/**
+ * Channel event in P4Runtime.
+ */
+final class ChannelEvent implements P4RuntimeEventSubject {
+
+    enum Type {
+        OPEN,
+        CLOSED,
+        ERROR
+    }
+
+    private DeviceId deviceId;
+    private Type type;
+
+    /**
+     * Creates channel event with given status and throwable.
+     *
+     * @param deviceId  the device
+     * @param type      error type
+     */
+    ChannelEvent(DeviceId deviceId, Type type) {
+        this.deviceId = deviceId;
+        this.type = type;
+    }
+
+    /**
+     * Gets the type of this event.
+     *
+     * @return the error type
+     */
+    public Type type() {
+        return type;
+    }
+
+    @Override
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultArbitration.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultArbitration.java
deleted file mode 100644
index e8b8658..0000000
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultArbitration.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.p4runtime.ctl;
-
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.MastershipRole;
-import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
-import p4.v1.P4RuntimeOuterClass.Uint128;
-
-/**
- * Default implementation of arbitration in P4Runtime.
- */
-public class DefaultArbitration implements P4RuntimeEventSubject {
-    private MastershipRole role;
-    private Uint128 electionId;
-    private DeviceId deviceId;
-
-    /**
-     * Creates arbitration with given role and election id.
-     *
-     * @param deviceId   the device
-     * @param role       the role
-     * @param electionId the election id
-     */
-     DefaultArbitration(DeviceId deviceId, MastershipRole role, Uint128 electionId) {
-        this.deviceId = deviceId;
-        this.role = role;
-        this.electionId = electionId;
-    }
-
-    /**
-     * Gets the role of this arbitration.
-     *
-     * @return the role
-     */
-    public MastershipRole role() {
-        return role;
-    }
-
-    /**
-     * Gets election id of this arbitration.
-     *
-     * @return the election id
-     */
-    public Uint128 electionId() {
-        return electionId;
-    }
-
-    @Override
-    public DeviceId deviceId() {
-        return deviceId;
-    }
-}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultChannelEvent.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultChannelEvent.java
deleted file mode 100644
index 5f5b3b9..0000000
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultChannelEvent.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.p4runtime.ctl;
-
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.ChannelEvent.Type;
-import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
-
-/**
- * Default implementation of channel event in P4Runtime. It allows passing any type of event.
- * If the event is an error a throwable can be directly passed.
- * Any other type of event cause can be passed as string.
- */
-public class DefaultChannelEvent implements P4RuntimeEventSubject {
-    private DeviceId deviceId;
-    private Type type;
-    private Throwable throwable;
-    private String message;
-
-    /**
-     * Creates channel event with given status and throwable.
-     *
-     * @param deviceId  the device
-     * @param type      error type
-     * @param throwable the cause
-     */
-    public DefaultChannelEvent(DeviceId deviceId, Type type, Throwable throwable) {
-        this.deviceId = deviceId;
-        this.type = type;
-        this.message = throwable.getMessage();
-        this.throwable = throwable;
-    }
-
-    /**
-     * Creates channel event with given status and string cause.
-     *
-     * @param deviceId the device
-     * @param type     error type
-     * @param message    the message
-     */
-    public DefaultChannelEvent(DeviceId deviceId, Type type, String message) {
-        this.deviceId = deviceId;
-        this.type = type;
-        this.message = message;
-        this.throwable = null;
-    }
-
-    /**
-     * Creates channel event with given status, cause and throwable.
-     *
-     * @param deviceId the device
-     * @param type     error type
-     * @param message the message
-     * @param throwable the cause
-     */
-    public DefaultChannelEvent(DeviceId deviceId, Type type, String message, Throwable throwable) {
-        this.deviceId = deviceId;
-        this.type = type;
-        this.message = message;
-        this.throwable = throwable;
-    }
-
-    /**
-     * Gets the type of this event.
-     *
-     * @return the error type
-     */
-    public Type type() {
-        return type;
-    }
-
-    /**
-     * Gets the message related to this event.
-     *
-     * @return the message
-     */
-    public String message() {
-        return message;
-    }
-
-
-    /**
-     * Gets throwable of this event.
-     * If no throwable is present returns null.
-     *
-     * @return the throwable
-     */
-    public Throwable throwable() {
-        return throwable;
-    }
-
-    @Override
-    public DeviceId deviceId() {
-        return deviceId;
-    }
-}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DistributedElectionIdGenerator.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DistributedElectionIdGenerator.java
new file mode 100644
index 0000000..ecfe35d
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DistributedElectionIdGenerator.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl;
+
+import org.onlab.util.KryoNamespace;
+import org.onosproject.net.DeviceId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AtomicCounterMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+
+import java.math.BigInteger;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Distributed implementation of a generator of P4Runtime election IDs.
+ */
+class DistributedElectionIdGenerator {
+
+    private final Logger log = getLogger(this.getClass());
+
+    private AtomicCounterMap<DeviceId> electionIds;
+
+    /**
+     * Creates a new election ID generator using the given storage service.
+     *
+     * @param storageService storage service
+     */
+    DistributedElectionIdGenerator(StorageService storageService) {
+        KryoNamespace serializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .build();
+        this.electionIds = storageService.<DeviceId>atomicCounterMapBuilder()
+                .withName("p4runtime-election-ids")
+                .withSerializer(Serializer.using(serializer))
+                .build();
+    }
+
+    /**
+     * Returns an election ID for the given device ID. The first election ID for
+     * a given device ID is always 1.
+     *
+     * @param deviceId device ID
+     * @return new election ID
+     */
+    BigInteger generate(DeviceId deviceId) {
+        if (electionIds == null) {
+            return null;
+        }
+        // Default value is 0 for AtomicCounterMap.
+        return BigInteger.valueOf(electionIds.incrementAndGet(deviceId));
+    }
+
+    /**
+     * Destroy the backing distributed primitive of this generator.
+     */
+    void destroy() {
+        try {
+            electionIds.destroy().get(10, TimeUnit.SECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            log.error("Exception while destroying distributed counter map", e);
+        } finally {
+            electionIds = null;
+        }
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index 43d01e5..c952e61 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -19,7 +19,6 @@
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
@@ -30,10 +29,9 @@
 import io.grpc.stub.StreamObserver;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.onlab.osgi.DefaultServiceDirectory;
+import org.onlab.util.SharedExecutors;
 import org.onlab.util.Tools;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.MastershipRole;
-import org.onosproject.net.device.ChannelEvent;
 import org.onosproject.net.pi.model.PiActionProfileId;
 import org.onosproject.net.pi.model.PiCounterId;
 import org.onosproject.net.pi.model.PiMeterId;
@@ -52,6 +50,8 @@
 import org.onosproject.p4runtime.api.P4RuntimeClient;
 import org.onosproject.p4runtime.api.P4RuntimeEvent;
 import org.slf4j.Logger;
+import p4.config.v1.P4InfoOuterClass.P4Info;
+import p4.tmp.P4Config;
 import p4.v1.P4RuntimeGrpc;
 import p4.v1.P4RuntimeOuterClass;
 import p4.v1.P4RuntimeOuterClass.ActionProfileGroup;
@@ -59,7 +59,6 @@
 import p4.v1.P4RuntimeOuterClass.Entity;
 import p4.v1.P4RuntimeOuterClass.ForwardingPipelineConfig;
 import p4.v1.P4RuntimeOuterClass.MasterArbitrationUpdate;
-import p4.v1.P4RuntimeOuterClass.PacketIn;
 import p4.v1.P4RuntimeOuterClass.ReadRequest;
 import p4.v1.P4RuntimeOuterClass.ReadResponse;
 import p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest;
@@ -69,9 +68,8 @@
 import p4.v1.P4RuntimeOuterClass.Uint128;
 import p4.v1.P4RuntimeOuterClass.Update;
 import p4.v1.P4RuntimeOuterClass.WriteRequest;
-import p4.config.v1.P4InfoOuterClass.P4Info;
-import p4.tmp.P4Config;
 
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
@@ -81,7 +79,6 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -99,13 +96,17 @@
 import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_GROUP;
 import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_MEMBER;
 import static p4.v1.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
+import static p4.v1.P4RuntimeOuterClass.PacketIn;
 import static p4.v1.P4RuntimeOuterClass.PacketOut;
 import static p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
 
 /**
  * Implementation of a P4Runtime client.
  */
-public final class P4RuntimeClientImpl implements P4RuntimeClient {
+final class P4RuntimeClientImpl implements P4RuntimeClient {
+
+    // Timeout in seconds to obtain the client lock.
+    private static final int LOCK_TIMEOUT = 10;
 
     private static final Map<WriteOperationType, Update.Type> UPDATE_TYPES = ImmutableMap.of(
             WriteOperationType.UNSPECIFIED, Update.Type.UNSPECIFIED,
@@ -116,18 +117,20 @@
 
     private final Logger log = getLogger(getClass());
 
+    private final Lock requestLock = new ReentrantLock();
+    private final Context.CancellableContext cancellableContext =
+            Context.current().withCancellation();
+
     private final DeviceId deviceId;
     private final long p4DeviceId;
     private final P4RuntimeControllerImpl controller;
     private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
-    private final Context.CancellableContext cancellableContext;
     private final ExecutorService executorService;
     private final Executor contextExecutor;
-    private final Lock writeLock = new ReentrantLock();
     private final StreamObserver<StreamMessageRequest> streamRequestObserver;
 
-    private Map<Uint128, CompletableFuture<Boolean>> arbitrationUpdateMap = Maps.newConcurrentMap();
-    protected Uint128 p4RuntimeElectionId;
+    // Used by this client for write requests.
+    private Uint128 clientElectionId = Uint128.newBuilder().setLow(1).build();
 
     /**
      * Default constructor.
@@ -142,45 +145,77 @@
         this.deviceId = deviceId;
         this.p4DeviceId = p4DeviceId;
         this.controller = controller;
-        this.cancellableContext = Context.current().withCancellation();
         this.executorService = Executors.newFixedThreadPool(15, groupedThreads(
-                "onos/p4runtime-client-" + deviceId.toString(),
-                deviceId.toString() + "-%d"));
+                "onos-p4runtime-client-" + deviceId.toString(), "%d"));
         this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
-        //TODO Investigate deadline or timeout in supplyInContext Method
+        //TODO Investigate use of stub deadlines instead of timeout in supplyInContext
         this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
-        P4RuntimeGrpc.P4RuntimeStub asyncStub = P4RuntimeGrpc.newStub(channel);
-        this.streamRequestObserver = asyncStub.streamChannel(new StreamChannelResponseObserver());
+        this.streamRequestObserver = P4RuntimeGrpc.newStub(channel)
+                .streamChannel(new StreamChannelResponseObserver());
     }
 
     /**
-     * Executes the given task (supplier) in the gRPC context executor of this client, such that if the context is
-     * cancelled (e.g. client shutdown) the RPC is automatically cancelled.
-     * <p>
-     * Important: Tasks submitted in parallel by different threads are forced executed sequentially.
-     * <p>
+     * Submits a task for async execution via the given executor.
+     * All tasks submitted with this method will be executed sequentially.
      */
-    private <U> CompletableFuture<U> supplyInContext(Supplier<U> supplier, String opDescription) {
+    private <U> CompletableFuture<U> supplyWithExecutor(
+            Supplier<U> supplier, String opDescription, Executor executor) {
         return CompletableFuture.supplyAsync(() -> {
             // TODO: explore a more relaxed locking strategy.
-            writeLock.lock();
+            try {
+                if (!requestLock.tryLock(LOCK_TIMEOUT, TimeUnit.SECONDS)) {
+                    log.error("LOCK TIMEOUT! This is likely a deadlock, "
+                                      + "please debug (executing {})",
+                              opDescription);
+                    throw new IllegalThreadStateException("Lock timeout");
+                }
+            } catch (InterruptedException e) {
+                log.warn("Thread interrupted while waiting for lock (executing {})",
+                         opDescription);
+                throw new RuntimeException(e);
+            }
             try {
                 return supplier.get();
             } catch (StatusRuntimeException ex) {
-                log.warn("Unable to execute {} on {}: {}", opDescription, deviceId, ex.toString());
+                log.warn("Unable to execute {} on {}: {}",
+                         opDescription, deviceId, ex.toString());
                 throw ex;
             } catch (Throwable ex) {
-                log.error("Exception in client of {}, executing {}", deviceId, opDescription, ex);
+                log.error("Exception in client of {}, executing {}",
+                          deviceId, opDescription, ex);
                 throw ex;
             } finally {
-                writeLock.unlock();
+                requestLock.unlock();
             }
-        }, contextExecutor);
+        }, executor);
+    }
+
+    /**
+     * Equivalent of supplyWithExecutor using the gRPC context executor of this
+     * client, such that if the context is cancelled (e.g. client shutdown) the
+     * RPC is automatically cancelled.
+     */
+    private <U> CompletableFuture<U> supplyInContext(
+            Supplier<U> supplier, String opDescription) {
+        return supplyWithExecutor(supplier, opDescription, contextExecutor);
     }
 
     @Override
-    public CompletableFuture<Boolean> initStreamChannel() {
-        return supplyInContext(this::doInitStreamChannel, "initStreamChannel");
+    public CompletableFuture<Boolean> start() {
+        return supplyInContext(this::doInitStreamChannel,
+                               "start-initStreamChannel");
+    }
+
+    @Override
+    public CompletableFuture<Void> shutdown() {
+        return supplyWithExecutor(this::doShutdown, "shutdown",
+                                  SharedExecutors.getPoolThreadExecutor());
+    }
+
+    @Override
+    public CompletableFuture<Boolean> becomeMaster() {
+        return supplyInContext(this::doBecomeMaster,
+                               "becomeMaster");
     }
 
     @Override
@@ -201,6 +236,11 @@
     }
 
     @Override
+    public CompletableFuture<Collection<PiTableEntry>> dumpAllTables(PiPipeconf pipeconf) {
+        return supplyInContext(() -> doDumpTable(null, pipeconf), "dumpAllTables");
+    }
+
+    @Override
     public CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
         return supplyInContext(() -> doPacketOut(packet, pipeconf), "packetOut");
     }
@@ -245,11 +285,6 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> sendMasterArbitrationUpdate() {
-        return supplyInContext(this::doArbitrationUpdate, "arbitrationUpdate");
-    }
-
-    @Override
     public CompletableFuture<Boolean> writeMeterCells(Collection<PiMeterCellConfig> cellIds, PiPipeconf pipeconf) {
 
         return supplyInContext(() -> doWriteMeterCells(cellIds, pipeconf),
@@ -272,42 +307,48 @@
 
     /* Blocking method implementations below */
 
-    private boolean doArbitrationUpdate() {
+    private boolean doBecomeMaster() {
+        final Uint128 newId = bigIntegerToUint128(
+                controller.newMasterElectionId(deviceId));
+        if (sendMasterArbitrationUpdate(newId)) {
+            clientElectionId = newId;
+            return true;
+        }
+        return false;
+    }
 
-        CompletableFuture<Boolean> result = new CompletableFuture<>();
-        // TODO: currently we use 64-bit Long type for election id, should
-        // we use 128-bit ?
-        long nextElectId = controller.getNewMasterElectionId();
-        Uint128 newElectionId = Uint128.newBuilder()
-                .setLow(nextElectId)
-                .build();
-        MasterArbitrationUpdate arbitrationUpdate = MasterArbitrationUpdate.newBuilder()
-                .setDeviceId(p4DeviceId)
-                .setElectionId(newElectionId)
-                .build();
-        StreamMessageRequest requestMsg = StreamMessageRequest.newBuilder()
-                .setArbitration(arbitrationUpdate)
-                .build();
-        log.debug("Sending arbitration update to {} with election id {}...",
-                  deviceId, newElectionId);
-        arbitrationUpdateMap.put(newElectionId, result);
+    private boolean sendMasterArbitrationUpdate(Uint128 electionId) {
+        log.info("Sending arbitration update to {}... electionId={}",
+                  deviceId, uint128ToBigInteger(electionId));
         try {
-            streamRequestObserver.onNext(requestMsg);
-            return result.get();
+            streamRequestObserver.onNext(
+                    StreamMessageRequest.newBuilder()
+                            .setArbitration(
+                                    MasterArbitrationUpdate
+                                            .newBuilder()
+                                            .setDeviceId(p4DeviceId)
+                                            .setElectionId(electionId)
+                                            .build())
+                            .build());
+            return true;
         } catch (StatusRuntimeException e) {
             log.error("Unable to perform arbitration update on {}: {}", deviceId, e.getMessage());
-            arbitrationUpdateMap.remove(newElectionId);
-            return false;
-        } catch (InterruptedException | ExecutionException e) {
-            log.warn("Arbitration update failed for {} due to {}", deviceId, e);
-            arbitrationUpdateMap.remove(newElectionId);
-            return false;
         }
+        return false;
     }
+
     private boolean doInitStreamChannel() {
         // To listen for packets and other events, we need to start the RPC.
-        // Here we do it by sending a master arbitration update.
-        return doArbitrationUpdate();
+        // Here we send an empty StreamMessageRequest.
+        try {
+            log.info("Starting stream channel with {}...", deviceId);
+            streamRequestObserver.onNext(StreamMessageRequest.newBuilder().build());
+            return true;
+        } catch (StatusRuntimeException e) {
+            log.error("Unable to start stream channel with {}: {}",
+                      deviceId, e.getMessage());
+            return false;
+        }
     }
 
     private boolean doSetPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
@@ -338,7 +379,7 @@
         SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
                 .newBuilder()
                 .setDeviceId(p4DeviceId)
-                .setElectionId(p4RuntimeElectionId)
+                .setElectionId(clientElectionId)
                 .setAction(VERIFY_AND_COMMIT)
                 .setConfig(pipelineConfig)
                 .build();
@@ -380,7 +421,7 @@
 
         writeRequestBuilder
                 .setDeviceId(p4DeviceId)
-                .setElectionId(p4RuntimeElectionId)
+                .setElectionId(clientElectionId)
                 .addAllUpdates(updateMsgs)
                 .build();
 
@@ -388,7 +429,7 @@
             blockingStub.write(writeRequestBuilder.build());
             return true;
         } catch (StatusRuntimeException e) {
-            logWriteErrors(piTableEntries, e, opType, "table entry");
+            checkAndLogWriteErrors(piTableEntries, e, opType, "table entry");
             return false;
         }
     }
@@ -397,13 +438,18 @@
 
         log.debug("Dumping table {} from {} (pipeconf {})...", piTableId, deviceId, pipeconf.id());
 
-        P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
         int tableId;
-        try {
-            tableId = browser.tables().getByName(piTableId.id()).getPreamble().getId();
-        } catch (P4InfoBrowser.NotFoundException e) {
-            log.warn("Unable to dump table: {}", e.getMessage());
-            return Collections.emptyList();
+        if (piTableId == null) {
+            // Dump all tables.
+            tableId = 0;
+        } else {
+            P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
+            try {
+                tableId = browser.tables().getByName(piTableId.id()).getPreamble().getId();
+            } catch (P4InfoBrowser.NotFoundException e) {
+                log.warn("Unable to dump table: {}", e.getMessage());
+                return Collections.emptyList();
+            }
         }
 
         ReadRequest requestMsg = ReadRequest.newBuilder()
@@ -474,40 +520,29 @@
         }
         // Decode packet message and post event.
         PiPacketOperation packetOperation = PacketIOCodec.decodePacketIn(packetInMsg, pipeconf, deviceId);
-        DefaultPacketIn packetInEventSubject = new DefaultPacketIn(deviceId, packetOperation);
+        PacketInEvent packetInEventSubject = new PacketInEvent(deviceId, packetOperation);
         P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.PACKET_IN, packetInEventSubject);
         log.debug("Received packet in: {}", event);
         controller.postEvent(event);
     }
 
-    private void doArbitrationUpdateFromDevice(MasterArbitrationUpdate arbitrationMsg) {
-        log.debug("Received arbitration update from {}: {}", deviceId, arbitrationMsg);
-
-        Uint128 electionId = arbitrationMsg.getElectionId();
-        CompletableFuture<Boolean> mastershipFeature = arbitrationUpdateMap.remove(electionId);
-
-        if (mastershipFeature == null) {
-            log.warn("Can't find completable future of election id {}", electionId);
+    private void doArbitrationResponse(MasterArbitrationUpdate msg) {
+        // From the spec...
+        // - Election_id: The stream RPC with the highest election_id is the
+        // master. Switch populates with the highest election ID it
+        // has received from all connected controllers.
+        // - Status: Switch populates this with OK for the client that is the
+        // master, and with an error status for all other connected clients (at
+        // every mastership change).
+        if (!msg.hasElectionId() || !msg.hasStatus()) {
             return;
         }
-
-        this.p4RuntimeElectionId = electionId;
-        int statusCode = arbitrationMsg.getStatus().getCode();
-        MastershipRole arbitrationRole;
-        // arbitration update success
-
-        if (statusCode == Status.OK.getCode().value()) {
-            mastershipFeature.complete(true);
-            arbitrationRole = MastershipRole.MASTER;
-        } else {
-            mastershipFeature.complete(false);
-            arbitrationRole = MastershipRole.STANDBY;
-        }
-
-        DefaultArbitration arbitrationEventSubject = new DefaultArbitration(deviceId, arbitrationRole, electionId);
-        P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.ARBITRATION,
-                                                  arbitrationEventSubject);
-        controller.postEvent(event);
+        final boolean isMaster = msg.getStatus().getCode() == Status.OK.getCode().value();
+        log.info("Received arbitration update from {}: isMaster={}, electionId={}",
+                 deviceId, isMaster, uint128ToBigInteger(msg.getElectionId()));
+        controller.postEvent(new P4RuntimeEvent(
+                P4RuntimeEvent.Type.ARBITRATION_RESPONSE,
+                new ArbitrationResponse(deviceId, isMaster)));
     }
 
     private Collection<PiCounterCellData> doReadAllCounterCells(
@@ -583,14 +618,14 @@
 
         WriteRequest writeRequestMsg = WriteRequest.newBuilder()
                 .setDeviceId(p4DeviceId)
-                .setElectionId(p4RuntimeElectionId)
+                .setElectionId(clientElectionId)
                 .addAllUpdates(updateMsgs)
                 .build();
         try {
             blockingStub.write(writeRequestMsg);
             return true;
         } catch (StatusRuntimeException e) {
-            logWriteErrors(members, e, opType, "group member");
+            checkAndLogWriteErrors(members, e, opType, "group member");
             return false;
         }
     }
@@ -729,7 +764,7 @@
 
         final WriteRequest writeRequestMsg = WriteRequest.newBuilder()
                 .setDeviceId(p4DeviceId)
-                .setElectionId(p4RuntimeElectionId)
+                .setElectionId(clientElectionId)
                 .addUpdates(Update.newBuilder()
                                     .setEntity(Entity.newBuilder()
                                                        .setActionProfileGroup(actionProfileGroup)
@@ -741,7 +776,7 @@
             blockingStub.write(writeRequestMsg);
             return true;
         } catch (StatusRuntimeException e) {
-            logWriteErrors(Collections.singleton(group), e, opType, "group");
+            checkAndLogWriteErrors(Collections.singleton(group), e, opType, "group");
             return false;
         }
     }
@@ -814,7 +849,7 @@
 
         writeRequestBuilder
                 .setDeviceId(p4DeviceId)
-                .setElectionId(p4RuntimeElectionId)
+                .setElectionId(clientElectionId)
                 .addAllUpdates(updateMsgs)
                 .build();
         try {
@@ -827,53 +862,34 @@
         }
     }
 
-    /**
-     * Returns the internal P4 device ID associated with this client.
-     *
-     * @return P4 device ID
-     */
-    public long p4DeviceId() {
-        return p4DeviceId;
-    }
-
-    /**
-     * For testing purpose only. TODO: remove before release.
-     *
-     * @return blocking stub
-     */
-    public P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub() {
-        return this.blockingStub;
-    }
-
-
-    @Override
-    public void shutdown() {
-
+    private Void doShutdown() {
         log.info("Shutting down client for {}...", deviceId);
-
-        writeLock.lock();
-        try {
-            if (streamRequestObserver != null) {
-                streamRequestObserver.onCompleted();
-                cancellableContext.cancel(new InterruptedException("Requested client shutdown"));
-            }
-
-            this.executorService.shutdown();
+        if (streamRequestObserver != null) {
             try {
-                executorService.awaitTermination(5, TimeUnit.SECONDS);
-            } catch (InterruptedException e) {
-                log.warn("Executor service didn't shutdown in time.");
-                Thread.currentThread().interrupt();
+                streamRequestObserver.onCompleted();
+            } catch (IllegalStateException e) {
+                // Thrown if stream channel is already completed. Can ignore.
+                log.debug("Ignored expection: {}", e);
             }
-        } finally {
-            writeLock.unlock();
+            cancellableContext.cancel(new InterruptedException(
+                    "Requested client shutdown"));
         }
+        this.executorService.shutdown();
+        try {
+            executorService.awaitTermination(5, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            log.warn("Executor service didn't shutdown in time.");
+            Thread.currentThread().interrupt();
+        }
+        return null;
     }
 
-    private <E extends PiEntity> void logWriteErrors(Collection<E> writeEntities,
-                                                     StatusRuntimeException ex,
-                                                     WriteOperationType opType,
-                                                     String entryType) {
+    private <E extends PiEntity> void checkAndLogWriteErrors(
+            Collection<E> writeEntities, StatusRuntimeException ex,
+            WriteOperationType opType, String entryType) {
+
+        checkGrpcException(ex);
+
         List<P4RuntimeOuterClass.Error> errors = null;
         String description = null;
         try {
@@ -946,10 +962,80 @@
                       err.hasDetails() ? "\n" + err.getDetails().toString() : "");
     }
 
+    private void checkGrpcException(StatusRuntimeException ex) {
+        switch (ex.getStatus().getCode()) {
+            case OK:
+                break;
+            case CANCELLED:
+                break;
+            case UNKNOWN:
+                break;
+            case INVALID_ARGUMENT:
+                break;
+            case DEADLINE_EXCEEDED:
+                break;
+            case NOT_FOUND:
+                break;
+            case ALREADY_EXISTS:
+                break;
+            case PERMISSION_DENIED:
+                // Notify upper layers that this node is not master.
+                controller.postEvent(new P4RuntimeEvent(
+                        P4RuntimeEvent.Type.ARBITRATION_RESPONSE,
+                        new ArbitrationResponse(deviceId, false)));
+                break;
+            case RESOURCE_EXHAUSTED:
+                break;
+            case FAILED_PRECONDITION:
+                break;
+            case ABORTED:
+                break;
+            case OUT_OF_RANGE:
+                break;
+            case UNIMPLEMENTED:
+                break;
+            case INTERNAL:
+                break;
+            case UNAVAILABLE:
+                // Channel might be closed.
+                controller.postEvent(new P4RuntimeEvent(
+                        P4RuntimeEvent.Type.CHANNEL_EVENT,
+                        new ChannelEvent(deviceId, ChannelEvent.Type.ERROR)));
+                break;
+            case DATA_LOSS:
+                break;
+            case UNAUTHENTICATED:
+                break;
+            default:
+                break;
+        }
+    }
+
+    private Uint128 bigIntegerToUint128(BigInteger value) {
+        final byte[] arr = value.toByteArray();
+        final ByteBuffer bb = ByteBuffer.allocate(Long.BYTES * 2)
+                .put(new byte[Long.BYTES * 2 - arr.length])
+                .put(arr);
+        bb.rewind();
+        return Uint128.newBuilder()
+                .setHigh(bb.getLong())
+                .setLow(bb.getLong())
+                .build();
+    }
+
+    private BigInteger uint128ToBigInteger(Uint128 value) {
+        return new BigInteger(
+                ByteBuffer.allocate(Long.BYTES * 2)
+                        .putLong(value.getHigh())
+                        .putLong(value.getLow())
+                        .array());
+    }
+
     /**
      * Handles messages received from the device on the stream channel.
      */
-    private class StreamChannelResponseObserver implements StreamObserver<StreamMessageResponse> {
+    private class StreamChannelResponseObserver
+            implements StreamObserver<StreamMessageResponse> {
 
         @Override
         public void onNext(StreamMessageResponse message) {
@@ -958,40 +1044,40 @@
 
         private void doNext(StreamMessageResponse message) {
             try {
-                log.debug("Received message on stream channel from {}: {}", deviceId, message.getUpdateCase());
+                log.debug("Received message on stream channel from {}: {}",
+                          deviceId, message.getUpdateCase());
                 switch (message.getUpdateCase()) {
                     case PACKET:
-                        // Packet-in
                         doPacketIn(message.getPacket());
                         return;
                     case ARBITRATION:
-                        doArbitrationUpdateFromDevice(message.getArbitration());
+                        doArbitrationResponse(message.getArbitration());
                         return;
                     default:
-                        log.warn("Unrecognized stream message from {}: {}", deviceId, message.getUpdateCase());
+                        log.warn("Unrecognized stream message from {}: {}",
+                                 deviceId, message.getUpdateCase());
                 }
             } catch (Throwable ex) {
-                log.error("Exception while processing stream channel message from {}", deviceId, ex);
+                log.error("Exception while processing stream message from {}",
+                          deviceId, ex);
             }
         }
 
         @Override
         public void onError(Throwable throwable) {
-            log.warn("Error on stream channel for {}: {}", deviceId, Status.fromThrowable(throwable));
-            controller.postEvent(new P4RuntimeEvent(P4RuntimeEvent.Type.CHANNEL_EVENT,
-                    new DefaultChannelEvent(deviceId, ChannelEvent.Type.CHANNEL_ERROR,
-                            throwable)));
-            // FIXME: we might want to recreate the channel.
-            // In general, we want to be robust against any transient error and, if the channel is open, make sure the
-            // stream channel is always on.
+            log.warn("Error on stream channel for {}: {}",
+                     deviceId, Status.fromThrowable(throwable));
+            controller.postEvent(new P4RuntimeEvent(
+                    P4RuntimeEvent.Type.CHANNEL_EVENT,
+                    new ChannelEvent(deviceId, ChannelEvent.Type.ERROR)));
         }
 
         @Override
         public void onCompleted() {
             log.warn("Stream channel for {} has completed", deviceId);
-            controller.postEvent(new P4RuntimeEvent(P4RuntimeEvent.Type.CHANNEL_EVENT,
-                    new DefaultChannelEvent(deviceId, ChannelEvent.Type.CHANNEL_DISCONNECTED,
-                            "Stream channel has completed")));
+            controller.postEvent(new P4RuntimeEvent(
+                    P4RuntimeEvent.Type.CHANNEL_EVENT,
+                    new ChannelEvent(deviceId, ChannelEvent.Type.CLOSED)));
         }
     }
 }
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
index 987356b..d2773b2 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
@@ -35,20 +35,21 @@
 import org.onosproject.grpc.api.GrpcChannelId;
 import org.onosproject.grpc.api.GrpcController;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.ChannelEvent;
-import org.onosproject.net.device.ChannelListener;
+import org.onosproject.net.device.DeviceAgentEvent;
+import org.onosproject.net.device.DeviceAgentListener;
 import org.onosproject.p4runtime.api.P4RuntimeClient;
 import org.onosproject.p4runtime.api.P4RuntimeController;
 import org.onosproject.p4runtime.api.P4RuntimeEvent;
 import org.onosproject.p4runtime.api.P4RuntimeEventListener;
-import org.onosproject.store.service.AtomicCounter;
 import org.onosproject.store.service.StorageService;
 import org.slf4j.Logger;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.math.BigInteger;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -65,14 +66,13 @@
         extends AbstractListenerManager<P4RuntimeEvent, P4RuntimeEventListener>
         implements P4RuntimeController {
 
-    private static final String P4R_ELECTION = "p4runtime-election";
     private static final int DEVICE_LOCK_EXPIRE_TIME_IN_MIN = 10;
     private final Logger log = getLogger(getClass());
     private final NameResolverProvider nameResolverProvider = new DnsNameResolverProvider();
     private final Map<DeviceId, ClientKey> deviceIdToClientKey = Maps.newHashMap();
     private final Map<ClientKey, P4RuntimeClient> clientKeyToClient = Maps.newHashMap();
     private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
-    private final Map<DeviceId, List<ChannelListener>> channelListeners = Maps.newConcurrentMap();
+    private final ConcurrentMap<DeviceId, List<DeviceAgentListener>> deviceAgentListeners = Maps.newConcurrentMap();
     private final LoadingCache<DeviceId, ReadWriteLock> deviceLocks = CacheBuilder.newBuilder()
             .expireAfterAccess(DEVICE_LOCK_EXPIRE_TIME_IN_MIN, TimeUnit.MINUTES)
             .build(new CacheLoader<DeviceId, ReadWriteLock>() {
@@ -81,8 +81,7 @@
                     return new ReentrantReadWriteLock();
                 }
             });
-
-    private AtomicCounter electionIdGenerator;
+    private DistributedElectionIdGenerator electionIdGenerator;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private GrpcController grpcController;
@@ -93,8 +92,7 @@
     @Activate
     public void activate() {
         eventDispatcher.addSink(P4RuntimeEvent.class, listenerRegistry);
-        electionIdGenerator = storageService.getAtomicCounter(P4R_ELECTION);
-
+        electionIdGenerator = new DistributedElectionIdGenerator(storageService);
         log.info("Started");
     }
 
@@ -102,6 +100,8 @@
     @Deactivate
     public void deactivate() {
         grpcController = null;
+        electionIdGenerator.destroy();
+        electionIdGenerator = null;
         eventDispatcher.removeSink(P4RuntimeEvent.class);
         log.info("Stopped");
     }
@@ -119,13 +119,13 @@
                 .usePlaintext(true);
 
         deviceLocks.getUnchecked(deviceId).writeLock().lock();
-        log.info("Creating client for {} (server={}:{}, p4DeviceId={})...",
-                 deviceId, serverAddr, serverPort, p4DeviceId);
 
         try {
             if (deviceIdToClientKey.containsKey(deviceId)) {
                 final ClientKey existingKey = deviceIdToClientKey.get(deviceId);
                 if (newKey.equals(existingKey)) {
+                    log.info("Not creating client for {} as it already exists (server={}:{}, p4DeviceId={})...",
+                             deviceId, serverAddr, serverPort, p4DeviceId);
                     return true;
                 } else {
                     throw new IllegalStateException(
@@ -133,6 +133,8 @@
                                     "server endpoints already exists");
                 }
             } else {
+                log.info("Creating client for {} (server={}:{}, p4DeviceId={})...",
+                         deviceId, serverAddr, serverPort, p4DeviceId);
                 return doCreateClient(newKey, channelBuilder);
             }
         } finally {
@@ -187,12 +189,11 @@
     public void removeClient(DeviceId deviceId) {
 
         deviceLocks.getUnchecked(deviceId).writeLock().lock();
-
         try {
             if (deviceIdToClientKey.containsKey(deviceId)) {
                 final ClientKey clientKey = deviceIdToClientKey.get(deviceId);
-                grpcController.disconnectChannel(channelIds.get(deviceId));
                 clientKeyToClient.remove(clientKey).shutdown();
+                grpcController.disconnectChannel(channelIds.get(deviceId));
                 deviceIdToClientKey.remove(deviceId);
                 channelIds.remove(deviceId);
             }
@@ -222,7 +223,7 @@
                 log.debug("No client for {}, can't check for reachability", deviceId);
                 return false;
             }
-
+            // FIXME: we're not checking for a P4Runtime server, it could be any gRPC service
             return grpcController.isChannelOpen(channelIds.get(deviceId));
         } finally {
             deviceLocks.getUnchecked(deviceId).readLock().unlock();
@@ -230,67 +231,73 @@
     }
 
     @Override
-    public long getNewMasterElectionId() {
-        return electionIdGenerator.incrementAndGet();
+    public void addDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener) {
+        deviceAgentListeners.putIfAbsent(deviceId, new CopyOnWriteArrayList<>());
+        deviceAgentListeners.get(deviceId).add(listener);
     }
 
     @Override
-    public void addChannelListener(DeviceId deviceId, ChannelListener listener) {
-        channelListeners.compute(deviceId, (devId, listeners) -> {
-            List<ChannelListener> newListeners;
-            if (listeners != null) {
-                newListeners = listeners;
-            } else {
-                newListeners = new ArrayList<>();
-            }
-            newListeners.add(listener);
-            return newListeners;
+    public void removeDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener) {
+        deviceAgentListeners.computeIfPresent(deviceId, (did, listeners) -> {
+            listeners.remove(listener);
+            return listeners;
         });
     }
 
-    @Override
-    public void removeChannelListener(DeviceId deviceId, ChannelListener listener) {
-        channelListeners.compute(deviceId, (devId, listeners) -> {
-            if (listeners != null) {
-                listeners.remove(listener);
-                return listeners;
-            } else {
-                log.debug("Device {} has no listener registered", deviceId);
-                return null;
-            }
-        });
+    BigInteger newMasterElectionId(DeviceId deviceId) {
+        return electionIdGenerator.generate(deviceId);
     }
 
     void postEvent(P4RuntimeEvent event) {
-        if (event.type().equals(P4RuntimeEvent.Type.CHANNEL_EVENT)) {
-            DefaultChannelEvent channelError = (DefaultChannelEvent) event.subject();
-            DeviceId deviceId = event.subject().deviceId();
-            ChannelEvent channelEvent = null;
-            //If disconnection is already known we propagate it.
-            if (channelError.type().equals(ChannelEvent.Type.CHANNEL_DISCONNECTED)) {
-                channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_DISCONNECTED, channelError.deviceId(),
-                        channelError.throwable());
-            } else if (channelError.type().equals(ChannelEvent.Type.CHANNEL_ERROR)) {
-                //If we don't know what the error is we check for reachability
-                if (!isReacheable(deviceId)) {
-                    //if false the channel has disconnected
-                    channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_DISCONNECTED, channelError.deviceId(),
-                            channelError.throwable());
-                } else {
-                    // else we propagate the event.
-                    channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_ERROR, channelError.deviceId(),
-                            channelError.throwable());
-                }
-            }
-            //Ignoring CHANNEL_CONNECTED
-            if (channelEvent != null && channelListeners.get(deviceId) != null) {
-                for (ChannelListener listener : channelListeners.get(deviceId)) {
-                    listener.event(channelEvent);
-                }
-            }
-        } else {
-            post(event);
+        switch (event.type()) {
+            case CHANNEL_EVENT:
+                handleChannelEvent(event);
+                break;
+            case ARBITRATION_RESPONSE:
+                handleArbitrationReply(event);
+                break;
+            default:
+                post(event);
+                break;
         }
     }
 
+    private void handleChannelEvent(P4RuntimeEvent event) {
+        final ChannelEvent channelEvent = (ChannelEvent) event.subject();
+        final DeviceId deviceId = channelEvent.deviceId();
+        final DeviceAgentEvent.Type agentEventType;
+        switch (channelEvent.type()) {
+            case OPEN:
+                agentEventType = DeviceAgentEvent.Type.CHANNEL_OPEN;
+                break;
+            case CLOSED:
+                agentEventType = DeviceAgentEvent.Type.CHANNEL_CLOSED;
+                break;
+            case ERROR:
+                agentEventType = !isReacheable(deviceId)
+                        ? DeviceAgentEvent.Type.CHANNEL_CLOSED
+                        : DeviceAgentEvent.Type.CHANNEL_ERROR;
+                break;
+            default:
+                log.warn("Unrecognized channel event type {}", channelEvent.type());
+                return;
+        }
+        postDeviceAgentEvent(deviceId, new DeviceAgentEvent(agentEventType, deviceId));
+    }
+
+    private void handleArbitrationReply(P4RuntimeEvent event) {
+        final DeviceId deviceId = event.subject().deviceId();
+        final ArbitrationResponse response = (ArbitrationResponse) event.subject();
+        final DeviceAgentEvent.Type roleType = response.isMaster()
+                ? DeviceAgentEvent.Type.ROLE_MASTER
+                : DeviceAgentEvent.Type.ROLE_STANDBY;
+        postDeviceAgentEvent(deviceId, new DeviceAgentEvent(
+                roleType, response.deviceId()));
+    }
+
+    private void postDeviceAgentEvent(DeviceId deviceId, DeviceAgentEvent event) {
+        if (deviceAgentListeners.containsKey(deviceId)) {
+            deviceAgentListeners.get(deviceId).forEach(l -> l.event(event));
+        }
+    }
 }
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketIn.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/PacketInEvent.java
similarity index 89%
rename from protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketIn.java
rename to protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/PacketInEvent.java
index a1cce46..dddee2b 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketIn.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/PacketInEvent.java
@@ -25,14 +25,14 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
- * Default implementation of a packet-in in P4Runtime.
+ * P4Runtime packet-in.
  */
-final class DefaultPacketIn implements P4RuntimePacketIn {
+final class PacketInEvent implements P4RuntimePacketIn {
 
     private final DeviceId deviceId;
     private final PiPacketOperation operation;
 
-    DefaultPacketIn(DeviceId deviceId, PiPacketOperation operation) {
+    PacketInEvent(DeviceId deviceId, PiPacketOperation operation) {
         this.deviceId = checkNotNull(deviceId);
         this.operation = checkNotNull(operation);
     }
@@ -55,7 +55,7 @@
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
-        DefaultPacketIn that = (DefaultPacketIn) o;
+        PacketInEvent that = (PacketInEvent) o;
         return Objects.equal(deviceId, that.deviceId) &&
                 Objects.equal(operation, that.operation);
     }
diff --git a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
index 1e83577..893049f 100644
--- a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
+++ b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
@@ -144,7 +144,7 @@
     }
 
     @AfterClass
-    public static void globalTeerDown() {
+    public static void globalTearDown() {
         grpcServer.shutdown();
         grpcChannel.shutdown();
     }
@@ -156,7 +156,7 @@
         client = new P4RuntimeClientImpl(DEVICE_ID, P4_DEVICE_ID,
                                          grpcChannel,
                                          controller);
-        client.p4RuntimeElectionId = DEFAULT_ELECTION_ID;
+        client.becomeMaster();
     }
 
     @Test
diff --git a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/DefaultPacketInTest.java b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/PacketInEventTest.java
similarity index 88%
rename from protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/DefaultPacketInTest.java
rename to protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/PacketInEventTest.java
index 5993142..0a1f82e 100644
--- a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/DefaultPacketInTest.java
+++ b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/PacketInEventTest.java
@@ -32,7 +32,7 @@
 /**
  * Test for DefaultPacketIn class.
  */
-public class DefaultPacketInTest {
+public class PacketInEventTest {
 
     private static final int DEFAULT_ORIGINAL_VALUE = 255;
     private static final int DEFAULT_BIT_WIDTH = 9;
@@ -46,10 +46,10 @@
     private PiPacketOperation packetOperation2;
     private PiPacketOperation nullPacketOperation = null;
 
-    private DefaultPacketIn packetIn;
-    private DefaultPacketIn sameAsPacketIn;
-    private DefaultPacketIn packetIn2;
-    private DefaultPacketIn packetIn3;
+    private PacketInEvent packetIn;
+    private PacketInEvent sameAsPacketIn;
+    private PacketInEvent packetIn2;
+    private PacketInEvent packetIn3;
 
     /**
      * Setup method for packetOperation and packetOperation2.
@@ -78,10 +78,10 @@
                                       .build())
                 .build();
 
-        packetIn = new DefaultPacketIn(deviceId, packetOperation);
-        sameAsPacketIn = new DefaultPacketIn(sameDeviceId, packetOperation);
-        packetIn2 = new DefaultPacketIn(deviceId2, packetOperation);
-        packetIn3 = new DefaultPacketIn(deviceId, packetOperation2);
+        packetIn = new PacketInEvent(deviceId, packetOperation);
+        sameAsPacketIn = new PacketInEvent(sameDeviceId, packetOperation);
+        packetIn2 = new PacketInEvent(deviceId2, packetOperation);
+        packetIn3 = new PacketInEvent(deviceId, packetOperation2);
     }
 
     /**
@@ -105,7 +105,7 @@
     @Test(expected = NullPointerException.class)
     public void testConstructorWithNullDeviceId() {
 
-        new DefaultPacketIn(nullDeviceId, packetOperation);
+        new PacketInEvent(nullDeviceId, packetOperation);
     }
 
     /**
@@ -114,7 +114,7 @@
     @Test(expected = NullPointerException.class)
     public void testConstructorWithNullPacketOperation() {
 
-        new DefaultPacketIn(deviceId, nullPacketOperation);
+        new PacketInEvent(deviceId, nullPacketOperation);
     }
 
     /**
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index 550ca59..2b31b91 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -50,9 +50,9 @@
 import org.onosproject.net.config.NetworkConfigRegistry;
 import org.onosproject.net.config.basics.BasicDeviceConfig;
 import org.onosproject.net.config.basics.SubjectFactories;
-import org.onosproject.net.device.ChannelEvent;
-import org.onosproject.net.device.ChannelListener;
 import org.onosproject.net.device.DefaultDeviceDescription;
+import org.onosproject.net.device.DeviceAgentEvent;
+import org.onosproject.net.device.DeviceAgentListener;
 import org.onosproject.net.device.DeviceDescription;
 import org.onosproject.net.device.DeviceDescriptionDiscovery;
 import org.onosproject.net.device.DeviceEvent;
@@ -95,6 +95,7 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -102,6 +103,7 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import static java.util.concurrent.Executors.newFixedThreadPool;
 import static java.util.concurrent.Executors.newScheduledThreadPool;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.net.device.DeviceEvent.Type;
@@ -116,45 +118,48 @@
 @Component(immediate = true)
 public class GeneralDeviceProvider extends AbstractProvider
         implements DeviceProvider {
-    public static final String DRIVER = "driver";
-    public static final int REACHABILITY_TIMEOUT = 10;
-    public static final String DEPLOY = "deploy-";
-    public static final String PIPECONF_TOPIC = "-pipeconf";
-    public static final String CHECK = "check-";
-    public static final String CONNECTION = "-connection";
+
+    // Timeout in seconds for operations on devices.
+    private static final int DEVICE_OP_TIMEOUT = 10;
+
+    private static final String DRIVER = "driver";
+    private static final String DEPLOY = "deploy-";
+    private static final String PIPECONF_TOPIC = "-pipeconf";
+    private static final String CHECK = "check-";
+    private static final String CONNECTION = "-connection";
     private static final String POLL_FREQUENCY = "pollFrequency";
 
     private final Logger log = getLogger(getClass());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected DeviceProviderRegistry providerRegistry;
+    private DeviceProviderRegistry providerRegistry;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ComponentConfigService componentConfigService;
+    private ComponentConfigService componentConfigService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected NetworkConfigRegistry cfgService;
+    private NetworkConfigRegistry cfgService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected CoreService coreService;
+    private CoreService coreService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected DeviceService deviceService;
+    private DeviceService deviceService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected DriverService driverService;
+    private DriverService driverService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected MastershipService mastershipService;
+    private MastershipService mastershipService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected PiPipeconfService piPipeconfService;
+    private PiPipeconfService piPipeconfService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ClusterService clusterService;
+    private ClusterService clusterService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected LeadershipService leadershipService;
+    private LeadershipService leadershipService;
 
     private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 10;
     @Property(name = POLL_FREQUENCY, intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
@@ -168,9 +173,9 @@
                     "default is 10 sec")
     private int deviceAvailabilityPollFrequency = DEVICE_AVAILABILITY_POLL_FREQUENCY_SECONDS;
 
-    protected static final String APP_NAME = "org.onosproject.generaldeviceprovider";
-    protected static final String URI_SCHEME = "device";
-    protected static final String CFG_SCHEME = "generalprovider";
+    private static final String APP_NAME = "org.onosproject.generaldeviceprovider";
+    private static final String URI_SCHEME = "device";
+    private static final String CFG_SCHEME = "generalprovider";
     private static final String DEVICE_PROVIDER_PACKAGE = "org.onosproject.general.provider.device";
     private static final int CORE_POOL_SIZE = 10;
     private static final String UNKNOWN = "unknown";
@@ -187,25 +192,24 @@
 
     private final Map<DeviceId, DeviceHandshaker> handshakers = Maps.newConcurrentMap();
 
+    private final Map<DeviceId, MastershipRole> requestedRoles = Maps.newConcurrentMap();
 
-    protected ScheduledExecutorService connectionExecutor
-            = newScheduledThreadPool(CORE_POOL_SIZE,
-            groupedThreads("onos/generaldeviceprovider-device",
-                    "connection-executor-%d", log));
-    protected ScheduledExecutorService portStatsExecutor
-            = newScheduledThreadPool(CORE_POOL_SIZE,
-            groupedThreads("onos/generaldeviceprovider-port-stats",
-                    "port-stats-executor-%d", log));
-    protected ScheduledExecutorService availabilityCheckExecutor
-            = newScheduledThreadPool(CORE_POOL_SIZE,
-            groupedThreads("onos/generaldeviceprovider-availability-check",
-                    "availability-check-executor-%d", log));
-    protected ConcurrentMap<DeviceId, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
 
-    protected DeviceProviderService providerService;
+    private ExecutorService connectionExecutor
+            = newFixedThreadPool(CORE_POOL_SIZE, groupedThreads(
+                    "onos/generaldeviceprovider-device-connect", "%d", log));
+    private ScheduledExecutorService portStatsExecutor
+            = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
+                    "onos/generaldeviceprovider-port-stats", "%d", log));
+    private ScheduledExecutorService availabilityCheckExecutor
+            = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
+                    "onos/generaldeviceprovider-availability-check", "%d", log));
+    private ConcurrentMap<DeviceId, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
+
+    private DeviceProviderService providerService;
     private InternalDeviceListener deviceListener = new InternalDeviceListener();
 
-    protected final ConfigFactory factory =
+    private final ConfigFactory factory =
             new ConfigFactory<DeviceId, GeneralProviderDeviceConfig>(
                     SubjectFactories.DEVICE_SUBJECT_FACTORY,
                     GeneralProviderDeviceConfig.class, CFG_SCHEME) {
@@ -215,8 +219,8 @@
                 }
             };
 
-    protected final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
-    private ChannelListener channelListener = new InternalChannelListener();
+    private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
+    private final DeviceAgentListener deviceAgentListener = new InternalDeviceAgentListener();
 
 
     @Activate
@@ -233,7 +237,8 @@
         cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
                 .forEach(did -> connectionExecutor.execute(() -> connectDevice(did)));
         //Initiating a periodic check to see if any device is available again and reconnect it.
-        availabilityCheckExecutor.scheduleAtFixedRate(this::scheduleDevicePolling, deviceAvailabilityPollFrequency,
+        availabilityCheckExecutor.scheduleAtFixedRate(
+                this::scheduleDevicePolling, deviceAvailabilityPollFrequency,
                 deviceAvailabilityPollFrequency, TimeUnit.SECONDS);
         modified(context);
         log.info("Started");
@@ -255,12 +260,12 @@
             Set<DeviceId> deviceSubjects =
                     cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class);
             deviceSubjects.forEach(deviceId -> {
-                if (!compareScheme(deviceId)) {
+                if (notMyScheme(deviceId)) {
                     // not under my scheme, skipping
                     log.debug("{} is not my scheme, skipping", deviceId);
                     return;
                 }
-                scheduledTasks.put(deviceId, scheduleStatistcsPolling(deviceId, true));
+                scheduledTasks.put(deviceId, scheduleStatsPolling(deviceId, true));
             });
         }
     }
@@ -299,17 +304,18 @@
     @Override
     public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
         log.info("Received role {} for device {}", newRole, deviceId);
-        CompletableFuture<MastershipRole> roleReply = getHandshaker(deviceId).roleChanged(newRole);
-        roleReply.thenAcceptAsync(mastership -> {
-            providerService.receivedRoleReply(deviceId, newRole, mastership);
-            if (!mastership.equals(MastershipRole.MASTER) && scheduledTasks.get(deviceId) != null) {
-                scheduledTasks.get(deviceId).cancel(false);
-                scheduledTasks.remove(deviceId);
-            } else if (mastership.equals(MastershipRole.MASTER) && scheduledTasks.get(deviceId) == null) {
-                scheduledTasks.put(deviceId, scheduleStatistcsPolling(deviceId, false));
-                updatePortStatistics(deviceId);
-            }
-        });
+        requestedRoles.put(deviceId, newRole);
+        connectionExecutor.submit(() -> doRoleChanged(deviceId, newRole));
+    }
+
+    private void doRoleChanged(DeviceId deviceId, MastershipRole newRole) {
+        DeviceHandshaker handshaker = getHandshaker(deviceId);
+        if (handshaker == null) {
+            log.warn("Null handshaker. Unable to notify new role {} to {}",
+                     newRole, deviceId);
+            return;
+        }
+        handshaker.roleChanged(newRole);
     }
 
     @Override
@@ -321,9 +327,9 @@
             return false;
         }
 
-        CompletableFuture<Boolean> reachable = handshaker.isReachable();
         try {
-            return reachable.get(REACHABILITY_TIMEOUT, TimeUnit.SECONDS);
+            return handshaker.isReachable()
+                    .get(DEVICE_OP_TIMEOUT, TimeUnit.SECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             log.warn("Device {} is not reachable {}", deviceId, e.getMessage());
             log.debug("Exception", e);
@@ -358,11 +364,8 @@
     @Override
     public void triggerDisconnect(DeviceId deviceId) {
         log.debug("Triggering disconnection of device {}", deviceId);
-        connectionExecutor.execute(() -> {
-            disconnectDevice(deviceId).whenComplete((success, ex) -> {
-                checkAndConnect(deviceId);
-            });
-        });
+        connectionExecutor.execute(() -> disconnectDevice(deviceId)
+                .thenRunAsync(() -> checkAndConnect(deviceId)));
     }
 
     private DeviceHandshaker getHandshaker(DeviceId deviceId) {
@@ -438,7 +441,7 @@
 
             connected.thenAcceptAsync(result -> {
                 if (result) {
-                    handshaker.addChannelListener(channelListener);
+                    handshaker.addDeviceAgentListener(deviceAgentListener);
                     //Populated with the default values obtained by the driver
                     ChassisId cid = new ChassisId();
                     SparseAnnotations annotations = DefaultAnnotations.builder()
@@ -496,7 +499,7 @@
         //Connecting to the device
         handshaker.connect().thenAcceptAsync(result -> {
             if (result) {
-                handshaker.addChannelListener(channelListener);
+                handshaker.addDeviceAgentListener(deviceAgentListener);
                 handlePipeconf(deviceId, handshaker.handler().driver(), handshaker.data(), false);
             }
         });
@@ -583,32 +586,29 @@
     private CompletableFuture<Boolean> disconnectDevice(DeviceId deviceId) {
         log.info("Disconnecting for device {}", deviceId);
 
-        CompletableFuture<Boolean> disconnectError = new CompletableFuture<>();
+        if (scheduledTasks.containsKey(deviceId)) {
+            scheduledTasks.remove(deviceId).cancel(true);
+        }
 
         DeviceHandshaker handshaker = handshakers.remove(deviceId);
-        if (handshaker != null) {
-            handshaker.disconnect().thenAcceptAsync(result -> {
-                if (result) {
-                    log.info("Disconnected device {}", deviceId);
-                    providerService.deviceDisconnected(deviceId);
-                    disconnectError.complete(true);
-                } else {
-                    disconnectError.complete(false);
-                    log.warn("Device {} was unable to disconnect", deviceId);
-                }
-            });
-        } else {
-            //gracefully ignoring.
+
+        if (handshaker == null) {
+            // gracefully ignoring.
             log.warn("No DeviceHandshaker for device {}, no guarantees of complete " +
-                    "shutdown of communication", deviceId);
-            disconnectError.complete(false);
+                             "shutdown of communication", deviceId);
+            return CompletableFuture.completedFuture(false);
         }
-        ScheduledFuture<?> pollingStatisticsTask = scheduledTasks.get(deviceId);
-        if (pollingStatisticsTask != null) {
-            pollingStatisticsTask.cancel(true);
-            scheduledTasks.remove(deviceId);
-        }
-        return disconnectError;
+
+        return handshaker.disconnect()
+                .thenApplyAsync(result -> {
+                    if (result) {
+                        log.info("Disconnected device {}", deviceId);
+                        providerService.deviceDisconnected(deviceId);
+                    } else {
+                        log.warn("Device {} was unable to disconnect", deviceId);
+                    }
+                    return result;
+                });
     }
 
     //Needed to catch the exception in the executors since are not rethrown otherwise.
@@ -637,8 +637,8 @@
         }
     }
 
-    private boolean compareScheme(DeviceId deviceId) {
-        return deviceId.uri().getScheme().equals(URI_SCHEME);
+    private boolean notMyScheme(DeviceId deviceId) {
+        return !deviceId.uri().getScheme().equals(URI_SCHEME);
     }
 
     /**
@@ -650,7 +650,7 @@
         public void event(NetworkConfigEvent event) {
             DeviceId deviceId = (DeviceId) event.subject();
             //Assuming that the deviceId comes with uri 'device:'
-            if (!compareScheme(deviceId)) {
+            if (notMyScheme(deviceId)) {
                 // not under my scheme, skipping
                 log.debug("{} is not my scheme, skipping", deviceId);
                 return;
@@ -765,7 +765,7 @@
         pipelineConfigured.remove(deviceId);
     }
 
-    private ScheduledFuture<?> scheduleStatistcsPolling(DeviceId deviceId, boolean randomize) {
+    private ScheduledFuture<?> scheduleStatsPolling(DeviceId deviceId, boolean randomize) {
         int delay = 0;
         if (randomize) {
             delay = new SecureRandom().nextInt(10);
@@ -826,6 +826,34 @@
         return present;
     }
 
+    private void handleChannelClosed(DeviceId deviceId) {
+        disconnectDevice(deviceId).thenRunAsync(() -> {
+            // If master, notifies disconnection to the core.
+            if (mastershipService.isLocalMaster(deviceId)) {
+                log.info("Disconnecting device {}, due to channel closed event",
+                         deviceId);
+                providerService.deviceDisconnected(deviceId);
+            }
+        });
+    }
+
+    private void handleMastershipResponse(DeviceId deviceId, MastershipRole response) {
+        //Notify core about response.
+        if (!requestedRoles.containsKey(deviceId)) {
+            return;
+        }
+        providerService.receivedRoleReply(deviceId, requestedRoles.get(deviceId), response);
+        // If not master, cancel polling tasks, otherwise start them.
+        if (!response.equals(MastershipRole.MASTER)
+                && scheduledTasks.get(deviceId) != null) {
+            scheduledTasks.remove(deviceId).cancel(false);
+        } else if (response.equals(MastershipRole.MASTER)
+                && scheduledTasks.get(deviceId) == null) {
+            scheduledTasks.put(deviceId, scheduleStatsPolling(deviceId, false));
+            updatePortStatistics(deviceId);
+        }
+    }
+
     /**
      * Listener for core device events.
      */
@@ -834,11 +862,10 @@
         public void event(DeviceEvent event) {
             DeviceId deviceId = event.subject().id();
             // FIXME handling for mastership change scenario missing?
-
             // For now this is scheduled periodically, when streaming API will
             // be available we check and base it on the streaming API (e.g. gNMI)
             if (mastershipService.isLocalMaster(deviceId)) {
-                scheduledTasks.put(deviceId, scheduleStatistcsPolling(deviceId, false));
+                scheduledTasks.put(deviceId, scheduleStatsPolling(deviceId, false));
             }
         }
 
@@ -850,26 +877,37 @@
     }
 
     /**
-     * Listener for device channel events.
+     * Listener for device agent events.
      */
-    private class InternalChannelListener implements ChannelListener {
+    private class InternalDeviceAgentListener implements DeviceAgentListener {
 
         @Override
-        public void event(ChannelEvent event) {
+        public void event(DeviceAgentEvent event) {
             DeviceId deviceId = event.subject();
-            if (event.type().equals(ChannelEvent.Type.CHANNEL_DISCONNECTED)) {
-                //let's properly handle device disconnection
-                CompletableFuture<Boolean> disconnection = disconnectDevice(deviceId);
-                disconnection.thenAcceptAsync(result -> {
-                    //If master notifying of disconnection to the core.
-                    if (mastershipService.isLocalMaster(deviceId)) {
-                        log.info("Disconnecting unreachable device {}, due to error on channel", deviceId);
-                        providerService.deviceDisconnected(deviceId);
-                    }
-                });
-
+            switch (event.type()) {
+                case CHANNEL_OPEN:
+                    // Ignore.
+                    break;
+                case CHANNEL_CLOSED:
+                    handleChannelClosed(deviceId);
+                    break;
+                case CHANNEL_ERROR:
+                    // TODO evaluate other reaction to channel error.
+                    log.warn("Received CHANNEL_ERROR from {}. Is the channel still open?",
+                             deviceId);
+                    break;
+                case ROLE_MASTER:
+                    handleMastershipResponse(deviceId, MastershipRole.MASTER);
+                    break;
+                case ROLE_STANDBY:
+                    handleMastershipResponse(deviceId, MastershipRole.STANDBY);
+                    break;
+                case ROLE_NONE:
+                    handleMastershipResponse(deviceId, MastershipRole.NONE);
+                    break;
+                default:
+                    log.warn("Unrecognized device agent event {}", event.type());
             }
-            //TODO evaluate other type of reactions.
         }
 
     }