Fix inconsistent update type during concurrent P4Runtime writes

This is achieved by optimistically updating the P4Runtime mirror using
the write request (instead of waiting for a response) and by serializing
building write requests for the same device.

This change requires updating the P4Runtime protocol classes to expose
the content of the write request.

It also includes:
- force member weight to 1 when reading groups (some server
implementation still fails to be compliant to the spec)
- remove unused operation timeout handling in GDP (now all RPCz have a
timeout)

Change-Id: Ib4f99a6085c1283f46a2797e0c883d96954e02e9
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
index fca05cb..01af5eb 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
@@ -18,7 +18,8 @@
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import org.onlab.util.SharedExecutors;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.Striped;
 import org.onosproject.drivers.p4runtime.mirror.P4RuntimeActionProfileGroupMirror;
 import org.onosproject.drivers.p4runtime.mirror.P4RuntimeActionProfileMemberMirror;
 import org.onosproject.drivers.p4runtime.mirror.P4RuntimeMirror;
@@ -43,7 +44,8 @@
 import org.onosproject.net.pi.service.PiTranslatedEntity;
 import org.onosproject.net.pi.service.PiTranslationException;
 import org.onosproject.p4runtime.api.P4RuntimeReadClient;
-import org.onosproject.p4runtime.api.P4RuntimeWriteClient;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteRequest;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteResponse;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -52,6 +54,8 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
 import java.util.stream.Collectors;
 
 import static java.util.stream.Collectors.toMap;
@@ -70,6 +74,10 @@
     private static final String READ_ACTION_GROUPS_FROM_MIRROR = "actionGroupReadFromMirror";
     private static final boolean DEFAULT_READ_ACTION_GROUPS_FROM_MIRROR = false;
 
+    // Used to make sure concurrent calls to write groups are serialized so
+    // that each request gets consistent access to mirror state.
+    private static final Striped<Lock> WRITE_LOCKS = Striped.lock(30);
+
     protected GroupStore groupStore;
     private P4RuntimeActionProfileGroupMirror groupMirror;
     private P4RuntimeActionProfileMemberMirror memberMirror;
@@ -180,11 +188,23 @@
             log.warn("Cleaning up {} action profile groups and " +
                              "{} members on {}...",
                      groupHandlesToRemove.size(), memberHandlesToRemove.size(), deviceId);
-            SharedExecutors.getSingleThreadExecutor().execute(
-                    () -> submitWriteRequestAndUpdateMirror(
-                            client.write(pipeconf)
-                                    .delete(groupHandlesToRemove)
-                                    .delete(memberHandlesToRemove)));
+            final WriteRequest deleteRequest = client.write(pipeconf)
+                    .delete(groupHandlesToRemove)
+                    .delete(memberHandlesToRemove);
+            WRITE_LOCKS.get(deviceId).lock();
+            try {
+                updateMirrorAndSubmit(deleteRequest).whenComplete((r, ex) -> {
+                    if (ex != null) {
+                        log.error("Exception removing inconsistent group/members", ex);
+                    } else {
+                        log.debug("Completed removal of inconsistent " +
+                                          "groups/members ({} of {} updates succeeded)",
+                                  r.success(), r.all());
+                    }
+                });
+            } finally {
+                WRITE_LOCKS.get(deviceId).unlock();
+            }
         }
 
         // Done.
@@ -294,30 +314,34 @@
         if (members == null) {
             return false;
         }
-        final P4RuntimeWriteClient.WriteRequest request = client.write(pipeconf);
-        // FIXME: when operation is remove, should we remove members first? Same
-        //  thing when modifying a group, should we first modify the group then
-        //  remove the member?
-        final boolean allMembersSkipped = members.stream()
-                .allMatch(m -> appendEntityToWriteRequestOrSkip(
-                        request, m.handle(deviceId), m, memberMirror, operation));
-        final boolean groupSkipped = appendEntityToWriteRequestOrSkip(
-                request, groupHandle, group, groupMirror, operation);
-        if (allMembersSkipped && groupSkipped) {
-            return true;
-        } else {
-            // True if all entities in the request (groups and members) where
-            // written successfully.
-            return submitWriteRequestAndUpdateMirror(request).isSuccess();
+        final WriteRequest request = client.write(pipeconf);
+        final CompletableFuture<WriteResponse> futureResponse;
+        WRITE_LOCKS.get(deviceId).lock();
+        try {
+            // FIXME: when operation is remove, should we remove members first? Same
+            //  thing when modifying a group, should we first modify the group then
+            //  remove the member?
+            final boolean allMembersSkipped = members.stream()
+                    .allMatch(m -> appendEntityToWriteRequestOrSkip(
+                            request, m.handle(deviceId), m, memberMirror, operation));
+            final boolean groupSkipped = appendEntityToWriteRequestOrSkip(
+                    request, groupHandle, group, groupMirror, operation);
+            if (allMembersSkipped && groupSkipped) {
+                return true;
+            } else {
+                futureResponse = updateMirrorAndSubmit(request);
+            }
+        } finally {
+            WRITE_LOCKS.get(deviceId).unlock();
         }
+        // Wait response from device. Returns true if all entities in the
+        // request (groups and members) were written successfully.
+        return Futures.getUnchecked(futureResponse).isSuccess();
     }
 
     private <H extends PiHandle, E extends PiEntity> boolean appendEntityToWriteRequestOrSkip(
-            P4RuntimeWriteClient.WriteRequest writeRequest,
-            H handle,
-            E entityToApply,
-            P4RuntimeMirror<H, E> mirror,
-            Operation operation) {
+            WriteRequest writeRequest, H handle, E entityToApply,
+            P4RuntimeMirror<H, E> mirror, Operation operation) {
         // Should return true if there's no need to write entity on device,
         // false if the write request is modified or an error occurs.
         final TimedEntry<E> entityOnDevice = mirror.get(handle);
@@ -362,12 +386,10 @@
         return instances;
     }
 
-    private P4RuntimeWriteClient.WriteResponse submitWriteRequestAndUpdateMirror(
-            P4RuntimeWriteClient.WriteRequest request) {
-        final P4RuntimeWriteClient.WriteResponse response = request.submitSync();
-        groupMirror.replayWriteResponse(response);
-        memberMirror.replayWriteResponse(response);
-        return response;
+    private CompletableFuture<WriteResponse> updateMirrorAndSubmit(WriteRequest request) {
+        groupMirror.applyWriteRequest(request);
+        memberMirror.applyWriteRequest(request);
+        return request.submit();
     }
 
     enum Operation {
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
index f905c4c..8a1f99a 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeFlowRuleProgrammable.java
@@ -19,7 +19,8 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.onlab.util.SharedExecutors;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.Striped;
 import org.onosproject.drivers.p4runtime.mirror.P4RuntimeTableMirror;
 import org.onosproject.drivers.p4runtime.mirror.TimedEntry;
 import org.onosproject.net.flow.DefaultFlowEntry;
@@ -42,8 +43,9 @@
 import org.onosproject.net.pi.service.PiTranslatedEntity;
 import org.onosproject.net.pi.service.PiTranslationException;
 import org.onosproject.p4runtime.api.P4RuntimeReadClient;
-import org.onosproject.p4runtime.api.P4RuntimeWriteClient;
 import org.onosproject.p4runtime.api.P4RuntimeWriteClient.UpdateType;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteRequest;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteResponse;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -52,6 +54,8 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
 import java.util.stream.Collectors;
 
 import static org.onosproject.drivers.p4runtime.P4RuntimeFlowRuleProgrammable.Operation.APPLY;
@@ -97,6 +101,10 @@
     private static final String TABLE_DEFAULT_AS_ENTRY = "tableDefaultAsEntry";
     private static final boolean DEFAULT_TABLE_DEFAULT_AS_ENTRY = false;
 
+    // Used to make sure concurrent calls to write flow rules are serialized so
+    // that each request gets consistent access to mirror state.
+    private static final Striped<Lock> WRITE_LOCKS = Striped.lock(30);
+
     private PiPipelineModel pipelineModel;
     private P4RuntimeTableMirror tableMirror;
     private PiFlowRuleTranslator translator;
@@ -165,8 +173,25 @@
 
         if (!inconsistentEntries.isEmpty()) {
             // Trigger clean up of inconsistent entries.
-            SharedExecutors.getSingleThreadExecutor().execute(
-                    () -> cleanUpInconsistentEntries(inconsistentEntries));
+            log.warn("Found {} inconsistent table entries on {}, removing them...",
+                     inconsistentEntries.size(), deviceId);
+            final WriteRequest request = client.write(pipeconf)
+                    .entities(inconsistentEntries, DELETE);
+            WRITE_LOCKS.get(deviceId).lock();
+            // Update mirror and async submit delete request.
+            try {
+                tableMirror.applyWriteRequest(request);
+                request.submit().whenComplete((response, ex) -> {
+                    if (ex != null) {
+                        log.error("Exception removing inconsistent table entries", ex);
+                    } else {
+                        log.debug("Successfully removed {} out of {} inconsistent entries",
+                                  response.success().size(), response.all().size());
+                    }
+                });
+            } finally {
+                WRITE_LOCKS.get(deviceId).unlock();
+            }
         }
 
         return result.build();
@@ -246,48 +271,48 @@
                 .collect(Collectors.toList());
     }
 
-    private void cleanUpInconsistentEntries(Collection<PiTableEntry> piEntries) {
-        log.warn("Found {} inconsistent table entries on {}, removing them...",
-                 piEntries.size(), deviceId);
-        // Remove entries and update mirror.
-        tableMirror.replayWriteResponse(
-                client.write(pipeconf).entities(piEntries, DELETE).submitSync());
-    }
-
     private Collection<FlowRule> processFlowRules(Collection<FlowRule> rules,
                                                   Operation driverOperation) {
         if (!setupBehaviour() || rules.isEmpty()) {
             return Collections.emptyList();
         }
         // Created batched write request.
-        final P4RuntimeWriteClient.WriteRequest request = client.write(pipeconf);
+        final WriteRequest request = client.write(pipeconf);
         // For each rule, translate to PI and append to write request.
         final Map<PiHandle, FlowRule> handleToRuleMap = Maps.newHashMap();
         final List<FlowRule> skippedRules = Lists.newArrayList();
-        for (FlowRule rule : rules) {
-            final PiTableEntry entry;
-            try {
-                entry = translator.translate(rule, pipeconf);
-            } catch (PiTranslationException e) {
-                log.warn("Unable to translate flow rule for pipeconf '{}': {} [{}]",
-                         pipeconf.id(), e.getMessage(), rule);
-                // Next rule.
-                continue;
+        final CompletableFuture<WriteResponse> futureResponse;
+        WRITE_LOCKS.get(deviceId).lock();
+        try {
+            for (FlowRule rule : rules) {
+                final PiTableEntry entry;
+                try {
+                    entry = translator.translate(rule, pipeconf);
+                } catch (PiTranslationException e) {
+                    log.warn("Unable to translate flow rule for pipeconf '{}': {} [{}]",
+                             pipeconf.id(), e.getMessage(), rule);
+                    // Next rule.
+                    continue;
+                }
+                final PiTableEntryHandle handle = entry.handle(deviceId);
+                handleToRuleMap.put(handle, rule);
+                // Append entry to batched write request (returns false), or skip (true)
+                if (appendEntryToWriteRequestOrSkip(
+                        request, handle, entry, driverOperation)) {
+                    skippedRules.add(rule);
+                    updateTranslationStore(
+                            driverOperation, handle, rule, entry);
+                }
             }
-            final PiTableEntryHandle handle = entry.handle(deviceId);
-            handleToRuleMap.put(handle, rule);
-            // Append entry to batched write request (returns false), or skip (true)
-            if (appendEntryToWriteRequestOrSkip(
-                    request, handle, entry, driverOperation)) {
-                skippedRules.add(rule);
-                updateTranslationStore(
-                        driverOperation, handle, rule, entry);
-            }
+            // Update mirror.
+            tableMirror.applyWriteRequest(request);
+            // Async submit request to server.
+            futureResponse = request.submit();
+        } finally {
+            WRITE_LOCKS.get(deviceId).unlock();
         }
-        // Submit request to server.
-        final P4RuntimeWriteClient.WriteResponse response = request.submitSync();
-        // Update mirror.
-        tableMirror.replayWriteResponse(response);
+        // Wait for response.
+        final WriteResponse response = Futures.getUnchecked(futureResponse);
         // Derive successfully applied flow rule from response.
         final List<FlowRule> appliedRules = getAppliedFlowRulesAndUpdateTranslator(
                 response, handleToRuleMap, driverOperation);
@@ -297,7 +322,7 @@
     }
 
     private List<FlowRule> getAppliedFlowRulesAndUpdateTranslator(
-            P4RuntimeWriteClient.WriteResponse response,
+            WriteResponse response,
             Map<PiHandle, FlowRule> handleToFlowRuleMap,
             Operation driverOperation) {
         // Returns a list of flow rules that were successfully written on the
@@ -359,7 +384,7 @@
     }
 
     private boolean appendEntryToWriteRequestOrSkip(
-            final P4RuntimeWriteClient.WriteRequest writeRequest,
+            final WriteRequest writeRequest,
             final PiTableEntryHandle handle,
             PiTableEntry piEntryToApply,
             final Operation driverOperation) {
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
index fb9451c..4daeaaa 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
@@ -234,9 +234,9 @@
 
     @Override
     @SuppressWarnings("unchecked")
-    public void replayWriteResponse(P4RuntimeWriteClient.WriteResponse response) {
-        response.success().stream()
-                .filter(r -> r.entityType().equals(this.entityType) && r.isSuccess())
+    public void applyWriteRequest(P4RuntimeWriteClient.WriteRequest request) {
+        request.pendingUpdates().stream()
+                .filter(r -> r.entityType().equals(this.entityType))
                 .forEach(r -> {
                     switch (r.updateType()) {
                         case INSERT:
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java
index bee8c51..b836455 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/P4RuntimeMirror.java
@@ -94,16 +94,17 @@
     /**
      * Synchronizes the state of the given device ID with the given collection
      * of PI entities.
+     *
      * @param deviceId device ID
      * @param entities collection of PI entities
      */
     void sync(DeviceId deviceId, Collection<E> entities);
 
     /**
-     * Uses the given P4Runtime write response to update the state of this
-     * mirror.
+     * Uses the given P4Runtime write request to update the state of this
+     * mirror by optimistically assuming that all updates in it will succeed.
      *
-     * @param response P4Runtime write response
+     * @param request P4Runtime write request
      */
-    void replayWriteResponse(P4RuntimeWriteClient.WriteResponse response);
+    void applyWriteRequest(P4RuntimeWriteClient.WriteRequest request);
 }