ONOS-7898 Action profile group/member refactoring

Also includes:
- New abstract P4Runtime codec implementation. Currently used for action
profile members/groups encoding/decoding; the plan is to handle all
other codecs via this.
- Improved read requests in P4RuntimeClientImpl
- Removed handling of max group size in P4Runtime driver. Instead,
modified the group translator to specify a max group size by using
information from the pipeline model.

Change-Id: I684bae0184d683bb448ba19863c561f9848479d2
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
index 4a48897..d614d6e 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeActionGroupProgrammable.java
@@ -19,14 +19,12 @@
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Striped;
 import org.onlab.util.SharedExecutors;
 import org.onosproject.drivers.p4runtime.mirror.P4RuntimeActionProfileGroupMirror;
 import org.onosproject.drivers.p4runtime.mirror.P4RuntimeActionProfileMemberMirror;
 import org.onosproject.drivers.p4runtime.mirror.TimedEntry;
-import org.onosproject.net.DefaultAnnotations;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.group.DefaultGroup;
 import org.onosproject.net.group.DefaultGroupDescription;
@@ -36,10 +34,8 @@
 import org.onosproject.net.group.GroupOperations;
 import org.onosproject.net.group.GroupProgrammable;
 import org.onosproject.net.group.GroupStore;
-import org.onosproject.net.pi.model.PiActionId;
 import org.onosproject.net.pi.model.PiActionProfileId;
 import org.onosproject.net.pi.model.PiActionProfileModel;
-import org.onosproject.net.pi.runtime.PiAction;
 import org.onosproject.net.pi.runtime.PiActionProfileGroup;
 import org.onosproject.net.pi.runtime.PiActionProfileGroupHandle;
 import org.onosproject.net.pi.runtime.PiActionProfileMember;
@@ -59,8 +55,9 @@
 import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
+import static java.util.Collections.singletonList;
+import static java.util.stream.Collectors.toMap;
 import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.DELETE;
 import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.INSERT;
 import static org.onosproject.p4runtime.api.P4RuntimeClient.WriteOperationType.MODIFY;
@@ -77,7 +74,6 @@
     // the ONOS store.
     private static final String READ_ACTION_GROUPS_FROM_MIRROR = "actionGroupReadFromMirror";
     private static final boolean DEFAULT_READ_ACTION_GROUPS_FROM_MIRROR = false;
-    private static final String MAX_MEM_SIZE = "maxMemSize";
 
     protected GroupStore groupStore;
     private P4RuntimeActionProfileGroupMirror groupMirror;
@@ -86,7 +82,6 @@
 
     // Needed to synchronize operations over the same group.
     private static final Striped<Lock> STRIPED_LOCKS = Striped.lock(30);
-    private static final int GROUP_MEMBERS_BUFFER_SIZE = 3;
 
     @Override
     protected boolean setupBehaviour() {
@@ -110,13 +105,20 @@
         groupOps.operations().stream()
                 .filter(op -> !op.groupType().equals(GroupDescription.Type.ALL))
                 .forEach(op -> {
-                    // ONOS-7785 We need app cookie (action profile id) from the group
+                    // ONOS-7785 We need the group app cookie (which includes
+                    // the action profile ID) but this is not part of the
+                    // GroupDescription.
                     Group groupOnStore = groupStore.getGroup(deviceId, op.groupId());
+                    if (groupOnStore == null) {
+                        log.warn("Unable to find group {} in store, aborting {} operation",
+                                 op.groupId(), op.opType());
+                        return;
+                    }
                     GroupDescription groupDesc = new DefaultGroupDescription(
                             deviceId, op.groupType(), op.buckets(), groupOnStore.appCookie(),
                             op.groupId().id(), groupOnStore.appId());
                     DefaultGroup groupToApply = new DefaultGroup(op.groupId(), groupDesc);
-                    processGroupOperation(groupToApply, op.opType());
+                    processPdGroup(groupToApply, op.opType());
                 });
     }
 
@@ -125,89 +127,66 @@
         if (!setupBehaviour()) {
             return Collections.emptyList();
         }
-        return getActionGroups();
-    }
-
-    private Collection<Group> getActionGroups() {
 
         if (driverBoolProperty(READ_ACTION_GROUPS_FROM_MIRROR,
                                DEFAULT_READ_ACTION_GROUPS_FROM_MIRROR)) {
-            return getActionGroupsFromMirror();
+            return getGroupsFromMirror();
         }
 
-        final Collection<PiActionProfileId> actionProfileIds = pipeconf.pipelineModel()
+        // Dump groups and members from device for all action profiles.
+        final Set<PiActionProfileId> actionProfileIds = pipeconf.pipelineModel()
                 .actionProfiles()
                 .stream()
                 .map(PiActionProfileModel::id)
-                .collect(Collectors.toList());
-        final List<PiActionProfileGroup> groupsOnDevice = actionProfileIds.stream()
-                .flatMap(this::streamGroupsFromDevice)
-                .collect(Collectors.toList());
-        final Set<PiActionProfileMemberHandle> membersOnDevice = actionProfileIds
-                .stream()
-                .flatMap(actProfId -> getMembersFromDevice(actProfId)
-                        .stream()
-                        .map(memberId -> PiActionProfileMemberHandle.of(
-                                deviceId, actProfId, memberId)))
                 .collect(Collectors.toSet());
-
-        if (groupsOnDevice.isEmpty()) {
-            return Collections.emptyList();
-        }
+        final Map<PiActionProfileGroupHandle, PiActionProfileGroup>
+                groupsOnDevice = dumpAllGroupsFromDevice(actionProfileIds);
+        final Map<PiActionProfileMemberHandle, PiActionProfileMember> membersOnDevice =
+                dumpAllMembersFromDevice(actionProfileIds);
 
         // Sync mirrors.
-        syncGroupMirror(groupsOnDevice);
-        syncMemberMirror(membersOnDevice);
+        groupMirror.sync(deviceId, groupsOnDevice);
+        memberMirror.sync(deviceId, membersOnDevice);
 
+        // Retrieve the original PD group before translation.
         final List<Group> result = Lists.newArrayList();
-        final List<PiActionProfileGroup> inconsistentGroups = Lists.newArrayList();
-        final List<PiActionProfileGroup> validGroups = Lists.newArrayList();
-
-        for (PiActionProfileGroup piGroup : groupsOnDevice) {
-            final Group pdGroup = forgeGroupEntry(piGroup);
+        final List<PiActionProfileGroup> groupsToRemove = Lists.newArrayList();
+        final Set<PiActionProfileMemberHandle> memberHandlesToKeep = Sets.newHashSet();
+        for (PiActionProfileGroup piGroup : groupsOnDevice.values()) {
+            final Group pdGroup = checkAndForgeGroupEntry(piGroup, membersOnDevice);
             if (pdGroup == null) {
                 // Entry is on device but unknown to translation service or
                 // device mirror. Inconsistent. Mark for removal.
-                inconsistentGroups.add(piGroup);
+                groupsToRemove.add(piGroup);
             } else {
-                validGroups.add(piGroup);
                 result.add(pdGroup);
+                // Keep track of member handles used in groups.
+                piGroup.members().stream()
+                        .map(m -> PiActionProfileMemberHandle.of(
+                                deviceId, piGroup.actionProfile(), m.id()))
+                        .forEach(memberHandlesToKeep::add);
             }
         }
 
-        // Trigger clean up of inconsistent groups and members. This will also
-        // remove all members that are not used by any group, and update the
-        // mirror accordingly.
-        final Set<PiActionProfileMemberHandle> membersToKeep = validGroups.stream()
-                .flatMap(g -> g.members().stream())
-                .map(m -> PiActionProfileMemberHandle.of(deviceId, m))
-                .collect(Collectors.toSet());
-        final Set<PiActionProfileMemberHandle> inconsistentMembers = Sets.difference(
-                membersOnDevice, membersToKeep);
+        // Trigger clean up of inconsistent groups and members. This will update
+        // the mirror accordingly.
+        final Set<PiActionProfileMemberHandle> memberHandlesToRemove = Sets.difference(
+                membersOnDevice.keySet(), memberHandlesToKeep);
         SharedExecutors.getSingleThreadExecutor().execute(
                 () -> cleanUpInconsistentGroupsAndMembers(
-                        inconsistentGroups, inconsistentMembers));
+                        groupsToRemove, memberHandlesToRemove));
 
+        // Done.
         return result;
     }
 
-    private void syncGroupMirror(Collection<PiActionProfileGroup> groups) {
-        Map<PiActionProfileGroupHandle, PiActionProfileGroup> handleMap = Maps.newHashMap();
-        groups.forEach(g -> handleMap.put(PiActionProfileGroupHandle.of(deviceId, g), g));
-        groupMirror.sync(deviceId, handleMap);
-    }
-
-    private void syncMemberMirror(Collection<PiActionProfileMemberHandle> memberHandles) {
-        Map<PiActionProfileMemberHandle, PiActionProfileMember> handleMap = Maps.newHashMap();
-        memberHandles.forEach(handle -> handleMap.put(
-                handle, dummyMember(handle.actionProfileId(), handle.memberId())));
-        memberMirror.sync(deviceId, handleMap);
-    }
-
-    private Collection<Group> getActionGroupsFromMirror() {
+    private Collection<Group> getGroupsFromMirror() {
+        final Map<PiActionProfileMemberHandle, PiActionProfileMember> members =
+                memberMirror.deviceHandleMap(deviceId);
         return groupMirror.getAll(deviceId).stream()
                 .map(TimedEntry::entry)
-                .map(this::forgeGroupEntry)
+                .map(g -> checkAndForgeGroupEntry(
+                        g, members))
                 .filter(Objects::nonNull)
                 .collect(Collectors.toList());
     }
@@ -219,7 +198,7 @@
                      groupsToRemove.size(), deviceId);
             groupsToRemove.forEach(piGroup -> {
                 log.debug(piGroup.toString());
-                processGroup(piGroup, null, Operation.REMOVE);
+                processPiGroup(piGroup, null, Operation.REMOVE);
             });
         }
         if (!membersToRemove.isEmpty()) {
@@ -244,44 +223,77 @@
         }
     }
 
-    private Stream<PiActionProfileGroup> streamGroupsFromDevice(PiActionProfileId actProfId) {
+    private Map<PiActionProfileGroupHandle, PiActionProfileGroup> dumpAllGroupsFromDevice(
+            Set<PiActionProfileId> actProfIds) {
         // TODO: implement P4Runtime client call to read all groups with one call
         // Good if pipeline has multiple action profiles.
-        final Collection<PiActionProfileGroup> groups = getFutureWithDeadline(
-                client.dumpActionProfileGroups(actProfId, pipeconf),
-                "dumping groups", Collections.emptyList());
-        return groups.stream();
+        return actProfIds.stream()
+                .flatMap(actProfId -> getFutureWithDeadline(
+                        client.dumpActionProfileGroups(actProfId, pipeconf),
+                        "dumping groups", Collections.emptyList()).stream())
+                .collect(toMap(g -> PiActionProfileGroupHandle.of(deviceId, g), g -> g));
     }
 
-    private List<PiActionProfileMemberId> getMembersFromDevice(PiActionProfileId actProfId) {
+    private Map<PiActionProfileMemberHandle, PiActionProfileMember> dumpAllMembersFromDevice(
+            Set<PiActionProfileId> actProfIds) {
         // TODO: implement P4Runtime client call to read all members with one call
         // Good if pipeline has multiple action profiles.
-        return getFutureWithDeadline(
-                client.dumpActionProfileMemberIds(actProfId, pipeconf),
-                "dumping action profile ids", Collections.emptyList());
+        return actProfIds.stream()
+                .flatMap(actProfId -> getFutureWithDeadline(
+                        client.dumpActionProfileMembers(actProfId, pipeconf),
+                        "dumping members", Collections.emptyList()).stream())
+                .collect(toMap(m -> PiActionProfileMemberHandle.of(deviceId, m), m -> m));
     }
 
-    private Group forgeGroupEntry(PiActionProfileGroup piGroup) {
-        final PiActionProfileGroupHandle handle = PiActionProfileGroupHandle.of(deviceId, piGroup);
+    private Group checkAndForgeGroupEntry(
+            PiActionProfileGroup piGroupOnDevice,
+            Map<PiActionProfileMemberHandle, PiActionProfileMember> membersOnDevice) {
+        final PiActionProfileGroupHandle handle = PiActionProfileGroupHandle.of(
+                deviceId, piGroupOnDevice);
         final Optional<PiTranslatedEntity<Group, PiActionProfileGroup>>
                 translatedEntity = groupTranslator.lookup(handle);
-        final TimedEntry<PiActionProfileGroup> timedEntry = groupMirror.get(handle);
-        // Is entry consistent with our state?
+        final TimedEntry<PiActionProfileGroup> mirrorEntry = groupMirror.get(handle);
+        // Check that entry obtained from device is consistent with what is known
+        // by the translation store.
         if (!translatedEntity.isPresent()) {
-            log.warn("Group handle not found in translation store: {}", handle);
+            log.warn("Group not found in translation store: {}", handle);
             return null;
         }
-        if (!translatedEntity.get().translated().equals(piGroup)) {
-            log.warn("Group obtained from device {} is different from the one in " +
-                             "translation store: device={}, store={}",
-                     deviceId, piGroup, translatedEntity.get().translated());
+        final PiActionProfileGroup piGroupFromStore = translatedEntity.get().translated();
+        if (!piGroupFromStore.equals(piGroupOnDevice)) {
+            log.warn("Group on device {} is different from the one in " +
+                             "translation store: {} [device={}, store={}]",
+                     deviceId, handle, piGroupOnDevice, piGroupFromStore);
             return null;
         }
-        if (timedEntry == null) {
+        // Groups in P4Runtime contain only a reference to members. Check that
+        // the actual member instances in the translation store are the same
+        // found on the device.
+        if (!validateMembers(piGroupFromStore, membersOnDevice)) {
+            log.warn("Group on device {} refers to members that are different " +
+                             "than those found in translation store: {}", deviceId, handle);
+            return null;
+        }
+        if (mirrorEntry == null) {
             log.warn("Group handle not found in device mirror: {}", handle);
             return null;
         }
-        return addedGroup(translatedEntity.get().original(), timedEntry.lifeSec());
+        // All checks passed: forge the PD group from the original one.
+        return addedGroup(translatedEntity.get().original(), mirrorEntry.lifeSec());
+    }
+
+    private boolean validateMembers(
+            PiActionProfileGroup piGroupFromStore,
+            Map<PiActionProfileMemberHandle, PiActionProfileMember> membersOnDevice) {
+        final Collection<PiActionProfileMember> groupMembers =
+                extractAllMemberInstancesOrNull(piGroupFromStore);
+        if (groupMembers == null) {
+            return false;
+        }
+        return groupMembers.stream().allMatch(
+                memberFromStore -> memberFromStore.equals(
+                        membersOnDevice.get(
+                                PiActionProfileMemberHandle.of(deviceId, memberFromStore))));
     }
 
     private Group addedGroup(Group original, long life) {
@@ -291,7 +303,7 @@
         return forgedGroup;
     }
 
-    private void processGroupOperation(Group pdGroup, GroupOperation.Type opType) {
+    private void processPdGroup(Group pdGroup, GroupOperation.Type opType) {
         final PiActionProfileGroup piGroup;
         try {
             piGroup = groupTranslator.translate(pdGroup, pipeconf);
@@ -302,12 +314,12 @@
         }
         final Operation operation = opType.equals(GroupOperation.Type.DELETE)
                 ? Operation.REMOVE : Operation.APPLY;
-        processGroup(piGroup, pdGroup, operation);
+        processPiGroup(piGroup, pdGroup, operation);
     }
 
-    private void processGroup(PiActionProfileGroup groupToApply,
-                              Group pdGroup,
-                              Operation operation) {
+    private void processPiGroup(PiActionProfileGroup groupToApply,
+                                Group pdGroup,
+                                Operation operation) {
         final PiActionProfileGroupHandle handle = PiActionProfileGroupHandle.of(deviceId, groupToApply);
         STRIPED_LOCKS.get(handle).lock();
         try {
@@ -334,55 +346,45 @@
 
     private boolean applyGroupWithMembersOrNothing(PiActionProfileGroup group, PiActionProfileGroupHandle handle) {
         // First apply members, then group, if fails, delete members.
-        if (!applyAllMembersOrNothing(group.members())) {
+        Collection<PiActionProfileMember> members = extractAllMemberInstancesOrNull(group);
+        if (members == null) {
+            return false;
+        }
+        if (!applyAllMembersOrNothing(members)) {
             return false;
         }
         if (!applyGroup(group, handle)) {
-            deleteMembers(group.members());
+            deleteMembers(handles(members));
             return false;
         }
         return true;
     }
 
-    private boolean applyGroup(PiActionProfileGroup group, PiActionProfileGroupHandle handle) {
-        final int currentMemberSize = group.members().size();
-        if (groupMirror.get(handle) != null) {
-            String maxMemSize = "";
-            if (groupMirror.annotations(handle) != null &&
-                    groupMirror.annotations(handle).value(MAX_MEM_SIZE) != null) {
-                maxMemSize = groupMirror.annotations(handle).value(MAX_MEM_SIZE);
-            }
-            if (maxMemSize.equals("") || currentMemberSize > Integer.parseInt(maxMemSize)) {
-                deleteGroup(group, handle);
-            }
+    private boolean applyGroup(PiActionProfileGroup groupToApply, PiActionProfileGroupHandle handle) {
+        final TimedEntry<PiActionProfileGroup> groupOnDevice = groupMirror.get(handle);
+        final P4RuntimeClient.WriteOperationType opType =
+                groupOnDevice == null ? INSERT : MODIFY;
+        if (opType.equals(MODIFY) && groupToApply.equals(groupOnDevice.entry())) {
+            // Skip writing, group is unchanged.
+            return true;
         }
-
-        P4RuntimeClient.WriteOperationType opType =
-                groupMirror.get(handle) == null ? INSERT : MODIFY;
-        int currentMaxMemberSize = opType == INSERT ? (currentMemberSize + GROUP_MEMBERS_BUFFER_SIZE) : 0;
-
         final boolean success = getFutureWithDeadline(
-                client.writeActionProfileGroup(group, opType, pipeconf, currentMaxMemberSize),
+                client.writeActionProfileGroup(groupToApply, opType, pipeconf),
                 "performing action profile group " + opType, false);
         if (success) {
-            groupMirror.put(handle, group);
-            if (opType == INSERT) {
-                groupMirror.putAnnotations(handle, DefaultAnnotations
-                        .builder()
-                        .set(MAX_MEM_SIZE, Integer.toString(currentMaxMemberSize))
-                        .build());
-            }
+            groupMirror.put(handle, groupToApply);
         }
         return success;
     }
 
     private boolean deleteGroup(PiActionProfileGroup group, PiActionProfileGroupHandle handle) {
         final boolean success = getFutureWithDeadline(
-                client.writeActionProfileGroup(group, DELETE, pipeconf, 0),
+                client.writeActionProfileGroup(group, DELETE, pipeconf),
                 "performing action profile group " + DELETE, false);
         if (success) {
             groupMirror.remove(handle);
         }
+        // Orphan members will be removed at the next reconciliation cycle.
         return success;
     }
 
@@ -391,7 +393,7 @@
         if (appliedMembers.size() == members.size()) {
             return true;
         } else {
-            deleteMembers(appliedMembers);
+            deleteMembers(handles(appliedMembers));
             return false;
         }
     }
@@ -403,52 +405,67 @@
                 .collect(Collectors.toList());
     }
 
-    private boolean applyMember(PiActionProfileMember member) {
-        // If exists, modify, otherwise insert
+    private boolean applyMember(PiActionProfileMember memberToApply) {
+        // If exists, modify, otherwise insert.
         final PiActionProfileMemberHandle handle = PiActionProfileMemberHandle.of(
-                deviceId, member);
+                deviceId, memberToApply);
+        final TimedEntry<PiActionProfileMember> memberOnDevice = memberMirror.get(handle);
         final P4RuntimeClient.WriteOperationType opType =
-                memberMirror.get(handle) == null ? INSERT : MODIFY;
+                memberOnDevice == null ? INSERT : MODIFY;
+        if (opType.equals(MODIFY) && memberToApply.equals(memberOnDevice.entry())) {
+            // Skip writing if member is unchanged.
+            return true;
+        }
         final boolean success = getFutureWithDeadline(
-                client.writeActionProfileMembers(Collections.singletonList(member),
-                                                 opType, pipeconf),
+                client.writeActionProfileMembers(
+                        singletonList(memberToApply), opType, pipeconf),
                 "performing action profile member " + opType, false);
         if (success) {
-            memberMirror.put(handle, dummyMember(member.actionProfile(), member.id()));
+            memberMirror.put(handle, memberToApply);
         }
         return success;
     }
 
-    private void deleteMembers(Collection<PiActionProfileMember> members) {
-        members.forEach(this::deleteMember);
+    private void deleteMembers(Collection<PiActionProfileMemberHandle> handles) {
+        // TODO: improve by batching deletes.
+        handles.forEach(this::deleteMember);
     }
 
-    private void deleteMember(PiActionProfileMember member) {
-        final PiActionProfileMemberHandle handle = PiActionProfileMemberHandle.of(
-                deviceId, member);
+    private void deleteMember(PiActionProfileMemberHandle handle) {
         final boolean success = getFutureWithDeadline(
-                client.writeActionProfileMembers(Collections.singletonList(member),
-                                                 DELETE, pipeconf),
-                "performing action profile member " + DELETE, false);
+                client.removeActionProfileMembers(
+                        handle.actionProfileId(),
+                        singletonList(handle.memberId()), pipeconf),
+                "performing action profile member " + DELETE,
+                Collections.emptyList())
+                // Successful if the only member passed has been removed.
+                .size() == 1;
         if (success) {
             memberMirror.remove(handle);
         }
     }
 
-    // FIXME: this is nasty, we have to rely on a dummy member of the mirror
-    // because the PiActionProfileMember abstraction is broken, since it includes
-    // attributes that are not part of a P4Runtime member, e.g. weight.
-    // We should remove weight from the class, and have client methods that
-    // return the full PiActionProfileMember, not just the IDs.
-    private PiActionProfileMember dummyMember(
-            PiActionProfileId actionProfileId, PiActionProfileMemberId memberId) {
-        return PiActionProfileMember.builder()
-                .forActionProfile(actionProfileId)
-                .withId(memberId)
-                .withAction(PiAction.builder()
-                                    .withId(PiActionId.of("dummy"))
-                                    .build())
-                .build();
+    private Collection<PiActionProfileMemberHandle> handles(
+            Collection<PiActionProfileMember> members) {
+        return members.stream()
+                .map(m -> PiActionProfileMemberHandle.of(
+                        deviceId, m.actionProfile(), m.id()))
+                .collect(Collectors.toList());
+    }
+
+    private Collection<PiActionProfileMember> extractAllMemberInstancesOrNull(
+            PiActionProfileGroup group) {
+        final Collection<PiActionProfileMember> instances = group.members().stream()
+                .map(PiActionProfileGroup.WeightedMember::instance)
+                .filter(Objects::nonNull)
+                .collect(Collectors.toList());
+        if (instances.size() != group.members().size()) {
+            log.error("PiActionProfileGroup has {} member references, " +
+                              "but only {} instances were found",
+                      group.members().size(), instances.size());
+            return null;
+        }
+        return instances;
     }
 
     enum Operation {