New P4RuntimeClient implementation that supports batching and error reporting

The new client API supports batching and provides a detailed response
for write requests (e.g. if an entity already exists when inserting),
which was not possible with the old one.

This patch includes:
- New, more efficient implementation of P4RuntimeClient (no more
locking, uses the native gRPC executor and stub deadlines)
- Ported all codecs to the new AbstractCodec-based implementation (needed
to implement a codec cache in the future)
- Uses batching in P4RuntimeFlowRuleProgrammable and
P4RuntimeGroupActionProgrammable
- Minor changes to PI framework runtime classes

Change-Id: I3fac42057bb4e1389d761006a32600c786598683
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
index e1fc1e9..fb9451c 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/mirror/AbstractDistributedP4RuntimeMirror.java
@@ -23,10 +23,13 @@
 import org.onosproject.net.Annotations;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.pi.runtime.PiEntity;
+import org.onosproject.net.pi.runtime.PiEntityType;
 import org.onosproject.net.pi.runtime.PiHandle;
 import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
 import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
 import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
+import org.onosproject.p4runtime.api.P4RuntimeWriteClient;
+import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.EventuallyConsistentMap;
 import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.WallClockTimestamp;
@@ -66,26 +69,38 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected PiPipeconfWatchdogService pipeconfWatchdogService;
 
-    private EventuallyConsistentMap<H, TimedEntry<E>> mirrorMap;
+    private EventuallyConsistentMap<PiHandle, TimedEntry<E>> mirrorMap;
+    private EventuallyConsistentMap<PiHandle, Annotations> annotationsMap;
 
-    private EventuallyConsistentMap<H, Annotations> annotationsMap;
+    private final PiEntityType entityType;
 
     private final PiPipeconfWatchdogListener pipeconfListener =
             new InternalPipeconfWatchdogListener();
 
+    AbstractDistributedP4RuntimeMirror(PiEntityType entityType) {
+        this.entityType = entityType; // used by activate() to build the map name
+    }
+
     @Activate
     public void activate() {
+        final String mapName = "onos-p4runtime-mirror-"
+                + entityType.name().toLowerCase();
+        final KryoNamespace serializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(TimedEntry.class)
+                .build();
+
         mirrorMap = storageService
-                .<H, TimedEntry<E>>eventuallyConsistentMapBuilder()
-                .withName(mapName())
-                .withSerializer(storeSerializer())
+                .<PiHandle, TimedEntry<E>>eventuallyConsistentMapBuilder()
+                .withName(mapName)
+                .withSerializer(serializer)
                 .withTimestampProvider((k, v) -> new WallClockTimestamp())
                 .build();
 
         annotationsMap = storageService
-                .<H, Annotations>eventuallyConsistentMapBuilder()
-                .withName(mapName() + "-annotations")
-                .withSerializer(storeSerializer())
+                .<PiHandle, Annotations>eventuallyConsistentMapBuilder()
+                .withName(mapName + "-annotations")
+                .withSerializer(serializer)
                 .withTimestampProvider((k, v) -> new WallClockTimestamp())
                 .build();
 
@@ -93,10 +108,6 @@
         log.info("Started");
     }
 
-    abstract String mapName();
-
-    abstract KryoNamespace storeSerializer();
-
     @Deactivate
     public void deactivate() {
         pipeconfWatchdogService.removeListener(pipeconfListener);
@@ -158,9 +169,12 @@
     }
 
     @Override
-    public void sync(DeviceId deviceId, Map<H, E> deviceState) {
+    @SuppressWarnings("unchecked")
+    public void sync(DeviceId deviceId, Collection<E> entities) {
         checkNotNull(deviceId);
-        final Map<H, E> localState = deviceHandleMap(deviceId);
+        final Map<PiHandle, E> deviceState = entities.stream()
+                .collect(Collectors.toMap(e -> e.handle(deviceId), e -> e));
+        final Map<PiHandle, E> localState = deviceHandleMap(deviceId);
 
         final AtomicInteger removeCount = new AtomicInteger(0);
         final AtomicInteger updateCount = new AtomicInteger(0);
@@ -172,7 +186,7 @@
                     final E entryToAdd = deviceState.get(deviceHandle);
                     log.debug("Adding mirror entry for {}: {}",
                               deviceId, entryToAdd);
-                    put(deviceHandle, entryToAdd);
+                    put((H) deviceHandle, entryToAdd);
                     addCount.incrementAndGet();
                 });
         // Update or remove local entries.
@@ -181,12 +195,12 @@
             final E deviceEntry = deviceState.get(localHandle);
             if (deviceEntry == null) {
                 log.debug("Removing mirror entry for {}: {}", deviceId, localEntry);
-                remove(localHandle);
+                remove((H) localHandle);
                 removeCount.incrementAndGet();
             } else if (!deviceEntry.equals(localEntry)) {
                 log.debug("Updating mirror entry for {}: {}-->{}",
                           deviceId, localEntry, deviceEntry);
-                put(localHandle, deviceEntry);
+                put((H) localHandle, deviceEntry);
                 updateCount.incrementAndGet();
             }
         });
@@ -196,27 +210,48 @@
         }
     }
 
-    private Set<H> getHandlesForDevice(DeviceId deviceId) {
+    private Set<PiHandle> getHandlesForDevice(DeviceId deviceId) {
         return mirrorMap.keySet().stream()
                 .filter(h -> h.deviceId().equals(deviceId))
                 .collect(Collectors.toSet());
     }
 
-    @Override
-    public Map<H, E> deviceHandleMap(DeviceId deviceId) {
-        final Map<H, E> deviceMap = Maps.newHashMap();
+    private Map<PiHandle, E> deviceHandleMap(DeviceId deviceId) {
+        final Map<PiHandle, E> deviceMap = Maps.newHashMap();
         mirrorMap.entrySet().stream()
                 .filter(e -> e.getKey().deviceId().equals(deviceId))
                 .forEach(e -> deviceMap.put(e.getKey(), e.getValue().entry()));
         return deviceMap;
     }
 
+
     private void removeAll(DeviceId deviceId) {
         checkNotNull(deviceId);
-        Collection<H> handles = getHandlesForDevice(deviceId);
+        @SuppressWarnings("unchecked")
+        Collection<H> handles = (Collection<H>) getHandlesForDevice(deviceId);
         handles.forEach(this::remove);
     }
 
+    @Override
+    @SuppressWarnings("unchecked") // casts of r.handle()/r.entity() to H/E below are unchecked
+    public void replayWriteResponse(P4RuntimeWriteClient.WriteResponse response) {
+        response.success().stream() // NOTE(review): filter below re-checks isSuccess(); confirm if redundant
+                .filter(r -> r.entityType().equals(this.entityType) && r.isSuccess())
+                .forEach(r -> { // applies each response that passed the filter via put()/remove()
+                    switch (r.updateType()) {
+                        case INSERT: // falls through: INSERT and MODIFY both call put()
+                        case MODIFY:
+                            put((H) r.handle(), (E) r.entity());
+                            break;
+                        case DELETE:
+                            remove((H) r.handle());
+                            break;
+                        default: // any other update type is only logged; put()/remove() are not called
+                            log.error("Unknown update type {}", r.updateType());
+                    }
+                });
+    }
+
     public class InternalPipeconfWatchdogListener implements PiPipeconfWatchdogListener {
         @Override
         public void event(PiPipeconfWatchdogEvent event) {