New P4RuntimeClient implementation that supports batching and error reporting

The new client API supports batching and provides detailed responses to
write requests (e.g. reporting that an entity already exists when
inserting), which was not possible with the old one.

This patch includes:
- New, more efficient implementation of P4RuntimeClient (no more locking,
uses native gRPC executor, uses stub deadlines)
- Ported all codecs to new AbstractCodec-based implementation (needed to
implement codec cache in the future)
- Uses batching in P4RuntimeFlowRuleProgrammable and
P4RuntimeGroupActionProgrammable
- Minor changes to PI framework runtime classes

Change-Id: I3fac42057bb4e1389d761006a32600c786598683
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/ArbitrationUpdateEvent.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/ArbitrationUpdateEvent.java
new file mode 100644
index 0000000..80e3e98
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/ArbitrationUpdateEvent.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl.controller;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
+
+/**
+ * Event subject carrying the outcome of a P4Runtime arbitration update: the
+ * device it refers to and whether the update signals master status.
+ */
+public final class ArbitrationUpdateEvent implements P4RuntimeEventSubject {
+
+    private final DeviceId deviceId;
+    private final boolean isMaster;
+
+    /**
+     * Creates an arbitration update for the given device and master flag.
+     *
+     * @param deviceId the device
+     * @param isMaster true if arbitration response signals master status
+     */
+    public ArbitrationUpdateEvent(DeviceId deviceId, boolean isMaster) {
+        this.deviceId = deviceId;
+        this.isMaster = isMaster;
+    }
+
+    /**
+     * Returns true if arbitration response signals master status, false
+     * otherwise.
+     *
+     * @return true if master, false otherwise
+     */
+    boolean isMaster() {
+        return isMaster;
+    }
+
+    @Override
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/BaseEventSubject.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/BaseEventSubject.java
new file mode 100644
index 0000000..089d934
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/BaseEventSubject.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl.controller;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
+
+/**
+ * Base P4Runtime event subject that carries just the device ID that originated
+ * the event.
+ */
+public final class BaseEventSubject implements P4RuntimeEventSubject {
+
+    private final DeviceId deviceId;
+
+    /**
+     * Creates an event subject for the given device.
+     *
+     * @param deviceId the device
+     */
+    public BaseEventSubject(DeviceId deviceId) {
+        this.deviceId = deviceId;
+    }
+
+    @Override
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/ChannelEvent.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/ChannelEvent.java
new file mode 100644
index 0000000..0a77e46
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/ChannelEvent.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl.controller;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.p4runtime.api.P4RuntimeEventSubject;
+
+/**
+ * P4Runtime event subject describing an event (open, closed or error) on the
+ * channel to a device.
+ */
+public final class ChannelEvent implements P4RuntimeEventSubject {
+
+    /**
+     * Type of channel event.
+     */
+    public enum Type {
+        OPEN,
+        CLOSED,
+        ERROR
+    }
+
+    private final DeviceId deviceId;
+    private final Type type;
+
+    /**
+     * Creates a channel event of the given type for the given device.
+     *
+     * @param deviceId the device
+     * @param type     event type
+     */
+    public ChannelEvent(DeviceId deviceId, Type type) {
+        this.deviceId = deviceId;
+        this.type = type;
+    }
+
+    /**
+     * Gets the type of this event.
+     *
+     * @return the event type
+     */
+    public Type type() {
+        return type;
+    }
+
+    @Override
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedElectionIdGenerator.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedElectionIdGenerator.java
new file mode 100644
index 0000000..980ab11
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/DistributedElectionIdGenerator.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl.controller;
+
+import org.onlab.util.KryoNamespace;
+import org.onosproject.net.DeviceId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AtomicCounterMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+
+import java.math.BigInteger;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Distributed implementation of a generator of P4Runtime election IDs.
+ */
+final class DistributedElectionIdGenerator {
+
+    private static final long DESTROY_TIMEOUT_SECONDS = 10;
+
+    private final Logger log = getLogger(this.getClass());
+
+    // FIXME: counter map use long, but P4Runtime accepts 128bit election IDs
+    // Null once this generator has been destroyed.
+    private AtomicCounterMap<DeviceId> electionIds;
+
+    /**
+     * Creates a new election ID generator using the given storage service.
+     *
+     * @param storageService storage service
+     */
+    DistributedElectionIdGenerator(StorageService storageService) {
+        KryoNamespace serializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .build();
+        this.electionIds = storageService.<DeviceId>atomicCounterMapBuilder()
+                .withName("p4runtime-election-ids")
+                .withSerializer(Serializer.using(serializer))
+                .build();
+    }
+
+    /**
+     * Returns an election ID for the given device ID. The first election ID for
+     * a given device ID is always 1.
+     *
+     * @param deviceId device ID
+     * @return new election ID, or null if this generator has been destroyed
+     */
+    BigInteger generate(DeviceId deviceId) {
+        // Read the field once so that a concurrent destroy() cannot null it
+        // between the check and the use.
+        final AtomicCounterMap<DeviceId> counters = electionIds;
+        if (counters == null) {
+            return null;
+        }
+        // Default value is 0 for AtomicCounterMap.
+        return BigInteger.valueOf(counters.incrementAndGet(deviceId));
+    }
+
+    /**
+     * Destroys the backing distributed primitive of this generator, waiting a
+     * bounded time for it to complete. Failures are logged and not propagated.
+     * Calling this method on an already destroyed generator has no effect.
+     */
+    void destroy() {
+        final AtomicCounterMap<DeviceId> counters = electionIds;
+        if (counters == null) {
+            // Already destroyed, nothing to do.
+            return;
+        }
+        try {
+            counters.destroy().get(DESTROY_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            // Restore the interrupt status for code further up the stack.
+            Thread.currentThread().interrupt();
+            log.error("Exception while destroying distributed counter map", e);
+        } catch (ExecutionException | TimeoutException e) {
+            log.error("Exception while destroying distributed counter map", e);
+        } finally {
+            electionIds = null;
+        }
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/P4RuntimeControllerImpl.java
new file mode 100644
index 0000000..affbf7d
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/P4RuntimeControllerImpl.java
@@ -0,0 +1,198 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl.controller;
+
+import com.google.common.collect.Maps;
+import io.grpc.ManagedChannel;
+import org.onosproject.grpc.ctl.AbstractGrpcClientController;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceAgentEvent;
+import org.onosproject.net.device.DeviceAgentListener;
+import org.onosproject.net.pi.service.PiPipeconfService;
+import org.onosproject.net.provider.ProviderId;
+import org.onosproject.p4runtime.api.P4RuntimeClient;
+import org.onosproject.p4runtime.api.P4RuntimeClientKey;
+import org.onosproject.p4runtime.api.P4RuntimeController;
+import org.onosproject.p4runtime.api.P4RuntimeEvent;
+import org.onosproject.p4runtime.api.P4RuntimeEventListener;
+import org.onosproject.p4runtime.ctl.client.P4RuntimeClientImpl;
+import org.onosproject.store.service.StorageService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.math.BigInteger;
+import java.util.concurrent.ConcurrentMap;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * P4Runtime controller implementation.
+ */
+@Component(immediate = true, service = P4RuntimeController.class)
+public class P4RuntimeControllerImpl
+        extends AbstractGrpcClientController
+        <P4RuntimeClientKey, P4RuntimeClient, P4RuntimeEvent, P4RuntimeEventListener>
+        implements P4RuntimeController {
+
+    private final Logger log = getLogger(getClass());
+
+    // Device agent listeners, indexed by device ID and then by provider ID.
+    private final ConcurrentMap<DeviceId, ConcurrentMap<ProviderId, DeviceAgentListener>>
+            deviceAgentListeners = Maps.newConcurrentMap();
+
+    private DistributedElectionIdGenerator electionIdGenerator;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    private StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    private PiPipeconfService pipeconfService;
+
+    @Activate
+    public void activate() {
+        super.activate();
+        eventDispatcher.addSink(P4RuntimeEvent.class, listenerRegistry);
+        electionIdGenerator = new DistributedElectionIdGenerator(storageService);
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        super.deactivate();
+        deviceAgentListeners.clear();
+        electionIdGenerator.destroy();
+        electionIdGenerator = null;
+        log.info("Stopped");
+    }
+
+    @Override
+    protected P4RuntimeClient createClientInstance(P4RuntimeClientKey clientKey, ManagedChannel channel) {
+        return new P4RuntimeClientImpl(clientKey, channel, this, pipeconfService);
+    }
+
+    @Override
+    public void addDeviceAgentListener(DeviceId deviceId, ProviderId providerId, DeviceAgentListener listener) {
+        checkNotNull(deviceId, "deviceId cannot be null");
+        checkNotNull(providerId, "providerId cannot be null");
+        checkNotNull(listener, "listener cannot be null");
+        // Create the per-device map if missing and register in one step, so
+        // that the map cannot vanish between creation and lookup.
+        deviceAgentListeners.computeIfAbsent(deviceId, did -> Maps.newConcurrentMap())
+                .put(providerId, listener);
+    }
+
+    @Override
+    public void removeDeviceAgentListener(DeviceId deviceId, ProviderId providerId) {
+        checkNotNull(deviceId, "deviceId cannot be null");
+        checkNotNull(providerId, "providerId cannot be null");
+        deviceAgentListeners.computeIfPresent(deviceId, (did, listeners) -> {
+            listeners.remove(providerId);
+            return listeners;
+        });
+    }
+
+    /**
+     * Returns a new election ID for the given device, as produced by the
+     * distributed election ID generator.
+     *
+     * @param deviceId device ID
+     * @return election ID
+     */
+    public BigInteger newMasterElectionId(DeviceId deviceId) {
+        return electionIdGenerator.generate(deviceId);
+    }
+
+    /**
+     * Handles the given P4Runtime event. Channel, arbitration response and
+     * permission denied events are translated into device agent events and
+     * delivered to the device agent listeners registered for the device; any
+     * other event is passed to {@code post()}.
+     *
+     * @param event P4Runtime event
+     */
+    public void postEvent(P4RuntimeEvent event) {
+        switch (event.type()) {
+            case CHANNEL_EVENT:
+                handleChannelEvent(event);
+                break;
+            case ARBITRATION_RESPONSE:
+                handleArbitrationReply(event);
+                break;
+            case PERMISSION_DENIED:
+                handlePermissionDenied(event);
+                break;
+            default:
+                post(event);
+                break;
+        }
+    }
+
+    private void handlePermissionDenied(P4RuntimeEvent event) {
+        postDeviceAgentEvent(event.subject().deviceId(), new DeviceAgentEvent(
+                DeviceAgentEvent.Type.NOT_MASTER, event.subject().deviceId()));
+    }
+
+    private void handleChannelEvent(P4RuntimeEvent event) {
+        final ChannelEvent channelEvent = (ChannelEvent) event.subject();
+        final DeviceId deviceId = channelEvent.deviceId();
+        final DeviceAgentEvent.Type agentEventType;
+        switch (channelEvent.type()) {
+            case OPEN:
+                agentEventType = DeviceAgentEvent.Type.CHANNEL_OPEN;
+                break;
+            case CLOSED:
+                agentEventType = DeviceAgentEvent.Type.CHANNEL_CLOSED;
+                break;
+            case ERROR:
+                // An error on a device that is not reachable is reported as
+                // a closed channel.
+                agentEventType = !isReachable(deviceId)
+                        ? DeviceAgentEvent.Type.CHANNEL_CLOSED
+                        : DeviceAgentEvent.Type.CHANNEL_ERROR;
+                break;
+            default:
+                log.warn("Unrecognized channel event type {}", channelEvent.type());
+                return;
+        }
+        postDeviceAgentEvent(deviceId, new DeviceAgentEvent(agentEventType, deviceId));
+    }
+
+    private void handleArbitrationReply(P4RuntimeEvent event) {
+        final DeviceId deviceId = event.subject().deviceId();
+        final ArbitrationUpdateEvent response = (ArbitrationUpdateEvent) event.subject();
+        final DeviceAgentEvent.Type roleType = response.isMaster()
+                ? DeviceAgentEvent.Type.ROLE_MASTER
+                : DeviceAgentEvent.Type.ROLE_STANDBY;
+        postDeviceAgentEvent(deviceId, new DeviceAgentEvent(
+                roleType, response.deviceId()));
+    }
+
+    private void postDeviceAgentEvent(DeviceId deviceId, DeviceAgentEvent event) {
+        // Look up once: a separate containsKey()/get() pair could see the
+        // entry disappear in between (e.g. on deactivate) and throw an NPE.
+        final ConcurrentMap<ProviderId, DeviceAgentListener> listeners =
+                deviceAgentListeners.get(deviceId);
+        if (listeners != null) {
+            listeners.values().forEach(l -> l.event(event));
+        }
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/PacketInEvent.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/PacketInEvent.java
new file mode 100644
index 0000000..4a983a8
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/PacketInEvent.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl.controller;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.pi.runtime.PiPacketOperation;
+import org.onosproject.p4runtime.api.P4RuntimePacketIn;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * P4Runtime packet-in, pairing a device ID with a packet operation.
+ */
+public final class PacketInEvent implements P4RuntimePacketIn {
+
+    private final DeviceId deviceId;
+    private final PiPacketOperation operation;
+
+    /**
+     * Creates a packet-in for the given device and packet operation.
+     *
+     * @param deviceId  device ID, must not be null
+     * @param operation packet operation, must not be null
+     */
+    public PacketInEvent(DeviceId deviceId, PiPacketOperation operation) {
+        checkNotNull(deviceId);
+        checkNotNull(operation);
+        this.deviceId = deviceId;
+        this.operation = operation;
+    }
+
+    @Override
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+
+    @Override
+    public PiPacketOperation packetOperation() {
+        return operation;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj == null || obj.getClass() != getClass()) {
+            return false;
+        }
+        final PacketInEvent other = (PacketInEvent) obj;
+        if (!Objects.equal(deviceId, other.deviceId)) {
+            return false;
+        }
+        return Objects.equal(operation, other.operation);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(deviceId, operation);
+    }
+
+    @Override
+    public String toString() {
+        final MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
+        helper.add("deviceId", deviceId);
+        helper.add("operation", operation);
+        return helper.toString();
+    }
+}
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/package-info.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/package-info.java
new file mode 100644
index 0000000..4d37da9
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/controller/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * P4Runtime controller implementation classes.
+ */
+package org.onosproject.p4runtime.ctl.controller;