Keep P4Runtime device groups in sync with translator/mirror state

Change-Id: I7257c2ab5f3d4118f30ecf3ae3820d95e5afa4c8
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
index 5e3e155..4175489 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
@@ -41,6 +41,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -188,12 +189,12 @@
         final TimedEntry<PiTableEntry> timedEntry = tableMirror.get(handle);
 
         if (!translatedEntity.isPresent()) {
-            log.debug("Handle not found in store: {}", handle);
+            log.warn("Table entry handle not found in translation store: {}", handle);
             return null;
         }
 
         if (timedEntry == null) {
-            log.debug("Handle not found in device mirror: {}", handle);
+            log.warn("Table entry handle not found in device mirror: {}", handle);
             return null;
         }
 
@@ -211,16 +212,22 @@
         return tableMirror.getAll(deviceId).stream()
                 .map(timedEntry -> forgeFlowEntry(
                         timedEntry.entry(), null))
+                .filter(Objects::nonNull)
                 .collect(Collectors.toList());
     }
 
     private void cleanUpInconsistentEntries(Collection<PiTableEntry> piEntries) {
-        log.warn("Found {} entries from {} not on translation store, removing them...",
+        log.warn("Found {} inconsistent table entries on {}, removing them...",
                  piEntries.size(), deviceId);
         piEntries.forEach(entry -> {
             log.debug(entry.toString());
-            applyEntry(PiTableEntryHandle.of(deviceId, entry),
-                       entry, null, REMOVE);
+            final PiTableEntryHandle handle = PiTableEntryHandle.of(deviceId, entry);
+            ENTRY_LOCKS.get(handle).lock();
+            try {
+                applyEntry(handle, entry, null, REMOVE);
+            } finally {
+                ENTRY_LOCKS.get(handle).unlock();
+            }
         });
     }
 
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java
index 1cfe079..e6f0a04 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java
@@ -17,8 +17,10 @@
 package org.onosproject.drivers.p4runtime;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Striped;
+import org.onlab.util.SharedExecutors;
 import org.onosproject.drivers.p4runtime.mirror.P4RuntimeGroupMirror;
 import org.onosproject.drivers.p4runtime.mirror.P4RuntimeMulticastGroupMirror;
 import org.onosproject.drivers.p4runtime.mirror.TimedEntry;
@@ -47,12 +49,15 @@
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.locks.Lock;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static java.lang.String.format;
 import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.DELETE;
 import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.INSERT;
@@ -85,8 +90,8 @@
 
     // If true, we avoid querying the device and return what's already known by
     // the ONOS store.
-    private static final String IGNORE_DEVICE_WHEN_GET = "ignoreDeviceWhenGet";
-    private static final boolean DEFAULT_IGNORE_DEVICE_WHEN_GET = false;
+    private static final String READ_ACTION_GROUPS_FROM_MIRROR = "actionGroupReadFromMirror";
+    private static final boolean DEFAULT_READ_ACTION_GROUPS_FROM_MIRROR = false;
 
     protected GroupStore groupStore;
     private P4RuntimeGroupMirror groupMirror;
@@ -116,6 +121,11 @@
         if (!setupBehaviour()) {
             return;
         }
+
+        // TODO: fix GroupProgrammable API, passing the device ID is ambiguous
+        checkArgument(deviceId.equals(this.deviceId),
+                      "passed deviceId must be the same assigned to this behavior");
+
         groupOps.operations().forEach(op -> {
             // ONOS-7785 We need app cookie (action profile id) from the group
             Group groupOnStore = groupStore.getGroup(deviceId, op.groupId());
@@ -127,10 +137,10 @@
                                                                      groupOnStore.appId());
             DefaultGroup groupToApply = new DefaultGroup(op.groupId(), groupDesc);
             if (op.groupType().equals(GroupDescription.Type.ALL)) {
-                processMcGroupOp(deviceId, groupToApply, op.opType());
+                processMcGroupOp(groupToApply, op.opType());
             } else {
 
-                processGroupOp(deviceId, groupToApply, op.opType());
+                processGroupOp(groupToApply, op.opType());
             }
         });
     }
@@ -140,32 +150,91 @@
         if (!setupBehaviour()) {
             return Collections.emptyList();
         }
-        final ImmutableList.Builder<Group> groups = ImmutableList.builder();
-
-        if (!driverBoolProperty(IGNORE_DEVICE_WHEN_GET, DEFAULT_IGNORE_DEVICE_WHEN_GET)) {
-            groups.addAll(pipeconf.pipelineModel().actionProfiles().stream()
-                                  .map(PiActionProfileModel::id)
-                                  .flatMap(this::streamGroupsFromDevice)
-                                  .iterator());
-            // FIXME: enable reading MC groups from device once reading from
-            // PRE is supported in PI
-            // groups.addAll(getMcGroupsFromDevice());
-        } else {
-            groups.addAll(groupMirror.getAll(deviceId).stream()
-                                  .map(TimedEntry::entry)
-                                  .map(this::forgeGroupEntry)
-                                  .iterator());
-        }
-        // FIXME: same as before..
-        groups.addAll(mcGroupMirror.getAll(deviceId).stream()
-                              .map(TimedEntry::entry)
-                              .map(this::forgeMcGroupEntry)
-                              .iterator());
-
-        return groups.build();
+        return new ImmutableList.Builder<Group>()
+                .addAll(getActionGroups())
+                .addAll(getMcGroups()).build();
     }
 
-    private void processGroupOp(DeviceId deviceId, Group pdGroup, GroupOperation.Type opType) {
+    private Collection<Group> getActionGroups() {
+
+        if (driverBoolProperty(READ_ACTION_GROUPS_FROM_MIRROR,
+                               DEFAULT_READ_ACTION_GROUPS_FROM_MIRROR)) {
+            return getActionGroupsFromMirror();
+        }
+
+        final Collection<PiActionGroup> piGroups = pipeconf.pipelineModel()
+                .actionProfiles()
+                .stream()
+                .map(PiActionProfileModel::id)
+                .flatMap(this::streamPiGroupsFromDevice)
+                .collect(Collectors.toList());
+
+        if (piGroups.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        final List<Group> result = Lists.newArrayList();
+        final List<PiActionGroup> inconsistentGroups = Lists.newArrayList();
+
+        for (PiActionGroup piGroupOnDevice : piGroups) {
+            final Group group = forgeGroupEntry(piGroupOnDevice);
+            if (group == null) {
+                // Entry is on device but unknown to translation service or
+                // device mirror. Inconsistent. Mark for removal.
+                inconsistentGroups.add(piGroupOnDevice);
+            } else {
+                result.add(group);
+            }
+        }
+        // Trigger clean up of inconsistent entries (is any).
+        // TODO: make this behaviour configurable, in some cases it's fine for
+        // the device to have groups that were not installed by us.
+        if (!inconsistentGroups.isEmpty()) {
+            SharedExecutors.getSingleThreadExecutor().execute(
+                    () -> cleanUpInconsistentGroups(inconsistentGroups));
+        }
+        return result;
+    }
+
+    private Collection<Group> getActionGroupsFromMirror() {
+        return groupMirror.getAll(deviceId).stream()
+                .map(TimedEntry::entry)
+                .map(this::forgeGroupEntry)
+                .filter(Objects::nonNull)
+                .collect(Collectors.toList());
+    }
+
+    private void cleanUpInconsistentGroups(Collection<PiActionGroup> piGroups) {
+        log.warn("Found {} inconsistent groups on {}, removing them...",
+                 piGroups.size(), deviceId);
+        piGroups.forEach(piGroup -> {
+            log.debug(piGroup.toString());
+            // Per-piGroup lock.
+            final PiActionGroupHandle handle = PiActionGroupHandle.of(deviceId, piGroup);
+            STRIPED_LOCKS.get(handle).lock();
+            try {
+                processPiGroup(handle, piGroup, piGroup, null,
+                               GroupOperation.Type.DELETE);
+            } finally {
+                STRIPED_LOCKS.get(handle).unlock();
+            }
+        });
+    }
+
+    private Collection<Group> getMcGroups() {
+        // TODO: missing support for reading multicast groups is ready in PI/Stratum.
+        return getMcGroupsFromMirror();
+    }
+
+    private Collection<Group> getMcGroupsFromMirror() {
+        return mcGroupMirror.getAll(deviceId).stream()
+                .map(TimedEntry::entry)
+                .map(this::forgeMcGroupEntry)
+                .filter(Objects::nonNull)
+                .collect(Collectors.toList());
+    }
+
+    private void processGroupOp(Group pdGroup, GroupOperation.Type opType) {
         final PiActionGroup piGroup;
         try {
             piGroup = groupTranslator.translate(pdGroup, pipeconf);
@@ -175,11 +244,9 @@
             return;
         }
         final PiActionGroupHandle handle = PiActionGroupHandle.of(deviceId, piGroup);
-
         final PiActionGroup groupOnDevice = groupMirror.get(handle) == null
-                ? null
-                : groupMirror.get(handle).entry();
-
+                ? null : groupMirror.get(handle).entry();
+        // Per-piGroup lock.
         final Lock lock = STRIPED_LOCKS.get(handle);
         lock.lock();
         try {
@@ -190,7 +257,7 @@
         }
     }
 
-    private void processMcGroupOp(DeviceId deviceId, Group pdGroup, GroupOperation.Type opType) {
+    private void processMcGroupOp(Group pdGroup, GroupOperation.Type opType) {
         final PiMulticastGroupEntry mcGroup;
         try {
             mcGroup = mcGroupTranslator.translate(pdGroup, pipeconf);
@@ -429,52 +496,51 @@
                 completableFuture, format("performing %s %s", action, topic), false);
     }
 
-    private Stream<Group> streamGroupsFromDevice(PiActionProfileId actProfId) {
+    private Stream<PiActionGroup> streamPiGroupsFromDevice(PiActionProfileId actProfId) {
         // Read PI groups and return original PD one.
-        Collection<PiActionGroup> groups = getFutureWithDeadline(
+        // TODO: implement P4Runtime client call to read all groups with one call
+        // Good is pipeline has multiple action profiles.
+        final Collection<PiActionGroup> groups = getFutureWithDeadline(
                 client.dumpGroups(actProfId, pipeconf),
                 "dumping groups", Collections.emptyList());
-        return groups.stream()
-                .map(this::forgeGroupEntry)
-                .filter(Objects::nonNull);
-    }
-
-    private Collection<Group> getMcGroupsFromDevice() {
-        Collection<PiMulticastGroupEntry> groups = getFutureWithDeadline(
-                client.readAllMulticastGroupEntries(),
-                "dumping multicast groups", Collections.emptyList());
-        return groups.stream()
-                .map(this::forgeMcGroupEntry)
-                .filter(Objects::nonNull)
-                .collect(Collectors.toList());
+        return groups.stream();
     }
 
     private Group forgeGroupEntry(PiActionGroup piGroup) {
         final PiActionGroupHandle handle = PiActionGroupHandle.of(deviceId, piGroup);
-        if (!groupTranslator.lookup(handle).isPresent()) {
-            log.warn("Missing PI group from translation store: {} - {}:{}",
-                     pipeconf.id(), piGroup.actionProfileId(),
-                     piGroup.id());
+        final Optional<PiTranslatedEntity<Group, PiActionGroup>>
+                translatedEntity = groupTranslator.lookup(handle);
+        final TimedEntry<PiActionGroup> timedEntry = groupMirror.get(handle);
+        // Is entry consistent with our state?
+        if (!translatedEntity.isPresent()) {
+            log.warn("Group handle not found in translation store: {}", handle);
             return null;
         }
-        final long life = groupMirror.get(handle) != null
-                ? groupMirror.get(handle).lifeSec() : 0;
-        final Group original = groupTranslator.lookup(handle).get().original();
-        return addedGroup(original, life);
+        if (timedEntry == null) {
+            // Don't bother logging more than debug, most probably it's the EC
+            // map backing the store that has not received all the updates yet.
+            log.debug("Group handle not found in device mirror: {}", handle);
+            return null;
+        }
+        return addedGroup(translatedEntity.get().original(), timedEntry.lifeSec());
     }
 
     private Group forgeMcGroupEntry(PiMulticastGroupEntry mcGroup) {
         final PiMulticastGroupEntryHandle handle = PiMulticastGroupEntryHandle.of(
                 deviceId, mcGroup);
-        if (!mcGroupTranslator.lookup(handle).isPresent()) {
-            log.warn("Missing PI multicast group {} from translation store",
-                     mcGroup.groupId());
+        final Optional<PiTranslatedEntity<Group, PiMulticastGroupEntry>>
+                translatedEntity = mcGroupTranslator.lookup(handle);
+        final TimedEntry<PiMulticastGroupEntry> timedEntry = mcGroupMirror.get(handle);
+        // Is entry consistent with our state?
+        if (!translatedEntity.isPresent()) {
+            log.warn("Multicast group handle not found in translation store: {}", handle);
             return null;
         }
-        final long life = mcGroupMirror.get(handle) != null
-                ? mcGroupMirror.get(handle).lifeSec() : 0;
-        final Group original = mcGroupTranslator.lookup(handle).get().original();
-        return addedGroup(original, life);
+        if (timedEntry == null) {
+            log.warn("Multicast group handle not found in device mirror: {}", handle);
+            return null;
+        }
+        return addedGroup(translatedEntity.get().original(), timedEntry.lifeSec());
     }
 
     private Group addedGroup(Group original, long life) {