Refactor channel and mastership handling in P4Runtime

This (big) change aims at solving the issue observed with mastership flapping
and device connection/disconnection with P4Runtime.

Channel handling is now based on the underlying gRPC channel state. Before,
channel events (open/close/error) were generated as a consequence of P4Runtime
StreamChannel events, making device availability dependent on mastership. Now
Stream Channel events only affect mastership (MASTER/STANDBY or NONE when the
SteamChannel RPC is not active).

Mastership handling has been refactored to generate P4Runtime election IDs that
are compatible with the mastership preference decided by the MastershipService.

GeneralDeviceProvider has been re-implemented to support in-order
device event processing and to reduce implementation complexity. Stats polling
has been moved to a separate component, and netcfg handling updated to only
depend on BasicDeviceConfig, augmented with a pipeconf field, and re-using the
managementAddress field to set the gRPC server endpoints (e.g.
grpc://myswitch.local:50051). Before it was depending on 3 different config
classes, making hard to detect changes.

Finally, this change affects some core interfaces:
- Adds a method to DeviceProvider and DeviceHandshaker to check for device
availability, making the meaning of availability device-specific. This is needed
in cases where the device manager needs to change the availability state of a
device (as in change #20842)
- Support device providers not capable of reconciling mastership role responses
with requests (like P4Runtime).
- Clarify the meaning of "connection" in the DeviceConnect behavior.
- Allows driver-based providers to check devices for reachability and
availability without probing the device via the network.

Change-Id: I7ff30d29f5d02ad938e3171536e54ae2916629a2
diff --git a/core/api/src/main/java/org/onosproject/net/AnnotationKeys.java b/core/api/src/main/java/org/onosproject/net/AnnotationKeys.java
index ce3c094..2e92c53 100644
--- a/core/api/src/main/java/org/onosproject/net/AnnotationKeys.java
+++ b/core/api/src/main/java/org/onosproject/net/AnnotationKeys.java
@@ -89,16 +89,6 @@
     public static final String DRIVER = "driver";
 
     /**
-     * Annotation key for the device availability behavior. The value of this key
-     * is expected to be a boolean ({@code true} or {@code false}) and it is
-     * used to determine if the device can be marked online by the core when
-     * deemed appropriate (value is {@code false}, default behaviour if key is
-     * not present), or if marking online should be left to providers ({@code
-     * true}).
-     */
-    public static final String PROVIDER_MARK_ONLINE = "providerMarkOnline";
-
-    /**
      * Annotation key for durable links.
      */
     public static final String DURABLE = "durable";
diff --git a/core/api/src/main/java/org/onosproject/net/behaviour/PiPipelineProgrammable.java b/core/api/src/main/java/org/onosproject/net/behaviour/PiPipelineProgrammable.java
index 22367ba..be1c6c4 100644
--- a/core/api/src/main/java/org/onosproject/net/behaviour/PiPipelineProgrammable.java
+++ b/core/api/src/main/java/org/onosproject/net/behaviour/PiPipelineProgrammable.java
@@ -44,16 +44,17 @@
     CompletableFuture<Boolean> setPipeconf(PiPipeconf pipeconf);
 
     /**
-     * Returns true if the device is configured with the given pipeconf, false
-     * otherwise.
+     * Probes the device to verify that the given pipeconf is the one currently
+     * configured.
      * <p>
      * This method is expected to always return true after successfully calling
      * {@link #setPipeconf(PiPipeconf)} with the given pipeconf.
      *
      * @param pipeconf pipeconf
-     * @return true if the device has the given pipeconf set, false otherwise
+     * @return completable future eventually true if the device has the given
+     * pipeconf set, false otherwise
      */
-    boolean isPipeconfSet(PiPipeconf pipeconf);
+    CompletableFuture<Boolean> isPipeconfSet(PiPipeconf pipeconf);
 
     /**
      * Returns the default pipeconf for this device, to be used when any other
diff --git a/core/api/src/main/java/org/onosproject/net/config/basics/BasicDeviceConfig.java b/core/api/src/main/java/org/onosproject/net/config/basics/BasicDeviceConfig.java
index 0353bf2..1a7ffd6 100644
--- a/core/api/src/main/java/org/onosproject/net/config/basics/BasicDeviceConfig.java
+++ b/core/api/src/main/java/org/onosproject/net/config/basics/BasicDeviceConfig.java
@@ -29,6 +29,7 @@
     private static final String TYPE = "type";
     private static final String DRIVER = "driver";
     private static final String MANAGEMENT_ADDRESS = "managementAddress";
+    private static final String PIPECONF = "pipeconf";
     private static final String MANUFACTURER = "manufacturer";
     private static final String HW_VERSION = "hwVersion";
     private static final String SW_VERSION = "swVersion";
@@ -41,6 +42,7 @@
     private static final int SW_VERSION_MAX_LENGTH = 256;
     private static final int SERIAL_MAX_LENGTH = 256;
     private static final int MANAGEMENT_ADDRESS_MAX_LENGTH = 1024;
+    private static final int PIPECONF_MAX_LENGTH = 256;
 
     @Override
     public boolean isValid() {
@@ -52,13 +54,14 @@
                 && hasOnlyFields(ALLOWED, NAME, LOC_TYPE, LATITUDE, LONGITUDE,
                 GRID_Y, GRID_X, UI_TYPE, RACK_ADDRESS, OWNER, TYPE, DRIVER, ROLES,
                 MANUFACTURER, HW_VERSION, SW_VERSION, SERIAL,
-                MANAGEMENT_ADDRESS, DEVICE_KEY_ID)
+                MANAGEMENT_ADDRESS, PIPECONF, DEVICE_KEY_ID)
                 && isValidLength(DRIVER, DRIVER_MAX_LENGTH)
                 && isValidLength(MANUFACTURER, MANUFACTURER_MAX_LENGTH)
                 && isValidLength(HW_VERSION, MANUFACTURER_MAX_LENGTH)
                 && isValidLength(SW_VERSION, MANUFACTURER_MAX_LENGTH)
                 && isValidLength(SERIAL, MANUFACTURER_MAX_LENGTH)
-                && isValidLength(MANAGEMENT_ADDRESS, MANAGEMENT_ADDRESS_MAX_LENGTH);
+                && isValidLength(MANAGEMENT_ADDRESS, MANAGEMENT_ADDRESS_MAX_LENGTH)
+                && isValidLength(PIPECONF, PIPECONF_MAX_LENGTH);
     }
 
     /**
@@ -186,15 +189,25 @@
     }
 
     /**
-     * Returns the device management ip (ip:port).
+     * Returns the device management address (e.g, "ip:port" or full URI
+     * string).
      *
-     * @return device management address (ip:port) or null if not set
+     * @return device management address or null if not set
      */
     public String managementAddress() {
         return get(MANAGEMENT_ADDRESS, null);
     }
 
     /**
+     * Returns the device pipeconf.
+     *
+     * @return device pipeconf or null if not set
+     */
+    public String pipeconf() {
+        return get(PIPECONF, null);
+    }
+
+    /**
      * Sets the device management ip (ip:port).
      *
      * @param managementAddress new device management address (ip:port); null to clear
@@ -202,11 +215,23 @@
      */
     public BasicDeviceConfig managementAddress(String managementAddress) {
         checkArgument(managementAddress.length() <= MANAGEMENT_ADDRESS_MAX_LENGTH,
-                "serialNumber exceeds maximum length " + MANAGEMENT_ADDRESS_MAX_LENGTH);
+                "managementAddress exceeds maximum length " + MANAGEMENT_ADDRESS_MAX_LENGTH);
         return (BasicDeviceConfig) setOrClear(MANAGEMENT_ADDRESS, managementAddress);
     }
 
     /**
+     * Sets the device pipeconf.
+     *
+     * @param pipeconf new device pipeconf
+     * @return self
+     */
+    public BasicDeviceConfig pipeconf(String pipeconf) {
+        checkArgument(pipeconf.length() <= PIPECONF_MAX_LENGTH,
+                      "pipeconf exceeds maximum length " + MANAGEMENT_ADDRESS_MAX_LENGTH);
+        return (BasicDeviceConfig) setOrClear(PIPECONF, pipeconf);
+    }
+
+    /**
      * Returns the device key id.
      *
      * @return device key id or null if not set
diff --git a/core/api/src/main/java/org/onosproject/net/device/DeviceAgentEvent.java b/core/api/src/main/java/org/onosproject/net/device/DeviceAgentEvent.java
index d2520b0..b214070 100644
--- a/core/api/src/main/java/org/onosproject/net/device/DeviceAgentEvent.java
+++ b/core/api/src/main/java/org/onosproject/net/device/DeviceAgentEvent.java
@@ -54,19 +54,19 @@
         ROLE_MASTER,
 
         /**
-         * Signifies that the agent has standby/slave mastership role.
+         * Signifies that the agent has acquired standby/slave mastership role.
          */
         ROLE_STANDBY,
 
         /**
-         * Signifies that the agent cannot acquire any valid mastership role for
+         * Signifies that the agent doesn't have any valid mastership role for
          * the device.
          */
         ROLE_NONE,
 
         /**
-         * Signifies that the agent cannot perform operations on the device
-         * because its role is not master.
+         * Signifies that the agent tried to perform some operations on the
+         * device that requires master role.
          */
         NOT_MASTER,
 
@@ -81,15 +81,4 @@
     public DeviceAgentEvent(Type type, DeviceId deviceId) {
         super(type, deviceId);
     }
-
-    /**
-     * Creates a new device agent event for the given type, device ID and time.
-     *
-     * @param type     event type
-     * @param deviceId device ID
-     * @param time     occurrence time
-     */
-    public DeviceAgentEvent(Type type, DeviceId deviceId, long time) {
-        super(type, deviceId, time);
-    }
 }
diff --git a/core/api/src/main/java/org/onosproject/net/device/DeviceHandshaker.java b/core/api/src/main/java/org/onosproject/net/device/DeviceHandshaker.java
index bceaeff..8a941ca 100644
--- a/core/api/src/main/java/org/onosproject/net/device/DeviceHandshaker.java
+++ b/core/api/src/main/java/org/onosproject/net/device/DeviceHandshaker.java
@@ -31,13 +31,63 @@
 public interface DeviceHandshaker extends DeviceConnect {
 
     /**
-     * Checks the reachability (connectivity) of a device. Reachability, unlike
-     * availability, denotes whether THIS particular node can send messages and
-     * receive replies from the specified device.
+     * Returns true if this node is presumed to be able to send messages and
+     * receive replies from the device.
+     * <p>
+     * The implementation should not make any attempt at actively probing the
+     * device over the network, as such it should not block execution. Instead,
+     * it should return a result based solely on internal state (e.g. socket
+     * state). If it returns true, then this node is expected to communicate
+     * with the server successfully. In other words, if any message would be
+     * sent to the device immediately after this method is called and returns
+     * true, then such message is expected, but NOT guaranteed, to reach the
+     * device. If false, it means communication with the device is unlikely to
+     * happen soon.
+     * <p>
+     * Some implementations might require a connection to be created via {@link
+     * #connect()} before checking for reachability. Similarly, after invoking
+     * {@link #disconnect()}, this method might always return false.
      *
-     * @return CompletableFuture eventually true if reachable, false otherwise
+     * @return true if the device is deemed reachable, false otherwise
      */
-    CompletableFuture<Boolean> isReachable();
+    boolean isReachable();
+
+    /**
+     * Similar to {@link #isReachable()}, but performs probing of the device
+     * over the network. This method should be called if {@link #isReachable()}
+     * returns false and the caller wants to be sure this is not a transient
+     * failure state by actively probing the device.
+     *
+     * @return completable future eventually true if device responded to probe,
+     * false otherwise
+     */
+    CompletableFuture<Boolean> probeReachability();
+
+    /**
+     * Checks the availability of the device. Availability denotes whether the
+     * device is reachable and able to perform its functions as expected (e.g.,
+     * forward traffic). Similar to {@link #isReachable()}, implementations are
+     * not allowed to probe the device over the network, but the result should
+     * be based solely on internal state.
+     * <p>
+     * Implementation of this method is optional. If not supported, an exception
+     * should be thrown.
+     *
+     * @return true if the device is deemed available, false otherwise
+     * @throws UnsupportedOperationException if this method is not supported and
+     *                                       {@link #probeAvailability()} should
+     *                                       be used instead.
+     */
+    boolean isAvailable();
+
+    /**
+     * Similar to {@link #isAvailable()} but allows probing the device over the
+     * network. Differently from {@link #isAvailable()}, implementation of this
+     * method is mandatory.
+     *
+     * @return completable future eventually true if available, false otherwise
+     */
+    CompletableFuture<Boolean> probeAvailability();
 
     /**
      * Notifies the device a mastership role change as decided by the core. The
@@ -45,10 +95,48 @@
      * signaling the mastership role accepted by the device.
      *
      * @param newRole new mastership role
+     * @throws UnsupportedOperationException if the device does not support
+     *                                       mastership handling
      */
     void roleChanged(MastershipRole newRole);
 
     /**
+     * Notifies the device of a mastership role change as decided by the core.
+     * Differently from {@link #roleChanged(MastershipRole)}, the role is
+     * described by the given preference value, where {@code preference = 0}
+     * signifies {@link MastershipRole#MASTER} role and {@code preference > 0}
+     * signifies {@link MastershipRole#STANDBY}. Smaller preference values
+     * indicates higher mastership priority for different nodes.
+     * <p>
+     * This method does not permit notifying role {@link MastershipRole#NONE},
+     * in which case {@link #roleChanged(MastershipRole)} should be used
+     * instead.
+     * <p>
+     * Term is a monotonically increasing number, increased by one every time a
+     * new master is elected.
+     * <p>
+     * The implementation of this method should trigger a {@link
+     * DeviceAgentEvent} signaling the mastership role accepted by the device.
+     *
+     * @param preference preference value, where 0 signifies {@link
+     *                   MastershipRole#MASTER} and all other values {@link
+     *                   MastershipRole#STANDBY}
+     * @param term       term number
+     * @throws UnsupportedOperationException if the device does not support
+     *                                       mastership handling, or if it does
+     *                                       not support setting preference-based
+     *                                       mastership, and {@link #roleChanged(MastershipRole)}
+     *                                       should be used instead
+     */
+    default void roleChanged(int preference, long term) {
+        if (preference == 0) {
+            roleChanged(MastershipRole.MASTER);
+        } else {
+            roleChanged(MastershipRole.STANDBY);
+        }
+    }
+
+    /**
      * Returns the last known mastership role agreed by the device for this
      * node.
      *
diff --git a/core/api/src/main/java/org/onosproject/net/device/DeviceProvider.java b/core/api/src/main/java/org/onosproject/net/device/DeviceProvider.java
index ebdf7b1..771a8e4 100644
--- a/core/api/src/main/java/org/onosproject/net/device/DeviceProvider.java
+++ b/core/api/src/main/java/org/onosproject/net/device/DeviceProvider.java
@@ -41,8 +41,8 @@
      * Notifies the provider of a mastership role change for the specified
      * device as decided by the core.
      *
-     * @param deviceId  device identifier
-     * @param newRole newly determined mastership role
+     * @param deviceId device identifier
+     * @param newRole  newly determined mastership role
      */
     void roleChanged(DeviceId deviceId, MastershipRole newRole);
 
@@ -50,18 +50,40 @@
      * Checks the reachability (connectivity) of a device from this provider.
      * Reachability, unlike availability, denotes whether THIS particular node
      * can send messages and receive replies from the specified device.
+     * <p>
+     * Implementations are encouraged to check for reachability by using only
+     * internal provider state, i.e., without blocking execution.
      *
-     * @param deviceId  device identifier
+     * @param deviceId device identifier
      * @return true if reachable, false otherwise
      */
     boolean isReachable(DeviceId deviceId);
 
     /**
-     * Administratively enables or disables a port.
+     * Checks the availability of the device from the provider perspective.
+     * Availability denotes whether the device is reachable by
+     * this node and able to perform its functions as expected (e.g., forward
+     * traffic).
+     *
+     * <p>
+     * Implementations are encouraged to check for availability by using only
+     * internal provider state, i.e., without blocking execution.
      *
      * @param deviceId device identifier
+     * @return completable future eventually true if available, false otherwise
+     */
+    default boolean isAvailable(DeviceId deviceId) {
+        // For most implementations such as OpenFlow, reachability is equivalent
+        // to availability.
+        return isReachable(deviceId);
+    }
+
+    /**
+     * Administratively enables or disables a port.
+     *
+     * @param deviceId   device identifier
      * @param portNumber port number
-     * @param enable true if port is to be enabled, false to disable
+     * @param enable     true if port is to be enabled, false to disable
      */
     void changePortState(DeviceId deviceId, PortNumber portNumber,
                          boolean enable);
diff --git a/core/api/src/main/java/org/onosproject/net/device/DeviceProviderService.java b/core/api/src/main/java/org/onosproject/net/device/DeviceProviderService.java
index 80996eb..b00fa63 100644
--- a/core/api/src/main/java/org/onosproject/net/device/DeviceProviderService.java
+++ b/core/api/src/main/java/org/onosproject/net/device/DeviceProviderService.java
@@ -74,15 +74,30 @@
     void portStatusChanged(DeviceId deviceId, PortDescription portDescription);
 
     /**
-     * Notifies the core about the result of a RoleRequest sent to a device.
+     * Notifies the core about the result of a role request sent to a device.
+     * This method assumes that the provider knows the original role that was
+     * requested for a given response, if that is not the case, and only the
+     * response is known to the provider, then {@link #receivedRoleReply(DeviceId,
+     * MastershipRole)} should be used instead.
      *
-     * @param deviceId identity of the device
+     * @param deviceId  identity of the device
      * @param requested mastership role that was requested by the node
-     * @param response mastership role the switch accepted
+     * @param response  mastership role the switch accepted
      */
     void receivedRoleReply(DeviceId deviceId, MastershipRole requested, MastershipRole response);
 
     /**
+     * Notifies the core about a mastership role reported by the given device
+     * for this node.
+     *
+     * @param deviceId  identity of the device
+     * @param response  mastership role the switch accepted
+     */
+    default void receivedRoleReply(DeviceId deviceId, MastershipRole response) {
+        receivedRoleReply(deviceId, null, response);
+    }
+
+    /**
      * Updates statistics about all ports of a device.
      *
      * @param deviceId          identity of the device
diff --git a/core/api/src/main/java/org/onosproject/net/driver/DeviceConnect.java b/core/api/src/main/java/org/onosproject/net/driver/DeviceConnect.java
index f30386d..433be71 100644
--- a/core/api/src/main/java/org/onosproject/net/driver/DeviceConnect.java
+++ b/core/api/src/main/java/org/onosproject/net/driver/DeviceConnect.java
@@ -21,7 +21,9 @@
 
 /**
  * Abstraction of handler behaviour used to set-up and tear-down connections
- * with a device.
+ * with a device. A connection is intended as the presence of state (e.g. a
+ * transport session) required to carry messages between this node and the
+ * device.
  */
 @Beta
 public interface DeviceConnect extends HandlerBehaviour {
@@ -29,17 +31,33 @@
     /**
      * Connects to the device, for example by opening the transport session that
      * will be later used to send control messages. Returns true if the
-     * connection was initiated successfully, false otherwise.
+     * connection was initiated successfully, false otherwise. The
+     * implementation might require probing the device over the network to
+     * initiate the connection.
      * <p>
-     * Calling multiple times this method while a connection to the device is
-     * open should result in a no-op.
+     * When calling this method while a connection to the device already exists,
+     * the behavior is not defined. For example, some implementations might
+     * require to first call {@link #disconnect()}, while other might behave as
+     * a no-op.
      *
-     * @return CompletableFuture with true if the operation was successful
+     * @return CompletableFuture eventually true if a connection was created
+     * successfully, false otherwise
+     * @throws IllegalStateException if a connection already exists and the
+     *                               implementation requires to call {@link
+     *                               #disconnect()} first.
      */
-    CompletableFuture<Boolean> connect();
+    CompletableFuture<Boolean> connect() throws IllegalStateException;
 
     /**
-     * Returns true if a connection to the device is open, false otherwise.
+     * Returns true if a connection to the device exists, false otherwise. This
+     * method is NOT expected to send any message over the network to check for
+     * device reachability, but rather it should only give an indication if any
+     * internal connection state exists for the device. As such, it should NOT
+     * block execution.
+     * <p>
+     * In general, when called after {@link #connect()} it should always return
+     * true, while it is expected to always return false after calling {@link
+     * #disconnect()} or if {@link #connect()} was never called.
      *
      * @return true if the connection is open, false otherwise
      */
@@ -47,14 +65,10 @@
 
     /**
      * Disconnects from the device, for example closing the transport session
-     * previously opened. Returns true if the disconnection procedure was
-     * successful, false otherwise.
+     * previously opened.
      * <p>
      * Calling multiple times this method while a connection to the device is
-     * closed should result in a no-op.
-     *
-     * @return CompletableFuture with true if the operation was successful
+     * already closed should result in a no-op.
      */
-    CompletableFuture<Boolean> disconnect();
-
+    void disconnect();
 }
diff --git a/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfConfig.java b/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfConfig.java
deleted file mode 100644
index e8aa731..0000000
--- a/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfConfig.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.net.pi.service;
-
-import com.google.common.annotations.Beta;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.config.Config;
-import org.onosproject.net.pi.model.PiPipeconfId;
-
-/**
- * Configuration for the PiPipeconf susbystem.
- */
-@Beta
-public final class PiPipeconfConfig extends Config<DeviceId> {
-
-    public static final String PIPIPECONFID = "piPipeconfId";
-
-    @Override
-    public boolean isValid() {
-        return hasOnlyFields(PIPIPECONFID);
-        //TODO will reinstate after synchonization of events
-        //&& !piPipeconfId().id().equals("");
-    }
-
-    public PiPipeconfId piPipeconfId() {
-        return new PiPipeconfId(get(PIPIPECONFID, ""));
-    }
-}
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index 6ce7c0c..0ffe615 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -23,13 +23,11 @@
 import org.onlab.util.Tools;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
-import org.onosproject.net.config.basics.PortDescriptionsConfig;
 import org.onosproject.mastership.MastershipEvent;
 import org.onosproject.mastership.MastershipListener;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.mastership.MastershipTerm;
 import org.onosproject.mastership.MastershipTermService;
-import org.onosproject.net.AnnotationKeys;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.Device;
 import org.onosproject.net.Device.Type;
@@ -46,6 +44,7 @@
 import org.onosproject.net.config.basics.BasicDeviceConfig;
 import org.onosproject.net.config.basics.DeviceAnnotationConfig;
 import org.onosproject.net.config.basics.PortAnnotationConfig;
+import org.onosproject.net.config.basics.PortDescriptionsConfig;
 import org.onosproject.net.device.DefaultPortDescription;
 import org.onosproject.net.device.DeviceAdminService;
 import org.onosproject.net.device.DeviceDescription;
@@ -499,7 +498,7 @@
             }
 
             // If this node is the master, ensure the device is marked online.
-            if (myRole == MASTER) {
+            if (myRole == MASTER && canMarkOnline(device)) {
                 post(store.markOnline(deviceId));
             }
 
@@ -788,15 +787,26 @@
                 return;
             }
 
+            final MastershipRole expected = mastershipService.getLocalRole(deviceId);
+
+            if (requested == null) {
+                // Provider is not able to reconcile role responses with
+                // requests. We assume what was requested is what we expect.
+                // This will work only if mastership doesn't change too often,
+                // and devices are left enough time to provide responses before
+                // a different role is requested.
+                requested = expected;
+            }
+
             if (Objects.equals(requested, response)) {
-                if (Objects.equals(requested, mastershipService.getLocalRole(deviceId))) {
+                if (Objects.equals(requested, expected)) {
                     return;
                 } else {
-                    log.warn("Role mismatch on {}. set to {}, but store demands {}",
-                             deviceId, response, mastershipService.getLocalRole(deviceId));
+                    log.warn("Role mismatch on {}. Set to {}, but store demands {}",
+                             deviceId, response, expected);
                     // roleManager got the device to comply, but doesn't agree with
                     // the store; use the store's view, then try to reassert.
-                    backgroundService.execute(() -> reassertRole(deviceId, mastershipService.getLocalRole(deviceId)));
+                    backgroundService.execute(() -> reassertRole(deviceId, expected));
                     return;
                 }
             } else {
@@ -830,9 +840,12 @@
     }
 
     private boolean canMarkOnline(Device device) {
-        final boolean providerMarkOnline = Boolean.parseBoolean(
-                device.annotations().value(AnnotationKeys.PROVIDER_MARK_ONLINE));
-        return !providerMarkOnline;
+        DeviceProvider provider = getProvider(device.id());
+        if (provider == null) {
+            log.warn("Provider for {} was not found. Cannot evaluate availability", device.id());
+            return false;
+        }
+        return provider.isAvailable(device.id());
     }
 
     // Applies the specified role to the device; ignores NONE
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
index 27f6e4d..9653091 100644
--- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
@@ -23,10 +23,8 @@
 import org.onlab.util.ItemNotFoundException;
 import org.onlab.util.SharedExecutors;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.config.ConfigFactory;
 import org.onosproject.net.config.NetworkConfigRegistry;
 import org.onosproject.net.config.basics.BasicDeviceConfig;
-import org.onosproject.net.config.basics.SubjectFactories;
 import org.onosproject.net.driver.Behaviour;
 import org.onosproject.net.driver.DefaultDriver;
 import org.onosproject.net.driver.Driver;
@@ -36,7 +34,6 @@
 import org.onosproject.net.driver.DriverProvider;
 import org.onosproject.net.pi.model.PiPipeconf;
 import org.onosproject.net.pi.model.PiPipeconfId;
-import org.onosproject.net.pi.service.PiPipeconfConfig;
 import org.onosproject.net.pi.service.PiPipeconfMappingStore;
 import org.onosproject.net.pi.service.PiPipeconfService;
 import org.osgi.service.component.annotations.Activate;
@@ -73,7 +70,6 @@
     private final Logger log = getLogger(getClass());
 
     private static final String MERGED_DRIVER_SEPARATOR = ":";
-    private static final String CFG_SCHEME = "piPipeconf";
 
     private static final int MISSING_DRIVER_WATCHDOG_INTERVAL = 5; // Seconds.
 
@@ -98,19 +94,8 @@
     protected ExecutorService executor = Executors.newFixedThreadPool(
             10, groupedThreads("onos/pipeconf-manager", "%d", log));
 
-    protected final ConfigFactory configFactory =
-            new ConfigFactory<DeviceId, PiPipeconfConfig>(
-                    SubjectFactories.DEVICE_SUBJECT_FACTORY,
-                    PiPipeconfConfig.class, CFG_SCHEME) {
-                @Override
-                public PiPipeconfConfig createConfig() {
-                    return new PiPipeconfConfig();
-                }
-            };
-
     @Activate
     public void activate() {
-        cfgService.registerConfigFactory(configFactory);
         driverAdminService.addListener(driverListener);
         checkMissingMergedDrivers();
         if (!missingMergedDrivers.isEmpty()) {
@@ -127,7 +112,6 @@
     @Deactivate
     public void deactivate() {
         executor.shutdown();
-        cfgService.unregisterConfigFactory(configFactory);
         driverAdminService.removeListener(driverListener);
         pipeconfs.clear();
         missingMergedDrivers.clear();
@@ -231,7 +215,7 @@
             if (getDriver(newDriverName) != null) {
                 return newDriverName;
             }
-            log.info("Creating merged driver {}...", newDriverName);
+            log.debug("Creating merged driver {}...", newDriverName);
             final Driver mergedDriver = buildMergedDriver(
                     pipeconfId, baseDriverName, newDriverName);
             if (mergedDriver == null) {
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java
index 62fb157..eafb778 100644
--- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java
@@ -17,6 +17,7 @@
 package org.onosproject.net.pi.impl;
 
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.Striped;
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
@@ -250,7 +251,7 @@
         if (!handshaker.isConnected()) {
             return false;
         }
-        if (pipelineProg.isPipeconfSet(pipeconf)) {
+        if (Futures.getUnchecked(pipelineProg.isPipeconfSet(pipeconf))) {
             log.debug("Pipeconf {} already configured on {}",
                       pipeconf.id(), device.id());
             return true;
diff --git a/core/net/src/test/java/org/onosproject/net/pi/impl/PiPipeconfManagerTest.java b/core/net/src/test/java/org/onosproject/net/pi/impl/PiPipeconfManagerTest.java
index 20ded6a..de778a0 100644
--- a/core/net/src/test/java/org/onosproject/net/pi/impl/PiPipeconfManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/pi/impl/PiPipeconfManagerTest.java
@@ -44,7 +44,6 @@
 import org.onosproject.net.driver.DriverProvider;
 import org.onosproject.net.pi.model.PiPipeconf;
 import org.onosproject.net.pi.model.PiPipeconfId;
-import org.onosproject.net.pi.service.PiPipeconfConfig;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -55,7 +54,6 @@
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.onosproject.pipelines.basic.PipeconfLoader.BASIC_PIPECONF;
 
@@ -77,9 +75,6 @@
     private final Set<NetworkConfigListener> netCfgListeners = new HashSet<>();
     private final Set<DriverProvider> providers = new HashSet<>();
 
-    private final PiPipeconfConfig piPipeconfConfig = new PiPipeconfConfig();
-    private final InputStream jsonStream = PiPipeconfManagerTest.class
-            .getResourceAsStream("/org/onosproject/net/pi/impl/piPipeconfId.json");
     private final BasicDeviceConfig basicDeviceConfig = new BasicDeviceConfig();
     private final InputStream jsonStreamBasic = PiPipeconfManagerTest.class
             .getResourceAsStream("/org/onosproject/net/pi/impl/basic.json");
@@ -95,11 +90,8 @@
         piPipeconf = BASIC_PIPECONF;
         piPipeconfService.cfgService = cfgService;
         piPipeconfService.driverAdminService = driverAdminService;
-        String key = "piPipeconf";
         ObjectMapper mapper = new ObjectMapper();
-        JsonNode jsonNode = mapper.readTree(jsonStream);
         ConfigApplyDelegate delegate = new MockDelegate();
-        piPipeconfConfig.init(DEVICE_ID, key, jsonNode, mapper, delegate);
         String keyBasic = "basic";
         JsonNode jsonNodeBasic = mapper.readTree(jsonStreamBasic);
         basicDeviceConfig.init(DEVICE_ID, keyBasic, jsonNodeBasic, mapper, delegate);
@@ -111,7 +103,6 @@
         assertEquals("Incorrect driver admin service", driverAdminService, piPipeconfService.driverAdminService);
         assertEquals("Incorrect driverAdminService service", driverAdminService, piPipeconfService.driverAdminService);
         assertEquals("Incorrect configuration service", cfgService, piPipeconfService.cfgService);
-        assertTrue("Incorrect config factory", cfgFactories.contains(piPipeconfService.configFactory));
     }
 
     @Test
@@ -120,7 +111,6 @@
         assertEquals("Incorrect driver admin service", null, piPipeconfService.driverAdminService);
         assertEquals("Incorrect driverAdminService service", null, piPipeconfService.driverAdminService);
         assertEquals("Incorrect configuration service", null, piPipeconfService.cfgService);
-        assertFalse("Config factory should be unregistered", cfgFactories.contains(piPipeconfService.configFactory));
     }
 
     @Test
@@ -139,7 +129,8 @@
 
     @Test
     public void mergeDriver() {
-        PiPipeconfId piPipeconfId = cfgService.getConfig(DEVICE_ID, PiPipeconfConfig.class).piPipeconfId();
+        PiPipeconfId piPipeconfId = new PiPipeconfId(cfgService.getConfig(
+                DEVICE_ID, BasicDeviceConfig.class).pipeconf());
         assertEquals(piPipeconf.id(), piPipeconfId);
 
         String baseDriverName = cfgService.getConfig(DEVICE_ID, BasicDeviceConfig.class).driver();
@@ -196,10 +187,7 @@
         @Override
         public <S, C extends Config<S>> C getConfig(S subject, Class<C> configClass) {
             DeviceId did = (DeviceId) subject;
-            if (configClass.equals(PiPipeconfConfig.class)
-                    && did.equals(DEVICE_ID)) {
-                return (C) piPipeconfConfig;
-            } else if (configClass.equals(BasicDeviceConfig.class)
+            if (configClass.equals(BasicDeviceConfig.class)
                     && did.equals(DEVICE_ID)) {
                 return (C) basicDeviceConfig;
             }
diff --git a/core/net/src/test/resources/org/onosproject/net/pi/impl/basic.json b/core/net/src/test/resources/org/onosproject/net/pi/impl/basic.json
index 42de9ad..ad2a205 100644
--- a/core/net/src/test/resources/org/onosproject/net/pi/impl/basic.json
+++ b/core/net/src/test/resources/org/onosproject/net/pi/impl/basic.json
@@ -1,3 +1,4 @@
 {
-    "driver": "baseDriver"
-}
\ No newline at end of file
+    "driver": "baseDriver",
+    "pipeconf": "org.onosproject.pipelines.basic"
+}
diff --git a/core/net/src/test/resources/org/onosproject/net/pi/impl/piPipeconfId.json b/core/net/src/test/resources/org/onosproject/net/pi/impl/piPipeconfId.json
deleted file mode 100644
index 7f42958..0000000
--- a/core/net/src/test/resources/org/onosproject/net/pi/impl/piPipeconfId.json
+++ /dev/null
@@ -1,3 +0,0 @@
-{
-    "piPipeconfId": "org.onosproject.pipelines.basic"
-}
\ No newline at end of file
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
index 7dc903a..e2b826d 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
@@ -212,13 +212,13 @@
 import org.onosproject.net.pi.model.PiActionId;
 import org.onosproject.net.pi.model.PiActionParamId;
 import org.onosproject.net.pi.model.PiActionProfileId;
-import org.onosproject.net.pi.model.PiPacketMetadataId;
 import org.onosproject.net.pi.model.PiCounterId;
 import org.onosproject.net.pi.model.PiCounterType;
 import org.onosproject.net.pi.model.PiMatchFieldId;
 import org.onosproject.net.pi.model.PiMatchType;
 import org.onosproject.net.pi.model.PiMeterId;
 import org.onosproject.net.pi.model.PiMeterType;
+import org.onosproject.net.pi.model.PiPacketMetadataId;
 import org.onosproject.net.pi.model.PiPacketOperationType;
 import org.onosproject.net.pi.model.PiPipeconfId;
 import org.onosproject.net.pi.model.PiTableId;
@@ -231,7 +231,6 @@
 import org.onosproject.net.pi.runtime.PiActionProfileMember;
 import org.onosproject.net.pi.runtime.PiActionProfileMemberHandle;
 import org.onosproject.net.pi.runtime.PiActionProfileMemberId;
-import org.onosproject.net.pi.runtime.PiPacketMetadata;
 import org.onosproject.net.pi.runtime.PiCounterCell;
 import org.onosproject.net.pi.runtime.PiCounterCellData;
 import org.onosproject.net.pi.runtime.PiCounterCellId;
@@ -244,13 +243,13 @@
 import org.onosproject.net.pi.runtime.PiLpmFieldMatch;
 import org.onosproject.net.pi.runtime.PiMatchKey;
 import org.onosproject.net.pi.runtime.PiMeterCellId;
+import org.onosproject.net.pi.runtime.PiPacketMetadata;
 import org.onosproject.net.pi.runtime.PiPacketOperation;
 import org.onosproject.net.pi.runtime.PiRangeFieldMatch;
 import org.onosproject.net.pi.runtime.PiTableAction;
 import org.onosproject.net.pi.runtime.PiTableEntry;
 import org.onosproject.net.pi.runtime.PiTableEntryHandle;
 import org.onosproject.net.pi.runtime.PiTernaryFieldMatch;
-import org.onosproject.net.pi.service.PiPipeconfConfig;
 import org.onosproject.net.pi.service.PiTranslatable;
 import org.onosproject.net.pi.service.PiTranslatedEntity;
 import org.onosproject.net.provider.ProviderId;
@@ -708,7 +707,6 @@
                     PiLpmFieldMatch.class,
                     PiMatchKey.class,
                     PiPacketOperation.class,
-                    PiPipeconfConfig.class,
                     PiRangeFieldMatch.class,
                     PiTableAction.class,
                     PiTableEntry.class,
diff --git a/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/AbstractGnmiHandlerBehaviour.java b/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/AbstractGnmiHandlerBehaviour.java
index 3bfca46..4ce9f18 100644
--- a/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/AbstractGnmiHandlerBehaviour.java
+++ b/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/AbstractGnmiHandlerBehaviour.java
@@ -16,17 +16,22 @@
 
 package org.onosproject.drivers.gnmi;
 
+import com.google.common.base.Strings;
 import io.grpc.StatusRuntimeException;
 import org.onosproject.gnmi.api.GnmiClient;
 import org.onosproject.gnmi.api.GnmiClientKey;
 import org.onosproject.gnmi.api.GnmiController;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.config.NetworkConfigService;
+import org.onosproject.net.config.basics.BasicDeviceConfig;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.driver.AbstractHandlerBehaviour;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -41,9 +46,6 @@
     private static final String DEVICE_REQ_TIMEOUT = "deviceRequestTimeout";
     private static final int DEFAULT_DEVICE_REQ_TIMEOUT = 60;
 
-    private static final String GNMI_SERVER_ADDR_KEY = "gnmi_ip";
-    private static final String GNMI_SERVER_PORT_KEY = "gnmi_port";
-
     protected final Logger log = LoggerFactory.getLogger(getClass());
     protected DeviceId deviceId;
     protected DeviceService deviceService;
@@ -65,32 +67,33 @@
         return true;
     }
 
-    GnmiClient createClient() {
+    GnmiClient getClientByKey() {
+        final GnmiClientKey clientKey = clientKey();
+        if (clientKey == null) {
+            return null;
+        }
+        return handler().get(GnmiController.class).getClient(clientKey);
+    }
+
+    protected GnmiClientKey clientKey() {
         deviceId = handler().data().deviceId();
-        controller = handler().get(GnmiController.class);
 
-        final String serverAddr = this.data().value(GNMI_SERVER_ADDR_KEY);
-        final String serverPortString = this.data().value(GNMI_SERVER_PORT_KEY);
-
-        if (serverAddr == null || serverPortString == null) {
-            log.warn("Unable to create client for {}, missing driver data key (required is {} and {})",
-                    deviceId, GNMI_SERVER_ADDR_KEY, GNMI_SERVER_PORT_KEY);
+        final BasicDeviceConfig cfg = handler().get(NetworkConfigService.class)
+                .getConfig(deviceId, BasicDeviceConfig.class);
+        if (cfg == null || Strings.isNullOrEmpty(cfg.managementAddress())) {
+            log.error("Missing or invalid config for {}, cannot derive " +
+                              "gNMI server endpoints", deviceId);
             return null;
         }
 
-        final int serverPort;
         try {
-            serverPort = Integer.parseUnsignedInt(serverPortString);
-        } catch (NumberFormatException e) {
-            log.error("{} is not a valid port number", serverPortString);
+            return new GnmiClientKey(
+                    deviceId, new URI(cfg.managementAddress()));
+        } catch (URISyntaxException e) {
+            log.error("Management address of {} is not a valid URI: {}",
+                      deviceId, cfg.managementAddress());
             return null;
         }
-        GnmiClientKey clientKey = new GnmiClientKey(deviceId, serverAddr, serverPort);
-        if (!controller.createClient(clientKey)) {
-            log.warn("Unable to create client for {}, aborting operation", deviceId);
-            return null;
-        }
-        return controller.getClient(deviceId);
     }
 
     /**
diff --git a/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/GnmiHandshaker.java b/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/GnmiHandshaker.java
index c443460..a6d0c04 100644
--- a/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/GnmiHandshaker.java
+++ b/drivers/gnmi/src/main/java/org/onosproject/drivers/gnmi/GnmiHandshaker.java
@@ -17,32 +17,43 @@
 package org.onosproject.drivers.gnmi;
 
 import org.onosproject.gnmi.api.GnmiClient;
+import org.onosproject.gnmi.api.GnmiClientKey;
 import org.onosproject.gnmi.api.GnmiController;
-import org.onosproject.net.DeviceId;
 import org.onosproject.net.MastershipRole;
 import org.onosproject.net.device.DeviceHandshaker;
 
 import java.util.concurrent.CompletableFuture;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
 /**
  * Implementation of DeviceHandshaker for gNMI.
  */
 public class GnmiHandshaker extends AbstractGnmiHandlerBehaviour implements DeviceHandshaker {
 
     @Override
-    public CompletableFuture<Boolean> isReachable() {
-        return CompletableFuture
-                // gNMI requires a client to be created to
-                // check for reachability.
-                .supplyAsync(super::createClient)
-                .thenApplyAsync(client -> {
-                    if (client == null) {
-                        return false;
-                    }
-                    return handler()
-                            .get(GnmiController.class)
-                            .isReachable(handler().data().deviceId());
-                });
+    public boolean isReachable() {
+        final GnmiClient client = getClientByKey();
+        return client != null && client.isServerReachable();
+    }
+
+    @Override
+    public CompletableFuture<Boolean> probeReachability() {
+        final GnmiClient client = getClientByKey();
+        if (client == null) {
+            return completedFuture(false);
+        }
+        return client.probeService();
+    }
+
+    @Override
+    public boolean isAvailable() {
+        return isReachable();
+    }
+
+    @Override
+    public CompletableFuture<Boolean> probeAvailability() {
+        return probeReachability();
     }
 
     @Override
@@ -57,43 +68,30 @@
 
     @Override
     public CompletableFuture<Boolean> connect() {
-        return CompletableFuture
-                .supplyAsync(super::createClient)
-                .thenComposeAsync(client -> {
-                    if (client == null) {
-                        return CompletableFuture.completedFuture(false);
-                    }
-                    return CompletableFuture.completedFuture(true);
-                });
+        return CompletableFuture.supplyAsync(this::createClient);
+    }
+
+    private boolean createClient() {
+        GnmiClientKey clientKey = clientKey();
+        if (clientKey == null) {
+            return false;
+        }
+        if (!handler().get(GnmiController.class).createClient(clientKey)) {
+            log.warn("Unable to create client for {}",
+                     handler().data().deviceId());
+            return false;
+        }
+        return true;
     }
 
     @Override
     public boolean isConnected() {
-        final GnmiController controller = handler().get(GnmiController.class);
-        final DeviceId deviceId = handler().data().deviceId();
-        final GnmiClient client = controller.getClient(deviceId);
-
-        if (client == null) {
-            return false;
-        }
-
-        return getFutureWithDeadline(
-                client.isServiceAvailable(),
-                "checking if gNMI service is available", false);
+        return getClientByKey() != null;
     }
 
     @Override
-    public CompletableFuture<Boolean> disconnect() {
-        final GnmiController controller = handler().get(GnmiController.class);
-        final DeviceId deviceId = handler().data().deviceId();
-        final GnmiClient client = controller.getClient(deviceId);
-        if (client == null) {
-            return CompletableFuture.completedFuture(true);
-        }
-        return client.shutdown()
-                .thenApplyAsync(v -> {
-                    controller.removeClient(deviceId);
-                    return true;
-                });
+    public void disconnect() {
+        handler().get(GnmiController.class)
+                .removeClient(handler().data().deviceId());
     }
 }
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
index 8a9a403..5326861 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
@@ -16,13 +16,14 @@
 
 package org.onosproject.drivers.p4runtime;
 
-import io.grpc.StatusRuntimeException;
+import com.google.common.base.Strings;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.config.NetworkConfigService;
+import org.onosproject.net.config.basics.BasicDeviceConfig;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.driver.AbstractHandlerBehaviour;
 import org.onosproject.net.pi.model.PiPipeconf;
-import org.onosproject.net.pi.model.PiPipeconfId;
 import org.onosproject.net.pi.model.PiPipelineInterpreter;
 import org.onosproject.net.pi.service.PiPipeconfService;
 import org.onosproject.net.pi.service.PiTranslationService;
@@ -32,10 +33,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.net.URI;
+import java.net.URISyntaxException;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -44,14 +43,6 @@
  */
 public class AbstractP4RuntimeHandlerBehaviour extends AbstractHandlerBehaviour {
 
-    // Default timeout in seconds for device operations.
-    private static final String DEVICE_REQ_TIMEOUT = "deviceRequestTimeout";
-    private static final int DEFAULT_DEVICE_REQ_TIMEOUT = 60;
-
-    private static final String P4RUNTIME_SERVER_ADDR_KEY = "p4runtime_ip";
-    private static final String P4RUNTIME_SERVER_PORT_KEY = "p4runtime_port";
-    private static final String P4RUNTIME_DEVICE_ID_KEY = "p4runtime_deviceId";
-
     protected final Logger log = LoggerFactory.getLogger(getClass());
 
     // Initialized by setupBehaviour()
@@ -77,30 +68,25 @@
         deviceService = handler().get(DeviceService.class);
         device = deviceService.getDevice(deviceId);
         if (device == null) {
-            log.warn("Unable to find device with id {}, aborting operation", deviceId);
+            log.warn("Unable to find device with id {}", deviceId);
             return false;
         }
 
         controller = handler().get(P4RuntimeController.class);
         client = controller.getClient(deviceId);
         if (client == null) {
-            log.warn("Unable to find client for {}, aborting operation", deviceId);
+            log.warn("Unable to find client for {}", deviceId);
             return false;
         }
 
         PiPipeconfService piPipeconfService = handler().get(PiPipeconfService.class);
-        if (!piPipeconfService.ofDevice(deviceId).isPresent()) {
+        if (!piPipeconfService.getPipeconf(deviceId).isPresent()) {
             log.warn("Unable to get assigned pipeconf for {} (mapping " +
-                             "missing in PiPipeconfService), aborting operation",
+                             "missing in PiPipeconfService)",
                      deviceId);
             return false;
         }
-        PiPipeconfId pipeconfId = piPipeconfService.ofDevice(deviceId).get();
-        if (!piPipeconfService.getPipeconf(pipeconfId).isPresent()) {
-            log.warn("Cannot find any pipeconf with ID '{}' ({}), aborting operation", pipeconfId, deviceId);
-            return false;
-        }
-        pipeconf = piPipeconfService.getPipeconf(pipeconfId).get();
+        pipeconf = piPipeconfService.getPipeconf(deviceId).get();
 
         translationService = handler().get(PiTranslationService.class);
 
@@ -123,49 +109,38 @@
     }
 
     /**
-     * Returns a P4Runtime client for this device, null if such client cannot be
-     * created.
+     * Returns a P4Runtime client previsouly created for this device, null if
+     * such client does not exist.
      *
      * @return client or null
      */
-    P4RuntimeClient createClient() {
+    P4RuntimeClient getClientByKey() {
+        final P4RuntimeClientKey clientKey = clientKey();
+        if (clientKey == null) {
+            return null;
+        }
+        return handler().get(P4RuntimeController.class).getClient(clientKey);
+    }
+
+    protected P4RuntimeClientKey clientKey() {
         deviceId = handler().data().deviceId();
-        controller = handler().get(P4RuntimeController.class);
 
-        final String serverAddr = this.data().value(P4RUNTIME_SERVER_ADDR_KEY);
-        final String serverPortString = this.data().value(P4RUNTIME_SERVER_PORT_KEY);
-        final String p4DeviceIdString = this.data().value(P4RUNTIME_DEVICE_ID_KEY);
-
-        if (serverAddr == null || serverPortString == null || p4DeviceIdString == null) {
-            log.warn("Unable to create client for {}, missing driver data key (required is {}, {}, and {})",
-                     deviceId, P4RUNTIME_SERVER_ADDR_KEY, P4RUNTIME_SERVER_PORT_KEY, P4RUNTIME_DEVICE_ID_KEY);
+        final BasicDeviceConfig cfg = handler().get(NetworkConfigService.class)
+                .getConfig(deviceId, BasicDeviceConfig.class);
+        if (cfg == null || Strings.isNullOrEmpty(cfg.managementAddress())) {
+            log.error("Missing or invalid config for {}, cannot derive " +
+                              "P4Runtime server endpoints", deviceId);
             return null;
         }
 
-        final int serverPort;
-        final long p4DeviceId;
-
         try {
-            serverPort = Integer.parseUnsignedInt(serverPortString);
-        } catch (NumberFormatException e) {
-            log.error("{} is not a valid P4Runtime port number", serverPortString);
+            return new P4RuntimeClientKey(
+                    deviceId, new URI(cfg.managementAddress()));
+        } catch (URISyntaxException e) {
+            log.error("Management address of {} is not a valid URI: {}",
+                      deviceId, cfg.managementAddress());
             return null;
         }
-        try {
-            p4DeviceId = Long.parseUnsignedLong(p4DeviceIdString);
-        } catch (NumberFormatException e) {
-            log.error("{} is not a valid P4Runtime-internal device ID", p4DeviceIdString);
-            return null;
-        }
-
-        final P4RuntimeClientKey clientKey = new P4RuntimeClientKey(
-                deviceId, serverAddr, serverPort, p4DeviceId);
-        if (!controller.createClient(clientKey)) {
-            log.debug("Unable to create client for {}, aborting operation", deviceId);
-            return null;
-        }
-
-        return controller.getClient(deviceId);
     }
 
     /**
@@ -184,58 +159,4 @@
             return Boolean.parseBoolean(handler().driver().getProperty(propName));
         }
     }
-
-    /**
-     * Returns the device request timeout driver property, or a default value
-     * if the property is not present or cannot be parsed.
-     *
-     * @return timeout value
-     */
-    private int getDeviceRequestTimeout() {
-        final String timeout = handler().driver()
-                .getProperty(DEVICE_REQ_TIMEOUT);
-        if (timeout == null) {
-            return DEFAULT_DEVICE_REQ_TIMEOUT;
-        } else {
-            try {
-                return Integer.parseInt(timeout);
-            } catch (NumberFormatException e) {
-                log.error("{} driver property '{}' is not a number, using default value {}",
-                          DEVICE_REQ_TIMEOUT, timeout, DEFAULT_DEVICE_REQ_TIMEOUT);
-                return DEFAULT_DEVICE_REQ_TIMEOUT;
-            }
-        }
-    }
-
-    /**
-     * Convenience method to get the result of a completable future while
-     * setting a timeout and checking for exceptions.
-     *
-     * @param future        completable future
-     * @param opDescription operation description to use in log messages. Should
-     *                      be a sentence starting with a verb ending in -ing,
-     *                      e.g. "reading...", "writing...", etc.
-     * @param defaultValue  value to return if operation fails
-     * @param <U>           type of returned value
-     * @return future result or default value
-     */
-    <U> U getFutureWithDeadline(CompletableFuture<U> future, String opDescription,
-                                U defaultValue) {
-        try {
-            return future.get(getDeviceRequestTimeout(), TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-            log.error("Exception while {} on {}", opDescription, deviceId);
-        } catch (ExecutionException e) {
-            final Throwable cause = e.getCause();
-            if (cause instanceof StatusRuntimeException) {
-                final StatusRuntimeException grpcError = (StatusRuntimeException) cause;
-                log.warn("Error while {} on {}: {}", opDescription, deviceId, grpcError.getMessage());
-            } else {
-                log.error("Exception while {} on {}", opDescription, deviceId, e.getCause());
-            }
-        } catch (TimeoutException e) {
-            log.error("Operation TIMEOUT while {} on {}", opDescription, deviceId);
-        }
-        return defaultValue;
-    }
 }
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java
index 664bd59..916a945 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java
@@ -16,7 +16,6 @@
 
 package org.onosproject.drivers.p4runtime;
 
-import org.onlab.util.SharedExecutors;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.behaviour.PiPipelineProgrammable;
 import org.onosproject.net.pi.model.PiPipeconf;
@@ -28,6 +27,7 @@
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -49,59 +49,42 @@
 
     @Override
     public CompletableFuture<Boolean> setPipeconf(PiPipeconf pipeconf) {
-        return CompletableFuture.supplyAsync(
-                () -> doDeployConfig(pipeconf),
-                SharedExecutors.getPoolThreadExecutor());
-    }
-
-    private boolean doDeployConfig(PiPipeconf pipeconf) {
-
         DeviceId deviceId = handler().data().deviceId();
         P4RuntimeController controller = handler().get(P4RuntimeController.class);
 
         P4RuntimeClient client = controller.getClient(deviceId);
         if (client == null) {
             log.warn("Unable to find client for {}, aborting pipeconf deploy", deviceId);
-            return false;
+            return completedFuture(false);
         }
 
         ByteBuffer deviceDataBuffer = createDeviceDataBuffer(pipeconf);
         if (deviceDataBuffer == null) {
             // Hopefully the child class logged the problem.
-            return false;
+            return completedFuture(false);
         }
 
-        final Boolean deploySuccess = getFutureWithDeadline(
-                client.setPipelineConfig(pipeconf, deviceDataBuffer),
-                "deploying pipeconf", null);
-        if (deploySuccess == null) {
-            return false;
-        } else if (!deploySuccess) {
-            log.warn("Unable to deploy pipeconf {} to {}", pipeconf.id(), deviceId);
-            return false;
-        }
-
-        return true;
+        return client.setPipelineConfig(pipeconf, deviceDataBuffer);
     }
 
     @Override
-    public boolean isPipeconfSet(PiPipeconf pipeconf) {
+    public CompletableFuture<Boolean> isPipeconfSet(PiPipeconf pipeconf) {
         DeviceId deviceId = handler().data().deviceId();
         P4RuntimeController controller = handler().get(P4RuntimeController.class);
 
         P4RuntimeClient client = controller.getClient(deviceId);
         if (client == null) {
             log.warn("Unable to find client for {}, cannot check if pipeconf is set", deviceId);
-            return false;
+            return completedFuture(false);
         }
 
         ByteBuffer deviceDataBuffer = createDeviceDataBuffer(pipeconf);
         if (deviceDataBuffer == null) {
             // Hopefully the child class logged the problem.
-            return false;
+            return completedFuture(false);
         }
 
-        return client.isPipelineConfigSetSync(pipeconf, deviceDataBuffer);
+        return client.isPipelineConfigSet(pipeconf, deviceDataBuffer);
     }
 
     @Override
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeDeviceDescriptionDiscovery.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeDeviceDescriptionDiscovery.java
new file mode 100644
index 0000000..337ce6a
--- /dev/null
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeDeviceDescriptionDiscovery.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.drivers.p4runtime;
+
+import org.onlab.packet.ChassisId;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.Device;
+import org.onosproject.net.device.DefaultDeviceDescription;
+import org.onosproject.net.device.DeviceDescription;
+import org.onosproject.net.device.DeviceDescriptionDiscovery;
+import org.onosproject.net.device.PortDescription;
+import org.onosproject.net.driver.AbstractHandlerBehaviour;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Implementation of DeviceDescriptionDiscovery for P4Runtime devices.
+ */
+public class P4RuntimeDeviceDescriptionDiscovery
+        extends AbstractHandlerBehaviour implements DeviceDescriptionDiscovery {
+
+    private static final String UNKNOWN = "unknown";
+
+    @Override
+    public DeviceDescription discoverDeviceDetails() {
+        return new DefaultDeviceDescription(
+                data().deviceId().uri(),
+                Device.Type.SWITCH,
+                data().driver().manufacturer(),
+                data().driver().hwVersion(),
+                data().driver().swVersion(),
+                UNKNOWN,
+                new ChassisId(),
+                false,
+                DefaultAnnotations.builder()
+                        .set(AnnotationKeys.PROTOCOL, "P4Runtime")
+                        .build());
+    }
+
+    @Override
+    public List<PortDescription> discoverPortDetails() {
+        return Collections.emptyList();
+    }
+}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
index 4d84b2a..60b0dbc 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
@@ -16,86 +16,142 @@
 
 package org.onosproject.drivers.p4runtime;
 
-import org.onosproject.net.DeviceId;
+import org.onosproject.cluster.ClusterService;
 import org.onosproject.net.MastershipRole;
 import org.onosproject.net.device.DeviceAgentListener;
 import org.onosproject.net.device.DeviceHandshaker;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
 import org.onosproject.net.provider.ProviderId;
 import org.onosproject.p4runtime.api.P4RuntimeClient;
+import org.onosproject.p4runtime.api.P4RuntimeClientKey;
 import org.onosproject.p4runtime.api.P4RuntimeController;
 
+import java.math.BigInteger;
 import java.util.concurrent.CompletableFuture;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
 /**
  * Implementation of DeviceHandshaker for P4Runtime.
  */
 public class P4RuntimeHandshaker extends AbstractP4RuntimeHandlerBehaviour implements DeviceHandshaker {
 
+    // This is needed to compute an election ID based on mastership term and
+    // preference. At the time of writing the practical maximum cluster size is
+    // 9. Since election IDs are 128bit numbers, we should'nt be too worried of
+    // being conservative when setting a static max size here. Making the
+    // cluster size dynamic would likely cause conflicts when generating
+    // election IDs (e.g. two nodes seeing different cluster sizes).
+    private static final int MAX_CLUSTER_SIZE = 20;
+
     @Override
     public CompletableFuture<Boolean> connect() {
         return CompletableFuture
-                .supplyAsync(super::createClient)
-                .thenApplyAsync(client -> {
-                    if (client == null) {
-                        return false;
-                    }
-                    client.openSession();
-                    return true;
-                });
+                .supplyAsync(this::createClient);
+    }
+
+    private boolean createClient() {
+        final P4RuntimeClientKey clientKey = clientKey();
+        if (clientKey == null) {
+            return false;
+        }
+        if (!handler().get(P4RuntimeController.class).createClient(clientKey)) {
+            log.debug("Unable to create client for {}", data().deviceId());
+            return false;
+        }
+        return true;
     }
 
     @Override
     public boolean isConnected() {
-        final P4RuntimeController controller = handler().get(P4RuntimeController.class);
-        final DeviceId deviceId = handler().data().deviceId();
-        final P4RuntimeClient client = controller.getClient(deviceId);
-        return client != null && client.isSessionOpen();
+        // This is based on the client key obtained from the current netcfg. If
+        // a client already exists for this device, but the netcfg with the
+        // server endpoints has changed, this will return false.
+        return getClientByKey() != null;
     }
 
     @Override
-    public CompletableFuture<Boolean> disconnect() {
-        final P4RuntimeController controller = handler().get(P4RuntimeController.class);
-        final DeviceId deviceId = handler().data().deviceId();
-        final P4RuntimeClient client = controller.getClient(deviceId);
+    public void disconnect() {
+        // This removes clients associated with this device ID, even if the
+        // netcfg has changed and so the client key for this device.
+        handler().get(P4RuntimeController.class).removeClient(data().deviceId());
+    }
+
+    @Override
+    public boolean isReachable() {
+        final P4RuntimeClient client = getClientByKey();
+        return client != null && client.isServerReachable();
+    }
+
+    @Override
+    public CompletableFuture<Boolean> probeReachability() {
+        final P4RuntimeClient client = getClientByKey();
         if (client == null) {
-            return CompletableFuture.completedFuture(true);
+            return completedFuture(false);
         }
-        return client.shutdown()
-                .thenApplyAsync(v -> {
-                    controller.removeClient(deviceId);
-                    return true;
-                });
+        return client.probeService();
     }
 
     @Override
-    public CompletableFuture<Boolean> isReachable() {
-        return CompletableFuture
-                // P4RuntimeController requires a client to be created to
-                // check for reachability.
-                .supplyAsync(super::createClient)
-                .thenApplyAsync(client -> {
-                    if (client == null) {
-                        return false;
-                    }
-                    return handler()
-                            .get(P4RuntimeController.class)
-                            .isReachable(handler().data().deviceId());
-                });
+    public boolean isAvailable() {
+        // To be available, we require a session open (for packet in/out) and a
+        // pipeline config set.
+        final P4RuntimeClient client = getClientByKey();
+        if (client == null || !client.isServerReachable() || !client.isSessionOpen()) {
+            return false;
+        }
+        // Since we cannot probe the device, we rely on what's known by the
+        // pipeconf watchdog service.
+        return PiPipeconfWatchdogService.PipelineStatus.READY.equals(
+                handler().get(PiPipeconfWatchdogService.class)
+                        .getStatus(data().deviceId()));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> probeAvailability() {
+        // To be available, we require a session open (for packet in/out) and a
+        // pipeline config set.
+        final P4RuntimeClient client = getClientByKey();
+        if (client == null || !client.isServerReachable() || !client.isSessionOpen()) {
+            return completedFuture(false);
+        }
+        return client.isAnyPipelineConfigSet();
     }
 
     @Override
     public void roleChanged(MastershipRole newRole) {
-        if (setupBehaviour() && newRole.equals(MastershipRole.MASTER)) {
-            client.runForMastership();
+        if (newRole.equals(MastershipRole.NONE)) {
+            final P4RuntimeClient client = getClientByKey();
+            if (client != null) {
+                log.info("Notified role NONE, closing session...");
+                client.closeSession();
+            }
+        } else {
+            throw new UnsupportedOperationException(
+                    "Use preference-based way for setting MASTER or STANDBY roles");
+        }
+    }
+
+    @Override
+    public void roleChanged(int preference, long term) {
+        if (setupBehaviour()) {
+            final int clusterSize = handler().get(ClusterService.class)
+                    .getNodes().size();
+            if (clusterSize > MAX_CLUSTER_SIZE) {
+                throw new IllegalStateException(
+                        "Cluster too big! Maz size supported is " + MAX_CLUSTER_SIZE);
+            }
+            BigInteger electionId = BigInteger.valueOf(term)
+                    .multiply(BigInteger.valueOf(MAX_CLUSTER_SIZE))
+                    .subtract(BigInteger.valueOf(preference));
+            client.setMastership(preference == 0, electionId);
         }
     }
 
     @Override
     public MastershipRole getRole() {
-        final P4RuntimeController controller = handler().get(P4RuntimeController.class);
-        final DeviceId deviceId = handler().data().deviceId();
-        final P4RuntimeClient client = controller.getClient(deviceId);
-        if (client == null || !client.isSessionOpen()) {
+        final P4RuntimeClient client = getClientByKey();
+        if (client == null || !client.isServerReachable() || !client.isSessionOpen()) {
             return MastershipRole.NONE;
         }
         return client.isMaster() ? MastershipRole.MASTER : MastershipRole.STANDBY;
diff --git a/drivers/p4runtime/src/main/resources/p4runtime-drivers.xml b/drivers/p4runtime/src/main/resources/p4runtime-drivers.xml
index 78188a7..0750805 100644
--- a/drivers/p4runtime/src/main/resources/p4runtime-drivers.xml
+++ b/drivers/p4runtime/src/main/resources/p4runtime-drivers.xml
@@ -16,6 +16,8 @@
   -->
 <drivers>
     <driver name="p4runtime" manufacturer="p4.org" hwVersion="master" swVersion="master">
+        <behaviour api="org.onosproject.net.device.DeviceDescriptionDiscovery"
+                   impl="org.onosproject.drivers.p4runtime.P4RuntimeDeviceDescriptionDiscovery"/>
         <behaviour api="org.onosproject.net.behaviour.TableStatisticsDiscovery"
                    impl="org.onosproject.drivers.p4runtime.P4RuntimeTableStatisticsDiscovery"/>
         <behaviour api="org.onosproject.net.device.DeviceHandshaker"
diff --git a/drivers/stratum/src/main/java/org/onosproject/drivers/stratum/StratumHandshaker.java b/drivers/stratum/src/main/java/org/onosproject/drivers/stratum/StratumHandshaker.java
index 4e169a5..8762af0 100644
--- a/drivers/stratum/src/main/java/org/onosproject/drivers/stratum/StratumHandshaker.java
+++ b/drivers/stratum/src/main/java/org/onosproject/drivers/stratum/StratumHandshaker.java
@@ -16,10 +16,8 @@
 
 package org.onosproject.drivers.stratum;
 
-import io.grpc.StatusRuntimeException;
 import org.onosproject.drivers.gnmi.GnmiHandshaker;
 import org.onosproject.drivers.p4runtime.P4RuntimeHandshaker;
-import org.onosproject.net.DeviceId;
 import org.onosproject.net.MastershipRole;
 import org.onosproject.net.device.DeviceAgentListener;
 import org.onosproject.net.device.DeviceHandshaker;
@@ -27,109 +25,97 @@
 import org.onosproject.net.driver.DriverData;
 import org.onosproject.net.driver.DriverHandler;
 import org.onosproject.net.provider.ProviderId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 /**
  * Implementation of DeviceHandshaker for Stratum device.
  */
 public class StratumHandshaker extends AbstractHandlerBehaviour implements DeviceHandshaker {
 
-    private static final Logger log = LoggerFactory.getLogger(StratumHandshaker.class);
-    private static final int DEFAULT_DEVICE_REQ_TIMEOUT = 10;
-
-    private P4RuntimeHandshaker p4RuntimeHandshaker;
-    private GnmiHandshaker gnmiHandshaker;
-    private DeviceId deviceId;
+    private P4RuntimeHandshaker p4runtime;
+    private GnmiHandshaker gnmi;
 
     public StratumHandshaker() {
-        p4RuntimeHandshaker = new P4RuntimeHandshaker();
-        gnmiHandshaker = new GnmiHandshaker();
+        p4runtime = new P4RuntimeHandshaker();
+        gnmi = new GnmiHandshaker();
     }
 
     @Override
     public void setHandler(DriverHandler handler) {
         super.setHandler(handler);
-        p4RuntimeHandshaker.setHandler(handler);
-        gnmiHandshaker.setHandler(handler);
+        p4runtime.setHandler(handler);
+        gnmi.setHandler(handler);
     }
 
     @Override
     public void setData(DriverData data) {
         super.setData(data);
-        p4RuntimeHandshaker.setData(data);
-        gnmiHandshaker.setData(data);
-        deviceId = data.deviceId();
+        p4runtime.setData(data);
+        gnmi.setData(data);
     }
 
     @Override
-    public CompletableFuture<Boolean> isReachable() {
-        return p4RuntimeHandshaker.isReachable()
-                .thenCombine(gnmiHandshaker.isReachable(), Boolean::logicalAnd);
+    public boolean isReachable() {
+        return p4runtime.isReachable() && gnmi.isReachable();
+    }
+
+    @Override
+    public CompletableFuture<Boolean> probeReachability() {
+        return p4runtime.probeReachability();
+    }
+
+    @Override
+    public boolean isAvailable() {
+        // Availability concerns packet forwarding, hence we consider only
+        // P4Runtime.
+        return p4runtime.isAvailable();
+    }
+
+    @Override
+    public CompletableFuture<Boolean> probeAvailability() {
+        return p4runtime.probeAvailability();
     }
 
     @Override
     public void roleChanged(MastershipRole newRole) {
-        p4RuntimeHandshaker.roleChanged(newRole);
-        // gNMI doesn't support mastership handling.
+        p4runtime.roleChanged(newRole);
+    }
+
+    @Override
+    public void roleChanged(int preference, long term) {
+        p4runtime.roleChanged(preference, term);
     }
 
     @Override
     public MastershipRole getRole() {
-        return p4RuntimeHandshaker.getRole();
+        return p4runtime.getRole();
     }
 
     @Override
     public void addDeviceAgentListener(ProviderId providerId, DeviceAgentListener listener) {
-        p4RuntimeHandshaker.addDeviceAgentListener(providerId, listener);
+        p4runtime.addDeviceAgentListener(providerId, listener);
     }
 
     @Override
     public void removeDeviceAgentListener(ProviderId providerId) {
-        p4RuntimeHandshaker.removeDeviceAgentListener(providerId);
+        p4runtime.removeDeviceAgentListener(providerId);
     }
 
     @Override
     public CompletableFuture<Boolean> connect() {
-        return p4RuntimeHandshaker.connect()
-                .thenCombine(gnmiHandshaker.connect(), Boolean::logicalAnd);
+        // We should execute connections in parallel.
+        return p4runtime.connect().thenCombine(gnmi.connect(), Boolean::logicalAnd);
     }
 
     @Override
     public boolean isConnected() {
-        final CompletableFuture<Boolean> p4runtimeConnected =
-                CompletableFuture.supplyAsync(p4RuntimeHandshaker::isConnected);
-        final CompletableFuture<Boolean> gnmiConnected =
-                CompletableFuture.supplyAsync(gnmiHandshaker::isConnected);
-
-        try {
-            return p4runtimeConnected
-                    .thenCombine(gnmiConnected, Boolean::logicalAnd)
-                    .get(DEFAULT_DEVICE_REQ_TIMEOUT, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-            log.error("Exception while checking connectivity on {}", data().deviceId());
-        } catch (ExecutionException e) {
-            final Throwable cause = e.getCause();
-            if (cause instanceof StatusRuntimeException) {
-                final StatusRuntimeException grpcError = (StatusRuntimeException) cause;
-                log.warn("Error while checking connectivity on {}: {}", deviceId, grpcError.getMessage());
-            } else {
-                log.error("Exception while checking connectivity on {}", deviceId, e.getCause());
-            }
-        } catch (TimeoutException e) {
-            log.error("Operation TIMEOUT while checking connectivity on {}", deviceId);
-        }
-        return false;
+        return p4runtime.isConnected() && gnmi.isConnected();
     }
 
     @Override
-    public CompletableFuture<Boolean> disconnect() {
-        return p4RuntimeHandshaker.disconnect()
-                .thenCombine(gnmiHandshaker.disconnect(), Boolean::logicalAnd);
+    public void disconnect() {
+        p4runtime.disconnect();
+        gnmi.disconnect();
     }
 }
diff --git a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/FabricIntProgrammable.java b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/FabricIntProgrammable.java
index a2bf569..d593262 100644
--- a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/FabricIntProgrammable.java
+++ b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/FabricIntProgrammable.java
@@ -16,9 +16,6 @@
 package org.onosproject.pipelines.fabric;
 
 import com.google.common.collect.Sets;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
-
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.inbandtelemetry.api.IntConfig;
@@ -28,6 +25,7 @@
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.config.NetworkConfigService;
+import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flow.DefaultFlowRule;
 import org.onosproject.net.flow.DefaultTrafficSelector;
 import org.onosproject.net.flow.DefaultTrafficTreatment;
@@ -44,7 +42,8 @@
 import org.onosproject.net.pi.model.PiTableId;
 import org.onosproject.net.pi.runtime.PiAction;
 import org.onosproject.net.pi.runtime.PiActionParam;
-import org.onosproject.provider.general.device.api.GeneralProviderDeviceConfig;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
 
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -107,30 +106,27 @@
             return false;
         }
 
-        final GeneralProviderDeviceConfig cfg = cfgService.getConfig(
-                deviceId, GeneralProviderDeviceConfig.class);
-        if (cfg == null) {
-            log.warn("Missing GeneralProviderDevice config for {}", deviceId);
-            return false;
-        }
-        final String switchId = cfg.protocolsInfo().containsKey("int") ?
-                cfg.protocolsInfo().get("int").configValues().get("switchId")
-                : null;
-        if (switchId == null || switchId.isEmpty()) {
-            log.warn("Missing INT device config for {}", deviceId);
-            return false;
-        }
+        // FIXME: create config class for INT to allow specifying arbitrary
+        //  switch IDs. The one for the GeneralDeviceProvider was temporary and
+        //  now has been removed. For now we use the chassis ID.
+        // final GeneralProviderDeviceConfig cfg = cfgService.getConfig(
+        //         deviceId, GeneralProviderDeviceConfig.class);
+        // if (cfg == null) {
+        //     log.warn("Missing GeneralProviderDevice config for {}", deviceId);
+        //     return false;
+        // }
+        // final String switchId = cfg.protocolsInfo().containsKey("int") ?
+        //         cfg.protocolsInfo().get("int").configValues().get("switchId")
+        //         : null;
+        // if (switchId == null || switchId.isEmpty()) {
+        //     log.warn("Missing INT device config for {}", deviceId);
+        //     return false;
+        // }
 
-        PiActionParam transitIdParam;
-        try {
-            transitIdParam = new PiActionParam(
-                    FabricConstants.SWITCH_ID,
-                    // FIXME set switch ID from netcfg
-                    copyFrom(Integer.parseInt(switchId)));
-        } catch (NumberFormatException e) {
-            log.warn("Invalid INT switch ID for {}: {}", deviceId, switchId);
-            return false;
-        }
+        PiActionParam transitIdParam = new PiActionParam(
+                FabricConstants.SWITCH_ID,
+                copyFrom(handler().get(DeviceService.class)
+                                 .getDevice(deviceId).chassisId().id()));
 
         PiAction transitAction = PiAction.builder()
                 .withId(FabricConstants.FABRIC_EGRESS_PROCESS_INT_MAIN_PROCESS_INT_TRANSIT_INIT_METADATA)
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java
index 6e65dd3..796ef55 100644
--- a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java
@@ -68,12 +68,4 @@
      * Terminates the subscription channel of this device.
      */
     void terminateSubscriptionChannel();
-
-    /**
-     * Check weather the gNMI service is available or not by sending a dummy get
-     * request message.
-     *
-     * @return true if gNMI service available; false otherwise
-     */
-    CompletableFuture<Boolean> isServiceAvailable();
 }
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClientKey.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClientKey.java
index 137cbb8..3a66c18 100644
--- a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClientKey.java
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClientKey.java
@@ -20,6 +20,8 @@
 import org.onosproject.grpc.api.GrpcClientKey;
 import org.onosproject.net.DeviceId;
 
+import java.net.URI;
+
 /**
  * Key that uniquely identifies a gNMI client.
  */
@@ -31,11 +33,10 @@
     /**
      * Creates a new gNMI client key.
      *
-     * @param deviceId    ONOS device ID
-     * @param serverAddr  gNMI server address
-     * @param serverPort  gNMI server port
+     * @param deviceId  ONOS device ID
+     * @param serverUri gNMI server URI
      */
-    public GnmiClientKey(DeviceId deviceId, String serverAddr, int serverPort) {
-        super(GNMI, deviceId, serverAddr, serverPort);
+    public GnmiClientKey(DeviceId deviceId, URI serverUri) {
+        super(GNMI, deviceId, serverUri);
     }
 }
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
index 3701ff7..6645f39 100644
--- a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
@@ -50,7 +50,7 @@
     private GnmiSubscriptionManager gnmiSubscriptionManager;
 
     GnmiClientImpl(GnmiClientKey clientKey, ManagedChannel managedChannel, GnmiControllerImpl controller) {
-        super(clientKey);
+        super(clientKey, managedChannel, false, controller);
         this.blockingStub = gNMIGrpc.newBlockingStub(managedChannel);
         this.gnmiSubscriptionManager =
                 new GnmiSubscriptionManager(managedChannel, deviceId, controller);
@@ -82,7 +82,7 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> isServiceAvailable() {
+    public CompletableFuture<Boolean> probeService() {
         return supplyInContext(this::doServiceAvailable, "isServiceAvailable");
     }
 
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java
index c0bdcdd..9c4643a 100644
--- a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java
@@ -23,47 +23,24 @@
 import org.onosproject.gnmi.api.GnmiEvent;
 import org.onosproject.gnmi.api.GnmiEventListener;
 import org.onosproject.grpc.ctl.AbstractGrpcClientController;
-import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.slf4j.Logger;
-
-import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * Implementation of gNMI controller.
  */
 @Component(immediate = true, service = GnmiController.class)
 public class GnmiControllerImpl
-        extends AbstractGrpcClientController<GnmiClientKey, GnmiClient, GnmiEvent, GnmiEventListener>
+        extends AbstractGrpcClientController
+        <GnmiClientKey, GnmiClient, GnmiEvent, GnmiEventListener>
         implements GnmiController {
 
-    private final Logger log = getLogger(getClass());
-
-    @Activate
-    public void activate() {
-        super.activate();
-        eventDispatcher.addSink(GnmiEvent.class, listenerRegistry);
-        log.info("Started");
-    }
-
-    @Deactivate
-    public void deactivate() {
-        super.deactivate();
-        log.info("Stopped");
+    public GnmiControllerImpl() {
+        super(GnmiEvent.class);
     }
 
     @Override
-    protected GnmiClient createClientInstance(GnmiClientKey clientKey, ManagedChannel channel) {
+    protected GnmiClient createClientInstance(
+            GnmiClientKey clientKey, ManagedChannel channel) {
         return new GnmiClientImpl(clientKey, channel, this);
     }
-
-    /**
-     * Handles event from gNMI client.
-     *
-     * @param event the gNMI event
-     */
-    void postEvent(GnmiEvent event) {
-        post(event);
-    }
 }
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java
index 5b07254..ad4324b 100644
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java
@@ -23,6 +23,7 @@
 
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * Abstraction of a gRPC controller that stores and manages gRPC channels.
@@ -33,10 +34,10 @@
     int CONNECTION_TIMEOUT_SECONDS = 20;
 
     /**
-     * Creates a gRPC managed channel from the given builder and opens a
-     * connection to it. If the connection is successful returns the managed
-     * channel object and stores the channel internally, associated with the
-     * given channel ID.
+     * Creates a gRPC managed channel from the given builder and opens the
+     * connection. If the connection is successful, returns the managed channel
+     * object and stores the channel internally, associated with the given
+     * channel ID.
      * <p>
      * This method blocks until the channel is open or a timeout expires. By
      * default the timeout is {@link #CONNECTION_TIMEOUT_SECONDS} seconds. If
@@ -72,15 +73,6 @@
     Map<GrpcChannelId, ManagedChannel> getChannels();
 
     /**
-     * Returns true if the channel associated with the given identifier is open,
-     * i.e. the server is able to successfully reply to RPCs, false otherwise.
-     *
-     * @param channelId channel ID
-     * @return true if channel is open, false otherwise.
-     */
-    boolean isChannelOpen(GrpcChannelId channelId);
-
-    /**
      * If present, returns the channel associated with the given ID.
      *
      * @param channelId channel ID
@@ -88,4 +80,14 @@
      */
     Optional<ManagedChannel> getChannel(GrpcChannelId channelId);
 
+    /**
+     * Probes the server at the endpoint of the given channel. Returns true if
+     * the server responded to the probe, false otherwise or if the channel does
+     * not exist.
+     *
+     * @param channelId channel ID
+     * @return completable future eventually true if the gRPC server responded
+     * to the probe; false otherwise
+     */
+    CompletableFuture<Boolean> probeChannel(GrpcChannelId channelId);
 }
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java
index d040a23..d529529 100644
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClient.java
@@ -17,11 +17,11 @@
 package org.onosproject.grpc.api;
 
 import com.google.common.annotations.Beta;
+
 import java.util.concurrent.CompletableFuture;
 
 /**
  * Abstraction of a gRPC client.
- *
  */
 @Beta
 public interface GrpcClient {
@@ -33,4 +33,30 @@
      * procedure
      */
     CompletableFuture<Void> shutdown();
+
+    /**
+     * This method provides a coarse modelling of gRPC channel {@link
+     * io.grpc.ConnectivityState}. The implementation does not make any attempt
+     * at probing the server by sending messages over the network, as such it
+     * does not block execution. It returns true if this client is expected to
+     * communicate with the server successfully. In other words, if any RPC
+     * would be executed immediately after this method is called and returns
+     * true, then it is expected, but NOT guaranteed, for that RPC message to
+     * reach the server and be processed. If false, it means the channel is in a
+     * failure state and communication with the server is unlikely to happen
+     * soon.
+     *
+     * @return true if server is deemed reachable, false otherwise
+     */
+    boolean isServerReachable();
+
+    /**
+     * Similar to {@link #isServerReachable()}, but might involve sending
+     * packets over the network. This checks whether the specific gRPC
+     * service(s) required by this client is available or not on the server.
+     *
+     * @return completable future eventually true if the gRPC service(s) on the
+     * server was available during the probe; false otherwise
+     */
+    CompletableFuture<Boolean> probeService();
 }
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientController.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientController.java
index 3cdbcd1..b47d499 100644
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientController.java
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientController.java
@@ -18,10 +18,11 @@
 
 import com.google.common.annotations.Beta;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceAgentListener;
+import org.onosproject.net.provider.ProviderId;
 
 /**
- * Abstraction of a gRPC controller which controls specific gRPC client {@link
- * C} with specific client key {@link K}.
+ * Abstraction of controller that manages gRPC clients.
  *
  * @param <K> the gRPC client key
  * @param <C> the gRPC client type
@@ -34,20 +35,27 @@
      * given information. As a result of this method, a client can be later
      * obtained by invoking {@link #getClient(DeviceId)}.
      * <p>
+     * Upon creation, a connection to the server is automatically started, which
+     * blocks execution. If the connection is successful, the client is created
+     * and this method returns true, otherwise (e.g., socket error) any state
+     * associated with this client is destroyed and returns false.
+     * <p>
      * Only one client can exist for the same device ID. Calls to this method
      * are idempotent fot the same client key, i.e. returns true if such client
-     * already exists but a new one is not created. If there exists a client
-     * with same device ID but different address and port, removes old one and
-     * recreate new one.
+     * already exists. Otherwise, if a client for the same device ID but
+     * different client key already exists, throws an exception.
      *
      * @param clientKey the client key
      * @return true if the client was created and the channel to the server is
      * open; false otherwise
+     * @throws IllegalArgumentException if a client for the same device ID but
+     *                                  different client key already exists.
      */
     boolean createClient(K clientKey);
 
     /**
-     * Retrieves the gRPC client to operate on the given device.
+     * Returns the gRPC client previously created for the given device, or null
+     * if such client does not exist.
      *
      * @param deviceId the device identifier
      * @return the gRPC client of the device if exists; null otherwise
@@ -55,24 +63,47 @@
     C getClient(DeviceId deviceId);
 
     /**
-     * Removes the gRPC client for the given device. If no client exists for the
-     * given device, the result is a no-op.
+     * Returns the gRPC client previously created for the given client key, or
+     * null if such client does not exist.
+     *
+     * @param clientKey client key
+     * @return the gRPC client of the device if exists; null otherwise
+     */
+    C getClient(K clientKey);
+
+    /**
+     * Removes the gRPC client for the given device and any gRPC channel state
+     * associated to it. If no client exists for the given device, the result is
+     * a no-op.
      *
      * @param deviceId the device identifier
      */
     void removeClient(DeviceId deviceId);
 
     /**
-     * Check reachability of the gRPC server running on the given device.
-     * Reachability can be tested only if a client is previously created using
-     * {@link #createClient(GrpcClientKey)}. Note that this only checks the
-     * reachability instead of checking service availability, different
-     * service-specific gRPC clients might check service availability in a
-     * different way.
+     * Similar to {@link #removeClient(DeviceId)} but uses the client key to
+     * identify the client to remove.
      *
-     * @param deviceId the device identifier
-     * @return true if client was created and is able to contact the gNMI
-     * server; false otherwise
+     * @param clientKey the client key
      */
-    boolean isReachable(DeviceId deviceId);
+    void removeClient(K clientKey);
+
+    /**
+     * Adds a listener for device agent events for the given provider.
+     *
+     * @param deviceId device identifier
+     * @param providerId provider ID
+     * @param listener the device agent listener
+     */
+    void addDeviceAgentListener(DeviceId deviceId, ProviderId providerId,
+                                DeviceAgentListener listener);
+
+    /**
+     * Removes the listener for device agent events that was previously
+     * registered for the given provider.
+     *
+     * @param deviceId   device identifier
+     * @param providerId the provider ID
+     */
+    void removeDeviceAgentListener(DeviceId deviceId, ProviderId providerId);
 }
diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientKey.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientKey.java
index 99ea23f..d1d0b0f 100644
--- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientKey.java
+++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcClientKey.java
@@ -20,8 +20,11 @@
 import com.google.common.base.Objects;
 import org.onosproject.net.DeviceId;
 
+import java.net.URI;
+
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Strings.isNullOrEmpty;
 import static java.lang.String.format;
 
 /**
@@ -29,32 +32,36 @@
  */
 @Beta
 public class GrpcClientKey {
+
+    private static final String GRPC = "grpc";
+    private static final String GRPCS = "grpcs";
+
     private final String serviceName;
     private final DeviceId deviceId;
-    private final String serverAddr;
-    private final int serverPort;
+    private final URI serverUri;
 
     /**
      * Creates a new client key.
      *
      * @param serviceName gRPC service name of the client
-     * @param deviceId ONOS device ID
-     * @param serverAddr gRPC server address
-     * @param serverPort gRPC server port
+     * @param deviceId    ONOS device ID
+     * @param serverUri   gRPC server URI
      */
-    public GrpcClientKey(String serviceName, DeviceId deviceId, String serverAddr, int serverPort) {
+    public GrpcClientKey(String serviceName, DeviceId deviceId, URI serverUri) {
         checkNotNull(serviceName);
         checkNotNull(deviceId);
-        checkNotNull(serverAddr);
+        checkNotNull(serverUri);
         checkArgument(!serviceName.isEmpty(),
-                "Service name can not be null");
-        checkArgument(!serverAddr.isEmpty(),
-                "Server address should not be empty");
-        checkArgument(serverPort > 0 && serverPort <= 65535, "Invalid server port");
+                      "Service name can not be null");
+        checkArgument(serverUri.getScheme().equals(GRPC)
+                              || serverUri.getScheme().equals(GRPCS),
+                      format("Server URI scheme must be %s or %s", GRPC, GRPCS));
+        checkArgument(!isNullOrEmpty(serverUri.getHost()),
+                      "Server host address should not be empty");
+        checkArgument(serverUri.getPort() > 0 && serverUri.getPort() <= 65535, "Invalid server port");
         this.serviceName = serviceName;
         this.deviceId = deviceId;
-        this.serverAddr = serverAddr;
-        this.serverPort = serverPort;
+        this.serverUri = serverUri;
     }
 
     /**
@@ -76,21 +83,21 @@
     }
 
     /**
-     * Gets the gRPC server address.
+     * Returns the gRPC server URI.
      *
-     * @return the gRPC server address.
+     * @return the gRPC server URI.
      */
-    public String serverAddr() {
-        return serverAddr;
+    public URI serveUri() {
+        return serverUri;
     }
 
     /**
-     * Gets the gRPC server port.
+     * Returns true if the client requires TLS/SSL, false otherwise.
      *
-     * @return the gRPC server port.
+     * @return boolean
      */
-    public int serverPort() {
-        return serverPort;
+    public boolean requiresSecureChannel() {
+        return serverUri.getScheme().equals(GRPCS);
     }
 
     @Override
@@ -102,19 +109,18 @@
             return false;
         }
         GrpcClientKey that = (GrpcClientKey) o;
-        return serverPort == that.serverPort &&
-                Objects.equal(serviceName, that.serviceName) &&
+        return Objects.equal(serviceName, that.serviceName) &&
                 Objects.equal(deviceId, that.deviceId) &&
-                Objects.equal(serverAddr, that.serverAddr);
+                Objects.equal(serverUri, that.serverUri);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hashCode(serviceName, deviceId, serverAddr, serverPort);
+        return Objects.hashCode(serviceName, deviceId, serverUri);
     }
 
     @Override
     public String toString() {
-        return format("%s/%s@%s:%s", deviceId, serviceName, serverAddr, serverPort);
+        return format("%s/%s@%s", deviceId, serviceName, serverUri);
     }
 }
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
index 0b21ca2..117c6e3 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClient.java
@@ -16,11 +16,14 @@
 
 package org.onosproject.grpc.ctl;
 
+import io.grpc.ConnectivityState;
 import io.grpc.Context;
+import io.grpc.ManagedChannel;
 import io.grpc.StatusRuntimeException;
 import org.onosproject.grpc.api.GrpcClient;
 import org.onosproject.grpc.api.GrpcClientKey;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceAgentEvent;
 import org.slf4j.Logger;
 
 import java.util.concurrent.CompletableFuture;
@@ -28,6 +31,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
@@ -54,13 +58,51 @@
 
     protected final ExecutorService executorService;
     protected final DeviceId deviceId;
+    protected final ManagedChannel channel;
+    private final boolean persistent;
+    private final AbstractGrpcClientController controller;
+    private final AtomicBoolean channelOpen = new AtomicBoolean(false);
 
-    protected AbstractGrpcClient(GrpcClientKey clientKey) {
+    /**
+     * Creates an new client for the given key and channel. Setting persistent
+     * to true avoids the gRPC channel to stay IDLE. The controller instance is
+     * needed to propagate channel events.
+     *
+     * @param clientKey  client key
+     * @param channel    channel
+     * @param persistent true if the gRPC should never stay IDLE
+     * @param controller controller
+     */
+    protected AbstractGrpcClient(GrpcClientKey clientKey, ManagedChannel channel,
+                                 boolean persistent, AbstractGrpcClientController controller) {
         checkNotNull(clientKey);
+        checkNotNull(channel);
         this.deviceId = clientKey.deviceId();
+        this.channel = channel;
+        this.persistent = persistent;
+        this.controller = controller;
         this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE, groupedThreads(
                 "onos-grpc-" + clientKey.serviceName() + "-client-" + deviceId.toString(), "%d"));
         this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
+
+        setChannelCallback(clientKey.deviceId(), channel, ConnectivityState.CONNECTING);
+    }
+
+    @Override
+    public boolean isServerReachable() {
+        final ConnectivityState state = channel.getState(false);
+        switch (state) {
+            case READY:
+            case IDLE:
+                return true;
+            case CONNECTING:
+            case TRANSIENT_FAILURE:
+            case SHUTDOWN:
+                return false;
+            default:
+                log.error("Unrecognized channel connectivity state {}", state);
+                return false;
+        }
     }
 
     @Override
@@ -171,4 +213,85 @@
             }
         }, executor);
     }
+
+    private void setChannelCallback(DeviceId deviceId, ManagedChannel channel,
+                                    ConnectivityState sourceState) {
+        if (log.isTraceEnabled()) {
+            log.trace("Setting channel callback for {} with source state {}...",
+                      deviceId, sourceState);
+        }
+        channel.notifyWhenStateChanged(
+                sourceState, new ChannelConnectivityCallback(deviceId, channel));
+    }
+
+    /**
+     * Runnable task invoked at each change of the channel connectivity state.
+     * New callbacks are created as long as the channel is not shut down.
+     */
+    private final class ChannelConnectivityCallback implements Runnable {
+
+        private final DeviceId deviceId;
+        private final ManagedChannel channel;
+
+        private ChannelConnectivityCallback(
+                DeviceId deviceId, ManagedChannel channel) {
+            this.deviceId = deviceId;
+            this.channel = channel;
+        }
+
+        @Override
+        public void run() {
+            final ConnectivityState newState = channel.getState(false);
+            final DeviceAgentEvent.Type eventType;
+            switch (newState) {
+                // On gRPC connectivity states:
+                // https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md
+                case READY:
+                    eventType = DeviceAgentEvent.Type.CHANNEL_OPEN;
+                    break;
+                case TRANSIENT_FAILURE:
+                    eventType = DeviceAgentEvent.Type.CHANNEL_ERROR;
+                    break;
+                case SHUTDOWN:
+                    eventType = DeviceAgentEvent.Type.CHANNEL_CLOSED;
+                    break;
+                case IDLE:
+                    // IDLE and CONNECTING are transient states that will
+                    // eventually move to READY or TRANSIENT_FAILURE. Do not
+                    // generate an event for now.
+                    if (persistent) {
+                        log.debug("Forcing channel for {} to exist state IDLE...", deviceId);
+                        channel.getState(true);
+                    }
+                    eventType = null;
+                    break;
+                case CONNECTING:
+                    eventType = null;
+                    break;
+                default:
+                    log.error("Unrecognized connectivity state {}", newState);
+                    eventType = null;
+            }
+
+            if (log.isTraceEnabled()) {
+                log.trace("Detected channel connectivity change for {}, new state is {}",
+                          deviceId, newState);
+            }
+
+            if (eventType != null) {
+                // Avoid sending consecutive duplicate events.
+                final boolean present = eventType == DeviceAgentEvent.Type.CHANNEL_OPEN;
+                final boolean past = channelOpen.getAndSet(present);
+                if (present != past) {
+                    log.debug("Notifying event {} for {}", eventType, deviceId);
+                    controller.postEvent(new DeviceAgentEvent(eventType, deviceId));
+                }
+            }
+
+            if (newState != ConnectivityState.SHUTDOWN) {
+                // Channels never leave SHUTDOWN state, no need for a new callback.
+                setChannelCallback(deviceId, channel, newState);
+            }
+        }
+    }
 }
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
index c0a3c6b..3c38209 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java
@@ -21,7 +21,6 @@
 import io.grpc.ManagedChannel;
 import io.grpc.netty.GrpcSslContexts;
 import io.grpc.netty.NettyChannelBuilder;
-import io.netty.handler.ssl.NotSslRecordException;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 import org.onosproject.event.AbstractListenerManager;
@@ -33,6 +32,9 @@
 import org.onosproject.grpc.api.GrpcClientController;
 import org.onosproject.grpc.api.GrpcClientKey;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceAgentEvent;
+import org.onosproject.net.device.DeviceAgentListener;
+import org.onosproject.net.provider.ProviderId;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Reference;
@@ -41,11 +43,11 @@
 
 import javax.net.ssl.SSLException;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.Lock;
 import java.util.function.Supplier;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
 import static java.lang.String.format;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -74,40 +76,44 @@
     private final Map<DeviceId, K> clientKeys = Maps.newHashMap();
     private final Map<K, C> clients = Maps.newHashMap();
     private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
+    private final ConcurrentMap<DeviceId, ConcurrentMap<ProviderId, DeviceAgentListener>>
+            deviceAgentListeners = Maps.newConcurrentMap();
+    private final Class<E> eventClass;
     private final Striped<Lock> stripedLocks = Striped.lock(DEFAULT_DEVICE_LOCK_SIZE);
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected GrpcChannelController grpcChannelController;
 
+    public AbstractGrpcClientController(Class<E> eventClass) {
+        this.eventClass = eventClass;
+    }
+
     @Activate
     public void activate() {
+        eventDispatcher.addSink(eventClass, listenerRegistry);
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
+        eventDispatcher.removeSink(eventClass);
         clientKeys.keySet().forEach(this::removeClient);
         clientKeys.clear();
         clients.clear();
         channelIds.clear();
+        deviceAgentListeners.clear();
         log.info("Stopped");
     }
 
     @Override
     public boolean createClient(K clientKey) {
         checkNotNull(clientKey);
-        /*
-            FIXME we might want to move "useTls" and "fallback" to properties of the netcfg and clientKey
-                  For now, we will first try to connect with TLS (accepting any cert), then fall back to
-                  plaintext for every device
-         */
-        return withDeviceLock(() -> doCreateClient(clientKey, true, true), clientKey.deviceId());
+        return withDeviceLock(() -> doCreateClient(clientKey),
+                              clientKey.deviceId());
     }
 
-    private boolean doCreateClient(K clientKey, boolean useTls, boolean fallbackToPlainText) {
+    private boolean doCreateClient(K clientKey) {
         DeviceId deviceId = clientKey.deviceId();
-        String serverAddr = clientKey.serverAddr();
-        int serverPort = clientKey.serverPort();
 
         if (clientKeys.containsKey(deviceId)) {
             final GrpcClientKey existingKey = clientKeys.get(deviceId);
@@ -116,19 +122,25 @@
                           clientName(clientKey), clientKey);
                 return true;
             } else {
-                log.info("Requested new {} with updated key, removing old client... (oldKey={})",
-                         clientName(clientKey), existingKey);
-                doRemoveClient(deviceId);
+                throw new IllegalArgumentException(format(
+                        "A client already exists for device %s (%s)",
+                        deviceId, clientKey));
             }
         }
 
-        log.info("Creating client for {} (server={}:{})...", deviceId, serverAddr, serverPort);
+        final String method = clientKey.requiresSecureChannel()
+                ? "TLS" : "plaintext TCP";
+
+        log.info("Connecting {} client for {} to server at {} using {}...",
+                 clientKey.serviceName(), deviceId, clientKey.serveUri(), method);
 
         SslContext sslContext = null;
-        if (useTls) {
+        if (clientKey.requiresSecureChannel()) {
             try {
-                // Accept any server certificate; this is insecure and should not be used in production
-                sslContext = GrpcSslContexts.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
+                // FIXME: Accept any server certificate; this is insecure and
+                //  should not be used in production
+                sslContext = GrpcSslContexts.forClient().trustManager(
+                        InsecureTrustManagerFactory.INSTANCE).build();
             } catch (SSLException e) {
                 log.error("Failed to build SSL Context", e);
                 return false;
@@ -137,17 +149,15 @@
 
         GrpcChannelId channelId = GrpcChannelId.of(clientKey.toString());
         NettyChannelBuilder channelBuilder = NettyChannelBuilder
-                .forAddress(serverAddr, serverPort)
+                .forAddress(clientKey.serveUri().getHost(),
+                            clientKey.serveUri().getPort())
                 .maxInboundMessageSize(DEFAULT_MAX_INBOUND_MSG_SIZE * MEGABYTES);
+
         if (sslContext != null) {
-            log.debug("Using SSL for gRPC connection to {}", deviceId);
             channelBuilder
                     .sslContext(sslContext)
                     .useTransportSecurity();
         } else {
-            checkState(!useTls,
-                    "Not authorized to use plaintext for gRPC connection to {}", deviceId);
-            log.debug("Using plaintext TCP for gRPC connection to {}", deviceId);
             channelBuilder.usePlaintext();
         }
 
@@ -156,24 +166,9 @@
         try {
             channel = grpcChannelController.connectChannel(channelId, channelBuilder);
         } catch (Throwable e) {
-            for (Throwable cause = e; cause != null; cause = cause.getCause()) {
-                if (useTls && cause instanceof NotSslRecordException) {
-                    // Likely root cause is that server is using plaintext
-                    log.info("Failed to connect to server (device={}) using TLS", deviceId);
-                    log.debug("TLS connection exception", e);
-                    if (fallbackToPlainText) {
-                        log.info("Falling back to plaintext for connection to {}", deviceId);
-                        return doCreateClient(clientKey, false, false);
-                    }
-                }
-                if (!useTls && "Connection reset by peer".equals(cause.getMessage())) {
-                    // Not a great signal, but could indicate the server is expected a TLS connection
-                    log.error("Failed to connect to server (device={}) using plaintext TCP; is the server using TLS?",
-                            deviceId);
-                    break;
-                }
-            }
-            log.warn("Unable to connect to gRPC server for {}", deviceId, e);
+            log.warn("Failed to connect to {} ({}) using {}: {}",
+                     deviceId, clientKey.serveUri(), method, e.toString());
+            log.debug("gRPC client connection exception", e);
             return false;
         }
 
@@ -216,37 +211,79 @@
     }
 
     @Override
+    public C getClient(K clientKey) {
+        checkNotNull(clientKey);
+        return clients.get(clientKey);
+    }
+
+    @Override
     public void removeClient(DeviceId deviceId) {
         checkNotNull(deviceId);
         withDeviceLock(() -> doRemoveClient(deviceId), deviceId);
     }
 
+    @Override
+    public void removeClient(K clientKey) {
+        checkNotNull(clientKey);
+        withDeviceLock(() -> doRemoveClient(clientKey), clientKey.deviceId());
+    }
+
     private Void doRemoveClient(DeviceId deviceId) {
         if (clientKeys.containsKey(deviceId)) {
-            final K clientKey = clientKeys.get(deviceId);
-            clients.get(clientKey).shutdown();
-            grpcChannelController.disconnectChannel(channelIds.get(deviceId));
-            clientKeys.remove(deviceId);
-            clients.remove(clientKey);
-            channelIds.remove(deviceId);
+            doRemoveClient(clientKeys.get(deviceId));
         }
         return null;
     }
 
-    @Override
-    public boolean isReachable(DeviceId deviceId) {
-        checkNotNull(deviceId);
-        return withDeviceLock(() -> doIsReachable(deviceId), deviceId);
+    private Void doRemoveClient(K clientKey) {
+        if (clients.containsKey(clientKey)) {
+            clients.get(clientKey).shutdown();
+        }
+        if (channelIds.containsKey(clientKey.deviceId())) {
+            grpcChannelController.disconnectChannel(
+                    channelIds.get(clientKey.deviceId()));
+        }
+        clientKeys.remove(clientKey.deviceId());
+        clients.remove(clientKey);
+        channelIds.remove(clientKey.deviceId());
+        return null;
     }
 
-    private boolean doIsReachable(DeviceId deviceId) {
-        // Default behaviour checks only the gRPC channel, should
-        // check according to different gRPC service
-        if (!clientKeys.containsKey(deviceId)) {
-            log.debug("Missing client for {}, cannot check for reachability", deviceId);
-            return false;
+    @Override
+    public void addDeviceAgentListener(DeviceId deviceId, ProviderId providerId, DeviceAgentListener listener) {
+        checkNotNull(deviceId, "deviceId cannot be null");
+        checkNotNull(deviceId, "providerId cannot be null");
+        checkNotNull(listener, "listener cannot be null");
+        deviceAgentListeners.putIfAbsent(deviceId, Maps.newConcurrentMap());
+        deviceAgentListeners.get(deviceId).put(providerId, listener);
+    }
+
+    @Override
+    public void removeDeviceAgentListener(DeviceId deviceId, ProviderId providerId) {
+        checkNotNull(deviceId, "deviceId cannot be null");
+        checkNotNull(providerId, "listener cannot be null");
+        deviceAgentListeners.computeIfPresent(deviceId, (did, listeners) -> {
+            listeners.remove(providerId);
+            return listeners.isEmpty() ? null : listeners;
+        });
+    }
+
+    public void postEvent(E event) {
+        checkNotNull(event);
+        post(event);
+    }
+
+    public void postEvent(DeviceAgentEvent event) {
+        // We should have only one event delivery mechanism. We have two now
+        // because we have two different types of events, DeviceAgentEvent and
+        // controller/protocol specific ones (e.g. P4Runtime or gNMI).
+        // TODO: extend device agent event to allow delivery protocol-specific
+        //  events, e.g. packet-in
+        checkNotNull(event);
+        if (deviceAgentListeners.containsKey(event.subject())) {
+            deviceAgentListeners.get(event.subject()).values()
+                    .forEach(l -> l.event(event));
         }
-        return grpcChannelController.isChannelOpen(channelIds.get(deviceId));
     }
 
     private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
index ca0c8c3..87c9426 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java
@@ -41,6 +41,7 @@
 import java.util.Dictionary;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -152,50 +153,24 @@
         }
     }
 
-    private boolean doDummyMessage(ManagedChannel channel) throws StatusRuntimeException {
+    private void doDummyMessage(ManagedChannel channel) throws StatusRuntimeException {
         DummyServiceGrpc.DummyServiceBlockingStub dummyStub = DummyServiceGrpc
                 .newBlockingStub(channel)
                 .withDeadlineAfter(CONNECTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
         try {
-            return dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse
-                                              .getDefaultInstance()) != null;
+            //noinspection ResultOfMethodCallIgnored
+            dummyStub.sayHello(Dummy.DummyMessageThatNoOneWouldReallyUse
+                                       .getDefaultInstance());
         } catch (StatusRuntimeException e) {
-            if (e.getStatus().equals(Status.UNIMPLEMENTED)) {
+            if (!e.getStatus().equals(Status.UNIMPLEMENTED)) {
                 // UNIMPLEMENTED means that the server received our message but
                 // doesn't know how to handle it. Hence, channel is open.
-                return true;
-            } else {
                 throw e;
             }
         }
     }
 
     @Override
-    public boolean isChannelOpen(GrpcChannelId channelId) {
-        checkNotNull(channelId);
-
-        Lock lock = channelLocks.get(channelId);
-        lock.lock();
-
-        try {
-            if (!channels.containsKey(channelId)) {
-                log.warn("Unknown channel ID '{}', can't check if channel is open",
-                         channelId);
-                return false;
-            }
-            try {
-                return doDummyMessage(channels.get(channelId));
-            } catch (StatusRuntimeException e) {
-                log.debug("Unable to send dummy message to {}: {}",
-                          channelId, e.toString());
-                return false;
-            }
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    @Override
     public void disconnectChannel(GrpcChannelId channelId) {
         checkNotNull(channelId);
 
@@ -240,7 +215,6 @@
 
         Lock lock = channelLocks.get(channelId);
         lock.lock();
-
         try {
             return Optional.ofNullable(channels.get(channelId));
         } finally {
@@ -248,4 +222,23 @@
         }
     }
 
+    @Override
+    public CompletableFuture<Boolean> probeChannel(GrpcChannelId channelId) {
+        final ManagedChannel channel = channels.get(channelId);
+        if (channel == null) {
+            log.warn("Unable to find any channel with ID {}, cannot send probe",
+                     channelId);
+            return CompletableFuture.completedFuture(false);
+        }
+        return CompletableFuture.supplyAsync(() -> {
+            try {
+                doDummyMessage(channel);
+                return true;
+            } catch (StatusRuntimeException e) {
+                log.debug("Probe for {} failed", channelId);
+                log.debug("", e);
+                return false;
+            }
+        });
+    }
 }
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClientKey.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClientKey.java
index 4c23bbc..452e978 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClientKey.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClientKey.java
@@ -20,28 +20,58 @@
 import org.onosproject.grpc.api.GrpcClientKey;
 import org.onosproject.net.DeviceId;
 
-import java.util.Objects;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URLDecoder;
+
+import static java.lang.String.format;
 
 /**
  * Key that uniquely identifies a P4Runtime client.
  */
 @Beta
 public final class P4RuntimeClientKey extends GrpcClientKey {
+
+    private static final String DEVICE_ID_PARAM = "device_id=";
+
     private static final String P4RUNTIME = "P4Runtime";
     private final long p4DeviceId;
 
     /**
-     * Creates a new client key.
+     * Creates a new client key. The server URI is expected to carry the
+     * P4runtime server-internal 'device_id' as a param in the query string. For
+     * example, grpc://10.0.0.1:5001?device_id=1
      *
-     * @param deviceId   ONOS device ID
-     * @param serverAddr P4Runtime server address
-     * @param serverPort P4Runtime server port
-     * @param p4DeviceId P4Runtime server-internal device ID
+     * @param deviceId  ONOS device ID
+     * @param serverUri P4Runtime server URI
      */
-    public P4RuntimeClientKey(DeviceId deviceId, String serverAddr,
-                              int serverPort, long p4DeviceId) {
-        super(P4RUNTIME, deviceId, serverAddr, serverPort);
-        this.p4DeviceId = p4DeviceId;
+    public P4RuntimeClientKey(DeviceId deviceId, URI serverUri) {
+        super(P4RUNTIME, deviceId, serverUri);
+        this.p4DeviceId = extractP4DeviceId(serverUri);
+    }
+
+    private static Long extractP4DeviceId(URI uri) {
+        String[] segments = uri.getRawQuery().split("&");
+        try {
+            for (String s : segments) {
+                if (s.startsWith(DEVICE_ID_PARAM)) {
+                    return Long.parseUnsignedLong(
+                            URLDecoder.decode(
+                                    s.substring(DEVICE_ID_PARAM.length()), "utf-8"));
+                }
+            }
+        } catch (UnsupportedEncodingException e) {
+            throw new IllegalArgumentException(format(
+                    "Unable to decode P4Runtime-interval device_id from URI %s: %s",
+                    uri, e.toString()));
+        } catch (NumberFormatException e) {
+            throw new IllegalArgumentException(format(
+                    "Invalid P4Runtime-interval device_id in URI %s: %s",
+                    uri, e.toString()));
+        }
+        throw new IllegalArgumentException(format(
+                "Missing P4Runtime-interval device_id in URI %s",
+                uri));
     }
 
     /**
@@ -52,29 +82,4 @@
     public long p4DeviceId() {
         return p4DeviceId;
     }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        if (!super.equals(o)) {
-            return false;
-        }
-        P4RuntimeClientKey that = (P4RuntimeClientKey) o;
-        return p4DeviceId == that.p4DeviceId;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(super.hashCode(), p4DeviceId);
-    }
-
-    @Override
-    public String toString() {
-        return super.toString() + "/" + p4DeviceId;
-    }
 }
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
index 0bc8ed6..18e925a 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
@@ -19,33 +19,13 @@
 import com.google.common.annotations.Beta;
 import org.onosproject.event.ListenerService;
 import org.onosproject.grpc.api.GrpcClientController;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.DeviceAgentListener;
-import org.onosproject.net.provider.ProviderId;
 
 /**
- * Controller of P4Runtime devices.
+ * Controller of P4Runtime clients.
  */
 @Beta
 public interface P4RuntimeController
         extends GrpcClientController<P4RuntimeClientKey, P4RuntimeClient>,
                 ListenerService<P4RuntimeEvent, P4RuntimeEventListener> {
-    /**
-     * Adds a listener for device agent events for the given provider.
-     *
-     * @param deviceId device identifier
-     * @param providerId provider ID
-     * @param listener the device agent listener
-     */
-    void addDeviceAgentListener(DeviceId deviceId, ProviderId providerId,
-                                DeviceAgentListener listener);
 
-    /**
-     * Removes the listener for device agent events that was previously
-     * registered for the given provider.
-     *
-     * @param deviceId   device identifier
-     * @param providerId the provider ID
-     */
-    void removeDeviceAgentListener(DeviceId deviceId, ProviderId providerId);
 }
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java
index 4e0ea60b..2b3db65 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java
@@ -33,21 +33,6 @@
          * A packet-in.
          */
         PACKET_IN,
-
-        /**
-         * Arbitration reply.
-         */
-        ARBITRATION_RESPONSE,
-
-        /**
-         * Channel Event.
-         */
-        CHANNEL_EVENT,
-
-        /**
-         * Permission denied (not master).
-         */
-        PERMISSION_DENIED,
     }
 
     public P4RuntimeEvent(Type type, P4RuntimeEventSubject subject) {
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimePipelineConfigClient.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimePipelineConfigClient.java
index 34dac66..81928c1 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimePipelineConfigClient.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimePipelineConfigClient.java
@@ -89,4 +89,25 @@
             PiPipeconf pipeconf, ByteBuffer deviceData) {
         return Futures.getUnchecked(isPipelineConfigSet(pipeconf, deviceData));
     }
+
+    /**
+     * Returns true if the device has a pipeline config set, false otherwise.
+     * <p>
+     * This method is expected to return {@code true} if invoked after
+     * successfully calling {@link #setPipelineConfig(PiPipeconf, ByteBuffer)}
+     * with any parameter.
+     *
+     * @return completable future, true if the device has a pipeline config set,
+     * false otherwise.
+     */
+    CompletableFuture<Boolean> isAnyPipelineConfigSet();
+
+    /**
+     * Same as {@link #isAnyPipelineConfigSet()}, but blocks execution.
+     *
+     * @return true if the device has a pipeline config set, false otherwise.
+     */
+    default boolean isAnyPipelineConfigSetSync() {
+        return Futures.getUnchecked(isAnyPipelineConfigSet());
+    }
 }
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeStreamClient.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeStreamClient.java
index 1317e29..555f906 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeStreamClient.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeStreamClient.java
@@ -19,6 +19,8 @@
 import org.onosproject.net.pi.model.PiPipeconf;
 import org.onosproject.net.pi.runtime.PiPacketOperation;
 
+import java.math.BigInteger;
+
 /**
  * P4Runtime client interface for the StreamChannel RPC. It allows management of
  * the P4Runtime session (open/close, mastership arbitration) as well as sending
@@ -30,23 +32,26 @@
 public interface P4RuntimeStreamClient {
 
     /**
-     * Opens a session to the server by starting the Stream RPC and sending a
-     * mastership arbitration update message with an election ID that is
-     * expected to be unique among all available clients. If a client has been
-     * requested to become master via {@link #runForMastership()}, then this
-     * method should pick an election ID that is lower than the one currently
-     * associated with the master client.
+     * Opportunistically opens a session with the server by starting a
+     * StreamChannel RPC and sends a {@code MasterArbitrationUpdate} message
+     * with the given election ID. The {@code master} boolean flag is used to
+     * indicated if we are trying to became master or not. If false, the
+     * implementation might delay sending the {@code MasterArbitrationUpdate}
+     * message until another node becomes master with a higher election ID.
      * <p>
-     * If the server acknowledges the session to this client as open, the {@link
+     * If the server acknowledges this client as master, the {@link
      * P4RuntimeController} is expected to generate a {@link
      * org.onosproject.net.device.DeviceAgentEvent} with type {@link
-     * org.onosproject.net.device.DeviceAgentEvent.Type#CHANNEL_OPEN}.
+     * org.onosproject.net.device.DeviceAgentEvent.Type#ROLE_MASTER}.
+     *
+     * @param master true if we are trying to become master
+     * @param electionId election ID
      */
-    void openSession();
+    void setMastership(boolean master, BigInteger electionId);
 
     /**
-     * Returns true if the Stream RPC is active and the P4Runtime session is
-     * open, false otherwise.
+     * Returns true if the StreamChannel RPC is active and hence the P4Runtime
+     * session is open, false otherwise.
      *
      * @return boolean
      */
@@ -58,17 +63,6 @@
     void closeSession();
 
     /**
-     * Sends a master arbitration update to the device with a new election ID
-     * that is expected to be the highest one between all clients.
-     * <p>
-     * If the server acknowledges this client as master, the {@link
-     * P4RuntimeController} is expected to generate a {@link
-     * org.onosproject.net.device.DeviceAgentEvent} with type {@link
-     * org.onosproject.net.device.DeviceAgentEvent.Type#ROLE_MASTER}.
-     */
-    void runForMastership();
-
-    /**
      * Returns true if this client is master for the server, false otherwise.
      *
      * @return boolean
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
index 353d44e..d06e3fc 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/P4RuntimeClientImpl.java
@@ -17,21 +17,25 @@
 package org.onosproject.p4runtime.ctl.client;
 
 import io.grpc.ManagedChannel;
+import io.grpc.Status;
 import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
 import org.onosproject.grpc.ctl.AbstractGrpcClient;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceAgentEvent;
 import org.onosproject.net.pi.model.PiPipeconf;
 import org.onosproject.net.pi.runtime.PiPacketOperation;
 import org.onosproject.net.pi.service.PiPipeconfService;
 import org.onosproject.p4runtime.api.P4RuntimeClient;
 import org.onosproject.p4runtime.api.P4RuntimeClientKey;
-import org.onosproject.p4runtime.api.P4RuntimeEvent;
-import org.onosproject.p4runtime.ctl.controller.BaseEventSubject;
-import org.onosproject.p4runtime.ctl.controller.ChannelEvent;
+import org.onosproject.p4runtime.ctl.controller.MasterElectionIdStore;
 import org.onosproject.p4runtime.ctl.controller.P4RuntimeControllerImpl;
 import p4.v1.P4RuntimeGrpc;
 import p4.v1.P4RuntimeOuterClass;
+import p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigRequest;
+import p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigResponse;
 
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -39,6 +43,7 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.lang.String.format;
+import static p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigRequest.ResponseType.COOKIE_ONLY;
 
 /**
  * Implementation of P4RuntimeClient.
@@ -59,7 +64,6 @@
     static final int LONG_TIMEOUT_SECONDS = 60;
 
     private final long p4DeviceId;
-    private final ManagedChannel channel;
     private final P4RuntimeControllerImpl controller;
     private final StreamClientImpl streamClient;
     private final PipelineConfigClientImpl pipelineConfigClient;
@@ -67,25 +71,27 @@
     /**
      * Instantiates a new client with the given arguments.
      *
-     * @param clientKey       client key
-     * @param channel         gRPC managed channel
-     * @param controller      P$Runtime controller instance
-     * @param pipeconfService pipeconf service instance
+     * @param clientKey             client key
+     * @param channel               gRPC managed channel
+     * @param controller            P$Runtime controller instance
+     * @param pipeconfService       pipeconf service instance
+     * @param masterElectionIdStore master election ID store
      */
     public P4RuntimeClientImpl(P4RuntimeClientKey clientKey,
                                ManagedChannel channel,
                                P4RuntimeControllerImpl controller,
-                               PiPipeconfService pipeconfService) {
-        super(clientKey);
+                               PiPipeconfService pipeconfService,
+                               MasterElectionIdStore masterElectionIdStore) {
+        super(clientKey, channel, true, controller);
         checkNotNull(channel);
         checkNotNull(controller);
         checkNotNull(pipeconfService);
+        checkNotNull(masterElectionIdStore);
 
         this.p4DeviceId = clientKey.p4DeviceId();
-        this.channel = channel;
         this.controller = controller;
         this.streamClient = new StreamClientImpl(
-                pipeconfService, this, controller);
+                pipeconfService, masterElectionIdStore, this, controller);
         this.pipelineConfigClient = new PipelineConfigClientImpl(this);
     }
 
@@ -108,13 +114,13 @@
     }
 
     @Override
-    public ReadRequest read(PiPipeconf pipeconf) {
-        return new ReadRequestImpl(this, pipeconf);
+    public CompletableFuture<Boolean> isAnyPipelineConfigSet() {
+        return pipelineConfigClient.isAnyPipelineConfigSet();
     }
 
     @Override
-    public void openSession() {
-        streamClient.openSession();
+    public ReadRequest read(PiPipeconf pipeconf) {
+        return new ReadRequestImpl(this, pipeconf);
     }
 
     @Override
@@ -128,8 +134,8 @@
     }
 
     @Override
-    public void runForMastership() {
-        streamClient.runForMastership();
+    public void setMastership(boolean master, BigInteger newElectionId) {
+        streamClient.setMastership(master, newElectionId);
     }
 
     @Override
@@ -147,6 +153,44 @@
         return new WriteRequestImpl(this, pipeconf);
     }
 
+    @Override
+    public CompletableFuture<Boolean> probeService() {
+        final CompletableFuture<Boolean> future = new CompletableFuture<>();
+        final StreamObserver<GetForwardingPipelineConfigResponse> responseObserver =
+                new StreamObserver<GetForwardingPipelineConfigResponse>() {
+                    @Override
+                    public void onNext(GetForwardingPipelineConfigResponse value) {
+                        future.complete(true);
+                    }
+
+                    @Override
+                    public void onError(Throwable t) {
+                        if (Status.fromThrowable(t).getCode() ==
+                                Status.Code.FAILED_PRECONDITION) {
+                            // Pipeline not set but service is available.
+                            future.complete(true);
+                        } else {
+                            log.debug("", t);
+                        }
+                        future.complete(false);
+                    }
+
+                    @Override
+                    public void onCompleted() {
+                        // Ignore, unary call.
+                    }
+                };
+        // Use long timeout as the device might return the full P4 blob
+        // (e.g. server does not support cookie), over a slow network.
+        execRpc(s -> s.getForwardingPipelineConfig(
+                GetForwardingPipelineConfigRequest.newBuilder()
+                        .setDeviceId(p4DeviceId)
+                        .setResponseType(COOKIE_ONLY)
+                        .build(), responseObserver),
+                SHORT_TIMEOUT_SECONDS);
+        return future;
+    }
+
     /**
      * Returns the P4Runtime-internal device ID associated with this client.
      *
@@ -184,7 +228,8 @@
      * @param stubConsumer P4Runtime stub consumer
      * @param timeout      timeout in seconds
      */
-    void execRpc(Consumer<P4RuntimeGrpc.P4RuntimeStub> stubConsumer, int timeout) {
+    void execRpc(Consumer<P4RuntimeGrpc.P4RuntimeStub> stubConsumer,
+                 int timeout) {
         if (log.isTraceEnabled()) {
             log.trace("Executing RPC with timeout {} seconds (context deadline {})...",
                       timeout, context().getDeadline());
@@ -235,21 +280,10 @@
     }
 
     private void checkGrpcException(StatusRuntimeException sre) {
-        switch (sre.getStatus().getCode()) {
-            case PERMISSION_DENIED:
-                // Notify upper layers that this node is not master.
-                controller.postEvent(new P4RuntimeEvent(
-                        P4RuntimeEvent.Type.PERMISSION_DENIED,
-                        new BaseEventSubject(deviceId)));
-                break;
-            case UNAVAILABLE:
-                // Channel might be closed.
-                controller.postEvent(new P4RuntimeEvent(
-                        P4RuntimeEvent.Type.CHANNEL_EVENT,
-                        new ChannelEvent(deviceId, ChannelEvent.Type.ERROR)));
-                break;
-            default:
-                break;
+        if (sre.getStatus().getCode() == Status.Code.PERMISSION_DENIED) {
+            // Notify upper layers that this node is not master.
+            controller.postEvent(new DeviceAgentEvent(
+                    DeviceAgentEvent.Type.NOT_MASTER, deviceId));
         }
     }
 }
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/PipelineConfigClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/PipelineConfigClientImpl.java
index c023f26..b0b6c57 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/PipelineConfigClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/PipelineConfigClientImpl.java
@@ -18,6 +18,7 @@
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.TextFormat;
+import io.grpc.Status;
 import io.grpc.stub.StreamObserver;
 import org.onosproject.net.pi.model.PiPipeconf;
 import org.onosproject.p4runtime.api.P4RuntimePipelineConfigClient;
@@ -32,11 +33,13 @@
 import p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigResponse;
 
 import java.nio.ByteBuffer;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.onosproject.p4runtime.ctl.client.P4RuntimeClientImpl.LONG_TIMEOUT_SECONDS;
+import static org.onosproject.p4runtime.ctl.client.P4RuntimeClientImpl.SHORT_TIMEOUT_SECONDS;
 import static org.slf4j.LoggerFactory.getLogger;
 import static p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigRequest.ResponseType.COOKIE_ONLY;
 import static p4.v1.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
@@ -62,6 +65,12 @@
     public CompletableFuture<Boolean> setPipelineConfig(
             PiPipeconf pipeconf, ByteBuffer deviceData) {
 
+        if (!client.isSessionOpen()) {
+            log.warn("Dropping set pipeline config request for {}, session is CLOSED",
+                     client.deviceId());
+            return completedFuture(false);
+        }
+
         log.info("Setting pipeline config for {} to {}...",
                  client.deviceId(), pipeconf.id());
 
@@ -98,11 +107,13 @@
                         // All good, pipeline is set.
                         future.complete(true);
                     }
+
                     @Override
                     public void onError(Throwable t) {
                         client.handleRpcError(t, "SET-pipeline-config");
                         future.complete(false);
                     }
+
                     @Override
                     public void onCompleted() {
                         // Ignore, unary call.
@@ -154,6 +165,11 @@
                         pipeconf, expectedDeviceData, cfgFromDevice));
     }
 
+    @Override
+    public CompletableFuture<Boolean> isAnyPipelineConfigSet() {
+        return getPipelineCookieFromServer().thenApply(Objects::nonNull);
+    }
+
     private boolean comparePipelineConfig(
             PiPipeconf pipeconf, ByteBuffer expectedDeviceData,
             ForwardingPipelineConfig cfgFromDevice) {
@@ -209,17 +225,38 @@
                     public void onNext(GetForwardingPipelineConfigResponse value) {
                         if (value.hasConfig()) {
                             future.complete(value.getConfig());
+                            if (!value.getConfig().getP4DeviceConfig().isEmpty()) {
+                                log.warn("{} returned GetForwardingPipelineConfigResponse " +
+                                                 "with p4_device_config field set " +
+                                                 "({} bytes), but we requested COOKIE_ONLY",
+                                         client.deviceId(),
+                                         value.getConfig().getP4DeviceConfig().size());
+                            }
+                            if (value.getConfig().hasP4Info()) {
+                                log.warn("{} returned GetForwardingPipelineConfigResponse " +
+                                                 "with p4_info field set " +
+                                                 "({} bytes), but we requested COOKIE_ONLY",
+                                         client.deviceId(),
+                                         value.getConfig().getP4Info().getSerializedSize());
+                            }
                         } else {
+                            future.complete(null);
                             log.warn("{} returned {} with 'config' field unset",
                                      client.deviceId(), value.getClass().getSimpleName());
                         }
-                        future.complete(null);
                     }
 
                     @Override
                     public void onError(Throwable t) {
-                        client.handleRpcError(t, "GET-pipeline-config");
                         future.complete(null);
+                        if (Status.fromThrowable(t).getCode() ==
+                                Status.Code.FAILED_PRECONDITION) {
+                            // FAILED_PRECONDITION means that a pipeline
+                            // config was not set in the first place, don't
+                            // bother logging.
+                            return;
+                        }
+                        client.handleRpcError(t, "GET-pipeline-config");
                     }
 
                     @Override
@@ -231,7 +268,7 @@
         // (e.g. server does not support cookie), over a slow network.
         client.execRpc(
                 s -> s.getForwardingPipelineConfig(request, responseObserver),
-                LONG_TIMEOUT_SECONDS);
+                SHORT_TIMEOUT_SECONDS);
         return future;
     }
 }
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
index 7afd97b..1bf195c 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
@@ -22,14 +22,15 @@
 import io.grpc.stub.ClientCallStreamObserver;
 import io.grpc.stub.StreamObserver;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceAgentEvent;
 import org.onosproject.net.pi.model.PiPipeconf;
 import org.onosproject.net.pi.runtime.PiPacketOperation;
 import org.onosproject.net.pi.service.PiPipeconfService;
 import org.onosproject.p4runtime.api.P4RuntimeEvent;
 import org.onosproject.p4runtime.api.P4RuntimeStreamClient;
 import org.onosproject.p4runtime.ctl.codec.CodecException;
-import org.onosproject.p4runtime.ctl.controller.ArbitrationUpdateEvent;
-import org.onosproject.p4runtime.ctl.controller.ChannelEvent;
+import org.onosproject.p4runtime.ctl.controller.MasterElectionIdStore;
+import org.onosproject.p4runtime.ctl.controller.MasterElectionIdStore.MasterElectionIdListener;
 import org.onosproject.p4runtime.ctl.controller.P4RuntimeControllerImpl;
 import org.onosproject.p4runtime.ctl.controller.PacketInEvent;
 import org.slf4j.Logger;
@@ -42,6 +43,8 @@
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 import static java.lang.String.format;
 import static org.onosproject.p4runtime.ctl.codec.Codecs.CODECS;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -54,73 +57,95 @@
 
     private static final Logger log = getLogger(StreamClientImpl.class);
 
-    private static final BigInteger ONE_THOUSAND = BigInteger.valueOf(1000);
-
     private final P4RuntimeClientImpl client;
     private final DeviceId deviceId;
     private final long p4DeviceId;
     private final PiPipeconfService pipeconfService;
+    private final MasterElectionIdStore masterElectionIdStore;
     private final P4RuntimeControllerImpl controller;
+
     private final StreamChannelManager streamChannelManager = new StreamChannelManager();
+    private final MasterElectionIdListener masterElectionIdListener = new InternalMasterElectionIdListener();
 
-    private P4RuntimeOuterClass.Uint128 lastUsedElectionId = P4RuntimeOuterClass.Uint128
-            .newBuilder().setLow(1).build();
+    private final AtomicBoolean isMaster = new AtomicBoolean(false);
+    private final AtomicBoolean requestedToBeMaster = new AtomicBoolean(false);
 
-    private final AtomicBoolean isClientMaster = new AtomicBoolean(false);
+    private BigInteger pendingElectionId = null;
+    private BigInteger lastUsedElectionId = null;
 
     StreamClientImpl(
             PiPipeconfService pipeconfService,
+            MasterElectionIdStore masterElectionIdStore,
             P4RuntimeClientImpl client,
             P4RuntimeControllerImpl controller) {
         this.client = client;
         this.deviceId = client.deviceId();
         this.p4DeviceId = client.p4DeviceId();
         this.pipeconfService = pipeconfService;
+        this.masterElectionIdStore = masterElectionIdStore;
         this.controller = controller;
     }
 
     @Override
-    public void openSession() {
-        if (isSessionOpen()) {
-            log.debug("Dropping request to open session for {}, session is already open",
-                      deviceId);
-            return;
-        }
-        log.debug("Opening session for {}...", deviceId);
-        sendMasterArbitrationUpdate(controller.newMasterElectionId(deviceId));
-
-    }
-
-    @Override
     public boolean isSessionOpen() {
         return streamChannelManager.isOpen();
     }
 
     @Override
     public void closeSession() {
-        streamChannelManager.complete();
+        synchronized (requestedToBeMaster) {
+            this.masterElectionIdStore.unsetListener(deviceId);
+            streamChannelManager.teardown();
+            pendingElectionId = null;
+            requestedToBeMaster.set(false);
+            isMaster.set(false);
+        }
     }
 
     @Override
-    public void runForMastership() {
-        if (!isSessionOpen()) {
-            log.debug("Dropping mastership request for {}, session is closed",
-                      deviceId);
-            return;
+    public void setMastership(boolean master, BigInteger newElectionId) {
+        checkNotNull(newElectionId);
+        checkArgument(newElectionId.compareTo(BigInteger.ZERO) > 0,
+                      "newElectionId must be a non zero positive number");
+        synchronized (requestedToBeMaster) {
+            requestedToBeMaster.set(master);
+            pendingElectionId = newElectionId;
+            handlePendingElectionId(masterElectionIdStore.get(deviceId));
         }
-        // Becoming master is a race. Here we increase our chances of win, i.e.
-        // using the highest election ID, against other ONOS nodes in the
-        // cluster that are calling openSession() (which is used to start the
-        // stream RPC session, not to become master).
-        log.debug("Running for mastership on {}...", deviceId);
-        final BigInteger masterId = controller.newMasterElectionId(deviceId)
-                .add(ONE_THOUSAND);
-        sendMasterArbitrationUpdate(masterId);
+    }
+
+    private void handlePendingElectionId(BigInteger masterElectionId) {
+        synchronized (requestedToBeMaster) {
+            if (pendingElectionId == null) {
+                // No pending requests.
+                return;
+            }
+            if (!requestedToBeMaster.get() && masterElectionId != null
+                    && pendingElectionId.compareTo(masterElectionId) > 0) {
+                log.info("Deferring sending master arbitration update, master " +
+                                  "election ID of server ({}) is smaller than " +
+                                  "requested one ({}), but we do NOT want to be master...",
+                          masterElectionId, pendingElectionId);
+                // Will try again as soon as the server reports a new master
+                // election ID that is bigger than the pending non-master one.
+                masterElectionIdStore.setListener(deviceId, masterElectionIdListener);
+            } else {
+                // Send now.
+                log.info("Setting mastership on {}... " +
+                                  "master={}, newElectionId={}, masterElectionId={}",
+                          deviceId, requestedToBeMaster.get(),
+                          pendingElectionId, masterElectionId);
+                sendMasterArbitrationUpdate(pendingElectionId);
+                pendingElectionId = null;
+                // No need to listen for master election ID changes.
+                masterElectionIdStore.unsetListener(deviceId);
+            }
+        }
     }
 
     @Override
     public boolean isMaster() {
-        return streamChannelManager.isOpen() && isClientMaster.get();
+        return isMaster.get();
     }
 
     @Override
@@ -141,7 +166,7 @@
             final StreamMessageRequest packetOutRequest = StreamMessageRequest
                     .newBuilder().setPacket(packetOut).build();
             // Send.
-            streamChannelManager.sendIfOpen(packetOutRequest);
+            streamChannelManager.send(packetOutRequest);
         } catch (CodecException e) {
             log.error("Unable to send packet-out: {}", e.getMessage());
         }
@@ -160,7 +185,7 @@
                                         .setElectionId(idMsg)
                                         .build())
                         .build());
-        lastUsedElectionId = idMsg;
+        lastUsedElectionId = electionId;
     }
 
     private P4RuntimeOuterClass.Uint128 bigIntegerToUint128(BigInteger value) {
@@ -220,13 +245,22 @@
         if (!msg.hasElectionId() || !msg.hasStatus()) {
             return;
         }
-        final boolean isMaster = msg.getStatus().getCode() == Status.OK.getCode().value();
-        log.debug("Received arbitration update from {}: isMaster={}, electionId={}",
-                  deviceId, isMaster, uint128ToBigInteger(msg.getElectionId()));
-        controller.postEvent(new P4RuntimeEvent(
-                P4RuntimeEvent.Type.ARBITRATION_RESPONSE,
-                new ArbitrationUpdateEvent(deviceId, isMaster)));
-        isClientMaster.set(isMaster);
+        // Is this client master?
+        isMaster.set(msg.getStatus().getCode() == Status.OK.getCode().value());
+        // Notify new master election IDs to all nodes via distributed store.
+        // This is required for those nodes who do not have a Stream RPC open,
+        // and that otherwise would not be aware of changes, keeping their
+        // pending mastership operations forever.
+        final BigInteger masterElectionId = uint128ToBigInteger(msg.getElectionId());
+        masterElectionIdStore.set(deviceId, masterElectionId);
+
+        log.debug("Received arbitration update from {}: isMaster={}, masterElectionId={}",
+                  deviceId, isMaster.get(), masterElectionId);
+
+        // Post mastership event via controller.
+        controller.postEvent(new DeviceAgentEvent(
+                isMaster.get() ? DeviceAgentEvent.Type.ROLE_MASTER
+                        : DeviceAgentEvent.Type.ROLE_STANDBY, deviceId));
     }
 
     /**
@@ -236,7 +270,22 @@
      * @return election ID uint128 protobuf message
      */
     P4RuntimeOuterClass.Uint128 lastUsedElectionId() {
-        return lastUsedElectionId;
+        return lastUsedElectionId == null
+                ? P4RuntimeOuterClass.Uint128.getDefaultInstance()
+                : bigIntegerToUint128(lastUsedElectionId);
+    }
+
+    /**
+     * Handles updates of the master election ID by applying any pending
+     * mastership operation.
+     */
+    private class InternalMasterElectionIdListener
+            implements MasterElectionIdStore.MasterElectionIdListener {
+
+        @Override
+        public void updated(BigInteger masterElectionId) {
+            handlePendingElectionId(masterElectionId);
+        }
     }
 
     /**
@@ -258,23 +307,12 @@
             }
         }
 
-        void sendIfOpen(StreamMessageRequest value) {
-            // We do not lock here, but we ignore NPEs due to stream RPC not
-            // being active (null requestObserver). Good for frequent
-            // packet-outs.
-            try {
-                doSend(value);
-            } catch (NullPointerException e) {
-                if (requestObserver != null) {
-                    // Must be something else.
-                    throw e;
-                }
-            }
-        }
-
         private void doSend(StreamMessageRequest value) {
             try {
                 requestObserver.onNext(value);
+                // Optimistically set the session as open. In case of errors, it
+                // will be closed by the response stream observer.
+                streamChannelManager.signalOpen();
             } catch (Throwable ex) {
                 if (ex instanceof StatusRuntimeException) {
                     log.warn("Unable to send {} to {}: {}",
@@ -283,7 +321,7 @@
                     log.error("Exception while sending {} to {}: {}",
                               value.getUpdateCase().toString(), deviceId, ex);
                 }
-                complete();
+                teardown();
             }
         }
 
@@ -299,7 +337,7 @@
             }
         }
 
-        void complete() {
+        void teardown() {
             synchronized (this) {
                 signalClosed();
                 if (requestObserver != null) {
@@ -311,23 +349,16 @@
         }
 
         void signalOpen() {
-            synchronized (this) {
-                final boolean wasOpen = open.getAndSet(true);
-                if (!wasOpen) {
-                    controller.postEvent(new P4RuntimeEvent(
-                            P4RuntimeEvent.Type.CHANNEL_EVENT,
-                            new ChannelEvent(deviceId, ChannelEvent.Type.OPEN)));
-                }
-            }
+            open.set(true);
         }
 
         void signalClosed() {
             synchronized (this) {
                 final boolean wasOpen = open.getAndSet(false);
                 if (wasOpen) {
-                    controller.postEvent(new P4RuntimeEvent(
-                            P4RuntimeEvent.Type.CHANNEL_EVENT,
-                            new ChannelEvent(deviceId, ChannelEvent.Type.CLOSED)));
+                    // We lost any valid mastership role.
+                    controller.postEvent(new DeviceAgentEvent(
+                            DeviceAgentEvent.Type.ROLE_NONE, deviceId));
                 }
             }
         }
@@ -352,13 +383,11 @@
 
         @Override
         public void onNext(StreamMessageResponse message) {
-            streamChannelManager.signalOpen();
             try {
                 if (log.isTraceEnabled()) {
-                    log.trace(
-                            "Received {} from {}: {}",
-                            message.getUpdateCase(), deviceId,
-                            TextFormat.shortDebugString(message));
+                    log.trace("Received {} from {}: {}",
+                              message.getUpdateCase(), deviceId,
+                              TextFormat.shortDebugString(message));
                 }
                 switch (message.getUpdateCase()) {
                     case PACKET:
@@ -388,17 +417,18 @@
                     log.warn("Error on stream channel for {}: {}",
                              deviceId, throwable.getMessage());
                 }
+                log.debug("", throwable);
             } else {
                 log.error(format("Exception on stream channel for %s",
                                  deviceId), throwable);
             }
-            streamChannelManager.complete();
+            streamChannelManager.teardown();
         }
 
         @Override
         public void onCompleted() {
             log.warn("Stream channel for {} has completed", deviceId);
-            streamChannelManager.complete();
+            streamChannelManager.teardown();
         }
     }
 }
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/ChannelEvent.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/ChannelEvent.java
deleted file mode 100644
index 0a77e46..0000000
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/ChannelEvent.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright 2019-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.p4runtime.ctl.controller;
-
-import org.onosproject.net.DeviceId;
-import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
-
-/**
- * Channel event in P4Runtime.
- */
-public final class ChannelEvent implements P4RuntimeEventSubject {
-
-    public enum Type {
-        OPEN,
-        CLOSED,
-        ERROR
-    }
-
-    private DeviceId deviceId;
-    private Type type;
-
-    /**
-     * Creates channel event with given status and throwable.
-     *
-     * @param deviceId  the device
-     * @param type      error type
-     */
-    public ChannelEvent(DeviceId deviceId, Type type) {
-        this.deviceId = deviceId;
-        this.type = type;
-    }
-
-    /**
-     * Gets the type of this event.
-     *
-     * @return the error type
-     */
-    public Type type() {
-        return type;
-    }
-
-    @Override
-    public DeviceId deviceId() {
-        return deviceId;
-    }
-}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedElectionIdGenerator.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedElectionIdGenerator.java
deleted file mode 100644
index 980ab11..0000000
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedElectionIdGenerator.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Copyright 2019-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.p4runtime.ctl.controller;
-
-import org.onlab.util.KryoNamespace;
-import org.onosproject.net.DeviceId;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.AtomicCounterMap;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageService;
-import org.slf4j.Logger;
-
-import java.math.BigInteger;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Distributed implementation of a generator of P4Runtime election IDs.
- */
-final class DistributedElectionIdGenerator {
-
-    private final Logger log = getLogger(this.getClass());
-
-    // FIXME: counter map use long, but P4Runtime accepts 128bit election IDs
-    private AtomicCounterMap<DeviceId> electionIds;
-
-    /**
-     * Creates a new election ID generator using the given storage service.
-     *
-     * @param storageService storage service
-     */
-    DistributedElectionIdGenerator(StorageService storageService) {
-        KryoNamespace serializer = KryoNamespace.newBuilder()
-                .register(KryoNamespaces.API)
-                .build();
-        this.electionIds = storageService.<DeviceId>atomicCounterMapBuilder()
-                .withName("p4runtime-election-ids")
-                .withSerializer(Serializer.using(serializer))
-                .build();
-    }
-
-    /**
-     * Returns an election ID for the given device ID. The first election ID for
-     * a given device ID is always 1.
-     *
-     * @param deviceId device ID
-     * @return new election ID
-     */
-    BigInteger generate(DeviceId deviceId) {
-        if (electionIds == null) {
-            return null;
-        }
-        // Default value is 0 for AtomicCounterMap.
-        return BigInteger.valueOf(electionIds.incrementAndGet(deviceId));
-    }
-
-    /**
-     * Destroy the backing distributed primitive of this generator.
-     */
-    void destroy() {
-        try {
-            electionIds.destroy().get(10, TimeUnit.SECONDS);
-        } catch (InterruptedException | ExecutionException | TimeoutException e) {
-            log.error("Exception while destroying distributed counter map", e);
-        } finally {
-            electionIds = null;
-        }
-    }
-}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedMasterElectionIdStore.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedMasterElectionIdStore.java
new file mode 100644
index 0000000..dff2330
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedMasterElectionIdStore.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl.controller;
+
+import com.google.common.collect.Maps;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.net.DeviceId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.math.BigInteger;
+import java.util.concurrent.ConcurrentMap;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Distributed implementation of MasterElectionIdStore.
+ */
+@Component(immediate = true, service = MasterElectionIdStore.class)
+public class DistributedMasterElectionIdStore implements MasterElectionIdStore {
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    private static final KryoNamespace SERIALIZER = KryoNamespace.newBuilder()
+            .register(KryoNamespaces.API)
+            .register(BigInteger.class)
+            .build();
+
+    private final Logger log = getLogger(getClass());
+    private final EventuallyConsistentMapListener<DeviceId, BigInteger> mapListener =
+            new InternalMapListener();
+
+    private EventuallyConsistentMap<DeviceId, BigInteger> masterElectionIds;
+    private ConcurrentMap<DeviceId, MasterElectionIdListener> listeners =
+            Maps.newConcurrentMap();
+
+    @Activate
+    public void activate() {
+        this.listeners = Maps.newConcurrentMap();
+        this.masterElectionIds = storageService.<DeviceId, BigInteger>eventuallyConsistentMapBuilder()
+                .withName("p4runtime-master-election-ids")
+                .withSerializer(SERIALIZER)
+                .withTimestampProvider((k, v) -> new WallClockTimestamp())
+                .build();
+        this.masterElectionIds.addListener(mapListener);
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        this.masterElectionIds.removeListener(mapListener);
+        this.masterElectionIds.destroy();
+        this.masterElectionIds = null;
+        this.listeners.clear();
+        this.listeners = null;
+        log.info("Stopped");
+    }
+
+
+    @Override
+    public void set(DeviceId deviceId, BigInteger electionId) {
+        checkNotNull(deviceId);
+        checkNotNull(electionId);
+        this.masterElectionIds.put(deviceId, electionId);
+    }
+
+    @Override
+    public BigInteger get(DeviceId deviceId) {
+        checkNotNull(deviceId);
+        return this.masterElectionIds.get(deviceId);
+    }
+
+    @Override
+    public void remove(DeviceId deviceId) {
+        checkNotNull(deviceId);
+        this.masterElectionIds.remove(deviceId);
+    }
+
+    @Override
+    public void setListener(DeviceId deviceId, MasterElectionIdListener newListener) {
+        checkNotNull(deviceId);
+        checkNotNull(newListener);
+        listeners.compute(deviceId, (did, existingListener) -> {
+            if (existingListener == null || existingListener == newListener) {
+                return newListener;
+            } else {
+                log.error("Cannot add listener as one already exist for {}", deviceId);
+                return existingListener;
+            }
+        });
+    }
+
+    @Override
+    public void unsetListener(DeviceId deviceId) {
+        listeners.remove(deviceId);
+    }
+
+    private class InternalMapListener implements EventuallyConsistentMapListener<DeviceId, BigInteger> {
+        @Override
+        public void event(EventuallyConsistentMapEvent<DeviceId, BigInteger> event) {
+            final MasterElectionIdListener listener = listeners.get(event.key());
+            if (listener == null) {
+                return;
+            }
+            switch (event.type()) {
+                case PUT:
+                    listener.updated(event.value());
+                    break;
+                case REMOVE:
+                    listener.updated(null);
+                    break;
+                default:
+                    log.error("Unrecognized map event type {}", event.type());
+            }
+        }
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/MasterElectionIdStore.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/MasterElectionIdStore.java
new file mode 100644
index 0000000..f5393d1
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/MasterElectionIdStore.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl.controller;
+
+import org.onosproject.net.DeviceId;
+
+import java.math.BigInteger;
+
+/**
+ * Store that keeps track of master election IDs for each device.
+ */
+public interface MasterElectionIdStore {
+
+    /**
+     * Sets the master election ID for the given device.
+     *
+     * @param deviceId   device ID
+     * @param electionId election ID
+     */
+    void set(DeviceId deviceId, BigInteger electionId);
+
+    /**
+     * Returns the last known master election ID for the given device, or null.
+     *
+     * @param deviceId device ID
+     * @return election ID
+     */
+    BigInteger get(DeviceId deviceId);
+
+    /**
+     * Removes any state associated with the given device.
+     *
+     * @param deviceId device ID
+     */
+    void remove(DeviceId deviceId);
+
+    /**
+     * Sets a listener for the given device that will be invoked every time
+     * there will be changes to the master election ID.
+     *
+     * @param deviceId device ID
+     * @param listener listener
+     */
+    void setListener(DeviceId deviceId, MasterElectionIdListener listener);
+
+    /**
+     * Unset the listener for the given device.
+     *
+     * @param deviceId device ID
+     */
+    void unsetListener(DeviceId deviceId);
+
+    /**
+     * Listener of master election ID changes for a specific device.
+     */
+    interface MasterElectionIdListener {
+
+        /**
+         * Notifies that the master election ID has been updated to the given
+         * (nullable) value.
+         *
+         * @param masterElectionId new master election ID, or null
+         */
+        void updated(BigInteger masterElectionId);
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/P4RuntimeControllerImpl.java
index affbf7d..2256d5e 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/P4RuntimeControllerImpl.java
@@ -16,33 +16,19 @@
 
 package org.onosproject.p4runtime.ctl.controller;
 
-import com.google.common.collect.Maps;
 import io.grpc.ManagedChannel;
 import org.onosproject.grpc.ctl.AbstractGrpcClientController;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.DeviceAgentEvent;
-import org.onosproject.net.device.DeviceAgentListener;
 import org.onosproject.net.pi.service.PiPipeconfService;
-import org.onosproject.net.provider.ProviderId;
 import org.onosproject.p4runtime.api.P4RuntimeClient;
 import org.onosproject.p4runtime.api.P4RuntimeClientKey;
 import org.onosproject.p4runtime.api.P4RuntimeController;
 import org.onosproject.p4runtime.api.P4RuntimeEvent;
 import org.onosproject.p4runtime.api.P4RuntimeEventListener;
 import org.onosproject.p4runtime.ctl.client.P4RuntimeClientImpl;
-import org.onosproject.store.service.StorageService;
-import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.component.annotations.ReferenceCardinality;
-import org.slf4j.Logger;
-
-import java.math.BigInteger;
-import java.util.concurrent.ConcurrentMap;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * P4Runtime controller implementation.
@@ -53,122 +39,34 @@
         <P4RuntimeClientKey, P4RuntimeClient, P4RuntimeEvent, P4RuntimeEventListener>
         implements P4RuntimeController {
 
-    private final Logger log = getLogger(getClass());
-
-    private final ConcurrentMap<DeviceId, ConcurrentMap<ProviderId, DeviceAgentListener>>
-            deviceAgentListeners = Maps.newConcurrentMap();
-
-    private DistributedElectionIdGenerator electionIdGenerator;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    private StorageService storageService;
-
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     private PiPipeconfService pipeconfService;
 
-    @Activate
-    public void activate() {
-        super.activate();
-        eventDispatcher.addSink(P4RuntimeEvent.class, listenerRegistry);
-        electionIdGenerator = new DistributedElectionIdGenerator(storageService);
-        log.info("Started");
-    }
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    private MasterElectionIdStore masterElectionIdStore;
 
-    @Deactivate
-    public void deactivate() {
-        super.deactivate();
-        deviceAgentListeners.clear();
-        electionIdGenerator.destroy();
-        electionIdGenerator = null;
-        log.info("Stopped");
+    public P4RuntimeControllerImpl() {
+        super(P4RuntimeEvent.class);
     }
 
     @Override
-    protected P4RuntimeClient createClientInstance(P4RuntimeClientKey clientKey, ManagedChannel channel) {
-        return new P4RuntimeClientImpl(clientKey, channel, this, pipeconfService);
+    public void removeClient(DeviceId deviceId) {
+        super.removeClient(deviceId);
+        // Assuming that when a client is removed, it is done so by all nodes,
+        // this is the best place to clear master election ID state.
+        masterElectionIdStore.remove(deviceId);
     }
 
     @Override
-    public void addDeviceAgentListener(DeviceId deviceId, ProviderId providerId, DeviceAgentListener listener) {
-        checkNotNull(deviceId, "deviceId cannot be null");
-        checkNotNull(deviceId, "providerId cannot be null");
-        checkNotNull(listener, "listener cannot be null");
-        deviceAgentListeners.putIfAbsent(deviceId, Maps.newConcurrentMap());
-        deviceAgentListeners.get(deviceId).put(providerId, listener);
+    public void removeClient(P4RuntimeClientKey clientKey) {
+        super.removeClient(clientKey);
+        masterElectionIdStore.remove(clientKey.deviceId());
     }
 
     @Override
-    public void removeDeviceAgentListener(DeviceId deviceId, ProviderId providerId) {
-        checkNotNull(deviceId, "deviceId cannot be null");
-        checkNotNull(providerId, "listener cannot be null");
-        deviceAgentListeners.computeIfPresent(deviceId, (did, listeners) -> {
-            listeners.remove(providerId);
-            return listeners;
-        });
-    }
-
-    public BigInteger newMasterElectionId(DeviceId deviceId) {
-        return electionIdGenerator.generate(deviceId);
-    }
-
-    public void postEvent(P4RuntimeEvent event) {
-        switch (event.type()) {
-            case CHANNEL_EVENT:
-                handleChannelEvent(event);
-                break;
-            case ARBITRATION_RESPONSE:
-                handleArbitrationReply(event);
-                break;
-            case PERMISSION_DENIED:
-                handlePermissionDenied(event);
-                break;
-            default:
-                post(event);
-                break;
-        }
-    }
-
-    private void handlePermissionDenied(P4RuntimeEvent event) {
-        postDeviceAgentEvent(event.subject().deviceId(), new DeviceAgentEvent(
-                DeviceAgentEvent.Type.NOT_MASTER, event.subject().deviceId()));
-    }
-
-    private void handleChannelEvent(P4RuntimeEvent event) {
-        final ChannelEvent channelEvent = (ChannelEvent) event.subject();
-        final DeviceId deviceId = channelEvent.deviceId();
-        final DeviceAgentEvent.Type agentEventType;
-        switch (channelEvent.type()) {
-            case OPEN:
-                agentEventType = DeviceAgentEvent.Type.CHANNEL_OPEN;
-                break;
-            case CLOSED:
-                agentEventType = DeviceAgentEvent.Type.CHANNEL_CLOSED;
-                break;
-            case ERROR:
-                agentEventType = !isReachable(deviceId)
-                        ? DeviceAgentEvent.Type.CHANNEL_CLOSED
-                        : DeviceAgentEvent.Type.CHANNEL_ERROR;
-                break;
-            default:
-                log.warn("Unrecognized channel event type {}", channelEvent.type());
-                return;
-        }
-        postDeviceAgentEvent(deviceId, new DeviceAgentEvent(agentEventType, deviceId));
-    }
-
-    private void handleArbitrationReply(P4RuntimeEvent event) {
-        final DeviceId deviceId = event.subject().deviceId();
-        final ArbitrationUpdateEvent response = (ArbitrationUpdateEvent) event.subject();
-        final DeviceAgentEvent.Type roleType = response.isMaster()
-                ? DeviceAgentEvent.Type.ROLE_MASTER
-                : DeviceAgentEvent.Type.ROLE_STANDBY;
-        postDeviceAgentEvent(deviceId, new DeviceAgentEvent(
-                roleType, response.deviceId()));
-    }
-
-    private void postDeviceAgentEvent(DeviceId deviceId, DeviceAgentEvent event) {
-        if (deviceAgentListeners.containsKey(deviceId)) {
-            deviceAgentListeners.get(deviceId).values().forEach(l -> l.event(event));
-        }
+    protected P4RuntimeClient createClientInstance(
+            P4RuntimeClientKey clientKey, ManagedChannel channel) {
+        return new P4RuntimeClientImpl(clientKey, channel, this,
+                                       pipeconfService, masterElectionIdStore);
     }
 }
diff --git a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/MockMasterElectionIdStore.java b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/MockMasterElectionIdStore.java
new file mode 100644
index 0000000..595e07d
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/MockMasterElectionIdStore.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.p4runtime.ctl.controller.MasterElectionIdStore;
+
+import java.math.BigInteger;
+
+public class MockMasterElectionIdStore implements MasterElectionIdStore {
+
+    @Override
+    public void set(DeviceId deviceId, BigInteger electionId) {
+
+    }
+
+    @Override
+    public BigInteger get(DeviceId deviceId) {
+        return null;
+    }
+
+    @Override
+    public void remove(DeviceId deviceId) {
+
+    }
+
+    @Override
+    public void setListener(DeviceId deviceId, MasterElectionIdListener listener) {
+
+    }
+
+    @Override
+    public void unsetListener(DeviceId deviceId) {
+
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
index c6a43cb..4c4dcdf 100644
--- a/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
+++ b/protocols/p4runtime/ctl/src/test/java/org/onosproject/p4runtime/ctl/P4RuntimeGroupTest.java
@@ -55,6 +55,8 @@
 import p4.v1.P4RuntimeOuterClass.WriteRequest;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.Collection;
 import java.util.List;
@@ -62,6 +64,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static java.lang.String.format;
 import static org.easymock.EasyMock.niceMock;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -105,7 +108,7 @@
     private static final int SET_EGRESS_PORT_ID = 16794308;
     private static final String GRPC_SERVER_NAME = "P4RuntimeGroupTest";
     private static final long DEFAULT_TIMEOUT_TIME = 10;
-    private static final Uint128 DEFAULT_ELECTION_ID = Uint128.newBuilder().setLow(1).build();
+    private static final Uint128 DEFAULT_ELECTION_ID = Uint128.getDefaultInstance();
     private static final String P4R_IP = "127.0.0.1";
     private static final int P4R_PORT = 50010;
 
@@ -157,10 +160,13 @@
 
 
     @Before
-    public void setup() {
+    public void setup() throws URISyntaxException {
         controller = niceMock(org.onosproject.p4runtime.ctl.controller.P4RuntimeControllerImpl.class);
-        P4RuntimeClientKey clientKey = new P4RuntimeClientKey(DEVICE_ID, P4R_IP, P4R_PORT, P4_DEVICE_ID);
-        client = new P4RuntimeClientImpl(clientKey, grpcChannel, controller, new MockPipeconfService());
+        P4RuntimeClientKey clientKey = new P4RuntimeClientKey(DEVICE_ID, new URI(
+                format("grpc://%s:%d?device_id=%d", P4R_IP, P4R_PORT, P4_DEVICE_ID)));
+        client = new P4RuntimeClientImpl(
+                clientKey, grpcChannel, controller, new MockPipeconfService(),
+                new MockMasterElectionIdStore());
     }
 
     @Test
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/api/GeneralProviderDeviceConfig.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/api/GeneralProviderDeviceConfig.java
deleted file mode 100644
index 0675a4b..0000000
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/api/GeneralProviderDeviceConfig.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.provider.general.device.api;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.annotations.Beta;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.config.Config;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Configuration for General device provider.
- */
-@Beta
-public class GeneralProviderDeviceConfig extends Config<DeviceId> {
-
-    private static final String DEVICEKEYID = "deviceKeyId";
-
-
-    @Override
-    public boolean isValid() {
-        return true;
-    }
-
-    /**
-     * Gets the information of all protocols associated to the device.
-     *
-     * @return map of protocol name and relative information
-     */
-    public Map<String, DeviceInfoConfig> protocolsInfo() {
-        return getProtocolInfoMap();
-    }
-
-    private Map<String, DeviceInfoConfig> getProtocolInfoMap() {
-        Map<String, DeviceInfoConfig> deviceMap = new HashMap<>();
-        node.fieldNames().forEachRemaining(name -> {
-
-            Map<String, String> configMap = new HashMap<>();
-            JsonNode protocol = node.get(name);
-            protocol.fieldNames().forEachRemaining(info -> configMap.put(info, protocol.get(info).asText()));
-
-            String deviceKeyId = "";
-            if (protocol.has(DEVICEKEYID)) {
-                deviceKeyId = protocol.get(DEVICEKEYID).asText("");
-            }
-
-            deviceMap.put(name, new DeviceInfoConfig(configMap, deviceKeyId));
-        });
-        return deviceMap;
-    }
-
-}
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/DeviceTaskExecutor.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/DeviceTaskExecutor.java
new file mode 100644
index 0000000..6f90256
--- /dev/null
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/DeviceTaskExecutor.java
@@ -0,0 +1,256 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.provider.general.device.impl;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Striped;
+import org.onosproject.net.DeviceId;
+import org.slf4j.Logger;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.String.format;
+import static java.lang.System.currentTimeMillis;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Allows submitting tasks related to a specific device. Takes care of executing
+ * pending tasks sequentially for each device in a FIFO order, while using a
+ * delegate executor. It also avoids executing duplicate tasks when arriving
+ * back-to-back.
+ *
+ * @param <T> enum describing the type of task
+ */
+
+class DeviceTaskExecutor<T extends Enum> {
+    /**
+     * Minimum interval between duplicate back-to-back tasks.
+     */
+    private static final int DUPLICATE_MIN_INTERVAL_MILLIS = 1000;
+
+    private final Logger log = getLogger(getClass());
+
+    private final ExecutorService delegate;
+    private final AtomicBoolean canceled = new AtomicBoolean(false);
+    private final Set<DeviceId> busyDevices = Sets.newConcurrentHashSet();
+    private final Set<DeviceId> pendingDevices = Sets.newConcurrentHashSet();
+    private final Striped<Lock> deviceLocks = Striped.lock(30);
+    private final LoadingCache<DeviceId, TaskQueue> taskQueues = CacheBuilder.newBuilder()
+            .expireAfterAccess(1, TimeUnit.MINUTES)
+            .removalListener((RemovalListener<DeviceId, TaskQueue>) notification -> {
+                if (!notification.getValue().isEmpty()) {
+                    log.warn("Cache evicted non-empty task queue for {} ({} pending tasks)",
+                             notification.getKey(), notification.getValue().size());
+                }
+            })
+            .build(new CacheLoader<DeviceId, TaskQueue>() {
+                @SuppressWarnings("NullableProblems")
+                @Override
+                public TaskQueue load(DeviceId deviceId) {
+                    return new TaskQueue();
+                }
+            });
+
+    /**
+     * Creates a new executor with the given delegate executor service.
+     *
+     * @param delegate executor service
+     */
+    DeviceTaskExecutor(ExecutorService delegate) {
+        checkNotNull(delegate);
+        this.delegate = delegate;
+    }
+
+    /**
+     * Submit a tasks.
+     *
+     * @param deviceId device associated with the task
+     * @param type     type of task (used to remove eventual back-to-back
+     *                 duplicates)
+     * @param runnable runnable to execute
+     */
+    void submit(DeviceId deviceId, T type, Runnable runnable) {
+        checkNotNull(deviceId);
+        checkNotNull(type);
+        checkNotNull(runnable);
+
+        if (canceled.get()) {
+            log.warn("Executor was cancelled, cannot submit task {} for {}",
+                     type, deviceId);
+            return;
+        }
+
+        final DeviceTask task = new DeviceTask(deviceId, type, runnable);
+        deviceLocks.get(deviceId).lock();
+        try {
+            if (taskQueues.get(deviceId).isBackToBackDuplicate(type)) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Dropping back-to-back duplicate task {} for {}",
+                              type, deviceId);
+                }
+                return;
+            }
+            if (taskQueues.get(deviceId).offer(task)) {
+                pendingDevices.add(deviceId);
+                if (!busyDevices.contains(deviceId)) {
+                    // The task was submitted to the queue and we are not
+                    // performing any other task for this device. There is at
+                    // least one task that is ready to be executed.
+                    delegate.execute(this::performTaskIfAny);
+                }
+            } else {
+                log.warn("Unable to submit task {} for {}",
+                         task.type, task.deviceId);
+            }
+        } catch (ExecutionException e) {
+            log.warn("Exception while accessing task queue cache", e);
+        } finally {
+            deviceLocks.get(task.deviceId).unlock();
+        }
+    }
+
+    /**
+     * Prevents the executor from executing any more tasks.
+     */
+    void cancel() {
+        canceled.set(true);
+    }
+
+    private void performTaskIfAny() {
+        final DeviceTask task = pollTask();
+        if (task == null) {
+            // No tasks.
+            return;
+        }
+        if (canceled.get()) {
+            log.warn("Executor was cancelled, dropping task {} for {}",
+                     task.type, task.deviceId);
+            return;
+        }
+        if (log.isTraceEnabled()) {
+            log.trace("STARTING task {} for {}...", task.type.name(), task.deviceId);
+        }
+        try {
+            task.runnable.run();
+        } catch (DeviceTaskException e) {
+            log.error("Unable to complete task {} for {}: {}",
+                      task.type, task.deviceId, e.getMessage());
+        } catch (Throwable t) {
+            log.error(format(
+                    "Uncaught exception when executing task %s for %s",
+                    task.type, task.deviceId), t);
+        }
+        if (log.isTraceEnabled()) {
+            log.trace("COMPLETED task {} for {}", task.type.name(), task.deviceId);
+        }
+        busyDevices.remove(task.deviceId);
+        delegate.execute(this::performTaskIfAny);
+    }
+
+    private DeviceTask pollTask() {
+        for (DeviceId deviceId : pendingDevices) {
+            final DeviceTask task;
+            deviceLocks.get(deviceId).lock();
+            try {
+                if (busyDevices.contains(deviceId)) {
+                    // Next device.
+                    continue;
+                }
+                task = taskQueues.get(deviceId).poll();
+                if (task == null) {
+                    // Next device.
+                    continue;
+                }
+                if (taskQueues.get(deviceId).isEmpty()) {
+                    pendingDevices.remove(deviceId);
+                }
+                busyDevices.add(deviceId);
+                return task;
+            } catch (ExecutionException e) {
+                log.warn("Exception while accessing task queue cache", e);
+            } finally {
+                deviceLocks.get(deviceId).unlock();
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Device task as stored in the task queue.
+     */
+    private class DeviceTask {
+
+        private final DeviceId deviceId;
+        private final T type;
+        private final Runnable runnable;
+
+        DeviceTask(DeviceId deviceId, T type, Runnable runnable) {
+            this.deviceId = deviceId;
+            this.type = type;
+            this.runnable = runnable;
+        }
+    }
+
+    /**
+     * A queue that keeps track of the last task added to detects back-to-back
+     * duplicates.
+     */
+    private class TaskQueue extends ConcurrentLinkedQueue<DeviceTask> {
+
+        private T lastTaskAdded;
+        private long lastAddedMillis;
+
+        @Override
+        public boolean offer(DeviceTask deviceTask) {
+            lastTaskAdded = deviceTask.type;
+            lastAddedMillis = currentTimeMillis();
+            return super.offer(deviceTask);
+        }
+
+        boolean isBackToBackDuplicate(T taskType) {
+            return lastTaskAdded != null
+                    && lastTaskAdded.equals(taskType)
+                    && (currentTimeMillis() - lastAddedMillis) <= DUPLICATE_MIN_INTERVAL_MILLIS;
+        }
+    }
+
+    /**
+     * Signals an error that prevented normal execution of the task.
+     */
+    static class DeviceTaskException extends RuntimeException {
+
+        /**
+         * Creates a new exception.
+         *
+         * @param message explanation
+         */
+        DeviceTaskException(String message) {
+            super(message);
+        }
+    }
+}
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index 1eaa808..481a310 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -16,18 +16,17 @@
 
 package org.onosproject.provider.general.device.impl;
 
-import com.google.common.annotations.Beta;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.Striped;
 import org.onlab.packet.ChassisId;
 import org.onlab.util.ItemNotFoundException;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.CoreService;
 import org.onosproject.gnmi.api.GnmiController;
+import org.onosproject.mastership.MastershipInfo;
 import org.onosproject.mastership.MastershipService;
-import org.onosproject.net.AnnotationKeys;
 import org.onosproject.net.DefaultAnnotations;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
@@ -35,27 +34,21 @@
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.behaviour.PiPipelineProgrammable;
 import org.onosproject.net.behaviour.PortAdmin;
-import org.onosproject.net.config.ConfigFactory;
 import org.onosproject.net.config.NetworkConfigEvent;
 import org.onosproject.net.config.NetworkConfigListener;
 import org.onosproject.net.config.NetworkConfigRegistry;
 import org.onosproject.net.config.basics.BasicDeviceConfig;
-import org.onosproject.net.config.basics.SubjectFactories;
 import org.onosproject.net.device.DefaultDeviceDescription;
 import org.onosproject.net.device.DeviceAgentEvent;
 import org.onosproject.net.device.DeviceAgentListener;
 import org.onosproject.net.device.DeviceDescription;
 import org.onosproject.net.device.DeviceDescriptionDiscovery;
-import org.onosproject.net.device.DeviceEvent;
 import org.onosproject.net.device.DeviceHandshaker;
-import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceProvider;
 import org.onosproject.net.device.DeviceProviderRegistry;
 import org.onosproject.net.device.DeviceProviderService;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.device.PortDescription;
-import org.onosproject.net.device.PortStatistics;
-import org.onosproject.net.device.PortStatisticsDiscovery;
 import org.onosproject.net.driver.Behaviour;
 import org.onosproject.net.driver.DefaultDriverData;
 import org.onosproject.net.driver.DefaultDriverHandler;
@@ -64,14 +57,13 @@
 import org.onosproject.net.driver.DriverService;
 import org.onosproject.net.pi.model.PiPipeconf;
 import org.onosproject.net.pi.model.PiPipeconfId;
-import org.onosproject.net.pi.service.PiPipeconfConfig;
 import org.onosproject.net.pi.service.PiPipeconfService;
 import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
 import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
 import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
 import org.onosproject.net.provider.AbstractProvider;
 import org.onosproject.net.provider.ProviderId;
-import org.onosproject.provider.general.device.api.GeneralProviderDeviceConfig;
+import org.onosproject.provider.general.device.impl.DeviceTaskExecutor.DeviceTaskException;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -81,50 +73,43 @@
 import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.slf4j.Logger;
 
-import java.security.SecureRandom;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Dictionary;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.StringJoiner;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.Lock;
-import java.util.function.Supplier;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.lang.System.currentTimeMillis;
 import static java.util.concurrent.Executors.newFixedThreadPool;
-import static java.util.concurrent.Executors.newScheduledThreadPool;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.net.device.DeviceEvent.Type;
 import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.OP_TIMEOUT_SHORT;
 import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.OP_TIMEOUT_SHORT_DEFAULT;
-import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.PROBE_FREQUENCY;
-import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.PROBE_FREQUENCY_DEFAULT;
-import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.STATS_POLL_FREQUENCY;
-import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.STATS_POLL_FREQUENCY_DEFAULT;
+import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.PROBE_INTERVAL;
+import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.PROBE_INTERVAL_DEFAULT;
+import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.STATS_POLL_INTERVAL;
+import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.STATS_POLL_INTERVAL_DEFAULT;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
- * Provider which uses drivers to detect device and do initial handshake and
- * channel establishment with devices. Any other provider specific operation is
- * also delegated to the DeviceHandshaker driver.
+ * Provider which uses drivers to discover devices, perform initial handshake,
+ * and notify the core of disconnection events. The implementation listens for
+ * events from netcfg or the drivers (via {@link DeviceAgentListener}) andP
+ * schedules task for each event.
  */
-@Beta
 @Component(immediate = true,
         property = {
-                STATS_POLL_FREQUENCY + ":Integer=" + STATS_POLL_FREQUENCY_DEFAULT,
-                PROBE_FREQUENCY + ":Integer=" + PROBE_FREQUENCY_DEFAULT,
+                PROBE_INTERVAL + ":Integer=" + PROBE_INTERVAL_DEFAULT,
+                STATS_POLL_INTERVAL + ":Integer=" + STATS_POLL_INTERVAL_DEFAULT,
                 OP_TIMEOUT_SHORT + ":Integer=" + OP_TIMEOUT_SHORT_DEFAULT,
         })
 public class GeneralDeviceProvider extends AbstractProvider
@@ -132,16 +117,12 @@
 
     private final Logger log = getLogger(getClass());
 
-    private static final String APP_NAME = "org.onosproject.gdp";
+    private static final String APP_NAME = "org.onosproject.generaldeviceprovider";
     private static final String URI_SCHEME = "device";
-    private static final String CFG_SCHEME = "generalprovider";
     private static final String DEVICE_PROVIDER_PACKAGE =
             "org.onosproject.general.provider.device";
     private static final int CORE_POOL_SIZE = 10;
     private static final String UNKNOWN = "unknown";
-    private static final String DRIVER = "driver";
-    private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS =
-            ImmutableSet.of("p4runtime");
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     private DeviceProviderRegistry providerRegistry;
@@ -165,6 +146,9 @@
     private MastershipService mastershipService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    private ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     private PiPipeconfService pipeconfService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -180,68 +164,55 @@
 
     private GnmiDeviceStateSubscriber gnmiDeviceStateSubscriber;
 
-    /**
-     * Configure poll frequency for port status and statistics; default is 10 sec.
-     */
-    private int statsPollFrequency = STATS_POLL_FREQUENCY_DEFAULT;
+    /** Configure interval for checking device availability; default is 10 sec. */
+    private int probeInterval = PROBE_INTERVAL_DEFAULT;
 
-    /**
-     * Configure probe frequency for checking device availability; default is 10 sec.
-     */
-    private int probeFrequency = PROBE_FREQUENCY_DEFAULT;
+    /** Configure poll frequency for port status and stats; default is 10 sec. */
+    private int statsPollInterval = STATS_POLL_INTERVAL_DEFAULT;
 
-    /**
-     * Configure timeout in seconds for device operations that are supposed to take a short time
-     * (e.g. checking device reachability); default is 10 seconds.
-     */
+    /** Configure timeout in seconds for device operations; default is 10 sec. */
     private int opTimeoutShort = OP_TIMEOUT_SHORT_DEFAULT;
 
-    //FIXME to be removed when netcfg will issue device events in a bundle or
-    //ensures all configuration needed is present
-    private final Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
-    private final Set<DeviceId> driverConfigured = new CopyOnWriteArraySet<>();
-    private final Set<DeviceId> pipelineConfigured = new CopyOnWriteArraySet<>();
-
-    private final Map<DeviceId, MastershipRole> requestedRoles = Maps.newConcurrentMap();
-    private final ConcurrentMap<DeviceId, ScheduledFuture<?>> statsPollingTasks = new ConcurrentHashMap<>();
     private final Map<DeviceId, DeviceHandshaker> handshakersWithListeners = Maps.newConcurrentMap();
-    private final InternalDeviceListener deviceListener = new InternalDeviceListener();
+    private final Map<DeviceId, Long> lastProbedAvailability = Maps.newConcurrentMap();
     private final InternalPipeconfWatchdogListener pipeconfWatchdogListener = new InternalPipeconfWatchdogListener();
     private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
     private final DeviceAgentListener deviceAgentListener = new InternalDeviceAgentListener();
-    private final ConfigFactory factory = new InternalConfigFactory();
-    private final Striped<Lock> deviceLocks = Striped.lock(30);
 
-    private ExecutorService connectionExecutor;
-    private ScheduledExecutorService statsExecutor;
+    private ExecutorService mainExecutor;
+    private DeviceTaskExecutor<TaskType> taskExecutor;
     private ScheduledExecutorService probeExecutor;
     private ScheduledFuture<?> probeTask;
+    private StatsPoller statsPoller;
     private DeviceProviderService providerService;
 
     public GeneralDeviceProvider() {
         super(new ProviderId(URI_SCHEME, DEVICE_PROVIDER_PACKAGE));
     }
 
+    protected DeviceProviderService providerService() {
+        return providerService;
+    }
+
     @Activate
     public void activate(ComponentContext context) {
-        connectionExecutor = newFixedThreadPool(CORE_POOL_SIZE, groupedThreads(
-                "onos/gdp-connect", "%d", log));
-        statsExecutor = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
-                "onos/gdp-stats", "%d", log));
+        mainExecutor = newFixedThreadPool(CORE_POOL_SIZE, groupedThreads(
+                "onos/gdp-task", "%d", log));
+        taskExecutor = new DeviceTaskExecutor<>(mainExecutor);
         probeExecutor = newSingleThreadScheduledExecutor(groupedThreads(
                 "onos/gdp-probe", "%d", log));
         providerService = providerRegistry.register(this);
         componentConfigService.registerProperties(getClass());
         coreService.registerApplication(APP_NAME);
-        cfgService.registerConfigFactory(factory);
         cfgService.addListener(cfgListener);
-        deviceService.addListener(deviceListener);
         pipeconfWatchdogService.addListener(pipeconfWatchdogListener);
-        rescheduleProbeTask(false);
-        modified(context);
-        gnmiDeviceStateSubscriber = new GnmiDeviceStateSubscriber(gnmiController,
-                deviceService, mastershipService, providerService);
+        gnmiDeviceStateSubscriber = new GnmiDeviceStateSubscriber(
+                gnmiController, deviceService, mastershipService, providerService);
         gnmiDeviceStateSubscriber.activate();
+        startOrRescheduleProbeTask();
+        statsPoller = new StatsPoller(deviceService, mastershipService, providerService);
+        statsPoller.activate(statsPollInterval);
+        modified(context);
         log.info("Started");
     }
 
@@ -252,55 +223,35 @@
         }
 
         Dictionary<?, ?> properties = context.getProperties();
-        final int oldStatsPollFrequency = statsPollFrequency;
-        statsPollFrequency = Tools.getIntegerProperty(
-                properties, STATS_POLL_FREQUENCY, STATS_POLL_FREQUENCY_DEFAULT);
+        final int oldProbeFrequency = probeInterval;
+        probeInterval = Tools.getIntegerProperty(
+                properties, PROBE_INTERVAL, PROBE_INTERVAL_DEFAULT);
         log.info("Configured. {} is configured to {} seconds",
-                 STATS_POLL_FREQUENCY, statsPollFrequency);
-        final int oldProbeFrequency = probeFrequency;
-        probeFrequency = Tools.getIntegerProperty(
-                properties, PROBE_FREQUENCY, PROBE_FREQUENCY_DEFAULT);
+                 PROBE_INTERVAL, probeInterval);
+        final int oldStatsPollFrequency = statsPollInterval;
+        statsPollInterval = Tools.getIntegerProperty(
+                properties, STATS_POLL_INTERVAL, STATS_POLL_INTERVAL_DEFAULT);
         log.info("Configured. {} is configured to {} seconds",
-                 PROBE_FREQUENCY, probeFrequency);
+                 STATS_POLL_INTERVAL, statsPollInterval);
         opTimeoutShort = Tools.getIntegerProperty(
                 properties, OP_TIMEOUT_SHORT, OP_TIMEOUT_SHORT_DEFAULT);
         log.info("Configured. {} is configured to {} seconds",
                  OP_TIMEOUT_SHORT, opTimeoutShort);
 
-        if (oldStatsPollFrequency != statsPollFrequency) {
-            rescheduleStatsPollingTasks();
+        if (oldProbeFrequency != probeInterval) {
+            startOrRescheduleProbeTask();
         }
 
-        if (oldProbeFrequency != probeFrequency) {
-            rescheduleProbeTask(true);
-        }
-    }
-
-    private void rescheduleProbeTask(boolean deelay) {
-        synchronized (this) {
-            if (probeTask != null) {
-                probeTask.cancel(false);
-            }
-            probeTask = probeExecutor.scheduleAtFixedRate(
-                    this::triggerProbeAllDevices,
-                    deelay ? probeFrequency : 0,
-                    probeFrequency,
-                    TimeUnit.SECONDS);
+        if (oldStatsPollFrequency != statsPollInterval) {
+            statsPoller.reschedule(statsPollInterval);
         }
     }
 
     @Deactivate
     public void deactivate() {
-        // Shutdown stats polling tasks.
-        statsPollingTasks.keySet().forEach(this::cancelStatsPolling);
-        statsPollingTasks.clear();
-        statsExecutor.shutdownNow();
-        try {
-            statsExecutor.awaitTermination(5, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-            log.warn("statsExecutor not terminated properly");
-        }
-        statsExecutor = null;
+        // Shutdown stats poller.
+        statsPoller.deactivate();
+        statsPoller = null;
         // Shutdown probe executor.
         probeTask.cancel(true);
         probeTask = null;
@@ -311,87 +262,125 @@
             log.warn("probeExecutor not terminated properly");
         }
         probeExecutor = null;
-        // Shutdown connection executor.
-        connectionExecutor.shutdownNow();
+        // Shutdown main and task executor.
+        taskExecutor.cancel();
+        taskExecutor = null;
+        mainExecutor.shutdownNow();
         try {
-            connectionExecutor.awaitTermination(5, TimeUnit.SECONDS);
+            mainExecutor.awaitTermination(5, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
             log.warn("connectionExecutor not terminated properly");
         }
-        connectionExecutor = null;
+        mainExecutor = null;
         // Remove all device agent listeners
         handshakersWithListeners.values().forEach(h -> h.removeDeviceAgentListener(id()));
         handshakersWithListeners.clear();
         // Other cleanup.
+        lastProbedAvailability.clear();
         componentConfigService.unregisterProperties(getClass(), false);
         cfgService.removeListener(cfgListener);
-        deviceService.removeListener(deviceListener);
         pipeconfWatchdogService.removeListener(pipeconfWatchdogListener);
         providerRegistry.unregister(this);
         providerService = null;
-        cfgService.unregisterConfigFactory(factory);
         gnmiDeviceStateSubscriber.deactivate();
         gnmiDeviceStateSubscriber = null;
         log.info("Stopped");
     }
 
-
     @Override
     public void triggerProbe(DeviceId deviceId) {
-        connectionExecutor.execute(withDeviceLock(
-                () -> doDeviceProbe(deviceId), deviceId));
+        checkNotNull(deviceId);
+        submitTask(deviceId, TaskType.PROBE_AVAILABILITY);
     }
 
     @Override
     public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
-        log.info("Notifying role {} to device {}", newRole, deviceId);
-        requestedRoles.put(deviceId, newRole);
-        connectionExecutor.execute(() -> doRoleChanged(deviceId, newRole));
-    }
 
-    private void doRoleChanged(DeviceId deviceId, MastershipRole newRole) {
+        final MastershipInfo mastershipInfo = mastershipService.getMastershipFor(deviceId);
+        final NodeId localNodeId = clusterService.getLocalNode().id();
+
+        if (!mastershipInfo.getRole(localNodeId).equals(newRole)) {
+            log.warn("Inconsistent mastership info for {}! Requested {}, but " +
+                             "mastership service reports {}, will apply the latter...",
+                     deviceId, newRole, mastershipInfo.getRole(localNodeId));
+            newRole = mastershipInfo.getRole(localNodeId);
+        }
+
+        // Derive preference value.
+        final int preference;
+        switch (newRole) {
+            case MASTER:
+                preference = 0;
+                break;
+            case STANDBY:
+                preference = mastershipInfo.backups().indexOf(localNodeId) + 1;
+                if (preference == 0) {
+                    // Not found in list.
+                    log.error("Unable to derive mastership preference for {}, " +
+                                      "requested role {} but local node ID was " +
+                                      "not found among list of backup nodes " +
+                                      "reported by mastership service");
+                    return;
+                }
+                break;
+            case NONE:
+                // No preference for NONE, apply as is.
+                log.info("Notifying role {} to {}", newRole, deviceId);
+                roleChanged(deviceId, newRole);
+                return;
+            default:
+                log.error("Unrecognized mastership role {}", newRole);
+                return;
+        }
+
+        log.info("Notifying role {} (preference {}) for term {} to {}",
+                 newRole, preference, mastershipInfo.term(), deviceId);
+
         final DeviceHandshaker handshaker = getBehaviour(
                 deviceId, DeviceHandshaker.class);
         if (handshaker == null) {
-            log.error("Null handshaker. Unable to notify new role {} to {}",
+            log.error("Null handshaker. Unable to notify role {} to {}",
                       newRole, deviceId);
             return;
         }
-        handshaker.roleChanged(newRole);
+
+        try {
+            handshaker.roleChanged(preference, mastershipInfo.term());
+        } catch (UnsupportedOperationException e) {
+            // Preference-based method not supported.
+            handshaker.roleChanged(newRole);
+        }
     }
 
     @Override
     public boolean isReachable(DeviceId deviceId) {
-        log.debug("Testing reachability for device {}", deviceId);
         final DeviceHandshaker handshaker = getBehaviour(
                 deviceId, DeviceHandshaker.class);
         if (handshaker == null) {
             return false;
         }
-        return getFutureWithDeadline(
-                handshaker.isReachable(), "checking reachability",
-                deviceId, false, opTimeoutShort);
+        return handshaker.isReachable();
     }
 
-    private boolean isConnected(DeviceId deviceId) {
-        log.debug("Testing connection to device {}", deviceId);
+    @Override
+    public boolean isAvailable(DeviceId deviceId) {
         final DeviceHandshaker handshaker = getBehaviour(
                 deviceId, DeviceHandshaker.class);
         if (handshaker == null) {
             return false;
         }
-        return handshaker.isConnected();
+        try {
+            // Try without probing the device...
+            return handshaker.isAvailable();
+        } catch (UnsupportedOperationException e) {
+            // Driver does not support that.
+            return probeAvailability(handshaker);
+        }
     }
 
     @Override
     public void changePortState(DeviceId deviceId, PortNumber portNumber,
                                 boolean enable) {
-        connectionExecutor.execute(
-                () -> doChangePortState(deviceId, portNumber, enable));
-    }
-
-    private void doChangePortState(DeviceId deviceId, PortNumber portNumber,
-                                   boolean enable) {
         if (!deviceService.getDevice(deviceId).is(PortAdmin.class)) {
             log.warn("Missing PortAdmin behaviour on {}, aborting port state change",
                      deviceId);
@@ -409,9 +398,450 @@
 
     @Override
     public void triggerDisconnect(DeviceId deviceId) {
-        log.debug("Triggering disconnection of device {}", deviceId);
-        connectionExecutor.execute(withDeviceLock(
-                () -> doDisconnectDevice(deviceId), deviceId));
+        checkNotNull(deviceId);
+        log.info("Triggering disconnection of device {}", deviceId);
+        submitTask(deviceId, TaskType.CONNECTION_TEARDOWN);
+    }
+
+    /**
+     * Listener for configuration events.
+     */
+    private class InternalNetworkConfigListener implements NetworkConfigListener {
+        @Override
+        public void event(NetworkConfigEvent event) {
+            DeviceId deviceId = (DeviceId) event.subject();
+            switch (event.type()) {
+                case CONFIG_ADDED:
+                    if (configIsComplete(deviceId)) {
+                        submitTask(deviceId, TaskType.CONNECTION_SETUP);
+                    }
+                    break;
+                case CONFIG_UPDATED:
+                    if (configIsComplete(deviceId) && mgmtAddrUpdated(event)) {
+                        submitTask(deviceId, TaskType.CONNECTION_UPDATE);
+                    }
+                    break;
+                case CONFIG_REMOVED:
+                    if (event.configClass().equals(BasicDeviceConfig.class)) {
+                        submitTask(deviceId, TaskType.CONNECTION_TEARDOWN);
+                    }
+                    break;
+                default:
+                    // Ignore
+                    break;
+            }
+        }
+
+        private boolean mgmtAddrUpdated(NetworkConfigEvent event) {
+            if (!event.prevConfig().isPresent() || !event.config().isPresent()) {
+                return false;
+            }
+            final BasicDeviceConfig prev = (BasicDeviceConfig) event.prevConfig().get();
+            final BasicDeviceConfig current = (BasicDeviceConfig) event.config().get();
+            return !Objects.equals(prev.managementAddress(), current.managementAddress());
+        }
+
+        @Override
+        public boolean isRelevant(NetworkConfigEvent event) {
+            return event.configClass().equals(BasicDeviceConfig.class) &&
+                    (event.subject() instanceof DeviceId) &&
+                    myScheme((DeviceId) event.subject());
+        }
+    }
+
+    /**
+     * Listener for device agent events.
+     */
+    private class InternalDeviceAgentListener implements DeviceAgentListener {
+        @Override
+        public void event(DeviceAgentEvent event) {
+            DeviceId deviceId = event.subject();
+            switch (event.type()) {
+                case CHANNEL_OPEN:
+                    submitTask(deviceId, TaskType.CHANNEL_OPEN);
+                    break;
+                case CHANNEL_CLOSED:
+                case CHANNEL_ERROR:
+                    submitTask(deviceId, TaskType.CHANNEL_CLOSED);
+                    break;
+                case ROLE_MASTER:
+                    submitTask(deviceId, TaskType.ROLE_MASTER);
+                    break;
+                case ROLE_STANDBY:
+                    submitTask(deviceId, TaskType.ROLE_STANDBY);
+                    break;
+                case ROLE_NONE:
+                    submitTask(deviceId, TaskType.ROLE_NONE);
+                    break;
+                case NOT_MASTER:
+                    submitTask(deviceId, TaskType.NOT_MASTER);
+                    break;
+                default:
+                    log.warn("Unrecognized device agent event {}", event.type());
+            }
+        }
+    }
+
+    /**
+     * Pipeline event listener.
+     */
+    private class InternalPipeconfWatchdogListener implements PiPipeconfWatchdogListener {
+        @Override
+        public void event(PiPipeconfWatchdogEvent event) {
+            final DeviceId deviceId = event.subject();
+            switch (event.type()) {
+                case PIPELINE_READY:
+                    submitTask(deviceId, TaskType.PIPELINE_READY);
+                    break;
+                case PIPELINE_UNKNOWN:
+                    submitTask(deviceId, TaskType.PIPELINE_NOT_READY);
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        @Override
+        public boolean isRelevant(PiPipeconfWatchdogEvent event) {
+            return myScheme(event.subject());
+        }
+    }
+
+    private void startOrRescheduleProbeTask() {
+        synchronized (this) {
+            if (probeTask != null) {
+                probeTask.cancel(false);
+            }
+            probeTask = probeExecutor.scheduleAtFixedRate(
+                    this::submitProbeTasks,
+                    0,
+                    probeInterval,
+                    TimeUnit.SECONDS);
+        }
+    }
+
+    private void submitProbeTasks() {
+        // Async trigger a task for all devices in the cfg.
+        log.debug("Starting probing for all devices");
+        cfgService.getSubjects(DeviceId.class).stream()
+                .filter(GeneralDeviceProvider::myScheme)
+                .forEach(this::submitProbeTask);
+    }
+
+    private void submitProbeTask(DeviceId deviceId) {
+        final DeviceHandshaker handshaker = handshakersWithListeners.get(deviceId);
+
+        if (handshaker == null) {
+            if (configIsComplete(deviceId)) {
+                // Device in config but we have not initiated a connection.
+                // Perhaps we missed the config event?
+                submitTask(deviceId, TaskType.CONNECTION_SETUP);
+            }
+            return;
+        }
+
+        if (!handshaker.isConnected()) {
+            // Device is in the core, but driver reports there is NOT a
+            // connection to it. Perhaps the netcfg changed and we didn't
+            // pick the event?
+            log.warn("Re-establishing lost connection to {}", deviceId);
+            submitTask(deviceId, TaskType.CONNECTION_TEARDOWN);
+            submitTask(deviceId, TaskType.CONNECTION_SETUP);
+            return;
+        }
+
+        // On probing offline devices, while we expect them to signal
+        // availability via CHANNEL_OPEN or similar events, periodic probing
+        // might be needed to stimulate some channel activity. We might consider
+        // requiring active probing of closed channels in the protocol layer.
+
+        final Long lastProbe = lastProbedAvailability.get(deviceId);
+        if (lastProbe != null &&
+                (currentTimeMillis() - lastProbe) < (probeInterval * 1000 / 3)) {
+            // This avoids overload of probe tasks which might involve sending
+            // messages over the network. We require a minimum interval of 1/3
+            // of the configured probeInterval between consecutive probe tasks.
+            if (log.isDebugEnabled()) {
+                log.debug("Dropping probe task for {} as it happened recently",
+                          deviceId);
+            }
+            return;
+        }
+
+        submitTask(deviceId, TaskType.PROBE_AVAILABILITY);
+    }
+
+    /**
+     * Type of tasks performed by this provider.
+     */
+    enum TaskType {
+        CONNECTION_SETUP,
+        CONNECTION_UPDATE,
+        CONNECTION_TEARDOWN,
+        PIPELINE_READY,
+        CHANNEL_OPEN,
+        CHANNEL_CLOSED,
+        PIPELINE_NOT_READY,
+        PROBE_AVAILABILITY,
+        ROLE_MASTER,
+        ROLE_NONE,
+        ROLE_STANDBY,
+        NOT_MASTER,
+    }
+
+    private void submitTask(DeviceId deviceId, TaskType taskType) {
+        taskExecutor.submit(deviceId, taskType, taskRunnable(deviceId, taskType));
+    }
+
+    private Runnable taskRunnable(DeviceId deviceId, TaskType taskType) {
+        switch (taskType) {
+            case CONNECTION_SETUP:
+                return () -> handleConnectionSetup(deviceId);
+            case CONNECTION_UPDATE:
+                return () -> handleConnectionUpdate(deviceId);
+            case CONNECTION_TEARDOWN:
+                return () -> handleConnectionTeardown(deviceId);
+            case CHANNEL_OPEN:
+                return () -> handleProbeAvailability(deviceId);
+            case CHANNEL_CLOSED:
+                return () -> markOfflineIfNeeded(deviceId);
+            case PIPELINE_NOT_READY:
+                return () -> markOfflineIfNeeded(deviceId);
+            case PIPELINE_READY:
+                return () -> handleProbeAvailability(deviceId);
+            case PROBE_AVAILABILITY:
+                return () -> handleProbeAvailability(deviceId);
+            case ROLE_MASTER:
+                return () -> handleMastershipResponse(deviceId, MastershipRole.MASTER);
+            case ROLE_STANDBY:
+                return () -> handleMastershipResponse(deviceId, MastershipRole.STANDBY);
+            case ROLE_NONE:
+                return () -> handleMastershipResponse(deviceId, MastershipRole.NONE);
+            case NOT_MASTER:
+                return () -> handleNotMaster(deviceId);
+            default:
+                throw new IllegalArgumentException("Unrecognized task type " + taskType);
+        }
+    }
+
+    private void handleConnectionSetup(DeviceId deviceId) {
+        assertConfig(deviceId);
+        // Bind pipeconf (if any and if device is capable).
+        bindPipeconfIfRequired(deviceId);
+        // Get handshaker.
+        final DeviceHandshaker handshaker = handshakerOrFail(deviceId);
+        if (handshaker.isConnected() || handshakersWithListeners.containsKey(deviceId)) {
+            throw new DeviceTaskException("connection already exists");
+        }
+        // Add device agent listener.
+        handshaker.addDeviceAgentListener(id(), deviceAgentListener);
+        handshakersWithListeners.put(deviceId, handshaker);
+        // Start connection via handshaker.
+        final Boolean connectSuccess = getFutureWithDeadline(
+                handshaker.connect(), "initiating connection",
+                deviceId, false, opTimeoutShort);
+        if (!connectSuccess) {
+            // Failed! Remove listeners.
+            handshaker.removeDeviceAgentListener(id());
+            handshakersWithListeners.remove(deviceId);
+            throw new DeviceTaskException("connection failed");
+        }
+        createOrUpdateDevice(deviceId, false);
+        final List<PortDescription> ports = getPortDetails(deviceId);
+        providerService.updatePorts(deviceId, ports);
+        // From here we expect a CHANNEL_OPEN event to update availability.
+    }
+
+    private void handleConnectionUpdate(DeviceId deviceId) {
+        assertConfig(deviceId);
+        final DeviceHandshaker handshaker = handshakerOrFail(deviceId);
+        if (!handshaker.isConnected()) {
+            // If driver reports that a connection still exists, perhaps the
+            // part of the netcfg that changed does not affect the connection.
+            // Otherwise, remove any previous connection state from the old
+            // netcfg and create a new one.
+            log.warn("Detected change of connection endpoints for {}, will " +
+                             "tear down existing connection and set up a new one...",
+                     deviceId);
+            handleConnectionTeardown(deviceId);
+            handleConnectionSetup(deviceId);
+        }
+    }
+
+    private void createOrUpdateDevice(DeviceId deviceId, boolean available) {
+        if (deviceService.getDevice(deviceId) != null
+                && deviceService.isAvailable(deviceId) == available) {
+            // Other nodes might have advertised this device before us.
+            return;
+        }
+        assertConfig(deviceId);
+        providerService.deviceConnected(deviceId, getDeviceDescription(
+                deviceId, available));
+    }
+
+    private boolean probeAvailability(DeviceHandshaker handshaker) {
+        lastProbedAvailability.put(handshaker.data().deviceId(), currentTimeMillis());
+        return getFutureWithDeadline(
+                handshaker.probeAvailability(), "probing availability",
+                handshaker.data().deviceId(), false, opTimeoutShort);
+    }
+
+    private boolean probeReachability(DeviceHandshaker handshaker) {
+        lastProbedAvailability.put(handshaker.data().deviceId(), currentTimeMillis());
+        return getFutureWithDeadline(
+                handshaker.probeReachability(), "probing reachability",
+                handshaker.data().deviceId(), false, opTimeoutShort);
+    }
+
+    private void markOfflineIfNeeded(DeviceId deviceId) {
+        assertDeviceRegistered(deviceId);
+        if (deviceService.isAvailable(deviceId)) {
+            providerService.deviceDisconnected(deviceId);
+        }
+    }
+
+    private void handleProbeAvailability(DeviceId deviceId) {
+        assertDeviceRegistered(deviceId);
+
+        // Make device has a valid mastership role.
+        final DeviceHandshaker handshaker = handshakerOrFail(deviceId);
+        final MastershipRole deviceRole = handshaker.getRole();
+        final MastershipRole expectedRole = mastershipService.getLocalRole(deviceId);
+        if (expectedRole == MastershipRole.NONE || expectedRole != deviceRole) {
+            // Device does NOT have a valid role...
+            if (!handshaker.isReachable() && !probeReachability(handshaker)) {
+                // ...but is not reachable. There isn't much we can do.
+                markOfflineIfNeeded(deviceId);
+                return;
+            }
+            // ...and is reachable, re-assert role.
+            roleChanged(deviceId, expectedRole == MastershipRole.NONE
+                    ? mastershipService.requestRoleForSync(deviceId)
+                    : expectedRole);
+            try {
+                // Wait for role to be notified and reachability state to be
+                // updated. This should be roughly equivalent to one RTT.
+                Thread.sleep(500);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+
+        // Check and update availability.
+        if (probeAvailability(handshakerOrFail(deviceId))) {
+            // Device ready to do its job.
+            createOrUpdateDevice(deviceId, true);
+        } else {
+            markOfflineIfNeeded(deviceId);
+            if (handshaker.isReachable() && isPipelineProgrammable(deviceId)) {
+                // If reachable, but not available, and pipeline programmable, there
+                // is a high chance it's because the pipeline is not READY
+                // (independently from what the pipeconf watchdog reports, as the
+                // status there might be outdated). Encourage pipeconf watchdog to
+                // perform a pipeline probe ASAP.
+                pipeconfWatchdogService.triggerProbe(deviceId);
+            }
+        }
+    }
+
+    private void handleMastershipResponse(DeviceId deviceId, MastershipRole response) {
+        assertDeviceRegistered(deviceId);
+        log.debug("Device {} asserted role {}", deviceId, response);
+        providerService.receivedRoleReply(deviceId, response);
+    }
+
+    private void handleNotMaster(DeviceId deviceId) {
+        assertDeviceRegistered(deviceId);
+        if (mastershipService.isLocalMaster(deviceId)) {
+            log.warn("Device {} notified that this node is not master, " +
+                             "relinquishing mastership...", deviceId);
+            mastershipService.relinquishMastership(deviceId);
+        }
+    }
+
+    private void assertDeviceRegistered(DeviceId deviceId) {
+        if (deviceService.getDevice(deviceId) == null) {
+            throw new DeviceTaskException("device not registered in the core");
+        }
+    }
+
+    private void handleConnectionTeardown(DeviceId deviceId) {
+        if (deviceService.getDevice(deviceId) != null
+                && deviceService.isAvailable(deviceId)) {
+            providerService.deviceDisconnected(deviceId);
+        }
+        final DeviceHandshaker handshaker = handshakerOrFail(deviceId);
+        handshaker.removeDeviceAgentListener(id());
+        handshakersWithListeners.remove(deviceId);
+        handshaker.disconnect();
+        lastProbedAvailability.remove(deviceId);
+    }
+
+    private void bindPipeconfIfRequired(DeviceId deviceId) {
+        if (pipeconfService.getPipeconf(deviceId).isPresent()
+                || !isPipelineProgrammable(deviceId)) {
+            // Nothing to do.
+            // Device has already a pipeconf or is not programmable.
+            return;
+        }
+        // Get pipeconf from netcfg or driver (default one).
+        final PiPipelineProgrammable pipelineProg = getBehaviour(
+                deviceId, PiPipelineProgrammable.class);
+        final PiPipeconfId pipeconfId = getPipeconfId(deviceId, pipelineProg);
+        if (pipeconfId == null) {
+            throw new DeviceTaskException("unable to find pipeconf");
+        }
+        // Store binding in pipeconf service.
+        pipeconfService.bindToDevice(pipeconfId, deviceId);
+    }
+
+    private PiPipeconfId getPipeconfId(DeviceId deviceId, PiPipelineProgrammable pipelineProg) {
+        // Places to look for a pipeconf ID (in priority order)):
+        // 1) netcfg
+        // 2) device driver (default one)
+        final PiPipeconfId pipeconfId = getPipeconfFromCfg(deviceId);
+        if (pipeconfId != null && !pipeconfId.id().isEmpty()) {
+            return pipeconfId;
+        }
+        if (pipelineProg != null
+                && pipelineProg.getDefaultPipeconf().isPresent()) {
+            final PiPipeconf defaultPipeconf = pipelineProg.getDefaultPipeconf().get();
+            log.info("Using default pipeconf {} for {}", defaultPipeconf.id(), deviceId);
+            return defaultPipeconf.id();
+        }
+        return null;
+    }
+
+    private PiPipeconfId getPipeconfFromCfg(DeviceId deviceId) {
+        BasicDeviceConfig config = cfgService.getConfig(
+                deviceId, BasicDeviceConfig.class);
+        if (config == null) {
+            return null;
+        }
+        return config.pipeconf() != null
+                ? new PiPipeconfId(config.pipeconf()) : null;
+    }
+
+    private DeviceHandshaker handshakerOrFail(DeviceId deviceId) {
+        final DeviceHandshaker handshaker = getBehaviour(
+                deviceId, DeviceHandshaker.class);
+        if (handshaker == null) {
+            throw new DeviceTaskException("missing handshaker behavior");
+        }
+        return handshaker;
+    }
+
+    private boolean configIsComplete(DeviceId deviceId) {
+        final BasicDeviceConfig basicDeviceCfg = cfgService.getConfig(
+                deviceId, BasicDeviceConfig.class);
+        return basicDeviceCfg != null && !isNullOrEmpty(basicDeviceCfg.driver());
+    }
+
+    private void assertConfig(DeviceId deviceId) {
+        if (!configIsComplete(deviceId)) {
+            throw new DeviceTaskException("configuration is not complete");
+        }
     }
 
     private Driver getDriver(DeviceId deviceId) {
@@ -425,8 +855,6 @@
     }
 
     private <T extends Behaviour> T getBehaviour(DeviceId deviceId, Class<T> type) {
-        // Get handshaker.
-
         Driver driver = getDriver(deviceId);
         if (driver == null) {
             return null;
@@ -435,83 +863,16 @@
             return null;
         }
         final DriverData data = new DefaultDriverData(driver, deviceId);
-        // Storing deviceKeyId and all other config values as data in the driver
-        // with protocol_<info> name as the key. e.g protocol_ip.
-        final GeneralProviderDeviceConfig providerConfig = cfgService.getConfig(
-                deviceId, GeneralProviderDeviceConfig.class);
-        if (providerConfig != null) {
-            providerConfig.protocolsInfo().forEach((protocol, info) -> {
-                info.configValues().forEach(
-                        (k, v) -> data.set(protocol + "_" + k, v));
-                data.set(protocol + "_key", info.deviceKeyId());
-            });
-        }
         final DefaultDriverHandler handler = new DefaultDriverHandler(data);
         return driver.createBehaviour(handler, type);
     }
 
-    private void doConnectDevice(DeviceId deviceId) {
-        log.debug("Initiating connection to device {}...", deviceId);
-        // Retrieve config
-        if (configIsMissing(deviceId)) {
-            return;
+    private boolean hasBehaviour(DeviceId deviceId, Class<? extends Behaviour> type) {
+        Driver driver = getDriver(deviceId);
+        if (driver == null) {
+            return false;
         }
-        // Bind pipeconf (if any and if device is capable).
-        if (!bindPipeconfIfRequired(deviceId)) {
-            // We already logged the error.
-            return;
-        }
-        // Get handshaker.
-        final DeviceHandshaker handshaker = getBehaviour(
-                deviceId, DeviceHandshaker.class);
-        if (handshaker == null) {
-            log.error("Missing handshaker behavior for {}, aborting connection",
-                      deviceId);
-            return;
-        }
-        // Add device agent listener.
-        handshaker.addDeviceAgentListener(id(), deviceAgentListener);
-        handshakersWithListeners.put(deviceId, handshaker);
-        // Start connection via handshaker.
-        final Boolean connectSuccess = getFutureWithDeadline(
-                handshaker.connect(), "initiating connection",
-                deviceId, false, opTimeoutShort);
-        if (!connectSuccess) {
-            log.warn("Unable to connect to {}", deviceId);
-        }
-    }
-
-    private void triggerAdvertiseDevice(DeviceId deviceId) {
-        connectionExecutor.execute(withDeviceLock(
-                () -> doAdvertiseDevice(deviceId), deviceId));
-    }
-
-    private void doAdvertiseDevice(DeviceId deviceId) {
-        // Retrieve config
-        if (configIsMissing(deviceId)) {
-            return;
-        }
-        // Obtain device and port description.
-        final boolean isPipelineReady = isPipelineReady(deviceId);
-        final DeviceDescription description = getDeviceDescription(
-                deviceId, isPipelineReady);
-        final List<PortDescription> ports = getPortDetails(deviceId);
-        // Advertise to core.
-        if (deviceService.getDevice(deviceId) == null ||
-                (description.isDefaultAvailable() &&
-                        !deviceService.isAvailable(deviceId))) {
-            if (!isPipelineReady) {
-                log.info("Advertising device {} to core with available={} as " +
-                                 "device pipeline is not ready yet",
-                         deviceId, description.isDefaultAvailable());
-            }
-            providerService.deviceConnected(deviceId, description);
-        }
-        providerService.updatePorts(deviceId, ports);
-        // If pipeline is not ready, encourage watchdog to perform probe ASAP.
-        if (!isPipelineReady) {
-            pipeconfWatchdogService.triggerProbe(deviceId);
-        }
+        return driver.hasBehaviour(type);
     }
 
     private DeviceDescription getDeviceDescription(
@@ -548,18 +909,6 @@
                 deviceId, DeviceHandshaker.class);
         final Driver driver = handshaker != null
                 ? handshaker.handler().driver() : null;
-        final GeneralProviderDeviceConfig cfg = cfgService.getConfig(
-                deviceId, GeneralProviderDeviceConfig.class);
-        final DefaultAnnotations.Builder annBuilder = DefaultAnnotations.builder();
-        // If device is pipeline programmable, let this provider decide when the
-        // device can be marked online.
-        annBuilder.set(AnnotationKeys.PROVIDER_MARK_ONLINE,
-                       String.valueOf(isPipelineProgrammable(deviceId)));
-        if (cfg != null) {
-            StringJoiner protoStringBuilder = new StringJoiner(", ");
-            cfg.protocolsInfo().keySet().forEach(protoStringBuilder::add);
-            annBuilder.set(AnnotationKeys.PROTOCOL, protoStringBuilder.toString());
-        }
         return new DefaultDeviceDescription(
                 deviceId.uri(),
                 Device.Type.SWITCH,
@@ -569,351 +918,15 @@
                 UNKNOWN,
                 new ChassisId(),
                 defaultAvailable,
-                annBuilder.build());
+                DefaultAnnotations.EMPTY);
     }
 
-    private void triggerMarkAvailable(DeviceId deviceId) {
-        connectionExecutor.execute(withDeviceLock(
-                () -> doMarkAvailable(deviceId), deviceId));
-    }
-
-    private void doMarkAvailable(DeviceId deviceId) {
-        if (deviceService.isAvailable(deviceId)) {
-            return;
-        }
-        final DeviceDescription descr = getDeviceDescription(deviceId, true);
-        // It has been observed that devices that were marked offline (e.g.
-        // after device disconnection) might end up with no master. Here we
-        // trigger a new master election (if device has no master).
-        mastershipService.requestRoleForSync(deviceId);
-        providerService.deviceConnected(deviceId, descr);
-    }
-
-    private boolean bindPipeconfIfRequired(DeviceId deviceId) {
-        if (pipeconfService.ofDevice(deviceId).isPresent()
-                || !isPipelineProgrammable(deviceId)) {
-            // Nothing to do, all good.
-            return true;
-        }
-        // Get pipeconf from netcfg or driver (default one).
-        final PiPipelineProgrammable pipelineProg = getBehaviour(
-                deviceId, PiPipelineProgrammable.class);
-        final PiPipeconfId pipeconfId = getPipeconfId(deviceId, pipelineProg);
-        if (pipeconfId == null) {
-            return false;
-        }
-        // Store binding in pipeconf service.
-        pipeconfService.bindToDevice(pipeconfId, deviceId);
-        return true;
-    }
-
-    private PiPipeconfId getPipeconfId(DeviceId deviceId, PiPipelineProgrammable pipelineProg) {
-        // Places to look for a pipeconf ID (in priority order)):
-        // 1) netcfg
-        // 2) device driver (default one)
-        final PiPipeconfId pipeconfId = getPipeconfFromCfg(deviceId);
-        if (pipeconfId != null && !pipeconfId.id().isEmpty()) {
-            return pipeconfId;
-        }
-        if (pipelineProg != null
-                && pipelineProg.getDefaultPipeconf().isPresent()) {
-            final PiPipeconf defaultPipeconf = pipelineProg.getDefaultPipeconf().get();
-            log.info("Using default pipeconf {} for {}", defaultPipeconf.id(), deviceId);
-            return defaultPipeconf.id();
-        } else {
-            log.warn("Unable to associate a pipeconf to {}", deviceId);
-            return null;
-        }
-    }
-
-    private void doDisconnectDevice(DeviceId deviceId) {
-        log.debug("Initiating disconnection from {}...", deviceId);
-        final DeviceHandshaker handshaker = getBehaviour(
-                deviceId, DeviceHandshaker.class);
-        final boolean isAvailable = deviceService.isAvailable(deviceId);
-        // Signal disconnection to core (if master).
-        if (isAvailable && mastershipService.isLocalMaster(deviceId)) {
-            providerService.deviceDisconnected(deviceId);
-        }
-        // Cancel tasks.
-        cancelStatsPolling(deviceId);
-        // Disconnect device.
-        if (handshaker == null) {
-            if (isAvailable) {
-                // If not available don't bother logging. We are probably
-                // invoking this method multiple times for the same device.
-                log.warn("Missing DeviceHandshaker behavior for {}, " +
-                                 "no guarantees of complete disconnection",
-                         deviceId);
-            }
-            return;
-        }
-        handshaker.removeDeviceAgentListener(id());
-        handshakersWithListeners.remove(deviceId);
-        final boolean disconnectSuccess = getFutureWithDeadline(
-                handshaker.disconnect(), "performing disconnection",
-                deviceId, false, opTimeoutShort);
-        if (!disconnectSuccess) {
-            log.warn("Unable to disconnect from {}", deviceId);
-        }
-    }
-
-    // Needed to catch the exception in the executors since are not rethrown otherwise.
-    private Runnable exceptionSafe(Runnable runnable) {
-        return () -> {
-            try {
-                runnable.run();
-            } catch (Exception e) {
-                log.error("Unhandled Exception", e);
-            }
-        };
-    }
-
-    private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
-        final Lock lock = deviceLocks.get(deviceId);
-        lock.lock();
-        try {
-            return task.get();
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    private Runnable withDeviceLock(Runnable task, DeviceId deviceId) {
-        // Wrapper of withDeviceLock(Supplier, ...) for void tasks.
-        return () -> withDeviceLock(() -> {
-            task.run();
-            return null;
-        }, deviceId);
-    }
-
-    private void updatePortStatistics(DeviceId deviceId) {
-        Device device = deviceService.getDevice(deviceId);
-        if (device != null && deviceService.isAvailable(deviceId) &&
-                device.is(PortStatisticsDiscovery.class)) {
-            Collection<PortStatistics> statistics = device.as(PortStatisticsDiscovery.class)
-                    .discoverPortStatistics();
-            //updating statistcs only if not empty
-            if (!statistics.isEmpty()) {
-                providerService.updatePortStatistics(deviceId, statistics);
-            }
-        } else {
-            log.debug("Can't update port statistics for device {}", deviceId);
-        }
-    }
-
-    private boolean notMyScheme(DeviceId deviceId) {
-        return !deviceId.uri().getScheme().equals(URI_SCHEME);
-    }
-
-    private void triggerConnect(DeviceId deviceId) {
-        connectionExecutor.execute(withDeviceLock(
-                () -> doConnectDevice(deviceId), deviceId));
+    static boolean myScheme(DeviceId deviceId) {
+        return deviceId.uri().getScheme().equals(URI_SCHEME);
     }
 
     private boolean isPipelineProgrammable(DeviceId deviceId) {
-        final GeneralProviderDeviceConfig providerConfig = cfgService.getConfig(
-                deviceId, GeneralProviderDeviceConfig.class);
-        if (providerConfig == null) {
-            return false;
-        }
-        return !Collections.disjoint(
-                ImmutableSet.copyOf(providerConfig.node().fieldNames()),
-                PIPELINE_CONFIGURABLE_PROTOCOLS);
-    }
-
-    /**
-     * Listener for configuration events.
-     */
-    private class InternalNetworkConfigListener implements NetworkConfigListener {
-
-        @Override
-        public void event(NetworkConfigEvent event) {
-            connectionExecutor.execute(() -> consumeConfigEvent(event));
-        }
-
-        @Override
-        public boolean isRelevant(NetworkConfigEvent event) {
-            return (event.configClass().equals(GeneralProviderDeviceConfig.class) ||
-                    event.configClass().equals(BasicDeviceConfig.class) ||
-                    event.configClass().equals(PiPipeconfConfig.class)) &&
-                    (event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
-                            event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED);
-        }
-
-        private void consumeConfigEvent(NetworkConfigEvent event) {
-            DeviceId deviceId = (DeviceId) event.subject();
-            //Assuming that the deviceId comes with uri 'device:'
-            if (notMyScheme(deviceId)) {
-                // not under my scheme, skipping
-                log.debug("{} is not my scheme, skipping", deviceId);
-                return;
-            }
-            final boolean configComplete = withDeviceLock(
-                    () -> isDeviceConfigComplete(event, deviceId), deviceId);
-            if (!configComplete) {
-                // Still waiting for some configuration.
-                return;
-            }
-            // Good to go.
-            triggerConnect(deviceId);
-            cleanUpConfigInfo(deviceId);
-        }
-
-        private boolean isDeviceConfigComplete(NetworkConfigEvent event, DeviceId deviceId) {
-            // FIXME to be removed when netcfg will issue device events in a bundle or
-            // ensure all configuration needed is present
-            if (event.configClass().equals(GeneralProviderDeviceConfig.class)) {
-                //FIXME we currently assume that p4runtime devices are pipeline configurable.
-                //If we want to connect a p4runtime device with no pipeline
-                if (event.config().isPresent()) {
-                    deviceConfigured.add(deviceId);
-                    final boolean isNotPipelineConfigurable = Collections.disjoint(
-                            ImmutableSet.copyOf(event.config().get().node().fieldNames()),
-                            PIPELINE_CONFIGURABLE_PROTOCOLS);
-                    if (isNotPipelineConfigurable) {
-                        // Skip waiting for a pipeline if we can't support it.
-                        pipelineConfigured.add(deviceId);
-                    }
-                }
-            } else if (event.configClass().equals(BasicDeviceConfig.class)) {
-                if (event.config().isPresent() && event.config().get().node().has(DRIVER)) {
-                    driverConfigured.add(deviceId);
-                }
-            } else if (event.configClass().equals(PiPipeconfConfig.class)) {
-                if (event.config().isPresent()
-                        && event.config().get().node().has(PiPipeconfConfig.PIPIPECONFID)) {
-                    pipelineConfigured.add(deviceId);
-                }
-            }
-
-            if (deviceConfigured.contains(deviceId)
-                    && driverConfigured.contains(deviceId)
-                    && pipelineConfigured.contains(deviceId)) {
-                return true;
-            } else {
-                if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
-                    log.debug("Waiting for pipeline configuration for device {}", deviceId);
-                } else if (pipelineConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
-                    log.debug("Waiting for device configuration for device {}", deviceId);
-                } else if (pipelineConfigured.contains(deviceId) && deviceConfigured.contains(deviceId)) {
-                    log.debug("Waiting for driver configuration for device {}", deviceId);
-                } else if (driverConfigured.contains(deviceId)) {
-                    log.debug("Only driver configuration for device {}", deviceId);
-                } else if (deviceConfigured.contains(deviceId)) {
-                    log.debug("Only device configuration for device {}", deviceId);
-                }
-            }
-            return false;
-        }
-    }
-
-    private boolean isPipelineReady(DeviceId deviceId) {
-        final boolean isPipelineProg = isPipelineProgrammable(deviceId);
-        final boolean isPipeconfReady = pipeconfWatchdogService
-                .getStatus(deviceId)
-                .equals(PiPipeconfWatchdogService.PipelineStatus.READY);
-        return !isPipelineProg || isPipeconfReady;
-    }
-
-    private void cleanUpConfigInfo(DeviceId deviceId) {
-        deviceConfigured.remove(deviceId);
-        driverConfigured.remove(deviceId);
-        pipelineConfigured.remove(deviceId);
-    }
-
-    private void startStatsPolling(DeviceId deviceId, boolean withRandomDelay) {
-        statsPollingTasks.compute(deviceId, (did, oldTask) -> {
-            if (oldTask != null) {
-                oldTask.cancel(false);
-            }
-            final int delay = withRandomDelay
-                    ? new SecureRandom().nextInt(10) : 0;
-            return statsExecutor.scheduleAtFixedRate(
-                    exceptionSafe(() -> updatePortStatistics(deviceId)),
-                    delay, statsPollFrequency, TimeUnit.SECONDS);
-        });
-    }
-
-    private void cancelStatsPolling(DeviceId deviceId) {
-        statsPollingTasks.computeIfPresent(deviceId, (did, task) -> {
-            task.cancel(false);
-            return null;
-        });
-    }
-
-    private void rescheduleStatsPollingTasks() {
-        statsPollingTasks.keySet().forEach(deviceId -> {
-            // startStatsPolling cancels old one if present.
-            startStatsPolling(deviceId, true);
-        });
-    }
-
-    private void triggerProbeAllDevices() {
-        // Async trigger a task for all devices in the cfg.
-        cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
-                .forEach(this::triggerDeviceProbe);
-    }
-
-    private PiPipeconfId getPipeconfFromCfg(DeviceId deviceId) {
-        PiPipeconfConfig config = cfgService.getConfig(
-                deviceId, PiPipeconfConfig.class);
-        if (config == null) {
-            return null;
-        }
-        return config.piPipeconfId();
-    }
-
-    private void triggerDeviceProbe(DeviceId deviceId) {
-        connectionExecutor.execute(withDeviceLock(
-                () -> doDeviceProbe(deviceId), deviceId));
-    }
-
-    private void doDeviceProbe(DeviceId deviceId) {
-        log.debug("Probing device {}...", deviceId);
-        if (configIsMissing(deviceId)) {
-            return;
-        }
-        if (!isConnected(deviceId)) {
-            if (deviceService.isAvailable(deviceId)) {
-                providerService.deviceDisconnected(deviceId);
-            }
-            triggerConnect(deviceId);
-        }
-    }
-
-    private boolean configIsMissing(DeviceId deviceId) {
-        final boolean present =
-                cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class) != null
-                        && cfgService.getConfig(deviceId, BasicDeviceConfig.class) != null;
-        if (!present) {
-            log.warn("Configuration for device {} is not complete", deviceId);
-        }
-        return !present;
-    }
-
-    private void handleMastershipResponse(DeviceId deviceId, MastershipRole response) {
-        // Notify core about mastership response.
-        final MastershipRole request = requestedRoles.get(deviceId);
-        final boolean isAvailable = deviceService.isAvailable(deviceId);
-        if (request == null || !isAvailable) {
-            return;
-        }
-        log.debug("Device {} asserted role {} (requested was {})",
-                  deviceId, response, request);
-        providerService.receivedRoleReply(deviceId, request, response);
-        // FIXME: this should be based on assigned mastership, not what returned by device
-        if (response.equals(MastershipRole.MASTER)) {
-            startStatsPolling(deviceId, false);
-        } else {
-            cancelStatsPolling(deviceId);
-        }
-    }
-
-    private void handleNotMaster(DeviceId deviceId) {
-        log.warn("Device {} notified that this node is not master, " +
-                         "relinquishing mastership...", deviceId);
-        mastershipService.relinquishMastership(deviceId);
+        return hasBehaviour(deviceId, PiPipelineProgrammable.class);
     }
 
     private <U> U getFutureWithDeadline(CompletableFuture<U> future, String opDescription,
@@ -930,86 +943,4 @@
         }
         return defaultValue;
     }
-
-    /**
-     * Listener for core device events.
-     */
-    private class InternalDeviceListener implements DeviceListener {
-        @Override
-        public void event(DeviceEvent event) {
-            DeviceId deviceId = event.subject().id();
-            // For now this is scheduled periodically, when streaming API will
-            // be available we check and base it on the streaming API (e.g. gNMI)
-            if (mastershipService.isLocalMaster(deviceId)) {
-                startStatsPolling(deviceId, true);
-            }
-        }
-
-        @Override
-        public boolean isRelevant(DeviceEvent event) {
-            return event.type() == Type.DEVICE_ADDED &&
-                    event.subject().id().toString().startsWith(URI_SCHEME.toLowerCase());
-        }
-    }
-
-    /**
-     * Listener for device agent events.
-     */
-    private class InternalDeviceAgentListener implements DeviceAgentListener {
-
-        @Override
-        public void event(DeviceAgentEvent event) {
-            DeviceId deviceId = event.subject();
-            switch (event.type()) {
-                case CHANNEL_OPEN:
-                    triggerAdvertiseDevice(deviceId);
-                    break;
-                case CHANNEL_CLOSED:
-                case CHANNEL_ERROR:
-                    triggerDeviceProbe(deviceId);
-                    break;
-                case ROLE_MASTER:
-                    handleMastershipResponse(deviceId, MastershipRole.MASTER);
-                    break;
-                case ROLE_STANDBY:
-                    handleMastershipResponse(deviceId, MastershipRole.STANDBY);
-                    break;
-                case ROLE_NONE:
-                    handleMastershipResponse(deviceId, MastershipRole.NONE);
-                    break;
-                case NOT_MASTER:
-                    handleNotMaster(deviceId);
-                    break;
-                default:
-                    log.warn("Unrecognized device agent event {}", event.type());
-            }
-        }
-
-    }
-
-    private class InternalPipeconfWatchdogListener implements PiPipeconfWatchdogListener {
-        @Override
-        public void event(PiPipeconfWatchdogEvent event) {
-            triggerMarkAvailable(event.subject());
-        }
-
-        @Override
-        public boolean isRelevant(PiPipeconfWatchdogEvent event) {
-            return event.type().equals(PiPipeconfWatchdogEvent.Type.PIPELINE_READY);
-        }
-    }
-
-    private class InternalConfigFactory
-            extends ConfigFactory<DeviceId, GeneralProviderDeviceConfig> {
-
-        InternalConfigFactory() {
-            super(SubjectFactories.DEVICE_SUBJECT_FACTORY,
-                  GeneralProviderDeviceConfig.class, CFG_SCHEME);
-        }
-
-        @Override
-        public GeneralProviderDeviceConfig createConfig() {
-            return new GeneralProviderDeviceConfig();
-        }
-    }
 }
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/OsgiPropertyConstants.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/OsgiPropertyConstants.java
index 84419f4..358ce5f 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/OsgiPropertyConstants.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/OsgiPropertyConstants.java
@@ -23,11 +23,11 @@
 
     private OsgiPropertyConstants() {}
 
-    public static final String STATS_POLL_FREQUENCY = "deviceStatsPollFrequency";
-    public static final int STATS_POLL_FREQUENCY_DEFAULT = 10;
+    public static final String STATS_POLL_INTERVAL = "deviceStatsPollInterval";
+    public static final int STATS_POLL_INTERVAL_DEFAULT = 10;
 
-    public static final String PROBE_FREQUENCY = "deviceProbeFrequency";
-    public static final int PROBE_FREQUENCY_DEFAULT = 10;
+    public static final String PROBE_INTERVAL = "deviceProbeInterval";
+    public static final int PROBE_INTERVAL_DEFAULT = 10;
 
     public static final String OP_TIMEOUT_SHORT = "deviceOperationTimeoutShort";
     public static final int OP_TIMEOUT_SHORT_DEFAULT = 10;
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/StatsPoller.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/StatsPoller.java
new file mode 100644
index 0000000..1426946
--- /dev/null
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/StatsPoller.java
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.provider.general.device.impl;
+
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Striped;
+import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipListener;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceProviderService;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.device.PortStatistics;
+import org.onosproject.net.device.PortStatisticsDiscovery;
+import org.slf4j.Logger;
+
+import java.security.SecureRandom;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.concurrent.Executors.newScheduledThreadPool;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.provider.general.device.impl.GeneralDeviceProvider.myScheme;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Component devoted to polling stats from devices managed by the
+ * GeneralDeviceProvider.
+ */
+public class StatsPoller {
+
+    private static final int CORE_POOL_SIZE = 5;
+
+    private final Logger log = getLogger(getClass());
+
+    private final DeviceService deviceService;
+    private final MastershipService mastershipService;
+    private final DeviceProviderService providerService;
+
+    private final InternalDeviceListener deviceListener = new InternalDeviceListener();
+    private final MastershipListener mastershipListener = new InternalMastershipListener();
+    private final Striped<Lock> deviceLocks = Striped.lock(30);
+
+    private ScheduledExecutorService statsExecutor;
+    private Map<DeviceId, ScheduledFuture<?>> statsPollingTasks;
+    private Map<DeviceId, Integer> pollFrequencies;
+    private int statsPollInterval;
+
+    StatsPoller(DeviceService deviceService, MastershipService mastershipService,
+                DeviceProviderService providerService) {
+        this.deviceService = deviceService;
+        this.mastershipService = mastershipService;
+        this.providerService = providerService;
+    }
+
+
+    void activate(int statsPollInterval) {
+        checkArgument(statsPollInterval > 0, "statsPollInterval must be greater than 0");
+        this.statsPollInterval = statsPollInterval;
+        statsExecutor = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
+                "onos/gdp-stats", "%d", log));
+        statsPollingTasks = Maps.newHashMap();
+        pollFrequencies = Maps.newHashMap();
+        deviceService.getDevices().forEach(d -> updatePollingTask(d.id()));
+        deviceService.addListener(deviceListener);
+        mastershipService.addListener(mastershipListener);
+        log.info("Started");
+    }
+
+    void reschedule(int statsPollInterval) {
+        checkArgument(statsPollInterval > 0, "statsPollInterval must be greater than 0");
+        this.statsPollInterval = statsPollInterval;
+        statsPollingTasks.keySet().forEach(this::updatePollingTask);
+    }
+
+    void deactivate() {
+        deviceService.removeListener(deviceListener);
+        mastershipService.removeListener(mastershipListener);
+
+        statsPollingTasks.values().forEach(t -> t.cancel(false));
+        statsPollingTasks.clear();
+        pollFrequencies.clear();
+        statsPollingTasks = null;
+        pollFrequencies = null;
+
+        statsExecutor.shutdownNow();
+        try {
+            statsExecutor.awaitTermination(5, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            log.warn("statsExecutor not terminated properly");
+        }
+        statsExecutor = null;
+
+        log.info("Stopped");
+    }
+
+
+    private void updatePollingTask(DeviceId deviceId) {
+        deviceLocks.get(deviceId).lock();
+        try {
+            final ScheduledFuture<?> existingTask = statsPollingTasks.get(deviceId);
+            final boolean shouldHaveTask = deviceService.getDevice(deviceId) != null
+                    && deviceService.isAvailable(deviceId)
+                    && mastershipService.isLocalMaster(deviceId)
+                    && deviceService.getDevice(deviceId).is(PortStatisticsDiscovery.class);
+            final boolean pollFrequencyChanged = !Objects.equals(
+                    pollFrequencies.get(deviceId), statsPollInterval);
+
+            if (existingTask != null && (!shouldHaveTask || pollFrequencyChanged)) {
+                existingTask.cancel(false);
+                statsPollingTasks.remove(deviceId);
+                pollFrequencies.remove(deviceId);
+                log.info("Cancelled polling task for {}", deviceId);
+            }
+
+            if (shouldHaveTask) {
+                final int delay = new SecureRandom().nextInt(statsPollInterval);
+                statsPollingTasks.put(deviceId, statsExecutor.scheduleAtFixedRate(
+                        exceptionSafe(() -> updatePortStatistics(deviceId)),
+                        delay, statsPollInterval, TimeUnit.SECONDS));
+                pollFrequencies.put(deviceId, statsPollInterval);
+                log.info("Started polling task for {} with interval {} seconds",
+                         deviceId, statsPollInterval);
+            }
+        } finally {
+            deviceLocks.get(deviceId).unlock();
+        }
+    }
+
+    private void updatePortStatistics(DeviceId deviceId) {
+        final Device device = deviceService.getDevice(deviceId);
+        final Collection<PortStatistics> statistics = device.as(
+                PortStatisticsDiscovery.class).discoverPortStatistics();
+        if (!statistics.isEmpty()) {
+            providerService.updatePortStatistics(deviceId, statistics);
+        }
+    }
+
+    private Runnable exceptionSafe(Runnable runnable) {
+        return () -> {
+            try {
+                runnable.run();
+            } catch (Exception e) {
+                log.error("Unhandled exception in stats poller", e);
+            }
+        };
+    }
+
+    private class InternalMastershipListener implements MastershipListener {
+
+        @Override
+        public void event(MastershipEvent event) {
+            updatePollingTask(event.subject());
+        }
+
+        @Override
+        public boolean isRelevant(MastershipEvent event) {
+            return event.type() == MastershipEvent.Type.MASTER_CHANGED
+                    && myScheme(event.subject());
+        }
+    }
+
+    /**
+     * Listener for core device events.
+     */
+    private class InternalDeviceListener implements DeviceListener {
+        @Override
+        public void event(DeviceEvent event) {
+            updatePollingTask(event.subject().id());
+        }
+
+        @Override
+        public boolean isRelevant(DeviceEvent event) {
+            switch (event.type()) {
+                case DEVICE_ADDED:
+                case DEVICE_UPDATED:
+                case DEVICE_AVAILABILITY_CHANGED:
+                case DEVICE_REMOVED:
+                case DEVICE_SUSPENDED:
+                    return myScheme(event.subject().id());
+                default:
+                    return false;
+            }
+        }
+    }
+}
diff --git a/tools/dev/mininet/bmv2.py b/tools/dev/mininet/bmv2.py
index 56c7dcc..3dc42fd 100644
--- a/tools/dev/mininet/bmv2.py
+++ b/tools/dev/mininet/bmv2.py
@@ -35,6 +35,7 @@
 SWITCH_START_TIMEOUT = 5  # seconds
 BMV2_LOG_LINES = 5
 BMV2_DEFAULT_DEVICE_ID = 1
+DEFAULT_PIPECONF = "org.onosproject.pipelines.basic"
 
 # Stratum paths relative to stratum repo root
 STRATUM_BMV2 = 'stratum_bmv2'
@@ -119,9 +120,10 @@
 
     def __init__(self, name, json=None, debugger=False, loglevel="warn",
                  elogger=False, grpcport=None, cpuport=255, notifications=False,
-                 thriftport=None, netcfg=True, dryrun=False, pipeconf="",
-                 pktdump=False, valgrind=False, gnmi=False,
-                 portcfg=True, onosdevid=None, stratum=False, **kwargs):
+                 thriftport=None, netcfg=True, dryrun=False,
+                 pipeconf=DEFAULT_PIPECONF, pktdump=False, valgrind=False,
+                 gnmi=False, portcfg=True, onosdevid=None, stratum=False,
+                 **kwargs):
         Switch.__init__(self, name, **kwargs)
         self.grpcPort = grpcport
         self.thriftPort = thriftport
@@ -173,7 +175,10 @@
     def getDeviceConfig(self, srcIP):
 
         basicCfg = {
-            "driver": "bmv2"
+            "managementAddress": "grpc://%s:%d?device_id=%d" % (
+                srcIP, self.grpcPort, BMV2_DEFAULT_DEVICE_ID),
+            "driver": "bmv2",
+            "pipeconf": self.pipeconfId
         }
 
         if self.longitude and self.latitude:
@@ -181,30 +186,9 @@
             basicCfg["latitude"] = self.latitude
 
         cfgData = {
-            "generalprovider": {
-                "p4runtime": {
-                    "ip": srcIP,
-                    "port": self.grpcPort,
-                    "deviceId": BMV2_DEFAULT_DEVICE_ID,
-                    "deviceKeyId": "p4runtime:%s" % self.onosDeviceId
-                },
-                # "bmv2-thrift": {
-                #     "ip": srcIP,
-                #     "port": self.thriftPort
-                # }
-            },
-            "piPipeconf": {
-                "piPipeconfId": self.pipeconfId
-            },
             "basic": basicCfg
         }
 
-        if self.withGnmi:
-            cfgData["generalprovider"]["gnmi"] = {
-                "ip": srcIP,
-                "port": self.grpcPort
-            }
-
         if self.injectPorts:
             portData = {}
             portId = 1
@@ -327,7 +311,8 @@
             '-persistent_config_dir=' + config_dir,
             '-initial_pipeline=' + stratumRoot + STRATUM_INIT_PIPELINE,
             '-cpu_port=%s' % self.cpuPort,
-            '-external_hercules_urls=0.0.0.0:%d' % self.grpcPort
+            '-external_hercules_urls=0.0.0.0:%d' % self.grpcPort,
+            '-max_num_controllers_per_node=10'
         ]
         for port, intf in self.intfs.items():
             if not intf.IP():