ONOS-6559 P4Runtime protocol library

Change-Id: I7070b69507dcf2ca47ee1c446bcc2505ca868fb1
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
new file mode 100644
index 0000000..e6603ea
--- /dev/null
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -0,0 +1,228 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.p4runtime.ctl;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ExtensionRegistry;
+import com.google.protobuf.TextFormat;
+import io.grpc.ManagedChannel;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import org.onlab.util.ImmutableByteSequence;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.pi.runtime.PiTableEntry;
+import org.onosproject.net.pi.runtime.PiTableId;
+import org.onosproject.p4runtime.api.P4RuntimeClient;
+import org.onosproject.p4runtime.api.P4RuntimeEvent;
+import org.slf4j.Logger;
+import p4.P4RuntimeGrpc;
+import p4.config.P4InfoOuterClass;
+import p4.tmp.P4Config;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.onlab.util.ImmutableByteSequence.copyFrom;
+import static org.slf4j.LoggerFactory.getLogger;
+import static p4.P4RuntimeOuterClass.*;
+import static p4.P4RuntimeOuterClass.SetForwardingPipelineConfigRequest.Action.VERIFY_AND_COMMIT;
+
+/**
+ * Implementation of a P4Runtime client.
+ */
+public class P4RuntimeClientImpl implements P4RuntimeClient {
+
+    private static final int DEADLINE_SECONDS = 15;
+
+    private final Logger log = getLogger(getClass());
+
+    private final DeviceId deviceId;
+    private final int p4DeviceId;
+    private final P4RuntimeControllerImpl controller;
+    private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
+    private final P4RuntimeGrpc.P4RuntimeStub asyncStub;
+    private ExecutorService executorService;
+    private StreamObserver<StreamMessageRequest> streamRequestObserver;
+
+
+    P4RuntimeClientImpl(DeviceId deviceId, int p4DeviceId, ManagedChannel channel, P4RuntimeControllerImpl controller,
+                        ExecutorService executorService) {
+        this.deviceId = deviceId;
+        this.p4DeviceId = p4DeviceId;
+        this.controller = controller;
+        this.executorService = executorService;
+        this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel)
+                .withDeadlineAfter(DEADLINE_SECONDS, TimeUnit.SECONDS);
+        this.asyncStub = P4RuntimeGrpc.newStub(channel)
+                .withDeadlineAfter(DEADLINE_SECONDS, TimeUnit.SECONDS);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> initStreamChannel() {
+        return CompletableFuture.supplyAsync(this::doInitStreamChannel, executorService);
+    }
+
+    private boolean doInitStreamChannel() {
+        if (this.streamRequestObserver == null) {
+            this.streamRequestObserver = this.asyncStub.streamChannel(new StreamChannelResponseObserver());
+            // To listen for packets and other events, we need to start the RPC.
+            // Here we do it by sending an empty packet out.
+            try {
+                this.streamRequestObserver.onNext(StreamMessageRequest.newBuilder()
+                                                          .setPacket(PacketOut.getDefaultInstance())
+                                                          .build());
+            } catch (StatusRuntimeException e) {
+                log.warn("Unable to initialize stream channel for {}: {}", deviceId, e);
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> setPipelineConfig(InputStream p4info, InputStream targetConfig) {
+        return CompletableFuture.supplyAsync(() -> doSetPipelineConfig(p4info, targetConfig), executorService);
+    }
+
+    private boolean doSetPipelineConfig(InputStream p4info, InputStream targetConfig) {
+
+        log.debug("Setting pipeline config for {}", deviceId);
+
+        P4InfoOuterClass.P4Info.Builder p4iInfoBuilder = P4InfoOuterClass.P4Info.newBuilder();
+
+        try {
+            TextFormat.getParser().merge(new InputStreamReader(p4info),
+                                         ExtensionRegistry.getEmptyRegistry(),
+                                         p4iInfoBuilder);
+        } catch (IOException ex) {
+            log.warn("Unable to load p4info for {}: {}", deviceId, ex.getMessage());
+            return false;
+        }
+
+        P4Config.P4DeviceConfig deviceIdConfig;
+        try {
+            deviceIdConfig = P4Config.P4DeviceConfig
+                    .newBuilder()
+                    .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
+                    .setReassign(true)
+                    .setDeviceData(ByteString.readFrom(targetConfig))
+                    .build();
+        } catch (IOException ex) {
+            log.warn("Unable to load target-specific config for {}: {}", deviceId, ex.getMessage());
+            return false;
+        }
+
+        SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
+                .newBuilder()
+                .setAction(VERIFY_AND_COMMIT)
+                .addConfigs(ForwardingPipelineConfig
+                                    .newBuilder()
+                                    .setDeviceId(p4DeviceId)
+                                    .setP4Info(p4iInfoBuilder.build())
+                                    .setP4DeviceConfig(deviceIdConfig.toByteString())
+                                    .build())
+                .build();
+        try {
+            this.blockingStub.setForwardingPipelineConfig(request);
+        } catch (StatusRuntimeException ex) {
+            log.warn("Unable to set pipeline config for {}: {}", deviceId, ex.getMessage());
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public boolean writeTableEntries(Collection<PiTableEntry> entries, WriteOperationType opType) {
+
+        throw new UnsupportedOperationException("writeTableEntries not implemented.");
+    }
+
+    @Override
+    public CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId tableId) {
+
+        throw new UnsupportedOperationException("dumpTable not implemented.");
+    }
+
+    @Override
+    public void shutdown() {
+
+        if (this.streamRequestObserver != null) {
+            this.streamRequestObserver.onError(new StatusRuntimeException(Status.CANCELLED));
+            this.streamRequestObserver.onCompleted();
+        }
+
+        this.executorService.shutdownNow();
+        try {
+            executorService.awaitTermination(5, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            log.warn("Executor service didn't shutdown in time.");
+        }
+
+        // Prevent the execution of other tasks.
+        executorService = null;
+    }
+
+    private class StreamChannelResponseObserver implements StreamObserver<StreamMessageResponse> {
+
+        @Override
+        public void onNext(StreamMessageResponse message) {
+
+            P4RuntimeEvent event;
+
+            if (message.getPacket().isInitialized()) {
+                // Packet-in
+                PacketIn packetIn = message.getPacket();
+                ImmutableByteSequence data = copyFrom(packetIn.getPayload().asReadOnlyByteBuffer());
+                ImmutableList.Builder<ImmutableByteSequence> metadataBuilder = ImmutableList.builder();
+                packetIn.getMetadataList().stream()
+                        .map(m -> m.getValue().asReadOnlyByteBuffer())
+                        .map(ImmutableByteSequence::copyFrom)
+                        .forEach(metadataBuilder::add);
+                event = new DefaultPacketInEvent(deviceId, data, metadataBuilder.build());
+
+            } else if (message.getArbitration().isInitialized()) {
+                // Arbitration.
+                throw new UnsupportedOperationException("Arbitration not implemented.");
+
+            } else {
+                log.warn("Unrecognized stream message from {}: {}", deviceId, message);
+                return;
+            }
+
+            controller.postEvent(event);
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            log.warn("Error on stream channel for {}: {}", deviceId, throwable);
+        }
+
+        @Override
+        public void onCompleted() {
+            // TODO: declare the device as disconnected?
+        }
+    }
+
+}