Catch exceptions when processing messages on P4Runtime stream channel
Also, add a packet I/O test to P4RuntimeTest.
Change-Id: Ib11d7356eef43cd962cf47f8a6fba8fc23ed69be
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketInEvent.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketInEvent.java
index 104999b..e0be104 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketInEvent.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketInEvent.java
@@ -16,6 +16,7 @@
package org.onosproject.p4runtime.ctl;
+import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import org.onosproject.event.AbstractEvent;
import org.onosproject.net.DeviceId;
@@ -78,5 +79,13 @@
public int hashCode() {
return Objects.hashCode(deviceId, operation);
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("deviceId", deviceId)
+ .add("operation", operation)
+ .toString();
+ }
}
}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index a7d7726..d8a76d4 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -110,7 +110,7 @@
this.p4DeviceId = p4DeviceId;
this.controller = controller;
this.cancellableContext = Context.current().withCancellation();
- this.executorService = Executors.newFixedThreadPool(5, groupedThreads(
+ this.executorService = Executors.newFixedThreadPool(15, groupedThreads(
"onos/p4runtime-client-" + deviceId.toString(),
deviceId.toString() + "-%d"));
this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
@@ -133,6 +133,9 @@
writeLock.lock();
try {
return supplier.get();
+ } catch (Throwable ex) {
+ log.error("Exception in P4Runtime client of {}", deviceId, ex);
+ throw ex;
} finally {
writeLock.unlock();
}
@@ -348,6 +351,32 @@
return true;
}
+ private void doPacketIn(PacketIn packetInMsg) {
+
+ // Retrieve the pipeconf for this client's device.
+ PiPipeconfService pipeconfService = DefaultServiceDirectory.getService(PiPipeconfService.class);
+ if (pipeconfService == null) {
+ throw new IllegalStateException("PiPipeconfService is null. Can't handle packet in.");
+ }
+ final PiPipeconf pipeconf;
+ if (pipeconfService.ofDevice(deviceId).isPresent() &&
+ pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).isPresent()) {
+ pipeconf = pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).get();
+ } else {
+ log.warn("Unable to get pipeconf of {}. Can't handle packet in", deviceId);
+ return;
+ }
+ // Decode packet message and post event.
+ P4RuntimeEvent event = new DefaultPacketInEvent(deviceId, PacketIOCodec.decodePacketIn(packetInMsg, pipeconf));
+ log.debug("Received packet in: {}", event);
+ controller.postEvent(event);
+ }
+
+ private void doArbitrationUpdateFromDevice(MasterArbitrationUpdate arbitrationMsg) {
+
+ log.warn("Received arbitration update from {} (NOT IMPLEMENTED YET): {}", deviceId, arbitrationMsg);
+ }
+
/**
* Returns the internal P4 device ID associated with this client.
*
@@ -401,39 +430,21 @@
}
private void doNext(StreamMessageResponse message) {
- log.info("Received message on stream channel from {}: {}", deviceId, message.getUpdateCase());
- switch (message.getUpdateCase()) {
- case PACKET:
- // Packet-in
- PacketIn packetIn = message.getPacket();
-
- // Retrieve the pipeconf for the specific device
- PiPipeconfService pipeconfService = DefaultServiceDirectory.getService(PiPipeconfService.class);
- if (pipeconfService == null) {
- throw new IllegalStateException("PiPipeconfService is null. Can't handle packet in.");
- }
-
- final PiPipeconf pipeconf;
- if (pipeconfService.ofDevice(deviceId).isPresent() &&
- pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).isPresent()) {
- pipeconf = pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).get();
- } else {
- log.warn("Unable to get the pipeconf of the {}. Can't handle packet in", deviceId);
+ try {
+ log.info("Received message on stream channel from {}: {}", deviceId, message.getUpdateCase());
+ switch (message.getUpdateCase()) {
+ case PACKET:
+ // Packet-in
+ doPacketIn(message.getPacket());
return;
- }
- //decode the packet and generate a corresponding p4Runtime event containing the PiPacketOperation
- P4RuntimeEvent event =
- new DefaultPacketInEvent(deviceId, PacketIOCodec.decodePacketIn(packetIn, pipeconf));
-
- //posting the event upwards
- controller.postEvent(event);
- return;
-
- case ARBITRATION:
- throw new UnsupportedOperationException("Arbitration not implemented.");
-
- default:
- log.warn("Unrecognized stream message from {}: {}", deviceId, message.getUpdateCase());
+ case ARBITRATION:
+ doArbitrationUpdateFromDevice(message.getArbitration());
+ return;
+ default:
+ log.warn("Unrecognized stream message from {}: {}", deviceId, message.getUpdateCase());
+ }
+ } catch (Throwable ex) {
+ log.error("Exception while processing stream channel message from {}", deviceId, ex);
}
}