Build missing pipeconf-merged driver at startup
Change-Id: Ia5600de362978be1f551c7581d138e35b9736108
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
index 5e07333..5be4a26 100644
--- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
@@ -17,8 +17,9 @@
package org.onosproject.net.pi.impl;
import com.google.common.annotations.Beta;
-import com.google.common.base.Objects;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Striped;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -36,6 +37,8 @@
import org.onosproject.net.driver.DefaultDriver;
import org.onosproject.net.driver.Driver;
import org.onosproject.net.driver.DriverAdminService;
+import org.onosproject.net.driver.DriverEvent;
+import org.onosproject.net.driver.DriverListener;
import org.onosproject.net.driver.DriverProvider;
import org.onosproject.net.driver.DriverService;
import org.onosproject.net.pi.model.PiPipeconf;
@@ -47,12 +50,14 @@
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Lock;
import static java.lang.String.format;
import static org.onlab.util.Tools.groupedThreads;
@@ -70,7 +75,6 @@
private final Logger log = getLogger(getClass());
private static final String MERGED_DRIVER_SEPARATOR = ":";
- private static final String DRIVER = "driver";
private static final String CFG_SCHEME = "piPipeconf";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -93,6 +97,10 @@
// distributing this map.
protected ConcurrentMap<PiPipeconfId, PiPipeconf> pipeconfs = new ConcurrentHashMap<>();
+ private final DriverListener driverListener = new InternalDriverListener();
+ private final Set<String> missingMergedDrivers = Sets.newCopyOnWriteArraySet();
+ private final Striped<Lock> locks = Striped.lock(20);
+
protected ExecutorService executor = Executors.newFixedThreadPool(
10, groupedThreads("onos/pipeconf-manager", "%d", log));
@@ -109,6 +117,8 @@
@Activate
public void activate() {
cfgService.registerConfigFactory(configFactory);
+ driverService.addListener(driverListener);
+ checkMissingMergedDrivers();
log.info("Started");
}
@@ -117,7 +127,9 @@
public void deactivate() {
executor.shutdown();
cfgService.unregisterConfigFactory(configFactory);
+ driverService.removeListener(driverListener);
pipeconfs.clear();
+ missingMergedDrivers.clear();
cfgService = null;
driverAdminService = null;
driverService = null;
@@ -131,6 +143,7 @@
}
pipeconfs.put(pipeconf.id(), pipeconf);
log.info("New pipeconf registered: {}", pipeconf.id());
+ executor.execute(() -> mergeAll(pipeconf.id()));
}
@Override
@@ -162,15 +175,6 @@
@Override
public String mergeDriver(DeviceId deviceId, PiPipeconfId pipeconfId) {
- return doMergeDriver(deviceId, pipeconfId);
- }
-
- @Override
- public Optional<PiPipeconfId> ofDevice(DeviceId deviceId) {
- return Optional.ofNullable(pipeconfMappingStore.getPipeconfId(deviceId));
- }
-
- private String doMergeDriver(DeviceId deviceId, PiPipeconfId pipeconfId) {
log.debug("Starting device driver merge of {} with {}...", deviceId, pipeconfId);
final BasicDeviceConfig basicDeviceConfig = cfgService.getConfig(
deviceId, BasicDeviceConfig.class);
@@ -180,31 +184,48 @@
return null;
}
String baseDriverName = basicDeviceConfig.driver();
- if (baseDriverName.endsWith(mergedDriverSuffix(pipeconfId))) {
+ if (isMergedDriverName(baseDriverName)) {
// The config already has driver name that is a merged one. We still
// need to make sure an instance of that merged driver is present in
// this node.
- log.debug("Base driver of {} ({}) has been already merged with {}",
- deviceId, baseDriverName, pipeconfId);
+ log.debug("Base driver of {} ({}) is a merged one",
+ deviceId, baseDriverName);
baseDriverName = getBaseDriverNameFromMerged(baseDriverName);
}
+ return doMergeDriver(baseDriverName, pipeconfId);
+ }
+
+ @Override
+ public Optional<PiPipeconfId> ofDevice(DeviceId deviceId) {
+ return Optional.ofNullable(pipeconfMappingStore.getPipeconfId(deviceId));
+ }
+
+ private String doMergeDriver(String baseDriverName, PiPipeconfId pipeconfId) {
final String newDriverName = mergedDriverName(baseDriverName, pipeconfId);
- // If merged driver exists already we don't create a new one.
+ // Serialize per newDriverName, avoid creating duplicates.
+ locks.get(newDriverName).lock();
try {
- driverService.getDriver(newDriverName);
- return newDriverName;
- } catch (ItemNotFoundException e) {
+ // If merged driver exists already we don't create a new one.
+ if (getDriver(newDriverName) != null) {
+ return newDriverName;
+ }
log.info("Creating merged driver {}...", newDriverName);
+ final Driver mergedDriver = buildMergedDriver(
+ pipeconfId, baseDriverName, newDriverName);
+ if (mergedDriver == null) {
+ // Error logged by buildMergedDriver
+ return null;
+ }
+ registerMergedDriver(mergedDriver);
+ if (missingMergedDrivers.remove(newDriverName)) {
+ log.info("There are still {} missing merged drivers",
+ missingMergedDrivers.size());
+ }
+ return newDriverName;
+ } finally {
+ locks.get(newDriverName).unlock();
}
- final Driver mergedDriver = buildMergedDriver(
- pipeconfId, baseDriverName, newDriverName);
- if (mergedDriver == null) {
- // Error logged by buildMergedDriver
- return null;
- }
- registerMergedDriver(mergedDriver);
- return newDriverName;
}
private String mergedDriverSuffix(PiPipeconfId pipeconfId) {
@@ -218,19 +239,28 @@
private String getBaseDriverNameFromMerged(String mergedDriverName) {
final String[] pieces = mergedDriverName.split(MERGED_DRIVER_SEPARATOR);
if (pieces.length != 2) {
- log.error("Unrecognized merged driver name format '{}', cannot " +
- "extract base driver name", mergedDriverName);
return null;
}
return pieces[0];
}
+ private PiPipeconfId getPipeconfIdFromMerged(String mergedDriverName) {
+ final String[] pieces = mergedDriverName.split(MERGED_DRIVER_SEPARATOR);
+ if (pieces.length != 2) {
+ return null;
+ }
+ return new PiPipeconfId(pieces[1]);
+ }
+
+ private boolean isMergedDriverName(String driverName) {
+ final String[] pieces = driverName.split(MERGED_DRIVER_SEPARATOR);
+ return pieces.length == 2;
+ }
+
private Driver buildMergedDriver(PiPipeconfId pipeconfId, String baseDriverName,
String newDriverName) {
- final Driver baseDriver;
- try {
- baseDriver = driverService.getDriver(baseDriverName);
- } catch (ItemNotFoundException e) {
+ final Driver baseDriver = getDriver(baseDriverName);
+ if (baseDriver == null) {
log.error("Base driver {} not found, cannot build a merged one",
baseDriverName);
return null;
@@ -266,6 +296,80 @@
driverAdminService.registerProvider(provider);
}
+ private Driver getDriver(String name) {
+ try {
+ return driverService.getDriver(name);
+ } catch (ItemNotFoundException e) {
+ return null;
+ }
+ }
+
+ private void checkMissingMergedDrivers() {
+ cfgService.getSubjects(DeviceId.class, BasicDeviceConfig.class).stream()
+ .map(d -> cfgService.getConfig(d, BasicDeviceConfig.class))
+ .map(BasicDeviceConfig::driver)
+ .filter(Objects::nonNull)
+ .filter(d -> getDriver(d) == null)
+ .forEach(driverName -> {
+ final String baseDriverName = getBaseDriverNameFromMerged(driverName);
+ final PiPipeconfId pipeconfId = getPipeconfIdFromMerged(driverName);
+ if (baseDriverName == null || pipeconfId == null) {
+ // Not a merged driver.
+ return;
+ }
+ log.info("Detected missing merged driver: {}", driverName);
+ missingMergedDrivers.add(driverName);
+ // Attempt building the driver now if all pieces are present.
+ // If not, either a driver or pipeconf event will re-trigger
+ // the merge process.
+ if (getDriver(baseDriverName) != null
+ && pipeconfs.containsKey(pipeconfId)) {
+ mergedDriverName(baseDriverName, pipeconfId);
+ }
+ });
+ }
+
+ private void mergeAll(String baseDriverName) {
+ missingMergedDrivers.stream()
+ .filter(driverName -> {
+ final String xx = getBaseDriverNameFromMerged(driverName);
+ return xx != null && xx.equals(baseDriverName);
+ })
+ .forEach(driverName -> {
+ final PiPipeconfId pipeconfId = getPipeconfIdFromMerged(driverName);
+ if (pipeconfs.containsKey(pipeconfId)) {
+ doMergeDriver(baseDriverName, pipeconfId);
+ }
+ });
+ }
+
+ private void mergeAll(PiPipeconfId pipeconfId) {
+ missingMergedDrivers.stream()
+ .filter(driverName -> {
+ final PiPipeconfId xx = getPipeconfIdFromMerged(driverName);
+ return xx != null && xx.equals(pipeconfId);
+ })
+ .forEach(driverName -> {
+ final String baseDriverName = getBaseDriverNameFromMerged(driverName);
+ if (getDriver(baseDriverName) != null) {
+ doMergeDriver(baseDriverName, pipeconfId);
+ }
+ });
+ }
+
+ private class InternalDriverListener implements DriverListener {
+
+ @Override
+ public void event(DriverEvent event) {
+ executor.execute(() -> mergeAll(event.subject().name()));
+ }
+
+ @Override
+ public boolean isRelevant(DriverEvent event) {
+ return event.type() == DriverEvent.Type.DRIVER_ENHANCED;
+ }
+ }
+
/**
* Internal driver provider used to register merged pipeconf drivers in the
* core.
@@ -292,7 +396,7 @@
return false;
}
InternalDriverProvider that = (InternalDriverProvider) o;
- return Objects.equal(driver.name(), that.driver.name());
+ return Objects.equals(driver.name(), that.driver.name());
}
@Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStore.java b/core/store/dist/src/main/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStore.java
index 7086321..64c23d8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStore.java
@@ -39,6 +39,7 @@
import org.onosproject.store.service.WallClockTimestamp;
import org.slf4j.Logger;
+import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -95,7 +96,7 @@
@Override
public Set<DeviceId> getDevices(PiPipeconfId pipeconfId) {
- return pipeconfToDevices.get(pipeconfId);
+ return pipeconfToDevices.getOrDefault(pipeconfId, Collections.emptySet());
}
@Override