[ONOS-6556] Distributed Implementation of PiPipeconfService
Change-Id: I7196ce6eee333e732c0cd5015d4d8d32ee069e27
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
index 9072f3f..a0bf93e 100644
--- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
@@ -44,6 +44,7 @@
import org.onosproject.net.pi.model.PiPipeconf;
import org.onosproject.net.pi.model.PiPipeconfId;
import org.onosproject.net.pi.runtime.PiPipeconfConfig;
+import org.onosproject.net.pi.runtime.PiPipeconfMappingStore;
import org.onosproject.net.pi.runtime.PiPipeconfService;
import org.slf4j.Logger;
@@ -83,10 +84,11 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DriverAdminService driverAdminService;
- //TODO move to replicated map
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected PiPipeconfMappingStore pipeconfMappingStore;
+
+ // Registered pipeconfs are replicated through the app subsystem and registered on app-activated events.
protected ConcurrentHashMap<PiPipeconfId, PiPipeconf> piPipeconfs = new ConcurrentHashMap<>();
- //TODO move to replicated map
- protected ConcurrentHashMap<DeviceId, PiPipeconfId> devicesToPipeconf = new ConcurrentHashMap<>();
protected ExecutorService executor =
Executors.newFixedThreadPool(5, groupedThreads("onos/pipipeconfservice",
@@ -120,7 +122,6 @@
cfgService.removeListener(cfgListener);
cfgService.unregisterConfigFactory(factory);
piPipeconfs.clear();
- devicesToPipeconf.clear();
cfgService = null;
driverAdminService = null;
driverService = null;
@@ -139,11 +140,12 @@
@Override
public void remove(PiPipeconfId pipeconfId) throws IllegalStateException {
- //TODO move to the distributed mechanism
//TODO add mechanism to remove from device.
if (!piPipeconfs.containsKey(pipeconfId)) {
throw new IllegalStateException(format("Pipeconf %s is not registered", pipeconfId));
}
+ // TODO remove the binding from the distributed Store when the lifecycle of a pipeconf is defined.
+ // pipeconfMappingStore.removeBindings(pipeconfId);
piPipeconfs.remove(pipeconfId);
}
@@ -212,7 +214,7 @@
// Completable future is needed for when this method will also apply the pipeline to the device.
// FIXME (maybe): the pipeline is currently applied by the general device provider. But we store here
// the association between device and pipeconf.
- devicesToPipeconf.put(deviceId, pipeconfId);
+ pipeconfMappingStore.createOrUpdateBinding(deviceId, pipeconfId);
operationResult.complete(true);
}
});
@@ -221,9 +223,10 @@
@Override
public Optional<PiPipeconfId> ofDevice(DeviceId deviceId) {
- return Optional.ofNullable(devicesToPipeconf.get(deviceId));
+ return Optional.ofNullable(pipeconfMappingStore.getPipeconfId(deviceId));
}
+
private class PiPipeconfDriverProviderInternal implements DriverProvider {
Driver driver;
@@ -245,7 +248,7 @@
if (id.id().equals("")) {
log.warn("Not adding empty pipeconfId for device {}", deviceId);
} else {
- devicesToPipeconf.put(deviceId, pipeconfConfig.piPipeconfId());
+ pipeconfMappingStore.createOrUpdateBinding(deviceId, id);
}
}