Build missing pipeconf-merged driver at startup

Change-Id: Ia5600de362978be1f551c7581d138e35b9736108
(cherry picked from commit 11296f34f16041f3ae32c735b7218cb4ca98637a)
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
index 5e07333..5be4a26 100644
--- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
@@ -17,8 +17,9 @@
 package org.onosproject.net.pi.impl;
 
 import com.google.common.annotations.Beta;
-import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Striped;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -36,6 +37,8 @@
 import org.onosproject.net.driver.DefaultDriver;
 import org.onosproject.net.driver.Driver;
 import org.onosproject.net.driver.DriverAdminService;
+import org.onosproject.net.driver.DriverEvent;
+import org.onosproject.net.driver.DriverListener;
 import org.onosproject.net.driver.DriverProvider;
 import org.onosproject.net.driver.DriverService;
 import org.onosproject.net.pi.model.PiPipeconf;
@@ -47,12 +50,14 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Lock;
 
 import static java.lang.String.format;
 import static org.onlab.util.Tools.groupedThreads;
@@ -70,7 +75,6 @@
     private final Logger log = getLogger(getClass());
 
     private static final String MERGED_DRIVER_SEPARATOR = ":";
-    private static final String DRIVER = "driver";
     private static final String CFG_SCHEME = "piPipeconf";
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -93,6 +97,10 @@
     // distributing this map.
     protected ConcurrentMap<PiPipeconfId, PiPipeconf> pipeconfs = new ConcurrentHashMap<>();
 
+    private final DriverListener driverListener = new InternalDriverListener();
+    private final Set<String> missingMergedDrivers = Sets.newCopyOnWriteArraySet();
+    private final Striped<Lock> locks = Striped.lock(20);
+
     protected ExecutorService executor = Executors.newFixedThreadPool(
             10, groupedThreads("onos/pipeconf-manager", "%d", log));
 
@@ -109,6 +117,8 @@
     @Activate
     public void activate() {
         cfgService.registerConfigFactory(configFactory);
+        driverService.addListener(driverListener);
+        checkMissingMergedDrivers();
         log.info("Started");
     }
 
@@ -117,7 +127,9 @@
     public void deactivate() {
         executor.shutdown();
         cfgService.unregisterConfigFactory(configFactory);
+        driverService.removeListener(driverListener);
         pipeconfs.clear();
+        missingMergedDrivers.clear();
         cfgService = null;
         driverAdminService = null;
         driverService = null;
@@ -131,6 +143,7 @@
         }
         pipeconfs.put(pipeconf.id(), pipeconf);
         log.info("New pipeconf registered: {}", pipeconf.id());
+        executor.execute(() -> mergeAll(pipeconf.id()));
     }
 
     @Override
@@ -162,15 +175,6 @@
 
     @Override
     public String mergeDriver(DeviceId deviceId, PiPipeconfId pipeconfId) {
-        return doMergeDriver(deviceId, pipeconfId);
-    }
-
-    @Override
-    public Optional<PiPipeconfId> ofDevice(DeviceId deviceId) {
-        return Optional.ofNullable(pipeconfMappingStore.getPipeconfId(deviceId));
-    }
-
-    private String doMergeDriver(DeviceId deviceId, PiPipeconfId pipeconfId) {
         log.debug("Starting device driver merge of {} with {}...", deviceId, pipeconfId);
         final BasicDeviceConfig basicDeviceConfig = cfgService.getConfig(
                 deviceId, BasicDeviceConfig.class);
@@ -180,31 +184,48 @@
             return null;
         }
         String baseDriverName = basicDeviceConfig.driver();
-        if (baseDriverName.endsWith(mergedDriverSuffix(pipeconfId))) {
+        if (isMergedDriverName(baseDriverName)) {
             // The config already has driver name that is a merged one. We still
             // need to make sure an instance of that merged driver is present in
             // this node.
-            log.debug("Base driver of {} ({}) has been already merged with {}",
-                      deviceId, baseDriverName, pipeconfId);
+            log.debug("Base driver of {} ({}) is a merged one",
+                      deviceId, baseDriverName);
             baseDriverName = getBaseDriverNameFromMerged(baseDriverName);
         }
 
+        return doMergeDriver(baseDriverName, pipeconfId);
+    }
+
+    @Override
+    public Optional<PiPipeconfId> ofDevice(DeviceId deviceId) {
+        return Optional.ofNullable(pipeconfMappingStore.getPipeconfId(deviceId));
+    }
+
+    private String doMergeDriver(String baseDriverName, PiPipeconfId pipeconfId) {
         final String newDriverName = mergedDriverName(baseDriverName, pipeconfId);
-        // If merged driver exists already we don't create a new one.
+        // Serialize per newDriverName, avoid creating duplicates.
+        locks.get(newDriverName).lock();
         try {
-            driverService.getDriver(newDriverName);
-            return newDriverName;
-        } catch (ItemNotFoundException e) {
+            // If merged driver exists already we don't create a new one.
+            if (getDriver(newDriverName) != null) {
+                return newDriverName;
+            }
             log.info("Creating merged driver {}...", newDriverName);
+            final Driver mergedDriver = buildMergedDriver(
+                    pipeconfId, baseDriverName, newDriverName);
+            if (mergedDriver == null) {
+                // Error logged by buildMergedDriver
+                return null;
+            }
+            registerMergedDriver(mergedDriver);
+            if (missingMergedDrivers.remove(newDriverName)) {
+                log.info("There are still {} missing merged drivers",
+                         missingMergedDrivers.size());
+            }
+            return newDriverName;
+        } finally {
+            locks.get(newDriverName).unlock();
         }
-        final Driver mergedDriver = buildMergedDriver(
-                pipeconfId, baseDriverName, newDriverName);
-        if (mergedDriver == null) {
-            // Error logged by buildMergedDriver
-            return null;
-        }
-        registerMergedDriver(mergedDriver);
-        return newDriverName;
     }
 
     private String mergedDriverSuffix(PiPipeconfId pipeconfId) {
@@ -218,19 +239,28 @@
     private String getBaseDriverNameFromMerged(String mergedDriverName) {
         final String[] pieces = mergedDriverName.split(MERGED_DRIVER_SEPARATOR);
         if (pieces.length != 2) {
-            log.error("Unrecognized merged driver name format '{}', cannot " +
-                              "extract base driver name", mergedDriverName);
             return null;
         }
         return pieces[0];
     }
 
+    private PiPipeconfId getPipeconfIdFromMerged(String mergedDriverName) {
+        final String[] pieces = mergedDriverName.split(MERGED_DRIVER_SEPARATOR);
+        if (pieces.length != 2) {
+            return null;
+        }
+        return new PiPipeconfId(pieces[1]);
+    }
+
+    private boolean isMergedDriverName(String driverName) {
+        final String[] pieces = driverName.split(MERGED_DRIVER_SEPARATOR);
+        return pieces.length == 2;
+    }
+
     private Driver buildMergedDriver(PiPipeconfId pipeconfId, String baseDriverName,
                                      String newDriverName) {
-        final Driver baseDriver;
-        try {
-            baseDriver = driverService.getDriver(baseDriverName);
-        } catch (ItemNotFoundException e) {
+        final Driver baseDriver = getDriver(baseDriverName);
+        if (baseDriver == null) {
             log.error("Base driver {} not found, cannot build a merged one",
                       baseDriverName);
             return null;
@@ -266,6 +296,80 @@
         driverAdminService.registerProvider(provider);
     }
 
+    private Driver getDriver(String name) {
+        try {
+            return driverService.getDriver(name);
+        } catch (ItemNotFoundException e) {
+            return null;
+        }
+    }
+
+    private void checkMissingMergedDrivers() {
+        cfgService.getSubjects(DeviceId.class, BasicDeviceConfig.class).stream()
+                .map(d -> cfgService.getConfig(d, BasicDeviceConfig.class))
+                .map(BasicDeviceConfig::driver)
+                .filter(Objects::nonNull)
+                .filter(d -> getDriver(d) == null)
+                .forEach(driverName -> {
+                    final String baseDriverName = getBaseDriverNameFromMerged(driverName);
+                    final PiPipeconfId pipeconfId = getPipeconfIdFromMerged(driverName);
+                    if (baseDriverName == null || pipeconfId == null) {
+                        // Not a merged driver.
+                        return;
+                    }
+                    log.info("Detected missing merged driver: {}", driverName);
+                    missingMergedDrivers.add(driverName);
+                    // Attempt building the driver now if all pieces are present.
+                    // If not, either a driver or pipeconf event will re-trigger
+                    // the merge process.
+                    if (getDriver(baseDriverName) != null
+                            && pipeconfs.containsKey(pipeconfId)) {
+                        mergedDriverName(baseDriverName, pipeconfId);
+                    }
+                });
+    }
+
+    private void mergeAll(String baseDriverName) {
+        missingMergedDrivers.stream()
+                .filter(driverName -> {
+                    final String xx = getBaseDriverNameFromMerged(driverName);
+                    return xx != null && xx.equals(baseDriverName);
+                })
+                .forEach(driverName -> {
+                    final PiPipeconfId pipeconfId = getPipeconfIdFromMerged(driverName);
+                    if (pipeconfs.containsKey(pipeconfId)) {
+                        doMergeDriver(baseDriverName, pipeconfId);
+                    }
+                });
+    }
+
+    private void mergeAll(PiPipeconfId pipeconfId) {
+        missingMergedDrivers.stream()
+                .filter(driverName -> {
+                    final PiPipeconfId xx = getPipeconfIdFromMerged(driverName);
+                    return xx != null && xx.equals(pipeconfId);
+                })
+                .forEach(driverName -> {
+                    final String baseDriverName = getBaseDriverNameFromMerged(driverName);
+                    if (getDriver(baseDriverName) != null) {
+                        doMergeDriver(baseDriverName, pipeconfId);
+                    }
+                });
+    }
+
+    private class InternalDriverListener implements DriverListener {
+
+        @Override
+        public void event(DriverEvent event) {
+            executor.execute(() -> mergeAll(event.subject().name()));
+        }
+
+        @Override
+        public boolean isRelevant(DriverEvent event) {
+            return event.type() == DriverEvent.Type.DRIVER_ENHANCED;
+        }
+    }
+
     /**
      * Internal driver provider used to register merged pipeconf drivers in the
      * core.
@@ -292,7 +396,7 @@
                 return false;
             }
             InternalDriverProvider that = (InternalDriverProvider) o;
-            return Objects.equal(driver.name(), that.driver.name());
+            return Objects.equals(driver.name(), that.driver.name());
         }
 
         @Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStore.java b/core/store/dist/src/main/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStore.java
index 7086321..64c23d8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStore.java
@@ -39,6 +39,7 @@
 import org.onosproject.store.service.WallClockTimestamp;
 import org.slf4j.Logger;
 
+import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -95,7 +96,7 @@
 
     @Override
     public Set<DeviceId> getDevices(PiPipeconfId pipeconfId) {
-        return pipeconfToDevices.get(pipeconfId);
+        return pipeconfToDevices.getOrDefault(pipeconfId, Collections.emptySet());
     }
 
     @Override