Keep P4Runtime device groups in sync with translator/mirror state
Change-Id: I7257c2ab5f3d4118f30ecf3ae3820d95e5afa4c8
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
index 5e3e155..4175489 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
@@ -41,6 +41,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -188,12 +189,12 @@
final TimedEntry<PiTableEntry> timedEntry = tableMirror.get(handle);
if (!translatedEntity.isPresent()) {
- log.debug("Handle not found in store: {}", handle);
+ log.warn("Table entry handle not found in translation store: {}", handle);
return null;
}
if (timedEntry == null) {
- log.debug("Handle not found in device mirror: {}", handle);
+ log.warn("Table entry handle not found in device mirror: {}", handle);
return null;
}
@@ -211,16 +212,22 @@
return tableMirror.getAll(deviceId).stream()
.map(timedEntry -> forgeFlowEntry(
timedEntry.entry(), null))
+ .filter(Objects::nonNull)
.collect(Collectors.toList());
}
private void cleanUpInconsistentEntries(Collection<PiTableEntry> piEntries) {
- log.warn("Found {} entries from {} not on translation store, removing them...",
+ log.warn("Found {} inconsistent table entries on {}, removing them...",
piEntries.size(), deviceId);
piEntries.forEach(entry -> {
log.debug(entry.toString());
- applyEntry(PiTableEntryHandle.of(deviceId, entry),
- entry, null, REMOVE);
+ final PiTableEntryHandle handle = PiTableEntryHandle.of(deviceId, entry);
+ ENTRY_LOCKS.get(handle).lock();
+ try {
+ applyEntry(handle, entry, null, REMOVE);
+ } finally {
+ ENTRY_LOCKS.get(handle).unlock();
+ }
});
}
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java
index 1cfe079..e6f0a04 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java
@@ -17,8 +17,10 @@
package org.onosproject.drivers.p4runtime;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Striped;
+import org.onlab.util.SharedExecutors;
import org.onosproject.drivers.p4runtime.mirror.P4RuntimeGroupMirror;
import org.onosproject.drivers.p4runtime.mirror.P4RuntimeMulticastGroupMirror;
import org.onosproject.drivers.p4runtime.mirror.TimedEntry;
@@ -47,12 +49,15 @@
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.String.format;
import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.DELETE;
import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.INSERT;
@@ -85,8 +90,8 @@
// If true, we avoid querying the device and return what's already known by
// the ONOS store.
- private static final String IGNORE_DEVICE_WHEN_GET = "ignoreDeviceWhenGet";
- private static final boolean DEFAULT_IGNORE_DEVICE_WHEN_GET = false;
+ private static final String READ_ACTION_GROUPS_FROM_MIRROR = "actionGroupReadFromMirror";
+ private static final boolean DEFAULT_READ_ACTION_GROUPS_FROM_MIRROR = false;
protected GroupStore groupStore;
private P4RuntimeGroupMirror groupMirror;
@@ -116,6 +121,11 @@
if (!setupBehaviour()) {
return;
}
+
+ // TODO: fix GroupProgrammable API, passing the device ID is ambiguous
+ checkArgument(deviceId.equals(this.deviceId),
+ "passed deviceId must be the same assigned to this behavior");
+
groupOps.operations().forEach(op -> {
// ONOS-7785 We need app cookie (action profile id) from the group
Group groupOnStore = groupStore.getGroup(deviceId, op.groupId());
@@ -127,10 +137,10 @@
groupOnStore.appId());
DefaultGroup groupToApply = new DefaultGroup(op.groupId(), groupDesc);
if (op.groupType().equals(GroupDescription.Type.ALL)) {
- processMcGroupOp(deviceId, groupToApply, op.opType());
+ processMcGroupOp(groupToApply, op.opType());
} else {
- processGroupOp(deviceId, groupToApply, op.opType());
+ processGroupOp(groupToApply, op.opType());
}
});
}
@@ -140,32 +150,91 @@
if (!setupBehaviour()) {
return Collections.emptyList();
}
- final ImmutableList.Builder<Group> groups = ImmutableList.builder();
-
- if (!driverBoolProperty(IGNORE_DEVICE_WHEN_GET, DEFAULT_IGNORE_DEVICE_WHEN_GET)) {
- groups.addAll(pipeconf.pipelineModel().actionProfiles().stream()
- .map(PiActionProfileModel::id)
- .flatMap(this::streamGroupsFromDevice)
- .iterator());
- // FIXME: enable reading MC groups from device once reading from
- // PRE is supported in PI
- // groups.addAll(getMcGroupsFromDevice());
- } else {
- groups.addAll(groupMirror.getAll(deviceId).stream()
- .map(TimedEntry::entry)
- .map(this::forgeGroupEntry)
- .iterator());
- }
- // FIXME: same as before..
- groups.addAll(mcGroupMirror.getAll(deviceId).stream()
- .map(TimedEntry::entry)
- .map(this::forgeMcGroupEntry)
- .iterator());
-
- return groups.build();
+ return new ImmutableList.Builder<Group>()
+ .addAll(getActionGroups())
+ .addAll(getMcGroups()).build();
}
- private void processGroupOp(DeviceId deviceId, Group pdGroup, GroupOperation.Type opType) {
+ private Collection<Group> getActionGroups() {
+
+ if (driverBoolProperty(READ_ACTION_GROUPS_FROM_MIRROR,
+ DEFAULT_READ_ACTION_GROUPS_FROM_MIRROR)) {
+ return getActionGroupsFromMirror();
+ }
+
+ final Collection<PiActionGroup> piGroups = pipeconf.pipelineModel()
+ .actionProfiles()
+ .stream()
+ .map(PiActionProfileModel::id)
+ .flatMap(this::streamPiGroupsFromDevice)
+ .collect(Collectors.toList());
+
+ if (piGroups.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ final List<Group> result = Lists.newArrayList();
+ final List<PiActionGroup> inconsistentGroups = Lists.newArrayList();
+
+ for (PiActionGroup piGroupOnDevice : piGroups) {
+ final Group group = forgeGroupEntry(piGroupOnDevice);
+ if (group == null) {
+ // Entry is on device but unknown to translation service or
+ // device mirror. Inconsistent. Mark for removal.
+ inconsistentGroups.add(piGroupOnDevice);
+ } else {
+ result.add(group);
+ }
+ }
+ // Trigger clean up of inconsistent entries (is any).
+ // TODO: make this behaviour configurable, in some cases it's fine for
+ // the device to have groups that were not installed by us.
+ if (!inconsistentGroups.isEmpty()) {
+ SharedExecutors.getSingleThreadExecutor().execute(
+ () -> cleanUpInconsistentGroups(inconsistentGroups));
+ }
+ return result;
+ }
+
+ private Collection<Group> getActionGroupsFromMirror() {
+ return groupMirror.getAll(deviceId).stream()
+ .map(TimedEntry::entry)
+ .map(this::forgeGroupEntry)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ }
+
+ private void cleanUpInconsistentGroups(Collection<PiActionGroup> piGroups) {
+ log.warn("Found {} inconsistent groups on {}, removing them...",
+ piGroups.size(), deviceId);
+ piGroups.forEach(piGroup -> {
+ log.debug(piGroup.toString());
+ // Per-piGroup lock.
+ final PiActionGroupHandle handle = PiActionGroupHandle.of(deviceId, piGroup);
+ STRIPED_LOCKS.get(handle).lock();
+ try {
+ processPiGroup(handle, piGroup, piGroup, null,
+ GroupOperation.Type.DELETE);
+ } finally {
+ STRIPED_LOCKS.get(handle).unlock();
+ }
+ });
+ }
+
+ private Collection<Group> getMcGroups() {
+ // TODO: missing support for reading multicast groups is ready in PI/Stratum.
+ return getMcGroupsFromMirror();
+ }
+
+ private Collection<Group> getMcGroupsFromMirror() {
+ return mcGroupMirror.getAll(deviceId).stream()
+ .map(TimedEntry::entry)
+ .map(this::forgeMcGroupEntry)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ }
+
+ private void processGroupOp(Group pdGroup, GroupOperation.Type opType) {
final PiActionGroup piGroup;
try {
piGroup = groupTranslator.translate(pdGroup, pipeconf);
@@ -175,11 +244,9 @@
return;
}
final PiActionGroupHandle handle = PiActionGroupHandle.of(deviceId, piGroup);
-
final PiActionGroup groupOnDevice = groupMirror.get(handle) == null
- ? null
- : groupMirror.get(handle).entry();
-
+ ? null : groupMirror.get(handle).entry();
+ // Per-piGroup lock.
final Lock lock = STRIPED_LOCKS.get(handle);
lock.lock();
try {
@@ -190,7 +257,7 @@
}
}
- private void processMcGroupOp(DeviceId deviceId, Group pdGroup, GroupOperation.Type opType) {
+ private void processMcGroupOp(Group pdGroup, GroupOperation.Type opType) {
final PiMulticastGroupEntry mcGroup;
try {
mcGroup = mcGroupTranslator.translate(pdGroup, pipeconf);
@@ -429,52 +496,51 @@
completableFuture, format("performing %s %s", action, topic), false);
}
- private Stream<Group> streamGroupsFromDevice(PiActionProfileId actProfId) {
+ private Stream<PiActionGroup> streamPiGroupsFromDevice(PiActionProfileId actProfId) {
// Read PI groups and return original PD one.
- Collection<PiActionGroup> groups = getFutureWithDeadline(
+ // TODO: implement P4Runtime client call to read all groups with one call
+ // Good is pipeline has multiple action profiles.
+ final Collection<PiActionGroup> groups = getFutureWithDeadline(
client.dumpGroups(actProfId, pipeconf),
"dumping groups", Collections.emptyList());
- return groups.stream()
- .map(this::forgeGroupEntry)
- .filter(Objects::nonNull);
- }
-
- private Collection<Group> getMcGroupsFromDevice() {
- Collection<PiMulticastGroupEntry> groups = getFutureWithDeadline(
- client.readAllMulticastGroupEntries(),
- "dumping multicast groups", Collections.emptyList());
- return groups.stream()
- .map(this::forgeMcGroupEntry)
- .filter(Objects::nonNull)
- .collect(Collectors.toList());
+ return groups.stream();
}
private Group forgeGroupEntry(PiActionGroup piGroup) {
final PiActionGroupHandle handle = PiActionGroupHandle.of(deviceId, piGroup);
- if (!groupTranslator.lookup(handle).isPresent()) {
- log.warn("Missing PI group from translation store: {} - {}:{}",
- pipeconf.id(), piGroup.actionProfileId(),
- piGroup.id());
+ final Optional<PiTranslatedEntity<Group, PiActionGroup>>
+ translatedEntity = groupTranslator.lookup(handle);
+ final TimedEntry<PiActionGroup> timedEntry = groupMirror.get(handle);
+ // Is entry consistent with our state?
+ if (!translatedEntity.isPresent()) {
+ log.warn("Group handle not found in translation store: {}", handle);
return null;
}
- final long life = groupMirror.get(handle) != null
- ? groupMirror.get(handle).lifeSec() : 0;
- final Group original = groupTranslator.lookup(handle).get().original();
- return addedGroup(original, life);
+ if (timedEntry == null) {
+ // Don't bother logging more than debug, most probably it's the EC
+ // map backing the store that has not received all the updates yet.
+ log.debug("Group handle not found in device mirror: {}", handle);
+ return null;
+ }
+ return addedGroup(translatedEntity.get().original(), timedEntry.lifeSec());
}
private Group forgeMcGroupEntry(PiMulticastGroupEntry mcGroup) {
final PiMulticastGroupEntryHandle handle = PiMulticastGroupEntryHandle.of(
deviceId, mcGroup);
- if (!mcGroupTranslator.lookup(handle).isPresent()) {
- log.warn("Missing PI multicast group {} from translation store",
- mcGroup.groupId());
+ final Optional<PiTranslatedEntity<Group, PiMulticastGroupEntry>>
+ translatedEntity = mcGroupTranslator.lookup(handle);
+ final TimedEntry<PiMulticastGroupEntry> timedEntry = mcGroupMirror.get(handle);
+ // Is entry consistent with our state?
+ if (!translatedEntity.isPresent()) {
+ log.warn("Multicast group handle not found in translation store: {}", handle);
return null;
}
- final long life = mcGroupMirror.get(handle) != null
- ? mcGroupMirror.get(handle).lifeSec() : 0;
- final Group original = mcGroupTranslator.lookup(handle).get().original();
- return addedGroup(original, life);
+ if (timedEntry == null) {
+ log.warn("Multicast group handle not found in device mirror: {}", handle);
+ return null;
+ }
+ return addedGroup(translatedEntity.get().original(), timedEntry.lifeSec());
}
private Group addedGroup(Group original, long life) {