[AETHER-1324] Force push pipeline config

Introduces a distributed set to "remember" the devices already configured by the cluster.
In this way, we can force push the pipeline after a new installation even though the
pipeline results to be the same.

Change-Id: I9d04b04828daf8d1e6944b5a8e07d580e978ee69
(cherry picked from commit ce191fe9983698b3988bee8e6e8c41382ab1fce3)
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java
index 21df778..103d6cf 100644
--- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java
@@ -42,10 +42,14 @@
 import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
 import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
 import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
+import org.onosproject.store.primitives.DefaultDistributedSet;
 import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.DistributedPrimitive;
+import org.onosproject.store.service.DistributedSet;
 import org.onosproject.store.service.EventuallyConsistentMap;
 import org.onosproject.store.service.EventuallyConsistentMapEvent;
 import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.WallClockTimestamp;
 import org.osgi.service.component.ComponentContext;
@@ -129,11 +133,20 @@
     private EventuallyConsistentMap<DeviceId, PipelineStatus> statusMap;
     private Map<DeviceId, PipelineStatus> localStatusMap;
 
+    // Configured devices by this cluster. We use a set to keep track of all devices for which
+    // we have pushed the forwarding pipeline config at least once. This guarantees that device
+    // pipelines are wiped out/reset at least once when starting the cluster, minimizing the risk
+    // of any stale state from previous runs affecting control operations. Another effect of this
+    // approach is that the default entries mirror will get populated even though the pipeline results
+    // to be the same across different ONOS installations.
+    private static final String CONFIGURED_DEVICES = "onos-pipeconf-configured-set";
+    private DistributedSet<DeviceId> configuredDevices;
+
     @Activate
     public void activate() {
         eventDispatcher.addSink(PiPipeconfWatchdogEvent.class, listenerRegistry);
         localStatusMap = Maps.newConcurrentMap();
-        // Init distributed status map.
+        // Init distributed status map and configured devices set
         KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
                 .register(KryoNamespaces.API)
                 .register(PipelineStatus.class);
@@ -142,6 +155,12 @@
                 .withSerializer(serializer)
                 .withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
         statusMap.addListener(new StatusMapListener());
+        // Init the set of the configured devices
+        configuredDevices = new DefaultDistributedSet<>(storageService.<DeviceId>setBuilder()
+                .withName(CONFIGURED_DEVICES)
+                .withSerializer(Serializer.using(KryoNamespaces.API))
+                .build(),
+                DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS);
         // Register component configurable properties.
         componentConfigService.registerProperties(getClass());
         // Start periodic watchdog task.
@@ -230,6 +249,7 @@
                 final boolean success = doSetPipeconfIfRequired(device, pipeconf);
                 if (success) {
                     signalStatusReady(device.id());
+                    signalStatusConfigured(device.id());
                 } else {
                     signalStatusUnknown(device.id());
                 }
@@ -253,7 +273,8 @@
         if (!handshaker.hasConnection()) {
             return false;
         }
-        if (Futures.getUnchecked(pipelineProg.isPipeconfSet(pipeconf))) {
+        if (Futures.getUnchecked(pipelineProg.isPipeconfSet(pipeconf)) &&
+                configuredDevices.contains(device.id())) {
             log.debug("Pipeconf {} already configured on {}",
                       pipeconf.id(), device.id());
             return true;
@@ -281,6 +302,14 @@
         statusMap.put(deviceId, PipelineStatus.READY);
     }
 
+    private void signalStatusUnconfigured(DeviceId deviceId) {
+        configuredDevices.remove(deviceId);
+    }
+
+    private void signalStatusConfigured(DeviceId deviceId) {
+        configuredDevices.add(deviceId);
+    }
+
     private boolean isLocalMaster(Device device) {
         if (mastershipService.isLocalMaster(device.id())) {
             return true;
@@ -354,6 +383,7 @@
                 case DEVICE_REMOVED:
                 case DEVICE_SUSPENDED:
                     signalStatusUnknown(device.id());
+                    signalStatusUnconfigured(device.id());
                     break;
                 case PORT_ADDED:
                 case PORT_UPDATED: