Flush device mirror in P4Runtime driver when pipeline status is unknown
Change-Id: Ida5874adc68a5d6ba5f1b5063880ac31c0a2ee9b
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
index dd3ea16..b2da5fc 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
@@ -23,9 +23,13 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.util.KryoNamespace;
+import org.onlab.util.SharedExecutors;
import org.onosproject.net.DeviceId;
import org.onosproject.net.pi.runtime.PiEntity;
import org.onosproject.net.pi.runtime.PiHandle;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.WallClockTimestamp;
@@ -36,6 +40,7 @@
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.net.pi.service.PiPipeconfWatchdogService.PipelineStatus.READY;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -56,8 +61,14 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private StorageService storageService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private PiPipeconfWatchdogService pipeconfWatchdogService;
+
private EventuallyConsistentMap<H, TimedEntry<E>> mirrorMap;
+ private final PiPipeconfWatchdogListener pipeconfListener =
+ new InternalPipeconfWatchdogListener();
+
@Activate
public void activate() {
mirrorMap = storageService
@@ -66,6 +77,7 @@
.withSerializer(storeSerializer())
.withTimestampProvider((k, v) -> new WallClockTimestamp())
.build();
+ pipeconfWatchdogService.addListener(pipeconfListener);
log.info("Started");
}
@@ -75,6 +87,7 @@
@Deactivate
public void deactivate() {
+ pipeconfWatchdogService.removeListener(pipeconfListener);
mirrorMap.destroy();
mirrorMap = null;
log.info("Stopped");
@@ -99,6 +112,14 @@
public void put(H handle, E entry) {
checkNotNull(handle);
checkNotNull(entry);
+ final PiPipeconfWatchdogService.PipelineStatus status =
+ pipeconfWatchdogService.getStatus(handle.deviceId());
+ if (!status.equals(READY)) {
+ log.info("Ignoring device mirror update because pipeline " +
+ "status of {} is {}: {}",
+ handle.deviceId(), status, entry);
+ return;
+ }
final long now = new WallClockTimestamp().unixTimestamp();
final TimedEntry<E> timedEntry = new TimedEntry<>(now, entry);
mirrorMap.put(handle, timedEntry);
@@ -110,4 +131,26 @@
mirrorMap.remove(handle);
}
+ private void removeAll(DeviceId deviceId) {
+ checkNotNull(deviceId);
+ Collection<H> handles = mirrorMap.keySet().stream()
+ .filter(h -> h.deviceId().equals(deviceId))
+ .collect(Collectors.toList());
+ handles.forEach(mirrorMap::remove);
+ }
+
+ public class InternalPipeconfWatchdogListener implements PiPipeconfWatchdogListener {
+ @Override
+ public void event(PiPipeconfWatchdogEvent event) {
+ log.debug("Flushing mirror for {}, pipeline status is {}",
+ event.subject(), event.type());
+ SharedExecutors.getPoolThreadExecutor().execute(
+ () -> removeAll(event.subject()));
+ }
+
+ @Override
+ public boolean isRelevant(PiPipeconfWatchdogEvent event) {
+ return event.type().equals(PiPipeconfWatchdogEvent.Type.PIPELINE_UNKNOWN);
+ }
+ }
}