[ONOS-6935] ActionProfile support in P4RuntimeClient

Change-Id: I9f0ac307985c03b7ed93e14e41ba468c481a4e4f
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index a9adf55..7f22e79 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -17,7 +17,10 @@
 package org.onosproject.p4runtime.ctl;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
 import io.grpc.Context;
@@ -26,13 +29,17 @@
 import io.grpc.StatusRuntimeException;
 import io.grpc.stub.StreamObserver;
 import org.onlab.osgi.DefaultServiceDirectory;
+import org.onlab.util.Tools;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.pi.model.PiPipeconf;
+import org.onosproject.net.pi.runtime.PiActionProfileId;
 import org.onosproject.net.pi.runtime.PiCounterCellData;
 import org.onosproject.net.pi.runtime.PiCounterCellId;
 import org.onosproject.net.pi.runtime.PiCounterId;
 import org.onosproject.net.pi.runtime.PiDirectCounterCellId;
 import org.onosproject.net.pi.runtime.PiIndirectCounterCellId;
+import org.onosproject.net.pi.runtime.PiActionGroup;
+import org.onosproject.net.pi.runtime.PiActionGroupMember;
 import org.onosproject.net.pi.runtime.PiPacketOperation;
 import org.onosproject.net.pi.runtime.PiPipeconfService;
 import org.onosproject.net.pi.runtime.PiTableEntry;
@@ -41,6 +48,8 @@
 import org.onosproject.p4runtime.api.P4RuntimeEvent;
 import org.slf4j.Logger;
 import p4.P4RuntimeGrpc;
+import p4.P4RuntimeOuterClass.ActionProfileGroup;
+import p4.P4RuntimeOuterClass.ActionProfileMember;
 import p4.P4RuntimeOuterClass.Entity;
 import p4.P4RuntimeOuterClass.ForwardingPipelineConfig;
 import p4.P4RuntimeOuterClass.MasterArbitrationUpdate;
@@ -53,6 +62,7 @@
 import p4.P4RuntimeOuterClass.TableEntry;
 import p4.P4RuntimeOuterClass.Update;
 import p4.P4RuntimeOuterClass.WriteRequest;
+import p4.config.P4InfoOuterClass;
 import p4.config.P4InfoOuterClass.P4Info;
 import p4.tmp.P4Config;
 
@@ -69,6 +79,8 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
@@ -78,6 +90,8 @@
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType;
 import static org.slf4j.LoggerFactory.getLogger;
+import static p4.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_GROUP;
+import static p4.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_MEMBER;
 import static p4.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
 import static p4.P4RuntimeOuterClass.PacketOut;
 import static p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
@@ -106,7 +120,14 @@
     private final Lock writeLock = new ReentrantLock();
     private final StreamObserver<StreamMessageRequest> streamRequestObserver;
 
-
+    /**
+     * Creates a P4Runtime client for the given device.
+     *
+     * @param deviceId the ONOS device id
+     * @param p4DeviceId the P4 device id
+     * @param channel gRPC channel
+     * @param controller runtime client controller
+     */
     P4RuntimeClientImpl(DeviceId deviceId, long p4DeviceId, ManagedChannel channel,
                         P4RuntimeControllerImpl controller) {
         this.deviceId = deviceId;
@@ -216,6 +237,30 @@
                                "readAllCounterCells-" + cellIds.hashCode());
     }
 
+    @Override
+    public CompletableFuture<Boolean> writeActionGroupMembers(
+            PiActionGroup group, Collection<PiActionGroupMember> members,
+            WriteOperationType opType, PiPipeconf pipeconf) {
+        // The operation type is part of the label passed to supplyInContext.
+        final String opDescription = "writeActionGroupMembers-" + opType.name();
+        return supplyInContext(() -> doWriteActionGroupMembers(group, members, opType, pipeconf), opDescription);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> writeActionGroup(
+            PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) {
+        // The operation type is part of the label passed to supplyInContext.
+        final String opDescription = "writeActionGroup-" + opType.name();
+        return supplyInContext(() -> doWriteActionGroup(group, opType, pipeconf), opDescription);
+    }
+
+    @Override
+    public CompletableFuture<Collection<PiActionGroup>> dumpGroups(
+            PiActionProfileId actionProfileId, PiPipeconf pipeconf) {
+        final String opDescription = "dumpGroups-" + actionProfileId.id();
+        return supplyInContext(() -> doDumpGroups(actionProfileId, pipeconf), opDescription);
+    }
+
     /* Blocking method implementations below */
 
     private boolean doInitStreamChannel() {
@@ -464,6 +509,191 @@
         return CounterEntryCodec.decodeCounterEntities(entities, counterIdMap, pipeconf);
     }
 
+    /**
+     * Encodes the given members and writes them to the device in one request.
+     *
+     * @param group    action group the members belong to
+     * @param members  members to write
+     * @param opType   write operation type
+     * @param pipeconf pipeconf used for encoding
+     * @return true if there was nothing to write or the write succeeded, false
+     * if encoding or writing failed
+     */
+    private boolean doWriteActionGroupMembers(PiActionGroup group, Collection<PiActionGroupMember> members,
+                                              WriteOperationType opType, PiPipeconf pipeconf) {
+        if (members.isEmpty()) {
+            // Nothing to update
+            return true;
+        }
+
+        // Encode every member up front, so that nothing is sent if any of them is invalid.
+        Collection<Update> updateMsgs = Lists.newArrayList();
+        try {
+            for (PiActionGroupMember member : members) {
+                Entity entity = Entity.newBuilder()
+                        .setActionProfileMember(ActionProfileMemberEncoder.encode(group, member, pipeconf))
+                        .build();
+                updateMsgs.add(Update.newBuilder().setEntity(entity).setType(UPDATE_TYPES.get(opType)).build());
+            }
+        } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
+            log.warn("Can't encode group members {} due to {}", members, e.getMessage());
+            return false;
+        }
+
+        try {
+            blockingStub.write(WriteRequest.newBuilder()
+                                       .setDeviceId(p4DeviceId)
+                                       .addAllUpdates(updateMsgs)
+                                       .build());
+            return true;
+        } catch (StatusRuntimeException e) {
+            log.warn("Unable to write action group members ({}): {}", opType, e.getMessage());
+            return false;
+        }
+    }
+
+    /**
+     * Reads all groups of the given action profile from the device, then reads
+     * each group's members, and decodes the result.
+     *
+     * @param piActionProfileId action profile to dump
+     * @param pipeconf          pipeconf used to look up P4Info and to decode
+     * @return the decoded groups, or an empty set if any lookup, read or decode fails
+     */
+    private Collection<PiActionGroup> doDumpGroups(PiActionProfileId piActionProfileId, PiPipeconf pipeconf) {
+        log.debug("Dumping groups from action profile {} from {} (pipeconf {})...",
+                  piActionProfileId.id(), deviceId, pipeconf.id());
+        P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
+        if (browser == null) {
+            log.warn("Unable to get a P4Info browser for pipeconf {}, skipping dump action profile {}",
+                     pipeconf, piActionProfileId);
+            return Collections.emptySet();
+        }
+
+        int actionProfileId;
+        try {
+            P4InfoOuterClass.ActionProfile actionProfile =
+                    browser.actionProfiles().getByName(piActionProfileId.id());
+            actionProfileId = actionProfile.getPreamble().getId();
+        } catch (P4InfoBrowser.NotFoundException e) {
+            log.warn("Can't find action profile {} from p4info", piActionProfileId);
+            return Collections.emptySet();
+        }
+
+        // The group in the request carries only the action profile ID.
+        ActionProfileGroup actionProfileGroup =
+                ActionProfileGroup.newBuilder().setActionProfileId(actionProfileId).build();
+        ReadRequest requestMsg = ReadRequest.newBuilder()
+                .setDeviceId(p4DeviceId)
+                .addEntities(Entity.newBuilder().setActionProfileGroup(actionProfileGroup).build())
+                .build();
+
+        List<ActionProfileGroup> actionProfileGroups;
+        try {
+            // The blocking stub hands back a lazy iterator, so a gRPC failure can
+            // surface while the responses are consumed: drain it inside the try.
+            Iterator<ReadResponse> responses = blockingStub.read(requestMsg);
+            actionProfileGroups = Tools.stream(() -> responses)
+                    .map(ReadResponse::getEntitiesList)
+                    .flatMap(List::stream)
+                    .filter(entity -> entity.getEntityCase() == ACTION_PROFILE_GROUP)
+                    .map(Entity::getActionProfileGroup)
+                    .collect(Collectors.toList());
+        } catch (StatusRuntimeException e) {
+            log.warn("Unable to read action profile {} due to {}", piActionProfileId, e.getMessage());
+            return Collections.emptySet();
+        }
+
+        log.debug("Retrieved {} groups from action profile {} on {}...",
+                  actionProfileGroups.size(), piActionProfileId.id(), deviceId);
+
+        // group id -> members
+        Multimap<Integer, ActionProfileMember> actionProfileMemberMap = HashMultimap.create();
+        AtomicLong memberCount = new AtomicLong(0);
+        AtomicBoolean success = new AtomicBoolean(true);
+        actionProfileGroups.forEach(actProfGrp -> {
+            actProfGrp.getMembersList().forEach(member -> {
+                if (!success.get()) {
+                    // The dump is discarded once a member read fails, so skip further requests.
+                    return;
+                }
+                ActionProfileMember actProfMember =
+                        ActionProfileMember.newBuilder()
+                                .setActionProfileId(actProfGrp.getActionProfileId())
+                                .setMemberId(member.getMemberId())
+                                .build();
+                ReadRequest reqMsg = ReadRequest.newBuilder()
+                        .setDeviceId(p4DeviceId)
+                        .addEntities(Entity.newBuilder().setActionProfileMember(actProfMember).build())
+                        .build();
+                try {
+                    // Same lazy-iterator caveat as for the group read: consume inside the try.
+                    Iterator<ReadResponse> resps = blockingStub.read(reqMsg);
+                    Tools.stream(() -> resps)
+                            .map(ReadResponse::getEntitiesList)
+                            .flatMap(List::stream)
+                            .filter(entity -> entity.getEntityCase() == ACTION_PROFILE_MEMBER)
+                            .map(Entity::getActionProfileMember)
+                            .forEach(m -> {
+                                actionProfileMemberMap.put(actProfGrp.getGroupId(), m);
+                                memberCount.incrementAndGet();
+                            });
+                } catch (StatusRuntimeException e) {
+                    log.warn("Unable to read member {} from action profile {} due to {}",
+                             member, piActionProfileId, e.getMessage());
+                    success.set(false);
+                }
+            });
+        });
+
+        if (!success.get()) {
+            // Can't read members
+            return Collections.emptySet();
+        }
+        log.debug("Retrieved {} group members from action profile {} on {}...",
+                  memberCount.get(), piActionProfileId.id(), deviceId);
+
+        Collection<PiActionGroup> piActionGroups = Sets.newHashSet();
+        for (ActionProfileGroup apg : actionProfileGroups) {
+            try {
+                piActionGroups.add(ActionProfileGroupEncoder.decode(
+                        apg, actionProfileMemberMap.get(apg.getGroupId()), pipeconf));
+            } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
+                log.warn("Can't decode group {} due to {}", apg, e.getMessage());
+                return Collections.emptySet();
+            }
+        }
+
+        return piActionGroups;
+    }
+
+    /**
+     * Encodes the given group and writes it to the device.
+     * @param group    action group to write
+     * @param opType   write operation type
+     * @param pipeconf pipeconf used for encoding
+     * @return true if the write succeeded, false if encoding or writing failed
+     */
+    private boolean doWriteActionGroup(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) {
+        ActionProfileGroup actionProfileGroup;
+        try {
+            actionProfileGroup = ActionProfileGroupEncoder.encode(group, pipeconf);
+        } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
+            log.warn("Can't encode group {} due to {}", group, e.getMessage());
+            return false;
+        }
+        Update updateMessage = Update.newBuilder()
+                .setEntity(Entity.newBuilder().setActionProfileGroup(actionProfileGroup).build())
+                .setType(UPDATE_TYPES.get(opType)).build();
+        try {
+            blockingStub.write(WriteRequest.newBuilder().setDeviceId(p4DeviceId).addUpdates(updateMessage).build());
+            return true;
+        } catch (StatusRuntimeException e) {
+            log.warn("Unable to write action group ({}): {}", opType, e.getMessage());
+            return false;
+        }
+    }
+
     /**
      * Returns the internal P4 device ID associated with this client.
      *