Fix inconsistent update type during concurrent P4Runtime writes

This is achieved by optimistically updating the P4Runtime mirror using
the write request (instead of waiting for a response) and by serializing
the construction of write requests for the same device.

This change requires updating the P4Runtime protocol classes to expose
the content of the write request.

It also includes:
- force member weight to 1 when reading groups (some server
implementations still fail to comply with the spec)
- remove unused operation timeout handling in GDP (all RPCs now have a
timeout)

Change-Id: Ib4f99a6085c1283f46a2797e0c883d96954e02e9
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
index fca05cb..01af5eb 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
@@ -18,7 +18,8 @@
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import org.onlab.util.SharedExecutors;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.Striped;
 import org.onosproject.drivers.p4runtime.mirror.P4RuntimeActionProfileGroupMirror;
 import org.onosproject.drivers.p4runtime.mirror.P4RuntimeActionProfileMemberMirror;
 import org.onosproject.drivers.p4runtime.mirror.P4RuntimeMirror;
@@ -43,7 +44,8 @@
 import org.onosproject.net.pi.service.PiTranslatedEntity;
 import org.onosproject.net.pi.service.PiTranslationException;
 import org.onosproject.p4runtime.api.P4RuntimeReadClient;
-import org.onosproject.p4runtime.api.P4RuntimeWriteClient;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteRequest;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteResponse;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -52,6 +54,8 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
 import java.util.stream.Collectors;
 
 import static java.util.stream.Collectors.toMap;
@@ -70,6 +74,10 @@
     private static final String READ_ACTION_GROUPS_FROM_MIRROR = "actionGroupReadFromMirror";
     private static final boolean DEFAULT_READ_ACTION_GROUPS_FROM_MIRROR = false;
 
+    // Used to make sure concurrent calls to write groups are serialized so
+    // that each request gets consistent access to mirror state.
+    private static final Striped<Lock> WRITE_LOCKS = Striped.lock(30);
+
     protected GroupStore groupStore;
     private P4RuntimeActionProfileGroupMirror groupMirror;
     private P4RuntimeActionProfileMemberMirror memberMirror;
@@ -180,11 +188,23 @@
             log.warn("Cleaning up {} action profile groups and " +
                              "{} members on {}...",
                      groupHandlesToRemove.size(), memberHandlesToRemove.size(), deviceId);
-            SharedExecutors.getSingleThreadExecutor().execute(
-                    () -> submitWriteRequestAndUpdateMirror(
-                            client.write(pipeconf)
-                                    .delete(groupHandlesToRemove)
-                                    .delete(memberHandlesToRemove)));
+            final WriteRequest deleteRequest = client.write(pipeconf)
+                    .delete(groupHandlesToRemove)
+                    .delete(memberHandlesToRemove);
+            WRITE_LOCKS.get(deviceId).lock();
+            try {
+                updateMirrorAndSubmit(deleteRequest).whenComplete((r, ex) -> {
+                    if (ex != null) {
+                        log.error("Exception removing inconsistent group/members", ex);
+                    } else {
+                        log.debug("Completed removal of inconsistent " +
+                                          "groups/members ({} of {} updates succeeded)",
+                                  r.success(), r.all());
+                    }
+                });
+            } finally {
+                WRITE_LOCKS.get(deviceId).unlock();
+            }
         }
 
         // Done.
@@ -294,30 +314,34 @@
         if (members == null) {
             return false;
         }
-        final P4RuntimeWriteClient.WriteRequest request = client.write(pipeconf);
-        // FIXME: when operation is remove, should we remove members first? Same
-        //  thing when modifying a group, should we first modify the group then
-        //  remove the member?
-        final boolean allMembersSkipped = members.stream()
-                .allMatch(m -> appendEntityToWriteRequestOrSkip(
-                        request, m.handle(deviceId), m, memberMirror, operation));
-        final boolean groupSkipped = appendEntityToWriteRequestOrSkip(
-                request, groupHandle, group, groupMirror, operation);
-        if (allMembersSkipped && groupSkipped) {
-            return true;
-        } else {
-            // True if all entities in the request (groups and members) where
-            // written successfully.
-            return submitWriteRequestAndUpdateMirror(request).isSuccess();
+        final WriteRequest request = client.write(pipeconf);
+        final CompletableFuture<WriteResponse> futureResponse;
+        WRITE_LOCKS.get(deviceId).lock();
+        try {
+            // FIXME: when operation is remove, should we remove members first? Same
+            //  thing when modifying a group, should we first modify the group then
+            //  remove the member?
+            final boolean allMembersSkipped = members.stream()
+                    .allMatch(m -> appendEntityToWriteRequestOrSkip(
+                            request, m.handle(deviceId), m, memberMirror, operation));
+            final boolean groupSkipped = appendEntityToWriteRequestOrSkip(
+                    request, groupHandle, group, groupMirror, operation);
+            if (allMembersSkipped && groupSkipped) {
+                return true;
+            } else {
+                futureResponse = updateMirrorAndSubmit(request);
+            }
+        } finally {
+            WRITE_LOCKS.get(deviceId).unlock();
         }
+        // Wait response from device. Returns true if all entities in the
+        // request (groups and members) were written successfully.
+        return Futures.getUnchecked(futureResponse).isSuccess();
     }
 
     private <H extends PiHandle, E extends PiEntity> boolean appendEntityToWriteRequestOrSkip(
-            P4RuntimeWriteClient.WriteRequest writeRequest,
-            H handle,
-            E entityToApply,
-            P4RuntimeMirror<H, E> mirror,
-            Operation operation) {
+            WriteRequest writeRequest, H handle, E entityToApply,
+            P4RuntimeMirror<H, E> mirror, Operation operation) {
         // Should return true if there's no need to write entity on device,
         // false if the write request is modified or an error occurs.
         final TimedEntry<E> entityOnDevice = mirror.get(handle);
@@ -362,12 +386,10 @@
         return instances;
     }
 
-    private P4RuntimeWriteClient.WriteResponse submitWriteRequestAndUpdateMirror(
-            P4RuntimeWriteClient.WriteRequest request) {
-        final P4RuntimeWriteClient.WriteResponse response = request.submitSync();
-        groupMirror.replayWriteResponse(response);
-        memberMirror.replayWriteResponse(response);
-        return response;
+    private CompletableFuture<WriteResponse> updateMirrorAndSubmit(WriteRequest request) {
+        groupMirror.applyWriteRequest(request);
+        memberMirror.applyWriteRequest(request);
+        return request.submit();
     }
 
     enum Operation {