Refactored GDP and PipeconfManager to fix multi-node tests

The issue was caused by a race condition in GDP (GeneralDeviceProvider)
between the first-connection task and the periodic task that checks
reachability of the devices in the cfg. It is fixed by serializing such
tasks for the same device.

Moreover, this patch brings better error reporting and handling of
completable futures.

Change-Id: I8c3a685c368541d33395945159b45a5740a5a0c3
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index 2b31b91..29b86c3 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -41,7 +41,6 @@
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.MastershipRole;
 import org.onosproject.net.PortNumber;
-import org.onosproject.net.SparseAnnotations;
 import org.onosproject.net.behaviour.PiPipelineProgrammable;
 import org.onosproject.net.behaviour.PortAdmin;
 import org.onosproject.net.config.ConfigFactory;
@@ -110,9 +109,9 @@
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
- * Provider which uses drivers to detect device and do initial handshake
- * and channel establishment with devices. Any other provider specific operation
- * is also delegated to the DeviceHandshaker driver.
+ * Provider which uses drivers to detect device and do initial handshake and
+ * channel establishment with devices. Any other provider specific operation is
+ * also delegated to the DeviceHandshaker driver.
  */
 @Beta
 @Component(immediate = true)
@@ -123,10 +122,8 @@
     private static final int DEVICE_OP_TIMEOUT = 10;
 
     private static final String DRIVER = "driver";
-    private static final String DEPLOY = "deploy-";
-    private static final String PIPECONF_TOPIC = "-pipeconf";
-    private static final String CHECK = "check-";
-    private static final String CONNECTION = "-connection";
+    public static final String FIRST_CONNECTION_TOPIC = "first-connection-";
+    private static final String CHECK_CONNECTION_TOPIC = "check-connection-";
     private static final String POLL_FREQUENCY = "pollFrequency";
 
     private final Logger log = getLogger(getClass());
@@ -183,7 +180,7 @@
     //FIXME this will be removed when the configuration is synced at the source.
     private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS = ImmutableSet.of("p4runtime");
 
-    private static final ConcurrentMap<DeviceId, Lock> ENTRY_LOCKS = Maps.newConcurrentMap();
+    private static final ConcurrentMap<DeviceId, Lock> DEVICE_LOCKS = Maps.newConcurrentMap();
     //FIXME to be removed when netcfg will issue device events in a bundle or
     //ensures all configuration needed is present
     private Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
@@ -197,13 +194,13 @@
 
     private ExecutorService connectionExecutor
             = newFixedThreadPool(CORE_POOL_SIZE, groupedThreads(
-                    "onos/generaldeviceprovider-device-connect", "%d", log));
+            "onos/generaldeviceprovider-device-connect", "%d", log));
     private ScheduledExecutorService portStatsExecutor
             = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
-                    "onos/generaldeviceprovider-port-stats", "%d", log));
+            "onos/generaldeviceprovider-port-stats", "%d", log));
     private ScheduledExecutorService availabilityCheckExecutor
             = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
-                    "onos/generaldeviceprovider-availability-check", "%d", log));
+            "onos/generaldeviceprovider-availability-check", "%d", log));
     private ConcurrentMap<DeviceId, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
 
     private DeviceProviderService providerService;
@@ -235,7 +232,8 @@
         //This will fail if ONOS has CFG and drivers which depend on this provider
         // are activated, failing due to not finding the driver.
         cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
-                .forEach(did -> connectionExecutor.execute(() -> connectDevice(did)));
+                .forEach(did -> triggerConnectWithLeadership(
+                        did, FIRST_CONNECTION_TOPIC + did.toString()));
         //Initiating a periodic check to see if any device is available again and reconnect it.
         availabilityCheckExecutor.scheduleAtFixedRate(
                 this::scheduleDevicePolling, deviceAvailabilityPollFrequency,
@@ -249,7 +247,7 @@
         if (context != null) {
             Dictionary<?, ?> properties = context.getProperties();
             pollFrequency = Tools.getIntegerProperty(properties, POLL_FREQUENCY,
-                    DEFAULT_POLL_FREQUENCY_SECONDS);
+                                                     DEFAULT_POLL_FREQUENCY_SECONDS);
             log.info("Configured. Poll frequency is configured to {} seconds", pollFrequency);
         }
 
@@ -305,14 +303,15 @@
     public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
         log.info("Received role {} for device {}", newRole, deviceId);
         requestedRoles.put(deviceId, newRole);
-        connectionExecutor.submit(() -> doRoleChanged(deviceId, newRole));
+        connectionExecutor.submit(exceptionSafe(
+                () -> doRoleChanged(deviceId, newRole)));
     }
 
     private void doRoleChanged(DeviceId deviceId, MastershipRole newRole) {
-        DeviceHandshaker handshaker = getHandshaker(deviceId);
+        final DeviceHandshaker handshaker = getHandshaker(deviceId);
         if (handshaker == null) {
-            log.warn("Null handshaker. Unable to notify new role {} to {}",
-                     newRole, deviceId);
+            log.error("Null handshaker. Unable to notify new role {} to {}",
+                      newRole, deviceId);
             return;
         }
         handshaker.roleChanged(newRole);
@@ -321,51 +320,41 @@
     @Override
     public boolean isReachable(DeviceId deviceId) {
         log.debug("Testing reachability for device {}", deviceId);
-
-        DeviceHandshaker handshaker = getHandshaker(deviceId);
+        final DeviceHandshaker handshaker = getHandshaker(deviceId);
         if (handshaker == null) {
             return false;
         }
-
-        try {
-            return handshaker.isReachable()
-                    .get(DEVICE_OP_TIMEOUT, TimeUnit.SECONDS);
-        } catch (InterruptedException | ExecutionException | TimeoutException e) {
-            log.warn("Device {} is not reachable {}", deviceId, e.getMessage());
-            log.debug("Exception", e);
-            return false;
-        }
+        return getFutureWithDeadline(
+                handshaker.isReachable(), "checking reachability",
+                deviceId, false);
     }
 
     @Override
     public void changePortState(DeviceId deviceId, PortNumber portNumber,
                                 boolean enable) {
-        if (deviceService.getDevice(deviceId).is(PortAdmin.class)) {
-
-            PortAdmin portAdmin = getPortAdmin(deviceId);
-            CompletableFuture<Boolean> modified;
-            if (enable) {
-                modified = portAdmin.enable(portNumber);
-            } else {
-                modified = portAdmin.disable(portNumber);
-            }
-            modified.thenAcceptAsync(result -> {
-                if (!result) {
-                    log.warn("Your device {} port {} status can't be changed to {}",
-                            deviceId, portNumber, enable);
-                }
-            });
-
-        } else {
-            log.warn("Device {} does not support PortAdmin behaviour", deviceId);
+        if (!deviceService.getDevice(deviceId).is(PortAdmin.class)) {
+            log.warn("Missing PortAdmin behaviour on {}, aborting port state change",
+                     deviceId);
+            return;
         }
+        final PortAdmin portAdmin = getPortAdmin(deviceId);
+        final CompletableFuture<Boolean> modified = enable
+                ? portAdmin.enable(portNumber)
+                : portAdmin.disable(portNumber);
+        modified.thenAcceptAsync(result -> {
+            if (!result) {
+                log.warn("Port {} status cannot be changed on {} (enable={})",
+                         portNumber, deviceId, enable);
+            }
+        });
     }
 
     @Override
     public void triggerDisconnect(DeviceId deviceId) {
         log.debug("Triggering disconnection of device {}", deviceId);
-        connectionExecutor.execute(() -> disconnectDevice(deviceId)
-                .thenRunAsync(() -> checkAndConnect(deviceId)));
+        connectionExecutor.execute(
+                () -> disconnectDevice(deviceId)
+                        .thenRunAsync(() -> checkAndConnect(deviceId)));
     }
 
     private DeviceHandshaker getHandshaker(DeviceId deviceId) {
@@ -380,7 +369,7 @@
     private PortAdmin getPortAdmin(DeviceId deviceId) {
         Driver driver = getDriver(deviceId);
         return getBehaviour(driver, PortAdmin.class,
-                new DefaultDriverData(driver, deviceId));
+                            new DefaultDriverData(driver, deviceId));
 
     }
 
@@ -390,7 +379,7 @@
             driver = driverService.getDriver(deviceId);
         } catch (ItemNotFoundException e) {
             log.debug("Falling back to configuration to fetch driver " +
-                    "for device {}", deviceId);
+                              "for device {}", deviceId);
             BasicDeviceConfig cfg = cfgService.getConfig(deviceId, BasicDeviceConfig.class);
             if (cfg != null) {
                 driver = driverService.getDriver(cfg.driver());
@@ -400,7 +389,7 @@
     }
 
     //needed since the device manager will not return the driver through implementation()
-    // method since the device is not pushed to the core so for the connectDevice
+    // method since the device is not pushed to the core so for the doConnectDevice
     // we need to work around that in order to test before calling
     // store.createOrUpdateDevice
     private <T extends Behaviour> T getBehaviour(Driver driver, Class<T> type,
@@ -413,139 +402,145 @@
         }
     }
 
-    //Connects a general device
-    private void connectDevice(DeviceId deviceId) {
-        //retrieve the configuration
-        GeneralProviderDeviceConfig providerConfig =
-                cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class);
-        BasicDeviceConfig basicDeviceConfig =
-                cfgService.getConfig(deviceId, BasicDeviceConfig.class);
-
+    private void doConnectDevice(DeviceId deviceId, boolean asMaster) {
+        if (deviceService.getDevice(deviceId) != null
+                && deviceService.isAvailable(deviceId)) {
+            log.info("Device {} is already connected to ONOS and is available",
+                     deviceId);
+            return;
+        }
+        // Retrieve config
+        final GeneralProviderDeviceConfig providerConfig = cfgService.getConfig(
+                deviceId, GeneralProviderDeviceConfig.class);
+        final BasicDeviceConfig basicDeviceConfig = cfgService.getConfig(
+                deviceId, BasicDeviceConfig.class);
         if (providerConfig == null || basicDeviceConfig == null) {
-            log.error("Configuration is NULL: basic config {}, general provider " +
-                    "config {}", basicDeviceConfig, providerConfig);
-        } else {
-            log.info("Connecting to device {} with driver {}", deviceId, basicDeviceConfig.driver());
-
-            DeviceHandshaker handshaker = getHandshaker(deviceId);
-            if (handshaker == null) {
-                log.error("Device {} does not support DeviceHandshaker behaviour", deviceId);
-                return;
-            }
-            Driver driver = handshaker.handler().driver();
-
-            addConfigData(providerConfig, handshaker.data());
-
-            //Connecting to the device
-            CompletableFuture<Boolean> connected = handshaker.connect();
-
-            connected.thenAcceptAsync(result -> {
-                if (result) {
-                    handshaker.addDeviceAgentListener(deviceAgentListener);
-                    //Populated with the default values obtained by the driver
-                    ChassisId cid = new ChassisId();
-                    SparseAnnotations annotations = DefaultAnnotations.builder()
-                            .set(AnnotationKeys.PROTOCOL,
-                                    providerConfig.protocolsInfo().keySet().toString())
-                            .build();
-                    DeviceDescription description =
-                            new DefaultDeviceDescription(deviceId.uri(), Device.Type.SWITCH,
-                                    driver.manufacturer(), driver.hwVersion(),
-                                    driver.swVersion(), UNKNOWN,
-                                    cid, true, annotations);
-                    //Empty list of ports
-                    List<PortDescription> ports = new ArrayList<>();
-
-                    DeviceDescriptionDiscovery deviceDiscovery = getBehaviour(driver,
-                            DeviceDescriptionDiscovery.class, handshaker.data());
-                    if (deviceDiscovery != null) {
-                        DeviceDescription newdescription = deviceDiscovery.discoverDeviceDetails();
-                        if (newdescription != null) {
-                            description = newdescription;
-                        }
-                        ports = deviceDiscovery.discoverPortDetails();
-                    } else {
-                        log.info("No Device Description Discovery for device {}, no update for " +
-                                "description or ports.", deviceId);
-                    }
-
-                    if (!handlePipeconf(deviceId, driver, handshaker.data(), true)) {
-                        // Something went wrong during handling of pipeconf.
-                        // We already logged the error.
-                        handshaker.disconnect();
-                        return;
-                    }
-                    advertiseDevice(deviceId, description, ports);
-
-                } else {
-                    log.warn("Can't connect to device {}", deviceId);
-                }
-            });
+            log.error("Configuration missing, cannot connect to {}. " +
+                              "basicDeviceConfig={}, generalProvider={}",
+                      deviceId, basicDeviceConfig, providerConfig);
+            return;
+        }
+        log.info("Initiating connection to device {} with driver {} ... asMaster={}",
+                 deviceId, basicDeviceConfig.driver(), asMaster);
+        // Get handshaker, driver and driver data.
+        final DeviceHandshaker handshaker = getHandshaker(deviceId);
+        if (handshaker == null) {
+            log.error("Missing DeviceHandshaker behavior for {}, aborting connection",
+                      deviceId);
+            return;
+        }
+        final Driver driver = handshaker.handler().driver();
+        // Enhance driver data with info in GDP config.
+        augmentConfigData(providerConfig, handshaker.data());
+        final DriverData driverData = handshaker.data();
+        // Start connection via handshaker.
+        final Boolean connectSuccess = getFutureWithDeadline(
+                handshaker.connect(), "initiating connection",
+                deviceId, null);
+        if (connectSuccess == null) {
+            // Error logged by getFutureWithDeadline().
+            return;
+        } else if (!connectSuccess) {
+            log.warn("Unable to connect to {}", deviceId);
+            return;
+        }
+        // Handle pipeconf (if device is capable)
+        if (!handlePipeconf(deviceId, driver, driverData, asMaster)) {
+            // We already logged the error.
+            handshaker.disconnect();
+            return;
+        }
+        // Add device agent listener.
+        handshaker.addDeviceAgentListener(deviceAgentListener);
+        // All good. Notify core (if master).
+        if (asMaster) {
+            advertiseDevice(deviceId, driver, providerConfig, driverData);
         }
     }
 
-    private void connectStandbyDevice(DeviceId deviceId) {
-        // if device is pipeline programmable we merge pipeconf + base driver for every other role
-        GeneralProviderDeviceConfig providerConfig =
-                cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class);
 
-        DeviceHandshaker handshaker = getHandshaker(deviceId);
-        if (handshaker == null) {
-            log.error("Device {} does not support DeviceHandshaker behaviour", deviceId);
-            return;
+    private void advertiseDevice(DeviceId deviceId, Driver driver,
+                                 GeneralProviderDeviceConfig providerConfig,
+                                 DriverData driverData) {
+        // Obtain device and port description and advertise device to core.
+        DeviceDescription description = null;
+        final List<PortDescription> ports;
+
+        final DeviceDescriptionDiscovery deviceDiscovery = getBehaviour(
+                driver, DeviceDescriptionDiscovery.class, driverData);
+
+        if (deviceDiscovery != null) {
+            description = deviceDiscovery.discoverDeviceDetails();
+            ports = deviceDiscovery.discoverPortDetails();
+        } else {
+            log.warn("Missing DeviceDescriptionDiscovery behavior for {}, " +
+                             "no update for description or ports.", deviceId);
+            ports = new ArrayList<>();
         }
-        addConfigData(providerConfig, handshaker.data());
 
-        //Connecting to the device
-        handshaker.connect().thenAcceptAsync(result -> {
-            if (result) {
-                handshaker.addDeviceAgentListener(deviceAgentListener);
-                handlePipeconf(deviceId, handshaker.handler().driver(), handshaker.data(), false);
-            }
-        });
+        if (description == null) {
+            // Generate one here.
+            // FIXME: a behavior impl should not return a null description
+            // (e.g. as GnmiDeviceDescriptionDiscovery). This case should apply
+            // only if the behavior is not available.
+            description = new DefaultDeviceDescription(
+                    deviceId.uri(), Device.Type.SWITCH,
+                    driver.manufacturer(), driver.hwVersion(),
+                    driver.swVersion(), UNKNOWN,
+                    new ChassisId(), true,
+                    DefaultAnnotations.builder()
+                            .set(AnnotationKeys.PROTOCOL,
+                                 providerConfig.protocolsInfo().keySet().toString())
+                            .build());
+        }
+
+        providerService.deviceConnected(deviceId, description);
+        providerService.updatePorts(deviceId, ports);
     }
 
     /**
-     * Handles the case of a device that is pipeline programmable. Returns true if the operation wa successful and the
-     * device can be registered to the core, false otherwise.
+     * Handles the case of a device that is pipeline programmable. Returns true
+     * if the operation was successful and the device can be registered to the
+     * core, false otherwise.
      */
-    private boolean handlePipeconf(DeviceId deviceId, Driver driver, DriverData driverData, boolean deployPipeconf) {
-        PiPipelineProgrammable pipelineProg =
-                getBehaviour(driver, PiPipelineProgrammable.class, driverData);
-
+    private boolean handlePipeconf(DeviceId deviceId, Driver driver,
+                                   DriverData driverData, boolean deployPipeconf) {
+        final PiPipelineProgrammable pipelineProg = getBehaviour(
+                driver, PiPipelineProgrammable.class, driverData);
         if (pipelineProg == null) {
             // Device is not pipeline programmable.
             return true;
         }
 
-        PiPipeconf pipeconf = getPipeconf(deviceId, pipelineProg);
+        final PiPipeconf pipeconf = getPipeconf(deviceId, pipelineProg);
+        if (pipeconf == null) {
+            return false;
+        }
+        final PiPipeconfId pipeconfId = pipeconf.id();
 
-        if (pipeconf != null) {
-            PiPipeconfId pipeconfId = pipeconf.id();
-
-            try {
-                if (deployPipeconf) {
-                    if (!pipelineProg.deployPipeconf(pipeconf).get()) {
-                        log.error("Unable to deploy pipeconf {} to {}, aborting device discovery",
-                                  pipeconfId, deviceId);
-                        return false;
-                    }
-                }
-            } catch (InterruptedException | ExecutionException e) {
-                log.warn("Exception occurred while deploying pipeconf {} to device {}", pipeconf.id(), deviceId, e);
+        if (deployPipeconf) {
+            final Boolean deploySuccess = getFutureWithDeadline(
+                    pipelineProg.deployPipeconf(pipeconf),
+                    "deploying pipeconf", deviceId, null);
+            if (deploySuccess == null) {
+                // Error logged by getFutureWithDeadline().
+                return false;
+            } else if (!deploySuccess) {
+                log.error("Unable to deploy pipeconf {} to {}, aborting device discovery",
+                          pipeconfId, deviceId);
                 return false;
             }
-            try {
-                if (!piPipeconfService.bindToDevice(pipeconfId, deviceId).get()) {
-                    log.error("Unable to merge driver {} for device {} with pipeconf {}, aborting device discovery",
-                              driver.name(), deviceId, pipeconfId);
-                    return false;
-                }
-            } catch (InterruptedException | ExecutionException e) {
-                log.warn("Exception occurred while binding pipeconf {} to device {}", pipeconf.id(), deviceId, e);
-                return false;
-            }
-        } else {
+        }
+
+        final Boolean mergeSuccess = getFutureWithDeadline(
+                piPipeconfService.bindToDevice(pipeconfId, deviceId),
+                "merging driver", deviceId, null);
+        if (mergeSuccess == null) {
+            // Error logged by getFutureWithDeadline().
+            return false;
+        } else if (!mergeSuccess) {
+            log.error("Unable to merge pipeconf driver for {}, aborting device discovery",
+                      pipeconfId, deviceId);
             return false;
         }
 
@@ -564,46 +559,41 @@
                 return null;
             }
         });
-
         if (pipeconfId == null) {
-            log.warn("Device {} is pipeline programmable but no pipeconf can be associated to it.", deviceId);
+            log.warn("Device {} is pipeline programmable but no pipeconf can be associated to it", deviceId);
             return null;
         }
-
         if (!piPipeconfService.getPipeconf(pipeconfId).isPresent()) {
             log.warn("Pipeconf {} is not registered", pipeconfId);
             return null;
         }
-
         return piPipeconfService.getPipeconf(pipeconfId).get();
     }
 
-    private void advertiseDevice(DeviceId deviceId, DeviceDescription description, List<PortDescription> ports) {
-        providerService.deviceConnected(deviceId, description);
-        providerService.updatePorts(deviceId, ports);
-    }
-
-    private CompletableFuture<Boolean> disconnectDevice(DeviceId deviceId) {
+    private CompletableFuture<?> disconnectDevice(DeviceId deviceId) {
         log.info("Disconnecting for device {}", deviceId);
-
+        // Remove from core (if master)
+        if (mastershipService.isLocalMaster(deviceId)) {
+            providerService.deviceDisconnected(deviceId);
+        }
+        // Cancel tasks
         if (scheduledTasks.containsKey(deviceId)) {
             scheduledTasks.remove(deviceId).cancel(true);
         }
-
-        DeviceHandshaker handshaker = handshakers.remove(deviceId);
-
+        // Perform disconnection with device.
+        final DeviceHandshaker handshaker = handshakers.remove(deviceId);
         if (handshaker == null) {
-            // gracefully ignoring.
-            log.warn("No DeviceHandshaker for device {}, no guarantees of complete " +
-                             "shutdown of communication", deviceId);
+            // Gracefully ignore
+            log.warn("Missing DeviceHandshaker behavior for {}, " +
+                             "no guarantees of complete disconnection",
+                     deviceId);
             return CompletableFuture.completedFuture(false);
         }
-
+        handshaker.removeDeviceAgentListener(deviceAgentListener);
         return handshaker.disconnect()
                 .thenApplyAsync(result -> {
                     if (result) {
                         log.info("Disconnected device {}", deviceId);
-                        providerService.deviceDisconnected(deviceId);
                     } else {
                         log.warn("Device {} was unable to disconnect", deviceId);
                     }
@@ -611,7 +601,7 @@
                 });
     }
 
-    //Needed to catch the exception in the executors since are not rethrown otherwise.
+    // Needed to catch the exception in the executors since they are not rethrown otherwise.
     private Runnable exceptionSafe(Runnable runnable) {
         return () -> {
             try {
@@ -622,6 +612,18 @@
         };
     }
 
+    private Runnable withDeviceLock(Runnable runnable, DeviceId deviceId) {
+        return () -> {
+            Lock lock = DEVICE_LOCKS.computeIfAbsent(deviceId, key -> new ReentrantLock());
+            lock.lock();
+            try {
+                runnable.run();
+            } finally {
+                lock.unlock();
+            }
+        };
+    }
+
     private void updatePortStatistics(DeviceId deviceId) {
         Device device = deviceService.getDevice(deviceId);
         if (!Objects.isNull(device) && deviceService.isAvailable(deviceId) &&
@@ -641,6 +643,16 @@
         return !deviceId.uri().getScheme().equals(URI_SCHEME);
     }
 
+    private void triggerConnectWithLeadership(DeviceId deviceId,
+                                              String leadershipTopic) {
+        final NodeId leaderNodeId = leadershipService.runForLeadership(
+                leadershipTopic).leader().nodeId();
+        final boolean thisNodeMaster = clusterService
+                .getLocalNode().id().equals(leaderNodeId);
+        connectionExecutor.submit(withDeviceLock(exceptionSafe(
+                () -> doConnectDevice(deviceId, thisNodeMaster)), deviceId));
+    }
+
     /**
      * Listener for configuration events.
      */
@@ -655,49 +667,37 @@
                 log.debug("{} is not my scheme, skipping", deviceId);
                 return;
             }
-            if (deviceService.getDevice(deviceId) != null && deviceService.isAvailable(deviceId)) {
-                log.info("Device {} is already connected to ONOS and is available", deviceId);
+            if (!isDeviceConfigComplete(event, deviceId)) {
+                // Still waiting for some configuration.
                 return;
             }
-            NodeId leaderNodeId = leadershipService.runForLeadership(DEPLOY + deviceId.toString() + PIPECONF_TOPIC)
-                    .leader().nodeId();
-            NodeId localNodeId = clusterService.getLocalNode().id();
-            if (localNodeId.equals(leaderNodeId)) {
-                if (processEvent(event, deviceId)) {
-                    log.debug("{} is leader for {}, initiating the connection and deploying pipeline", leaderNodeId,
-                            deviceId);
-                    checkAndSubmitDeviceTask(deviceId);
-                }
-            } else {
-                if (processEvent(event, deviceId)) {
-                    log.debug("{} is not leader for {}, initiating connection but not deploying pipeline, {} is LEADER",
-                            localNodeId, deviceId, leaderNodeId);
-                    connectionExecutor.submit(exceptionSafe(() -> connectStandbyDevice(deviceId)));
-                    //FIXME this will be removed when config is synced
-                    cleanUpConfigInfo(deviceId);
-                }
-            }
+            // Good to go.
+            triggerConnectWithLeadership(
+                    deviceId, FIRST_CONNECTION_TOPIC + deviceId.toString());
+            cleanUpConfigInfo(deviceId);
         }
 
-        private boolean processEvent(NetworkConfigEvent event, DeviceId deviceId) {
-            //FIXME to be removed when netcfg will issue device events in a bundle or
+        private boolean isDeviceConfigComplete(NetworkConfigEvent event, DeviceId deviceId) {
+            // FIXME to be removed when netcfg will issue device events in a bundle or
             // ensure all configuration needed is present
-            Lock lock = ENTRY_LOCKS.computeIfAbsent(deviceId, key -> new ReentrantLock());
+            Lock lock = DEVICE_LOCKS.computeIfAbsent(deviceId, key -> new ReentrantLock());
             lock.lock();
             try {
                 if (event.configClass().equals(GeneralProviderDeviceConfig.class)) {
                     //FIXME we currently assume that p4runtime devices are pipeline configurable.
                     //If we want to connect a p4runtime device with no pipeline
-                    if (event.config().isPresent() &&
-                            Collections.disjoint(ImmutableSet.copyOf(event.config().get().node().fieldNames()),
-                                    PIPELINE_CONFIGURABLE_PROTOCOLS)) {
-                        pipelineConfigured.add(deviceId);
+                    if (event.config().isPresent()) {
+                        deviceConfigured.add(deviceId);
+                        final boolean isNotPipelineConfigurable = Collections.disjoint(
+                                ImmutableSet.copyOf(event.config().get().node().fieldNames()),
+                                PIPELINE_CONFIGURABLE_PROTOCOLS);
+                        if (isNotPipelineConfigurable) {
+                            // Skip waiting for a pipeline if we can't support it.
+                            pipelineConfigured.add(deviceId);
+                        }
                     }
-                    deviceConfigured.add(deviceId);
                 } else if (event.configClass().equals(BasicDeviceConfig.class)) {
                     if (event.config().isPresent() && event.config().get().node().has(DRIVER)) {
-                        //TODO add check for pipeline and add it to the pipeline list if no
-                        // p4runtime is present.
                         driverConfigured.add(deviceId);
                     }
                 } else if (event.configClass().equals(PiPipeconfConfig.class)) {
@@ -706,9 +706,9 @@
                         pipelineConfigured.add(deviceId);
                     }
                 }
-                //if the device has no "pipeline configurable protocol it will be present
-                // in the pipelineConfigured
-                if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)
+
+                if (deviceConfigured.contains(deviceId)
+                        && driverConfigured.contains(deviceId)
                         && pipelineConfigured.contains(deviceId)) {
                     return true;
                 } else {
@@ -740,14 +740,7 @@
         }
     }
 
-    private void checkAndSubmitDeviceTask(DeviceId deviceId) {
-        connectionExecutor.submit(exceptionSafe(() -> connectDevice(deviceId)));
-        //FIXME this will be removed when configuration is synced.
-        cleanUpConfigInfo(deviceId);
-
-    }
-
-    private void addConfigData(GeneralProviderDeviceConfig providerConfig, DriverData driverData) {
+    private void augmentConfigData(GeneralProviderDeviceConfig providerConfig, DriverData driverData) {
         //Storing deviceKeyId and all other config values
         // as data in the driver with protocol_<info>
         // name as the key. e.g protocol_ip
@@ -780,46 +773,44 @@
     }
 
     private void checkAndConnect(DeviceId deviceId) {
-        // Let's try and reconnect to a device which is stored in the net-cfg.
+        // Let's try and reconnect to a device which is stored in cfg.
         // One of the following conditions must be satisfied:
-        // 1) device is null in the store meaning that is was never connected or it was administratively removed
-        // 2) the device is not available and there is no MASTER instance, meaning the device lost
-        //    it's connection to ONOS at some point in the past.
-        // We also check that the general device provider config and the driver config are present.
-        // We do not check for reachability using isReachable(deviceId) since the behaviour of this method
-        // can vary depending on protocol nuances. We leave this check to the device handshaker
-        // at later stages of the connection process.
-        // IF the conditions are not met but instead the device is present in the store, available and this instance is
-        // MASTER but is not reachable we remove it from the store.
+        // 1) device is null in the store meaning that it was never connected or
+        // it was administratively removed
+        // 2) the device is not available and there is no MASTER instance,
+        // meaning the device lost its connection to ONOS at some point in the
+        // past.
+        // We also check that the general device provider config and the driver
+        // config are present. We do not check for reachability using
+        // isReachable(deviceId) since the behaviour of this method can vary
+        // depending on protocol nuances. We leave this check to the device
+        // handshaker at later stages of the connection process. If the
+        // conditions are not met but instead the device is present in the
+        // store, available and this instance is MASTER but is not reachable we
+        // remove it from the store.
 
-        if ((deviceService.getDevice(deviceId) == null || (!deviceService.isAvailable(deviceId) &&
-                mastershipService.getMasterFor(deviceId) == null)) && configIsPresent(deviceId)) {
+        if ((deviceService.getDevice(deviceId) == null
+                || (!deviceService.isAvailable(deviceId)
+                && mastershipService.getMasterFor(deviceId) == null))
+                && configIsPresent(deviceId)) {
             log.debug("Trying to re-connect to device {}", deviceId);
-            NodeId leaderNodeId = leadershipService.runForLeadership(CHECK + deviceId.toString() + CONNECTION)
-                    .leader().nodeId();
-            NodeId localNodeId = clusterService.getLocalNode().id();
-            if (localNodeId.equals(leaderNodeId)) {
-                log.debug("{} is leader for {}, initiating the connection", leaderNodeId,
-                        deviceId);
-                checkAndSubmitDeviceTask(deviceId);
-            } else {
-                log.debug("{} is not leader for {}, initiating connection, {} is LEADER",
-                        localNodeId, deviceId, leaderNodeId);
-                connectionExecutor.submit(exceptionSafe(() -> connectStandbyDevice(deviceId)));
-                //FIXME this will be removed when config is synced
-                cleanUpConfigInfo(deviceId);
-            }
-        } else if ((deviceService.getDevice(deviceId) != null && deviceService.isAvailable(deviceId)
-                && mastershipService.isLocalMaster(deviceId) && !isReachable(deviceId) && configIsPresent(deviceId))) {
+            triggerConnectWithLeadership(
+                    deviceId, CHECK_CONNECTION_TOPIC + deviceId.toString());
+            cleanUpConfigInfo(deviceId);
+        } else if (deviceService.getDevice(deviceId) != null
+                && deviceService.isAvailable(deviceId)
+                && mastershipService.isLocalMaster(deviceId)
+                && !isReachable(deviceId)
+                && configIsPresent(deviceId)) {
             log.info("Removing available but unreachable device {}", deviceId);
             disconnectDevice(deviceId);
-            providerService.deviceDisconnected(deviceId);
         }
     }
 
     private boolean configIsPresent(DeviceId deviceId) {
-        boolean present = cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class) != null
-                && cfgService.getConfig(deviceId, BasicDeviceConfig.class) != null;
+        final boolean present =
+                cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class) != null
+                        && cfgService.getConfig(deviceId, BasicDeviceConfig.class) != null;
         if (!present) {
             log.warn("Configuration for device {} is not complete", deviceId);
         }
@@ -827,14 +818,9 @@
     }
 
     private void handleChannelClosed(DeviceId deviceId) {
-        disconnectDevice(deviceId).thenRunAsync(() -> {
-            // If master, notifies disconnection to the core.
-            if (mastershipService.isLocalMaster(deviceId)) {
-                log.info("Disconnecting device {}, due to channel closed event",
-                         deviceId);
-                providerService.deviceDisconnected(deviceId);
-            }
-        });
+        log.info("Disconnecting device {}, due to channel closed event",
+                 deviceId);
+        disconnectDevice(deviceId);
     }
 
     private void handleMastershipResponse(DeviceId deviceId, MastershipRole response) {
@@ -854,6 +840,21 @@
         }
     }
 
+    private <U> U getFutureWithDeadline(CompletableFuture<U> future, String opDescription,
+                                        DeviceId deviceId, U defaultValue) {
+        try {
+            return future.get(DEVICE_OP_TIMEOUT, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            log.error("Thread interrupted while {} on {}", opDescription, deviceId);
+            Thread.currentThread().interrupt();
+        } catch (ExecutionException e) {
+            log.error("Exception while {} on {}", opDescription, deviceId, e.getCause());
+        } catch (TimeoutException e) {
+            log.error("Operation TIMEOUT while {} on {}", opDescription, deviceId);
+        }
+        return defaultValue;
+    }
+
     /**
      * Listener for core device events.
      */
@@ -861,7 +862,6 @@
         @Override
         public void event(DeviceEvent event) {
             DeviceId deviceId = event.subject().id();
-            // FIXME handling for mastership change scenario missing?
             // For now this is scheduled periodically, when streaming API will
             // be available we check and base it on the streaming API (e.g. gNMI)
             if (mastershipService.isLocalMaster(deviceId)) {