Fix inconsistent update type during concurrent P4Runtime writes

This is achieved by optimistically updating the P4Runtime mirror using
the write request (instead of waiting for a response) and by serializing
building write requests for the same device.

This change requires updating the P4Runtime protocol classes to expose
the content of the write request.

It also includes:
- force member weight to 1 when reading groups (some server
implementation still fails to be compliant to the spec)
- remove unused operation timeout handling in GDP (now all RPCz have a
timeout)

Change-Id: Ib4f99a6085c1283f46a2797e0c883d96954e02e9
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java
index 5b5d087..fb3a5fb 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteRequestImpl.java
@@ -27,9 +27,12 @@
 import org.slf4j.Logger;
 import p4.v1.P4RuntimeOuterClass;
 
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
 import static java.lang.String.format;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.onosproject.p4runtime.ctl.client.P4RuntimeClientImpl.SHORT_TIMEOUT_SECONDS;
@@ -49,6 +52,7 @@
 
     private final P4RuntimeClientImpl client;
     private final PiPipeconf pipeconf;
+    private final AtomicBoolean submitted = new AtomicBoolean(false);
     // The P4Runtime WriteRequest protobuf message we need to populate.
     private final P4RuntimeOuterClass.WriteRequest.Builder requestMsg;
     // WriteResponse instance builder. We populate entity responses as we add new
@@ -147,7 +151,14 @@
     }
 
     @Override
+    public Collection<P4RuntimeWriteClient.EntityUpdateRequest> pendingUpdates() {
+        return responseBuilder.pendingUpdates();
+    }
+
+    @Override
     public CompletableFuture<P4RuntimeWriteClient.WriteResponse> submit() {
+        checkState(!submitted.getAndSet(true),
+                   "Request has already been submitted, cannot submit again");
         final P4RuntimeOuterClass.WriteRequest writeRequest = requestMsg
                 .setElectionId(client.lastUsedElectionId())
                 .build();
@@ -191,6 +202,8 @@
 
     private void appendToRequestMsg(P4RuntimeWriteClient.UpdateType updateType,
                                     PiEntity piEntity, PiHandle handle) {
+        checkState(!submitted.get(),
+                   "Request has already been submitted, cannot add more entities");
         final P4RuntimeOuterClass.Update.Type p4UpdateType;
         final P4RuntimeOuterClass.Entity entityMsg;
         try {
@@ -213,7 +226,7 @@
         } catch (CodecException e) {
             responseBuilder.addFailedResponse(
                     handle, piEntity, updateType, e.getMessage(),
-                    P4RuntimeWriteClient.WriteResponseStatus.CODEC_ERROR);
+                    P4RuntimeWriteClient.EntityUpdateStatus.CODEC_ERROR);
         }
     }
 }
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteResponseImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteResponseImpl.java
index f24c717..35ee132 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteResponseImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/WriteResponseImpl.java
@@ -35,9 +35,10 @@
 import org.onosproject.net.pi.runtime.PiEntityType;
 import org.onosproject.net.pi.runtime.PiHandle;
 import org.onosproject.p4runtime.api.P4RuntimeWriteClient;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.EntityUpdateRequest;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.EntityUpdateResponse;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient.EntityUpdateStatus;
 import org.onosproject.p4runtime.api.P4RuntimeWriteClient.UpdateType;
-import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteEntityResponse;
-import org.onosproject.p4runtime.api.P4RuntimeWriteClient.WriteResponseStatus;
 import org.slf4j.Logger;
 import p4.v1.P4RuntimeOuterClass;
 
@@ -46,7 +47,9 @@
 import java.util.List;
 import java.util.Map;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
 import static java.lang.String.format;
 import static java.util.stream.Collectors.toList;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -68,12 +71,12 @@
 
     private static final Logger log = getLogger(WriteResponseImpl.class);
 
-    private final ImmutableList<WriteEntityResponse> entityResponses;
-    private final ImmutableListMultimap<WriteResponseStatus, WriteEntityResponse> statusMultimap;
+    private final ImmutableList<EntityUpdateResponse> entityResponses;
+    private final ImmutableListMultimap<EntityUpdateStatus, EntityUpdateResponse> statusMultimap;
 
     private WriteResponseImpl(
-            ImmutableList<WriteEntityResponse> allResponses,
-            ImmutableListMultimap<WriteResponseStatus, WriteEntityResponse> statusMultimap) {
+            ImmutableList<EntityUpdateResponse> allResponses,
+            ImmutableListMultimap<EntityUpdateStatus, EntityUpdateResponse> statusMultimap) {
         this.entityResponses = allResponses;
         this.statusMultimap = statusMultimap;
     }
@@ -84,25 +87,25 @@
     }
 
     @Override
-    public Collection<WriteEntityResponse> all() {
+    public Collection<EntityUpdateResponse> all() {
         return entityResponses;
     }
 
     @Override
-    public Collection<WriteEntityResponse> success() {
-        return statusMultimap.get(WriteResponseStatus.OK);
+    public Collection<EntityUpdateResponse> success() {
+        return statusMultimap.get(EntityUpdateStatus.OK);
     }
 
     @Override
-    public Collection<WriteEntityResponse> failed() {
+    public Collection<EntityUpdateResponse> failed() {
         return isSuccess()
                 ? Collections.emptyList()
                 : entityResponses.stream().filter(r -> !r.isSuccess()).collect(toList());
     }
 
     @Override
-    public Collection<WriteEntityResponse> status(
-            WriteResponseStatus status) {
+    public Collection<EntityUpdateResponse> status(
+            EntityUpdateStatus status) {
         checkNotNull(status);
         return statusMultimap.get(status);
     }
@@ -123,11 +126,11 @@
     static final class Builder {
 
         private final DeviceId deviceId;
-        private final Map<Integer, WriteEntityResponseImpl> pendingResponses =
+        private final Map<Integer, EntityUpdateResponseImpl> pendingResponses =
                 Maps.newHashMap();
-        private final List<WriteEntityResponse> allResponses =
+        private final List<EntityUpdateResponse> allResponses =
                 Lists.newArrayList();
-        private final ListMultimap<WriteResponseStatus, WriteEntityResponse> statusMap =
+        private final ListMultimap<EntityUpdateStatus, EntityUpdateResponse> statusMap =
                 ArrayListMultimap.create();
 
         private Builder(DeviceId deviceId) {
@@ -136,7 +139,7 @@
 
         void addPendingResponse(PiHandle handle, PiEntity entity, UpdateType updateType) {
             synchronized (this) {
-                final WriteEntityResponseImpl resp = new WriteEntityResponseImpl(
+                final EntityUpdateResponseImpl resp = new EntityUpdateResponseImpl(
                         handle, entity, updateType);
                 allResponses.add(resp);
                 pendingResponses.put(pendingResponses.size(), resp);
@@ -144,21 +147,28 @@
         }
 
         void addFailedResponse(PiHandle handle, PiEntity entity, UpdateType updateType,
-                               String explanation, WriteResponseStatus status) {
+                               String explanation, EntityUpdateStatus status) {
             synchronized (this) {
-                final WriteEntityResponseImpl resp = new WriteEntityResponseImpl(
+                final EntityUpdateResponseImpl resp = new EntityUpdateResponseImpl(
                         handle, entity, updateType)
                         .withFailure(explanation, status);
                 allResponses.add(resp);
             }
         }
 
+        Collection<EntityUpdateRequest> pendingUpdates() {
+            return ImmutableList.copyOf(pendingResponses.values());
+        }
+
         WriteResponseImpl buildAsIs() {
             synchronized (this) {
-                if (!pendingResponses.isEmpty()) {
+                final long pendingCount = pendingResponses.values().stream()
+                        .filter(r -> r.status() == EntityUpdateStatus.PENDING)
+                        .count();
+                if (pendingCount > 0) {
                     log.warn("Partial response from {}, {} of {} total " +
-                                     "entities are in status PENDING",
-                             deviceId, pendingResponses.size(), allResponses.size());
+                                     "updates are still in status PENDING",
+                             deviceId, pendingCount, allResponses.size());
                 }
                 return new WriteResponseImpl(
                         ImmutableList.copyOf(allResponses),
@@ -169,7 +179,6 @@
         WriteResponseImpl setSuccessAllAndBuild() {
             synchronized (this) {
                 pendingResponses.values().forEach(this::doSetSuccess);
-                pendingResponses.clear();
                 return buildAsIs();
             }
         }
@@ -177,7 +186,6 @@
         WriteResponseImpl setFailAllAndBuild(Throwable throwable) {
             synchronized (this) {
                 pendingResponses.values().forEach(r -> r.setFailure(throwable));
-                pendingResponses.clear();
                 return buildAsIs();
             }
         }
@@ -190,8 +198,8 @@
 
         private void setSuccess(int index) {
             synchronized (this) {
-                final WriteEntityResponseImpl resp = pendingResponses.remove(index);
-                if (resp != null) {
+                final EntityUpdateResponseImpl resp = pendingResponses.get(index);
+                if (resp != null && resp.status == EntityUpdateStatus.PENDING) {
                     doSetSuccess(resp);
                 } else {
                     log.error("Missing pending response at index {}", index);
@@ -199,16 +207,16 @@
             }
         }
 
-        private void doSetSuccess(WriteEntityResponseImpl resp) {
+        private void doSetSuccess(EntityUpdateResponseImpl resp) {
             resp.setSuccess();
-            statusMap.put(WriteResponseStatus.OK, resp);
+            statusMap.put(EntityUpdateStatus.OK, resp);
         }
 
         private void setFailure(int index,
                                 String explanation,
-                                WriteResponseStatus status) {
+                                EntityUpdateStatus status) {
             synchronized (this) {
-                final WriteEntityResponseImpl resp = pendingResponses.remove(index);
+                final EntityUpdateResponseImpl resp = pendingResponses.get(index);
                 if (resp != null) {
                     resp.withFailure(explanation, status);
                     statusMap.put(status, resp);
@@ -274,7 +282,7 @@
                         "P4Runtime Error message format not recognized [%s]",
                         TextFormat.shortDebugString(any));
                 if (reconcilable) {
-                    setFailure(index, unpackErr, WriteResponseStatus.OTHER_ERROR);
+                    setFailure(index, unpackErr, EntityUpdateStatus.OTHER_ERROR);
                 } else {
                     log.warn(unpackErr);
                 }
@@ -283,7 +291,7 @@
             // Map gRPC status codes to our WriteResponseStatus codes.
             final Status.Code p4Code = Status.fromCodeValue(
                     p4Error.getCanonicalCode()).getCode();
-            final WriteResponseStatus ourCode;
+            final EntityUpdateStatus ourCode;
             switch (p4Code) {
                 case OK:
                     if (reconcilable) {
@@ -291,17 +299,17 @@
                     }
                     return;
                 case NOT_FOUND:
-                    ourCode = WriteResponseStatus.NOT_FOUND;
+                    ourCode = EntityUpdateStatus.NOT_FOUND;
                     break;
                 case ALREADY_EXISTS:
-                    ourCode = WriteResponseStatus.ALREADY_EXIST;
+                    ourCode = EntityUpdateStatus.ALREADY_EXIST;
                     break;
                 default:
-                    ourCode = WriteResponseStatus.OTHER_ERROR;
+                    ourCode = EntityUpdateStatus.OTHER_ERROR;
                     break;
             }
             // Put the p4Code in the explanation only if ourCode is OTHER_ERROR.
-            final String explanationCode = ourCode == WriteResponseStatus.OTHER_ERROR
+            final String explanationCode = ourCode == EntityUpdateStatus.OTHER_ERROR
                     ? p4Code.name() + " " : "";
             final String details = p4Error.hasDetails()
                     ? ", " + p4Error.getDetails().toString() : "";
@@ -317,42 +325,50 @@
     }
 
     /**
-     * Internal implementation of WriteEntityResponse.
+     * Internal implementation of EntityUpdateResponse.
      */
-    private static final class WriteEntityResponseImpl implements WriteEntityResponse {
+    private static final class EntityUpdateResponseImpl implements EntityUpdateResponse {
 
         private final PiHandle handle;
         private final PiEntity entity;
         private final UpdateType updateType;
 
-        private WriteResponseStatus status = WriteResponseStatus.PENDING;
+        private EntityUpdateStatus status = EntityUpdateStatus.PENDING;
         private String explanation;
         private Throwable throwable;
 
-        private WriteEntityResponseImpl(PiHandle handle, PiEntity entity, UpdateType updateType) {
+        private EntityUpdateResponseImpl(PiHandle handle, PiEntity entity, UpdateType updateType) {
             this.handle = handle;
             this.entity = entity;
             this.updateType = updateType;
         }
 
-        private WriteEntityResponseImpl withFailure(
-                String explanation, WriteResponseStatus status) {
-            this.status = status;
+        private EntityUpdateResponseImpl withFailure(
+                String explanation, EntityUpdateStatus status) {
+            setStatus(status);
             this.explanation = explanation;
             this.throwable = null;
             return this;
         }
 
         private void setSuccess() {
-            this.status = WriteResponseStatus.OK;
+            setStatus(EntityUpdateStatus.OK);
         }
 
         private void setFailure(Throwable throwable) {
-            this.status = WriteResponseStatus.OTHER_ERROR;
+            setStatus(EntityUpdateStatus.OTHER_ERROR);
             this.explanation = throwable.toString();
             this.throwable = throwable;
         }
 
+        private void setStatus(EntityUpdateStatus newStatus) {
+            checkState(this.status == EntityUpdateStatus.PENDING,
+                       "Cannot set status for non-pending update");
+            checkArgument(newStatus  != EntityUpdateStatus.PENDING,
+                          "newStatus must be different than pending");
+            this.status = newStatus;
+        }
+
         @Override
         public PiHandle handle() {
             return handle;
@@ -375,11 +391,11 @@
 
         @Override
         public boolean isSuccess() {
-            return status().equals(WriteResponseStatus.OK);
+            return status().equals(EntityUpdateStatus.OK);
         }
 
         @Override
-        public WriteResponseStatus status() {
+        public EntityUpdateStatus status() {
             return status;
         }
 
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/codec/ActionProfileGroupCodec.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/codec/ActionProfileGroupCodec.java
index 7946394..5110274 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/codec/ActionProfileGroupCodec.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/codec/ActionProfileGroupCodec.java
@@ -91,8 +91,18 @@
                                 .getPreamble().getName()))
                 .withId(PiActionProfileGroupId.of(msg.getGroupId()))
                 .withMaxSize(msg.getMaxSize());
-        msg.getMembersList().forEach(m -> piGroupBuilder.addMember(
-                PiActionProfileMemberId.of(m.getMemberId()), m.getWeight()));
+        msg.getMembersList().forEach(m -> {
+            final int weight;
+            if (m.getWeight() < 1) {
+                log.warn("Decoding group with invalid weight '{}', will set to 1",
+                         m.getWeight());
+                weight = 1;
+            } else {
+                weight = m.getWeight();
+            }
+            piGroupBuilder.addMember(
+                PiActionProfileMemberId.of(m.getMemberId()), weight);
+        });
         return piGroupBuilder.build();
     }
 }