Catch exceptions when processing messages on P4Runtime stream channel

Also adds a packet I/O test to P4RuntimeTest.

Change-Id: Ib11d7356eef43cd962cf47f8a6fba8fc23ed69be
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index a7d7726..d8a76d4 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -110,7 +110,7 @@
         this.p4DeviceId = p4DeviceId;
         this.controller = controller;
         this.cancellableContext = Context.current().withCancellation();
-        this.executorService = Executors.newFixedThreadPool(5, groupedThreads(
+        this.executorService = Executors.newFixedThreadPool(15, groupedThreads(
                 "onos/p4runtime-client-" + deviceId.toString(),
                 deviceId.toString() + "-%d"));
         this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
@@ -133,6 +133,9 @@
             writeLock.lock();
             try {
                 return supplier.get();
+            } catch (Throwable ex) {
+                log.error("Exception in P4Runtime client of {}", deviceId, ex);
+                throw ex;
             } finally {
                 writeLock.unlock();
             }
@@ -348,6 +351,32 @@
         return true;
     }
 
+    /**
+     * Decodes a packet-in and posts it to the controller as an event; logs a warning and drops the packet
+     * if no pipeconf resolves for this device. Throws IllegalStateException if PiPipeconfService is null.
+     */
+    private void doPacketIn(PacketIn packetInMsg) {
+        PiPipeconfService pipeconfService = DefaultServiceDirectory.getService(PiPipeconfService.class);
+        if (pipeconfService == null) {
+            throw new IllegalStateException("PiPipeconfService is null. Can't handle packet in.");
+        }
+        // Query each lookup once so the result cannot change between a presence check and get().
+        final PiPipeconf pipeconf = pipeconfService.ofDevice(deviceId)
+                .flatMap(pipeconfService::getPipeconf).orElse(null);
+        if (pipeconf == null) {
+            log.warn("Unable to get pipeconf of {}. Can't handle packet in", deviceId);
+            return;
+        }
+        P4RuntimeEvent event = new DefaultPacketInEvent(deviceId, PacketIOCodec.decodePacketIn(packetInMsg, pipeconf));
+        log.debug("Received packet in: {}", event);
+        controller.postEvent(event);
+    }
+
+    private void doArbitrationUpdateFromDevice(MasterArbitrationUpdate arbitrationMsg) {
+        // Placeholder: the update is only logged; no further handling is done here.
+        log.warn("Received arbitration update from {} (NOT IMPLEMENTED YET): {}", deviceId, arbitrationMsg);
+    }
+
     /**
      * Returns the internal P4 device ID associated with this client.
      *
@@ -401,39 +430,21 @@
         }
 
         private void doNext(StreamMessageResponse message) {
-            log.info("Received message on stream channel from {}: {}", deviceId, message.getUpdateCase());
-            switch (message.getUpdateCase()) {
-                case PACKET:
-                    // Packet-in
-                    PacketIn packetIn = message.getPacket();
-
-                    // Retrieve the pipeconf for the specific device
-                    PiPipeconfService pipeconfService = DefaultServiceDirectory.getService(PiPipeconfService.class);
-                    if (pipeconfService == null) {
-                        throw new IllegalStateException("PiPipeconfService is null. Can't handle packet in.");
-                    }
-
-                    final PiPipeconf pipeconf;
-                    if (pipeconfService.ofDevice(deviceId).isPresent() &&
-                            pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).isPresent()) {
-                        pipeconf = pipeconfService.getPipeconf(pipeconfService.ofDevice(deviceId).get()).get();
-                    } else {
-                        log.warn("Unable to get the pipeconf of the {}. Can't handle packet in", deviceId);
+            try {
+                log.info("Received message on stream channel from {}: {}", deviceId, message.getUpdateCase());
+                switch (message.getUpdateCase()) {
+                    case PACKET:
+                        // Packet-in: decode it and post it as an event.
+                        doPacketIn(message.getPacket());
                         return;
-                    }
-                    //decode the packet and generate a corresponding p4Runtime event containing the PiPacketOperation
-                    P4RuntimeEvent event =
-                            new DefaultPacketInEvent(deviceId, PacketIOCodec.decodePacketIn(packetIn, pipeconf));
-
-                    //posting the event upwards
-                    controller.postEvent(event);
-                    return;
-
-                case ARBITRATION:
-                    throw new UnsupportedOperationException("Arbitration not implemented.");
-
-                default:
-                    log.warn("Unrecognized stream message from {}: {}", deviceId, message.getUpdateCase());
+                    case ARBITRATION:
+                        doArbitrationUpdateFromDevice(message.getArbitration());
+                        return;
+                    default:
+                        log.warn("Unrecognized stream message from {}: {}", deviceId, message.getUpdateCase());
+                }
+            } catch (Throwable ex) { // Logged only, not rethrown: nothing propagates out of doNext().
+                log.error("Exception while processing stream channel message from {}", deviceId, ex);
             }
         }