Fix missing P4Runtime multicast/clone groups after device reboot

Change-Id: Ica1cef51aa4bd1c83486d92fcc59f32da9567acd
(cherry picked from commit 14301075549fba5568d8b6b7e32602b5452577d2)
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
index f955706..4bc9536 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
@@ -19,11 +19,15 @@
 import com.google.common.annotations.Beta;
 import com.google.common.collect.Maps;
 import org.onlab.util.KryoNamespace;
+import org.onlab.util.SharedExecutors;
 import org.onosproject.net.Annotations;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.pi.runtime.PiEntity;
 import org.onosproject.net.pi.runtime.PiEntityType;
 import org.onosproject.net.pi.runtime.PiHandle;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
 import org.onosproject.p4runtime.api.P4RuntimeWriteClient.EntityUpdateRequest;
 import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteRequest;
 import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteResponse;
@@ -39,10 +43,12 @@
 
 import java.util.Collection;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.net.pi.service.PiPipeconfWatchdogService.PipelineStatus.READY;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -62,13 +68,28 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected StorageService storageService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected PiPipeconfWatchdogService pipeconfWatchdogService;
+
     private EventuallyConsistentMap<PiHandle, TimedEntry<E>> mirrorMap;
     private EventuallyConsistentMap<PiHandle, Annotations> annotationsMap;
 
     private final PiEntityType entityType;
 
+    private final boolean flushOnPipelineUnknown;
+
+    private final PiPipeconfWatchdogListener pipeconfListener =
+            new InternalPipeconfWatchdogListener();
+
     AbstractDistributedP4RuntimeMirror(PiEntityType entityType) {
         this.entityType = entityType;
+        this.flushOnPipelineUnknown = false;
+    }
+
+    AbstractDistributedP4RuntimeMirror(PiEntityType entityType,
+                                       boolean flushOnPipelineUnknown) {
+        this.entityType = entityType;
+        this.flushOnPipelineUnknown = flushOnPipelineUnknown;
     }
 
     @Activate
@@ -94,11 +115,13 @@
                 .withTimestampProvider((k, v) -> new WallClockTimestamp())
                 .build();
 
+        pipeconfWatchdogService.addListener(pipeconfListener);
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
+        pipeconfWatchdogService.removeListener(pipeconfListener);
         mirrorMap.destroy();
         mirrorMap = null;
         log.info("Stopped");
@@ -123,6 +146,15 @@
     public void put(H handle, E entry) {
         checkNotNull(handle);
         checkNotNull(entry);
+        final PiPipeconfWatchdogService.PipelineStatus status =
+                pipeconfWatchdogService.getStatus(handle.deviceId());
+        if (flushOnPipelineUnknown && !status.equals(READY)) {
+            // Keep mirror empty if pipeline status is UNKNOWN.
+            log.info("Ignoring {} mirror update because pipeline " +
+                             "status of {} is {}: {}",
+                     entityType, handle.deviceId(), status, entry);
+            return;
+        }
         final long now = new WallClockTimestamp().unixTimestamp();
         final TimedEntry<E> timedEntry = new TimedEntry<>(now, entry);
         mirrorMap.put(handle, timedEntry);
@@ -190,6 +222,12 @@
         }
     }
 
+    private Set<PiHandle> getHandlesForDevice(DeviceId deviceId) {
+        return mirrorMap.keySet().stream()
+                .filter(h -> h.deviceId().equals(deviceId))
+                .collect(Collectors.toSet());
+    }
+
     private Map<PiHandle, E> deviceHandleMap(DeviceId deviceId) {
         final Map<PiHandle, E> deviceMap = Maps.newHashMap();
         mirrorMap.entrySet().stream()
@@ -198,6 +236,14 @@
         return deviceMap;
     }
 
+
+    private void removeAll(DeviceId deviceId) {
+        checkNotNull(deviceId);
+        @SuppressWarnings("unchecked")
+        Collection<H> handles = (Collection<H>) getHandlesForDevice(deviceId);
+        handles.forEach(this::remove);
+    }
+
     @Override
     public void applyWriteRequest(WriteRequest request) {
         // Optimistically assume all requests will be successful.
@@ -228,4 +274,20 @@
                     }
                 });
     }
+
+    public class InternalPipeconfWatchdogListener implements PiPipeconfWatchdogListener {
+        @Override
+        public void event(PiPipeconfWatchdogEvent event) {
+            log.debug("Flushing mirror for {}, pipeline status is {}",
+                      event.subject(), event.type());
+            SharedExecutors.getPoolThreadExecutor().execute(
+                    () -> removeAll(event.subject()));
+        }
+
+        @Override
+        public boolean isRelevant(PiPipeconfWatchdogEvent event) {
+            return flushOnPipelineUnknown &&
+                    event.type().equals(PiPipeconfWatchdogEvent.Type.PIPELINE_UNKNOWN);
+        }
+    }
 }
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimePreEntryMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimePreEntryMirror.java
index 4876aaa..35f209e 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimePreEntryMirror.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimePreEntryMirror.java
@@ -30,6 +30,9 @@
         implements P4RuntimePreEntryMirror {
 
     public DistributedP4RuntimePreEntryMirror() {
-        super(PiEntityType.PRE_ENTRY);
+        // PI does not support reading PRE entries. To avoid inconsistencies,
+        // flush mirror on device disconnection and other events which
+        // invalidate pipeline status.
+        super(PiEntityType.PRE_ENTRY, true);
     }
 }