Fix missing P4Runtime groups because of infinite delete/insert cycle

The fix is simple: when cleaning up inconsistent entries from device,
we update the mirror when the response is received, and not before
sending the request. Otherwise, if delete goes wrong, writes happening
right after reconciliation cycle might find an inconsistent mirror state.

When writing entries (e.g. apply group/flow rule) we keep updating the
mirror before sending the request to handled the case of back-to-back
writes.

Change-Id: I9e1cc5cac3f8746c67e93e2cee17aff78d3f1d7e
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
index 01af5eb..9603241 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
@@ -18,7 +18,6 @@
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.Striped;
 import org.onosproject.drivers.p4runtime.mirror.P4RuntimeActionProfileGroupMirror;
 import org.onosproject.drivers.p4runtime.mirror.P4RuntimeActionProfileMemberMirror;
@@ -45,7 +44,6 @@
 import org.onosproject.net.pi.service.PiTranslationException;
 import org.onosproject.p4runtime.api.P4RuntimeReadClient;
 import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteRequest;
-import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteResponse;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -54,7 +52,6 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.locks.Lock;
 import java.util.stream.Collectors;
 
@@ -163,8 +160,8 @@
         for (PiActionProfileGroup piGroup : groupsOnDevice) {
             final Group pdGroup = checkAndForgeGroupEntry(piGroup, membersOnDevice);
             if (pdGroup == null) {
-                // Entry is on device but unknown to translation service or
-                // device mirror. Inconsistent. Mark for removal.
+                // Entry is on device but is inconsistent with controller state.
+                // Mark for removal.
                 groupsToRemove.add(piGroup);
             } else {
                 result.add(pdGroup);
@@ -176,10 +173,9 @@
             }
         }
 
-        // Trigger clean up of inconsistent groups and members (if any). This
-        // process takes care of removing any orphan member, e.g. from a
+        // Trigger clean up of inconsistent groups and members (if any). Also
+        // take care of removing any orphan member, e.g. from a
         // partial/unsuccessful group insertion.
-        // This will update the mirror accordingly.
         final Set<PiActionProfileMemberHandle> memberHandlesToRemove = Sets.difference(
                 membersOnDevice.keySet(), memberHandlesToKeep);
         final Set<PiActionProfileGroupHandle> groupHandlesToRemove = groupsToRemove
@@ -188,23 +184,21 @@
             log.warn("Cleaning up {} action profile groups and " +
                              "{} members on {}...",
                      groupHandlesToRemove.size(), memberHandlesToRemove.size(), deviceId);
-            final WriteRequest deleteRequest = client.write(pipeconf)
+            client.write(pipeconf)
                     .delete(groupHandlesToRemove)
-                    .delete(memberHandlesToRemove);
-            WRITE_LOCKS.get(deviceId).lock();
-            try {
-                updateMirrorAndSubmit(deleteRequest).whenComplete((r, ex) -> {
-                    if (ex != null) {
-                        log.error("Exception removing inconsistent group/members", ex);
-                    } else {
-                        log.debug("Completed removal of inconsistent " +
-                                          "groups/members ({} of {} updates succeeded)",
-                                  r.success(), r.all());
-                    }
-                });
-            } finally {
-                WRITE_LOCKS.get(deviceId).unlock();
-            }
+                    .delete(memberHandlesToRemove)
+                    .submit().whenComplete((r, ex) -> {
+                if (ex != null) {
+                    log.error("Exception removing inconsistent group/members", ex);
+                } else {
+                    log.debug("Completed removal of inconsistent " +
+                                      "groups/members ({} of {} updates succeeded)",
+                              r.success().size(), r.all().size());
+                    groupMirror.applyWriteResponse(r);
+                    memberMirror.applyWriteResponse(r);
+                }
+            });
+
         }
 
         // Done.
@@ -214,8 +208,8 @@
     private Collection<Group> getGroupsFromMirror() {
         final Map<PiActionProfileMemberHandle, PiActionProfileMember> members =
                 memberMirror.getAll(deviceId).stream()
-                .map(TimedEntry::entry)
-                .collect(toMap(e -> e.handle(deviceId), e -> e));
+                        .map(TimedEntry::entry)
+                        .collect(toMap(e -> e.handle(deviceId), e -> e));
         return groupMirror.getAll(deviceId).stream()
                 .map(TimedEntry::entry)
                 .map(g -> checkAndForgeGroupEntry(
@@ -282,6 +276,7 @@
     }
 
     private void processPdGroup(Group pdGroup, GroupOperation.Type opType) {
+        // Translate.
         final PiActionProfileGroup piGroup;
         try {
             piGroup = groupTranslator.translate(pdGroup, pipeconf);
@@ -293,57 +288,69 @@
         final Operation operation = opType.equals(GroupOperation.Type.DELETE)
                 ? Operation.REMOVE : Operation.APPLY;
         final PiActionProfileGroupHandle handle = piGroup.handle(deviceId);
-        if (writePiGroupOnDevice(piGroup, handle, operation)) {
-            if (operation.equals(Operation.APPLY)) {
-                groupTranslator.learn(handle, new PiTranslatedEntity<>(
-                        pdGroup, piGroup, handle));
-            } else {
-                groupTranslator.forget(handle);
-            }
+        // Update translation store.
+        if (operation.equals(Operation.APPLY)) {
+            groupTranslator.learn(handle, new PiTranslatedEntity<>(
+                    pdGroup, piGroup, handle));
+        } else {
+            groupTranslator.forget(handle);
         }
+        // Submit write and forget about it.
+        asyncWritePiGroup(piGroup, handle, operation);
     }
 
-    private boolean writePiGroupOnDevice(
+    private void asyncWritePiGroup(
             PiActionProfileGroup group,
             PiActionProfileGroupHandle groupHandle,
             Operation operation) {
-        // Generate a write request to write both members and groups. Return
-        // true if request is successful or if there's no need to write on
-        // device (according to mirror state), otherwise, return false.
+        // Generate and submit  write request to write both members and groups.
         final Collection<PiActionProfileMember> members = extractAllMemberInstancesOrNull(group);
         if (members == null) {
-            return false;
+            return;
         }
         final WriteRequest request = client.write(pipeconf);
-        final CompletableFuture<WriteResponse> futureResponse;
         WRITE_LOCKS.get(deviceId).lock();
         try {
-            // FIXME: when operation is remove, should we remove members first? Same
-            //  thing when modifying a group, should we first modify the group then
-            //  remove the member?
-            final boolean allMembersSkipped = members.stream()
-                    .allMatch(m -> appendEntityToWriteRequestOrSkip(
-                            request, m.handle(deviceId), m, memberMirror, operation));
-            final boolean groupSkipped = appendEntityToWriteRequestOrSkip(
-                    request, groupHandle, group, groupMirror, operation);
-            if (allMembersSkipped && groupSkipped) {
-                return true;
+            if (operation == Operation.APPLY) {
+                // First insert/update members, then group.
+                members.forEach(m -> appendEntityToWriteRequestOrSkip(
+                        request, m.handle(deviceId), m, memberMirror, operation));
+                appendEntityToWriteRequestOrSkip(
+                        request, groupHandle, group, groupMirror, operation);
             } else {
-                futureResponse = updateMirrorAndSubmit(request);
+                // First remove group, then members.
+                appendEntityToWriteRequestOrSkip(
+                        request, groupHandle, group, groupMirror, operation);
+                members.forEach(m -> appendEntityToWriteRequestOrSkip(
+                        request, m.handle(deviceId), m, memberMirror, operation));
             }
+            if (request.pendingUpdates().isEmpty()) {
+                // Nothing to do.
+                return;
+            }
+            // Optimistically update mirror before response arrives to make
+            // sure any write after this sees the expected mirror state. If
+            // anything goes wrong, mirror will be re-synced during
+            // reconciliation.
+            groupMirror.applyWriteRequest(request);
+            memberMirror.applyWriteRequest(request);
+            request.submit().whenComplete((r, ex) -> {
+                if (ex != null) {
+                    log.error("Exception writing PI group to " + deviceId, ex);
+                } else {
+                    log.debug("Completed write of PI group to {} " +
+                                      "({} of {} updates succeeded)",
+                              deviceId, r.success().size(), r.all().size());
+                }
+            });
         } finally {
             WRITE_LOCKS.get(deviceId).unlock();
         }
-        // Wait response from device. Returns true if all entities in the
-        // request (groups and members) were written successfully.
-        return Futures.getUnchecked(futureResponse).isSuccess();
     }
 
-    private <H extends PiHandle, E extends PiEntity> boolean appendEntityToWriteRequestOrSkip(
+    private <H extends PiHandle, E extends PiEntity> void appendEntityToWriteRequestOrSkip(
             WriteRequest writeRequest, H handle, E entityToApply,
             P4RuntimeMirror<H, E> mirror, Operation operation) {
-        // Should return true if there's no need to write entity on device,
-        // false if the write request is modified or an error occurs.
         final TimedEntry<E> entityOnDevice = mirror.get(handle);
         switch (operation) {
             case APPLY:
@@ -351,7 +358,7 @@
                     writeRequest.insert(entityToApply);
                 } else if (entityToApply.equals(entityOnDevice.entry())) {
                     // Skip writing if group is unchanged.
-                    return true;
+                    return;
                 } else {
                     writeRequest.modify(entityToApply);
                 }
@@ -359,7 +366,7 @@
             case REMOVE:
                 if (entityOnDevice == null) {
                     // Skip deleting if group does not exist on device.
-                    return true;
+                    return;
                 } else {
                     writeRequest.delete(handle);
                 }
@@ -368,7 +375,6 @@
                 log.error("Unrecognized operation {}", operation);
                 break;
         }
-        return false;
     }
 
     private Collection<PiActionProfileMember> extractAllMemberInstancesOrNull(
@@ -386,12 +392,6 @@
         return instances;
     }
 
-    private CompletableFuture<WriteResponse> updateMirrorAndSubmit(WriteRequest request) {
-        groupMirror.applyWriteRequest(request);
-        memberMirror.applyWriteRequest(request);
-        return request.submit();
-    }
-
     enum Operation {
         APPLY, REMOVE
     }