Flush device mirror in P4Runtime driver when pipeline status is unknown

Change-Id: Ida5874adc68a5d6ba5f1b5063880ac31c0a2ee9b
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
index dd3ea16..b2da5fc 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
@@ -23,9 +23,13 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.onlab.util.KryoNamespace;
+import org.onlab.util.SharedExecutors;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.pi.runtime.PiEntity;
 import org.onosproject.net.pi.runtime.PiHandle;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
 import org.onosproject.store.service.EventuallyConsistentMap;
 import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.WallClockTimestamp;
@@ -36,6 +40,7 @@
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.net.pi.service.PiPipeconfWatchdogService.PipelineStatus.READY;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -56,8 +61,14 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private StorageService storageService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private PiPipeconfWatchdogService pipeconfWatchdogService;
+
     private EventuallyConsistentMap<H, TimedEntry<E>> mirrorMap;
 
+    private final PiPipeconfWatchdogListener pipeconfListener =
+            new InternalPipeconfWatchdogListener();
+
     @Activate
     public void activate() {
         mirrorMap = storageService
@@ -66,6 +77,7 @@
                 .withSerializer(storeSerializer())
                 .withTimestampProvider((k, v) -> new WallClockTimestamp())
                 .build();
+        pipeconfWatchdogService.addListener(pipeconfListener);
         log.info("Started");
     }
 
@@ -75,6 +87,7 @@
 
     @Deactivate
     public void deactivate() {
+        pipeconfWatchdogService.removeListener(pipeconfListener);
         mirrorMap.destroy();
         mirrorMap = null;
         log.info("Stopped");
@@ -99,6 +112,14 @@
     public void put(H handle, E entry) {
         checkNotNull(handle);
         checkNotNull(entry);
+        final PiPipeconfWatchdogService.PipelineStatus status =
+                pipeconfWatchdogService.getStatus(handle.deviceId());
+        if (!status.equals(READY)) {
+            log.info("Ignoring device mirror update because pipeline " +
+                             "status of {} is {}: {}",
+                     handle.deviceId(), status, entry);
+            return;
+        }
         final long now = new WallClockTimestamp().unixTimestamp();
         final TimedEntry<E> timedEntry = new TimedEntry<>(now, entry);
         mirrorMap.put(handle, timedEntry);
@@ -110,4 +131,26 @@
         mirrorMap.remove(handle);
     }
 
+    private void removeAll(DeviceId deviceId) {
+        checkNotNull(deviceId);
+        Collection<H> handles = mirrorMap.keySet().stream()
+                .filter(h -> h.deviceId().equals(deviceId))
+                .collect(Collectors.toList());
+        handles.forEach(mirrorMap::remove);
+    }
+
+    public class InternalPipeconfWatchdogListener implements PiPipeconfWatchdogListener {
+        @Override
+        public void event(PiPipeconfWatchdogEvent event) {
+            log.debug("Flushing mirror for {}, pipeline status is {}",
+                      event.subject(), event.type());
+            SharedExecutors.getPoolThreadExecutor().execute(
+                    () -> removeAll(event.subject()));
+        }
+
+        @Override
+        public boolean isRelevant(PiPipeconfWatchdogEvent event) {
+            return event.type().equals(PiPipeconfWatchdogEvent.Type.PIPELINE_UNKNOWN);
+        }
+    }
 }