Refactored GDP and PipeconfManager to fix multi-node tests

Issue was caused by race condition in GDP between the first connection
task, and the periodic one (checking reachability of devices in the cfg).
The issue is fixed by serializing such tasks for the same device.

Moreover, this patch brings better error reporting and handling of
completable futures.

Change-Id: I8c3a685c368541d33395945159b45a5740a5a0c3
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
index 55a4a11..ba16601 100644
--- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
@@ -30,11 +30,8 @@
 import org.onlab.util.ItemNotFoundException;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.LeadershipService;
-import org.onosproject.cluster.NodeId;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.config.ConfigFactory;
-import org.onosproject.net.config.NetworkConfigEvent;
-import org.onosproject.net.config.NetworkConfigListener;
 import org.onosproject.net.config.NetworkConfigRegistry;
 import org.onosproject.net.config.basics.BasicDeviceConfig;
 import org.onosproject.net.config.basics.SubjectFactories;
@@ -57,6 +54,7 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -78,11 +76,14 @@
     private static final String DRIVER = "driver";
     private static final String CFG_SCHEME = "piPipeconf";
 
+    private static final String DRIVER_MERGE_TOPIC =
+            PiPipeconfManager.class.getSimpleName() + "-driver-merge-";
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected NetworkConfigRegistry cfgService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected LeadershipService leadershipService;
+    private LeadershipService leadershipService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DriverService driverService;
@@ -91,19 +92,20 @@
     protected DriverAdminService driverAdminService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected PiPipeconfMappingStore pipeconfMappingStore;
+    private PiPipeconfMappingStore pipeconfMappingStore;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
-    // Registered pipeconf are replicated through the app subsystem and registered on app activated events.
-    protected ConcurrentHashMap<PiPipeconfId, PiPipeconf> piPipeconfs = new ConcurrentHashMap<>();
+    // Registered pipeconf are replicated through the app subsystem and
+    // registered on app activated events. Hence, there should be no need of
+    // distributing this map.
+    protected ConcurrentMap<PiPipeconfId, PiPipeconf> pipeconfs = new ConcurrentHashMap<>();
 
-    protected ExecutorService executor =
-            Executors.newFixedThreadPool(5, groupedThreads("onos/pipipeconfservice",
-                    "pipeline-to-device-%d", log));
+    protected ExecutorService executor = Executors.newFixedThreadPool(
+            10, groupedThreads("onos/pipeconf-manager", "%d", log));
 
-    protected final ConfigFactory factory =
+    protected final ConfigFactory configFactory =
             new ConfigFactory<DeviceId, PiPipeconfConfig>(
                     SubjectFactories.DEVICE_SUBJECT_FACTORY,
                     PiPipeconfConfig.class, CFG_SCHEME) {
@@ -113,14 +115,9 @@
                 }
             };
 
-    protected final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
-
     @Activate
     public void activate() {
-        cfgService.registerConfigFactory(factory);
-        cfgService.addListener(cfgListener);
-        cfgService.getSubjects(DeviceId.class, PiPipeconfConfig.class)
-                .forEach(this::addPipeconfFromCfg);
+        cfgService.registerConfigFactory(configFactory);
         log.info("Started");
     }
 
@@ -128,9 +125,8 @@
     @Deactivate
     public void deactivate() {
         executor.shutdown();
-        cfgService.removeListener(cfgListener);
-        cfgService.unregisterConfigFactory(factory);
-        piPipeconfs.clear();
+        cfgService.unregisterConfigFactory(configFactory);
+        pipeconfs.clear();
         cfgService = null;
         driverAdminService = null;
         driverService = null;
@@ -139,101 +135,40 @@
 
     @Override
     public void register(PiPipeconf pipeconf) throws IllegalStateException {
-        if (piPipeconfs.containsKey(pipeconf.id())) {
+        if (pipeconfs.containsKey(pipeconf.id())) {
             throw new IllegalStateException(format("Pipeconf %s is already registered", pipeconf.id()));
         }
-        piPipeconfs.put(pipeconf.id(), pipeconf);
+        pipeconfs.put(pipeconf.id(), pipeconf);
         log.info("New pipeconf registered: {}", pipeconf.id());
     }
 
     @Override
     public void remove(PiPipeconfId pipeconfId) throws IllegalStateException {
-        //TODO add mechanism to remove from device.
-        if (!piPipeconfs.containsKey(pipeconfId)) {
+        // TODO add mechanism to remove from device.
+        if (!pipeconfs.containsKey(pipeconfId)) {
             throw new IllegalStateException(format("Pipeconf %s is not registered", pipeconfId));
         }
         // TODO remove the binding from the distributed Store when the lifecycle of a pipeconf is defined.
         // pipeconfMappingStore.removeBindings(pipeconfId);
-        piPipeconfs.remove(pipeconfId);
+        log.info("Removing pipeconf {}", pipeconfId);
+        pipeconfs.remove(pipeconfId);
     }
 
     @Override
     public Iterable<PiPipeconf> getPipeconfs() {
-        return piPipeconfs.values();
+        return pipeconfs.values();
     }
 
     @Override
     public Optional<PiPipeconf> getPipeconf(PiPipeconfId id) {
-        return Optional.ofNullable(piPipeconfs.get(id));
+        return Optional.ofNullable(pipeconfs.get(id));
     }
 
     @Override
-    public CompletableFuture<Boolean> bindToDevice(PiPipeconfId pipeconfId, DeviceId deviceId) {
-        CompletableFuture<Boolean> operationResult = new CompletableFuture<>();
-
-        executor.execute(() -> {
-            BasicDeviceConfig basicDeviceConfig =
-                    cfgService.getConfig(deviceId, BasicDeviceConfig.class);
-            Driver baseDriver = driverService.getDriver(basicDeviceConfig.driver());
-
-            String completeDriverName = baseDriver.name() + ":" + pipeconfId;
-            PiPipeconf piPipeconf = piPipeconfs.get(pipeconfId);
-            if (piPipeconf == null) {
-                log.warn("Pipeconf {} is not present", pipeconfId);
-                operationResult.complete(false);
-            } else {
-                //if driver exists already we don't create a new one.
-                //needs to be done via exception catching due to DriverRegistry throwing it on a null return from
-                //the driver map.
-                try {
-                    driverService.getDriver(completeDriverName);
-                } catch (ItemNotFoundException e) {
-
-                    log.debug("First time pipeconf {} is used with base driver {}, merging the two",
-                            pipeconfId, baseDriver);
-                    //extract the behaviours from the pipipeconf.
-                    Map<Class<? extends Behaviour>, Class<? extends Behaviour>> behaviours = new HashMap<>();
-                    piPipeconf.behaviours().forEach(b -> {
-                        behaviours.put(b, piPipeconf.implementation(b).get());
-                    });
-
-                    Driver piPipeconfDriver = new DefaultDriver(completeDriverName, baseDriver.parents(),
-                            baseDriver.manufacturer(), baseDriver.hwVersion(),
-                            baseDriver.swVersion(), behaviours, new HashMap<>());
-                    //we take the base driver created with the behaviours of the PiPeconf and
-                    // merge it with the base driver that was assigned to the device
-                    Driver completeDriver = piPipeconfDriver.merge(baseDriver);
-
-                    //This might lead to explosion of number of providers in the core,
-                    // due to 1:1:1 pipeconf:driver:provider maybe find better way
-                    DriverProvider provider = new PiPipeconfDriverProviderInternal(completeDriver);
-
-                    //we register to the driver susbystem the driver provider containing the merged driver
-                    driverAdminService.registerProvider(provider);
-                }
-
-                // Changing the configuration for the device to enforce the full driver with pipipeconf
-                // and base behaviours, updating binding only first time something changes
-                NodeId leaderNodeId = leadershipService.getLeader("deploy-" +
-                        deviceId.toString() + "-pipeconf");
-                NodeId localNodeId = clusterService.getLocalNode().id();
-
-                if (!basicDeviceConfig.driver().equals(completeDriverName) && localNodeId.equals(leaderNodeId)) {
-                    ObjectNode newCfg = (ObjectNode) basicDeviceConfig.node();
-                    newCfg = newCfg.put(DRIVER, completeDriverName);
-                    ObjectMapper mapper = new ObjectMapper();
-                    JsonNode newCfgNode = mapper.convertValue(newCfg, JsonNode.class);
-                    log.debug("New driver {} for device {}", completeDriverName, deviceId);
-                    cfgService.applyConfig(deviceId, BasicDeviceConfig.class, newCfgNode);
-                    // Completable future is needed for when this method will also apply the pipeline to the device.
-                    // FIXME (maybe): the pipeline is currently applied by the general device provider.
-                    // But we store here the association between device and pipeconf.
-                    pipeconfMappingStore.createOrUpdateBinding(deviceId, pipeconfId);
-                }
-                operationResult.complete(true);
-            }
-        });
-        return operationResult;
+    public CompletableFuture<Boolean> bindToDevice(
+            PiPipeconfId pipeconfId, DeviceId deviceId) {
+        return CompletableFuture.supplyAsync(() -> doMergeDriver(
+                deviceId, pipeconfId), executor);
     }
 
     @Override
@@ -241,6 +176,83 @@
         return Optional.ofNullable(pipeconfMappingStore.getPipeconfId(deviceId));
     }
 
+    private boolean doMergeDriver(DeviceId deviceId, PiPipeconfId pipeconfId) {
+        // Perform the following operations:
+        // 1) ALL nodes: create and register new merged driver (pipeconf + base driver)
+        // 2) ONE node (leader): updated netcfg with new driver
+        log.warn("Starting device driver merge of {} with {}...", deviceId, pipeconfId);
+        final BasicDeviceConfig basicDeviceConfig = cfgService.getConfig(
+                deviceId, BasicDeviceConfig.class);
+        final Driver baseDriver = driverService.getDriver(
+                basicDeviceConfig.driver());
+        final String newDriverName = baseDriver.name() + ":" + pipeconfId;
+        if (baseDriver.name().equals(newDriverName)) {
+            log.warn("Requested to merge {} driver with {} for {}, "
+                             + "but current device driver is already merged",
+                     baseDriver.name(), pipeconfId, deviceId);
+            return true;
+        }
+        final PiPipeconf pipeconf = pipeconfs.get(pipeconfId);
+        if (pipeconf == null) {
+            log.error("Pipeconf {} is not registered", pipeconfId);
+            return false;
+        }
+        // 1) if merged driver exists already we don't create a new one.
+        try {
+            driverService.getDriver(newDriverName);
+            log.info("Found existing merged driver {}, re-using that", newDriverName);
+        } catch (ItemNotFoundException e) {
+            log.info("Creating merged driver {}...", newDriverName);
+            createMergedDriver(pipeconf, baseDriver, newDriverName);
+        }
+        // 2) Updating device cfg to enforce the merged driver (one node only)
+        final boolean isLeader = leadershipService
+                .runForLeadership(DRIVER_MERGE_TOPIC + deviceId.toString())
+                .leaderNodeId()
+                .equals(clusterService.getLocalNode().id());
+        if (isLeader) {
+            // FIXME: this binding should be updated by the same entity
+            // deploying the pipeconf.
+            pipeconfMappingStore.createOrUpdateBinding(deviceId, pipeconfId);
+            if (!basicDeviceConfig.driver().equals(newDriverName)) {
+                log.info("Applying new driver {} for device {} via cfg...",
+                         newDriverName, deviceId);
+                setDriverViaCfg(deviceId, newDriverName, basicDeviceConfig);
+            }
+        }
+        return true;
+    }
+
+    private void createMergedDriver(PiPipeconf pipeconf, Driver baseDriver,
+                                    String newDriverName) {
+        // extract the behaviours from the pipipeconf.
+        final Map<Class<? extends Behaviour>, Class<? extends Behaviour>> behaviours =
+                new HashMap<>();
+        pipeconf.behaviours().forEach(
+                b -> behaviours.put(b, pipeconf.implementation(b).get()));
+        final Driver piPipeconfDriver = new DefaultDriver(
+                newDriverName, baseDriver.parents(),
+                baseDriver.manufacturer(), baseDriver.hwVersion(),
+                baseDriver.swVersion(), behaviours, new HashMap<>());
+        // take the base driver created with the behaviours of the PiPeconf and
+        // merge it with the base driver that was assigned to the device
+        final Driver completeDriver = piPipeconfDriver.merge(baseDriver);
+        // This might lead to explosion of number of providers in the core,
+        // due to 1:1:1 pipeconf:driver:provider maybe find better way
+        final DriverProvider provider = new PiPipeconfDriverProviderInternal(
+                completeDriver);
+        // register the merged driver
+        driverAdminService.registerProvider(provider);
+    }
+
+    private void setDriverViaCfg(DeviceId deviceId, String driverName,
+                                 BasicDeviceConfig basicDeviceConfig) {
+        ObjectNode newCfg = (ObjectNode) basicDeviceConfig.node();
+        newCfg = newCfg.put(DRIVER, driverName);
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode newCfgNode = mapper.convertValue(newCfg, JsonNode.class);
+        cfgService.applyConfig(deviceId, BasicDeviceConfig.class, newCfgNode);
+    }
 
     private class PiPipeconfDriverProviderInternal implements DriverProvider {
 
@@ -255,35 +267,4 @@
             return ImmutableSet.of(driver);
         }
     }
-
-    private void addPipeconfFromCfg(DeviceId deviceId) {
-        PiPipeconfConfig pipeconfConfig =
-                cfgService.getConfig(deviceId, PiPipeconfConfig.class);
-        PiPipeconfId id = pipeconfConfig.piPipeconfId();
-        if (id.id().equals("")) {
-            log.debug("Ignoring empty pipeconf ID for device {}", deviceId);
-        } else {
-            pipeconfMappingStore.createOrUpdateBinding(deviceId, id);
-        }
-    }
-
-    /**
-     * Listener for configuration events.
-     */
-    private class InternalNetworkConfigListener implements NetworkConfigListener {
-
-
-        @Override
-        public void event(NetworkConfigEvent event) {
-            DeviceId deviceId = (DeviceId) event.subject();
-            addPipeconfFromCfg(deviceId);
-        }
-
-        @Override
-        public boolean isRelevant(NetworkConfigEvent event) {
-            return event.configClass().equals(PiPipeconfConfig.class) &&
-                    (event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
-                            event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED);
-        }
-    }
 }
diff --git a/core/net/src/test/java/org/onosproject/net/pi/impl/PiPipeconfManagerTest.java b/core/net/src/test/java/org/onosproject/net/pi/impl/PiPipeconfManagerTest.java
index 0a839a9..4d29e69 100644
--- a/core/net/src/test/java/org/onosproject/net/pi/impl/PiPipeconfManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/pi/impl/PiPipeconfManagerTest.java
@@ -120,8 +120,7 @@
         assertEquals("Incorrect driver service", driverService, piPipeconfService.driverService);
         assertEquals("Incorrect driverAdminService service", driverAdminService, piPipeconfService.driverAdminService);
         assertEquals("Incorrect configuration service", cfgService, piPipeconfService.cfgService);
-        assertTrue("Incorrect config factory", cfgFactories.contains(piPipeconfService.factory));
-        assertTrue("Incorrect network configuration listener", netCfgListeners.contains(piPipeconfService.cfgListener));
+        assertTrue("Incorrect config factory", cfgFactories.contains(piPipeconfService.configFactory));
     }
 
     @Test
@@ -130,15 +129,13 @@
         assertEquals("Incorrect driver service", null, piPipeconfService.driverService);
         assertEquals("Incorrect driverAdminService service", null, piPipeconfService.driverAdminService);
         assertEquals("Incorrect configuration service", null, piPipeconfService.cfgService);
-        assertFalse("Config factory should be unregistered", cfgFactories.contains(piPipeconfService.factory));
-        assertFalse("Network configuration listener should be unregistered",
-                    netCfgListeners.contains(piPipeconfService.cfgListener));
+        assertFalse("Config factory should be unregistered", cfgFactories.contains(piPipeconfService.configFactory));
     }
 
     @Test
     public void register() {
         piPipeconfService.register(piPipeconf);
-        assertTrue("PiPipeconf should be registered", piPipeconfService.piPipeconfs.containsValue(piPipeconf));
+        assertTrue("PiPipeconf should be registered", piPipeconfService.pipeconfs.containsValue(piPipeconf));
     }
 
     @Test
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index 2b31b91..29b86c3 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -41,7 +41,6 @@
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.MastershipRole;
 import org.onosproject.net.PortNumber;
-import org.onosproject.net.SparseAnnotations;
 import org.onosproject.net.behaviour.PiPipelineProgrammable;
 import org.onosproject.net.behaviour.PortAdmin;
 import org.onosproject.net.config.ConfigFactory;
@@ -110,9 +109,9 @@
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
- * Provider which uses drivers to detect device and do initial handshake
- * and channel establishment with devices. Any other provider specific operation
- * is also delegated to the DeviceHandshaker driver.
+ * Provider which uses drivers to detect device and do initial handshake and
+ * channel establishment with devices. Any other provider specific operation is
+ * also delegated to the DeviceHandshaker driver.
  */
 @Beta
 @Component(immediate = true)
@@ -123,10 +122,8 @@
     private static final int DEVICE_OP_TIMEOUT = 10;
 
     private static final String DRIVER = "driver";
-    private static final String DEPLOY = "deploy-";
-    private static final String PIPECONF_TOPIC = "-pipeconf";
-    private static final String CHECK = "check-";
-    private static final String CONNECTION = "-connection";
+    public static final String FIRST_CONNECTION_TOPIC = "first-connection-";
+    private static final String CHECK_CONNECTION_TOPIC = "check-connection-";
     private static final String POLL_FREQUENCY = "pollFrequency";
 
     private final Logger log = getLogger(getClass());
@@ -183,7 +180,7 @@
     //FIXME this will be removed when the configuration is synced at the source.
     private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS = ImmutableSet.of("p4runtime");
 
-    private static final ConcurrentMap<DeviceId, Lock> ENTRY_LOCKS = Maps.newConcurrentMap();
+    private static final ConcurrentMap<DeviceId, Lock> DEVICE_LOCKS = Maps.newConcurrentMap();
     //FIXME to be removed when netcfg will issue device events in a bundle or
     //ensures all configuration needed is present
     private Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
@@ -197,13 +194,13 @@
 
     private ExecutorService connectionExecutor
             = newFixedThreadPool(CORE_POOL_SIZE, groupedThreads(
-                    "onos/generaldeviceprovider-device-connect", "%d", log));
+            "onos/generaldeviceprovider-device-connect", "%d", log));
     private ScheduledExecutorService portStatsExecutor
             = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
-                    "onos/generaldeviceprovider-port-stats", "%d", log));
+            "onos/generaldeviceprovider-port-stats", "%d", log));
     private ScheduledExecutorService availabilityCheckExecutor
             = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
-                    "onos/generaldeviceprovider-availability-check", "%d", log));
+            "onos/generaldeviceprovider-availability-check", "%d", log));
     private ConcurrentMap<DeviceId, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
 
     private DeviceProviderService providerService;
@@ -235,7 +232,8 @@
         //This will fail if ONOS has CFG and drivers which depend on this provider
         // are activated, failing due to not finding the driver.
         cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
-                .forEach(did -> connectionExecutor.execute(() -> connectDevice(did)));
+                .forEach(did -> triggerConnectWithLeadership(
+                        did, FIRST_CONNECTION_TOPIC + did.toString()));
         //Initiating a periodic check to see if any device is available again and reconnect it.
         availabilityCheckExecutor.scheduleAtFixedRate(
                 this::scheduleDevicePolling, deviceAvailabilityPollFrequency,
@@ -249,7 +247,7 @@
         if (context != null) {
             Dictionary<?, ?> properties = context.getProperties();
             pollFrequency = Tools.getIntegerProperty(properties, POLL_FREQUENCY,
-                    DEFAULT_POLL_FREQUENCY_SECONDS);
+                                                     DEFAULT_POLL_FREQUENCY_SECONDS);
             log.info("Configured. Poll frequency is configured to {} seconds", pollFrequency);
         }
 
@@ -305,14 +303,15 @@
     public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
         log.info("Received role {} for device {}", newRole, deviceId);
         requestedRoles.put(deviceId, newRole);
-        connectionExecutor.submit(() -> doRoleChanged(deviceId, newRole));
+        connectionExecutor.submit(exceptionSafe(
+                () -> doRoleChanged(deviceId, newRole)));
     }
 
     private void doRoleChanged(DeviceId deviceId, MastershipRole newRole) {
-        DeviceHandshaker handshaker = getHandshaker(deviceId);
+        final DeviceHandshaker handshaker = getHandshaker(deviceId);
         if (handshaker == null) {
-            log.warn("Null handshaker. Unable to notify new role {} to {}",
-                     newRole, deviceId);
+            log.error("Null handshaker. Unable to notify new role {} to {}",
+                      newRole, deviceId);
             return;
         }
         handshaker.roleChanged(newRole);
@@ -321,51 +320,41 @@
     @Override
     public boolean isReachable(DeviceId deviceId) {
         log.debug("Testing reachability for device {}", deviceId);
-
-        DeviceHandshaker handshaker = getHandshaker(deviceId);
+        final DeviceHandshaker handshaker = getHandshaker(deviceId);
         if (handshaker == null) {
             return false;
         }
-
-        try {
-            return handshaker.isReachable()
-                    .get(DEVICE_OP_TIMEOUT, TimeUnit.SECONDS);
-        } catch (InterruptedException | ExecutionException | TimeoutException e) {
-            log.warn("Device {} is not reachable {}", deviceId, e.getMessage());
-            log.debug("Exception", e);
-            return false;
-        }
+        return getFutureWithDeadline(
+                handshaker.isReachable(), "checking reachability",
+                deviceId, false);
     }
 
     @Override
     public void changePortState(DeviceId deviceId, PortNumber portNumber,
                                 boolean enable) {
-        if (deviceService.getDevice(deviceId).is(PortAdmin.class)) {
-
-            PortAdmin portAdmin = getPortAdmin(deviceId);
-            CompletableFuture<Boolean> modified;
-            if (enable) {
-                modified = portAdmin.enable(portNumber);
-            } else {
-                modified = portAdmin.disable(portNumber);
-            }
-            modified.thenAcceptAsync(result -> {
-                if (!result) {
-                    log.warn("Your device {} port {} status can't be changed to {}",
-                            deviceId, portNumber, enable);
-                }
-            });
-
-        } else {
-            log.warn("Device {} does not support PortAdmin behaviour", deviceId);
+        if (!deviceService.getDevice(deviceId).is(PortAdmin.class)) {
+            log.warn("Missing PortAdmin behaviour on {}, aborting port state change",
+                     deviceId);
+            return;
         }
+        final PortAdmin portAdmin = getPortAdmin(deviceId);
+        final CompletableFuture<Boolean> modified = enable
+                ? portAdmin.enable(portNumber)
+                : portAdmin.disable(portNumber);
+        modified.thenAcceptAsync(result -> {
+            if (!result) {
+                log.warn("Port {} status cannot be changed on {} (enable={})",
+                         portNumber, deviceId, enable);
+            }
+        });
     }
 
     @Override
     public void triggerDisconnect(DeviceId deviceId) {
         log.debug("Triggering disconnection of device {}", deviceId);
-        connectionExecutor.execute(() -> disconnectDevice(deviceId)
-                .thenRunAsync(() -> checkAndConnect(deviceId)));
+        connectionExecutor.execute(
+                () -> disconnectDevice(deviceId)
+                        .thenRunAsync(() -> checkAndConnect(deviceId)));
     }
 
     private DeviceHandshaker getHandshaker(DeviceId deviceId) {
@@ -380,7 +369,7 @@
     private PortAdmin getPortAdmin(DeviceId deviceId) {
         Driver driver = getDriver(deviceId);
         return getBehaviour(driver, PortAdmin.class,
-                new DefaultDriverData(driver, deviceId));
+                            new DefaultDriverData(driver, deviceId));
 
     }
 
@@ -390,7 +379,7 @@
             driver = driverService.getDriver(deviceId);
         } catch (ItemNotFoundException e) {
             log.debug("Falling back to configuration to fetch driver " +
-                    "for device {}", deviceId);
+                              "for device {}", deviceId);
             BasicDeviceConfig cfg = cfgService.getConfig(deviceId, BasicDeviceConfig.class);
             if (cfg != null) {
                 driver = driverService.getDriver(cfg.driver());
@@ -400,7 +389,7 @@
     }
 
     //needed since the device manager will not return the driver through implementation()
-    // method since the device is not pushed to the core so for the connectDevice
+    // method since the device is not pushed to the core so for the connectDeviceAsMaster
     // we need to work around that in order to test before calling
     // store.createOrUpdateDevice
     private <T extends Behaviour> T getBehaviour(Driver driver, Class<T> type,
@@ -413,139 +402,145 @@
         }
     }
 
-    //Connects a general device
-    private void connectDevice(DeviceId deviceId) {
-        //retrieve the configuration
-        GeneralProviderDeviceConfig providerConfig =
-                cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class);
-        BasicDeviceConfig basicDeviceConfig =
-                cfgService.getConfig(deviceId, BasicDeviceConfig.class);
-
+    private void doConnectDevice(DeviceId deviceId, boolean asMaster) {
+        if (deviceService.getDevice(deviceId) != null
+                && deviceService.isAvailable(deviceId)) {
+            log.info("Device {} is already connected to ONOS and is available",
+                     deviceId);
+            return;
+        }
+        // Retrieve config
+        final GeneralProviderDeviceConfig providerConfig = cfgService.getConfig(
+                deviceId, GeneralProviderDeviceConfig.class);
+        final BasicDeviceConfig basicDeviceConfig = cfgService.getConfig(
+                deviceId, BasicDeviceConfig.class);
         if (providerConfig == null || basicDeviceConfig == null) {
-            log.error("Configuration is NULL: basic config {}, general provider " +
-                    "config {}", basicDeviceConfig, providerConfig);
-        } else {
-            log.info("Connecting to device {} with driver {}", deviceId, basicDeviceConfig.driver());
-
-            DeviceHandshaker handshaker = getHandshaker(deviceId);
-            if (handshaker == null) {
-                log.error("Device {} does not support DeviceHandshaker behaviour", deviceId);
-                return;
-            }
-            Driver driver = handshaker.handler().driver();
-
-            addConfigData(providerConfig, handshaker.data());
-
-            //Connecting to the device
-            CompletableFuture<Boolean> connected = handshaker.connect();
-
-            connected.thenAcceptAsync(result -> {
-                if (result) {
-                    handshaker.addDeviceAgentListener(deviceAgentListener);
-                    //Populated with the default values obtained by the driver
-                    ChassisId cid = new ChassisId();
-                    SparseAnnotations annotations = DefaultAnnotations.builder()
-                            .set(AnnotationKeys.PROTOCOL,
-                                    providerConfig.protocolsInfo().keySet().toString())
-                            .build();
-                    DeviceDescription description =
-                            new DefaultDeviceDescription(deviceId.uri(), Device.Type.SWITCH,
-                                    driver.manufacturer(), driver.hwVersion(),
-                                    driver.swVersion(), UNKNOWN,
-                                    cid, true, annotations);
-                    //Empty list of ports
-                    List<PortDescription> ports = new ArrayList<>();
-
-                    DeviceDescriptionDiscovery deviceDiscovery = getBehaviour(driver,
-                            DeviceDescriptionDiscovery.class, handshaker.data());
-                    if (deviceDiscovery != null) {
-                        DeviceDescription newdescription = deviceDiscovery.discoverDeviceDetails();
-                        if (newdescription != null) {
-                            description = newdescription;
-                        }
-                        ports = deviceDiscovery.discoverPortDetails();
-                    } else {
-                        log.info("No Device Description Discovery for device {}, no update for " +
-                                "description or ports.", deviceId);
-                    }
-
-                    if (!handlePipeconf(deviceId, driver, handshaker.data(), true)) {
-                        // Something went wrong during handling of pipeconf.
-                        // We already logged the error.
-                        handshaker.disconnect();
-                        return;
-                    }
-                    advertiseDevice(deviceId, description, ports);
-
-                } else {
-                    log.warn("Can't connect to device {}", deviceId);
-                }
-            });
+            log.error("Configuration missing, cannot connect to {}. " +
+                              "basicDeviceConfig={}, generalProvider={}",
+                      deviceId, basicDeviceConfig, providerConfig);
+            return;
+        }
+        log.info("Initiating connection to device {} with driver {} ... asMaster={}",
+                 deviceId, basicDeviceConfig.driver(), asMaster);
+        // Get handshaker, driver and driver data.
+        final DeviceHandshaker handshaker = getHandshaker(deviceId);
+        if (handshaker == null) {
+            log.error("Missing DeviceHandshaker behavior for {}, aborting connection",
+                      deviceId);
+            return;
+        }
+        final Driver driver = handshaker.handler().driver();
+        // Enhance driver data with info in GDP config.
+        augmentConfigData(providerConfig, handshaker.data());
+        final DriverData driverData = handshaker.data();
+        // Start connection via handshaker.
+        final Boolean connectSuccess = getFutureWithDeadline(
+                handshaker.connect(), "initiating connection",
+                deviceId, null);
+        if (connectSuccess == null) {
+            // Error logged by getFutureWithDeadline().
+            return;
+        } else if (!connectSuccess) {
+            log.warn("Unable to connect to {}", deviceId);
+            return;
+        }
+        // Handle pipeconf (if device is capable)
+        if (!handlePipeconf(deviceId, driver, driverData, asMaster)) {
+            // We already logged the error.
+            handshaker.disconnect();
+            return;
+        }
+        // Add device agent listener.
+        handshaker.addDeviceAgentListener(deviceAgentListener);
+        // All good. Notify core (if master).
+        if (asMaster) {
+            advertiseDevice(deviceId, driver, providerConfig, driverData);
         }
     }
 
-    private void connectStandbyDevice(DeviceId deviceId) {
-        // if device is pipeline programmable we merge pipeconf + base driver for every other role
-        GeneralProviderDeviceConfig providerConfig =
-                cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class);
 
-        DeviceHandshaker handshaker = getHandshaker(deviceId);
-        if (handshaker == null) {
-            log.error("Device {} does not support DeviceHandshaker behaviour", deviceId);
-            return;
+    private void advertiseDevice(DeviceId deviceId, Driver driver,
+                                 GeneralProviderDeviceConfig providerConfig,
+                                 DriverData driverData) {
+        // Obtain device and port description and advertise device to core.
+        DeviceDescription description = null;
+        final List<PortDescription> ports;
+
+        final DeviceDescriptionDiscovery deviceDiscovery = getBehaviour(
+                driver, DeviceDescriptionDiscovery.class, driverData);
+
+        if (deviceDiscovery != null) {
+            description = deviceDiscovery.discoverDeviceDetails();
+            ports = deviceDiscovery.discoverPortDetails();
+        } else {
+            log.warn("Missing DeviceDescriptionDiscovery behavior for {}, " +
+                             "no update for description or ports.", deviceId);
+            ports = new ArrayList<>();
         }
-        addConfigData(providerConfig, handshaker.data());
 
-        //Connecting to the device
-        handshaker.connect().thenAcceptAsync(result -> {
-            if (result) {
-                handshaker.addDeviceAgentListener(deviceAgentListener);
-                handlePipeconf(deviceId, handshaker.handler().driver(), handshaker.data(), false);
-            }
-        });
+        if (description == null) {
+            // Generate one here.
+            // FIXME: a behavior impl should not return a null description
+            // (e.g. as GnmiDeviceDescriptionDiscovery). This case should apply
+            // only if a the behavior is not available.
+            description = new DefaultDeviceDescription(
+                    deviceId.uri(), Device.Type.SWITCH,
+                    driver.manufacturer(), driver.hwVersion(),
+                    driver.swVersion(), UNKNOWN,
+                    new ChassisId(), true,
+                    DefaultAnnotations.builder()
+                            .set(AnnotationKeys.PROTOCOL,
+                                 providerConfig.protocolsInfo().keySet().toString())
+                            .build());
+        }
+
+        providerService.deviceConnected(deviceId, description);
+        providerService.updatePorts(deviceId, ports);
     }
 
     /**
-     * Handles the case of a device that is pipeline programmable. Returns true if the operation wa successful and the
-     * device can be registered to the core, false otherwise.
+     * Handles the case of a device that is pipeline programmable. Returns true
+     * if the operation wa successful and the device can be registered to the
+     * core, false otherwise.
      */
-    private boolean handlePipeconf(DeviceId deviceId, Driver driver, DriverData driverData, boolean deployPipeconf) {
-        PiPipelineProgrammable pipelineProg =
-                getBehaviour(driver, PiPipelineProgrammable.class, driverData);
-
+    private boolean handlePipeconf(DeviceId deviceId, Driver driver,
+                                   DriverData driverData, boolean deployPipeconf) {
+        final PiPipelineProgrammable pipelineProg = getBehaviour(
+                driver, PiPipelineProgrammable.class, driverData);
         if (pipelineProg == null) {
             // Device is not pipeline programmable.
             return true;
         }
 
-        PiPipeconf pipeconf = getPipeconf(deviceId, pipelineProg);
+        final PiPipeconf pipeconf = getPipeconf(deviceId, pipelineProg);
+        if (pipeconf == null) {
+            return false;
+        }
+        final PiPipeconfId pipeconfId = pipeconf.id();
 
-        if (pipeconf != null) {
-            PiPipeconfId pipeconfId = pipeconf.id();
-
-            try {
-                if (deployPipeconf) {
-                    if (!pipelineProg.deployPipeconf(pipeconf).get()) {
-                        log.error("Unable to deploy pipeconf {} to {}, aborting device discovery",
-                                  pipeconfId, deviceId);
-                        return false;
-                    }
-                }
-            } catch (InterruptedException | ExecutionException e) {
-                log.warn("Exception occurred while deploying pipeconf {} to device {}", pipeconf.id(), deviceId, e);
+        if (deployPipeconf) {
+            final Boolean deploySuccess = getFutureWithDeadline(
+                    pipelineProg.deployPipeconf(pipeconf),
+                    "deploying pipeconf", deviceId, null);
+            if (deploySuccess == null) {
+                // Error logged by getFutureWithDeadline().
+                return false;
+            } else if (!deploySuccess) {
+                log.error("Unable to deploy pipeconf {} to {}, aborting device discovery",
+                          pipeconfId, deviceId);
                 return false;
             }
-            try {
-                if (!piPipeconfService.bindToDevice(pipeconfId, deviceId).get()) {
-                    log.error("Unable to merge driver {} for device {} with pipeconf {}, aborting device discovery",
-                              driver.name(), deviceId, pipeconfId);
-                    return false;
-                }
-            } catch (InterruptedException | ExecutionException e) {
-                log.warn("Exception occurred while binding pipeconf {} to device {}", pipeconf.id(), deviceId, e);
-                return false;
-            }
-        } else {
+        }
+
+        final Boolean mergeSuccess = getFutureWithDeadline(
+                piPipeconfService.bindToDevice(pipeconfId, deviceId),
+                "merging driver", deviceId, null);
+        if (mergeSuccess == null) {
+            // Error logged by getFutureWithDeadline().
+            return false;
+        } else if (!mergeSuccess) {
+            log.error("Unable to merge pipeconf driver for {}, aborting device discovery",
+                      pipeconfId, deviceId);
             return false;
         }
 
@@ -564,46 +559,41 @@
                 return null;
             }
         });
-
         if (pipeconfId == null) {
-            log.warn("Device {} is pipeline programmable but no pipeconf can be associated to it.", deviceId);
+            log.warn("Device {} is pipeline programmable but no pipeconf can be associated to it", deviceId);
             return null;
         }
-
         if (!piPipeconfService.getPipeconf(pipeconfId).isPresent()) {
             log.warn("Pipeconf {} is not registered", pipeconfId);
             return null;
         }
-
         return piPipeconfService.getPipeconf(pipeconfId).get();
     }
 
-    private void advertiseDevice(DeviceId deviceId, DeviceDescription description, List<PortDescription> ports) {
-        providerService.deviceConnected(deviceId, description);
-        providerService.updatePorts(deviceId, ports);
-    }
-
-    private CompletableFuture<Boolean> disconnectDevice(DeviceId deviceId) {
+    private CompletableFuture<?> disconnectDevice(DeviceId deviceId) {
         log.info("Disconnecting for device {}", deviceId);
-
+        // Remove from core (if master)
+        if (mastershipService.isLocalMaster(deviceId)) {
+            providerService.deviceDisconnected(deviceId);
+        }
+        // Cancel tasks
         if (scheduledTasks.containsKey(deviceId)) {
             scheduledTasks.remove(deviceId).cancel(true);
         }
-
-        DeviceHandshaker handshaker = handshakers.remove(deviceId);
-
+        // Perform disconnection with device.
+        final DeviceHandshaker handshaker = handshakers.remove(deviceId);
         if (handshaker == null) {
-            // gracefully ignoring.
-            log.warn("No DeviceHandshaker for device {}, no guarantees of complete " +
-                             "shutdown of communication", deviceId);
+            // Gracefully ignore
+            log.warn("Missing DeviceHandshaker behavior for {}, " +
+                             "no guarantees of complete disconnection",
+                     deviceId);
             return CompletableFuture.completedFuture(false);
         }
-
+        handshaker.removeDeviceAgentListener(deviceAgentListener);
         return handshaker.disconnect()
                 .thenApplyAsync(result -> {
                     if (result) {
                         log.info("Disconnected device {}", deviceId);
-                        providerService.deviceDisconnected(deviceId);
                     } else {
                         log.warn("Device {} was unable to disconnect", deviceId);
                     }
@@ -611,7 +601,7 @@
                 });
     }
 
-    //Needed to catch the exception in the executors since are not rethrown otherwise.
+    // Needed to catch the exception in the executors since are not rethrown otherwise.
     private Runnable exceptionSafe(Runnable runnable) {
         return () -> {
             try {
@@ -622,6 +612,18 @@
         };
     }
 
+    private Runnable withDeviceLock(Runnable runnable, DeviceId deviceId) {
+        return () -> {
+            Lock lock = DEVICE_LOCKS.computeIfAbsent(deviceId, key -> new ReentrantLock());
+            lock.lock();
+            try {
+                runnable.run();
+            } finally {
+                lock.unlock();
+            }
+        };
+    }
+
     private void updatePortStatistics(DeviceId deviceId) {
         Device device = deviceService.getDevice(deviceId);
         if (!Objects.isNull(device) && deviceService.isAvailable(deviceId) &&
@@ -641,6 +643,16 @@
         return !deviceId.uri().getScheme().equals(URI_SCHEME);
     }
 
+    private void triggerConnectWithLeadership(DeviceId deviceId,
+                                              String leadershipTopic) {
+        final NodeId leaderNodeId = leadershipService.runForLeadership(
+                leadershipTopic).leader().nodeId();
+        final boolean thisNodeMaster = clusterService
+                .getLocalNode().id().equals(leaderNodeId);
+        connectionExecutor.submit(withDeviceLock(exceptionSafe(
+                () -> doConnectDevice(deviceId, thisNodeMaster)), deviceId));
+    }
+
     /**
      * Listener for configuration events.
      */
@@ -655,49 +667,37 @@
                 log.debug("{} is not my scheme, skipping", deviceId);
                 return;
             }
-            if (deviceService.getDevice(deviceId) != null && deviceService.isAvailable(deviceId)) {
-                log.info("Device {} is already connected to ONOS and is available", deviceId);
+            if (!isDeviceConfigComplete(event, deviceId)) {
+                // Still waiting for some configuration.
                 return;
             }
-            NodeId leaderNodeId = leadershipService.runForLeadership(DEPLOY + deviceId.toString() + PIPECONF_TOPIC)
-                    .leader().nodeId();
-            NodeId localNodeId = clusterService.getLocalNode().id();
-            if (localNodeId.equals(leaderNodeId)) {
-                if (processEvent(event, deviceId)) {
-                    log.debug("{} is leader for {}, initiating the connection and deploying pipeline", leaderNodeId,
-                            deviceId);
-                    checkAndSubmitDeviceTask(deviceId);
-                }
-            } else {
-                if (processEvent(event, deviceId)) {
-                    log.debug("{} is not leader for {}, initiating connection but not deploying pipeline, {} is LEADER",
-                            localNodeId, deviceId, leaderNodeId);
-                    connectionExecutor.submit(exceptionSafe(() -> connectStandbyDevice(deviceId)));
-                    //FIXME this will be removed when config is synced
-                    cleanUpConfigInfo(deviceId);
-                }
-            }
+            // Good to go.
+            triggerConnectWithLeadership(
+                    deviceId, FIRST_CONNECTION_TOPIC + deviceId.toString());
+            cleanUpConfigInfo(deviceId);
         }
 
-        private boolean processEvent(NetworkConfigEvent event, DeviceId deviceId) {
-            //FIXME to be removed when netcfg will issue device events in a bundle or
+        private boolean isDeviceConfigComplete(NetworkConfigEvent event, DeviceId deviceId) {
+            // FIXME to be removed when netcfg will issue device events in a bundle or
             // ensure all configuration needed is present
-            Lock lock = ENTRY_LOCKS.computeIfAbsent(deviceId, key -> new ReentrantLock());
+            Lock lock = DEVICE_LOCKS.computeIfAbsent(deviceId, key -> new ReentrantLock());
             lock.lock();
             try {
                 if (event.configClass().equals(GeneralProviderDeviceConfig.class)) {
                     //FIXME we currently assume that p4runtime devices are pipeline configurable.
                     //If we want to connect a p4runtime device with no pipeline
-                    if (event.config().isPresent() &&
-                            Collections.disjoint(ImmutableSet.copyOf(event.config().get().node().fieldNames()),
-                                    PIPELINE_CONFIGURABLE_PROTOCOLS)) {
-                        pipelineConfigured.add(deviceId);
+                    if (event.config().isPresent()) {
+                        deviceConfigured.add(deviceId);
+                        final boolean isNotPipelineConfigurable = Collections.disjoint(
+                                ImmutableSet.copyOf(event.config().get().node().fieldNames()),
+                                PIPELINE_CONFIGURABLE_PROTOCOLS);
+                        if (isNotPipelineConfigurable) {
+                            // Skip waiting for a pipeline if we can't support it.
+                            pipelineConfigured.add(deviceId);
+                        }
                     }
-                    deviceConfigured.add(deviceId);
                 } else if (event.configClass().equals(BasicDeviceConfig.class)) {
                     if (event.config().isPresent() && event.config().get().node().has(DRIVER)) {
-                        //TODO add check for pipeline and add it to the pipeline list if no
-                        // p4runtime is present.
                         driverConfigured.add(deviceId);
                     }
                 } else if (event.configClass().equals(PiPipeconfConfig.class)) {
@@ -706,9 +706,9 @@
                         pipelineConfigured.add(deviceId);
                     }
                 }
-                //if the device has no "pipeline configurable protocol it will be present
-                // in the pipelineConfigured
-                if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)
+
+                if (deviceConfigured.contains(deviceId)
+                        && driverConfigured.contains(deviceId)
                         && pipelineConfigured.contains(deviceId)) {
                     return true;
                 } else {
@@ -740,14 +740,7 @@
         }
     }
 
-    private void checkAndSubmitDeviceTask(DeviceId deviceId) {
-        connectionExecutor.submit(exceptionSafe(() -> connectDevice(deviceId)));
-        //FIXME this will be removed when configuration is synced.
-        cleanUpConfigInfo(deviceId);
-
-    }
-
-    private void addConfigData(GeneralProviderDeviceConfig providerConfig, DriverData driverData) {
+    private void augmentConfigData(GeneralProviderDeviceConfig providerConfig, DriverData driverData) {
         //Storing deviceKeyId and all other config values
         // as data in the driver with protocol_<info>
         // name as the key. e.g protocol_ip
@@ -780,46 +773,44 @@
     }
 
     private void checkAndConnect(DeviceId deviceId) {
-        // Let's try and reconnect to a device which is stored in the net-cfg.
+        // Let's try and reconnect to a device which is stored in cfg.
         // One of the following conditions must be satisfied:
-        // 1) device is null in the store meaning that is was never connected or it was administratively removed
-        // 2) the device is not available and there is no MASTER instance, meaning the device lost
-        //    it's connection to ONOS at some point in the past.
-        // We also check that the general device provider config and the driver config are present.
-        // We do not check for reachability using isReachable(deviceId) since the behaviour of this method
-        // can vary depending on protocol nuances. We leave this check to the device handshaker
-        // at later stages of the connection process.
-        // IF the conditions are not met but instead the device is present in the store, available and this instance is
-        // MASTER but is not reachable we remove it from the store.
+        // 1) device is null in the store meaning that is was never connected or
+        // it was administratively removed
+        // 2) the device is not available and there is no MASTER instance,
+        // meaning the device lost it's connection to ONOS at some point in the
+        // past.
+        // We also check that the general device provider config and the driver
+        // config are present. We do not check for reachability using
+        // isReachable(deviceId) since the behaviour of this method can vary
+        // depending on protocol nuances. We leave this check to the device
+        // handshaker at later stages of the connection process. IF the
+        // conditions are not met but instead the device is present in the
+        // store, available and this instance is MASTER but is not reachable we
+        // remove it from the store.
 
-        if ((deviceService.getDevice(deviceId) == null || (!deviceService.isAvailable(deviceId) &&
-                mastershipService.getMasterFor(deviceId) == null)) && configIsPresent(deviceId)) {
+        if ((deviceService.getDevice(deviceId) == null
+                || (!deviceService.isAvailable(deviceId)
+                && mastershipService.getMasterFor(deviceId) == null))
+                && configIsPresent(deviceId)) {
             log.debug("Trying to re-connect to device {}", deviceId);
-            NodeId leaderNodeId = leadershipService.runForLeadership(CHECK + deviceId.toString() + CONNECTION)
-                    .leader().nodeId();
-            NodeId localNodeId = clusterService.getLocalNode().id();
-            if (localNodeId.equals(leaderNodeId)) {
-                log.debug("{} is leader for {}, initiating the connection", leaderNodeId,
-                        deviceId);
-                checkAndSubmitDeviceTask(deviceId);
-            } else {
-                log.debug("{} is not leader for {}, initiating connection, {} is LEADER",
-                        localNodeId, deviceId, leaderNodeId);
-                connectionExecutor.submit(exceptionSafe(() -> connectStandbyDevice(deviceId)));
-                //FIXME this will be removed when config is synced
-                cleanUpConfigInfo(deviceId);
-            }
-        } else if ((deviceService.getDevice(deviceId) != null && deviceService.isAvailable(deviceId)
-                && mastershipService.isLocalMaster(deviceId) && !isReachable(deviceId) && configIsPresent(deviceId))) {
+            triggerConnectWithLeadership(
+                    deviceId, CHECK_CONNECTION_TOPIC + deviceId.toString());
+            cleanUpConfigInfo(deviceId);
+        } else if (deviceService.getDevice(deviceId) != null
+                && deviceService.isAvailable(deviceId)
+                && mastershipService.isLocalMaster(deviceId)
+                && !isReachable(deviceId)
+                && configIsPresent(deviceId)) {
             log.info("Removing available but unreachable device {}", deviceId);
             disconnectDevice(deviceId);
-            providerService.deviceDisconnected(deviceId);
         }
     }
 
     private boolean configIsPresent(DeviceId deviceId) {
-        boolean present = cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class) != null
-                && cfgService.getConfig(deviceId, BasicDeviceConfig.class) != null;
+        final boolean present =
+                cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class) != null
+                        && cfgService.getConfig(deviceId, BasicDeviceConfig.class) != null;
         if (!present) {
             log.warn("Configuration for device {} is not complete", deviceId);
         }
@@ -827,14 +818,9 @@
     }
 
     private void handleChannelClosed(DeviceId deviceId) {
-        disconnectDevice(deviceId).thenRunAsync(() -> {
-            // If master, notifies disconnection to the core.
-            if (mastershipService.isLocalMaster(deviceId)) {
-                log.info("Disconnecting device {}, due to channel closed event",
-                         deviceId);
-                providerService.deviceDisconnected(deviceId);
-            }
-        });
+        log.info("Disconnecting device {}, due to channel closed event",
+                 deviceId);
+        disconnectDevice(deviceId);
     }
 
     private void handleMastershipResponse(DeviceId deviceId, MastershipRole response) {
@@ -854,6 +840,21 @@
         }
     }
 
+    private <U> U getFutureWithDeadline(CompletableFuture<U> future, String opDescription,
+                                        DeviceId deviceId, U defaultValue) {
+        try {
+            return future.get(DEVICE_OP_TIMEOUT, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            log.error("Thread interrupted while {} on {}", opDescription, deviceId);
+            Thread.currentThread().interrupt();
+        } catch (ExecutionException e) {
+            log.error("Exception while {} on {}", opDescription, deviceId, e.getCause());
+        } catch (TimeoutException e) {
+            log.error("Operation TIMEOUT while {} on {}", opDescription, deviceId);
+        }
+        return defaultValue;
+    }
+
     /**
      * Listener for core device events.
      */
@@ -861,7 +862,6 @@
         @Override
         public void event(DeviceEvent event) {
             DeviceId deviceId = event.subject().id();
-            // FIXME handling for mastership change scenario missing?
             // For now this is scheduled periodically, when streaming API will
             // be available we check and base it on the streaming API (e.g. gNMI)
             if (mastershipService.isLocalMaster(deviceId)) {