Various fixes to get a P4Runtime demo that works

Change-Id: Icab512fceeb6ec0faf1b402c1e325e055cdb2caf
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketIn.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketIn.java
new file mode 100644
index 0000000..2085505
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketIn.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.pi.runtime.PiPacketOperation;
+import org.onosproject.p4runtime.api.P4RuntimePacketIn;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Default implementation of a packet-in in P4Runtime.
+ */
+final class DefaultPacketIn implements P4RuntimePacketIn {
+
+    private final DeviceId deviceId;
+    private final PiPacketOperation operation;
+
+    DefaultPacketIn(DeviceId deviceId, PiPacketOperation operation) {
+        this.deviceId = checkNotNull(deviceId);
+        this.operation = checkNotNull(operation);
+    }
+
+    @Override
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+
+    @Override
+    public PiPacketOperation packetOperation() {
+        return operation;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        DefaultPacketIn that = (DefaultPacketIn) o;
+        return Objects.equal(deviceId, that.deviceId) &&
+                Objects.equal(operation, that.operation);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(deviceId, operation);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                .add("deviceId", deviceId)
+                .add("operation", operation)
+                .toString();
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketInEvent.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketInEvent.java
deleted file mode 100644
index e0be104..0000000
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/DefaultPacketInEvent.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.p4runtime.ctl;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Objects;
-import org.onosproject.event.AbstractEvent;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.pi.runtime.PiPacketOperation;
-import org.onosproject.p4runtime.api.P4RuntimeEvent;
-import org.onosproject.p4runtime.api.P4RuntimeEventListener;
-import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
-import org.onosproject.p4runtime.api.P4RuntimePacketIn;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Default implementation of a packet-in event.
- */
-final class DefaultPacketInEvent
-        extends AbstractEvent<P4RuntimeEventListener.Type, P4RuntimeEventSubject>
-        implements P4RuntimeEvent {
-
-    DefaultPacketInEvent(DeviceId deviceId, PiPacketOperation operation) {
-        super(P4RuntimeEventListener.Type.PACKET_IN, new DefaultPacketIn(deviceId, operation));
-    }
-
-    /**
-     * Default implementation of a packet-in in P4Runtime.
-     */
-    private static final class DefaultPacketIn implements P4RuntimePacketIn {
-
-        private final DeviceId deviceId;
-        private final PiPacketOperation operation;
-
-        private DefaultPacketIn(DeviceId deviceId, PiPacketOperation operation) {
-            this.deviceId = checkNotNull(deviceId);
-            this.operation = checkNotNull(operation);
-        }
-
-        @Override
-        public DeviceId deviceId() {
-            return deviceId;
-        }
-
-        @Override
-        public PiPacketOperation packetOperation() {
-            return operation;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            DefaultPacketIn that = (DefaultPacketIn) o;
-            return Objects.equal(deviceId, that.deviceId) &&
-                    Objects.equal(operation, that.operation);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hashCode(deviceId, operation);
-        }
-
-        @Override
-        public String toString() {
-            return MoreObjects.toStringHelper(this)
-                    .add("deviceId", deviceId)
-                    .add("operation", operation)
-                    .toString();
-        }
-    }
-}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index d8a76d4..e805e91 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -44,7 +44,6 @@
 import p4.P4RuntimeOuterClass.StreamMessageRequest;
 import p4.P4RuntimeOuterClass.StreamMessageResponse;
 import p4.P4RuntimeOuterClass.TableEntry;
-import p4.P4RuntimeOuterClass.Uint128;
 import p4.P4RuntimeOuterClass.Update;
 import p4.P4RuntimeOuterClass.WriteRequest;
 import p4.config.P4InfoOuterClass.P4Info;
@@ -114,8 +113,8 @@
                 "onos/p4runtime-client-" + deviceId.toString(),
                 deviceId.toString() + "-%d"));
         this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
-        this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel)
-                .withDeadlineAfter(DEADLINE_SECONDS, TimeUnit.SECONDS);
+        //TODO Investigate deadline or timeout in supplyInContext Method
+        this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
         P4RuntimeGrpc.P4RuntimeStub asyncStub = P4RuntimeGrpc.newStub(channel);
         this.streamRequestObserver = asyncStub.streamChannel(new StreamChannelResponseObserver());
     }
@@ -127,14 +126,14 @@
      * Important: Tasks submitted in parallel by different threads are forced executed sequentially.
      * <p>
      */
-    private <U> CompletableFuture<U> supplyInContext(Supplier<U> supplier) {
+    private <U> CompletableFuture<U> supplyInContext(Supplier<U> supplier, String opDescription) {
         return CompletableFuture.supplyAsync(() -> {
             // TODO: explore a more relaxed locking strategy.
             writeLock.lock();
             try {
                 return supplier.get();
             } catch (Throwable ex) {
-                log.error("Exception in P4Runtime client of {}", deviceId, ex);
+                log.error("Exception in P4Runtime client of {}, executing {}", deviceId, opDescription, ex);
                 throw ex;
             } finally {
                 writeLock.unlock();
@@ -144,28 +143,29 @@
 
     @Override
     public CompletableFuture<Boolean> initStreamChannel() {
-        return supplyInContext(this::doInitStreamChannel);
+        return supplyInContext(this::doInitStreamChannel, "initStreamChannel");
     }
 
     @Override
     public CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ExtensionType targetConfigExtType) {
-        return supplyInContext(() -> doSetPipelineConfig(pipeconf, targetConfigExtType));
+        return supplyInContext(() -> doSetPipelineConfig(pipeconf, targetConfigExtType), "setPipelineConfig");
     }
 
     @Override
     public CompletableFuture<Boolean> writeTableEntries(Collection<PiTableEntry> piTableEntries,
                                                         WriteOperationType opType, PiPipeconf pipeconf) {
-        return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf));
+        return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf),
+                               "writeTableEntries-" + opType.name());
     }
 
     @Override
     public CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId piTableId, PiPipeconf pipeconf) {
-        return supplyInContext(() -> doDumpTable(piTableId, pipeconf));
+        return supplyInContext(() -> doDumpTable(piTableId, pipeconf), "dumpTable-" + piTableId);
     }
 
     @Override
     public CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
-        return supplyInContext(() -> doPacketOut(packet, pipeconf));
+        return supplyInContext(() -> doPacketOut(packet, pipeconf), "packetOut");
     }
 
     /* Blocking method implementations below */
@@ -271,10 +271,11 @@
 
         writeRequestBuilder
                 .setDeviceId(p4DeviceId)
+                /* PI ignores this ElectionId, commenting out for now.
                 .setElectionId(Uint128.newBuilder()
                                        .setHigh(0)
                                        .setLow(ELECTION_ID)
-                                       .build())
+                                       .build()) */
                 .addAllUpdates(updateMsgs)
                 .build();
 
@@ -367,7 +368,9 @@
             return;
         }
         // Decode packet message and post event.
-        P4RuntimeEvent event = new DefaultPacketInEvent(deviceId, PacketIOCodec.decodePacketIn(packetInMsg, pipeconf));
+        PiPacketOperation packetOperation = PacketIOCodec.decodePacketIn(packetInMsg, pipeconf);
+        DefaultPacketIn packetInEventSubject = new DefaultPacketIn(deviceId, packetOperation);
+        P4RuntimeEvent event = new P4RuntimeEvent(P4RuntimeEvent.Type.PACKET_IN, packetInEventSubject);
         log.debug("Received packet in: {}", event);
         controller.postEvent(event);
     }
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
index d134a22..277af11 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
@@ -67,6 +67,7 @@
 
     @Activate
     public void activate() {
+        eventDispatcher.addSink(P4RuntimeEvent.class, listenerRegistry);
         log.info("Started");
     }
 
@@ -74,6 +75,7 @@
     @Deactivate
     public void deactivate() {
         grpcController = null;
+        eventDispatcher.removeSink(P4RuntimeEvent.class);
         log.info("Stopped");
     }
 
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/PacketIOCodec.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/PacketIOCodec.java
index 461cde0..a06fe7a 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/PacketIOCodec.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/PacketIOCodec.java
@@ -25,6 +25,7 @@
 import org.slf4j.Logger;
 import p4.config.P4InfoOuterClass;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
@@ -72,7 +73,7 @@
         //Get the P4browser
         P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
 
-        //Get the packet out packet metadata
+        //Get the packet out controller packet metadata
         P4InfoOuterClass.ControllerPacketMetadata controllerPacketMetadata =
                 browser.controllerPacketMetadatas().getByName(PACKET_OUT);
         PacketOut.Builder packetOutBuilder = PacketOut.newBuilder();
@@ -126,27 +127,38 @@
         //Get the P4browser
         P4InfoBrowser browser = PipeconfHelper.getP4InfoBrowser(pipeconf);
 
+        List<PiPacketMetadata> packetMetadatas;
+        try {
+            int controllerPacketMetadataId = browser.controllerPacketMetadatas().getByName(PACKET_IN)
+                                                .getPreamble().getId();
+            packetMetadatas = decodePacketMetadataIn(packetIn.getMetadataList(), browser,
+                                                                            controllerPacketMetadataId);
+        } catch (NotFoundException e) {
+            log.error("Unable to decode packet metadatas: {}", e.getMessage());
+            packetMetadatas = Collections.emptyList();
+        }
+
         //Transform the packetIn data
         ImmutableByteSequence data = copyFrom(packetIn.getPayload().asReadOnlyByteBuffer());
 
         //Build the PiPacketOperation with all the metadatas.
         return PiPacketOperation.builder()
                 .withType(PiPacketOperation.Type.PACKET_IN)
-                .withMetadatas(decodePacketMetadata(packetIn.getMetadataList(), browser))
+                .withMetadatas(packetMetadatas)
                 .withData(data)
                 .build();
     }
 
-    private static List<PiPacketMetadata> decodePacketMetadata(List<PacketMetadata> packetMetadatas,
-                                                               P4InfoBrowser browser) {
+    private static List<PiPacketMetadata> decodePacketMetadataIn(List<PacketMetadata> packetMetadatas,
+                                                               P4InfoBrowser browser, int controllerPacketMetadataId) {
         return packetMetadatas.stream().map(packetMetadata -> {
             try {
 
-                int controllerPacketMetadataId = packetMetadata.getMetadataId();
-                //convert id to name through p4Info
-                P4InfoOuterClass.ControllerPacketMetadata metadata =
-                        browser.controllerPacketMetadatas().getById(controllerPacketMetadataId);
-                PiPacketMetadataId metadataId = PiPacketMetadataId.of(metadata.getPreamble().getName());
+                int packetMetadataId = packetMetadata.getMetadataId();
+                String packetMetadataName = browser.packetMetadatas(controllerPacketMetadataId)
+                        .getById(packetMetadataId).getName();
+
+                PiPacketMetadataId metadataId = PiPacketMetadataId.of(packetMetadataName);
 
                 //Build each metadata.
                 return PiPacketMetadata.builder()