Refactor P4Runtime subsystem to implement async connection procedure

This patch is an attempt to solve issues observed when restarting both
switches and ONOS nodes. Most of the issues seemed to depend on brittle
mastership handling when deploying the pipeline.

With this patch, GDP registers devices to the core with available=false
(i.e. offline) and marks them online only when the P4 pipeline has been
deployed to the device. A new PiPipeconfWatchdogService takes care of
deploying pipelines and producing events when devices are ready.

Moreover, we fix a race condition where pipeconf-related behaviors
were not found. This was caused by GDP enforcing the merged
driver name in the network config, while external entities (e.g.
Mininet) were pushing a JSON blob with the base driver name. This patch
removes the need to rely on such a trick and instead uses
pipeconf-aware logic directly in the driver manager (change #19622).

Finally, we fix issues in P4RuntimeClientImpl that prevented the stream
channel from detecting unreachable devices. The solution is to follow
gRPC APIs and instantiate a new channel once the previous one fails.

Change-Id: I6fbc91859c0fb58a6db3bc197b7081a8fe9f97f7
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index 21c6ae7..42faa0c 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -16,9 +16,6 @@
 
 package org.onosproject.provider.general.device.impl;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.annotations.Beta;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
@@ -34,8 +31,6 @@
 import org.onlab.util.ItemNotFoundException;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.LeadershipService;
 import org.onosproject.core.CoreService;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.AnnotationKeys;
@@ -77,6 +72,9 @@
 import org.onosproject.net.pi.model.PiPipeconfId;
 import org.onosproject.net.pi.service.PiPipeconfConfig;
 import org.onosproject.net.pi.service.PiPipeconfService;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
 import org.onosproject.net.provider.AbstractProvider;
 import org.onosproject.net.provider.ProviderId;
 import org.onosproject.provider.general.device.api.GeneralProviderDeviceConfig;
@@ -84,13 +82,13 @@
 import org.slf4j.Logger;
 
 import java.security.SecureRandom;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Dictionary;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.StringJoiner;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -121,10 +119,19 @@
 public class GeneralDeviceProvider extends AbstractProvider
         implements DeviceProvider {
 
-    private static final String DRIVER = "driver";
-
     private final Logger log = getLogger(getClass());
 
+    private static final String APP_NAME = "org.onosproject.gdp";
+    private static final String URI_SCHEME = "device";
+    private static final String CFG_SCHEME = "generalprovider";
+    private static final String DEVICE_PROVIDER_PACKAGE =
+            "org.onosproject.general.provider.device";
+    private static final int CORE_POOL_SIZE = 10;
+    private static final String UNKNOWN = "unknown";
+    private static final String DRIVER = "driver";
+    private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS =
+            ImmutableSet.of("p4runtime");
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private DeviceProviderRegistry providerRegistry;
 
@@ -147,13 +154,10 @@
     private MastershipService mastershipService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private PiPipeconfService piPipeconfService;
+    private PiPipeconfService pipeconfService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private ClusterService clusterService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private LeadershipService leadershipService;
+    private PiPipeconfWatchdogService pipeconfWatchdogService;
 
     private static final String STATS_POLL_FREQUENCY = "deviceStatsPollFrequency";
     private static final int DEFAULT_STATS_POLL_FREQUENCY = 10;
@@ -177,79 +181,48 @@
                     "(e.g. checking device reachability); default is 10 seconds")
     private int opTimeoutShort = DEFAULT_OP_TIMEOUT_SHORT;
 
-    private static final String OP_TIMEOUT_LONG = "deviceOperationTimeoutLong";
-    private static final int DEFAULT_OP_TIMEOUT_LONG = 60;
-    @Property(name = OP_TIMEOUT_LONG, intValue = DEFAULT_OP_TIMEOUT_LONG,
-            label = "Configure timeout in seconds for device operations " +
-                    "that are supposed to take a relatively long time " +
-                    "(e.g. pushing a large pipeline configuration with slow " +
-                    "network); default is 60 seconds")
-    private int opTimeoutLong = DEFAULT_OP_TIMEOUT_LONG;
-
-    private static final String APP_NAME = "org.onosproject.generaldeviceprovider";
-    private static final String URI_SCHEME = "device";
-    private static final String CFG_SCHEME = "generalprovider";
-    private static final String DEVICE_PROVIDER_PACKAGE = "org.onosproject.general.provider.device";
-    private static final int CORE_POOL_SIZE = 10;
-    private static final String UNKNOWN = "unknown";
-
-    //FIXME this will be removed when the configuration is synced at the source.
-    private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS = ImmutableSet.of("p4runtime");
-
     //FIXME to be removed when netcfg will issue device events in a bundle or
     //ensures all configuration needed is present
-    private Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
-    private Set<DeviceId> driverConfigured = new CopyOnWriteArraySet<>();
-    private Set<DeviceId> pipelineConfigured = new CopyOnWriteArraySet<>();
+    private final Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
+    private final Set<DeviceId> driverConfigured = new CopyOnWriteArraySet<>();
+    private final Set<DeviceId> pipelineConfigured = new CopyOnWriteArraySet<>();
 
-    private final Map<DeviceId, DeviceHandshaker> handshakers = Maps.newConcurrentMap();
     private final Map<DeviceId, MastershipRole> requestedRoles = Maps.newConcurrentMap();
-    private final Striped<Lock> deviceLocks = Striped.lock(30);
-
-    private ExecutorService connectionExecutor
-            = newFixedThreadPool(CORE_POOL_SIZE, groupedThreads(
-            "onos/generaldeviceprovider-device-connect", "%d", log));
-    private ScheduledExecutorService statsExecutor
-            = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
-            "onos/generaldeviceprovider-stats-poll", "%d", log));
-    private ConcurrentMap<DeviceId, ScheduledFuture<?>> statsPollingTasks = new ConcurrentHashMap<>();
-    private ScheduledExecutorService probeExecutor
-            = newSingleThreadScheduledExecutor(groupedThreads(
-            "onos/generaldeviceprovider-probe-", "%d", log));
-    private ScheduledFuture<?> probeTask = null;
-
-    private DeviceProviderService providerService;
-    private InternalDeviceListener deviceListener = new InternalDeviceListener();
-
-    private final ConfigFactory factory =
-            new ConfigFactory<DeviceId, GeneralProviderDeviceConfig>(
-                    SubjectFactories.DEVICE_SUBJECT_FACTORY,
-                    GeneralProviderDeviceConfig.class, CFG_SCHEME) {
-                @Override
-                public GeneralProviderDeviceConfig createConfig() {
-                    return new GeneralProviderDeviceConfig();
-                }
-            };
-
+    private final ConcurrentMap<DeviceId, ScheduledFuture<?>> statsPollingTasks = new ConcurrentHashMap<>();
+    private final Map<DeviceId, DeviceHandshaker> handshakersWithListeners = Maps.newConcurrentMap();
+    private final InternalDeviceListener deviceListener = new InternalDeviceListener();
+    private final InternalPipeconfWatchdogListener pipeconfWatchdogListener = new InternalPipeconfWatchdogListener();
     private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
     private final DeviceAgentListener deviceAgentListener = new InternalDeviceAgentListener();
+    private final ConfigFactory factory = new InternalConfigFactory();
+    private final Striped<Lock> deviceLocks = Striped.lock(30);
 
+    private ExecutorService connectionExecutor;
+    private ScheduledExecutorService statsExecutor;
+    private ScheduledExecutorService probeExecutor;
+    private ScheduledFuture<?> probeTask;
+    private DeviceProviderService providerService;
+
+    public GeneralDeviceProvider() {
+        super(new ProviderId(URI_SCHEME, DEVICE_PROVIDER_PACKAGE));
+    }
 
     @Activate
     public void activate(ComponentContext context) {
+        connectionExecutor = newFixedThreadPool(CORE_POOL_SIZE, groupedThreads(
+                "onos/gdp-connect", "%d", log));
+        statsExecutor = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
+                "onos/gdp-stats", "%d", log));
+        probeExecutor = newSingleThreadScheduledExecutor(groupedThreads(
+                "onos/gdp-probe", "%d", log));
         providerService = providerRegistry.register(this);
         componentConfigService.registerProperties(getClass());
         coreService.registerApplication(APP_NAME);
         cfgService.registerConfigFactory(factory);
         cfgService.addListener(cfgListener);
         deviceService.addListener(deviceListener);
-        handshakers.clear();
-        //This will fail if ONOS has CFG and drivers which depend on this provider
-        // are activated, failing due to not finding the driver.
-        cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
-                .forEach(this::triggerConnect);
-        //Initiating a periodic check to see if any device is available again and reconnect it.
-        rescheduleProbeTask();
+        pipeconfWatchdogService.addListener(pipeconfWatchdogListener);
+        rescheduleProbeTask(false);
         modified(context);
         log.info("Started");
     }
@@ -275,51 +248,73 @@
                 properties, OP_TIMEOUT_SHORT, DEFAULT_OP_TIMEOUT_SHORT);
         log.info("Configured. {} is configured to {} seconds",
                  OP_TIMEOUT_SHORT, opTimeoutShort);
-        opTimeoutLong = Tools.getIntegerProperty(
-                properties, OP_TIMEOUT_LONG, DEFAULT_OP_TIMEOUT_LONG);
-        log.info("Configured. {} is configured to {} seconds",
-                 OP_TIMEOUT_LONG, opTimeoutLong);
 
         if (oldStatsPollFrequency != statsPollFrequency) {
             rescheduleStatsPollingTasks();
         }
 
         if (oldProbeFrequency != probeFrequency) {
-            rescheduleProbeTask();
+            rescheduleProbeTask(true);
         }
     }
 
-    private synchronized void rescheduleProbeTask() {
-        if (probeTask != null) {
-            probeTask.cancel(false);
+    // Cancels the pending probe task (if any) and schedules a new periodic one;
+    // the first run is postponed by probeFrequency seconds only if delay is true.
+    private void rescheduleProbeTask(boolean delay) {
+        synchronized (this) {
+            if (probeTask != null) {
+                probeTask.cancel(false);
+            }
+            final long initialDelay = delay ? probeFrequency : 0;
+            probeTask = probeExecutor.scheduleAtFixedRate(
+                    this::triggerProbeAllDevices, initialDelay, probeFrequency, TimeUnit.SECONDS);
         }
-        probeTask = probeExecutor.scheduleAtFixedRate(
-                this::triggerProbeAllDevices, probeFrequency,
-                probeFrequency, TimeUnit.SECONDS);
     }
 
     @Deactivate
     public void deactivate() {
-        statsExecutor.shutdown();
-        probeExecutor.shutdown();
+        // Shutdown stats polling tasks.
+        statsPollingTasks.keySet().forEach(this::cancelStatsPolling);
+        statsPollingTasks.clear();
+        shutdownAndAwait(statsExecutor, "statsExecutor");
+        statsExecutor = null;
+        // Shutdown probe executor.
+        probeTask.cancel(true);
+        probeTask = null;
+        shutdownAndAwait(probeExecutor, "probeExecutor");
+        probeExecutor = null;
+        // Shutdown connection executor.
+        shutdownAndAwait(connectionExecutor, "connectionExecutor");
+        connectionExecutor = null;
+        // Remove all device agent listeners
+        handshakersWithListeners.values().forEach(h -> h.removeDeviceAgentListener(id()));
+        handshakersWithListeners.clear();
+        // Other cleanup.
         componentConfigService.unregisterProperties(getClass(), false);
         cfgService.removeListener(cfgListener);
-        //Not Removing the device so they can still be used from other driver providers
-        //cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
-        //          .forEach(did -> connectionExecutor.execute(() -> disconnectDevice(did)));
-        connectionExecutor.shutdown();
         deviceService.removeListener(deviceListener);
+        pipeconfWatchdogService.removeListener(pipeconfWatchdogListener);
         providerRegistry.unregister(this);
-        handshakers.clear();
         providerService = null;
         cfgService.unregisterConfigFactory(factory);
         log.info("Stopped");
     }
+
+    // Stops the given executor immediately and waits up to 5 seconds for it to
+    // terminate, logging a warning on timeout or interruption.
+    private void shutdownAndAwait(ExecutorService executor, String name) {
+        executor.shutdownNow();
+        try {
+            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+                log.warn("{} not terminated properly", name);
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag instead of swallowing the interruption.
+            Thread.currentThread().interrupt();
+            log.warn("{} not terminated properly", name);
+        }
+    }
 
-    public GeneralDeviceProvider() {
-        super(new ProviderId(URI_SCHEME, DEVICE_PROVIDER_PACKAGE));
-    }
-
 
     @Override
     public void triggerProbe(DeviceId deviceId) {
@@ -329,13 +324,14 @@
 
     @Override
     public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
-        log.info("Received role {} for device {}", newRole, deviceId);
+        log.info("Notifying role {} to device {}", newRole, deviceId);
         requestedRoles.put(deviceId, newRole);
         connectionExecutor.execute(() -> doRoleChanged(deviceId, newRole));
     }
 
     private void doRoleChanged(DeviceId deviceId, MastershipRole newRole) {
-        final DeviceHandshaker handshaker = getHandshaker(deviceId);
+        final DeviceHandshaker handshaker = getBehaviour(
+                deviceId, DeviceHandshaker.class);
         if (handshaker == null) {
             log.error("Null handshaker. Unable to notify new role {} to {}",
                       newRole, deviceId);
@@ -347,7 +343,8 @@
     @Override
     public boolean isReachable(DeviceId deviceId) {
         log.debug("Testing reachability for device {}", deviceId);
-        final DeviceHandshaker handshaker = getHandshaker(deviceId);
+        final DeviceHandshaker handshaker = getBehaviour(
+                deviceId, DeviceHandshaker.class);
         if (handshaker == null) {
             return false;
         }
@@ -356,6 +353,16 @@
                 deviceId, false, opTimeoutShort);
     }
 
+    // Tells whether the device's handshaker reports an open connection; a
+    // device for which no handshaker can be obtained counts as not connected.
+    private boolean isConnected(DeviceId deviceId) {
+        log.debug("Testing connection to device {}", deviceId);
+        final DeviceHandshaker deviceHandshaker = getBehaviour(
+                deviceId, DeviceHandshaker.class);
+        final boolean hasHandshaker = deviceHandshaker != null;
+        return hasHandshaker && deviceHandshaker.isConnected();
+    }
+
     @Override
     public void changePortState(DeviceId deviceId, PortNumber portNumber,
                                 boolean enable) {
@@ -387,15 +394,6 @@
                 () -> doDisconnectDevice(deviceId), deviceId));
     }
 
-    private DeviceHandshaker getHandshaker(DeviceId deviceId) {
-        return handshakers.computeIfAbsent(deviceId, id -> {
-            Driver driver = getDriver(deviceId);
-            return driver == null ? null : getBehaviour(
-                    driver, DeviceHandshaker.class,
-                    new DefaultDriverData(driver, deviceId));
-        });
-    }
-
     private Driver getDriver(DeviceId deviceId) {
         try {
             // DriverManager checks first using basic device config.
@@ -406,224 +404,209 @@
         }
     }
 
-    private <T extends Behaviour> T getBehaviour(Driver driver, Class<T> type,
-                                                 DriverData data) {
-        // Allows obtaining behavior implementations before the device is pushed
-        // to the core.
-        if (driver != null && driver.hasBehaviour(type)) {
-            DefaultDriverHandler handler = new DefaultDriverHandler(data);
-            return driver.createBehaviour(handler, type);
-        } else {
+    // Creates the requested behaviour using driver data augmented with the values found in
+    // the provider config; returns null if getDriver() yields null or it lacks the behaviour.
+    private <T extends Behaviour> T getBehaviour(DeviceId deviceId, Class<T> type) {
+        Driver driver = getDriver(deviceId);
+        if (driver == null) {
             return null;
         }
+        if (!driver.hasBehaviour(type)) {
+            return null;
+        }
+        final DriverData data = new DefaultDriverData(driver, deviceId);
+        // Storing deviceKeyId and all other config values as data in the driver
+        // with protocol_<info> name as the key. e.g protocol_ip.
+        final GeneralProviderDeviceConfig providerConfig = cfgService.getConfig(
+                deviceId, GeneralProviderDeviceConfig.class);
+        if (providerConfig != null) {
+            providerConfig.protocolsInfo().forEach((protocol, info) -> {
+                info.configValues().forEach(
+                        (k, v) -> data.set(protocol + "_" + k, v));
+                data.set(protocol + "_key", info.deviceKeyId());
+            });
+        }
+        final DefaultDriverHandler handler = new DefaultDriverHandler(data);
+        return driver.createBehaviour(handler, type);
     }
 
     private void doConnectDevice(DeviceId deviceId) {
-        // Some operations can be performed by one node only.
-        final boolean isLocalLeader = leadershipService.runForLeadership(
-                GeneralProviderDeviceConfig.class.getName() + deviceId)
-                .leader().nodeId().equals(clusterService.getLocalNode().id());
-
-        if (deviceService.getDevice(deviceId) != null
-                && deviceService.isAvailable(deviceId)) {
-            log.info("Device {} is already connected to ONOS and is available",
-                     deviceId);
-            return;
-        }
+        log.debug("Initiating connection to device {}...", deviceId);
         // Retrieve config
-        final GeneralProviderDeviceConfig providerConfig = cfgService.getConfig(
-                deviceId, GeneralProviderDeviceConfig.class);
-        final BasicDeviceConfig basicDeviceConfig = cfgService.getConfig(
-                deviceId, BasicDeviceConfig.class);
-        if (providerConfig == null || basicDeviceConfig == null) {
-            log.error("Configuration missing, cannot connect to {}. " +
-                              "basicDeviceConfig={}, generalProvider={}",
-                      deviceId, basicDeviceConfig, providerConfig);
+        if (configIsMissing(deviceId)) {
             return;
         }
-        log.info("Initiating connection to device {} with driver {} ... asMaster={}",
-                 deviceId, basicDeviceConfig.driver(), isLocalLeader);
-        // Get handshaker, driver and driver data.
-        final DeviceHandshaker handshaker = getHandshaker(deviceId);
+        // Bind pipeconf (if any and if device is capable).
+        if (!bindPipeconfIfRequired(deviceId)) {
+            // We already logged the error.
+            return;
+        }
+        // Get handshaker.
+        final DeviceHandshaker handshaker = getBehaviour(deviceId, DeviceHandshaker.class);
         if (handshaker == null) {
-            log.error("Missing DeviceHandshaker behavior for {}, aborting connection",
+            log.error("Missing handshaker behavior for {}, aborting connection",
                       deviceId);
             return;
         }
-        final Driver driver = handshaker.handler().driver();
-        // Enhance driver data with info in GDP config.
-        augmentConfigData(providerConfig, handshaker.data());
-        final DriverData driverData = handshaker.data();
+        // Add device agent listener.
+        handshaker.addDeviceAgentListener(id(), deviceAgentListener);
+        handshakersWithListeners.put(deviceId, handshaker);
         // Start connection via handshaker.
         final Boolean connectSuccess = getFutureWithDeadline(
                 handshaker.connect(), "initiating connection",
                 deviceId, null, opTimeoutShort);
-        if (connectSuccess == null) {
-            // Error logged by getFutureWithDeadline().
-            return;
-        } else if (!connectSuccess) {
+        // The result may be the null default passed above: never unbox it blindly.
+        if (Boolean.FALSE.equals(connectSuccess)) {
             log.warn("Unable to connect to {}", deviceId);
+        }
+    }
+
+    // Runs doAdvertiseDevice() on the connection executor, wrapped by withDeviceLock().
+    private void triggerAdvertiseDevice(DeviceId deviceId) {
+        connectionExecutor.execute(withDeviceLock(() -> doAdvertiseDevice(deviceId), deviceId));
+    }
+
+    private void doAdvertiseDevice(DeviceId deviceId) {
+        // Retrieve config
+        if (configIsMissing(deviceId)) {
             return;
         }
-        // Handle pipeconf (if device is capable)
-        if (!handlePipeconf(deviceId, driver, driverData, isLocalLeader)) {
-            // We already logged the error.
-            getFutureWithDeadline(
-                    handshaker.disconnect(), "performing disconnection",
-                    deviceId, null, opTimeoutShort);
-            return;
-        }
-        // Add device agent listener.
-        handshaker.addDeviceAgentListener(deviceAgentListener);
-        // All good. Notify core (if master).
-        if (isLocalLeader) {
-            advertiseDevice(deviceId, driver, providerConfig, driverData);
-        }
-    }
-
-
-    private void advertiseDevice(DeviceId deviceId, Driver driver,
-                                 GeneralProviderDeviceConfig providerConfig,
-                                 DriverData driverData) {
-        // Obtain device and port description and advertise device to core.
-        DeviceDescription description = null;
-        final List<PortDescription> ports;
-
-        final DeviceDescriptionDiscovery deviceDiscovery = getBehaviour(
-                driver, DeviceDescriptionDiscovery.class, driverData);
-
-        if (deviceDiscovery != null) {
-            description = deviceDiscovery.discoverDeviceDetails();
-            ports = deviceDiscovery.discoverPortDetails();
-        } else {
-            log.warn("Missing DeviceDescriptionDiscovery behavior for {}, " +
-                             "no update for description or ports.", deviceId);
-            ports = new ArrayList<>();
-        }
-
-        if (description == null) {
-            // Generate one here.
-            // FIXME: a behavior impl should not return a null description
-            // (e.g. as GnmiDeviceDescriptionDiscovery). This case should apply
-            // only if a the behavior is not available.
-            description = new DefaultDeviceDescription(
-                    deviceId.uri(), Device.Type.SWITCH,
-                    driver.manufacturer(), driver.hwVersion(),
-                    driver.swVersion(), UNKNOWN,
-                    new ChassisId(), true,
-                    DefaultAnnotations.builder()
-                            .set(AnnotationKeys.PROTOCOL,
-                                 providerConfig.protocolsInfo().keySet().toString())
-                            .build());
-        }
-
-        providerService.deviceConnected(deviceId, description);
-        providerService.updatePorts(deviceId, ports);
-    }
-
-    /**
-     * Handles the case of a device that is pipeline programmable. Returns true
-     * if the operation wa successful and the device can be registered to the
-     * core, false otherwise.
-     */
-    private boolean handlePipeconf(DeviceId deviceId, Driver driver,
-                                   DriverData driverData, boolean asMaster) {
-        final PiPipelineProgrammable pipelineProg = getBehaviour(
-                driver, PiPipelineProgrammable.class, driverData);
-        if (pipelineProg == null) {
-            // Device is not pipeline programmable.
-            return true;
-        }
-
-        final PiPipeconf pipeconf = getPipeconf(deviceId, pipelineProg);
-        if (pipeconf == null) {
-            return false;
-        }
-        final PiPipeconfId pipeconfId = pipeconf.id();
-
-        // To be removed in change #19606
-        // final String mergedDriverName = piPipeconfService.mergeDriver(
-        //         deviceId, pipeconfId);
-        // if (mergedDriverName == null) {
-        //     log.error("Unable to get merged driver for {} and {}, aborting device discovery",
-        //               deviceId, pipeconfId);
-        //     return false;
-        // }
-
-        if (!asMaster) {
-            // From now one only the master.
-            return true;
-        }
-
-        // if (!setDriverViaCfg(deviceId, mergedDriverName)) {
-        //     return false;
-        // }
-
-        // FIXME: we just introduced a race condition as it might happen that a
-        // node does not receive the new cfg (with the merged driver) before the
-        // device is advertised to the core. Perhaps we should be waiting for a
-        // NetworkConfig event signaling that the driver has been updated on all
-        // nodes? The effect is mitigated by deploying the pipeconf (slow
-        // operation), after calling setDriverViaCfg().
-
-        piPipeconfService.bindToDevice(pipeconfId, deviceId);
-
-        final Boolean deploySuccess = getFutureWithDeadline(
-                pipelineProg.deployPipeconf(pipeconf),
-                "deploying pipeconf", deviceId, null,
-                opTimeoutLong);
-        if (deploySuccess == null) {
-            // Error logged by getFutureWithDeadline().
-            return false;
-        } else if (!deploySuccess) {
-            log.error("Unable to deploy pipeconf {} to {}, aborting device discovery",
-                      pipeconfId, deviceId);
-            return false;
-        }
-
-        return true;
-    }
-
-    private boolean setDriverViaCfg(DeviceId deviceId, String driverName) {
-        BasicDeviceConfig cfg = cfgService.getConfig(deviceId, BasicDeviceConfig.class);
-        if (cfg == null) {
-            log.error("Unable to get basic device config for {}, aborting device discovery",
-                      deviceId);
-            return false;
-        }
-        ObjectNode newCfg = (ObjectNode) cfg.node();
-        newCfg = newCfg.put(DRIVER, driverName);
-        ObjectMapper mapper = new ObjectMapper();
-        JsonNode newCfgNode = mapper.convertValue(newCfg, JsonNode.class);
-        cfgService.applyConfig(deviceId, BasicDeviceConfig.class, newCfgNode);
-        return true;
-    }
-
-    private PiPipeconf getPipeconf(DeviceId deviceId, PiPipelineProgrammable pipelineProg) {
-        PiPipeconfId pipeconfId = getPipeconfFromCfg(deviceId);
-        if (pipeconfId == null || pipeconfId.id().isEmpty()) {
-            // No pipeconf has been provided in the cfg.
-            // Check if device driver provides a default one.
-            if (pipelineProg.getDefaultPipeconf().isPresent()) {
-                final PiPipeconf defaultPipeconf = pipelineProg.getDefaultPipeconf().get();
-                log.info("Using default pipeconf {} for {}", defaultPipeconf.id(), deviceId);
-                pipeconfId = defaultPipeconf.id();
-            } else {
-                log.warn("Device {} is pipeline programmable but no pipeconf can be associated to it", deviceId);
-                return null;
+        // Obtain device and port description.
+        final boolean isPipelineReady = isPipelineReady(deviceId);
+        final DeviceDescription description = getDeviceDescription(
+                deviceId, isPipelineReady);
+        final List<PortDescription> ports = getPortDetails(deviceId);
+        // Advertise to core.
+        if (deviceService.getDevice(deviceId) == null ||
+                (description.isDefaultAvailable() &&
+                        !deviceService.isAvailable(deviceId))) {
+            if (!isPipelineReady) {
+                log.info("Advertising device to core with available={} as " +
+                                 "device pipeline is not ready yet",
+                         description.isDefaultAvailable());
             }
+            providerService.deviceConnected(deviceId, description);
         }
-        // Check if registered
-        if (!piPipeconfService.getPipeconf(pipeconfId).isPresent()) {
-            log.warn("Pipeconf {} is not registered", pipeconfId);
+        providerService.updatePorts(deviceId, ports);
+        // If pipeline is not ready, encourage watchdog to perform probe ASAP.
+        if (!isPipelineReady) {
+            pipeconfWatchdogService.triggerProbe(deviceId);
+        }
+    }
+
+    // Returns the description discovered by the driver, with the given defaultAvailable
+    // flag enforced, or a forged one if the driver cannot discover any (null included).
+    private DeviceDescription getDeviceDescription(DeviceId deviceId, boolean defaultAvailable) {
+        final DeviceDescriptionDiscovery deviceDiscovery = getBehaviour(
+                deviceId, DeviceDescriptionDiscovery.class);
+        final DeviceDescription d = deviceDiscovery == null
+                ? null : deviceDiscovery.discoverDeviceDetails();
+        if (d == null) {
+            // Discovery behaviour missing or returning no details: forge one.
+            return forgeDeviceDescription(deviceId, defaultAvailable);
+        }
+        return new DefaultDeviceDescription(d, defaultAvailable, d.annotations());
+    }
+
+    // Returns the ports discovered via the device's DeviceDescriptionDiscovery
+    // behaviour, or an empty list if the behaviour is not available.
+    private List<PortDescription> getPortDetails(DeviceId deviceId) {
+        final DeviceDescriptionDiscovery discovery = getBehaviour(
+                deviceId, DeviceDescriptionDiscovery.class);
+        return discovery == null
+                ? Collections.emptyList()
+                : discovery.discoverPortDetails();
+    }
+
+    // Forges a device description out of the driver info (when a handshaker, and
+    // hence its driver, is available) and of the protocols in the provider config.
+    private DeviceDescription forgeDeviceDescription(
+            DeviceId deviceId, boolean defaultAvailable) {
+        final DeviceHandshaker hs = getBehaviour(deviceId, DeviceHandshaker.class);
+        final Driver drv = hs == null ? null : hs.handler().driver();
+        final GeneralProviderDeviceConfig providerCfg = cfgService.getConfig(
+                deviceId, GeneralProviderDeviceConfig.class);
+        final DefaultAnnotations.Builder annotations = DefaultAnnotations.builder();
+        // If device is pipeline programmable, let this provider decide when the
+        // device can be marked online.
+        annotations.set(AnnotationKeys.PROVIDER_MARK_ONLINE,
+                        String.valueOf(isPipelineProgrammable(deviceId)));
+        if (providerCfg != null) {
+            final StringJoiner protocols = new StringJoiner(", ");
+            providerCfg.protocolsInfo().keySet().forEach(protocols::add);
+            annotations.set(AnnotationKeys.PROTOCOL, protocols.toString());
+        }
+        // Fall back to UNKNOWN for every field the (missing) driver cannot provide.
+        final String manufacturer = drv == null ? UNKNOWN : drv.manufacturer();
+        final String hwVersion = drv == null ? UNKNOWN : drv.hwVersion();
+        final String swVersion = drv == null ? UNKNOWN : drv.swVersion();
+        return new DefaultDeviceDescription(
+                deviceId.uri(), Device.Type.SWITCH,
+                manufacturer, hwVersion, swVersion,
+                UNKNOWN,
+                new ChassisId(),
+                defaultAvailable,
+                annotations.build());
+    }
+
+    // Runs doMarkAvailable() on the connection executor, wrapped by withDeviceLock().
+    private void triggerMarkAvailable(DeviceId deviceId) {
+        connectionExecutor.execute(withDeviceLock(() -> doMarkAvailable(deviceId), deviceId));
+    }
+
+    // Advertises the device to the core as available, unless it already is.
+    private void doMarkAvailable(DeviceId deviceId) {
+        if (!deviceService.isAvailable(deviceId)) {
+            final DeviceDescription description = getDeviceDescription(deviceId, true);
+            // It has been observed that devices that were marked offline (e.g.
+            // after device disconnection) might end up with no master. Here we
+            // trigger a new master election (if device has no master).
+            mastershipService.requestRoleForSync(deviceId);
+            providerService.deviceConnected(deviceId, description);
+        }
+    }
+
+    // Binds a pipeconf to the device if it is pipeline programmable and not bound
+    // yet. Returns false only if a binding is needed but no pipeconf ID is found.
+    private boolean bindPipeconfIfRequired(DeviceId deviceId) {
+        final boolean alreadyBound = pipeconfService.ofDevice(deviceId).isPresent();
+        if (alreadyBound || !isPipelineProgrammable(deviceId)) {
+            // Nothing to do, all good.
+            return true;
+        }
+        // Get pipeconf from netcfg or driver (default one).
+        final PiPipeconfId pipeconfId = getPipeconfId(
+                deviceId, getBehaviour(deviceId, PiPipelineProgrammable.class));
+        if (pipeconfId != null) {
+            // Store binding in pipeconf service.
+            pipeconfService.bindToDevice(pipeconfId, deviceId);
+        }
+        return pipeconfId != null;
+    }
+
+    // Returns the pipeconf ID to use for the device, or null (after logging a
+    // warning) if none is found. Places to look for it (in priority order):
+    // 1) netcfg; 2) device driver (default one, only if pipelineProg is not null).
+    private PiPipeconfId getPipeconfId(DeviceId deviceId, PiPipelineProgrammable pipelineProg) {
+        final PiPipeconfId pipeconfId = getPipeconfFromCfg(deviceId);
+        if (pipeconfId != null && !pipeconfId.id().isEmpty()) {
+            return pipeconfId;
+        }
+        if (pipelineProg != null
+                && pipelineProg.getDefaultPipeconf().isPresent()) {
+            final PiPipeconf defaultPipeconf = pipelineProg.getDefaultPipeconf().get();
+            log.info("Using default pipeconf {} for {}", defaultPipeconf.id(), deviceId);
+            return defaultPipeconf.id();
+        } else {
+            log.warn("Unable to associate a pipeconf to {}", deviceId);
             return null;
         }
-        return piPipeconfService.getPipeconf(pipeconfId).get();
     }
 
     private void doDisconnectDevice(DeviceId deviceId) {
         log.debug("Initiating disconnection from {}...", deviceId);
-        final DeviceHandshaker handshaker = handshakers.remove(deviceId);
+        final DeviceHandshaker handshaker = getBehaviour(
+                deviceId, DeviceHandshaker.class);
         final boolean isAvailable = deviceService.isAvailable(deviceId);
         // Signal disconnection to core (if master).
         if (isAvailable && mastershipService.isLocalMaster(deviceId)) {
@@ -642,7 +625,8 @@
             }
             return;
         }
-        handshaker.removeDeviceAgentListener(deviceAgentListener);
+        handshaker.removeDeviceAgentListener(id());
+        handshakersWithListeners.remove(deviceId);
         final boolean disconnectSuccess = getFutureWithDeadline(
                 handshaker.disconnect(), "performing disconnection",
                 deviceId, false, opTimeoutShort);
@@ -704,6 +688,17 @@
                 () -> doConnectDevice(deviceId), deviceId));
     }
 
+    private boolean isPipelineProgrammable(DeviceId deviceId) {
+        final GeneralProviderDeviceConfig providerConfig = cfgService.getConfig(
+                deviceId, GeneralProviderDeviceConfig.class);
+        if (providerConfig == null) {
+            return false;
+        }
+        return !Collections.disjoint(
+                ImmutableSet.copyOf(providerConfig.node().fieldNames()),
+                PIPELINE_CONFIGURABLE_PROTOCOLS);
+    }
+
     /**
      * Listener for configuration events.
      */
@@ -790,16 +785,12 @@
         }
     }
 
-    private void augmentConfigData(GeneralProviderDeviceConfig providerConfig, DriverData driverData) {
-        //Storing deviceKeyId and all other config values
-        // as data in the driver with protocol_<info>
-        // name as the key. e.g protocol_ip
-        providerConfig.protocolsInfo()
-                .forEach((protocol, deviceInfoConfig) -> {
-                    deviceInfoConfig.configValues()
-                            .forEach((k, v) -> driverData.set(protocol + "_" + k, v));
-                    driverData.set(protocol + "_key", deviceInfoConfig.deviceKeyId());
-                });
+    private boolean isPipelineReady(DeviceId deviceId) {
+        final boolean isPipelineProg = isPipelineProgrammable(deviceId);
+        final boolean isPipeconfReady = pipeconfWatchdogService
+                .getStatus(deviceId)
+                .equals(PiPipeconfWatchdogService.PipelineStatus.READY);
+        return !isPipelineProg || isPipeconfReady;
     }
 
     private void cleanUpConfigInfo(DeviceId deviceId) {
@@ -838,8 +829,7 @@
     private void triggerProbeAllDevices() {
         // Async trigger a task for all devices in the cfg.
         cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
-                .forEach(deviceId -> connectionExecutor.execute(withDeviceLock(
-                        () -> doDeviceProbe(deviceId), deviceId)));
+                .forEach(this::triggerDeviceProbe);
     }
 
     private PiPipeconfId getPipeconfFromCfg(DeviceId deviceId) {
@@ -851,53 +841,45 @@
         return config.piPipeconfId();
     }
 
+    private void triggerDeviceProbe(DeviceId deviceId) {
+        connectionExecutor.execute(withDeviceLock(
+                () -> doDeviceProbe(deviceId), deviceId));
+    }
+
     private void doDeviceProbe(DeviceId deviceId) {
-        if (!configIsPresent(deviceId)) {
+        log.debug("Probing device {}...", deviceId);
+        if (configIsMissing(deviceId)) {
             return;
         }
-        final boolean isAvailable = deviceService.getDevice(deviceId) != null
-                && deviceService.isAvailable(deviceId);
-        final boolean isLocalMaster = mastershipService.isLocalMaster(deviceId);
-        if (isAvailable) {
-            if (!isLocalMaster) {
-                return;
+        if (!isConnected(deviceId)) {
+            if (deviceService.isAvailable(deviceId)) {
+                providerService.deviceDisconnected(deviceId);
             }
-            if (!isReachable(deviceId)) {
-                log.info("Disconnecting available but unreachable device {}...",
-                         deviceId);
-                triggerDisconnect(deviceId);
-            }
-        } else {
-            // We do not check for reachability using isReachable()
-            // since the behaviour of this method can vary depending on protocol
-            // nuances. We leave this check to the device handshaker at later
-            // stages of the connection process.
             triggerConnect(deviceId);
         }
     }
 
-    private boolean configIsPresent(DeviceId deviceId) {
+    private boolean configIsMissing(DeviceId deviceId) {
         final boolean present =
                 cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class) != null
                         && cfgService.getConfig(deviceId, BasicDeviceConfig.class) != null;
         if (!present) {
             log.warn("Configuration for device {} is not complete", deviceId);
         }
-        return present;
-    }
-
-    private void handleChannelClosed(DeviceId deviceId) {
-        log.info("Disconnecting device {}, due to channel closed event",
-                 deviceId);
-        triggerDisconnect(deviceId);
+        return !present;
     }
 
     private void handleMastershipResponse(DeviceId deviceId, MastershipRole response) {
-        //Notify core about response.
-        if (!requestedRoles.containsKey(deviceId)) {
+        // Notify core about mastership response.
+        final MastershipRole request = requestedRoles.get(deviceId);
+        final boolean isAvailable = deviceService.isAvailable(deviceId);
+        if (request == null || !isAvailable) {
             return;
         }
-        providerService.receivedRoleReply(deviceId, requestedRoles.get(deviceId), response);
+        log.debug("Device {} asserted role {} (requested was {})",
+                  deviceId, response, request);
+        providerService.receivedRoleReply(deviceId, request, response);
+        // FIXME: this should be based on the assigned mastership, not on what is returned by the device
         if (response.equals(MastershipRole.MASTER)) {
             startStatsPolling(deviceId, false);
         } else {
@@ -951,16 +933,11 @@
             DeviceId deviceId = event.subject();
             switch (event.type()) {
                 case CHANNEL_OPEN:
-                    // Ignore.
+                    triggerAdvertiseDevice(deviceId);
                     break;
                 case CHANNEL_CLOSED:
-                    handleChannelClosed(deviceId);
-                    break;
                 case CHANNEL_ERROR:
-                    // TODO evaluate other reaction to channel error.
-                    log.warn("Received CHANNEL_ERROR from {}. Is the channel still open?",
-                             deviceId);
-                    handleChannelClosed(deviceId);
+                    triggerDeviceProbe(deviceId);
                     break;
                 case ROLE_MASTER:
                     handleMastershipResponse(deviceId, MastershipRole.MASTER);
@@ -977,4 +954,30 @@
         }
 
     }
+
+    private class InternalPipeconfWatchdogListener implements PiPipeconfWatchdogListener {
+        @Override
+        public void event(PiPipeconfWatchdogEvent event) {
+            triggerMarkAvailable(event.subject());
+        }
+
+        @Override
+        public boolean isRelevant(PiPipeconfWatchdogEvent event) {
+            return event.type().equals(PiPipeconfWatchdogEvent.Type.PIPELINE_READY);
+        }
+    }
+
+    private class InternalConfigFactory
+            extends ConfigFactory<DeviceId, GeneralProviderDeviceConfig> {
+
+        InternalConfigFactory() {
+            super(SubjectFactories.DEVICE_SUBJECT_FACTORY,
+                  GeneralProviderDeviceConfig.class, CFG_SCHEME);
+        }
+
+        @Override
+        public GeneralProviderDeviceConfig createConfig() {
+            return new GeneralProviderDeviceConfig();
+        }
+    }
 }