Fix missing P4Runtime groups because of infinite delete/insert cycle

The fix is simple: when cleaning up inconsistent entries from device,
we update the mirror when the response is received, and not before
sending the request. Otherwise, if delete goes wrong, writes happening
right after reconciliation cycle might find an inconsistent mirror state.

When writing entries (e.g. apply group/flow rule) we keep updating the
mirror before sending the request to handled the case of back-to-back
writes.

Change-Id: I9e1cc5cac3f8746c67e93e2cee17aff78d3f1d7e
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
index 01af5eb..9603241 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
@@ -18,7 +18,6 @@
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.Striped;
 import org.onosproject.drivers.p4runtime.mirror.P4RuntimeActionProfileGroupMirror;
 import org.onosproject.drivers.p4runtime.mirror.P4RuntimeActionProfileMemberMirror;
@@ -45,7 +44,6 @@
 import org.onosproject.net.pi.service.PiTranslationException;
 import org.onosproject.p4runtime.api.P4RuntimeReadClient;
 import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteRequest;
-import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteResponse;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -54,7 +52,6 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.locks.Lock;
 import java.util.stream.Collectors;
 
@@ -163,8 +160,8 @@
         for (PiActionProfileGroup piGroup : groupsOnDevice) {
             final Group pdGroup = checkAndForgeGroupEntry(piGroup, membersOnDevice);
             if (pdGroup == null) {
-                // Entry is on device but unknown to translation service or
-                // device mirror. Inconsistent. Mark for removal.
+                // Entry is on device but is inconsistent with controller state.
+                // Mark for removal.
                 groupsToRemove.add(piGroup);
             } else {
                 result.add(pdGroup);
@@ -176,10 +173,9 @@
             }
         }
 
-        // Trigger clean up of inconsistent groups and members (if any). This
-        // process takes care of removing any orphan member, e.g. from a
+        // Trigger clean up of inconsistent groups and members (if any). Also
+        // take care of removing any orphan member, e.g. from a
         // partial/unsuccessful group insertion.
-        // This will update the mirror accordingly.
         final Set<PiActionProfileMemberHandle> memberHandlesToRemove = Sets.difference(
                 membersOnDevice.keySet(), memberHandlesToKeep);
         final Set<PiActionProfileGroupHandle> groupHandlesToRemove = groupsToRemove
@@ -188,23 +184,21 @@
             log.warn("Cleaning up {} action profile groups and " +
                              "{} members on {}...",
                      groupHandlesToRemove.size(), memberHandlesToRemove.size(), deviceId);
-            final WriteRequest deleteRequest = client.write(pipeconf)
+            client.write(pipeconf)
                     .delete(groupHandlesToRemove)
-                    .delete(memberHandlesToRemove);
-            WRITE_LOCKS.get(deviceId).lock();
-            try {
-                updateMirrorAndSubmit(deleteRequest).whenComplete((r, ex) -> {
-                    if (ex != null) {
-                        log.error("Exception removing inconsistent group/members", ex);
-                    } else {
-                        log.debug("Completed removal of inconsistent " +
-                                          "groups/members ({} of {} updates succeeded)",
-                                  r.success(), r.all());
-                    }
-                });
-            } finally {
-                WRITE_LOCKS.get(deviceId).unlock();
-            }
+                    .delete(memberHandlesToRemove)
+                    .submit().whenComplete((r, ex) -> {
+                if (ex != null) {
+                    log.error("Exception removing inconsistent group/members", ex);
+                } else {
+                    log.debug("Completed removal of inconsistent " +
+                                      "groups/members ({} of {} updates succeeded)",
+                              r.success().size(), r.all().size());
+                    groupMirror.applyWriteResponse(r);
+                    memberMirror.applyWriteResponse(r);
+                }
+            });
+
         }
 
         // Done.
@@ -214,8 +208,8 @@
     private Collection<Group> getGroupsFromMirror() {
         final Map<PiActionProfileMemberHandle, PiActionProfileMember> members =
                 memberMirror.getAll(deviceId).stream()
-                .map(TimedEntry::entry)
-                .collect(toMap(e -> e.handle(deviceId), e -> e));
+                        .map(TimedEntry::entry)
+                        .collect(toMap(e -> e.handle(deviceId), e -> e));
         return groupMirror.getAll(deviceId).stream()
                 .map(TimedEntry::entry)
                 .map(g -> checkAndForgeGroupEntry(
@@ -282,6 +276,7 @@
     }
 
     private void processPdGroup(Group pdGroup, GroupOperation.Type opType) {
+        // Translate.
         final PiActionProfileGroup piGroup;
         try {
             piGroup = groupTranslator.translate(pdGroup, pipeconf);
@@ -293,57 +288,69 @@
         final Operation operation = opType.equals(GroupOperation.Type.DELETE)
                 ? Operation.REMOVE : Operation.APPLY;
         final PiActionProfileGroupHandle handle = piGroup.handle(deviceId);
-        if (writePiGroupOnDevice(piGroup, handle, operation)) {
-            if (operation.equals(Operation.APPLY)) {
-                groupTranslator.learn(handle, new PiTranslatedEntity<>(
-                        pdGroup, piGroup, handle));
-            } else {
-                groupTranslator.forget(handle);
-            }
+        // Update translation store.
+        if (operation.equals(Operation.APPLY)) {
+            groupTranslator.learn(handle, new PiTranslatedEntity<>(
+                    pdGroup, piGroup, handle));
+        } else {
+            groupTranslator.forget(handle);
         }
+        // Submit write and forget about it.
+        asyncWritePiGroup(piGroup, handle, operation);
     }
 
-    private boolean writePiGroupOnDevice(
+    private void asyncWritePiGroup(
             PiActionProfileGroup group,
             PiActionProfileGroupHandle groupHandle,
             Operation operation) {
-        // Generate a write request to write both members and groups. Return
-        // true if request is successful or if there's no need to write on
-        // device (according to mirror state), otherwise, return false.
+        // Generate and submit  write request to write both members and groups.
         final Collection<PiActionProfileMember> members = extractAllMemberInstancesOrNull(group);
         if (members == null) {
-            return false;
+            return;
         }
         final WriteRequest request = client.write(pipeconf);
-        final CompletableFuture<WriteResponse> futureResponse;
         WRITE_LOCKS.get(deviceId).lock();
         try {
-            // FIXME: when operation is remove, should we remove members first? Same
-            //  thing when modifying a group, should we first modify the group then
-            //  remove the member?
-            final boolean allMembersSkipped = members.stream()
-                    .allMatch(m -> appendEntityToWriteRequestOrSkip(
-                            request, m.handle(deviceId), m, memberMirror, operation));
-            final boolean groupSkipped = appendEntityToWriteRequestOrSkip(
-                    request, groupHandle, group, groupMirror, operation);
-            if (allMembersSkipped && groupSkipped) {
-                return true;
+            if (operation == Operation.APPLY) {
+                // First insert/update members, then group.
+                members.forEach(m -> appendEntityToWriteRequestOrSkip(
+                        request, m.handle(deviceId), m, memberMirror, operation));
+                appendEntityToWriteRequestOrSkip(
+                        request, groupHandle, group, groupMirror, operation);
             } else {
-                futureResponse = updateMirrorAndSubmit(request);
+                // First remove group, then members.
+                appendEntityToWriteRequestOrSkip(
+                        request, groupHandle, group, groupMirror, operation);
+                members.forEach(m -> appendEntityToWriteRequestOrSkip(
+                        request, m.handle(deviceId), m, memberMirror, operation));
             }
+            if (request.pendingUpdates().isEmpty()) {
+                // Nothing to do.
+                return;
+            }
+            // Optimistically update mirror before response arrives to make
+            // sure any write after this sees the expected mirror state. If
+            // anything goes wrong, mirror will be re-synced during
+            // reconciliation.
+            groupMirror.applyWriteRequest(request);
+            memberMirror.applyWriteRequest(request);
+            request.submit().whenComplete((r, ex) -> {
+                if (ex != null) {
+                    log.error("Exception writing PI group to " + deviceId, ex);
+                } else {
+                    log.debug("Completed write of PI group to {} " +
+                                      "({} of {} updates succeeded)",
+                              deviceId, r.success().size(), r.all().size());
+                }
+            });
         } finally {
             WRITE_LOCKS.get(deviceId).unlock();
         }
-        // Wait response from device. Returns true if all entities in the
-        // request (groups and members) were written successfully.
-        return Futures.getUnchecked(futureResponse).isSuccess();
     }
 
-    private <H extends PiHandle, E extends PiEntity> boolean appendEntityToWriteRequestOrSkip(
+    private <H extends PiHandle, E extends PiEntity> void appendEntityToWriteRequestOrSkip(
             WriteRequest writeRequest, H handle, E entityToApply,
             P4RuntimeMirror<H, E> mirror, Operation operation) {
-        // Should return true if there's no need to write entity on device,
-        // false if the write request is modified or an error occurs.
         final TimedEntry<E> entityOnDevice = mirror.get(handle);
         switch (operation) {
             case APPLY:
@@ -351,7 +358,7 @@
                     writeRequest.insert(entityToApply);
                 } else if (entityToApply.equals(entityOnDevice.entry())) {
                     // Skip writing if group is unchanged.
-                    return true;
+                    return;
                 } else {
                     writeRequest.modify(entityToApply);
                 }
@@ -359,7 +366,7 @@
             case REMOVE:
                 if (entityOnDevice == null) {
                     // Skip deleting if group does not exist on device.
-                    return true;
+                    return;
                 } else {
                     writeRequest.delete(handle);
                 }
@@ -368,7 +375,6 @@
                 log.error("Unrecognized operation {}", operation);
                 break;
         }
-        return false;
     }
 
     private Collection<PiActionProfileMember> extractAllMemberInstancesOrNull(
@@ -386,12 +392,6 @@
         return instances;
     }
 
-    private CompletableFuture<WriteResponse> updateMirrorAndSubmit(WriteRequest request) {
-        groupMirror.applyWriteRequest(request);
-        memberMirror.applyWriteRequest(request);
-        return request.submit();
-    }
-
     enum Operation {
         APPLY, REMOVE
     }
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
index e0e5c1d..a4ae0ef 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
@@ -173,23 +173,19 @@
             // Trigger clean up of inconsistent entries.
             log.warn("Found {} inconsistent table entries on {}, removing them...",
                      inconsistentEntries.size(), deviceId);
-            final WriteRequest request = client.write(pipeconf)
-                    .entities(inconsistentEntries, DELETE);
-            WRITE_LOCKS.get(deviceId).lock();
-            // Update mirror and async submit delete request.
-            try {
-                tableMirror.applyWriteRequest(request);
-                request.submit().whenComplete((response, ex) -> {
-                    if (ex != null) {
-                        log.error("Exception removing inconsistent table entries", ex);
-                    } else {
-                        log.debug("Successfully removed {} out of {} inconsistent entries",
-                                  response.success().size(), response.all().size());
-                    }
-                });
-            } finally {
-                WRITE_LOCKS.get(deviceId).unlock();
-            }
+            // Submit delete request and update mirror when done.
+            client.write(pipeconf)
+                    .entities(inconsistentEntries, DELETE)
+                    .submit().whenComplete((response, ex) -> {
+                if (ex != null) {
+                    log.error("Exception removing inconsistent table entries", ex);
+                } else {
+                    log.debug("Successfully removed {} out of {} inconsistent entries",
+                              response.success().size(), response.all().size());
+                }
+                tableMirror.applyWriteResponse(response);
+            });
+
         }
 
         return result.build();
@@ -285,6 +281,7 @@
         WRITE_LOCKS.get(deviceId).lock();
         try {
             for (FlowRule rule : rules) {
+                // Translate.
                 final PiTableEntry entry;
                 try {
                     entry = translator.translate(rule, pipeconf);
@@ -296,14 +293,23 @@
                 }
                 final PiTableEntryHandle handle = entry.handle(deviceId);
                 handleToRuleMap.put(handle, rule);
+                // Update translation store.
+                if (driverOperation.equals(APPLY)) {
+                    translator.learn(handle, new PiTranslatedEntity<>(
+                            rule, entry, handle));
+                } else {
+                    translator.forget(handle);
+                }
                 // Append entry to batched write request (returns false), or skip (true)
                 if (appendEntryToWriteRequestOrSkip(
                         request, handle, entry, driverOperation)) {
                     skippedRules.add(rule);
-                    updateTranslationStore(
-                            driverOperation, handle, rule, entry);
                 }
             }
+            if (request.pendingUpdates().isEmpty()) {
+                // All good. No need to write on device.
+                return rules;
+            }
             // Update mirror.
             tableMirror.applyWriteRequest(request);
             // Async submit request to server.
@@ -314,14 +320,14 @@
         // Wait for response.
         final WriteResponse response = Futures.getUnchecked(futureResponse);
         // Derive successfully applied flow rule from response.
-        final List<FlowRule> appliedRules = getAppliedFlowRulesAndUpdateTranslator(
+        final List<FlowRule> appliedRules = getAppliedFlowRules(
                 response, handleToRuleMap, driverOperation);
         // Return skipped and applied rules.
         return ImmutableList.<FlowRule>builder()
                 .addAll(skippedRules).addAll(appliedRules).build();
     }
 
-    private List<FlowRule> getAppliedFlowRulesAndUpdateTranslator(
+    private List<FlowRule> getAppliedFlowRules(
             WriteResponse response,
             Map<PiHandle, FlowRule> handleToFlowRuleMap,
             Operation driverOperation) {
@@ -329,24 +335,19 @@
         // server according to the given write response and operation.
         return response.success().stream()
                 .filter(r -> r.entityType().equals(PiEntityType.TABLE_ENTRY))
-                .map(r -> {
-                    final PiHandle handle = r.handle();
-                    final FlowRule rule = handleToFlowRuleMap.get(handle);
-                    if (rule == null) {
-                        log.error("Server returned unrecognized table entry " +
-                                          "handle in write response: {}", handle);
-                        return null;
-                    }
+                .filter(r -> {
                     // Filter intermediate responses (e.g. P4Runtime DELETE
                     // during FlowRule APPLY because we are performing
                     // delete-before-update)
-                    if (isUpdateTypeRelevant(r.updateType(), driverOperation)) {
-                        updateTranslationStore(
-                                driverOperation, (PiTableEntryHandle) handle,
-                                rule, (PiTableEntry) r.entity());
-                        return rule;
+                    return isUpdateTypeRelevant(r.updateType(), driverOperation);
+                })
+                .map(r -> {
+                    final FlowRule rule = handleToFlowRuleMap.get(r.handle());
+                    if (rule == null) {
+                        log.warn("Server returned unrecognized table entry " +
+                                         "handle in write response: {}", r.handle());
                     }
-                    return null;
+                    return rule;
                 })
                 .filter(Objects::nonNull)
                 .collect(Collectors.toList());
@@ -372,17 +373,6 @@
         return true;
     }
 
-    private void updateTranslationStore(
-            Operation operation, PiTableEntryHandle handle,
-            FlowRule rule, PiTableEntry entry) {
-        if (operation.equals(APPLY)) {
-            translator.learn(handle, new PiTranslatedEntity<>(
-                    rule, entry, handle));
-        } else {
-            translator.forget(handle);
-        }
-    }
-
     private boolean appendEntryToWriteRequestOrSkip(
             final WriteRequest writeRequest,
             final PiTableEntryHandle handle,
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
index 4daeaaa..33bae9d 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
@@ -28,7 +28,9 @@
 import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
 import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
 import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
-import org.onosproject.p4runtime.api.P4RuntimeWriteClient;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.EntityUpdateRequest;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteRequest;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteResponse;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.EventuallyConsistentMap;
 import org.onosproject.store.service.StorageService;
@@ -233,9 +235,20 @@
     }
 
     @Override
+    public void applyWriteRequest(WriteRequest request) {
+        // Optimistically assume all requests will be successful.
+        applyUpdates(request.pendingUpdates());
+    }
+
+    @Override
+    public void applyWriteResponse(WriteResponse response) {
+        // Record only successful updates.
+        applyUpdates(response.success());
+    }
+
     @SuppressWarnings("unchecked")
-    public void applyWriteRequest(P4RuntimeWriteClient.WriteRequest request) {
-        request.pendingUpdates().stream()
+    private void applyUpdates(Collection<? extends EntityUpdateRequest> updates) {
+        updates.stream()
                 .filter(r -> r.entityType().equals(this.entityType))
                 .forEach(r -> {
                     switch (r.updateType()) {
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java
index b836455..bd66fe7 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java
@@ -107,4 +107,12 @@
      * @param request P4Runtime write request
      */
     void applyWriteRequest(P4RuntimeWriteClient.WriteRequest request);
+
+    /**
+     * Uses the given P4Runtime write response to update the state of this
+     * mirror.
+     *
+     * @param response P4Runtime write response
+     */
+    void applyWriteResponse(P4RuntimeWriteClient.WriteResponse response);
 }
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/TimedEntry.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/TimedEntry.java
index 76b44a0..4b7170e 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/TimedEntry.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/TimedEntry.java
@@ -16,9 +16,12 @@
 
 package org.onosproject.drivers.p4runtime.mirror;
 
+import com.google.common.base.MoreObjects;
 import org.onosproject.net.pi.runtime.PiEntity;
 import org.onosproject.store.service.WallClockTimestamp;
 
+import java.util.Objects;
+
 public class TimedEntry<E extends PiEntity> {
 
     private final long timestamp;
@@ -41,4 +44,30 @@
         final long now = new WallClockTimestamp().unixTimestamp();
         return (now - timestamp) / 1000;
     }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(timestamp, entity);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        final TimedEntry other = (TimedEntry) obj;
+        return Objects.equals(this.timestamp, other.timestamp)
+                && Objects.equals(this.entity, other.entity);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                .add("timestamp", timestamp)
+                .add("entity", entity)
+                .toString();
+    }
 }