Various fixes to get a working P4Runtime demo

Change-Id: Icab512fceeb6ec0faf1b402c1e325e055cdb2caf
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index d8a76d4..e805e91 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -44,7 +44,6 @@
 import p4.P4RuntimeOuterClass.StreamMessageRequest;
 import p4.P4RuntimeOuterClass.StreamMessageResponse;
 import p4.P4RuntimeOuterClass.TableEntry;
-import p4.P4RuntimeOuterClass.Uint128;
 import p4.P4RuntimeOuterClass.Update;
 import p4.P4RuntimeOuterClass.WriteRequest;
 import p4.config.P4InfoOuterClass.P4Info;
@@ -114,8 +113,8 @@
                 "onos/p4runtime-client-" + deviceId.toString(),
                 deviceId.toString() + "-%d"));
         this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
-        this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel)
-                .withDeadlineAfter(DEADLINE_SECONDS, TimeUnit.SECONDS);
+        // TODO: investigate enforcing a deadline or timeout in the supplyInContext method.
+        this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
         P4RuntimeGrpc.P4RuntimeStub asyncStub = P4RuntimeGrpc.newStub(channel);
         this.streamRequestObserver = asyncStub.streamChannel(new StreamChannelResponseObserver());
     }
@@ -127,14 +126,14 @@
      * Important: Tasks submitted in parallel by different threads are forced executed sequentially.
      * <p>
      */
-    private <U> CompletableFuture<U> supplyInContext(Supplier<U> supplier) {
+    private <U> CompletableFuture<U> supplyInContext(Supplier<U> supplier, String opDescription) {
         return CompletableFuture.supplyAsync(() -> {
             // TODO: explore a more relaxed locking strategy.
             writeLock.lock();
             try {
                 return supplier.get();
             } catch (Throwable ex) {
-                log.error("Exception in P4Runtime client of {}", deviceId, ex);
+                log.error("Exception in P4Runtime client of {}, executing {}", deviceId, opDescription, ex);
                 throw ex;
             } finally {
                 writeLock.unlock();
@@ -144,28 +143,29 @@
 
     @Override
     public CompletableFuture<Boolean> initStreamChannel() {
-        return supplyInContext(this::doInitStreamChannel);
+        return supplyInContext(this::doInitStreamChannel, "initStreamChannel");
     }
 
     @Override
     public CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ExtensionType targetConfigExtType) {
-        return supplyInContext(() -> doSetPipelineConfig(pipeconf, targetConfigExtType));
+        return supplyInContext(() -> doSetPipelineConfig(pipeconf, targetConfigExtType), "setPipelineConfig");
     }
 
     @Override
     public CompletableFuture<Boolean> writeTableEntries(Collection<PiTableEntry> piTableEntries,
                                                         WriteOperationType opType, PiPipeconf pipeconf) {
-        return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf));
+        return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf),
+                               "writeTableEntries-" + opType.name());
     }
 
     @Override
     public CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
-        return supplyInContext(() -> doDumpTable(piTableId, pipeconf));
+        return supplyInContext(() -> doDumpTable(piTableId, pipeconf), "dumpTable-" + piTableId);
     }
 
     @Override
     public CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
-        return supplyInContext(() -> doPacketOut(packet, pipeconf));
+        return supplyInContext(() -> doPacketOut(packet, pipeconf), "packetOut");
     }
 
     /* Blocking method implementations below */
@@ -271,10 +271,11 @@
 
         writeRequestBuilder
                 .setDeviceId(p4DeviceId)
+                /* PI ignores this ElectionId, commenting out for now.
                 .setElectionId(Uint128.newBuilder()
                                        .setHigh(0)
                                        .setLow(ELECTION_ID)
-                                       .build())
+                                       .build()) */
                 .addAllUpdates(updateMsgs)
                 .build();
 
@@ -367,7 +368,9 @@
             return;
         }
         // Decode packet message and post event.
-        P4RuntimeEvent event = new DefaultPacketInEvent(deviceId, PacketIOCodec.decodePacketIn(packetInMsg, pipeconf));
+        PiPacketOperation packetOperation = PacketIOCodec.decodePacketIn(packetInMsg, pipeconf);
+        DefaultPacketIn packetInEventSubject = new DefaultPacketIn(deviceId, packetOperation);
+        P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.PACKET_IN, packetInEventSubject);
         log.debug("Received packet in: {}", event);
         controller.postEvent(event);
     }