[ONOS-6935] ActionProfile support in P4RuntimeClient
Change-Id: I9f0ac307985c03b7ed93e14e41ba468c481a4e4f
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index a9adf55..7f22e79 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -17,7 +17,10 @@
package org.onosproject.p4runtime.ctl;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import io.grpc.Context;
@@ -26,13 +29,17 @@
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import org.onlab.osgi.DefaultServiceDirectory;
+import org.onlab.util.Tools;
import org.onosproject.net.DeviceId;
import org.onosproject.net.pi.model.PiPipeconf;
+import org.onosproject.net.pi.runtime.PiActionProfileId;
import org.onosproject.net.pi.runtime.PiCounterCellData;
import org.onosproject.net.pi.runtime.PiCounterCellId;
import org.onosproject.net.pi.runtime.PiCounterId;
import org.onosproject.net.pi.runtime.PiDirectCounterCellId;
import org.onosproject.net.pi.runtime.PiIndirectCounterCellId;
+import org.onosproject.net.pi.runtime.PiActionGroup;
+import org.onosproject.net.pi.runtime.PiActionGroupMember;
import org.onosproject.net.pi.runtime.PiPacketOperation;
import org.onosproject.net.pi.runtime.PiPipeconfService;
import org.onosproject.net.pi.runtime.PiTableEntry;
@@ -41,6 +48,8 @@
import org.onosproject.p4runtime.api.P4RuntimeEvent;
import org.slf4j.Logger;
import p4.P4RuntimeGrpc;
+import p4.P4RuntimeOuterClass.ActionProfileGroup;
+import p4.P4RuntimeOuterClass.ActionProfileMember;
import p4.P4RuntimeOuterClass.Entity;
import p4.P4RuntimeOuterClass.ForwardingPipelineConfig;
import p4.P4RuntimeOuterClass.MasterArbitrationUpdate;
@@ -53,6 +62,7 @@
import p4.P4RuntimeOuterClass.TableEntry;
import p4.P4RuntimeOuterClass.Update;
import p4.P4RuntimeOuterClass.WriteRequest;
+import p4.config.P4InfoOuterClass;
import p4.config.P4InfoOuterClass.P4Info;
import p4.tmp.P4Config;
@@ -69,6 +79,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
@@ -78,6 +90,8 @@
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType;
import static org.slf4j.LoggerFactory.getLogger;
+import static p4.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_GROUP;
+import static p4.P4RuntimeOuterClass.Entity.EntityCase.ACTION_PROFILE_MEMBER;
import static p4.P4RuntimeOuterClass.Entity.EntityCase.TABLE_ENTRY;
import static p4.P4RuntimeOuterClass.PacketOut;
import static p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
@@ -106,7 +120,14 @@
private final Lock writeLock = new ReentrantLock();
private final StreamObserver<StreamMessageRequest> streamRequestObserver;
-
+ /**
+ * Creates a new P4Runtime client for the given device.
+ *
+ * @param deviceId the ONOS device id
+ * @param p4DeviceId the P4 device id
+ * @param channel gRPC channel
+ * @param controller runtime client controller
+ */
P4RuntimeClientImpl(DeviceId deviceId, long p4DeviceId, ManagedChannel channel,
P4RuntimeControllerImpl controller) {
this.deviceId = deviceId;
@@ -216,6 +237,30 @@
"readAllCounterCells-" + cellIds.hashCode());
}
+ /**
+  * Hands the write of the given action group members over to supplyInContext;
+  * the actual work is performed by doWriteActionGroupMembers and its boolean
+  * result is what the returned future carries.
+  */
+ @Override
+ public CompletableFuture<Boolean> writeActionGroupMembers(PiActionGroup group,
+                                                           Collection<PiActionGroupMember> members,
+                                                           WriteOperationType opType,
+                                                           PiPipeconf pipeconf) {
+     // Label passed along with the task, suffixed with the operation type.
+     final String opDescription = "writeActionGroupMembers-" + opType.name();
+     return supplyInContext(
+             () -> doWriteActionGroupMembers(group, members, opType, pipeconf),
+             opDescription);
+ }
+
+ /**
+  * Hands the write of the given action group over to supplyInContext; the
+  * actual work is performed by doWriteActionGroup and its boolean result is
+  * what the returned future carries.
+  */
+ @Override
+ public CompletableFuture<Boolean> writeActionGroup(PiActionGroup group,
+                                                    WriteOperationType opType,
+                                                    PiPipeconf pipeconf) {
+     // Label passed along with the task, suffixed with the operation type.
+     final String opDescription = "writeActionGroup-" + opType.name();
+     return supplyInContext(
+             () -> doWriteActionGroup(group, opType, pipeconf),
+             opDescription);
+ }
+
+ /**
+  * Hands the dump of all groups of the given action profile over to
+  * supplyInContext; the actual work is performed by doDumpGroups and its
+  * result is what the returned future carries.
+  */
+ @Override
+ public CompletableFuture<Collection<PiActionGroup>> dumpGroups(PiActionProfileId actionProfileId,
+                                                                PiPipeconf pipeconf) {
+     // Label passed along with the task, suffixed with the action profile id.
+     final String opDescription = "dumpGroups-" + actionProfileId.id();
+     return supplyInContext(
+             () -> doDumpGroups(actionProfileId, pipeconf),
+             opDescription);
+ }
+
/* Blocking method implementations below */
private boolean doInitStreamChannel() {
@@ -464,6 +509,191 @@
return CounterEntryCodec.decodeCounterEntities(entities, counterIdMap, pipeconf);
}
+ /**
+  * Encodes the given members of an action group and sends them to the device
+  * in a single write request.
+  *
+  * @param group    action group the members belong to
+  * @param members  members to write
+  * @param opType   write operation type, translated to an update type through UPDATE_TYPES
+  * @param pipeconf pipeconf passed to the encoder
+  * @return true if there was nothing to write or the write call returned
+  * without a StatusRuntimeException; false if a member could not be encoded
+  * or the write call failed
+  */
+ private boolean doWriteActionGroupMembers(PiActionGroup group, Collection<PiActionGroupMember> members,
+                                           WriteOperationType opType, PiPipeconf pipeconf) {
+     Collection<ActionProfileMember> actionProfileMembers = Lists.newArrayList();
+     try {
+         for (PiActionGroupMember member : members) {
+             actionProfileMembers.add(
+                     ActionProfileMemberEncoder.encode(group, member, pipeconf));
+         }
+     } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
+         // All-or-nothing: a single member that cannot be encoded aborts the whole write.
+         log.warn("Can't encode group member {} due to {}", members, e.getMessage());
+         return false;
+     }
+
+     if (actionProfileMembers.isEmpty()) {
+         // Nothing to update
+         return true;
+     }
+
+     Collection<Update> updateMsgs = actionProfileMembers.stream()
+             .map(actionProfileMember ->
+                          Update.newBuilder()
+                                  .setEntity(Entity.newBuilder()
+                                                     .setActionProfileMember(actionProfileMember)
+                                                     .build())
+                                  .setType(UPDATE_TYPES.get(opType))
+                                  .build())
+             .collect(Collectors.toList());
+
+     WriteRequest writeRequest = WriteRequest.newBuilder()
+             .setDeviceId(p4DeviceId)
+             .addAllUpdates(updateMsgs)
+             .build();
+     try {
+         blockingStub.write(writeRequest);
+         return true;
+     } catch (StatusRuntimeException e) {
+         log.warn("Unable to write action group members ({}): {}", opType, e.getMessage());
+         return false;
+     }
+ }
+
+ /**
+  * Reads all groups of the given action profile from the device, then reads
+  * each member referenced by those groups, and decodes the result.
+  *
+  * The dump is all-or-nothing: an empty set is returned if the P4Info browser
+  * or the action profile cannot be found, if any read fails, or if any group
+  * cannot be decoded.
+  *
+  * @param piActionProfileId action profile whose groups are dumped
+  * @param pipeconf          pipeconf used to look up P4Info and to decode groups
+  * @return the decoded groups, or an empty set on any failure
+  */
+ private Collection<PiActionGroup> doDumpGroups(PiActionProfileId piActionProfileId, PiPipeconf pipeconf) {
+     log.debug("Dumping groups from action profile {} from {} (pipeconf {})...",
+               piActionProfileId.id(), deviceId, pipeconf.id());
+     P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
+     if (browser == null) {
+         log.warn("Unable to get a P4Info browser for pipeconf {}, skipping dump action profile {}",
+                  pipeconf, piActionProfileId);
+         return Collections.emptySet();
+     }
+
+     int actionProfileId;
+     try {
+         P4InfoOuterClass.ActionProfile actionProfile =
+                 browser.actionProfiles().getByName(piActionProfileId.id());
+         actionProfileId = actionProfile.getPreamble().getId();
+     } catch (P4InfoBrowser.NotFoundException e) {
+         log.warn("Can't find action profile {} from p4info", piActionProfileId);
+         return Collections.emptySet();
+     }
+
+     // Only the action profile id is set on the group used in the read request.
+     ActionProfileGroup actionProfileGroup =
+             ActionProfileGroup.newBuilder()
+                     .setActionProfileId(actionProfileId)
+                     .build();
+
+     ReadRequest requestMsg = ReadRequest.newBuilder()
+             .setDeviceId(p4DeviceId)
+             .addEntities(Entity.newBuilder()
+                                  .setActionProfileGroup(actionProfileGroup)
+                                  .build())
+             .build();
+
+     List<ActionProfileGroup> actionProfileGroups;
+     try {
+         // The stub hands back an iterator; errors of a server-streaming call can
+         // surface while the responses are iterated, not only when the call is
+         // issued, so the iterator is drained inside the try block.
+         Iterator<ReadResponse> responses = blockingStub.read(requestMsg);
+         actionProfileGroups = Tools.stream(() -> responses)
+                 .map(ReadResponse::getEntitiesList)
+                 .flatMap(List::stream)
+                 .filter(entity -> entity.getEntityCase() == ACTION_PROFILE_GROUP)
+                 .map(Entity::getActionProfileGroup)
+                 .collect(Collectors.toList());
+     } catch (StatusRuntimeException e) {
+         log.warn("Unable to read action profile {} due to {}", piActionProfileId, e.getMessage());
+         return Collections.emptySet();
+     }
+
+     log.debug("Retrieved {} groups from action profile {} on {}...",
+               actionProfileGroups.size(), piActionProfileId.id(), deviceId);
+
+     // group id -> members
+     Multimap<Integer, ActionProfileMember> actionProfileMemberMap = HashMultimap.create();
+     AtomicLong memberCount = new AtomicLong(0);
+     AtomicBoolean success = new AtomicBoolean(true);
+     actionProfileGroups.forEach(actProfGrp -> {
+         actProfGrp.getMembersList().forEach(member -> {
+             if (!success.get()) {
+                 // An earlier member read failed, so the whole dump will be
+                 // discarded; do not issue any further reads to the device.
+                 return;
+             }
+             ActionProfileMember actProfMember =
+                     ActionProfileMember.newBuilder()
+                             .setActionProfileId(actProfGrp.getActionProfileId())
+                             .setMemberId(member.getMemberId())
+                             .build();
+             Entity entity = Entity.newBuilder()
+                     .setActionProfileMember(actProfMember)
+                     .build();
+
+             ReadRequest reqMsg = ReadRequest.newBuilder().setDeviceId(p4DeviceId)
+                     .addEntities(entity)
+                     .build();
+
+             try {
+                 // Drained inside the try block for the same reason as the group read.
+                 Iterator<ReadResponse> resps = blockingStub.read(reqMsg);
+                 Tools.stream(() -> resps)
+                         .map(ReadResponse::getEntitiesList)
+                         .flatMap(List::stream)
+                         .filter(respEntity -> respEntity.getEntityCase() == ACTION_PROFILE_MEMBER)
+                         .map(Entity::getActionProfileMember)
+                         .forEach(m -> {
+                             actionProfileMemberMap.put(actProfGrp.getGroupId(), m);
+                             memberCount.incrementAndGet();
+                         });
+             } catch (StatusRuntimeException e) {
+                 log.warn("Unable to read member {} from action profile {} due to {}",
+                          member, piActionProfileId, e.getMessage());
+                 success.set(false);
+             }
+         });
+     });
+
+     if (!success.get()) {
+         // Can't read members
+         return Collections.emptySet();
+     }
+     log.info("Retrieved {} group members from action profile {} on {}...",
+              memberCount.get(), piActionProfileId.id(), deviceId);
+
+     Collection<PiActionGroup> piActionGroups = Sets.newHashSet();
+
+     for (ActionProfileGroup apg : actionProfileGroups) {
+         try {
+             Collection<ActionProfileMember> members = actionProfileMemberMap.get(apg.getGroupId());
+             PiActionGroup decodedGroup =
+                     ActionProfileGroupEncoder.decode(apg, members, pipeconf);
+             piActionGroups.add(decodedGroup);
+         } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
+             log.warn("Can't decode group {} due to {}", apg, e.getMessage());
+             return Collections.emptySet();
+         }
+     }
+
+     return piActionGroups;
+ }
+
+ private boolean doWriteActionGroup(PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf) {
+ WriteRequest.Builder writeRequestBuilder = WriteRequest.newBuilder();
+ ActionProfileGroup actionProfileGroup;
+ try {
+ actionProfileGroup = ActionProfileGroupEncoder.encode(group, pipeconf);
+ } catch (EncodeException | P4InfoBrowser.NotFoundException e) {
+ log.warn("Can't encode group {} due to {}", e.getMessage());
+ return false;
+ }
+ Update updateMessage = Update.newBuilder()
+ .setEntity(Entity.newBuilder()
+ .setActionProfileGroup(actionProfileGroup)
+ .build())
+ .setType(UPDATE_TYPES.get(opType))
+ .build();
+ writeRequestBuilder
+ .setDeviceId(p4DeviceId)
+ .addUpdates(updateMessage);
+ try {
+ blockingStub.write(writeRequestBuilder.build());
+ return true;
+ } catch (StatusRuntimeException e) {
+ log.warn("Unable to write table entries ({}): {}", opType, e.getMessage());
+ return false;
+ }
+ }
+
/**
* Returns the internal P4 device ID associated with this client.
*