[AETHER-1324] Force push pipeline config
Introduces a distributed set to "remember" the devices already configured by the cluster.
In this way, we can force push the pipeline after a new installation even though the
pipeline results to be the same.
Change-Id: I9d04b04828daf8d1e6944b5a8e07d580e978ee69
(cherry picked from commit ce191fe9983698b3988bee8e6e8c41382ab1fce3)
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java
index 21df778..103d6cf 100644
--- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java
@@ -42,10 +42,14 @@
import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
+import org.onosproject.store.primitives.DefaultDistributedSet;
import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.DistributedPrimitive;
+import org.onosproject.store.service.DistributedSet;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.WallClockTimestamp;
import org.osgi.service.component.ComponentContext;
@@ -129,11 +133,20 @@
private EventuallyConsistentMap<DeviceId, PipelineStatus> statusMap;
private Map<DeviceId, PipelineStatus> localStatusMap;
+ // Configured devices by this cluster. We use a set to keep track of all devices for which
+ // we have pushed the forwarding pipeline config at least once. This guarantees that device
+ // pipelines are wiped out/reset at least once when starting the cluster, minimizing the risk
+ // of any stale state from previous runs affecting control operations. Another effect of this
+ // approach is that the default entries mirror will get populated even though the pipeline results
+ // to be the same across different ONOS installations.
+ private static final String CONFIGURED_DEVICES = "onos-pipeconf-configured-set";
+ private DistributedSet<DeviceId> configuredDevices;
+
@Activate
public void activate() {
eventDispatcher.addSink(PiPipeconfWatchdogEvent.class, listenerRegistry);
localStatusMap = Maps.newConcurrentMap();
- // Init distributed status map.
+ // Init distributed status map and configured devices set
KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.register(PipelineStatus.class);
@@ -142,6 +155,12 @@
.withSerializer(serializer)
.withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
statusMap.addListener(new StatusMapListener());
+ // Init the set of the configured devices
+ configuredDevices = new DefaultDistributedSet<>(storageService.<DeviceId>setBuilder()
+ .withName(CONFIGURED_DEVICES)
+ .withSerializer(Serializer.using(KryoNamespaces.API))
+ .build(),
+ DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS);
// Register component configurable properties.
componentConfigService.registerProperties(getClass());
// Start periodic watchdog task.
@@ -230,6 +249,7 @@
final boolean success = doSetPipeconfIfRequired(device, pipeconf);
if (success) {
signalStatusReady(device.id());
+ signalStatusConfigured(device.id());
} else {
signalStatusUnknown(device.id());
}
@@ -253,7 +273,8 @@
if (!handshaker.hasConnection()) {
return false;
}
- if (Futures.getUnchecked(pipelineProg.isPipeconfSet(pipeconf))) {
+ if (Futures.getUnchecked(pipelineProg.isPipeconfSet(pipeconf)) &&
+ configuredDevices.contains(device.id())) {
log.debug("Pipeconf {} already configured on {}",
pipeconf.id(), device.id());
return true;
@@ -281,6 +302,14 @@
statusMap.put(deviceId, PipelineStatus.READY);
}
+ private void signalStatusUnconfigured(DeviceId deviceId) {
+ configuredDevices.remove(deviceId);
+ }
+
+ private void signalStatusConfigured(DeviceId deviceId) {
+ configuredDevices.add(deviceId);
+ }
+
private boolean isLocalMaster(Device device) {
if (mastershipService.isLocalMaster(device.id())) {
return true;
@@ -354,6 +383,7 @@
case DEVICE_REMOVED:
case DEVICE_SUSPENDED:
signalStatusUnknown(device.id());
+ signalStatusUnconfigured(device.id());
break;
case PORT_ADDED:
case PORT_UPDATED: