New P4RuntimeClient implementation that supports batching and error reporting

The new client API supports batching and provides a detailed response for
write requests (e.g. reporting that an entity already exists when
inserting), which was not possible with the old one.

This patch includes:
- New, more efficient implementation of P4RuntimeClient (no more locking,
uses the native gRPC executor and stub deadlines)
- All codecs ported to the new AbstractCodec-based implementation (needed
to implement a codec cache in the future)
- Batching used in P4RuntimeFlowRuleProgrammable and
P4RuntimeGroupActionProgrammable
- Minor changes to PI framework runtime classes

Change-Id: I3fac42057bb4e1389d761006a32600c786598683
diff --git a/pipelines/basic/src/main/java/org/onosproject/pipelines/basic/BasicConstants.java b/pipelines/basic/src/main/java/org/onosproject/pipelines/basic/BasicConstants.java
index c36a77e..667897b 100644
--- a/pipelines/basic/src/main/java/org/onosproject/pipelines/basic/BasicConstants.java
+++ b/pipelines/basic/src/main/java/org/onosproject/pipelines/basic/BasicConstants.java
@@ -19,7 +19,7 @@
 import org.onosproject.net.pi.model.PiActionId;
 import org.onosproject.net.pi.model.PiActionParamId;
 import org.onosproject.net.pi.model.PiActionProfileId;
-import org.onosproject.net.pi.model.PiControlMetadataId;
+import org.onosproject.net.pi.model.PiPacketMetadataId;
 import org.onosproject.net.pi.model.PiCounterId;
 import org.onosproject.net.pi.model.PiMatchFieldId;
 import org.onosproject.net.pi.model.PiTableId;
@@ -91,10 +91,10 @@
     public static final PiActionProfileId INGRESS_WCMP_CONTROL_WCMP_SELECTOR =
             PiActionProfileId.of("ingress.wcmp_control.wcmp_selector");
     // Packet Metadata IDs
-    public static final PiControlMetadataId PADDING =
-            PiControlMetadataId.of("_padding");
-    public static final PiControlMetadataId INGRESS_PORT =
-            PiControlMetadataId.of("ingress_port");
-    public static final PiControlMetadataId EGRESS_PORT =
-            PiControlMetadataId.of("egress_port");
+    public static final PiPacketMetadataId PADDING =
+            PiPacketMetadataId.of("_padding");
+    public static final PiPacketMetadataId INGRESS_PORT =
+            PiPacketMetadataId.of("ingress_port");
+    public static final PiPacketMetadataId EGRESS_PORT =
+            PiPacketMetadataId.of("egress_port");
 }
diff --git a/pipelines/basic/src/main/java/org/onosproject/pipelines/basic/BasicInterpreterImpl.java b/pipelines/basic/src/main/java/org/onosproject/pipelines/basic/BasicInterpreterImpl.java
index 56a95fa..eb49819 100644
--- a/pipelines/basic/src/main/java/org/onosproject/pipelines/basic/BasicInterpreterImpl.java
+++ b/pipelines/basic/src/main/java/org/onosproject/pipelines/basic/BasicInterpreterImpl.java
@@ -22,6 +22,7 @@
 import org.onlab.packet.Ethernet;
 import org.onlab.util.ImmutableByteSequence;
 import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DeviceId;
 import org.onosproject.net.Port;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.device.DeviceService;
@@ -38,7 +39,7 @@
 import org.onosproject.net.pi.model.PiTableId;
 import org.onosproject.net.pi.runtime.PiAction;
 import org.onosproject.net.pi.runtime.PiActionParam;
-import org.onosproject.net.pi.runtime.PiControlMetadata;
+import org.onosproject.net.pi.runtime.PiPacketMetadata;
 import org.onosproject.net.pi.runtime.PiPacketOperation;
 
 import java.nio.ByteBuffer;
@@ -173,7 +174,7 @@
     }
 
     @Override
-    public InboundPacket mapInboundPacket(PiPacketOperation packetIn)
+    public InboundPacket mapInboundPacket(PiPacketOperation packetIn, DeviceId deviceId)
             throws PiInterpreterException {
         // Assuming that the packet is ethernet, which is fine since basic.p4
         // can deparse only ethernet packets.
@@ -186,37 +187,36 @@
         }
 
         // Returns the ingress port packet metadata.
-        Optional<PiControlMetadata> packetMetadata = packetIn.metadatas()
+        Optional<PiPacketMetadata> packetMetadata = packetIn.metadatas()
                 .stream().filter(m -> m.id().equals(INGRESS_PORT))
                 .findFirst();
 
         if (packetMetadata.isPresent()) {
             ImmutableByteSequence portByteSequence = packetMetadata.get().value();
             short s = portByteSequence.asReadOnlyBuffer().getShort();
-            ConnectPoint receivedFrom = new ConnectPoint(packetIn.deviceId(), PortNumber.portNumber(s));
+            ConnectPoint receivedFrom = new ConnectPoint(deviceId, PortNumber.portNumber(s));
             ByteBuffer rawData = ByteBuffer.wrap(packetIn.data().asArray());
             return new DefaultInboundPacket(receivedFrom, ethPkt, rawData);
         } else {
             throw new PiInterpreterException(format(
                     "Missing metadata '%s' in packet-in received from '%s': %s",
-                    INGRESS_PORT, packetIn.deviceId(), packetIn));
+                    INGRESS_PORT, deviceId, packetIn));
         }
     }
 
     private PiPacketOperation createPiPacketOperation(ByteBuffer data, long portNumber)
             throws PiInterpreterException {
-        PiControlMetadata metadata = createPacketMetadata(portNumber);
+        PiPacketMetadata metadata = createPacketMetadata(portNumber);
         return PiPacketOperation.builder()
-                .forDevice(this.data().deviceId())
                 .withType(PACKET_OUT)
                 .withData(copyFrom(data))
                 .withMetadatas(ImmutableList.of(metadata))
                 .build();
     }
 
-    private PiControlMetadata createPacketMetadata(long portNumber) throws PiInterpreterException {
+    private PiPacketMetadata createPacketMetadata(long portNumber) throws PiInterpreterException {
         try {
-            return PiControlMetadata.builder()
+            return PiPacketMetadata.builder()
                     .withId(EGRESS_PORT)
                     .withValue(copyFrom(portNumber).fit(PORT_BITWIDTH))
                     .build();
diff --git a/pipelines/basic/src/main/java/org/onosproject/pipelines/basic/PortStatisticsDiscoveryImpl.java b/pipelines/basic/src/main/java/org/onosproject/pipelines/basic/PortStatisticsDiscoveryImpl.java
index 0057293..f7cb811 100644
--- a/pipelines/basic/src/main/java/org/onosproject/pipelines/basic/PortStatisticsDiscoveryImpl.java
+++ b/pipelines/basic/src/main/java/org/onosproject/pipelines/basic/PortStatisticsDiscoveryImpl.java
@@ -29,6 +29,7 @@
 import org.onosproject.net.pi.model.PiCounterId;
 import org.onosproject.net.pi.model.PiPipeconf;
 import org.onosproject.net.pi.runtime.PiCounterCell;
+import org.onosproject.net.pi.runtime.PiCounterCellHandle;
 import org.onosproject.net.pi.runtime.PiCounterCellId;
 import org.onosproject.net.pi.service.PiPipeconfService;
 import org.onosproject.p4runtime.api.P4RuntimeClient;
@@ -40,7 +41,6 @@
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 import static org.onosproject.net.pi.model.PiCounterType.INDIRECT;
@@ -111,15 +111,14 @@
             counterCellIds.add(PiCounterCellId.ofIndirect(ingressCounterId(), p));
             counterCellIds.add(PiCounterCellId.ofIndirect(egressCounterId(), p));
         });
+        Set<PiCounterCellHandle> counterCellHandles = counterCellIds.stream()
+                .map(id -> PiCounterCellHandle.of(deviceId, id))
+                .collect(Collectors.toSet());
 
-        Collection<PiCounterCell> counterEntryResponse;
-        try {
-            counterEntryResponse = client.readCounterCells(counterCellIds, pipeconf).get();
-        } catch (InterruptedException | ExecutionException e) {
-            log.warn("Exception while reading port counters from {}: {}", deviceId, e.toString());
-            log.debug("", e);
-            return Collections.emptyList();
-        }
+        // Query the device.
+        Collection<PiCounterCell> counterEntryResponse = client.read(pipeconf)
+                .handles(counterCellHandles).submitSync()
+                .all(PiCounterCell.class);
 
         counterEntryResponse.forEach(counterCell -> {
             if (counterCell.cellId().counterType() != INDIRECT) {