More improvements and bugfixes in P4Runtime subsystem

Most notably, we fix a bug in which some nodes were not able to find
pipeconf-specific behaviors for a given device. The problem is not
completely solved, but it is mitigated.

There's a race condition caused by the fact that the GDP
(GeneralDeviceProvider) updates the cfg with the merged driver name
before advertising the device to the core. Some nodes might receive the
cfg update only after the device has been advertised. We mitigate the
problem by performing the pipeline deploy (a slow operation) after the
cfg update, giving other nodes more time to catch up. Perhaps we should
wait for cfg update events before advertising the device to the core?

Also:
- NPE when getting P4Runtime client
- Detect if a base driver is already merged in pipeconf manager
- Longer timeouts in P4Runtime driver and protocol (for slow networks)
- Configurable timeout in P4Runtime driver and GDP
- NPE when adding/removing device agent listeners in P4Runtime handshaker
- Various exceptions due to race conditions in GDP when disconnecting
devices (by serializing disconnect tasks per device)
- NPE when cancelling polling tasks in GDP
- Refactored PipeconfService to distinguish between driver merge,
pipeconf map update, and cfg update (now performed in the GDP)
- Fixed PipeconfManagerTest, which was not testing driver behaviours
- Use Guava striped locks when possible (more memory-efficient than maps,
and with strict atomicity guarantees w.r.t. caches).

Change-Id: I30f3887541ba0fd44439a86885e9821ac565b64c
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index ad0d388..b341149 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -16,9 +16,13 @@
 
 package org.onosproject.provider.general.device.impl;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.annotations.Beta;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Striped;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -32,7 +36,6 @@
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.LeadershipService;
-import org.onosproject.cluster.NodeId;
 import org.onosproject.core.CoreService;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.AnnotationKeys;
@@ -87,7 +90,6 @@
 import java.util.Dictionary;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -100,10 +102,11 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
 
 import static java.util.concurrent.Executors.newFixedThreadPool;
 import static java.util.concurrent.Executors.newScheduledThreadPool;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.net.device.DeviceEvent.Type;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -118,13 +121,7 @@
 public class GeneralDeviceProvider extends AbstractProvider
         implements DeviceProvider {
 
-    // Timeout in seconds for operations on devices.
-    private static final int DEVICE_OP_TIMEOUT = 10;
-
     private static final String DRIVER = "driver";
-    public static final String FIRST_CONNECTION_TOPIC = "first-connection-";
-    private static final String CHECK_CONNECTION_TOPIC = "check-connection-";
-    private static final String POLL_FREQUENCY = "pollFrequency";
 
     private final Logger log = getLogger(getClass());
 
@@ -158,17 +155,36 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private LeadershipService leadershipService;
 
-    private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 10;
-    @Property(name = POLL_FREQUENCY, intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
+    private static final String STATS_POLL_FREQUENCY = "deviceStatsPollFrequency";
+    private static final int DEFAULT_STATS_POLL_FREQUENCY = 10;
+    @Property(name = STATS_POLL_FREQUENCY, intValue = DEFAULT_STATS_POLL_FREQUENCY,
             label = "Configure poll frequency for port status and statistics; " +
                     "default is 10 sec")
-    private int pollFrequency = DEFAULT_POLL_FREQUENCY_SECONDS;
+    private int statsPollFrequency = DEFAULT_STATS_POLL_FREQUENCY;
 
-    private static final int DEVICE_AVAILABILITY_POLL_FREQUENCY_SECONDS = 10;
-    @Property(name = "deviceAvailabilityPollFrequency", intValue = DEVICE_AVAILABILITY_POLL_FREQUENCY_SECONDS,
-            label = "Configure poll frequency for checking device availability; " +
+    private static final String PROBE_FREQUENCY = "deviceProbeFrequency";
+    private static final int DEFAULT_PROBE_FREQUENCY = 10;
+    @Property(name = PROBE_FREQUENCY, intValue = DEFAULT_PROBE_FREQUENCY,
+            label = "Configure probe frequency for checking device availability; " +
                     "default is 10 sec")
-    private int deviceAvailabilityPollFrequency = DEVICE_AVAILABILITY_POLL_FREQUENCY_SECONDS;
+    private int probeFrequency = DEFAULT_PROBE_FREQUENCY;
+
+    private static final String OP_TIMEOUT_SHORT = "deviceOperationTimeoutShort";
+    private static final int DEFAULT_OP_TIMEOUT_SHORT = 10;
+    @Property(name = OP_TIMEOUT_SHORT, intValue = DEFAULT_OP_TIMEOUT_SHORT,
+            label = "Configure timeout in seconds for device operations " +
+                    "that are supposed to take a short time " +
+                    "(e.g. checking device reachability); default is 10 seconds")
+    private int opTimeoutShort = DEFAULT_OP_TIMEOUT_SHORT;
+
+    private static final String OP_TIMEOUT_LONG = "deviceOperationTimeoutLong";
+    private static final int DEFAULT_OP_TIMEOUT_LONG = 60;
+    @Property(name = OP_TIMEOUT_LONG, intValue = DEFAULT_OP_TIMEOUT_LONG,
+            label = "Configure timeout in seconds for device operations " +
+                    "that are supposed to take a relatively long time " +
+                    "(e.g. pushing a large pipeline configuration with slow " +
+                    "network); default is 60 seconds")
+    private int opTimeoutLong = DEFAULT_OP_TIMEOUT_LONG;
 
     private static final String APP_NAME = "org.onosproject.generaldeviceprovider";
     private static final String URI_SCHEME = "device";
@@ -180,7 +196,6 @@
     //FIXME this will be removed when the configuration is synced at the source.
     private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS = ImmutableSet.of("p4runtime");
 
-    private static final ConcurrentMap<DeviceId, Lock> DEVICE_LOCKS = Maps.newConcurrentMap();
     //FIXME to be removed when netcfg will issue device events in a bundle or
     //ensures all configuration needed is present
     private Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
@@ -188,20 +203,20 @@
     private Set<DeviceId> pipelineConfigured = new CopyOnWriteArraySet<>();
 
     private final Map<DeviceId, DeviceHandshaker> handshakers = Maps.newConcurrentMap();
-
     private final Map<DeviceId, MastershipRole> requestedRoles = Maps.newConcurrentMap();
-
+    private final Striped<Lock> deviceLocks = Striped.lock(30);
 
     private ExecutorService connectionExecutor
             = newFixedThreadPool(CORE_POOL_SIZE, groupedThreads(
             "onos/generaldeviceprovider-device-connect", "%d", log));
-    private ScheduledExecutorService portStatsExecutor
+    private ScheduledExecutorService statsExecutor
             = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
-            "onos/generaldeviceprovider-port-stats", "%d", log));
-    private ScheduledExecutorService availabilityCheckExecutor
-            = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
-            "onos/generaldeviceprovider-availability-check", "%d", log));
-    private ConcurrentMap<DeviceId, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
+            "onos/generaldeviceprovider-stats-poll", "%d", log));
+    private ConcurrentMap<DeviceId, ScheduledFuture<?>> statsPollingTasks = new ConcurrentHashMap<>();
+    private ScheduledExecutorService probeExecutor
+            = newSingleThreadScheduledExecutor(groupedThreads(
+            "onos/generaldeviceprovider-probe-", "%d", log));
+    private ScheduledFuture<?> probeTask = null;
 
     private DeviceProviderService providerService;
     private InternalDeviceListener deviceListener = new InternalDeviceListener();
@@ -232,46 +247,61 @@
         //This will fail if ONOS has CFG and drivers which depend on this provider
         // are activated, failing due to not finding the driver.
         cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
-                .forEach(did -> triggerConnectWithLeadership(
-                        did, FIRST_CONNECTION_TOPIC + did.toString()));
+                .forEach(this::triggerConnect);
         //Initiating a periodic check to see if any device is available again and reconnect it.
-        availabilityCheckExecutor.scheduleAtFixedRate(
-                this::scheduleDevicePolling, deviceAvailabilityPollFrequency,
-                deviceAvailabilityPollFrequency, TimeUnit.SECONDS);
+        rescheduleProbeTask();
         modified(context);
         log.info("Started");
     }
 
     @Modified
     public void modified(ComponentContext context) {
-        if (context != null) {
-            Dictionary<?, ?> properties = context.getProperties();
-            pollFrequency = Tools.getIntegerProperty(properties, POLL_FREQUENCY,
-                                                     DEFAULT_POLL_FREQUENCY_SECONDS);
-            log.info("Configured. Poll frequency is configured to {} seconds", pollFrequency);
+        if (context == null) {
+            return;
         }
 
-        if (!scheduledTasks.isEmpty()) {
-            //cancel all previous tasks
-            scheduledTasks.values().forEach(task -> task.cancel(false));
-            //resubmit task with new timeout.
-            Set<DeviceId> deviceSubjects =
-                    cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class);
-            deviceSubjects.forEach(deviceId -> {
-                if (notMyScheme(deviceId)) {
-                    // not under my scheme, skipping
-                    log.debug("{} is not my scheme, skipping", deviceId);
-                    return;
-                }
-                scheduledTasks.put(deviceId, scheduleStatsPolling(deviceId, true));
-            });
+        Dictionary<?, ?> properties = context.getProperties();
+        final int oldStatsPollFrequency = statsPollFrequency;
+        statsPollFrequency = Tools.getIntegerProperty(
+                properties, STATS_POLL_FREQUENCY, DEFAULT_STATS_POLL_FREQUENCY);
+        log.info("Configured. {} is configured to {} seconds",
+                 STATS_POLL_FREQUENCY, statsPollFrequency);
+        final int oldProbeFrequency = probeFrequency;
+        probeFrequency = Tools.getIntegerProperty(
+                properties, PROBE_FREQUENCY, DEFAULT_PROBE_FREQUENCY);
+        log.info("Configured. {} is configured to {} seconds",
+                 PROBE_FREQUENCY, probeFrequency);
+        opTimeoutShort = Tools.getIntegerProperty(
+                properties, OP_TIMEOUT_SHORT, DEFAULT_OP_TIMEOUT_SHORT);
+        log.info("Configured. {} is configured to {} seconds",
+                 OP_TIMEOUT_SHORT, opTimeoutShort);
+        opTimeoutLong = Tools.getIntegerProperty(
+                properties, OP_TIMEOUT_LONG, DEFAULT_OP_TIMEOUT_LONG);
+        log.info("Configured. {} is configured to {} seconds",
+                 OP_TIMEOUT_LONG, opTimeoutLong);
+
+        if (oldStatsPollFrequency != statsPollFrequency) {
+            rescheduleStatsPollingTasks();
         }
+
+        if (oldProbeFrequency != probeFrequency) {
+            rescheduleProbeTask();
+        }
+    }
+
+    private synchronized void rescheduleProbeTask() {
+        if (probeTask != null) {
+            probeTask.cancel(false);
+        }
+        probeTask = probeExecutor.scheduleAtFixedRate(
+                this::triggerProbeAllDevices, probeFrequency,
+                probeFrequency, TimeUnit.SECONDS);
     }
 
     @Deactivate
     public void deactivate() {
-        portStatsExecutor.shutdown();
-        availabilityCheckExecutor.shutdown();
+        statsExecutor.shutdown();
+        probeExecutor.shutdown();
         componentConfigService.unregisterProperties(getClass(), false);
         cfgService.removeListener(cfgListener);
         //Not Removing the device so they can still be used from other driver providers
@@ -293,18 +323,15 @@
 
     @Override
     public void triggerProbe(DeviceId deviceId) {
-        // TODO Really don't see the point of this in non OF Context,
-        // for now testing reachability, can be moved to no-op
-        log.debug("Triggering probe equals testing reachability on device {}", deviceId);
-        isReachable(deviceId);
+        connectionExecutor.execute(withDeviceLock(
+                () -> doDeviceProbe(deviceId), deviceId));
     }
 
     @Override
     public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
         log.info("Received role {} for device {}", newRole, deviceId);
         requestedRoles.put(deviceId, newRole);
-        connectionExecutor.submit(exceptionSafe(
-                () -> doRoleChanged(deviceId, newRole)));
+        connectionExecutor.execute(() -> doRoleChanged(deviceId, newRole));
     }
 
     private void doRoleChanged(DeviceId deviceId, MastershipRole newRole) {
@@ -326,74 +353,63 @@
         }
         return getFutureWithDeadline(
                 handshaker.isReachable(), "checking reachability",
-                deviceId, false);
+                deviceId, false, opTimeoutShort);
     }
 
     @Override
     public void changePortState(DeviceId deviceId, PortNumber portNumber,
                                 boolean enable) {
+        connectionExecutor.execute(
+                () -> doChangePortState(deviceId, portNumber, enable));
+    }
+
+    private void doChangePortState(DeviceId deviceId, PortNumber portNumber,
+                                   boolean enable) {
         if (!deviceService.getDevice(deviceId).is(PortAdmin.class)) {
             log.warn("Missing PortAdmin behaviour on {}, aborting port state change",
                      deviceId);
             return;
         }
-        final PortAdmin portAdmin = getPortAdmin(deviceId);
-        final CompletableFuture<Boolean> modified = enable
+        final PortAdmin portAdmin = deviceService.getDevice(deviceId)
+                .as(PortAdmin.class);
+        final CompletableFuture<Boolean> modifyTask = enable
                 ? portAdmin.enable(portNumber)
                 : portAdmin.disable(portNumber);
-        modified.thenAcceptAsync(result -> {
-            if (!result) {
-                log.warn("Port {} status cannot be changed on {} (enable={})",
-                         portNumber, deviceId, enable);
-            }
-        });
+        final String descr = (enable ? "enabling" : "disabling") + " port " + portNumber;
+        getFutureWithDeadline(
+                modifyTask, descr, deviceId, null, opTimeoutShort);
     }
 
     @Override
     public void triggerDisconnect(DeviceId deviceId) {
         log.debug("Triggering disconnection of device {}", deviceId);
-        connectionExecutor.execute(
-                () -> disconnectDevice(deviceId)
-                        .thenRunAsync(() -> checkAndConnect(deviceId)));
+        connectionExecutor.execute(withDeviceLock(
+                () -> doDisconnectDevice(deviceId), deviceId));
     }
 
     private DeviceHandshaker getHandshaker(DeviceId deviceId) {
         return handshakers.computeIfAbsent(deviceId, id -> {
             Driver driver = getDriver(deviceId);
-            return driver == null ? null :
-                    getBehaviour(driver, DeviceHandshaker.class,
-                                 new DefaultDriverData(driver, deviceId));
+            return driver == null ? null : getBehaviour(
+                    driver, DeviceHandshaker.class,
+                    new DefaultDriverData(driver, deviceId));
         });
     }
 
-    private PortAdmin getPortAdmin(DeviceId deviceId) {
-        Driver driver = getDriver(deviceId);
-        return getBehaviour(driver, PortAdmin.class,
-                            new DefaultDriverData(driver, deviceId));
-
-    }
-
     private Driver getDriver(DeviceId deviceId) {
-        Driver driver = null;
         try {
-            driver = driverService.getDriver(deviceId);
+            // DriverManager checks first using basic device config.
+            return driverService.getDriver(deviceId);
         } catch (ItemNotFoundException e) {
-            log.debug("Falling back to configuration to fetch driver " +
-                              "for device {}", deviceId);
-            BasicDeviceConfig cfg = cfgService.getConfig(deviceId, BasicDeviceConfig.class);
-            if (cfg != null) {
-                driver = driverService.getDriver(cfg.driver());
-            }
+            log.error("Driver not found for {}", deviceId);
+            return null;
         }
-        return driver;
     }
 
-    //needed since the device manager will not return the driver through implementation()
-    // method since the device is not pushed to the core so for the connectDeviceAsMaster
-    // we need to work around that in order to test before calling
-    // store.createOrUpdateDevice
     private <T extends Behaviour> T getBehaviour(Driver driver, Class<T> type,
                                                  DriverData data) {
+        // Allows obtaining behavior implementations before the device is pushed
+        // to the core.
         if (driver != null && driver.hasBehaviour(type)) {
             DefaultDriverHandler handler = new DefaultDriverHandler(data);
             return driver.createBehaviour(handler, type);
@@ -402,7 +418,12 @@
         }
     }
 
-    private void doConnectDevice(DeviceId deviceId, boolean asMaster) {
+    private void doConnectDevice(DeviceId deviceId) {
+        // Some operations can be performed by one node only.
+        final boolean isLocalLeader = leadershipService.runForLeadership(
+                GeneralProviderDeviceConfig.class.getName() + deviceId)
+                .leader().nodeId().equals(clusterService.getLocalNode().id());
+
         if (deviceService.getDevice(deviceId) != null
                 && deviceService.isAvailable(deviceId)) {
             log.info("Device {} is already connected to ONOS and is available",
@@ -421,7 +442,7 @@
             return;
         }
         log.info("Initiating connection to device {} with driver {} ... asMaster={}",
-                 deviceId, basicDeviceConfig.driver(), asMaster);
+                 deviceId, basicDeviceConfig.driver(), isLocalLeader);
         // Get handshaker, driver and driver data.
         final DeviceHandshaker handshaker = getHandshaker(deviceId);
         if (handshaker == null) {
@@ -436,7 +457,7 @@
         // Start connection via handshaker.
         final Boolean connectSuccess = getFutureWithDeadline(
                 handshaker.connect(), "initiating connection",
-                deviceId, null);
+                deviceId, null, opTimeoutShort);
         if (connectSuccess == null) {
             // Error logged by getFutureWithDeadline().
             return;
@@ -445,15 +466,17 @@
             return;
         }
         // Handle pipeconf (if device is capable)
-        if (!handlePipeconf(deviceId, driver, driverData, asMaster)) {
+        if (!handlePipeconf(deviceId, driver, driverData, isLocalLeader)) {
             // We already logged the error.
-            handshaker.disconnect();
+            getFutureWithDeadline(
+                    handshaker.disconnect(), "performing disconnection",
+                    deviceId, null, opTimeoutShort);
             return;
         }
         // Add device agent listener.
         handshaker.addDeviceAgentListener(deviceAgentListener);
         // All good. Notify core (if master).
-        if (asMaster) {
+        if (isLocalLeader) {
             advertiseDevice(deviceId, driver, providerConfig, driverData);
         }
     }
@@ -504,7 +527,7 @@
      * core, false otherwise.
      */
     private boolean handlePipeconf(DeviceId deviceId, Driver driver,
-                                   DriverData driverData, boolean deployPipeconf) {
+                                   DriverData driverData, boolean asMaster) {
         final PiPipelineProgrammable pipelineProg = getBehaviour(
                 driver, PiPipelineProgrammable.class, driverData);
         if (pipelineProg == null) {
@@ -518,28 +541,41 @@
         }
         final PiPipeconfId pipeconfId = pipeconf.id();
 
-        if (deployPipeconf) {
-            final Boolean deploySuccess = getFutureWithDeadline(
-                    pipelineProg.deployPipeconf(pipeconf),
-                    "deploying pipeconf", deviceId, null);
-            if (deploySuccess == null) {
-                // Error logged by getFutureWithDeadline().
-                return false;
-            } else if (!deploySuccess) {
-                log.error("Unable to deploy pipeconf {} to {}, aborting device discovery",
-                          pipeconfId, deviceId);
-                return false;
-            }
+        final String mergedDriverName = piPipeconfService.mergeDriver(
+                deviceId, pipeconfId);
+        if (mergedDriverName == null) {
+            log.error("Unable to get merged driver for {} and {}, aborting device discovery",
+                      deviceId, pipeconfId);
+            return false;
         }
 
-        final Boolean mergeSuccess = getFutureWithDeadline(
-                piPipeconfService.bindToDevice(pipeconfId, deviceId),
-                "merging driver", deviceId, null);
-        if (mergeSuccess == null) {
+        if (!asMaster) {
+            // From now on, only the master proceeds.
+            return true;
+        }
+
+        if (!setDriverViaCfg(deviceId, mergedDriverName)) {
+            return false;
+        }
+
+        // FIXME: we just introduced a race condition as it might happen that a
+        // node does not receive the new cfg (with the merged driver) before the
+        // device is advertised to the core. Perhaps we should be waiting for a
+        // NetworkConfig event signaling that the driver has been updated on all
+        // nodes? The effect is mitigated by deploying the pipeconf (slow
+        // operation), after calling setDriverViaCfg().
+
+        piPipeconfService.bindToDevice(pipeconfId, deviceId);
+
+        final Boolean deploySuccess = getFutureWithDeadline(
+                pipelineProg.deployPipeconf(pipeconf),
+                "deploying pipeconf", deviceId, null,
+                opTimeoutLong);
+        if (deploySuccess == null) {
             // Error logged by getFutureWithDeadline().
             return false;
-        } else if (!mergeSuccess) {
-            log.error("Unable to merge pipeconf driver for {}, aborting device discovery",
+        } else if (!deploySuccess) {
+            log.error("Unable to deploy pipeconf {} to {}, aborting device discovery",
                       pipeconfId, deviceId);
             return false;
         }
@@ -547,6 +583,21 @@
         return true;
     }
 
+    private boolean setDriverViaCfg(DeviceId deviceId, String driverName) {
+        BasicDeviceConfig cfg = cfgService.getConfig(deviceId, BasicDeviceConfig.class);
+        if (cfg == null) {
+            log.error("Unable to get basic device config for {}, aborting device discovery",
+                      deviceId);
+            return false;
+        }
+        ObjectNode newCfg = (ObjectNode) cfg.node();
+        newCfg = newCfg.put(DRIVER, driverName);
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode newCfgNode = mapper.convertValue(newCfg, JsonNode.class);
+        cfgService.applyConfig(deviceId, BasicDeviceConfig.class, newCfgNode);
+        return true;
+    }
+
     private PiPipeconf getPipeconf(DeviceId deviceId, PiPipelineProgrammable pipelineProg) {
         PiPipeconfId pipeconfId = getPipeconfFromCfg(deviceId);
         if (pipeconfId == null || pipeconfId.id().isEmpty()) {
@@ -569,16 +620,14 @@
         return piPipeconfService.getPipeconf(pipeconfId).get();
     }
 
-    private CompletableFuture<?> disconnectDevice(DeviceId deviceId) {
-        log.info("Disconnecting for device {}", deviceId);
+    private void doDisconnectDevice(DeviceId deviceId) {
+        log.info("Initiating disconnection from {}...", deviceId);
         // Remove from core (if master)
-        if (mastershipService.isLocalMaster(deviceId)) {
+        if (mastershipService.isLocalMaster(deviceId)
+                && deviceService.isAvailable(deviceId)) {
             providerService.deviceDisconnected(deviceId);
         }
-        // Cancel tasks
-        if (scheduledTasks.containsKey(deviceId)) {
-            scheduledTasks.remove(deviceId).cancel(true);
-        }
+        cancelStatsPolling(deviceId);
         // Perform disconnection with device.
         final DeviceHandshaker handshaker = handshakers.remove(deviceId);
         if (handshaker == null) {
@@ -586,18 +635,15 @@
             log.warn("Missing DeviceHandshaker behavior for {}, " +
                              "no guarantees of complete disconnection",
                      deviceId);
-            return CompletableFuture.completedFuture(false);
+            return;
         }
         handshaker.removeDeviceAgentListener(deviceAgentListener);
-        return handshaker.disconnect()
-                .thenApplyAsync(result -> {
-                    if (result) {
-                        log.info("Disconnected device {}", deviceId);
-                    } else {
-                        log.warn("Device {} was unable to disconnect", deviceId);
-                    }
-                    return result;
-                });
+        final boolean disconnectSuccess = getFutureWithDeadline(
+                handshaker.disconnect(), "performing disconnection",
+                deviceId, false, opTimeoutShort);
+        if (!disconnectSuccess) {
+            log.warn("Unable to disconnect from {}", deviceId);
+        }
     }
 
     // Needed to catch the exception in the executors since are not rethrown otherwise.
@@ -611,21 +657,27 @@
         };
     }
 
-    private Runnable withDeviceLock(Runnable runnable, DeviceId deviceId) {
-        return () -> {
-            Lock lock = DEVICE_LOCKS.computeIfAbsent(deviceId, key -> new ReentrantLock());
-            lock.lock();
-            try {
-                runnable.run();
-            } finally {
-                lock.unlock();
-            }
-        };
+    private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
+        final Lock lock = deviceLocks.get(deviceId);
+        lock.lock();
+        try {
+            return task.get();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private Runnable withDeviceLock(Runnable task, DeviceId deviceId) {
+        // Wrapper of withDeviceLock(Supplier, ...) for void tasks.
+        return () -> withDeviceLock(() -> {
+            task.run();
+            return null;
+        }, deviceId);
     }
 
     private void updatePortStatistics(DeviceId deviceId) {
         Device device = deviceService.getDevice(deviceId);
-        if (!Objects.isNull(device) && deviceService.isAvailable(deviceId) &&
+        if (device != null && deviceService.isAvailable(deviceId) &&
                 device.is(PortStatisticsDiscovery.class)) {
             Collection<PortStatistics> statistics = device.as(PortStatisticsDiscovery.class)
                     .discoverPortStatistics();
@@ -642,14 +694,9 @@
         return !deviceId.uri().getScheme().equals(URI_SCHEME);
     }
 
-    private void triggerConnectWithLeadership(DeviceId deviceId,
-                                              String leadershipTopic) {
-        final NodeId leaderNodeId = leadershipService.runForLeadership(
-                leadershipTopic).leader().nodeId();
-        final boolean thisNodeMaster = clusterService
-                .getLocalNode().id().equals(leaderNodeId);
-        connectionExecutor.submit(withDeviceLock(exceptionSafe(
-                () -> doConnectDevice(deviceId, thisNodeMaster)), deviceId));
+    private void triggerConnect(DeviceId deviceId) {
+        connectionExecutor.execute(withDeviceLock(
+                () -> doConnectDevice(deviceId), deviceId));
     }
 
     /**
@@ -666,67 +713,74 @@
                 log.debug("{} is not my scheme, skipping", deviceId);
                 return;
             }
-            if (!isDeviceConfigComplete(event, deviceId)) {
+            // Check and clear the cfg tracking state atomically, so that
+            // concurrent events for the same device cannot both observe a
+            // complete cfg and trigger duplicate connection attempts.
+            final boolean configComplete = withDeviceLock(() -> {
+                if (!isDeviceConfigComplete(event, deviceId)) {
+                    return false;
+                }
+                cleanUpConfigInfo(deviceId);
+                return true;
+            }, deviceId);
+            if (!configComplete) {
                 // Still waiting for some configuration.
                 return;
             }
             // Good to go.
-            triggerConnectWithLeadership(
-                    deviceId, FIRST_CONNECTION_TOPIC + deviceId.toString());
-            cleanUpConfigInfo(deviceId);
+            triggerConnect(deviceId);
         }
 
+        /**
+         * Records the cfg carried by the event and returns true once the device,
+         * driver and pipeline tracking sets all contain the device. Not locked
+         * internally; the caller invokes it through withDeviceLock.
+         */
         private boolean isDeviceConfigComplete(NetworkConfigEvent event, DeviceId deviceId) {
             // FIXME to be removed when netcfg will issue device events in a bundle or
             // ensure all configuration needed is present
-            Lock lock = DEVICE_LOCKS.computeIfAbsent(deviceId, key -> new ReentrantLock());
-            lock.lock();
-            try {
-                if (event.configClass().equals(GeneralProviderDeviceConfig.class)) {
-                    //FIXME we currently assume that p4runtime devices are pipeline configurable.
-                    //If we want to connect a p4runtime device with no pipeline
-                    if (event.config().isPresent()) {
-                        deviceConfigured.add(deviceId);
-                        final boolean isNotPipelineConfigurable = Collections.disjoint(
-                                ImmutableSet.copyOf(event.config().get().node().fieldNames()),
-                                PIPELINE_CONFIGURABLE_PROTOCOLS);
-                        if (isNotPipelineConfigurable) {
-                            // Skip waiting for a pipeline if we can't support it.
-                            pipelineConfigured.add(deviceId);
-                        }
-                    }
-                } else if (event.configClass().equals(BasicDeviceConfig.class)) {
-                    if (event.config().isPresent() && event.config().get().node().has(DRIVER)) {
-                        driverConfigured.add(deviceId);
-                    }
-                } else if (event.configClass().equals(PiPipeconfConfig.class)) {
-                    if (event.config().isPresent()
-                            && event.config().get().node().has(PiPipeconfConfig.PIPIPECONFID)) {
+            if (event.configClass().equals(GeneralProviderDeviceConfig.class)) {
+                //FIXME we currently assume that p4runtime devices are pipeline configurable.
+                //If we want to connect a p4runtime device with no pipeline
+                if (event.config().isPresent()) {
+                    deviceConfigured.add(deviceId);
+                    final boolean isNotPipelineConfigurable = Collections.disjoint(
+                            ImmutableSet.copyOf(event.config().get().node().fieldNames()),
+                            PIPELINE_CONFIGURABLE_PROTOCOLS);
+                    if (isNotPipelineConfigurable) {
+                        // Skip waiting for a pipeline if we can't support it.
                         pipelineConfigured.add(deviceId);
                     }
                 }
-
-                if (deviceConfigured.contains(deviceId)
-                        && driverConfigured.contains(deviceId)
-                        && pipelineConfigured.contains(deviceId)) {
-                    return true;
-                } else {
-                    if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
-                        log.debug("Waiting for pipeline configuration for device {}", deviceId);
-                    } else if (pipelineConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
-                        log.debug("Waiting for device configuration for device {}", deviceId);
-                    } else if (pipelineConfigured.contains(deviceId) && deviceConfigured.contains(deviceId)) {
-                        log.debug("Waiting for driver configuration for device {}", deviceId);
-                    } else if (driverConfigured.contains(deviceId)) {
-                        log.debug("Only driver configuration for device {}", deviceId);
-                    } else if (deviceConfigured.contains(deviceId)) {
-                        log.debug("Only device configuration for device {}", deviceId);
-                    }
+            } else if (event.configClass().equals(BasicDeviceConfig.class)) {
+                if (event.config().isPresent() && event.config().get().node().has(DRIVER)) {
+                    driverConfigured.add(deviceId);
                 }
-                return false;
-            } finally {
-                lock.unlock();
+            } else if (event.configClass().equals(PiPipeconfConfig.class)) {
+                if (event.config().isPresent()
+                        && event.config().get().node().has(PiPipeconfConfig.PIPIPECONFID)) {
+                    pipelineConfigured.add(deviceId);
+                }
             }
+
+            if (deviceConfigured.contains(deviceId)
+                    && driverConfigured.contains(deviceId)
+                    && pipelineConfigured.contains(deviceId)) {
+                return true;
+            } else {
+                if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
+                    log.debug("Waiting for pipeline configuration for device {}", deviceId);
+                } else if (pipelineConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
+                    log.debug("Waiting for device configuration for device {}", deviceId);
+                } else if (pipelineConfigured.contains(deviceId) && deviceConfigured.contains(deviceId)) {
+                    log.debug("Waiting for driver configuration for device {}", deviceId);
+                } else if (driverConfigured.contains(deviceId)) {
+                    log.debug("Only driver configuration for device {}", deviceId);
+                } else if (deviceConfigured.contains(deviceId)) {
+                    log.debug("Only device configuration for device {}", deviceId);
+                }
+            }
+            return false;
         }
 
         @Override
@@ -757,18 +799,59 @@
         pipelineConfigured.remove(deviceId);
     }
 
-    private ScheduledFuture<?> scheduleStatsPolling(DeviceId deviceId, boolean randomize) {
-        int delay = 0;
-        if (randomize) {
-            delay = new SecureRandom().nextInt(10);
-        }
-        return portStatsExecutor.scheduleAtFixedRate(
-                exceptionSafe(() -> updatePortStatistics(deviceId)),
-                delay, pollFrequency, TimeUnit.SECONDS);
+    /**
+     * Starts (or restarts) periodic port statistics polling for the given
+     * device, cancelling any polling task previously started for it.
+     *
+     * @param deviceId        device to poll
+     * @param withRandomDelay if true, the first poll is delayed by a random
+     *                        0-9 seconds, otherwise it happens immediately
+     */
+    private void startStatsPolling(DeviceId deviceId, boolean withRandomDelay) {
+        // Computed outside compute() to keep the remapping function short
+        // (instantiating a SecureRandom can be slow).
+        final int delay = withRandomDelay
+                ? new SecureRandom().nextInt(10) : 0;
+        statsPollingTasks.compute(deviceId, (did, oldTask) -> {
+            if (oldTask != null) {
+                oldTask.cancel(false);
+            }
+            return statsExecutor.scheduleAtFixedRate(
+                    exceptionSafe(() -> updatePortStatistics(deviceId)),
+                    delay, statsPollFrequency, TimeUnit.SECONDS);
+        });
     }
 
-    private void scheduleDevicePolling() {
-        cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class).forEach(this::checkAndConnect);
+    /**
+     * Stops periodic port statistics polling for the given device, if any.
+     */
+    private void cancelStatsPolling(DeviceId deviceId) {
+        statsPollingTasks.computeIfPresent(deviceId, (did, task) -> {
+            task.cancel(false);
+            // Returning null removes the entry for this device.
+            return null;
+        });
+    }
+
+    /**
+     * Restarts all existing stats polling tasks with a random initial delay,
+     * so that they pick up the current stats polling frequency.
+     */
+    private void rescheduleStatsPollingTasks() {
+        statsPollingTasks.keySet().forEach(deviceId -> {
+            // startStatsPolling cancels the old task, if present.
+            startStatsPolling(deviceId, true);
+        });
+    }
+
+    /**
+     * Asynchronously probes every device in the cfg, with one task per
+     * device executed while holding that device's lock.
+     */
+    private void triggerProbeAllDevices() {
+        cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
+                .forEach(deviceId -> connectionExecutor.execute(withDeviceLock(
+                        exceptionSafe(() -> doDeviceProbe(deviceId)), deviceId)));
     }
 
     private PiPipeconfId getPipeconfFromCfg(DeviceId deviceId) {
@@ -780,38 +842,32 @@
         return config.piPipeconfId();
     }
 
-    private void checkAndConnect(DeviceId deviceId) {
-        // Let's try and reconnect to a device which is stored in cfg.
-        // One of the following conditions must be satisfied:
-        // 1) device is null in the store meaning that is was never connected or
-        // it was administratively removed
-        // 2) the device is not available and there is no MASTER instance,
-        // meaning the device lost it's connection to ONOS at some point in the
-        // past.
-        // We also check that the general device provider config and the driver
-        // config are present. We do not check for reachability using
-        // isReachable(deviceId) since the behaviour of this method can vary
-        // depending on protocol nuances. We leave this check to the device
-        // handshaker at later stages of the connection process. IF the
-        // conditions are not met but instead the device is present in the
-        // store, available and this instance is MASTER but is not reachable we
-        // remove it from the store.
-
-        if ((deviceService.getDevice(deviceId) == null
-                || (!deviceService.isAvailable(deviceId)
-                && mastershipService.getMasterFor(deviceId) == null))
-                && configIsPresent(deviceId)) {
-            log.debug("Trying to re-connect to device {}", deviceId);
-            triggerConnectWithLeadership(
-                    deviceId, CHECK_CONNECTION_TOPIC + deviceId.toString());
-            cleanUpConfigInfo(deviceId);
-        } else if (deviceService.getDevice(deviceId) != null
-                && deviceService.isAvailable(deviceId)
-                && mastershipService.isLocalMaster(deviceId)
-                && !isReachable(deviceId)
-                && configIsPresent(deviceId)) {
-            log.info("Removing available but unreachable device {}", deviceId);
-            disconnectDevice(deviceId);
+    /**
+     * Probes a device found in the cfg: triggers a connection if the device
+     * is not available, or a disconnection if it is available, this node is
+     * its master, and it is not reachable. Does nothing if
+     * {@code configIsPresent(deviceId)} returns false.
+     */
+    private void doDeviceProbe(DeviceId deviceId) {
+        if (!configIsPresent(deviceId)) {
+            return;
+        }
+        final boolean isAvailable = deviceService.getDevice(deviceId) != null
+                && deviceService.isAvailable(deviceId);
+        if (!isAvailable) {
+            // We do not check for reachability using isReachable()
+            // since the behaviour of this method can vary depending on protocol
+            // nuances. We leave this check to the device handshaker at later
+            // stages of the connection process.
+            triggerConnect(deviceId);
+            return;
+        }
+        // Available devices are disconnected only by the local master, and
+        // only if unreachable. Mastership is not queried for unavailable ones.
+        if (mastershipService.isLocalMaster(deviceId) && !isReachable(deviceId)) {
+            log.info("Disconnecting available but unreachable device {}...",
+                     deviceId);
+            triggerDisconnect(deviceId);
         }
     }
 
@@ -828,7 +880,7 @@
     private void handleChannelClosed(DeviceId deviceId) {
         log.info("Disconnecting device {}, due to channel closed event",
                  deviceId);
-        disconnectDevice(deviceId);
+        triggerDisconnect(deviceId);
     }
 
     private void handleMastershipResponse(DeviceId deviceId, MastershipRole response) {
@@ -837,21 +889,19 @@
             return;
         }
         providerService.receivedRoleReply(deviceId, requestedRoles.get(deviceId), response);
-        // If not master, cancel polling tasks, otherwise start them.
-        if (!response.equals(MastershipRole.MASTER)
-                && scheduledTasks.get(deviceId) != null) {
-            scheduledTasks.remove(deviceId).cancel(false);
-        } else if (response.equals(MastershipRole.MASTER)
-                && scheduledTasks.get(deviceId) == null) {
-            scheduledTasks.put(deviceId, scheduleStatsPolling(deviceId, false));
-            updatePortStatistics(deviceId);
+        // Poll port stats only while master. Note that startStatsPolling
+        // restarts (with no initial delay) a polling task that is already running.
+        if (response.equals(MastershipRole.MASTER)) {
+            startStatsPolling(deviceId, false);
+        } else {
+            cancelStatsPolling(deviceId);
         }
     }
 
     private <U> U getFutureWithDeadline(CompletableFuture<U> future, String opDescription,
-                                        DeviceId deviceId, U defaultValue) {
+                                        DeviceId deviceId, U defaultValue, int timeout) {
         try {
-            return future.get(DEVICE_OP_TIMEOUT, TimeUnit.SECONDS);
+            return future.get(timeout, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
             log.error("Thread interrupted while {} on {}", opDescription, deviceId);
             Thread.currentThread().interrupt();
@@ -873,7 +921,7 @@
             // For now this is scheduled periodically, when streaming API will
             // be available we check and base it on the streaming API (e.g. gNMI)
             if (mastershipService.isLocalMaster(deviceId)) {
-                scheduledTasks.put(deviceId, scheduleStatsPolling(deviceId, false));
+                startStatsPolling(deviceId, true);
             }
         }