Fix missing P4Runtime multicast/clone groups after device reboot
Change-Id: Ica1cef51aa4bd1c83486d92fcc59f32da9567acd
(cherry picked from commit 14301075549fba5568d8b6b7e32602b5452577d2)
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
index f955706..4bc9536 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
@@ -19,11 +19,15 @@
import com.google.common.annotations.Beta;
import com.google.common.collect.Maps;
import org.onlab.util.KryoNamespace;
+import org.onlab.util.SharedExecutors;
import org.onosproject.net.Annotations;
import org.onosproject.net.DeviceId;
import org.onosproject.net.pi.runtime.PiEntity;
import org.onosproject.net.pi.runtime.PiEntityType;
import org.onosproject.net.pi.runtime.PiHandle;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
import org.onosproject.p4runtime.api.P4RuntimeWriteClient.EntityUpdateRequest;
import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteRequest;
import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteResponse;
@@ -39,10 +43,12 @@
import java.util.Collection;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.net.pi.service.PiPipeconfWatchdogService.PipelineStatus.READY;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -62,13 +68,28 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected StorageService storageService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected PiPipeconfWatchdogService pipeconfWatchdogService;
+
private EventuallyConsistentMap<PiHandle, TimedEntry<E>> mirrorMap;
private EventuallyConsistentMap<PiHandle, Annotations> annotationsMap;
private final PiEntityType entityType;
+ private final boolean flushOnPipelineUnknown;
+
+ private final PiPipeconfWatchdogListener pipeconfListener =
+ new InternalPipeconfWatchdogListener();
+
AbstractDistributedP4RuntimeMirror(PiEntityType entityType) {
this.entityType = entityType;
+ this.flushOnPipelineUnknown = false;
+ }
+
+ AbstractDistributedP4RuntimeMirror(PiEntityType entityType,
+ boolean flushOnPipelineUnknown) {
+ this.entityType = entityType;
+ this.flushOnPipelineUnknown = flushOnPipelineUnknown;
}
@Activate
@@ -94,11 +115,13 @@
.withTimestampProvider((k, v) -> new WallClockTimestamp())
.build();
+ pipeconfWatchdogService.addListener(pipeconfListener);
log.info("Started");
}
@Deactivate
public void deactivate() {
+ pipeconfWatchdogService.removeListener(pipeconfListener);
mirrorMap.destroy();
mirrorMap = null;
log.info("Stopped");
@@ -123,6 +146,15 @@
public void put(H handle, E entry) {
checkNotNull(handle);
checkNotNull(entry);
+ final PiPipeconfWatchdogService.PipelineStatus status =
+ pipeconfWatchdogService.getStatus(handle.deviceId());
+ if (flushOnPipelineUnknown && !status.equals(READY)) {
+ // Keep mirror empty if pipeline status is UNKNOWN.
+ log.info("Ignoring {} mirror update because pipeline " +
+ "status of {} is {}: {}",
+ entityType, handle.deviceId(), status, entry);
+ return;
+ }
final long now = new WallClockTimestamp().unixTimestamp();
final TimedEntry<E> timedEntry = new TimedEntry<>(now, entry);
mirrorMap.put(handle, timedEntry);
@@ -190,6 +222,12 @@
}
}
+ private Set<PiHandle> getHandlesForDevice(DeviceId deviceId) {
+ return mirrorMap.keySet().stream()
+ .filter(h -> h.deviceId().equals(deviceId))
+ .collect(Collectors.toSet());
+ }
+
private Map<PiHandle, E> deviceHandleMap(DeviceId deviceId) {
final Map<PiHandle, E> deviceMap = Maps.newHashMap();
mirrorMap.entrySet().stream()
@@ -198,6 +236,14 @@
return deviceMap;
}
+
+ private void removeAll(DeviceId deviceId) {
+ checkNotNull(deviceId);
+ @SuppressWarnings("unchecked")
+ Collection<H> handles = (Collection<H>) getHandlesForDevice(deviceId);
+ handles.forEach(this::remove);
+ }
+
@Override
public void applyWriteRequest(WriteRequest request) {
// Optimistically assume all requests will be successful.
@@ -228,4 +274,20 @@
}
});
}
+
+ public class InternalPipeconfWatchdogListener implements PiPipeconfWatchdogListener {
+ @Override
+ public void event(PiPipeconfWatchdogEvent event) {
+ log.debug("Flushing mirror for {}, pipeline status is {}",
+ event.subject(), event.type());
+ SharedExecutors.getPoolThreadExecutor().execute(
+ () -> removeAll(event.subject()));
+ }
+
+ @Override
+ public boolean isRelevant(PiPipeconfWatchdogEvent event) {
+ return flushOnPipelineUnknown &&
+ event.type().equals(PiPipeconfWatchdogEvent.Type.PIPELINE_UNKNOWN);
+ }
+ }
}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimePreEntryMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimePreEntryMirror.java
index 4876aaa..35f209e 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimePreEntryMirror.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/DistributedP4RuntimePreEntryMirror.java
@@ -30,6 +30,9 @@
implements P4RuntimePreEntryMirror {
public DistributedP4RuntimePreEntryMirror() {
- super(PiEntityType.PRE_ENTRY);
+ // PI does not support reading PRE entries. To avoid inconsistencies,
+ // flush mirror on device disconnection and other events which
+ // invalidate pipeline status.
+ super(PiEntityType.PRE_ENTRY, true);
}
}