More improvements and bugfixes in P4Runtime subsystem

Most notably, we fix a bug in which some nodes were not able to find
pipeconf-specific behaviors for a given device. The problem is not
completelly solved but it's mitigated.

There's a race condition caused by the fact that the GDP updates the cfg
with the merged driver name before advertising the device to the core.
Some nodes might receive the cfg update after the device has been
advertised. We mitigate the problem by performing the pipeline deploy
(slow operation) after the cfg update, giving more time for nodes
to catch up. Perhaps we should listen for cfg update events before
advertising the device to the core?

Also:
- NPE when getting P4Runtime client
- Detect if a base driver is already merged in pipeconf manager
- Longer timeouts in P4Runtime driver and protocol (for slow networks)
- Configurable timeout in P4Runtime driver and GDP
- NPE when adding/removing device agent listeners in P4Rtunime handshaker
- Various exceptions due to race conditions in GDP when disconnecting
devices (by serializing disconnect tasks per device)
- NPE when cancelling polling tasks in GDP
- Refactored PipeconfService to distinguish between driver merge,
pipeconf map update, and cfg update (now performed in the GDP)
- Fixed PipeconfManagerTest, not testing driver behaviours
- Use Guava striped locks when possible (more memory-efficient than maps,
and with strict atomicity guarantees w.r.t. to caches).

Change-Id: I30f3887541ba0fd44439a86885e9821ac565b64c
diff --git a/apps/p4-tutorial/pipeconf/src/main/java/org/onosproject/p4tutorial/pipeconf/PortStatisticsDiscoveryImpl.java b/apps/p4-tutorial/pipeconf/src/main/java/org/onosproject/p4tutorial/pipeconf/PortStatisticsDiscoveryImpl.java
index f5fd70c..2afbce0 100644
--- a/apps/p4-tutorial/pipeconf/src/main/java/org/onosproject/p4tutorial/pipeconf/PortStatisticsDiscoveryImpl.java
+++ b/apps/p4-tutorial/pipeconf/src/main/java/org/onosproject/p4tutorial/pipeconf/PortStatisticsDiscoveryImpl.java
@@ -62,11 +62,11 @@
 
         // Get a client for this device.
         P4RuntimeController controller = handler().get(P4RuntimeController.class);
-        if (!controller.hasClient(deviceId)) {
+        P4RuntimeClient client = controller.getClient(deviceId);
+        if (client == null) {
             log.warn("Unable to find client for {}, aborting operation", deviceId);
             return Collections.emptyList();
         }
-        P4RuntimeClient client = controller.getClient(deviceId);
 
         // Get the pipeconf of this device.
         PiPipeconfService piPipeconfService = handler().get(PiPipeconfService.class);
diff --git a/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfService.java b/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfService.java
index 1e4088e..b574b5b 100644
--- a/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfService.java
+++ b/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfService.java
@@ -22,7 +22,6 @@
 import org.onosproject.net.pi.model.PiPipeconfId;
 
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 
 /**
  * A service to manage the configurations of protocol-independent pipelines.
@@ -36,17 +35,21 @@
      * Registers the given pipeconf.
      *
      * @param pipeconf a pipeconf
-     * @throws IllegalStateException if the same pipeconf identifier is already registered.
+     * @throws IllegalStateException if the same pipeconf identifier is already
+     *                               registered.
      */
     void register(PiPipeconf pipeconf) throws IllegalStateException;
 
     /**
-     * Unregisters the Pipeconf identified by the given PiPipeconfId. Unregistering a Pipeconf removes it from the ONOS
-     * controller, thus making it un-capable of controlling (e.g installing flow rules) the devices that have the
-     * pipeconf's P4 program deployed. For now this method DOES NOT remove the P4 program from the devices.
+     * Unregisters the Pipeconf identified by the given PiPipeconfId.
+     * Unregistering a Pipeconf removes it from the ONOS controller, thus making
+     * it un-capable of controlling (e.g installing flow rules) the devices that
+     * have the pipeconf's P4 program deployed. For now this method DOES NOT
+     * remove the P4 program from the devices.
      *
      * @param pipeconfId a pipeconfId
-     * @throws IllegalStateException if the same pipeconf identifier is already registered.
+     * @throws IllegalStateException if the same pipeconf identifier is already
+     *                               registered.
      */
     void remove(PiPipeconfId pipeconfId) throws IllegalStateException;
 
@@ -58,8 +61,9 @@
     Iterable<PiPipeconf> getPipeconfs();
 
     /**
-     * Returns the pipeconf instance associated with the given identifier, if present. If not present, it means that no
-     * pipeconf with such identifier has been registered so far.
+     * Returns the pipeconf instance associated with the given identifier, if
+     * present. If not present, it means that no pipeconf with such identifier
+     * has been registered so far.
      *
      * @param id a pipeconf identifier
      * @return an optional pipeconf
@@ -67,21 +71,39 @@
     Optional<PiPipeconf> getPipeconf(PiPipeconfId id);
 
     /**
-     * Binds the given pipeconf to the given infrastructure device. As a result of this method call, if the given
-     * pipeconf exposes any pipeline-specific behaviours, those will be merged to the device's driver. Returns a
-     * completable future to provide async methods with a boolean if the merge of the drivers succeeded.
+     * Signals that the given pipeconf is associated to the given infrastructure
+     * device. As a result of this method, the pipeconf for the given device can
+     * be later retrieved using {@link #ofDevice(DeviceId)}
+     *
+     * @param pipeconfId a pipeconf identifier
+     * @param deviceId   a device identifier
+     */
+    void bindToDevice(PiPipeconfId pipeconfId, DeviceId deviceId);
+
+    /**
+     * Returns the name of a driver that is equivalent to the base driver of the
+     * given device plus all the pipeline-specific behaviors exposed by the
+     * given pipeconf (previously registered using {@link
+     * #register(PiPipeconf)}). If such driver does not exist, this method
+     * creates one and registers is with all necessary ONOS subsystems, such
+     * that the returned name can be used to retrieve the driver instance using
+     * {@link org.onosproject.net.driver.DriverService#getDriver(String)}.
+     * <p>
+     * This method needs to be called on all nodes of the cluster that wants to
+     * use such merged driver.
+     * <p>
+     * Returns null if such merged driver cannot be created.
      *
      * @param deviceId   a device identifier
      * @param pipeconfId a pipeconf identifier
-     * @return a CompletableFuture with a boolean, true if operation succeeded
+     * @return driver name or null.
      */
-    // TODO: This service doesn't make any effort in deploying the configuration to the device.
-    // Someone else should do that.
-    CompletableFuture<Boolean> bindToDevice(PiPipeconfId pipeconfId, DeviceId deviceId);
+    String mergeDriver(DeviceId deviceId, PiPipeconfId pipeconfId);
 
     /**
-     * Returns the pipeconf identifier currently associated with the given device identifier, if present. If not
-     * present, it means no pipeconf has been associated with that device so far.
+     * Returns the pipeconf identifier currently associated with the given
+     * device identifier, if present. If not present, it means no pipeconf has
+     * been associated with that device so far.
      *
      * @param deviceId device identifier
      * @return an optional pipeconf identifier
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
index ba16601..5e07333 100644
--- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
@@ -16,10 +16,8 @@
 
 package org.onosproject.net.pi.impl;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.annotations.Beta;
+import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableSet;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -29,7 +27,6 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.util.ItemNotFoundException;
 import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.LeadershipService;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.config.ConfigFactory;
 import org.onosproject.net.config.NetworkConfigRegistry;
@@ -52,7 +49,6 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
@@ -73,19 +69,14 @@
 
     private final Logger log = getLogger(getClass());
 
+    private static final String MERGED_DRIVER_SEPARATOR = ":";
     private static final String DRIVER = "driver";
     private static final String CFG_SCHEME = "piPipeconf";
 
-    private static final String DRIVER_MERGE_TOPIC =
-            PiPipeconfManager.class.getSimpleName() + "-driver-merge-";
-
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected NetworkConfigRegistry cfgService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private LeadershipService leadershipService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DriverService driverService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -165,10 +156,13 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> bindToDevice(
-            PiPipeconfId pipeconfId, DeviceId deviceId) {
-        return CompletableFuture.supplyAsync(() -> doMergeDriver(
-                deviceId, pipeconfId), executor);
+    public void bindToDevice(PiPipeconfId pipeconfId, DeviceId deviceId) {
+        pipeconfMappingStore.createOrUpdateBinding(deviceId, pipeconfId);
+    }
+
+    @Override
+    public String mergeDriver(DeviceId deviceId, PiPipeconfId pipeconfId) {
+        return doMergeDriver(deviceId, pipeconfId);
     }
 
     @Override
@@ -176,55 +170,79 @@
         return Optional.ofNullable(pipeconfMappingStore.getPipeconfId(deviceId));
     }
 
-    private boolean doMergeDriver(DeviceId deviceId, PiPipeconfId pipeconfId) {
-        // Perform the following operations:
-        // 1) ALL nodes: create and register new merged driver (pipeconf + base driver)
-        // 2) ONE node (leader): updated netcfg with new driver
-        log.warn("Starting device driver merge of {} with {}...", deviceId, pipeconfId);
+    private String doMergeDriver(DeviceId deviceId, PiPipeconfId pipeconfId) {
+        log.debug("Starting device driver merge of {} with {}...", deviceId, pipeconfId);
         final BasicDeviceConfig basicDeviceConfig = cfgService.getConfig(
                 deviceId, BasicDeviceConfig.class);
-        final Driver baseDriver = driverService.getDriver(
-                basicDeviceConfig.driver());
-        final String newDriverName = baseDriver.name() + ":" + pipeconfId;
-        if (baseDriver.name().equals(newDriverName)) {
-            log.warn("Requested to merge {} driver with {} for {}, "
-                             + "but current device driver is already merged",
-                     baseDriver.name(), pipeconfId, deviceId);
-            return true;
+        if (basicDeviceConfig == null) {
+            log.warn("Unable to get basic device config for {}, " +
+                             "aborting pipeconf driver merge");
+            return null;
         }
-        final PiPipeconf pipeconf = pipeconfs.get(pipeconfId);
-        if (pipeconf == null) {
-            log.error("Pipeconf {} is not registered", pipeconfId);
-            return false;
+        String baseDriverName = basicDeviceConfig.driver();
+        if (baseDriverName.endsWith(mergedDriverSuffix(pipeconfId))) {
+            // The config already has driver name that is a merged one. We still
+            // need to make sure an instance of that merged driver is present in
+            // this node.
+            log.debug("Base driver of {} ({}) has been already merged with {}",
+                      deviceId, baseDriverName, pipeconfId);
+            baseDriverName = getBaseDriverNameFromMerged(baseDriverName);
         }
-        // 1) if merged driver exists already we don't create a new one.
+
+        final String newDriverName = mergedDriverName(baseDriverName, pipeconfId);
+        // If merged driver exists already we don't create a new one.
         try {
             driverService.getDriver(newDriverName);
-            log.info("Found existing merged driver {}, re-using that", newDriverName);
+            return newDriverName;
         } catch (ItemNotFoundException e) {
             log.info("Creating merged driver {}...", newDriverName);
-            createMergedDriver(pipeconf, baseDriver, newDriverName);
         }
-        // 2) Updating device cfg to enforce the merged driver (one node only)
-        final boolean isLeader = leadershipService
-                .runForLeadership(DRIVER_MERGE_TOPIC + deviceId.toString())
-                .leaderNodeId()
-                .equals(clusterService.getLocalNode().id());
-        if (isLeader) {
-            // FIXME: this binding should be updated by the same entity
-            // deploying the pipeconf.
-            pipeconfMappingStore.createOrUpdateBinding(deviceId, pipeconfId);
-            if (!basicDeviceConfig.driver().equals(newDriverName)) {
-                log.info("Applying new driver {} for device {} via cfg...",
-                         newDriverName, deviceId);
-                setDriverViaCfg(deviceId, newDriverName, basicDeviceConfig);
-            }
+        final Driver mergedDriver = buildMergedDriver(
+                pipeconfId, baseDriverName, newDriverName);
+        if (mergedDriver == null) {
+            // Error logged by buildMergedDriver
+            return null;
         }
-        return true;
+        registerMergedDriver(mergedDriver);
+        return newDriverName;
     }
 
-    private void createMergedDriver(PiPipeconf pipeconf, Driver baseDriver,
-                                    String newDriverName) {
+    private String mergedDriverSuffix(PiPipeconfId pipeconfId) {
+        return MERGED_DRIVER_SEPARATOR + pipeconfId.id();
+    }
+
+    private String mergedDriverName(String baseDriverName, PiPipeconfId pipeconfId) {
+        return baseDriverName + mergedDriverSuffix(pipeconfId);
+    }
+
+    private String getBaseDriverNameFromMerged(String mergedDriverName) {
+        final String[] pieces = mergedDriverName.split(MERGED_DRIVER_SEPARATOR);
+        if (pieces.length != 2) {
+            log.error("Unrecognized merged driver name format '{}', cannot " +
+                              "extract base driver name", mergedDriverName);
+            return null;
+        }
+        return pieces[0];
+    }
+
+    private Driver buildMergedDriver(PiPipeconfId pipeconfId, String baseDriverName,
+                                     String newDriverName) {
+        final Driver baseDriver;
+        try {
+            baseDriver = driverService.getDriver(baseDriverName);
+        } catch (ItemNotFoundException e) {
+            log.error("Base driver {} not found, cannot build a merged one",
+                      baseDriverName);
+            return null;
+        }
+
+        final PiPipeconf pipeconf = pipeconfs.get(pipeconfId);
+        if (pipeconf == null) {
+            log.error("Pipeconf {} is not registered, cannot build a merged driver",
+                      pipeconfId);
+            return null;
+        }
+
         // extract the behaviours from the pipipeconf.
         final Map<Class<? extends Behaviour>, Class<? extends Behaviour>> behaviours =
                 new HashMap<>();
@@ -236,29 +254,27 @@
                 baseDriver.swVersion(), behaviours, new HashMap<>());
         // take the base driver created with the behaviours of the PiPeconf and
         // merge it with the base driver that was assigned to the device
-        final Driver completeDriver = piPipeconfDriver.merge(baseDriver);
-        // This might lead to explosion of number of providers in the core,
-        // due to 1:1:1 pipeconf:driver:provider maybe find better way
-        final DriverProvider provider = new PiPipeconfDriverProviderInternal(
-                completeDriver);
-        // register the merged driver
+        return piPipeconfDriver.merge(baseDriver);
+    }
+
+    private void registerMergedDriver(Driver driver) {
+        final DriverProvider provider = new InternalDriverProvider(driver);
+        if (driverAdminService.getProviders().contains(provider)) {
+            // A provider for this driver already exist.
+            return;
+        }
         driverAdminService.registerProvider(provider);
     }
 
-    private void setDriverViaCfg(DeviceId deviceId, String driverName,
-                                 BasicDeviceConfig basicDeviceConfig) {
-        ObjectNode newCfg = (ObjectNode) basicDeviceConfig.node();
-        newCfg = newCfg.put(DRIVER, driverName);
-        ObjectMapper mapper = new ObjectMapper();
-        JsonNode newCfgNode = mapper.convertValue(newCfg, JsonNode.class);
-        cfgService.applyConfig(deviceId, BasicDeviceConfig.class, newCfgNode);
-    }
-
-    private class PiPipeconfDriverProviderInternal implements DriverProvider {
+    /**
+     * Internal driver provider used to register merged pipeconf drivers in the
+     * core.
+     */
+    private class InternalDriverProvider implements DriverProvider {
 
         Driver driver;
 
-        PiPipeconfDriverProviderInternal(Driver driver) {
+        InternalDriverProvider(Driver driver) {
             this.driver = driver;
         }
 
@@ -266,5 +282,22 @@
         public Set<Driver> getDrivers() {
             return ImmutableSet.of(driver);
         }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            InternalDriverProvider that = (InternalDriverProvider) o;
+            return Objects.equal(driver.name(), that.driver.name());
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(driver.name());
+        }
     }
 }
diff --git a/core/net/src/test/java/org/onosproject/net/pi/impl/PiPipeconfManagerTest.java b/core/net/src/test/java/org/onosproject/net/pi/impl/PiPipeconfManagerTest.java
index 4d29e69..a418461 100644
--- a/core/net/src/test/java/org/onosproject/net/pi/impl/PiPipeconfManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/pi/impl/PiPipeconfManagerTest.java
@@ -20,11 +20,11 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import org.junit.Before;
 import org.junit.Test;
 import org.onlab.util.ItemNotFoundException;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.behaviour.Pipeliner;
 import org.onosproject.net.config.Config;
 import org.onosproject.net.config.ConfigApplyDelegate;
 import org.onosproject.net.config.ConfigFactory;
@@ -46,9 +46,7 @@
 import org.onosproject.net.driver.DriverServiceAdapter;
 import org.onosproject.net.pi.model.PiPipeconf;
 import org.onosproject.net.pi.model.PiPipeconfId;
-import org.onosproject.net.pi.model.PiPipelineInterpreter;
 import org.onosproject.net.pi.service.PiPipeconfConfig;
-import org.onosproject.pipelines.basic.PipeconfLoader;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -61,6 +59,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.onosproject.pipelines.basic.PipeconfLoader.BASIC_PIPECONF;
 
 
 /**
@@ -70,15 +69,12 @@
 
     private static final DeviceId DEVICE_ID = DeviceId.deviceId("test:test");
     private static final String BASE_DRIVER = "baseDriver";
-    private static final Set<Class<? extends Behaviour>> EXPECTED_BEHAVIOURS =
-            ImmutableSet.of(DeviceDescriptionDiscovery.class, Pipeliner.class, PiPipelineInterpreter.class);
 
     //Mock util sets and classes
     private final NetworkConfigRegistry cfgService = new MockNetworkConfigRegistry();
     private final DriverService driverService = new MockDriverService();
     private final DriverAdminService driverAdminService = new MockDriverAdminService();
     private Driver baseDriver = new MockDriver();
-    private String completeDriverName;
 
     private final Set<ConfigFactory> cfgFactories = new HashSet<>();
     private final Set<NetworkConfigListener> netCfgListeners = new HashSet<>();
@@ -99,8 +95,7 @@
     @Before
     public void setUp() throws IOException {
         piPipeconfService = new PiPipeconfManager();
-        piPipeconf = PipeconfLoader.BASIC_PIPECONF;
-        completeDriverName = BASE_DRIVER + ":" + piPipeconf.id();
+        piPipeconf = BASIC_PIPECONF;
         piPipeconfService.cfgService = cfgService;
         piPipeconfService.driverService = driverService;
         piPipeconfService.driverAdminService = driverAdminService;
@@ -147,7 +142,7 @@
 
 
     @Test
-    public void bindToDevice() throws Exception {
+    public void mergeDriver() {
         PiPipeconfId piPipeconfId = cfgService.getConfig(DEVICE_ID, PiPipeconfConfig.class).piPipeconfId();
         assertEquals(piPipeconf.id(), piPipeconfId);
 
@@ -158,25 +153,24 @@
         assertEquals("Returned PiPipeconf is not correct", piPipeconf,
                      piPipeconfService.getPipeconf(piPipeconf.id()).get());
 
-        piPipeconfService.bindToDevice(piPipeconfId, DEVICE_ID).whenComplete((booleanResult, ex) -> {
+        String mergedDriverName = piPipeconfService.mergeDriver(DEVICE_ID, piPipeconfId);
 
-            //we assume that the provider is 1 and that it contains 1 driver
-            //we also assume that everything after driverAdminService.registerProvider(provider); has been tested.
-            assertTrue("Provider should be registered", providers.size() != 0);
+        //we assume that the provider is 1 and that it contains 1 driver
+        //we also assume that everything after driverAdminService.registerProvider(provider); has been tested.
+        assertTrue("Provider should be registered", providers.size() == 1);
 
-            assertTrue("Boolean Result of method should be True", booleanResult);
+        assertTrue("Merged driver name should be valid",
+                   mergedDriverName != null && !mergedDriverName.isEmpty());
 
-            providers.forEach(p -> {
-                assertTrue("Provider should contain a driver", p.getDrivers().size() != 0);
-                p.getDrivers().forEach(driver -> {
-                    assertEquals("The driver has wrong name", driver.name(), completeDriverName);
-                    assertEquals("The driver contains wrong behaviours", EXPECTED_BEHAVIOURS, driver.behaviours());
+        DriverProvider provider = providers.iterator().next();
+        assertTrue("Provider should contain one driver", provider.getDrivers().size() == 1);
 
-                });
-            });
-        }).exceptionally(ex -> {
-            throw new IllegalStateException(ex);
-        });
+        Driver driver = provider.getDrivers().iterator().next();
+
+        Set<Class<? extends Behaviour>> expectedBehaviours = Sets.newHashSet();
+        expectedBehaviours.addAll(BASIC_PIPECONF.behaviours());
+        expectedBehaviours.addAll(baseDriver.behaviours());
+        assertEquals("The driver contains wrong behaviours", expectedBehaviours, driver.behaviours());
     }
 
     private class MockNetworkConfigRegistry extends NetworkConfigRegistryAdapter {
@@ -230,6 +224,11 @@
         public void registerProvider(DriverProvider provider) {
             providers.add(provider);
         }
+
+        @Override
+        public Set<DriverProvider> getProviders() {
+            return providers;
+        }
     }
 
     private class MockDelegate implements ConfigApplyDelegate {
diff --git a/drivers/barefoot/src/main/resources/barefoot-drivers.xml b/drivers/barefoot/src/main/resources/barefoot-drivers.xml
index 9a2ebcc..68c9aec 100644
--- a/drivers/barefoot/src/main/resources/barefoot-drivers.xml
+++ b/drivers/barefoot/src/main/resources/barefoot-drivers.xml
@@ -18,6 +18,7 @@
     <driver name="barefoot" manufacturer="Barefoot Networks" hwVersion="1.0" swVersion="1.0" extends="p4runtime">
         <behaviour api="org.onosproject.net.behaviour.PiPipelineProgrammable"
                    impl="org.onosproject.drivers.barefoot.TofinoPipelineProgrammable"/>
+        <property name="tableDeleteBeforeUpdate">true</property>
     </driver>
 </drivers>
 
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
index ac4856a..3f6c9ad 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
@@ -23,6 +23,7 @@
 import org.onosproject.net.driver.AbstractHandlerBehaviour;
 import org.onosproject.net.pi.model.PiPipeconf;
 import org.onosproject.net.pi.model.PiPipeconfId;
+import org.onosproject.net.pi.model.PiPipelineInterpreter;
 import org.onosproject.net.pi.service.PiPipeconfService;
 import org.onosproject.net.pi.service.PiTranslationService;
 import org.onosproject.p4runtime.api.P4RuntimeClient;
@@ -42,9 +43,9 @@
  */
 public class AbstractP4RuntimeHandlerBehaviour extends AbstractHandlerBehaviour {
 
-    // Timeout in seconds for device operations.
-    // TODO make configurable via driver properties
-    private static final int DEVICE_OP_TIMEOUT = 5;
+    // Default timeout in seconds for device operations.
+    private static final String DEVICE_REQ_TIMEOUT = "deviceRequestTimeout";
+    private static final int DEFAULT_DEVICE_REQ_TIMEOUT = 60;
 
     public static final String P4RUNTIME_SERVER_ADDR_KEY = "p4runtime_ip";
     public static final String P4RUNTIME_SERVER_PORT_KEY = "p4runtime_port";
@@ -80,11 +81,11 @@
         }
 
         controller = handler().get(P4RuntimeController.class);
-        if (!controller.hasClient(deviceId)) {
+        client = controller.getClient(deviceId);
+        if (client == null) {
             log.warn("Unable to find client for {}, aborting operation", deviceId);
             return false;
         }
-        client = controller.getClient(deviceId);
 
         PiPipeconfService piPipeconfService = handler().get(PiPipeconfService.class);
         if (!piPipeconfService.ofDevice(deviceId).isPresent()) {
@@ -106,12 +107,27 @@
     }
 
     /**
-     * Returns a P4Runtime client for this device, null if such client cannot
-     * be created.
+     * Returns an instance of the interpreter implementation for this device,
+     * null if an interpreter cannot be retrieved.
+     *
+     * @return interpreter or null
+     */
+    PiPipelineInterpreter getInterpreter() {
+        if (!device.is(PiPipelineInterpreter.class)) {
+            log.warn("Unable to get interpreter for {}, missing behaviour",
+                     deviceId);
+            return null;
+        }
+        return device.as(PiPipelineInterpreter.class);
+    }
+
+    /**
+     * Returns a P4Runtime client for this device, null if such client cannot be
+     * created.
      *
      * @return client or null
      */
-     P4RuntimeClient createClient() {
+    P4RuntimeClient createClient() {
         deviceId = handler().data().deviceId();
         controller = handler().get(P4RuntimeController.class);
 
@@ -125,21 +141,21 @@
             return null;
         }
 
-         final int serverPort;
-         final long p4DeviceId;
+        final int serverPort;
+        final long p4DeviceId;
 
-         try {
+        try {
             serverPort = Integer.parseUnsignedInt(serverPortString);
         } catch (NumberFormatException e) {
             log.error("{} is not a valid P4Runtime port number", serverPortString);
             return null;
         }
-         try {
-             p4DeviceId = Long.parseUnsignedLong(p4DeviceIdString);
-         } catch (NumberFormatException e) {
-             log.error("{} is not a valid P4Runtime-internal device ID", p4DeviceIdString);
-             return null;
-         }
+        try {
+            p4DeviceId = Long.parseUnsignedLong(p4DeviceIdString);
+        } catch (NumberFormatException e) {
+            log.error("{} is not a valid P4Runtime-internal device ID", p4DeviceIdString);
+            return null;
+        }
 
         if (!controller.createClient(deviceId, serverAddr, serverPort, p4DeviceId)) {
             log.warn("Unable to create client for {}, aborting operation", deviceId);
@@ -167,6 +183,28 @@
     }
 
     /**
+     * Returns the device request timeout driver property, or a default value
+     * if the property is not present or cannot be parsed.
+     *
+     * @return timeout value
+     */
+    private int getDeviceRequestTimeout() {
+        final String timeout = handler().driver()
+                .getProperty(DEVICE_REQ_TIMEOUT);
+        if (timeout == null) {
+            return DEFAULT_DEVICE_REQ_TIMEOUT;
+        } else {
+            try {
+                return Integer.parseInt(timeout);
+            } catch (NumberFormatException e) {
+                log.error("{} driver property '{}' is not a number, using default value {}",
+                          DEVICE_REQ_TIMEOUT, timeout, DEFAULT_DEVICE_REQ_TIMEOUT);
+                return DEFAULT_DEVICE_REQ_TIMEOUT;
+            }
+        }
+    }
+
+    /**
      * Convenience method to get the result of a completable future while
      * setting a timeout and checking for exceptions.
      *
@@ -181,7 +219,7 @@
     <U> U getFutureWithDeadline(CompletableFuture<U> future, String opDescription,
                                 U defaultValue) {
         try {
-            return future.get(DEVICE_OP_TIMEOUT, TimeUnit.SECONDS);
+            return future.get(getDeviceRequestTimeout(), TimeUnit.SECONDS);
         } catch (InterruptedException e) {
             log.error("Exception while {} on {}", opDescription, deviceId);
         } catch (ExecutionException e) {
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java
index cfe717d..3e217c8 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java
@@ -59,11 +59,11 @@
         DeviceId deviceId = handler().data().deviceId();
         P4RuntimeController controller = handler().get(P4RuntimeController.class);
 
-        if (!controller.hasClient(deviceId)) {
+        P4RuntimeClient client = controller.getClient(deviceId);
+        if (client == null) {
             log.warn("Unable to find client for {}, aborting pipeconf deploy", deviceId);
             return false;
         }
-        P4RuntimeClient client = controller.getClient(deviceId);
 
         ByteBuffer deviceDataBuffer = createDeviceDataBuffer(pipeconf);
         if (deviceDataBuffer == null) {
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
index 2b941cb..d655132 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
@@ -28,7 +28,6 @@
 import org.onosproject.net.flow.FlowEntry;
 import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.flow.FlowRuleProgrammable;
-import org.onosproject.net.pi.model.PiPipelineInterpreter;
 import org.onosproject.net.pi.model.PiPipelineModel;
 import org.onosproject.net.pi.model.PiTableId;
 import org.onosproject.net.pi.runtime.PiCounterCellData;
@@ -118,10 +117,6 @@
             return false;
         }
 
-        if (!device.is(PiPipelineInterpreter.class)) {
-            log.warn("Unable to get interpreter of {}", deviceId);
-            return false;
-        }
         pipelineModel = pipeconf.pipelineModel();
         tableMirror = handler().get(P4RuntimeTableMirror.class);
         translator = piTranslationService.flowRuleTranslator();
@@ -143,10 +138,10 @@
         final List<PiTableEntry> inconsistentEntries = Lists.newArrayList();
 
         // Read table entries.
-        final Collection<PiTableEntry> installedEntries;
         // TODO: ONOS-7596 read counters with table entries
-        installedEntries = getFutureWithDeadline(client.dumpAllTables(pipeconf),
-                                                 "dumping tables", Collections.emptyList());
+        final Collection<PiTableEntry> installedEntries = getFutureWithDeadline(
+                client.dumpAllTables(pipeconf), "dumping all tables",
+                Collections.emptyList());
 
         if (installedEntries.isEmpty()) {
             return Collections.emptyList();
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
index 23fcf1f..c7e12aa4 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
@@ -20,6 +20,7 @@
 import org.onosproject.net.MastershipRole;
 import org.onosproject.net.device.DeviceAgentListener;
 import org.onosproject.net.device.DeviceHandshaker;
+import org.onosproject.p4runtime.api.P4RuntimeClient;
 import org.onosproject.p4runtime.api.P4RuntimeController;
 
 import java.util.concurrent.CompletableFuture;
@@ -33,7 +34,7 @@
     public CompletableFuture<Boolean> connect() {
         return CompletableFuture
                 .supplyAsync(super::createClient)
-                .thenCompose(client -> {
+                .thenComposeAsync(client -> {
                     if (client == null) {
                         return CompletableFuture.completedFuture(false);
                     }
@@ -45,22 +46,22 @@
     public CompletableFuture<Boolean> disconnect() {
         final P4RuntimeController controller = handler().get(P4RuntimeController.class);
         final DeviceId deviceId = handler().data().deviceId();
-        if (!controller.hasClient(deviceId)) {
+        final P4RuntimeClient client = controller.getClient(deviceId);
+        if (client == null) {
             return CompletableFuture.completedFuture(true);
-        } else {
-            return controller.getClient(deviceId).shutdown()
-                    .thenApplyAsync(v -> {
-                        controller.removeClient(deviceId);
-                        return true;
-                    });
         }
+        return client.shutdown()
+                .thenApplyAsync(v -> {
+                    controller.removeClient(deviceId);
+                    return true;
+                });
     }
 
     @Override
     public CompletableFuture<Boolean> isReachable() {
         return CompletableFuture.supplyAsync(() -> handler()
                 .get(P4RuntimeController.class)
-                .isReacheable(handler().data().deviceId())
+                .isReachable(handler().data().deviceId())
         );
     }
 
@@ -78,11 +79,15 @@
 
     @Override
     public void addDeviceAgentListener(DeviceAgentListener listener) {
-        controller.addDeviceAgentListener(deviceId, listener);
+        // Don't use controller class variable as it might be uninitialized.
+        handler().get(P4RuntimeController.class)
+                .addDeviceAgentListener(deviceId, listener);
     }
 
     @Override
     public void removeDeviceAgentListener(DeviceAgentListener listener) {
-        controller.removeDeviceAgentListener(deviceId, listener);
+        // Don't use controller class variable as it might be uninitialized.
+        handler().get(P4RuntimeController.class)
+                .removeDeviceAgentListener(deviceId, listener);
     }
 }
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java
index cc79cc1..7122784 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java
@@ -37,11 +37,9 @@
             return;
         }
 
-        final PiPipelineInterpreter interpreter = device.is(PiPipelineInterpreter.class)
-                ? device.as(PiPipelineInterpreter.class) : null;
+        final PiPipelineInterpreter interpreter = getInterpreter();
         if (interpreter == null) {
-            log.warn("Unable to get interpreter for {} with pipeconf {}, aborting emit operation",
-                     deviceId, pipeconf.id());
+            // Error logged by getInterpreter().
             return;
         }
 
diff --git a/pipelines/basic/src/main/java/org/onosproject/pipelines/basic/PortStatisticsDiscoveryImpl.java b/pipelines/basic/src/main/java/org/onosproject/pipelines/basic/PortStatisticsDiscoveryImpl.java
index 0db588d..31c7266 100644
--- a/pipelines/basic/src/main/java/org/onosproject/pipelines/basic/PortStatisticsDiscoveryImpl.java
+++ b/pipelines/basic/src/main/java/org/onosproject/pipelines/basic/PortStatisticsDiscoveryImpl.java
@@ -85,11 +85,11 @@
         PiPipeconf pipeconf = piPipeconfService.getPipeconf(piPipeconfService.ofDevice(deviceId).get()).get();
 
         P4RuntimeController controller = handler().get(P4RuntimeController.class);
-        if (!controller.hasClient(deviceId)) {
+        P4RuntimeClient client = controller.getClient(deviceId);
+        if (client == null) {
             log.warn("Unable to find client for {}, aborting operation", deviceId);
             return Collections.emptyList();
         }
-        P4RuntimeClient client = controller.getClient(deviceId);
 
         Map<Long, DefaultPortStatistics.Builder> portStatBuilders = Maps.newHashMap();
         deviceService.getPorts(deviceId)
diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
index 4362501..464cb85 100644
--- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
+++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcControllerImpl.java
@@ -18,7 +18,7 @@
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Striped;
 import io.grpc.CallOptions;
 import io.grpc.Channel;
 import io.grpc.ClientCall;
@@ -60,7 +60,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -86,7 +85,7 @@
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private Map<GrpcChannelId, ManagedChannel> channels;
-    private final Map<GrpcChannelId, Lock> channelLocks = Maps.newConcurrentMap();
+    private final Striped<Lock> channelLocks = Striped.lock(30);
 
     @Activate
     public void activate() {
@@ -120,7 +119,7 @@
         checkNotNull(channelId);
         checkNotNull(channelBuilder);
 
-        Lock lock = channelLocks.computeIfAbsent(channelId, k -> new ReentrantLock());
+        Lock lock = channelLocks.get(channelId);
         lock.lock();
 
         try {
@@ -156,7 +155,7 @@
     public boolean isChannelOpen(GrpcChannelId channelId) {
         checkNotNull(channelId);
 
-        Lock lock = channelLocks.computeIfAbsent(channelId, k -> new ReentrantLock());
+        Lock lock = channelLocks.get(channelId);
         lock.lock();
 
         try {
@@ -182,7 +181,7 @@
     public void disconnectChannel(GrpcChannelId channelId) {
         checkNotNull(channelId);
 
-        Lock lock = channelLocks.computeIfAbsent(channelId, k -> new ReentrantLock());
+        Lock lock = channelLocks.get(channelId);
         lock.lock();
 
         try {
@@ -201,7 +200,6 @@
             }
 
             channels.remove(channelId);
-            channelLocks.remove(channelId);
         } finally {
             lock.unlock();
         }
@@ -229,7 +227,7 @@
     public Optional<ManagedChannel> getChannel(GrpcChannelId channelId) {
         checkNotNull(channelId);
 
-        Lock lock = channelLocks.computeIfAbsent(channelId, k -> new ReentrantLock());
+        Lock lock = channelLocks.get(channelId);
         lock.lock();
 
         try {
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
index fc76a23..836ed7a 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
@@ -36,10 +36,10 @@
      * channel to the device is open, false otherwise.
      * <p>
      * Only one client can exist for the same device ID. Calls to this method
-     * are idempotent for the same [device ID, address, port, p4DeviceId] tuple,
-     * i.e. returns true if such client already exists but a new one is not
-     * created. Throws an {@link IllegalStateException} if a client for device
-     * ID already exists but for different [address, port, p4DeviceId].
+     * are idempotent for the same [device ID, address, port, p4DeviceId]
+     * triplet, i.e. returns true if such client already exists but a new one is
+     * not created. Throws an {@link IllegalStateException} if a client for
+     * device ID already exists but for different [address, port, p4DeviceId].
      *
      * @param deviceId   device identifier
      * @param serverAddr address of the P4Runtime server
@@ -49,18 +49,17 @@
      * open
      * @throws IllegalStateException if a client already exists for this device
      *                               ID but for different [address, port,
-     *                               p4DeviceId].
+     *                               p4DeviceId] triplet.
      */
     boolean createClient(DeviceId deviceId, String serverAddr, int serverPort,
                          long p4DeviceId);
 
     /**
-     * Returns a client to operate on the given device.
+     * Returns a client to operate on the given device, or null if a client for
+     * such device does not exist in this controller.
      *
      * @param deviceId device identifier
-     * @return client instance
-     * @throws IllegalStateException if no client exists for the given device
-     *                               identifier
+     * @return client instance or null
      */
     P4RuntimeClient getClient(DeviceId deviceId);
 
@@ -92,7 +91,7 @@
      * @return true if a client was created and is able to contact the P4Runtime
      * server, false otherwise.
      */
-    boolean isReacheable(DeviceId deviceId);
+    boolean isReachable(DeviceId deviceId);
 
     /**
      * Adds a listener for device agent events.
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index bd688f7..a20a0e6 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -105,8 +105,8 @@
  */
 final class P4RuntimeClientImpl implements P4RuntimeClient {
 
-    // Timeout in seconds to obtain the client lock.
-    private static final int LOCK_TIMEOUT = 10;
+    // Timeout in seconds to obtain the request lock.
+    private static final int LOCK_TIMEOUT = 60;
 
     private static final Map<WriteOperationType, Update.Type> UPDATE_TYPES = ImmutableMap.of(
             WriteOperationType.UNSPECIFIED, Update.Type.UNSPECIFIED,
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
index d2773b2..b35b999 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
@@ -16,10 +16,8 @@
 
 package org.onosproject.p4runtime.ctl;
 
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Striped;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import io.grpc.NameResolverProvider;
@@ -50,10 +48,10 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.Lock;
+import java.util.function.Supplier;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -66,21 +64,16 @@
         extends AbstractListenerManager<P4RuntimeEvent, P4RuntimeEventListener>
         implements P4RuntimeController {
 
-    private static final int DEVICE_LOCK_EXPIRE_TIME_IN_MIN = 10;
     private final Logger log = getLogger(getClass());
     private final NameResolverProvider nameResolverProvider = new DnsNameResolverProvider();
-    private final Map<DeviceId, ClientKey> deviceIdToClientKey = Maps.newHashMap();
-    private final Map<ClientKey, P4RuntimeClient> clientKeyToClient = Maps.newHashMap();
+
+    private final Map<DeviceId, ClientKey> clientKeys = Maps.newHashMap();
+    private final Map<ClientKey, P4RuntimeClient> clients = Maps.newHashMap();
     private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
+
     private final ConcurrentMap<DeviceId, List<DeviceAgentListener>> deviceAgentListeners = Maps.newConcurrentMap();
-    private final LoadingCache<DeviceId, ReadWriteLock> deviceLocks = CacheBuilder.newBuilder()
-            .expireAfterAccess(DEVICE_LOCK_EXPIRE_TIME_IN_MIN, TimeUnit.MINUTES)
-            .build(new CacheLoader<DeviceId, ReadWriteLock>() {
-                @Override
-                public ReadWriteLock load(DeviceId deviceId) {
-                    return new ReentrantReadWriteLock();
-                }
-            });
+    private final Striped<Lock> stripedLocks = Striped.lock(30);
+
     private DistributedElectionIdGenerator electionIdGenerator;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -99,6 +92,11 @@
 
     @Deactivate
     public void deactivate() {
+        clientKeys.keySet().forEach(this::removeClient);
+        clientKeys.clear();
+        clients.clear();
+        channelIds.clear();
+        deviceAgentListeners.clear();
         grpcController = null;
         electionIdGenerator.destroy();
         electionIdGenerator = null;
@@ -111,44 +109,40 @@
                                 int serverPort, long p4DeviceId) {
         checkNotNull(deviceId);
         checkNotNull(serverAddr);
+        checkArgument(serverPort > 0, "Invalid server port");
 
-        ClientKey newKey = new ClientKey(deviceId, serverAddr, serverPort, p4DeviceId);
+        return withDeviceLock(() -> doCreateClient(
+                deviceId, serverAddr, serverPort, p4DeviceId), deviceId);
+    }
+
+    private boolean doCreateClient(DeviceId deviceId, String serverAddr,
+                                   int serverPort, long p4DeviceId) {
+
+        ClientKey clientKey = new ClientKey(deviceId, serverAddr, serverPort, p4DeviceId);
+
+        if (clientKeys.containsKey(deviceId)) {
+            final ClientKey existingKey = clientKeys.get(deviceId);
+            if (clientKey.equals(existingKey)) {
+                log.info("Not creating client for {} as it already exists (server={}:{}, p4DeviceId={})...",
+                         deviceId, serverAddr, serverPort, p4DeviceId);
+                return true;
+            } else {
+                throw new IllegalStateException(
+                        "A client for the same device ID but different " +
+                                "server endpoints already exists");
+            }
+        }
+
+        log.info("Creating client for {} (server={}:{}, p4DeviceId={})...",
+                 deviceId, serverAddr, serverPort, p4DeviceId);
+
+        GrpcChannelId channelId = GrpcChannelId.of(
+                clientKey.deviceId(), "p4runtime-" + clientKey);
 
         ManagedChannelBuilder channelBuilder = NettyChannelBuilder
                 .forAddress(serverAddr, serverPort)
-                .usePlaintext(true);
-
-        deviceLocks.getUnchecked(deviceId).writeLock().lock();
-
-        try {
-            if (deviceIdToClientKey.containsKey(deviceId)) {
-                final ClientKey existingKey = deviceIdToClientKey.get(deviceId);
-                if (newKey.equals(existingKey)) {
-                    log.info("Not creating client for {} as it already exists (server={}:{}, p4DeviceId={})...",
-                             deviceId, serverAddr, serverPort, p4DeviceId);
-                    return true;
-                } else {
-                    throw new IllegalStateException(
-                            "A client for the same device ID but different " +
-                                    "server endpoints already exists");
-                }
-            } else {
-                log.info("Creating client for {} (server={}:{}, p4DeviceId={})...",
-                         deviceId, serverAddr, serverPort, p4DeviceId);
-                return doCreateClient(newKey, channelBuilder);
-            }
-        } finally {
-            deviceLocks.getUnchecked(deviceId).writeLock().unlock();
-        }
-    }
-
-    private boolean doCreateClient(ClientKey clientKey, ManagedChannelBuilder channelBuilder) {
-
-        GrpcChannelId channelId = GrpcChannelId.of(clientKey.deviceId(),
-                                                   "p4runtime-" + clientKey.p4DeviceId());
-
-        // Channel defaults.
-        channelBuilder.nameResolverFactory(nameResolverProvider);
+                .usePlaintext(true)
+                .nameResolverFactory(nameResolverProvider);
 
         ManagedChannel channel;
         try {
@@ -162,72 +156,69 @@
         P4RuntimeClient client = new P4RuntimeClientImpl(
                 clientKey.deviceId(), clientKey.p4DeviceId(), channel, this);
 
+        clientKeys.put(clientKey.deviceId(), clientKey);
+        clients.put(clientKey, client);
         channelIds.put(clientKey.deviceId(), channelId);
-        deviceIdToClientKey.put(clientKey.deviceId(), clientKey);
-        clientKeyToClient.put(clientKey, client);
 
         return true;
     }
 
     @Override
     public P4RuntimeClient getClient(DeviceId deviceId) {
+        if (deviceId == null) {
+            return null;
+        }
+        return withDeviceLock(() -> doGetClient(deviceId), deviceId);
+    }
 
-        deviceLocks.getUnchecked(deviceId).readLock().lock();
-
-        try {
-            if (!deviceIdToClientKey.containsKey(deviceId)) {
-                return null;
-            } else {
-                return clientKeyToClient.get(deviceIdToClientKey.get(deviceId));
-            }
-        } finally {
-            deviceLocks.getUnchecked(deviceId).readLock().unlock();
+    private P4RuntimeClient doGetClient(DeviceId deviceId) {
+        if (!clientKeys.containsKey(deviceId)) {
+            return null;
+        } else {
+            return clients.get(clientKeys.get(deviceId));
         }
     }
 
     @Override
     public void removeClient(DeviceId deviceId) {
-
-        deviceLocks.getUnchecked(deviceId).writeLock().lock();
-        try {
-            if (deviceIdToClientKey.containsKey(deviceId)) {
-                final ClientKey clientKey = deviceIdToClientKey.get(deviceId);
-                clientKeyToClient.remove(clientKey).shutdown();
-                grpcController.disconnectChannel(channelIds.get(deviceId));
-                deviceIdToClientKey.remove(deviceId);
-                channelIds.remove(deviceId);
-            }
-        } finally {
-           deviceLocks.getUnchecked(deviceId).writeLock().unlock();
+        if (deviceId == null) {
+            return;
         }
+        withDeviceLock(() -> doRemoveClient(deviceId), deviceId);
+    }
+
+    private Void doRemoveClient(DeviceId deviceId) {
+        if (clientKeys.containsKey(deviceId)) {
+            final ClientKey clientKey = clientKeys.get(deviceId);
+            clients.get(clientKey).shutdown();
+            grpcController.disconnectChannel(channelIds.get(deviceId));
+            clientKeys.remove(deviceId);
+            clients.remove(clientKey);
+            channelIds.remove(deviceId);
+        }
+        return null;
     }
 
     @Override
     public boolean hasClient(DeviceId deviceId) {
-        deviceLocks.getUnchecked(deviceId).readLock().lock();
-
-        try {
-            return deviceIdToClientKey.containsKey(deviceId);
-        } finally {
-            deviceLocks.getUnchecked(deviceId).readLock().unlock();
-        }
+        return clientKeys.containsKey(deviceId);
     }
 
     @Override
-    public boolean isReacheable(DeviceId deviceId) {
-
-        deviceLocks.getUnchecked(deviceId).readLock().lock();
-
-        try {
-            if (!deviceIdToClientKey.containsKey(deviceId)) {
-                log.debug("No client for {}, can't check for reachability", deviceId);
-                return false;
-            }
-            // FIXME: we're not checking for a P4Runtime server, it could be any gRPC service
-            return grpcController.isChannelOpen(channelIds.get(deviceId));
-        } finally {
-            deviceLocks.getUnchecked(deviceId).readLock().unlock();
+    public boolean isReachable(DeviceId deviceId) {
+        if (deviceId == null) {
+            return false;
         }
+        return withDeviceLock(() -> doIsReacheable(deviceId), deviceId);
+    }
+
+    private boolean doIsReacheable(DeviceId deviceId) {
+        // FIXME: we're not checking for a P4Runtime server, it could be any gRPC service
+        if (!clientKeys.containsKey(deviceId)) {
+            log.debug("No client for {}, can't check for reachability", deviceId);
+            return false;
+        }
+        return grpcController.isChannelOpen(channelIds.get(deviceId));
     }
 
     @Override
@@ -244,6 +235,16 @@
         });
     }
 
+    private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
+        final Lock lock = stripedLocks.get(deviceId);
+        lock.lock();
+        try {
+            return task.get();
+        } finally {
+            lock.unlock();
+        }
+    }
+
     BigInteger newMasterElectionId(DeviceId deviceId) {
         return electionIdGenerator.generate(deviceId);
     }
@@ -274,7 +275,7 @@
                 agentEventType = DeviceAgentEvent.Type.CHANNEL_CLOSED;
                 break;
             case ERROR:
-                agentEventType = !isReacheable(deviceId)
+                agentEventType = !isReachable(deviceId)
                         ? DeviceAgentEvent.Type.CHANNEL_CLOSED
                         : DeviceAgentEvent.Type.CHANNEL_ERROR;
                 break;
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index ad0d388..b341149 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -16,9 +16,13 @@
 
 package org.onosproject.provider.general.device.impl;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.annotations.Beta;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Striped;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -32,7 +36,6 @@
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.LeadershipService;
-import org.onosproject.cluster.NodeId;
 import org.onosproject.core.CoreService;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.AnnotationKeys;
@@ -87,7 +90,6 @@
 import java.util.Dictionary;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -100,10 +102,11 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
 
 import static java.util.concurrent.Executors.newFixedThreadPool;
 import static java.util.concurrent.Executors.newScheduledThreadPool;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.net.device.DeviceEvent.Type;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -118,13 +121,7 @@
 public class GeneralDeviceProvider extends AbstractProvider
         implements DeviceProvider {
 
-    // Timeout in seconds for operations on devices.
-    private static final int DEVICE_OP_TIMEOUT = 10;
-
     private static final String DRIVER = "driver";
-    public static final String FIRST_CONNECTION_TOPIC = "first-connection-";
-    private static final String CHECK_CONNECTION_TOPIC = "check-connection-";
-    private static final String POLL_FREQUENCY = "pollFrequency";
 
     private final Logger log = getLogger(getClass());
 
@@ -158,17 +155,36 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private LeadershipService leadershipService;
 
-    private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 10;
-    @Property(name = POLL_FREQUENCY, intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
+    private static final String STATS_POLL_FREQUENCY = "deviceStatsPollFrequency";
+    private static final int DEFAULT_STATS_POLL_FREQUENCY = 10;
+    @Property(name = STATS_POLL_FREQUENCY, intValue = DEFAULT_STATS_POLL_FREQUENCY,
             label = "Configure poll frequency for port status and statistics; " +
                     "default is 10 sec")
-    private int pollFrequency = DEFAULT_POLL_FREQUENCY_SECONDS;
+    private int statsPollFrequency = DEFAULT_STATS_POLL_FREQUENCY;
 
-    private static final int DEVICE_AVAILABILITY_POLL_FREQUENCY_SECONDS = 10;
-    @Property(name = "deviceAvailabilityPollFrequency", intValue = DEVICE_AVAILABILITY_POLL_FREQUENCY_SECONDS,
-            label = "Configure poll frequency for checking device availability; " +
+    private static final String PROBE_FREQUENCY = "deviceProbeFrequency";
+    private static final int DEFAULT_PROBE_FREQUENCY = 10;
+    @Property(name = PROBE_FREQUENCY, intValue = DEFAULT_PROBE_FREQUENCY,
+            label = "Configure probe frequency for checking device availability; " +
                     "default is 10 sec")
-    private int deviceAvailabilityPollFrequency = DEVICE_AVAILABILITY_POLL_FREQUENCY_SECONDS;
+    private int probeFrequency = DEFAULT_PROBE_FREQUENCY;
+
+    private static final String OP_TIMEOUT_SHORT = "deviceOperationTimeoutShort";
+    private static final int DEFAULT_OP_TIMEOUT_SHORT = 10;
+    @Property(name = OP_TIMEOUT_SHORT, intValue = DEFAULT_OP_TIMEOUT_SHORT,
+            label = "Configure timeout in seconds for device operations " +
+                    "that are supposed to take a short time " +
+                    "(e.g. checking device reachability); default is 10 seconds")
+    private int opTimeoutShort = DEFAULT_OP_TIMEOUT_SHORT;
+
+    private static final String OP_TIMEOUT_LONG = "deviceOperationTimeoutLong";
+    private static final int DEFAULT_OP_TIMEOUT_LONG = 60;
+    @Property(name = OP_TIMEOUT_LONG, intValue = DEFAULT_OP_TIMEOUT_LONG,
+            label = "Configure timeout in seconds for device operations " +
+                    "that are supposed to take a relatively long time " +
+                    "(e.g. pushing a large pipeline configuration with slow " +
+                    "network); default is 60 seconds")
+    private int opTimeoutLong = DEFAULT_OP_TIMEOUT_LONG;
 
     private static final String APP_NAME = "org.onosproject.generaldeviceprovider";
     private static final String URI_SCHEME = "device";
@@ -180,7 +196,6 @@
     //FIXME this will be removed when the configuration is synced at the source.
     private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS = ImmutableSet.of("p4runtime");
 
-    private static final ConcurrentMap<DeviceId, Lock> DEVICE_LOCKS = Maps.newConcurrentMap();
     //FIXME to be removed when netcfg will issue device events in a bundle or
     //ensures all configuration needed is present
     private Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
@@ -188,20 +203,20 @@
     private Set<DeviceId> pipelineConfigured = new CopyOnWriteArraySet<>();
 
     private final Map<DeviceId, DeviceHandshaker> handshakers = Maps.newConcurrentMap();
-
     private final Map<DeviceId, MastershipRole> requestedRoles = Maps.newConcurrentMap();
-
+    private final Striped<Lock> deviceLocks = Striped.lock(30);
 
     private ExecutorService connectionExecutor
             = newFixedThreadPool(CORE_POOL_SIZE, groupedThreads(
             "onos/generaldeviceprovider-device-connect", "%d", log));
-    private ScheduledExecutorService portStatsExecutor
+    private ScheduledExecutorService statsExecutor
             = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
-            "onos/generaldeviceprovider-port-stats", "%d", log));
-    private ScheduledExecutorService availabilityCheckExecutor
-            = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
-            "onos/generaldeviceprovider-availability-check", "%d", log));
-    private ConcurrentMap<DeviceId, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
+            "onos/generaldeviceprovider-stats-poll", "%d", log));
+    private ConcurrentMap<DeviceId, ScheduledFuture<?>> statsPollingTasks = new ConcurrentHashMap<>();
+    private ScheduledExecutorService probeExecutor
+            = newSingleThreadScheduledExecutor(groupedThreads(
+            "onos/generaldeviceprovider-probe-", "%d", log));
+    private ScheduledFuture<?> probeTask = null;
 
     private DeviceProviderService providerService;
     private InternalDeviceListener deviceListener = new InternalDeviceListener();
@@ -232,46 +247,61 @@
         //This will fail if ONOS has CFG and drivers which depend on this provider
         // are activated, failing due to not finding the driver.
         cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
-                .forEach(did -> triggerConnectWithLeadership(
-                        did, FIRST_CONNECTION_TOPIC + did.toString()));
+                .forEach(this::triggerConnect);
         //Initiating a periodic check to see if any device is available again and reconnect it.
-        availabilityCheckExecutor.scheduleAtFixedRate(
-                this::scheduleDevicePolling, deviceAvailabilityPollFrequency,
-                deviceAvailabilityPollFrequency, TimeUnit.SECONDS);
+        rescheduleProbeTask();
         modified(context);
         log.info("Started");
     }
 
     @Modified
     public void modified(ComponentContext context) {
-        if (context != null) {
-            Dictionary<?, ?> properties = context.getProperties();
-            pollFrequency = Tools.getIntegerProperty(properties, POLL_FREQUENCY,
-                                                     DEFAULT_POLL_FREQUENCY_SECONDS);
-            log.info("Configured. Poll frequency is configured to {} seconds", pollFrequency);
+        if (context == null) {
+            return;
         }
 
-        if (!scheduledTasks.isEmpty()) {
-            //cancel all previous tasks
-            scheduledTasks.values().forEach(task -> task.cancel(false));
-            //resubmit task with new timeout.
-            Set<DeviceId> deviceSubjects =
-                    cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class);
-            deviceSubjects.forEach(deviceId -> {
-                if (notMyScheme(deviceId)) {
-                    // not under my scheme, skipping
-                    log.debug("{} is not my scheme, skipping", deviceId);
-                    return;
-                }
-                scheduledTasks.put(deviceId, scheduleStatsPolling(deviceId, true));
-            });
+        Dictionary<?, ?> properties = context.getProperties();
+        final int oldStatsPollFrequency = statsPollFrequency;
+        statsPollFrequency = Tools.getIntegerProperty(
+                properties, STATS_POLL_FREQUENCY, DEFAULT_STATS_POLL_FREQUENCY);
+        log.info("Configured. {} is configured to {} seconds",
+                 STATS_POLL_FREQUENCY, statsPollFrequency);
+        final int oldProbeFrequency = probeFrequency;
+        probeFrequency = Tools.getIntegerProperty(
+                properties, PROBE_FREQUENCY, DEFAULT_PROBE_FREQUENCY);
+        log.info("Configured. {} is configured to {} seconds",
+                 PROBE_FREQUENCY, probeFrequency);
+        opTimeoutShort = Tools.getIntegerProperty(
+                properties, OP_TIMEOUT_SHORT, DEFAULT_OP_TIMEOUT_SHORT);
+        log.info("Configured. {} is configured to {} seconds",
+                 OP_TIMEOUT_SHORT, opTimeoutShort);
+        opTimeoutLong = Tools.getIntegerProperty(
+                properties, OP_TIMEOUT_LONG, DEFAULT_OP_TIMEOUT_LONG);
+        log.info("Configured. {} is configured to {} seconds",
+                 OP_TIMEOUT_LONG, opTimeoutLong);
+
+        if (oldStatsPollFrequency != statsPollFrequency) {
+            rescheduleStatsPollingTasks();
         }
+
+        if (oldProbeFrequency != probeFrequency) {
+            rescheduleProbeTask();
+        }
+    }
+
+    private synchronized void rescheduleProbeTask() {
+        if (probeTask != null) {
+            probeTask.cancel(false);
+        }
+        probeTask = probeExecutor.scheduleAtFixedRate(
+                this::triggerProbeAllDevices, probeFrequency,
+                probeFrequency, TimeUnit.SECONDS);
     }
 
     @Deactivate
     public void deactivate() {
-        portStatsExecutor.shutdown();
-        availabilityCheckExecutor.shutdown();
+        statsExecutor.shutdown();
+        probeExecutor.shutdown();
         componentConfigService.unregisterProperties(getClass(), false);
         cfgService.removeListener(cfgListener);
         //Not Removing the device so they can still be used from other driver providers
@@ -293,18 +323,15 @@
 
     @Override
     public void triggerProbe(DeviceId deviceId) {
-        // TODO Really don't see the point of this in non OF Context,
-        // for now testing reachability, can be moved to no-op
-        log.debug("Triggering probe equals testing reachability on device {}", deviceId);
-        isReachable(deviceId);
+        connectionExecutor.execute(withDeviceLock(
+                () -> doDeviceProbe(deviceId), deviceId));
     }
 
     @Override
     public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
         log.info("Received role {} for device {}", newRole, deviceId);
         requestedRoles.put(deviceId, newRole);
-        connectionExecutor.submit(exceptionSafe(
-                () -> doRoleChanged(deviceId, newRole)));
+        connectionExecutor.execute(() -> doRoleChanged(deviceId, newRole));
     }
 
     private void doRoleChanged(DeviceId deviceId, MastershipRole newRole) {
@@ -326,74 +353,63 @@
         }
         return getFutureWithDeadline(
                 handshaker.isReachable(), "checking reachability",
-                deviceId, false);
+                deviceId, false, opTimeoutShort);
     }
 
     @Override
     public void changePortState(DeviceId deviceId, PortNumber portNumber,
                                 boolean enable) {
+        connectionExecutor.execute(
+                () -> doChangePortState(deviceId, portNumber, enable));
+    }
+
+    private void doChangePortState(DeviceId deviceId, PortNumber portNumber,
+                                   boolean enable) {
         if (!deviceService.getDevice(deviceId).is(PortAdmin.class)) {
             log.warn("Missing PortAdmin behaviour on {}, aborting port state change",
                      deviceId);
             return;
         }
-        final PortAdmin portAdmin = getPortAdmin(deviceId);
-        final CompletableFuture<Boolean> modified = enable
+        final PortAdmin portAdmin = deviceService.getDevice(deviceId)
+                .as(PortAdmin.class);
+        final CompletableFuture<Boolean> modifyTask = enable
                 ? portAdmin.enable(portNumber)
                 : portAdmin.disable(portNumber);
-        modified.thenAcceptAsync(result -> {
-            if (!result) {
-                log.warn("Port {} status cannot be changed on {} (enable={})",
-                         portNumber, deviceId, enable);
-            }
-        });
+        final String descr = (enable ? "enabling" : "disabling") + " port " + portNumber;
+        getFutureWithDeadline(
+                modifyTask, descr, deviceId, null, opTimeoutShort);
     }
 
     @Override
     public void triggerDisconnect(DeviceId deviceId) {
         log.debug("Triggering disconnection of device {}", deviceId);
-        connectionExecutor.execute(
-                () -> disconnectDevice(deviceId)
-                        .thenRunAsync(() -> checkAndConnect(deviceId)));
+        connectionExecutor.execute(withDeviceLock(
+                () -> doDisconnectDevice(deviceId), deviceId));
     }
 
     private DeviceHandshaker getHandshaker(DeviceId deviceId) {
         return handshakers.computeIfAbsent(deviceId, id -> {
             Driver driver = getDriver(deviceId);
-            return driver == null ? null :
-                    getBehaviour(driver, DeviceHandshaker.class,
-                                 new DefaultDriverData(driver, deviceId));
+            return driver == null ? null : getBehaviour(
+                    driver, DeviceHandshaker.class,
+                    new DefaultDriverData(driver, deviceId));
         });
     }
 
-    private PortAdmin getPortAdmin(DeviceId deviceId) {
-        Driver driver = getDriver(deviceId);
-        return getBehaviour(driver, PortAdmin.class,
-                            new DefaultDriverData(driver, deviceId));
-
-    }
-
     private Driver getDriver(DeviceId deviceId) {
-        Driver driver = null;
         try {
-            driver = driverService.getDriver(deviceId);
+            // DriverManager checks first using basic device config.
+            return driverService.getDriver(deviceId);
         } catch (ItemNotFoundException e) {
-            log.debug("Falling back to configuration to fetch driver " +
-                              "for device {}", deviceId);
-            BasicDeviceConfig cfg = cfgService.getConfig(deviceId, BasicDeviceConfig.class);
-            if (cfg != null) {
-                driver = driverService.getDriver(cfg.driver());
-            }
+            log.error("Driver not found for {}", deviceId);
+            return null;
         }
-        return driver;
     }
 
-    //needed since the device manager will not return the driver through implementation()
-    // method since the device is not pushed to the core so for the connectDeviceAsMaster
-    // we need to work around that in order to test before calling
-    // store.createOrUpdateDevice
     private <T extends Behaviour> T getBehaviour(Driver driver, Class<T> type,
                                                  DriverData data) {
+        // Allows obtaining behavior implementations before the device is pushed
+        // to the core.
         if (driver != null && driver.hasBehaviour(type)) {
             DefaultDriverHandler handler = new DefaultDriverHandler(data);
             return driver.createBehaviour(handler, type);
@@ -402,7 +418,12 @@
         }
     }
 
-    private void doConnectDevice(DeviceId deviceId, boolean asMaster) {
+    private void doConnectDevice(DeviceId deviceId) {
+        // Some operations can be performed by one node only.
+        final boolean isLocalLeader = leadershipService.runForLeadership(
+                GeneralProviderDeviceConfig.class.getName() + deviceId)
+                .leader().nodeId().equals(clusterService.getLocalNode().id());
+
         if (deviceService.getDevice(deviceId) != null
                 && deviceService.isAvailable(deviceId)) {
             log.info("Device {} is already connected to ONOS and is available",
@@ -421,7 +442,7 @@
             return;
         }
         log.info("Initiating connection to device {} with driver {} ... asMaster={}",
-                 deviceId, basicDeviceConfig.driver(), asMaster);
+                 deviceId, basicDeviceConfig.driver(), isLocalLeader);
         // Get handshaker, driver and driver data.
         final DeviceHandshaker handshaker = getHandshaker(deviceId);
         if (handshaker == null) {
@@ -436,7 +457,7 @@
         // Start connection via handshaker.
         final Boolean connectSuccess = getFutureWithDeadline(
                 handshaker.connect(), "initiating connection",
-                deviceId, null);
+                deviceId, null, opTimeoutShort);
         if (connectSuccess == null) {
             // Error logged by getFutureWithDeadline().
             return;
@@ -445,15 +466,17 @@
             return;
         }
         // Handle pipeconf (if device is capable)
-        if (!handlePipeconf(deviceId, driver, driverData, asMaster)) {
+        if (!handlePipeconf(deviceId, driver, driverData, isLocalLeader)) {
             // We already logged the error.
-            handshaker.disconnect();
+            getFutureWithDeadline(
+                    handshaker.disconnect(), "performing disconnection",
+                    deviceId, null, opTimeoutShort);
             return;
         }
         // Add device agent listener.
         handshaker.addDeviceAgentListener(deviceAgentListener);
         // All good. Notify core (if master).
-        if (asMaster) {
+        if (isLocalLeader) {
             advertiseDevice(deviceId, driver, providerConfig, driverData);
         }
     }
@@ -504,7 +527,7 @@
      * core, false otherwise.
      */
     private boolean handlePipeconf(DeviceId deviceId, Driver driver,
-                                   DriverData driverData, boolean deployPipeconf) {
+                                   DriverData driverData, boolean asMaster) {
         final PiPipelineProgrammable pipelineProg = getBehaviour(
                 driver, PiPipelineProgrammable.class, driverData);
         if (pipelineProg == null) {
@@ -518,28 +541,41 @@
         }
         final PiPipeconfId pipeconfId = pipeconf.id();
 
-        if (deployPipeconf) {
-            final Boolean deploySuccess = getFutureWithDeadline(
-                    pipelineProg.deployPipeconf(pipeconf),
-                    "deploying pipeconf", deviceId, null);
-            if (deploySuccess == null) {
-                // Error logged by getFutureWithDeadline().
-                return false;
-            } else if (!deploySuccess) {
-                log.error("Unable to deploy pipeconf {} to {}, aborting device discovery",
-                          pipeconfId, deviceId);
-                return false;
-            }
+        final String mergedDriverName = piPipeconfService.mergeDriver(
+                deviceId, pipeconfId);
+        if (mergedDriverName == null) {
+            log.error("Unable to get merged driver for {} and {}, aborting device discovery",
+                      deviceId, pipeconfId);
+            return false;
         }
 
-        final Boolean mergeSuccess = getFutureWithDeadline(
-                piPipeconfService.bindToDevice(pipeconfId, deviceId),
-                "merging driver", deviceId, null);
-        if (mergeSuccess == null) {
+        if (!asMaster) {
+            // From now one only the master.
+            return true;
+        }
+
+        if (!setDriverViaCfg(deviceId, mergedDriverName)) {
+            return false;
+        }
+
+        // FIXME: we just introduced a race condition as it might happen that a
+        // node does not receive the new cfg (with the merged driver) before the
+        // device is advertised to the core. Perhaps we should be waiting for a
+        // NetworkConfig event signaling that the driver has been updated on all
+        // nodes? The effect is mitigated by deploying the pipeconf (slow
+        // operation), after calling setDriverViaCfg().
+
+        piPipeconfService.bindToDevice(pipeconfId, deviceId);
+
+        final Boolean deploySuccess = getFutureWithDeadline(
+                pipelineProg.deployPipeconf(pipeconf),
+                "deploying pipeconf", deviceId, null,
+                opTimeoutLong);
+        if (deploySuccess == null) {
             // Error logged by getFutureWithDeadline().
             return false;
-        } else if (!mergeSuccess) {
-            log.error("Unable to merge pipeconf driver for {}, aborting device discovery",
+        } else if (!deploySuccess) {
+            log.error("Unable to deploy pipeconf {} to {}, aborting device discovery",
                       pipeconfId, deviceId);
             return false;
         }
@@ -547,6 +583,21 @@
         return true;
     }
 
+    private boolean setDriverViaCfg(DeviceId deviceId, String driverName) {
+        BasicDeviceConfig cfg = cfgService.getConfig(deviceId, BasicDeviceConfig.class);
+        if (cfg == null) {
+            log.error("Unable to get basic device config for {}, aborting device discovery",
+                      deviceId);
+            return false;
+        }
+        ObjectNode newCfg = (ObjectNode) cfg.node();
+        newCfg = newCfg.put(DRIVER, driverName);
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode newCfgNode = mapper.convertValue(newCfg, JsonNode.class);
+        cfgService.applyConfig(deviceId, BasicDeviceConfig.class, newCfgNode);
+        return true;
+    }
+
     private PiPipeconf getPipeconf(DeviceId deviceId, PiPipelineProgrammable pipelineProg) {
         PiPipeconfId pipeconfId = getPipeconfFromCfg(deviceId);
         if (pipeconfId == null || pipeconfId.id().isEmpty()) {
@@ -569,16 +620,14 @@
         return piPipeconfService.getPipeconf(pipeconfId).get();
     }
 
-    private CompletableFuture<?> disconnectDevice(DeviceId deviceId) {
-        log.info("Disconnecting for device {}", deviceId);
+    private void doDisconnectDevice(DeviceId deviceId) {
+        log.info("Initiating disconnection from {}...", deviceId);
         // Remove from core (if master)
-        if (mastershipService.isLocalMaster(deviceId)) {
+        if (mastershipService.isLocalMaster(deviceId)
+                && deviceService.isAvailable(deviceId)) {
             providerService.deviceDisconnected(deviceId);
         }
-        // Cancel tasks
-        if (scheduledTasks.containsKey(deviceId)) {
-            scheduledTasks.remove(deviceId).cancel(true);
-        }
+        cancelStatsPolling(deviceId);
         // Perform disconnection with device.
         final DeviceHandshaker handshaker = handshakers.remove(deviceId);
         if (handshaker == null) {
@@ -586,18 +635,15 @@
             log.warn("Missing DeviceHandshaker behavior for {}, " +
                              "no guarantees of complete disconnection",
                      deviceId);
-            return CompletableFuture.completedFuture(false);
+            return;
         }
         handshaker.removeDeviceAgentListener(deviceAgentListener);
-        return handshaker.disconnect()
-                .thenApplyAsync(result -> {
-                    if (result) {
-                        log.info("Disconnected device {}", deviceId);
-                    } else {
-                        log.warn("Device {} was unable to disconnect", deviceId);
-                    }
-                    return result;
-                });
+        final boolean disconnectSuccess = getFutureWithDeadline(
+                handshaker.disconnect(), "performing disconnection",
+                deviceId, false, opTimeoutShort);
+        if (!disconnectSuccess) {
+            log.warn("Unable to disconnect from {}", deviceId);
+        }
     }
 
     // Needed to catch the exception in the executors since are not rethrown otherwise.
@@ -611,21 +657,27 @@
         };
     }
 
-    private Runnable withDeviceLock(Runnable runnable, DeviceId deviceId) {
-        return () -> {
-            Lock lock = DEVICE_LOCKS.computeIfAbsent(deviceId, key -> new ReentrantLock());
-            lock.lock();
-            try {
-                runnable.run();
-            } finally {
-                lock.unlock();
-            }
-        };
+    private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
+        final Lock lock = deviceLocks.get(deviceId);
+        lock.lock();
+        try {
+            return task.get();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private Runnable withDeviceLock(Runnable task, DeviceId deviceId) {
+        // Wrapper of withDeviceLock(Supplier, ...) for void tasks.
+        return () -> withDeviceLock(() -> {
+            task.run();
+            return null;
+        }, deviceId);
     }
 
     private void updatePortStatistics(DeviceId deviceId) {
         Device device = deviceService.getDevice(deviceId);
-        if (!Objects.isNull(device) && deviceService.isAvailable(deviceId) &&
+        if (device != null && deviceService.isAvailable(deviceId) &&
                 device.is(PortStatisticsDiscovery.class)) {
             Collection<PortStatistics> statistics = device.as(PortStatisticsDiscovery.class)
                     .discoverPortStatistics();
@@ -642,14 +694,9 @@
         return !deviceId.uri().getScheme().equals(URI_SCHEME);
     }
 
-    private void triggerConnectWithLeadership(DeviceId deviceId,
-                                              String leadershipTopic) {
-        final NodeId leaderNodeId = leadershipService.runForLeadership(
-                leadershipTopic).leader().nodeId();
-        final boolean thisNodeMaster = clusterService
-                .getLocalNode().id().equals(leaderNodeId);
-        connectionExecutor.submit(withDeviceLock(exceptionSafe(
-                () -> doConnectDevice(deviceId, thisNodeMaster)), deviceId));
+    private void triggerConnect(DeviceId deviceId) {
+        connectionExecutor.execute(withDeviceLock(
+                () -> doConnectDevice(deviceId), deviceId));
     }
 
     /**
@@ -666,67 +713,62 @@
                 log.debug("{} is not my scheme, skipping", deviceId);
                 return;
             }
-            if (!isDeviceConfigComplete(event, deviceId)) {
+            final boolean configComplete = withDeviceLock(
+                    () -> isDeviceConfigComplete(event, deviceId), deviceId);
+            if (!configComplete) {
                 // Still waiting for some configuration.
                 return;
             }
             // Good to go.
-            triggerConnectWithLeadership(
-                    deviceId, FIRST_CONNECTION_TOPIC + deviceId.toString());
+            triggerConnect(deviceId);
             cleanUpConfigInfo(deviceId);
         }
 
         private boolean isDeviceConfigComplete(NetworkConfigEvent event, DeviceId deviceId) {
             // FIXME to be removed when netcfg will issue device events in a bundle or
             // ensure all configuration needed is present
-            Lock lock = DEVICE_LOCKS.computeIfAbsent(deviceId, key -> new ReentrantLock());
-            lock.lock();
-            try {
-                if (event.configClass().equals(GeneralProviderDeviceConfig.class)) {
-                    //FIXME we currently assume that p4runtime devices are pipeline configurable.
-                    //If we want to connect a p4runtime device with no pipeline
-                    if (event.config().isPresent()) {
-                        deviceConfigured.add(deviceId);
-                        final boolean isNotPipelineConfigurable = Collections.disjoint(
-                                ImmutableSet.copyOf(event.config().get().node().fieldNames()),
-                                PIPELINE_CONFIGURABLE_PROTOCOLS);
-                        if (isNotPipelineConfigurable) {
-                            // Skip waiting for a pipeline if we can't support it.
-                            pipelineConfigured.add(deviceId);
-                        }
-                    }
-                } else if (event.configClass().equals(BasicDeviceConfig.class)) {
-                    if (event.config().isPresent() && event.config().get().node().has(DRIVER)) {
-                        driverConfigured.add(deviceId);
-                    }
-                } else if (event.configClass().equals(PiPipeconfConfig.class)) {
-                    if (event.config().isPresent()
-                            && event.config().get().node().has(PiPipeconfConfig.PIPIPECONFID)) {
+            if (event.configClass().equals(GeneralProviderDeviceConfig.class)) {
+                //FIXME we currently assume that p4runtime devices are pipeline configurable.
+                //If we want to connect a p4runtime device with no pipeline
+                if (event.config().isPresent()) {
+                    deviceConfigured.add(deviceId);
+                    final boolean isNotPipelineConfigurable = Collections.disjoint(
+                            ImmutableSet.copyOf(event.config().get().node().fieldNames()),
+                            PIPELINE_CONFIGURABLE_PROTOCOLS);
+                    if (isNotPipelineConfigurable) {
+                        // Skip waiting for a pipeline if we can't support it.
                         pipelineConfigured.add(deviceId);
                     }
                 }
-
-                if (deviceConfigured.contains(deviceId)
-                        && driverConfigured.contains(deviceId)
-                        && pipelineConfigured.contains(deviceId)) {
-                    return true;
-                } else {
-                    if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
-                        log.debug("Waiting for pipeline configuration for device {}", deviceId);
-                    } else if (pipelineConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
-                        log.debug("Waiting for device configuration for device {}", deviceId);
-                    } else if (pipelineConfigured.contains(deviceId) && deviceConfigured.contains(deviceId)) {
-                        log.debug("Waiting for driver configuration for device {}", deviceId);
-                    } else if (driverConfigured.contains(deviceId)) {
-                        log.debug("Only driver configuration for device {}", deviceId);
-                    } else if (deviceConfigured.contains(deviceId)) {
-                        log.debug("Only device configuration for device {}", deviceId);
-                    }
+            } else if (event.configClass().equals(BasicDeviceConfig.class)) {
+                if (event.config().isPresent() && event.config().get().node().has(DRIVER)) {
+                    driverConfigured.add(deviceId);
                 }
-                return false;
-            } finally {
-                lock.unlock();
+            } else if (event.configClass().equals(PiPipeconfConfig.class)) {
+                if (event.config().isPresent()
+                        && event.config().get().node().has(PiPipeconfConfig.PIPIPECONFID)) {
+                    pipelineConfigured.add(deviceId);
+                }
             }
+
+            if (deviceConfigured.contains(deviceId)
+                    && driverConfigured.contains(deviceId)
+                    && pipelineConfigured.contains(deviceId)) {
+                return true;
+            } else {
+                if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
+                    log.debug("Waiting for pipeline configuration for device {}", deviceId);
+                } else if (pipelineConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
+                    log.debug("Waiting for device configuration for device {}", deviceId);
+                } else if (pipelineConfigured.contains(deviceId) && deviceConfigured.contains(deviceId)) {
+                    log.debug("Waiting for driver configuration for device {}", deviceId);
+                } else if (driverConfigured.contains(deviceId)) {
+                    log.debug("Only driver configuration for device {}", deviceId);
+                } else if (deviceConfigured.contains(deviceId)) {
+                    log.debug("Only device configuration for device {}", deviceId);
+                }
+            }
+            return false;
         }
 
         @Override
@@ -757,18 +799,38 @@
         pipelineConfigured.remove(deviceId);
     }
 
-    private ScheduledFuture<?> scheduleStatsPolling(DeviceId deviceId, boolean randomize) {
-        int delay = 0;
-        if (randomize) {
-            delay = new SecureRandom().nextInt(10);
-        }
-        return portStatsExecutor.scheduleAtFixedRate(
-                exceptionSafe(() -> updatePortStatistics(deviceId)),
-                delay, pollFrequency, TimeUnit.SECONDS);
+    private void startStatsPolling(DeviceId deviceId, boolean withRandomDelay) {
+        statsPollingTasks.compute(deviceId, (did, oldTask) -> {
+            if (oldTask != null) {
+                oldTask.cancel(false);
+            }
+            final int delay = withRandomDelay
+                    ? new SecureRandom().nextInt(10) : 0;
+            return statsExecutor.scheduleAtFixedRate(
+                    exceptionSafe(() -> updatePortStatistics(deviceId)),
+                    delay, statsPollFrequency, TimeUnit.SECONDS);
+        });
     }
 
-    private void scheduleDevicePolling() {
-        cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class).forEach(this::checkAndConnect);
+    private void cancelStatsPolling(DeviceId deviceId) {
+        statsPollingTasks.computeIfPresent(deviceId, (did, task) -> {
+            task.cancel(false);
+            return null;
+        });
+    }
+
+    private void rescheduleStatsPollingTasks() {
+        statsPollingTasks.keySet().forEach(deviceId -> {
+            // startStatsPolling cancels old one if present.
+            startStatsPolling(deviceId, true);
+        });
+    }
+
+    private void triggerProbeAllDevices() {
+        // Async trigger a task for all devices in the cfg.
+        cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
+                .forEach(deviceId -> connectionExecutor.execute(withDeviceLock(
+                        () -> doDeviceProbe(deviceId), deviceId)));
     }
 
     private PiPipeconfId getPipeconfFromCfg(DeviceId deviceId) {
@@ -780,38 +842,28 @@
         return config.piPipeconfId();
     }
 
-    private void checkAndConnect(DeviceId deviceId) {
-        // Let's try and reconnect to a device which is stored in cfg.
-        // One of the following conditions must be satisfied:
-        // 1) device is null in the store meaning that is was never connected or
-        // it was administratively removed
-        // 2) the device is not available and there is no MASTER instance,
-        // meaning the device lost it's connection to ONOS at some point in the
-        // past.
-        // We also check that the general device provider config and the driver
-        // config are present. We do not check for reachability using
-        // isReachable(deviceId) since the behaviour of this method can vary
-        // depending on protocol nuances. We leave this check to the device
-        // handshaker at later stages of the connection process. IF the
-        // conditions are not met but instead the device is present in the
-        // store, available and this instance is MASTER but is not reachable we
-        // remove it from the store.
-
-        if ((deviceService.getDevice(deviceId) == null
-                || (!deviceService.isAvailable(deviceId)
-                && mastershipService.getMasterFor(deviceId) == null))
-                && configIsPresent(deviceId)) {
-            log.debug("Trying to re-connect to device {}", deviceId);
-            triggerConnectWithLeadership(
-                    deviceId, CHECK_CONNECTION_TOPIC + deviceId.toString());
-            cleanUpConfigInfo(deviceId);
-        } else if (deviceService.getDevice(deviceId) != null
-                && deviceService.isAvailable(deviceId)
-                && mastershipService.isLocalMaster(deviceId)
-                && !isReachable(deviceId)
-                && configIsPresent(deviceId)) {
-            log.info("Removing available but unreachable device {}", deviceId);
-            disconnectDevice(deviceId);
+    private void doDeviceProbe(DeviceId deviceId) {
+        if (!configIsPresent(deviceId)) {
+            return;
+        }
+        final boolean isAvailable = deviceService.getDevice(deviceId) != null
+                && deviceService.isAvailable(deviceId);
+        final boolean isLocalMaster = mastershipService.isLocalMaster(deviceId);
+        if (isAvailable) {
+            if (!isLocalMaster) {
+                return;
+            }
+            if (!isReachable(deviceId)) {
+                log.info("Disconnecting available but unreachable device {}...",
+                         deviceId);
+                triggerDisconnect(deviceId);
+            }
+        } else {
+            // We do not check for reachability using isReachable()
+            // since the behaviour of this method can vary depending on protocol
+            // nuances. We leave this check to the device handshaker at later
+            // stages of the connection process.
+            triggerConnect(deviceId);
         }
     }
 
@@ -828,7 +880,7 @@
     private void handleChannelClosed(DeviceId deviceId) {
         log.info("Disconnecting device {}, due to channel closed event",
                  deviceId);
-        disconnectDevice(deviceId);
+        triggerDisconnect(deviceId);
     }
 
     private void handleMastershipResponse(DeviceId deviceId, MastershipRole response) {
@@ -837,21 +889,17 @@
             return;
         }
         providerService.receivedRoleReply(deviceId, requestedRoles.get(deviceId), response);
-        // If not master, cancel polling tasks, otherwise start them.
-        if (!response.equals(MastershipRole.MASTER)
-                && scheduledTasks.get(deviceId) != null) {
-            scheduledTasks.remove(deviceId).cancel(false);
-        } else if (response.equals(MastershipRole.MASTER)
-                && scheduledTasks.get(deviceId) == null) {
-            scheduledTasks.put(deviceId, scheduleStatsPolling(deviceId, false));
-            updatePortStatistics(deviceId);
+        if (response.equals(MastershipRole.MASTER)) {
+            startStatsPolling(deviceId, false);
+        } else {
+            cancelStatsPolling(deviceId);
         }
     }
 
     private <U> U getFutureWithDeadline(CompletableFuture<U> future, String opDescription,
-                                        DeviceId deviceId, U defaultValue) {
+                                        DeviceId deviceId, U defaultValue, int timeout) {
         try {
-            return future.get(DEVICE_OP_TIMEOUT, TimeUnit.SECONDS);
+            return future.get(timeout, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
             log.error("Thread interrupted while {} on {}", opDescription, deviceId);
             Thread.currentThread().interrupt();
@@ -873,7 +921,7 @@
             // For now this is scheduled periodically, when streaming API will
             // be available we check and base it on the streaming API (e.g. gNMI)
             if (mastershipService.isLocalMaster(deviceId)) {
-                scheduledTasks.put(deviceId, scheduleStatsPolling(deviceId, false));
+                startStatsPolling(deviceId, true);
             }
         }