Catch exceptions when processing messages on P4Runtime stream channel
Also, packet I/O test in P4RuntimeTest.
Change-Id: Ib11d7356eef43cd962cf47f8a6fba8fc23ed69be
diff --git a/apps/p4runtime-test/src/test/java/org/onosproject/p4runtime/test/P4RuntimeTest.java b/apps/p4runtime-test/src/test/java/org/onosproject/p4runtime/test/P4RuntimeTest.java
index ab0a57e..e7447d9 100644
--- a/apps/p4runtime-test/src/test/java/org/onosproject/p4runtime/test/P4RuntimeTest.java
+++ b/apps/p4runtime-test/src/test/java/org/onosproject/p4runtime/test/P4RuntimeTest.java
@@ -43,7 +43,6 @@
import java.util.concurrent.ExecutionException;
import static org.onlab.util.ImmutableByteSequence.copyFrom;
-import static org.onlab.util.ImmutableByteSequence.fit;
import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType.BMV2_JSON;
import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType.P4_INFO_TEXT;
import static org.onosproject.net.pi.runtime.PiPacketOperation.Type.PACKET_OUT;
@@ -56,7 +55,7 @@
public class P4RuntimeTest {
private static final String GRPC_SERVER_ADDR = "192.168.56.102";
- private static final int GRPC_SERVER_PORT = 55044;
+ private static final int GRPC_SERVER_PORT = 55001;
private final URL p4InfoUrl = this.getClass().getResource("/bmv2/default.p4info");
private final URL jsonUrl = this.getClass().getResource("/bmv2/default.json");
@@ -112,10 +111,10 @@
.setGroupId(1)
.setType(SELECT)
.addMembers(P4RuntimeOuterClass.ActionProfileGroup.Member.newBuilder()
- .setMemberId(1)
+ .setMemberId(0)
.setWeight(1)
.build())
- .setMaxSize(3)
+ .setMaxSize(1)
.build();
P4RuntimeOuterClass.WriteRequest writeRequest = P4RuntimeOuterClass.WriteRequest.newBuilder()
@@ -123,13 +122,13 @@
.addUpdates(P4RuntimeOuterClass.Update.newBuilder()
.setType(INSERT)
.setEntity(P4RuntimeOuterClass.Entity.newBuilder()
- .setActionProfileGroup(groupMsg)
+ .setActionProfileMember(profileMemberMsg)
.build())
.build())
.addUpdates(P4RuntimeOuterClass.Update.newBuilder()
.setType(INSERT)
.setEntity(P4RuntimeOuterClass.Entity.newBuilder()
- .setActionProfileMember(profileMemberMsg)
+ .setActionProfileGroup(groupMsg)
.build())
.build())
.build();
@@ -137,19 +136,23 @@
stub.write(writeRequest);
}
- private void testPacketOut() throws IllegalAccessException, InstantiationException, ExecutionException,
+ private void testPacketIo() throws IllegalAccessException, InstantiationException, ExecutionException,
InterruptedException, ImmutableByteSequence.ByteSequenceTrimException {
+ // Emits a packet out trough the CPU_PORT (255), i.e. we should receive the same packet back.
PiPacketOperation packetOperation = PiPacketOperation.builder()
- .withData(ImmutableByteSequence.ofOnes(10))
+ .withData(ImmutableByteSequence.ofOnes(512))
.withType(PACKET_OUT)
.withMetadata(PiPacketMetadata.builder()
.withId(PiPacketMetadataId.of("egress_port"))
- .withValue(fit(copyFrom(1), 9))
+ .withValue(copyFrom((short) 255))
.build())
.build();
assert (client.packetOut(packetOperation, bmv2DefaultPipeconf).get());
+
+ // Wait for packet in.
+ Thread.sleep(1000);
}
private void testDumpTable(String tableName, PiPipeconf pipeconf) throws ExecutionException, InterruptedException {
@@ -161,12 +164,9 @@
public void testBmv2() throws Exception {
createClientAndSetPipelineConfig(bmv2DefaultPipeconf, BMV2_JSON);
-
+ testPacketIo();
testDumpTable("table0", bmv2DefaultPipeconf);
-
- // testPacketOut();
-
- testActionProfile(285261835);
+ testActionProfile(285227860);
}
@Test
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketInEvent.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketInEvent.java
index 104999b..e0be104 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketInEvent.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketInEvent.java
@@ -16,6 +16,7 @@
package org.onosproject.p4runtime.ctl;
+import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import org.onosproject.event.AbstractEvent;
import org.onosproject.net.DeviceId;
@@ -78,5 +79,13 @@
public int hashCode() {
return Objects.hashCode(deviceId, operation);
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("deviceId", deviceId)
+ .add("operation", operation)
+ .toString();
+ }
}
}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index a7d7726..d8a76d4 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -110,7 +110,7 @@
this.p4DeviceId = p4DeviceId;
this.controller = controller;
this.cancellableContext = Context.current().withCancellation();
- this.executorService = Executors.newFixedThreadPool(5, groupedThreads(
+ this.executorService = Executors.newFixedThreadPool(15, groupedThreads(
"onos/p4runtime-client-" + deviceId.toString(),
deviceId.toString() + "-%d"));
this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
@@ -133,6 +133,9 @@
writeLock.lock();
try {
return supplier.get();
+ } catch (Throwable ex) {
+ log.error("Exception in P4Runtime client of {}", deviceId, ex);
+ throw ex;
} finally {
writeLock.unlock();
}
@@ -348,6 +351,32 @@
return true;
}
+ private void doPacketIn(PacketIn packetInMsg) {
+
+ // Retrieve the pipeconf for this client's device.
+ PiPipeconfService pipeconfService = DefaultServiceDirectory.getService(PiPipeconfService.class);
+ if (pipeconfService == null) {
+ throw new IllegalStateException("PiPipeconfService is null. Can't handle packet in.");
+ }
+ final PiPipeconf pipeconf;
+ if (pipeconfService.ofDevice(deviceId).isPresent() &&
+ pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).isPresent()) {
+ pipeconf = pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).get();
+ } else {
+ log.warn("Unable to get pipeconf of {}. Can't handle packet in", deviceId);
+ return;
+ }
+ // Decode packet message and post event.
+ P4RuntimeEvent event = new DefaultPacketInEvent(deviceId, PacketIOCodec.decodePacketIn(packetInMsg, pipeconf));
+ log.debug("Received packet in: {}", event);
+ controller.postEvent(event);
+ }
+
+ private void doArbitrationUpdateFromDevice(MasterArbitrationUpdate arbitrationMsg) {
+
+ log.warn("Received arbitration update from {} (NOT IMPLEMENTED YET): {}", deviceId, arbitrationMsg);
+ }
+
/**
* Returns the internal P4 device ID associated with this client.
*
@@ -401,39 +430,21 @@
}
private void doNext(StreamMessageResponse message) {
- log.info("Received message on stream channel from {}: {}", deviceId, message.getUpdateCase());
- switch (message.getUpdateCase()) {
- case PACKET:
- // Packet-in
- PacketIn packetIn = message.getPacket();
-
- // Retrieve the pipeconf for the specific device
- PiPipeconfService pipeconfService = DefaultServiceDirectory.getService(PiPipeconfService.class);
- if (pipeconfService == null) {
- throw new IllegalStateException("PiPipeconfService is null. Can't handle packet in.");
- }
-
- final PiPipeconf pipeconf;
- if (pipeconfService.ofDevice(deviceId).isPresent() &&
- pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).isPresent()) {
- pipeconf = pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).get();
- } else {
- log.warn("Unable to get the pipeconf of the {}. Can't handle packet in", deviceId);
+ try {
+ log.info("Received message on stream channel from {}: {}", deviceId, message.getUpdateCase());
+ switch (message.getUpdateCase()) {
+ case PACKET:
+ // Packet-in
+ doPacketIn(message.getPacket());
return;
- }
- //decode the packet and generate a corresponding p4Runtime event containing the PiPacketOperation
- P4RuntimeEvent event =
- new DefaultPacketInEvent(deviceId, PacketIOCodec.decodePacketIn(packetIn, pipeconf));
-
- //posting the event upwards
- controller.postEvent(event);
- return;
-
- case ARBITRATION:
- throw new UnsupportedOperationException("Arbitration not implemented.");
-
- default:
- log.warn("Unrecognized stream message from {}: {}", deviceId, message.getUpdateCase());
+ case ARBITRATION:
+ doArbitrationUpdateFromDevice(message.getArbitration());
+ return;
+ default:
+ log.warn("Unrecognized stream message from {}: {}", deviceId, message.getUpdateCase());
+ }
+ } catch (Throwable ex) {
+ log.error("Exception while processing stream channel message from {}", deviceId, ex);
}
}