New P4RuntimeClient implementation that supports batching and error reporting

The new client API supports batching and provides detailed response for
write requests (e.g. if entity already exists when inserting), which was
not possible with the old one.

This patch includes:
- New more efficient implementation of P4RuntimeClient (no more locking,
use native gRPC executor, use stub deadlines)
- Ported all codecs to new AbstractCodec-based implementation (needed to
implement codec cache in the future)
- Uses batching in P4RuntimeFlowRuleProgrammable and
P4RuntimeGroupActionProgrammable
- Minor changes to PI framework runtime classes

Change-Id: I3fac42057bb4e1389d761006a32600c786598683
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
new file mode 100644
index 0000000..7afd97b
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/client/StreamClientImpl.java
@@ -0,0 +1,404 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl.client;
+
+import com.google.protobuf.TextFormat;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.ClientCallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.pi.model.PiPipeconf;
+import org.onosproject.net.pi.runtime.PiPacketOperation;
+import org.onosproject.net.pi.service.PiPipeconfService;
+import org.onosproject.p4runtime.api.P4RuntimeEvent;
+import org.onosproject.p4runtime.api.P4RuntimeStreamClient;
+import org.onosproject.p4runtime.ctl.codec.CodecException;
+import org.onosproject.p4runtime.ctl.controller.ArbitrationUpdateEvent;
+import org.onosproject.p4runtime.ctl.controller.ChannelEvent;
+import org.onosproject.p4runtime.ctl.controller.P4RuntimeControllerImpl;
+import org.onosproject.p4runtime.ctl.controller.PacketInEvent;
+import org.slf4j.Logger;
+import p4.v1.P4RuntimeOuterClass;
+import p4.v1.P4RuntimeOuterClass.StreamMessageRequest;
+import p4.v1.P4RuntimeOuterClass.StreamMessageResponse;
+
+import java.math.BigInteger;
+import java.net.ConnectException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static java.lang.String.format;
+import static org.onosproject.p4runtime.ctl.codec.Codecs.CODECS;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Implementation of P4RuntimeStreamClient. Handles P4Runtime StreamChannel RPC
+ * operations, such as arbitration update and packet-in/out.
+ */
+public final class StreamClientImpl implements P4RuntimeStreamClient {
+
+    private static final Logger log = getLogger(StreamClientImpl.class);
+
+    private static final BigInteger ONE_THOUSAND = BigInteger.valueOf(1000);
+
+    private final P4RuntimeClientImpl client;
+    private final DeviceId deviceId;
+    private final long p4DeviceId;
+    private final PiPipeconfService pipeconfService;
+    private final P4RuntimeControllerImpl controller;
+    private final StreamChannelManager streamChannelManager = new StreamChannelManager();
+
+    private P4RuntimeOuterClass.Uint128 lastUsedElectionId = P4RuntimeOuterClass.Uint128
+            .newBuilder().setLow(1).build();
+
+    private final AtomicBoolean isClientMaster = new AtomicBoolean(false);
+
+    StreamClientImpl(
+            PiPipeconfService pipeconfService,
+            P4RuntimeClientImpl client,
+            P4RuntimeControllerImpl controller) {
+        this.client = client;
+        this.deviceId = client.deviceId();
+        this.p4DeviceId = client.p4DeviceId();
+        this.pipeconfService = pipeconfService;
+        this.controller = controller;
+    }
+
+    @Override
+    public void openSession() {
+        if (isSessionOpen()) {
+            log.debug("Dropping request to open session for {}, session is already open",
+                      deviceId);
+            return;
+        }
+        log.debug("Opening session for {}...", deviceId);
+        sendMasterArbitrationUpdate(controller.newMasterElectionId(deviceId));
+
+    }
+
+    @Override
+    public boolean isSessionOpen() {
+        return streamChannelManager.isOpen();
+    }
+
+    @Override
+    public void closeSession() {
+        streamChannelManager.complete();
+    }
+
+    @Override
+    public void runForMastership() {
+        if (!isSessionOpen()) {
+            log.debug("Dropping mastership request for {}, session is closed",
+                      deviceId);
+            return;
+        }
+        // Becoming master is a race. Here we increase our chances of win, i.e.
+        // using the highest election ID, against other ONOS nodes in the
+        // cluster that are calling openSession() (which is used to start the
+        // stream RPC session, not to become master).
+        log.debug("Running for mastership on {}...", deviceId);
+        final BigInteger masterId = controller.newMasterElectionId(deviceId)
+                .add(ONE_THOUSAND);
+        sendMasterArbitrationUpdate(masterId);
+    }
+
+    @Override
+    public boolean isMaster() {
+        return streamChannelManager.isOpen() && isClientMaster.get();
+    }
+
+    @Override
+    public void packetOut(PiPacketOperation packet, PiPipeconf pipeconf) {
+        if (!isSessionOpen()) {
+            log.debug("Dropping packet-out request for {}, session is closed",
+                      deviceId);
+            return;
+        }
+        if (log.isTraceEnabled()) {
+            log.trace("Sending packet-out to {}: {}", deviceId, packet);
+        }
+        try {
+            // Encode the PiPacketOperation into a PacketOut
+            final P4RuntimeOuterClass.PacketOut packetOut =
+                    CODECS.packetOut().encode(packet, null, pipeconf);
+            // Build the request
+            final StreamMessageRequest packetOutRequest = StreamMessageRequest
+                    .newBuilder().setPacket(packetOut).build();
+            // Send.
+            streamChannelManager.sendIfOpen(packetOutRequest);
+        } catch (CodecException e) {
+            log.error("Unable to send packet-out: {}", e.getMessage());
+        }
+    }
+
+    private void sendMasterArbitrationUpdate(BigInteger electionId) {
+        log.debug("Sending arbitration update to {}... electionId={}",
+                  deviceId, electionId);
+        final P4RuntimeOuterClass.Uint128 idMsg = bigIntegerToUint128(electionId);
+        streamChannelManager.send(
+                StreamMessageRequest.newBuilder()
+                        .setArbitration(
+                                P4RuntimeOuterClass.MasterArbitrationUpdate
+                                        .newBuilder()
+                                        .setDeviceId(p4DeviceId)
+                                        .setElectionId(idMsg)
+                                        .build())
+                        .build());
+        lastUsedElectionId = idMsg;
+    }
+
+    private P4RuntimeOuterClass.Uint128 bigIntegerToUint128(BigInteger value) {
+        final byte[] arr = value.toByteArray();
+        final ByteBuffer bb = ByteBuffer.allocate(Long.BYTES * 2)
+                .put(new byte[Long.BYTES * 2 - arr.length])
+                .put(arr);
+        bb.rewind();
+        return P4RuntimeOuterClass.Uint128.newBuilder()
+                .setHigh(bb.getLong())
+                .setLow(bb.getLong())
+                .build();
+    }
+
+    private BigInteger uint128ToBigInteger(P4RuntimeOuterClass.Uint128 value) {
+        return new BigInteger(
+                ByteBuffer.allocate(Long.BYTES * 2)
+                        .putLong(value.getHigh())
+                        .putLong(value.getLow())
+                        .array());
+    }
+
+    private void handlePacketIn(P4RuntimeOuterClass.PacketIn packetInMsg) {
+        if (log.isTraceEnabled()) {
+            log.trace("Received packet-in from {}: {}", deviceId, packetInMsg);
+        }
+        if (!pipeconfService.getPipeconf(deviceId).isPresent()) {
+            log.warn("Unable to handle packet-in from {}, missing pipeconf: {}",
+                     deviceId, TextFormat.shortDebugString(packetInMsg));
+            return;
+        }
+        // Decode packet message and post event.
+        // TODO: consider implementing a cache to speed up
+        //  encoding/deconding of packet-in/out (e.g. LLDP, ARP)
+        final PiPipeconf pipeconf = pipeconfService.getPipeconf(deviceId).get();
+        final PiPacketOperation pktOperation;
+        try {
+            pktOperation = CODECS.packetIn().decode(
+                    packetInMsg, null, pipeconf);
+        } catch (CodecException e) {
+            log.warn("Unable to process packet-int: {}", e.getMessage());
+            return;
+        }
+        controller.postEvent(new P4RuntimeEvent(
+                P4RuntimeEvent.Type.PACKET_IN,
+                new PacketInEvent(deviceId, pktOperation)));
+    }
+
+    private void handleArbitrationUpdate(P4RuntimeOuterClass.MasterArbitrationUpdate msg) {
+        // From the spec...
+        // - Election_id: The stream RPC with the highest election_id is the
+        // master. Switch populates with the highest election ID it
+        // has received from all connected controllers.
+        // - Status: Switch populates this with OK for the client that is the
+        // master, and with an error status for all other connected clients (at
+        // every mastership change).
+        if (!msg.hasElectionId() || !msg.hasStatus()) {
+            return;
+        }
+        final boolean isMaster = msg.getStatus().getCode() == Status.OK.getCode().value();
+        log.debug("Received arbitration update from {}: isMaster={}, electionId={}",
+                  deviceId, isMaster, uint128ToBigInteger(msg.getElectionId()));
+        controller.postEvent(new P4RuntimeEvent(
+                P4RuntimeEvent.Type.ARBITRATION_RESPONSE,
+                new ArbitrationUpdateEvent(deviceId, isMaster)));
+        isClientMaster.set(isMaster);
+    }
+
+    /**
+     * Returns the election ID last used in a MasterArbitrationUpdate message
+     * sent by the client to the server.
+     *
+     * @return election ID uint128 protobuf message
+     */
+    P4RuntimeOuterClass.Uint128 lastUsedElectionId() {
+        return lastUsedElectionId;
+    }
+
+    /**
+     * A manager for the P4Runtime stream channel that opportunistically creates
+     * new stream RCP stubs (e.g. when one fails because of errors) and posts
+     * channel events via the P4Runtime controller.
+     */
+    private final class StreamChannelManager {
+
+        private final AtomicBoolean open = new AtomicBoolean(false);
+        private final StreamObserver<StreamMessageResponse> responseObserver =
+                new InternalStreamResponseObserver(this);
+        private ClientCallStreamObserver<StreamMessageRequest> requestObserver;
+
+        void send(StreamMessageRequest value) {
+            synchronized (this) {
+                initIfRequired();
+                doSend(value);
+            }
+        }
+
+        void sendIfOpen(StreamMessageRequest value) {
+            // We do not lock here, but we ignore NPEs due to stream RPC not
+            // being active (null requestObserver). Good for frequent
+            // packet-outs.
+            try {
+                doSend(value);
+            } catch (NullPointerException e) {
+                if (requestObserver != null) {
+                    // Must be something else.
+                    throw e;
+                }
+            }
+        }
+
+        private void doSend(StreamMessageRequest value) {
+            try {
+                requestObserver.onNext(value);
+            } catch (Throwable ex) {
+                if (ex instanceof StatusRuntimeException) {
+                    log.warn("Unable to send {} to {}: {}",
+                             value.getUpdateCase().toString(), deviceId, ex.getMessage());
+                } else {
+                    log.error("Exception while sending {} to {}: {}",
+                              value.getUpdateCase().toString(), deviceId, ex);
+                }
+                complete();
+            }
+        }
+
+        private void initIfRequired() {
+            if (requestObserver == null) {
+                log.debug("Creating new stream channel for {}...", deviceId);
+                open.set(false);
+                client.execRpcNoTimeout(
+                        s -> requestObserver =
+                                (ClientCallStreamObserver<StreamMessageRequest>)
+                                        s.streamChannel(responseObserver)
+                );
+            }
+        }
+
+        void complete() {
+            synchronized (this) {
+                signalClosed();
+                if (requestObserver != null) {
+                    requestObserver.onCompleted();
+                    requestObserver.cancel("Completed", null);
+                    requestObserver = null;
+                }
+            }
+        }
+
+        void signalOpen() {
+            synchronized (this) {
+                final boolean wasOpen = open.getAndSet(true);
+                if (!wasOpen) {
+                    controller.postEvent(new P4RuntimeEvent(
+                            P4RuntimeEvent.Type.CHANNEL_EVENT,
+                            new ChannelEvent(deviceId, ChannelEvent.Type.OPEN)));
+                }
+            }
+        }
+
+        void signalClosed() {
+            synchronized (this) {
+                final boolean wasOpen = open.getAndSet(false);
+                if (wasOpen) {
+                    controller.postEvent(new P4RuntimeEvent(
+                            P4RuntimeEvent.Type.CHANNEL_EVENT,
+                            new ChannelEvent(deviceId, ChannelEvent.Type.CLOSED)));
+                }
+            }
+        }
+
+        boolean isOpen() {
+            return open.get();
+        }
+    }
+
+    /**
+     * Handles messages received from the device on the stream channel.
+     */
+    private final class InternalStreamResponseObserver
+            implements StreamObserver<StreamMessageResponse> {
+
+        private final StreamChannelManager streamChannelManager;
+
+        private InternalStreamResponseObserver(
+                StreamChannelManager streamChannelManager) {
+            this.streamChannelManager = streamChannelManager;
+        }
+
+        @Override
+        public void onNext(StreamMessageResponse message) {
+            streamChannelManager.signalOpen();
+            try {
+                if (log.isTraceEnabled()) {
+                    log.trace(
+                            "Received {} from {}: {}",
+                            message.getUpdateCase(), deviceId,
+                            TextFormat.shortDebugString(message));
+                }
+                switch (message.getUpdateCase()) {
+                    case PACKET:
+                        handlePacketIn(message.getPacket());
+                        return;
+                    case ARBITRATION:
+                        handleArbitrationUpdate(message.getArbitration());
+                        return;
+                    default:
+                        log.warn("Unrecognized StreamMessageResponse from {}: {}",
+                                 deviceId, message.getUpdateCase());
+                }
+            } catch (Throwable ex) {
+                log.error("Exception while processing stream message from {}",
+                          deviceId, ex);
+            }
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            if (throwable instanceof StatusRuntimeException) {
+                final StatusRuntimeException sre = (StatusRuntimeException) throwable;
+                if (sre.getStatus().getCause() instanceof ConnectException) {
+                    log.warn("{} is unreachable ({})",
+                             deviceId, sre.getCause().getMessage());
+                } else {
+                    log.warn("Error on stream channel for {}: {}",
+                             deviceId, throwable.getMessage());
+                }
+            } else {
+                log.error(format("Exception on stream channel for %s",
+                                 deviceId), throwable);
+            }
+            streamChannelManager.complete();
+        }
+
+        @Override
+        public void onCompleted() {
+            log.warn("Stream channel for {} has completed", deviceId);
+            streamChannelManager.complete();
+        }
+    }
+}