Catch exceptions when processing messages on P4Runtime stream channel

Also, packet I/O test in P4RuntimeTest.

Change-Id: Ib11d7356eef43cd962cf47f8a6fba8fc23ed69be
diff --git a/apps/p4runtime-test/src/test/java/org/onosproject/p4runtime/test/P4RuntimeTest.java b/apps/p4runtime-test/src/test/java/org/onosproject/p4runtime/test/P4RuntimeTest.java
index ab0a57e..e7447d9 100644
--- a/apps/p4runtime-test/src/test/java/org/onosproject/p4runtime/test/P4RuntimeTest.java
+++ b/apps/p4runtime-test/src/test/java/org/onosproject/p4runtime/test/P4RuntimeTest.java
@@ -43,7 +43,6 @@
 import java.util.concurrent.ExecutionException;
 
 import static org.onlab.util.ImmutableByteSequence.copyFrom;
-import static org.onlab.util.ImmutableByteSequence.fit;
 import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType.BMV2_JSON;
 import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType.P4_INFO_TEXT;
 import static org.onosproject.net.pi.runtime.PiPacketOperation.Type.PACKET_OUT;
@@ -56,7 +55,7 @@
 public class P4RuntimeTest {
 
     private static final String GRPC_SERVER_ADDR = "192.168.56.102";
-    private static final int GRPC_SERVER_PORT = 55044;
+    private static final int GRPC_SERVER_PORT = 55001;
 
     private final URL p4InfoUrl = this.getClass().getResource("/bmv2/default.p4info");
     private final URL jsonUrl = this.getClass().getResource("/bmv2/default.json");
@@ -112,10 +111,10 @@
                 .setGroupId(1)
                 .setType(SELECT)
                 .addMembers(P4RuntimeOuterClass.ActionProfileGroup.Member.newBuilder()
-                                    .setMemberId(1)
+                                    .setMemberId(0)
                                     .setWeight(1)
                                     .build())
-                .setMaxSize(3)
+                .setMaxSize(1)
                 .build();
 
         P4RuntimeOuterClass.WriteRequest writeRequest = P4RuntimeOuterClass.WriteRequest.newBuilder()
@@ -123,13 +122,13 @@
                 .addUpdates(P4RuntimeOuterClass.Update.newBuilder()
                                     .setType(INSERT)
                                     .setEntity(P4RuntimeOuterClass.Entity.newBuilder()
-                                                       .setActionProfileGroup(groupMsg)
+                                                       .setActionProfileMember(profileMemberMsg)
                                                        .build())
                                     .build())
                 .addUpdates(P4RuntimeOuterClass.Update.newBuilder()
                                     .setType(INSERT)
                                     .setEntity(P4RuntimeOuterClass.Entity.newBuilder()
-                                                       .setActionProfileMember(profileMemberMsg)
+                                                       .setActionProfileGroup(groupMsg)
                                                        .build())
                                     .build())
                 .build();
@@ -137,19 +136,23 @@
         stub.write(writeRequest);
     }
 
-    private void testPacketOut() throws IllegalAccessException, InstantiationException, ExecutionException,
+    private void testPacketIo() throws IllegalAccessException, InstantiationException, ExecutionException,
             InterruptedException, ImmutableByteSequence.ByteSequenceTrimException {
 
+        // Emits a packet out trough the CPU_PORT (255), i.e. we should receive the same packet back.
         PiPacketOperation packetOperation = PiPacketOperation.builder()
-                .withData(ImmutableByteSequence.ofOnes(10))
+                .withData(ImmutableByteSequence.ofOnes(512))
                 .withType(PACKET_OUT)
                 .withMetadata(PiPacketMetadata.builder()
                                       .withId(PiPacketMetadataId.of("egress_port"))
-                                      .withValue(fit(copyFrom(1), 9))
+                                      .withValue(copyFrom((short) 255))
                                       .build())
                 .build();
 
         assert (client.packetOut(packetOperation, bmv2DefaultPipeconf).get());
+
+        // Wait for packet in.
+        Thread.sleep(1000);
     }
 
     private void testDumpTable(String tableName, PiPipeconf pipeconf) throws ExecutionException, InterruptedException {
@@ -161,12 +164,9 @@
     public void testBmv2() throws Exception {
 
         createClientAndSetPipelineConfig(bmv2DefaultPipeconf, BMV2_JSON);
-
+        testPacketIo();
         testDumpTable("table0", bmv2DefaultPipeconf);
-
-        // testPacketOut();
-
-        testActionProfile(285261835);
+        testActionProfile(285227860);
     }
 
     @Test
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketInEvent.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketInEvent.java
index 104999b..e0be104 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketInEvent.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketInEvent.java
@@ -16,6 +16,7 @@
 
 package org.onosproject.p4runtime.ctl;
 
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
 import org.onosproject.event.AbstractEvent;
 import org.onosproject.net.DeviceId;
@@ -78,5 +79,13 @@
         public int hashCode() {
             return Objects.hashCode(deviceId, operation);
         }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this)
+                    .add("deviceId", deviceId)
+                    .add("operation", operation)
+                    .toString();
+        }
     }
 }
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index a7d7726..d8a76d4 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -110,7 +110,7 @@
         this.p4DeviceId = p4DeviceId;
         this.controller = controller;
         this.cancellableContext = Context.current().withCancellation();
-        this.executorService = Executors.newFixedThreadPool(5, groupedThreads(
+        this.executorService = Executors.newFixedThreadPool(15, groupedThreads(
                 "onos/p4runtime-client-" + deviceId.toString(),
                 deviceId.toString() + "-%d"));
         this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
@@ -133,6 +133,9 @@
             writeLock.lock();
             try {
                 return supplier.get();
+            } catch (Throwable ex) {
+                log.error("Exception in P4Runtime client of {}", deviceId, ex);
+                throw ex;
             } finally {
                 writeLock.unlock();
             }
@@ -348,6 +351,32 @@
         return true;
     }
 
+    private void doPacketIn(PacketIn packetInMsg) {
+
+        // Retrieve the pipeconf for this client's device.
+        PiPipeconfService pipeconfService = DefaultServiceDirectory.getService(PiPipeconfService.class);
+        if (pipeconfService == null) {
+            throw new IllegalStateException("PiPipeconfService is null. Can't handle packet in.");
+        }
+        final PiPipeconf pipeconf;
+        if (pipeconfService.ofDevice(deviceId).isPresent() &&
+                pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).isPresent()) {
+            pipeconf = pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).get();
+        } else {
+            log.warn("Unable to get pipeconf of {}. Can't handle packet in", deviceId);
+            return;
+        }
+        // Decode packet message and post event.
+        P4RuntimeEvent event = new DefaultPacketInEvent(deviceId, PacketIOCodec.decodePacketIn(packetInMsg, pipeconf));
+        log.debug("Received packet in: {}", event);
+        controller.postEvent(event);
+    }
+
+    private void doArbitrationUpdateFromDevice(MasterArbitrationUpdate arbitrationMsg) {
+
+        log.warn("Received arbitration update from {} (NOT IMPLEMENTED YET): {}", deviceId, arbitrationMsg);
+    }
+
     /**
      * Returns the internal P4 device ID associated with this client.
      *
@@ -401,39 +430,21 @@
         }
 
         private void doNext(StreamMessageResponse message) {
-            log.info("Received message on stream channel from {}: {}", deviceId, message.getUpdateCase());
-            switch (message.getUpdateCase()) {
-                case PACKET:
-                    // Packet-in
-                    PacketIn packetIn = message.getPacket();
-
-                    // Retrieve the pipeconf for the specific device
-                    PiPipeconfService pipeconfService = DefaultServiceDirectory.getService(PiPipeconfService.class);
-                    if (pipeconfService == null) {
-                        throw new IllegalStateException("PiPipeconfService is null. Can't handle packet in.");
-                    }
-
-                    final PiPipeconf pipeconf;
-                    if (pipeconfService.ofDevice(deviceId).isPresent() &&
-                            pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).isPresent()) {
-                        pipeconf = pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).get();
-                    } else {
-                        log.warn("Unable to get the pipeconf of the {}. Can't handle packet in", deviceId);
+            try {
+                log.info("Received message on stream channel from {}: {}", deviceId, message.getUpdateCase());
+                switch (message.getUpdateCase()) {
+                    case PACKET:
+                        // Packet-in
+                        doPacketIn(message.getPacket());
                         return;
-                    }
-                    //decode the packet and generate a corresponding p4Runtime event containing the PiPacketOperation
-                    P4RuntimeEvent event =
-                            new DefaultPacketInEvent(deviceId, PacketIOCodec.decodePacketIn(packetIn, pipeconf));
-
-                    //posting the event upwards
-                    controller.postEvent(event);
-                    return;
-
-                case ARBITRATION:
-                    throw new UnsupportedOperationException("Arbitration not implemented.");
-
-                default:
-                    log.warn("Unrecognized stream message from {}: {}", deviceId, message.getUpdateCase());
+                    case ARBITRATION:
+                        doArbitrationUpdateFromDevice(message.getArbitration());
+                        return;
+                    default:
+                        log.warn("Unrecognized stream message from {}: {}", deviceId, message.getUpdateCase());
+                }
+            } catch (Throwable ex) {
+                log.error("Exception while processing stream channel message from {}", deviceId, ex);
             }
         }