New P4RuntimeClient implementation that supports batching and error reporting

The new client API supports batching and provides detailed response for
write requests (e.g. if entity already exists when inserting), which was
not possible with the old one.

This patch includes:
- New more efficient implementation of P4RuntimeClient (no more locking,
use native gRPC executor, use stub deadlines)
- Ported all codecs to new AbstractCodec-based implementation (needed to
implement codec cache in the future)
- Uses batching in P4RuntimeFlowRuleProgrammable and
P4RuntimeGroupActionProgrammable
- Minor changes to PI framework runtime classes

Change-Id: I3fac42057bb4e1389d761006a32600c786598683
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
index d614d6e..fca05cb 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
@@ -16,14 +16,12 @@
 
 package org.onosproject.drivers.p4runtime;
 
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Striped;
 import org.onlab.util.SharedExecutors;
 import org.onosproject.drivers.p4runtime.mirror.P4RuntimeActionProfileGroupMirror;
 import org.onosproject.drivers.p4runtime.mirror.P4RuntimeActionProfileMemberMirror;
+import org.onosproject.drivers.p4runtime.mirror.P4RuntimeMirror;
 import org.onosproject.drivers.p4runtime.mirror.TimedEntry;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.group.DefaultGroup;
@@ -34,17 +32,18 @@
 import org.onosproject.net.group.GroupOperations;
 import org.onosproject.net.group.GroupProgrammable;
 import org.onosproject.net.group.GroupStore;
-import org.onosproject.net.pi.model.PiActionProfileId;
 import org.onosproject.net.pi.model.PiActionProfileModel;
 import org.onosproject.net.pi.runtime.PiActionProfileGroup;
 import org.onosproject.net.pi.runtime.PiActionProfileGroupHandle;
 import org.onosproject.net.pi.runtime.PiActionProfileMember;
 import org.onosproject.net.pi.runtime.PiActionProfileMemberHandle;
-import org.onosproject.net.pi.runtime.PiActionProfileMemberId;
+import org.onosproject.net.pi.runtime.PiEntity;
+import org.onosproject.net.pi.runtime.PiHandle;
 import org.onosproject.net.pi.service.PiGroupTranslator;
 import org.onosproject.net.pi.service.PiTranslatedEntity;
 import org.onosproject.net.pi.service.PiTranslationException;
-import org.onosproject.p4runtime.api.P4RuntimeClient;
+import org.onosproject.p4runtime.api.P4RuntimeReadClient;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -53,14 +52,10 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.locks.Lock;
 import java.util.stream.Collectors;
 
-import static java.util.Collections.singletonList;
 import static java.util.stream.Collectors.toMap;
-import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.DELETE;
-import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.INSERT;
-import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.MODIFY;
+import static java.util.stream.Collectors.toSet;
 
 /**
  * Implementation of GroupProgrammable to handle action profile groups in
@@ -80,9 +75,6 @@
     private P4RuntimeActionProfileMemberMirror memberMirror;
     private PiGroupTranslator groupTranslator;
 
-    // Needed to synchronize operations over the same group.
-    private static final Striped<Lock> STRIPED_LOCKS = Striped.lock(30);
-
     @Override
     protected boolean setupBehaviour() {
         if (!super.setupBehaviour()) {
@@ -134,25 +126,33 @@
         }
 
         // Dump groups and members from device for all action profiles.
-        final Set<PiActionProfileId> actionProfileIds = pipeconf.pipelineModel()
-                .actionProfiles()
-                .stream()
-                .map(PiActionProfileModel::id)
-                .collect(Collectors.toSet());
-        final Map<PiActionProfileGroupHandle, PiActionProfileGroup>
-                groupsOnDevice = dumpAllGroupsFromDevice(actionProfileIds);
+        final P4RuntimeReadClient.ReadRequest request = client.read(pipeconf);
+        pipeconf.pipelineModel().actionProfiles()
+                .stream().map(PiActionProfileModel::id)
+                .forEach(id -> request.actionProfileGroups(id)
+                        .actionProfileMembers(id));
+        final P4RuntimeReadClient.ReadResponse response = request.submitSync();
+
+        if (!response.isSuccess()) {
+            // Error at client level.
+            return Collections.emptyList();
+        }
+
+        final Collection<PiActionProfileGroup> groupsOnDevice = response.all(
+                PiActionProfileGroup.class);
         final Map<PiActionProfileMemberHandle, PiActionProfileMember> membersOnDevice =
-                dumpAllMembersFromDevice(actionProfileIds);
+                response.all(PiActionProfileMember.class).stream()
+                        .collect(toMap(m -> m.handle(deviceId), m -> m));
 
         // Sync mirrors.
         groupMirror.sync(deviceId, groupsOnDevice);
-        memberMirror.sync(deviceId, membersOnDevice);
+        memberMirror.sync(deviceId, membersOnDevice.values());
 
         // Retrieve the original PD group before translation.
         final List<Group> result = Lists.newArrayList();
         final List<PiActionProfileGroup> groupsToRemove = Lists.newArrayList();
         final Set<PiActionProfileMemberHandle> memberHandlesToKeep = Sets.newHashSet();
-        for (PiActionProfileGroup piGroup : groupsOnDevice.values()) {
+        for (PiActionProfileGroup piGroup : groupsOnDevice) {
             final Group pdGroup = checkAndForgeGroupEntry(piGroup, membersOnDevice);
             if (pdGroup == null) {
                 // Entry is on device but unknown to translation service or
@@ -168,13 +168,24 @@
             }
         }
 
-        // Trigger clean up of inconsistent groups and members. This will update
-        // the mirror accordingly.
+        // Trigger clean up of inconsistent groups and members (if any). This
+        // process takes care of removing any orphan member, e.g. from a
+        // partial/unsuccessful group insertion.
+        // This will update the mirror accordingly.
         final Set<PiActionProfileMemberHandle> memberHandlesToRemove = Sets.difference(
                 membersOnDevice.keySet(), memberHandlesToKeep);
-        SharedExecutors.getSingleThreadExecutor().execute(
-                () -> cleanUpInconsistentGroupsAndMembers(
-                        groupsToRemove, memberHandlesToRemove));
+        final Set<PiActionProfileGroupHandle> groupHandlesToRemove = groupsToRemove
+                .stream().map(g -> g.handle(deviceId)).collect(toSet());
+        if (groupHandlesToRemove.size() + memberHandlesToRemove.size() > 0) {
+            log.warn("Cleaning up {} action profile groups and " +
+                             "{} members on {}...",
+                     groupHandlesToRemove.size(), memberHandlesToRemove.size(), deviceId);
+            SharedExecutors.getSingleThreadExecutor().execute(
+                    () -> submitWriteRequestAndUpdateMirror(
+                            client.write(pipeconf)
+                                    .delete(groupHandlesToRemove)
+                                    .delete(memberHandlesToRemove)));
+        }
 
         // Done.
         return result;
@@ -182,7 +193,9 @@
 
     private Collection<Group> getGroupsFromMirror() {
         final Map<PiActionProfileMemberHandle, PiActionProfileMember> members =
-                memberMirror.deviceHandleMap(deviceId);
+                memberMirror.getAll(deviceId).stream()
+                .map(TimedEntry::entry)
+                .collect(toMap(e -> e.handle(deviceId), e -> e));
         return groupMirror.getAll(deviceId).stream()
                 .map(TimedEntry::entry)
                 .map(g -> checkAndForgeGroupEntry(
@@ -191,60 +204,6 @@
                 .collect(Collectors.toList());
     }
 
-    private void cleanUpInconsistentGroupsAndMembers(Collection<PiActionProfileGroup> groupsToRemove,
-                                                     Collection<PiActionProfileMemberHandle> membersToRemove) {
-        if (!groupsToRemove.isEmpty()) {
-            log.warn("Found {} inconsistent action profile groups on {}, removing them...",
-                     groupsToRemove.size(), deviceId);
-            groupsToRemove.forEach(piGroup -> {
-                log.debug(piGroup.toString());
-                processPiGroup(piGroup, null, Operation.REMOVE);
-            });
-        }
-        if (!membersToRemove.isEmpty()) {
-            log.warn("Found {} inconsistent action profile members on {}, removing them...",
-                     membersToRemove.size(), deviceId);
-            // FIXME: implement client call to remove members from multiple
-            // action profiles in one shot.
-            final ListMultimap<PiActionProfileId, PiActionProfileMemberId>
-                    membersByActProfId = ArrayListMultimap.create();
-            membersToRemove.forEach(m -> membersByActProfId.put(
-                    m.actionProfileId(), m.memberId()));
-            membersByActProfId.keySet().forEach(actProfId -> {
-                List<PiActionProfileMemberId> removedMembers = getFutureWithDeadline(
-                        client.removeActionProfileMembers(
-                                actProfId, membersByActProfId.get(actProfId), pipeconf),
-                        "cleaning up action profile members", Collections.emptyList());
-                // Update member mirror.
-                removedMembers.stream()
-                        .map(id -> PiActionProfileMemberHandle.of(deviceId, actProfId, id))
-                        .forEach(memberMirror::remove);
-            });
-        }
-    }
-
-    private Map<PiActionProfileGroupHandle, PiActionProfileGroup> dumpAllGroupsFromDevice(
-            Set<PiActionProfileId> actProfIds) {
-        // TODO: implement P4Runtime client call to read all groups with one call
-        // Good if pipeline has multiple action profiles.
-        return actProfIds.stream()
-                .flatMap(actProfId -> getFutureWithDeadline(
-                        client.dumpActionProfileGroups(actProfId, pipeconf),
-                        "dumping groups", Collections.emptyList()).stream())
-                .collect(toMap(g -> PiActionProfileGroupHandle.of(deviceId, g), g -> g));
-    }
-
-    private Map<PiActionProfileMemberHandle, PiActionProfileMember> dumpAllMembersFromDevice(
-            Set<PiActionProfileId> actProfIds) {
-        // TODO: implement P4Runtime client call to read all members with one call
-        // Good if pipeline has multiple action profiles.
-        return actProfIds.stream()
-                .flatMap(actProfId -> getFutureWithDeadline(
-                        client.dumpActionProfileMembers(actProfId, pipeconf),
-                        "dumping members", Collections.emptyList()).stream())
-                .collect(toMap(m -> PiActionProfileMemberHandle.of(deviceId, m), m -> m));
-    }
-
     private Group checkAndForgeGroupEntry(
             PiActionProfileGroup piGroupOnDevice,
             Map<PiActionProfileMemberHandle, PiActionProfileMember> membersOnDevice) {
@@ -269,7 +228,7 @@
         // Groups in P4Runtime contains only a reference to members. Check that
         // the actual member instances in the translation store are the same
         // found on the device.
-        if (!validateMembers(piGroupFromStore, membersOnDevice)) {
+        if (!validateGroupMembers(piGroupFromStore, membersOnDevice)) {
             log.warn("Group on device {} refers to members that are different " +
                              "than those found in translation store: {}", handle);
             return null;
@@ -282,7 +241,7 @@
         return addedGroup(translatedEntity.get().original(), mirrorEntry.lifeSec());
     }
 
-    private boolean validateMembers(
+    private boolean validateGroupMembers(
             PiActionProfileGroup piGroupFromStore,
             Map<PiActionProfileMemberHandle, PiActionProfileMember> membersOnDevice) {
         final Collection<PiActionProfileMember> groupMembers =
@@ -291,9 +250,8 @@
             return false;
         }
         return groupMembers.stream().allMatch(
-                memberFromStore -> memberFromStore.equals(
-                        membersOnDevice.get(
-                                PiActionProfileMemberHandle.of(deviceId, memberFromStore))));
+                memberFromStore -> memberFromStore.equals(membersOnDevice.get(
+                        memberFromStore.handle(deviceId))));
     }
 
     private Group addedGroup(Group original, long life) {
@@ -314,143 +272,79 @@
         }
         final Operation operation = opType.equals(GroupOperation.Type.DELETE)
                 ? Operation.REMOVE : Operation.APPLY;
-        processPiGroup(piGroup, pdGroup, operation);
-    }
-
-    private void processPiGroup(PiActionProfileGroup groupToApply,
-                                Group pdGroup,
-                                Operation operation) {
-        final PiActionProfileGroupHandle handle = PiActionProfileGroupHandle.of(deviceId, groupToApply);
-        STRIPED_LOCKS.get(handle).lock();
-        try {
-            switch (operation) {
-                case APPLY:
-                    if (applyGroupWithMembersOrNothing(groupToApply, handle)) {
-                        groupTranslator.learn(handle, new PiTranslatedEntity<>(
-                                pdGroup, groupToApply, handle));
-                    }
-                    return;
-                case REMOVE:
-                    if (deleteGroup(groupToApply, handle)) {
-                        groupTranslator.forget(handle);
-                    }
-                    return;
-                default:
-                    log.error("Unknwon group operation type {}, cannot process group", operation);
-                    break;
+        final PiActionProfileGroupHandle handle = piGroup.handle(deviceId);
+        if (writePiGroupOnDevice(piGroup, handle, operation)) {
+            if (operation.equals(Operation.APPLY)) {
+                groupTranslator.learn(handle, new PiTranslatedEntity<>(
+                        pdGroup, piGroup, handle));
+            } else {
+                groupTranslator.forget(handle);
             }
-        } finally {
-            STRIPED_LOCKS.get(handle).unlock();
         }
     }
 
-    private boolean applyGroupWithMembersOrNothing(PiActionProfileGroup group, PiActionProfileGroupHandle handle) {
-        // First apply members, then group, if fails, delete members.
-        Collection<PiActionProfileMember> members = extractAllMemberInstancesOrNull(group);
+    private boolean writePiGroupOnDevice(
+            PiActionProfileGroup group,
+            PiActionProfileGroupHandle groupHandle,
+            Operation operation) {
+        // Generate a write request to write both members and groups. Return
+        // true if request is successful or if there's no need to write on
+        // device (according to mirror state), otherwise, return false.
+        final Collection<PiActionProfileMember> members = extractAllMemberInstancesOrNull(group);
         if (members == null) {
             return false;
         }
-        if (!applyAllMembersOrNothing(members)) {
-            return false;
-        }
-        if (!applyGroup(group, handle)) {
-            deleteMembers(handles(members));
-            return false;
-        }
-        return true;
-    }
-
-    private boolean applyGroup(PiActionProfileGroup groupToApply, PiActionProfileGroupHandle handle) {
-        final TimedEntry<PiActionProfileGroup> groupOnDevice = groupMirror.get(handle);
-        final P4RuntimeClient.WriteOperationType opType =
-                groupOnDevice == null ? INSERT : MODIFY;
-        if (opType.equals(MODIFY) && groupToApply.equals(groupOnDevice.entry())) {
-            // Skip writing, group is unchanged.
-            return true;
-        }
-        final boolean success = getFutureWithDeadline(
-                client.writeActionProfileGroup(groupToApply, opType, pipeconf),
-                "performing action profile group " + opType, false);
-        if (success) {
-            groupMirror.put(handle, groupToApply);
-        }
-        return success;
-    }
-
-    private boolean deleteGroup(PiActionProfileGroup group, PiActionProfileGroupHandle handle) {
-        final boolean success = getFutureWithDeadline(
-                client.writeActionProfileGroup(group, DELETE, pipeconf),
-                "performing action profile group " + DELETE, false);
-        if (success) {
-            groupMirror.remove(handle);
-        }
-        // Orphan members will be removed at the next reconciliation cycle.
-        return success;
-    }
-
-    private boolean applyAllMembersOrNothing(Collection<PiActionProfileMember> members) {
-        Collection<PiActionProfileMember> appliedMembers = applyMembers(members);
-        if (appliedMembers.size() == members.size()) {
+        final P4RuntimeWriteClient.WriteRequest request = client.write(pipeconf);
+        // FIXME: when operation is remove, should we remove members first? Same
+        //  thing when modifying a group, should we first modify the group then
+        //  remove the member?
+        final boolean allMembersSkipped = members.stream()
+                .allMatch(m -> appendEntityToWriteRequestOrSkip(
+                        request, m.handle(deviceId), m, memberMirror, operation));
+        final boolean groupSkipped = appendEntityToWriteRequestOrSkip(
+                request, groupHandle, group, groupMirror, operation);
+        if (allMembersSkipped && groupSkipped) {
             return true;
         } else {
-            deleteMembers(handles(appliedMembers));
-            return false;
+            // True if all entities in the request (groups and members) where
+            // written successfully.
+            return submitWriteRequestAndUpdateMirror(request).isSuccess();
         }
     }
 
-    private Collection<PiActionProfileMember> applyMembers(
-            Collection<PiActionProfileMember> members) {
-        return members.stream()
-                .filter(this::applyMember)
-                .collect(Collectors.toList());
-    }
-
-    private boolean applyMember(PiActionProfileMember memberToApply) {
-        // If exists, modify, otherwise insert.
-        final PiActionProfileMemberHandle handle = PiActionProfileMemberHandle.of(
-                deviceId, memberToApply);
-        final TimedEntry<PiActionProfileMember> memberOnDevice = memberMirror.get(handle);
-        final P4RuntimeClient.WriteOperationType opType =
-                memberOnDevice == null ? INSERT : MODIFY;
-        if (opType.equals(MODIFY) && memberToApply.equals(memberOnDevice.entry())) {
-            // Skip writing if member is unchanged.
-            return true;
+    private <H extends PiHandle, E extends PiEntity> boolean appendEntityToWriteRequestOrSkip(
+            P4RuntimeWriteClient.WriteRequest writeRequest,
+            H handle,
+            E entityToApply,
+            P4RuntimeMirror<H, E> mirror,
+            Operation operation) {
+        // Should return true if there's no need to write entity on device,
+        // false if the write request is modified or an error occurs.
+        final TimedEntry<E> entityOnDevice = mirror.get(handle);
+        switch (operation) {
+            case APPLY:
+                if (entityOnDevice == null) {
+                    writeRequest.insert(entityToApply);
+                } else if (entityToApply.equals(entityOnDevice.entry())) {
+                    // Skip writing if group is unchanged.
+                    return true;
+                } else {
+                    writeRequest.modify(entityToApply);
+                }
+                break;
+            case REMOVE:
+                if (entityOnDevice == null) {
+                    // Skip deleting if group does not exist on device.
+                    return true;
+                } else {
+                    writeRequest.delete(handle);
+                }
+                break;
+            default:
+                log.error("Unrecognized operation {}", operation);
+                break;
         }
-        final boolean success = getFutureWithDeadline(
-                client.writeActionProfileMembers(
-                        singletonList(memberToApply), opType, pipeconf),
-                "performing action profile member " + opType, false);
-        if (success) {
-            memberMirror.put(handle, memberToApply);
-        }
-        return success;
-    }
-
-    private void deleteMembers(Collection<PiActionProfileMemberHandle> handles) {
-        // TODO: improve by batching deletes.
-        handles.forEach(this::deleteMember);
-    }
-
-    private void deleteMember(PiActionProfileMemberHandle handle) {
-        final boolean success = getFutureWithDeadline(
-                client.removeActionProfileMembers(
-                        handle.actionProfileId(),
-                        singletonList(handle.memberId()), pipeconf),
-                "performing action profile member " + DELETE,
-                Collections.emptyList())
-                // Successful if the only member passed has been removed.
-                .size() == 1;
-        if (success) {
-            memberMirror.remove(handle);
-        }
-    }
-
-    private Collection<PiActionProfileMemberHandle> handles(
-            Collection<PiActionProfileMember> members) {
-        return members.stream()
-                .map(m -> PiActionProfileMemberHandle.of(
-                        deviceId, m.actionProfile(), m.id()))
-                .collect(Collectors.toList());
+        return false;
     }
 
     private Collection<PiActionProfileMember> extractAllMemberInstancesOrNull(
@@ -468,6 +362,14 @@
         return instances;
     }
 
+    private P4RuntimeWriteClient.WriteResponse submitWriteRequestAndUpdateMirror(
+            P4RuntimeWriteClient.WriteRequest request) {
+        final P4RuntimeWriteClient.WriteResponse response = request.submitSync();
+        groupMirror.replayWriteResponse(response);
+        memberMirror.replayWriteResponse(response);
+        return response;
+    }
+
     enum Operation {
         APPLY, REMOVE
     }