Refactored GDP and PipeconfManager to fix multi-node tests

Issue was caused by race condition in GDP between the first connection
task, and the periodic one (checking reachability of devices in the cfg).
The issue is fixed by serializing such tasks for the same device.

Moreover, this patch brings better error reporting and handling of
completable futures.

Change-Id: I8c3a685c368541d33395945159b45a5740a5a0c3
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
index 55a4a11..ba16601 100644
--- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
@@ -30,11 +30,8 @@
 import org.onlab.util.ItemNotFoundException;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.LeadershipService;
-import org.onosproject.cluster.NodeId;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.config.ConfigFactory;
-import org.onosproject.net.config.NetworkConfigEvent;
-import org.onosproject.net.config.NetworkConfigListener;
 import org.onosproject.net.config.NetworkConfigRegistry;
 import org.onosproject.net.config.basics.BasicDeviceConfig;
 import org.onosproject.net.config.basics.SubjectFactories;
@@ -57,6 +54,7 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -78,11 +76,14 @@
     private static final String DRIVER = "driver";
     private static final String CFG_SCHEME = "piPipeconf";
 
+    private static final String DRIVER_MERGE_TOPIC =
+            PiPipeconfManager.class.getSimpleName() + "-driver-merge-";
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected NetworkConfigRegistry cfgService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected LeadershipService leadershipService;
+    private LeadershipService leadershipService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DriverService driverService;
@@ -91,19 +92,20 @@
     protected DriverAdminService driverAdminService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected PiPipeconfMappingStore pipeconfMappingStore;
+    private PiPipeconfMappingStore pipeconfMappingStore;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
-    // Registered pipeconf are replicated through the app subsystem and registered on app activated events.
-    protected ConcurrentHashMap<PiPipeconfId, PiPipeconf> piPipeconfs = new ConcurrentHashMap<>();
+    // Registered pipeconf are replicated through the app subsystem and
+    // registered on app activated events. Hence, there should be no need of
+    // distributing this map.
+    protected ConcurrentMap<PiPipeconfId, PiPipeconf> pipeconfs = new ConcurrentHashMap<>();
 
-    protected ExecutorService executor =
-            Executors.newFixedThreadPool(5, groupedThreads("onos/pipipeconfservice",
-                    "pipeline-to-device-%d", log));
+    protected ExecutorService executor = Executors.newFixedThreadPool(
+            10, groupedThreads("onos/pipeconf-manager", "%d", log));
 
-    protected final ConfigFactory factory =
+    protected final ConfigFactory configFactory =
             new ConfigFactory<DeviceId, PiPipeconfConfig>(
                     SubjectFactories.DEVICE_SUBJECT_FACTORY,
                     PiPipeconfConfig.class, CFG_SCHEME) {
@@ -113,14 +115,9 @@
                 }
             };
 
-    protected final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
-
     @Activate
     public void activate() {
-        cfgService.registerConfigFactory(factory);
-        cfgService.addListener(cfgListener);
-        cfgService.getSubjects(DeviceId.class, PiPipeconfConfig.class)
-                .forEach(this::addPipeconfFromCfg);
+        cfgService.registerConfigFactory(configFactory);
         log.info("Started");
     }
 
@@ -128,9 +125,8 @@
     @Deactivate
     public void deactivate() {
         executor.shutdown();
-        cfgService.removeListener(cfgListener);
-        cfgService.unregisterConfigFactory(factory);
-        piPipeconfs.clear();
+        cfgService.unregisterConfigFactory(configFactory);
+        pipeconfs.clear();
         cfgService = null;
         driverAdminService = null;
         driverService = null;
@@ -139,101 +135,40 @@
 
     @Override
     public void register(PiPipeconf pipeconf) throws IllegalStateException {
-        if (piPipeconfs.containsKey(pipeconf.id())) {
+        if (pipeconfs.containsKey(pipeconf.id())) {
             throw new IllegalStateException(format("Pipeconf %s is already registered", pipeconf.id()));
         }
-        piPipeconfs.put(pipeconf.id(), pipeconf);
+        pipeconfs.put(pipeconf.id(), pipeconf);
         log.info("New pipeconf registered: {}", pipeconf.id());
     }
 
     @Override
     public void remove(PiPipeconfId pipeconfId) throws IllegalStateException {
-        //TODO add mechanism to remove from device.
-        if (!piPipeconfs.containsKey(pipeconfId)) {
+        // TODO add mechanism to remove from device.
+        if (!pipeconfs.containsKey(pipeconfId)) {
             throw new IllegalStateException(format("Pipeconf %s is not registered", pipeconfId));
         }
         // TODO remove the binding from the distributed Store when the lifecycle of a pipeconf is defined.
         // pipeconfMappingStore.removeBindings(pipeconfId);
-        piPipeconfs.remove(pipeconfId);
+        log.info("Removing pipeconf {}", pipeconfId);
+        pipeconfs.remove(pipeconfId);
     }
 
     @Override
     public Iterable<PiPipeconf> getPipeconfs() {
-        return piPipeconfs.values();
+        return pipeconfs.values();
     }
 
     @Override
     public Optional<PiPipeconf> getPipeconf(PiPipeconfId id) {
-        return Optional.ofNullable(piPipeconfs.get(id));
+        return Optional.ofNullable(pipeconfs.get(id));
     }
 
     @Override
-    public CompletableFuture<Boolean> bindToDevice(PiPipeconfId pipeconfId, DeviceId deviceId) {
-        CompletableFuture<Boolean> operationResult = new CompletableFuture<>();
-
-        executor.execute(() -> {
-            BasicDeviceConfig basicDeviceConfig =
-                    cfgService.getConfig(deviceId, BasicDeviceConfig.class);
-            Driver baseDriver = driverService.getDriver(basicDeviceConfig.driver());
-
-            String completeDriverName = baseDriver.name() + ":" + pipeconfId;
-            PiPipeconf piPipeconf = piPipeconfs.get(pipeconfId);
-            if (piPipeconf == null) {
-                log.warn("Pipeconf {} is not present", pipeconfId);
-                operationResult.complete(false);
-            } else {
-                //if driver exists already we don't create a new one.
-                //needs to be done via exception catching due to DriverRegistry throwing it on a null return from
-                //the driver map.
-                try {
-                    driverService.getDriver(completeDriverName);
-                } catch (ItemNotFoundException e) {
-
-                    log.debug("First time pipeconf {} is used with base driver {}, merging the two",
-                            pipeconfId, baseDriver);
-                    //extract the behaviours from the pipipeconf.
-                    Map<Class<? extends Behaviour>, Class<? extends Behaviour>> behaviours = new HashMap<>();
-                    piPipeconf.behaviours().forEach(b -> {
-                        behaviours.put(b, piPipeconf.implementation(b).get());
-                    });
-
-                    Driver piPipeconfDriver = new DefaultDriver(completeDriverName, baseDriver.parents(),
-                            baseDriver.manufacturer(), baseDriver.hwVersion(),
-                            baseDriver.swVersion(), behaviours, new HashMap<>());
-                    //we take the base driver created with the behaviours of the PiPeconf and
-                    // merge it with the base driver that was assigned to the device
-                    Driver completeDriver = piPipeconfDriver.merge(baseDriver);
-
-                    //This might lead to explosion of number of providers in the core,
-                    // due to 1:1:1 pipeconf:driver:provider maybe find better way
-                    DriverProvider provider = new PiPipeconfDriverProviderInternal(completeDriver);
-
-                    //we register to the driver susbystem the driver provider containing the merged driver
-                    driverAdminService.registerProvider(provider);
-                }
-
-                // Changing the configuration for the device to enforce the full driver with pipipeconf
-                // and base behaviours, updating binding only first time something changes
-                NodeId leaderNodeId = leadershipService.getLeader("deploy-" +
-                        deviceId.toString() + "-pipeconf");
-                NodeId localNodeId = clusterService.getLocalNode().id();
-
-                if (!basicDeviceConfig.driver().equals(completeDriverName) && localNodeId.equals(leaderNodeId)) {
-                    ObjectNode newCfg = (ObjectNode) basicDeviceConfig.node();
-                    newCfg = newCfg.put(DRIVER, completeDriverName);
-                    ObjectMapper mapper = new ObjectMapper();
-                    JsonNode newCfgNode = mapper.convertValue(newCfg, JsonNode.class);
-                    log.debug("New driver {} for device {}", completeDriverName, deviceId);
-                    cfgService.applyConfig(deviceId, BasicDeviceConfig.class, newCfgNode);
-                    // Completable future is needed for when this method will also apply the pipeline to the device.
-                    // FIXME (maybe): the pipeline is currently applied by the general device provider.
-                    // But we store here the association between device and pipeconf.
-                    pipeconfMappingStore.createOrUpdateBinding(deviceId, pipeconfId);
-                }
-                operationResult.complete(true);
-            }
-        });
-        return operationResult;
+    public CompletableFuture<Boolean> bindToDevice(
+            PiPipeconfId pipeconfId, DeviceId deviceId) {
+        return CompletableFuture.supplyAsync(() -> doMergeDriver(
+                deviceId, pipeconfId), executor);
     }
 
     @Override
@@ -241,6 +176,83 @@
         return Optional.ofNullable(pipeconfMappingStore.getPipeconfId(deviceId));
     }
 
+    private boolean doMergeDriver(DeviceId deviceId, PiPipeconfId pipeconfId) {
+        // Perform the following operations:
+        // 1) ALL nodes: create and register new merged driver (pipeconf + base driver)
+        // 2) ONE node (leader): updated netcfg with new driver
+        log.warn("Starting device driver merge of {} with {}...", deviceId, pipeconfId);
+        final BasicDeviceConfig basicDeviceConfig = cfgService.getConfig(
+                deviceId, BasicDeviceConfig.class);
+        final Driver baseDriver = driverService.getDriver(
+                basicDeviceConfig.driver());
+        final String newDriverName = baseDriver.name() + ":" + pipeconfId;
+        if (baseDriver.name().equals(newDriverName)) {
+            log.warn("Requested to merge {} driver with {} for {}, "
+                             + "but current device driver is already merged",
+                     baseDriver.name(), pipeconfId, deviceId);
+            return true;
+        }
+        final PiPipeconf pipeconf = pipeconfs.get(pipeconfId);
+        if (pipeconf == null) {
+            log.error("Pipeconf {} is not registered", pipeconfId);
+            return false;
+        }
+        // 1) if merged driver exists already we don't create a new one.
+        try {
+            driverService.getDriver(newDriverName);
+            log.info("Found existing merged driver {}, re-using that", newDriverName);
+        } catch (ItemNotFoundException e) {
+            log.info("Creating merged driver {}...", newDriverName);
+            createMergedDriver(pipeconf, baseDriver, newDriverName);
+        }
+        // 2) Updating device cfg to enforce the merged driver (one node only)
+        final boolean isLeader = leadershipService
+                .runForLeadership(DRIVER_MERGE_TOPIC + deviceId.toString())
+                .leaderNodeId()
+                .equals(clusterService.getLocalNode().id());
+        if (isLeader) {
+            // FIXME: this binding should be updated by the same entity
+            // deploying the pipeconf.
+            pipeconfMappingStore.createOrUpdateBinding(deviceId, pipeconfId);
+            if (!basicDeviceConfig.driver().equals(newDriverName)) {
+                log.info("Applying new driver {} for device {} via cfg...",
+                         newDriverName, deviceId);
+                setDriverViaCfg(deviceId, newDriverName, basicDeviceConfig);
+            }
+        }
+        return true;
+    }
+
+    private void createMergedDriver(PiPipeconf pipeconf, Driver baseDriver,
+                                    String newDriverName) {
+        // extract the behaviours from the pipipeconf.
+        final Map<Class<? extends Behaviour>, Class<? extends Behaviour>> behaviours =
+                new HashMap<>();
+        pipeconf.behaviours().forEach(
+                b -> behaviours.put(b, pipeconf.implementation(b).get()));
+        final Driver piPipeconfDriver = new DefaultDriver(
+                newDriverName, baseDriver.parents(),
+                baseDriver.manufacturer(), baseDriver.hwVersion(),
+                baseDriver.swVersion(), behaviours, new HashMap<>());
+        // take the base driver created with the behaviours of the PiPeconf and
+        // merge it with the base driver that was assigned to the device
+        final Driver completeDriver = piPipeconfDriver.merge(baseDriver);
+        // This might lead to explosion of number of providers in the core,
+        // due to 1:1:1 pipeconf:driver:provider maybe find better way
+        final DriverProvider provider = new PiPipeconfDriverProviderInternal(
+                completeDriver);
+        // register the merged driver
+        driverAdminService.registerProvider(provider);
+    }
+
+    private void setDriverViaCfg(DeviceId deviceId, String driverName,
+                                 BasicDeviceConfig basicDeviceConfig) {
+        ObjectNode newCfg = (ObjectNode) basicDeviceConfig.node();
+        newCfg = newCfg.put(DRIVER, driverName);
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode newCfgNode = mapper.convertValue(newCfg, JsonNode.class);
+        cfgService.applyConfig(deviceId, BasicDeviceConfig.class, newCfgNode);
+    }
 
     private class PiPipeconfDriverProviderInternal implements DriverProvider {
 
@@ -255,35 +267,4 @@
             return ImmutableSet.of(driver);
         }
     }
-
-    private void addPipeconfFromCfg(DeviceId deviceId) {
-        PiPipeconfConfig pipeconfConfig =
-                cfgService.getConfig(deviceId, PiPipeconfConfig.class);
-        PiPipeconfId id = pipeconfConfig.piPipeconfId();
-        if (id.id().equals("")) {
-            log.debug("Ignoring empty pipeconf ID for device {}", deviceId);
-        } else {
-            pipeconfMappingStore.createOrUpdateBinding(deviceId, id);
-        }
-    }
-
-    /**
-     * Listener for configuration events.
-     */
-    private class InternalNetworkConfigListener implements NetworkConfigListener {
-
-
-        @Override
-        public void event(NetworkConfigEvent event) {
-            DeviceId deviceId = (DeviceId) event.subject();
-            addPipeconfFromCfg(deviceId);
-        }
-
-        @Override
-        public boolean isRelevant(NetworkConfigEvent event) {
-            return event.configClass().equals(PiPipeconfConfig.class) &&
-                    (event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
-                            event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED);
-        }
-    }
 }