ONOS-6561 BMv2 handshaker via P4Runtime

+ support fort device-specific default pipeconf
+ improvements to P4runtime and gRPC protocol stuff

Change-Id: I8986fce3959df564454ea3d31859860f61eabcae
diff --git a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultInterpreter.java b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultInterpreter.java
new file mode 100644
index 0000000..59f0c8c
--- /dev/null
+++ b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultInterpreter.java
@@ -0,0 +1,121 @@
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.drivers.bmv2;
+import com.google.common.collect.ImmutableBiMap;
+import org.onlab.util.ImmutableByteSequence;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.driver.AbstractHandlerBehaviour;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.criteria.Criterion;
+import org.onosproject.net.flow.instructions.Instruction;
+import org.onosproject.net.flow.instructions.Instructions;
+import org.onosproject.net.pi.model.PiPipeconf;
+import org.onosproject.net.pi.model.PiPipelineInterpreter;
+import org.onosproject.net.pi.runtime.PiAction;
+import org.onosproject.net.pi.runtime.PiActionId;
+import org.onosproject.net.pi.runtime.PiActionParam;
+import org.onosproject.net.pi.runtime.PiActionParamId;
+import org.onosproject.net.pi.runtime.PiHeaderFieldId;
+import org.onosproject.net.pi.runtime.PiTableAction;
+import org.onosproject.net.pi.runtime.PiTableId;
+import java.util.Optional;
+import static org.onosproject.net.PortNumber.CONTROLLER;
+ * Interpreter implementation for the default pipeconf.
+ */
+public class Bmv2DefaultInterpreter extends AbstractHandlerBehaviour implements PiPipelineInterpreter {
+    private static final String TABLE0 = "table0";
+    private static final String SEND_TO_CPU = "send_to_cpu_0";
+    private static final String PORT = "port";
+    private static final String DROP = "_drop_0";
+    private static final String SET_EGRESS_PORT = "set_egress_port_0";
+    private static final PiHeaderFieldId IN_PORT_ID = PiHeaderFieldId.of("standard_metadata", "ingress_port");
+    private static final PiHeaderFieldId ETH_DST_ID = PiHeaderFieldId.of("ethernet_t", "dstAddr");
+    private static final PiHeaderFieldId ETH_SRC_ID = PiHeaderFieldId.of("ethernet_t", "srcAddr");
+    private static final PiHeaderFieldId ETH_TYPE_ID = PiHeaderFieldId.of("ethernet_t", "etherType");
+    private static final ImmutableBiMap<Criterion.Type, PiHeaderFieldId> CRITERION_MAP = ImmutableBiMap.of(
+            Criterion.Type.IN_PORT, IN_PORT_ID,
+            Criterion.Type.ETH_DST, ETH_DST_ID,
+            Criterion.Type.ETH_SRC, ETH_SRC_ID,
+            Criterion.Type.ETH_TYPE, ETH_TYPE_ID);
+    private static final ImmutableBiMap<Integer, PiTableId> TABLE_MAP = ImmutableBiMap.of(
+            0, PiTableId.of(TABLE0));
+    @Override
+    public PiTableAction mapTreatment(TrafficTreatment treatment, PiPipeconf pipeconf) throws PiInterpreterException {
+        if (treatment.allInstructions().size() == 0) {
+            // No instructions means drop for us.
+            return actionWithName(DROP);
+        } else if (treatment.allInstructions().size() > 1) {
+            // Otherwise, we understand treatments with only 1 instruction.
+            throw new PiPipelineInterpreter.PiInterpreterException("Treatment has multiple instructions");
+        }
+        Instruction instruction = treatment.allInstructions().get(0);
+        switch (instruction.type()) {
+            case OUTPUT:
+                Instructions.OutputInstruction outInstruction = (Instructions.OutputInstruction) instruction;
+                PortNumber port = outInstruction.port();
+                if (!port.isLogical()) {
+                    PiAction.builder()
+                            .withId(PiActionId.of(SET_EGRESS_PORT))
+                            .withParameter(new PiActionParam(PiActionParamId.of(PORT),
+                                                             ImmutableByteSequence.copyFrom(port.toLong())))
+                            .build();
+                } else if (port.equals(CONTROLLER)) {
+                    return actionWithName(SEND_TO_CPU);
+                } else {
+                    throw new PiInterpreterException("Egress on logical port not supported: " + port);
+                }
+            case NOACTION:
+                return actionWithName(DROP);
+            default:
+                throw new PiInterpreterException("Instruction type not supported: " + instruction.type().name());
+        }
+    }
+    /**
+     * Returns an action instance with no runtime parameters.
+     */
+    private PiAction actionWithName(String name) {
+        return PiAction.builder().withId(PiActionId.of(name)).build();
+    }
+    @Override
+    public Optional<PiHeaderFieldId> mapCriterionType(Criterion.Type type) {
+        return Optional.ofNullable(CRITERION_MAP.get(type));
+    }
+    @Override
+    public Optional<Criterion.Type> mapPiHeaderFieldId(PiHeaderFieldId headerFieldId) {
+        return Optional.ofNullable(CRITERION_MAP.inverse().get(headerFieldId));
+    }
+    @Override
+    public Optional<PiTableId> mapFlowRuleTableId(int flowRuleTableId) {
+        return Optional.ofNullable(TABLE_MAP.get(flowRuleTableId));
+    }
diff --git a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultPipeconf.java b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultPipeconf.java
new file mode 100644
index 0000000..ac2dbd7
--- /dev/null
+++ b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2DefaultPipeconf.java
@@ -0,0 +1,103 @@
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.drivers.bmv2;
+import com.eclipsesource.json.Json;
+import com.google.common.collect.ImmutableMap;
+import org.onosproject.bmv2.model.Bmv2PipelineModelParser;
+import org.onosproject.net.driver.Behaviour;
+import org.onosproject.net.pi.model.PiPipeconf;
+import org.onosproject.net.pi.model.PiPipeconfId;
+import org.onosproject.net.pi.model.PiPipelineInterpreter;
+import org.onosproject.net.pi.model.PiPipelineModel;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+ * Implementation of the default pipeconf for a BMv2 device.
+ */
+public final class Bmv2DefaultPipeconf implements PiPipeconf {
+    private static final String PIPECONF_ID = "bmv2-default-pipeconf";
+    private static final String JSON_PATH = "/default.json";
+    private static final String P4INFO_PATH = "/default.p4info";
+    private final PiPipeconfId id;
+    private final PiPipelineModel pipelineModel;
+    private final InputStream jsonConfigStream = this.getClass().getResourceAsStream(JSON_PATH);
+    private final InputStream p4InfoStream = this.getClass().getResourceAsStream(P4INFO_PATH);
+    private final Map<Class<? extends Behaviour>, Class<? extends Behaviour>> behaviours;
+    Bmv2DefaultPipeconf() {
+        this.id = new PiPipeconfId(PIPECONF_ID);
+        try {
+            this.pipelineModel = Bmv2PipelineModelParser.parse(
+                    Json.parse(new BufferedReader(new InputStreamReader(jsonConfigStream))).asObject());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        this.behaviours = ImmutableMap.of(
+                PiPipelineInterpreter.class, Bmv2DefaultInterpreter.class
+                // TODO: reuse default single table pipeliner.
+        );
+    }
+    @Override
+    public PiPipeconfId id() {
+        return this.id;
+    }
+    @Override
+    public PiPipelineModel pipelineModel() {
+        return pipelineModel;
+    }
+    @Override
+    public Collection<Class<? extends Behaviour>> behaviours() {
+        return behaviours.keySet();
+    }
+    @Override
+    public Optional<Class<? extends Behaviour>> implementation(Class<? extends Behaviour> behaviour) {
+        return Optional.ofNullable(behaviours.get(behaviour));
+    }
+    @Override
+    public boolean hasBehaviour(Class<? extends Behaviour> behaviourClass) {
+        return behaviours.containsKey(behaviourClass);
+    }
+    @Override
+    public Optional<InputStream> extension(ExtensionType type) {
+        switch (type) {
+            case BMV2_JSON:
+                return Optional.of(jsonConfigStream);
+            case P4_INFO_TEXT:
+                return Optional.of(p4InfoStream);
+            default:
+                return Optional.empty();
+        }
+    }
diff --git a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2Handshaker.java b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2Handshaker.java
index 8774c09..5d74947 100644
--- a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2Handshaker.java
+++ b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2Handshaker.java
@@ -16,17 +16,14 @@
 package org.onosproject.drivers.bmv2;
-import org.onosproject.grpc.api.GrpcChannelId;
-import org.onosproject.grpc.api.GrpcController;
-import org.onosproject.grpc.api.GrpcServiceId;
-import org.onosproject.grpc.api.GrpcStreamObserverId;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.netty.NettyChannelBuilder;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.MastershipRole;
 import org.onosproject.net.device.DeviceHandshaker;
 import org.onosproject.net.driver.AbstractHandlerBehaviour;
 import org.onosproject.net.driver.DriverData;
-import org.onosproject.net.key.DeviceKeyId;
-import org.onosproject.net.key.DeviceKeyService;
+import org.onosproject.p4runtime.api.P4RuntimeController;
 import org.slf4j.Logger;
 import java.util.concurrent.CompletableFuture;
@@ -34,63 +31,71 @@
 import static org.slf4j.LoggerFactory.getLogger;
- * Implementation of the DeviceHandshaker for the BMv2 softswitch.
+ * Implementation of DeviceHandshaker for BMv2.
-//TODO consider abstract class with empty connect method and
-//the implementation into a protected one for reusability.
-//FIXME fill method bodies, used for testing.
 public class Bmv2Handshaker extends AbstractHandlerBehaviour
         implements DeviceHandshaker {
     private final Logger log = getLogger(getClass());
+    // TODO: consider abstract class with empty connect method and  implementation into a protected one for reusability.
     public CompletableFuture<Boolean> connect() {
-        GrpcController controller = handler().get(GrpcController.class);
+        return CompletableFuture.supplyAsync(this::doConnect);
+    }
+    private boolean doConnect() {
+        P4RuntimeController controller = handler().get(P4RuntimeController.class);
         DeviceId deviceId = handler().data().deviceId();
-        CompletableFuture<Boolean> result = new CompletableFuture<>();
-        DeviceKeyService deviceKeyService = handler().get(DeviceKeyService.class);
+        // DeviceKeyService deviceKeyService = handler().get(DeviceKeyService.class);
         DriverData data = data();
-        //Retrieving information from the driver data.
-        log.info("protocol bmv2, ip {}, port {}, key {}", data.value("p4runtime_ip"),
-                data.value("p4runtime_port"),
-                deviceKeyService.getDeviceKey(DeviceKeyId.deviceKeyId(data.value("p4runtime_key")))
-                        .asUsernamePassword().username());
-        log.info("protocol gnmi, ip {}, port {}, key {}", data.value("gnmi_ip"), data.value("gnmi_port"),
-                deviceKeyService.getDeviceKey(DeviceKeyId.deviceKeyId(data.value("gnmi_key")))
-                        .asUsernamePassword().username());
-        result.complete(true);
+        String serverAddr = data.value("p4runtime_ip");
+        int serverPort = Integer.valueOf(data.value("p4runtime_port"));
+        int p4DeviceId = Integer.valueOf(data.value("p4runtime_deviceId"));
-        //we know we need packet in so we register the observer.
-        GrpcChannelId channelId = GrpcChannelId.of(deviceId, "bmv2");
-        GrpcServiceId serviceId = GrpcServiceId.of(channelId, "p4runtime");
-        GrpcStreamObserverId observerId = GrpcStreamObserverId.of(serviceId,
-                Bmv2PacketProgrammable.class.getSimpleName());
-        controller.addObserver(observerId, new Bmv2PacketInObserverHandler());
-        return result;
+        ManagedChannelBuilder channelBuilder = NettyChannelBuilder
+                .forAddress(serverAddr, serverPort)
+                .usePlaintext(true);
+        if (!controller.createClient(deviceId, p4DeviceId, channelBuilder)) {
+            log.warn("Unable to create P4runtime client for {}", deviceId);
+            return false;
+        }
+        // TODO: gNMI handling
+        return true;
     public CompletableFuture<Boolean> disconnect() {
-        CompletableFuture<Boolean> result = new CompletableFuture<>();
-        result.complete(true);
-        return result;
+        return CompletableFuture.supplyAsync(() -> {
+            P4RuntimeController controller = handler().get(P4RuntimeController.class);
+            DeviceId deviceId = handler().data().deviceId();
+            controller.removeClient(deviceId);
+            return true;
+        });
     public CompletableFuture<Boolean> isReachable() {
-        CompletableFuture<Boolean> result = new CompletableFuture<>();
-        result.complete(true);
-        return result;
+        return CompletableFuture.supplyAsync(() -> {
+            P4RuntimeController controller = handler().get(P4RuntimeController.class);
+            DeviceId deviceId = handler().data().deviceId();
+            return controller.isReacheable(deviceId);
+        });
     public CompletableFuture<MastershipRole> roleChanged(MastershipRole newRole) {
         CompletableFuture<MastershipRole> result = new CompletableFuture<>();
+        log.warn("roleChanged not implemented");
+        // TODO.
         return result;
diff --git a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2PacketInObserverHandler.java b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2PacketInObserverHandler.java
deleted file mode 100644
index 3b9e145..0000000
--- a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2PacketInObserverHandler.java
+++ /dev/null
@@ -1,79 +0,0 @@
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.drivers.bmv2;
-import io.grpc.ManagedChannel;
-import io.grpc.stub.StreamObserver;
-import org.onosproject.grpc.api.GrpcObserverHandler;
-import org.slf4j.Logger;
-import java.util.Optional;
-import static org.slf4j.LoggerFactory.getLogger;
- * Sample Implementation of a PacketInObserverHandler.
- * TODO refactor and actually use.
- */
-public class Bmv2PacketInObserverHandler implements GrpcObserverHandler {
-    private final Logger log = getLogger(getClass());
-    //private final AbstractStub asyncStub;
-    //FIXME put at object due to p4Runtime compilation problems to be fixed.
-    private StreamObserver<Object> requestStreamObserver;
-    @Override
-    public void bindObserver(ManagedChannel channel) {
-        //asyncStub = ProtoGeneratedClass.newStub(channel);
-        //reqeustStreamObserver = asyncStub.MethodName(new PacketInObserver());
-    }
-    @Override
-    public Optional<StreamObserver> requestStreamObserver() {
-        return Optional.of(requestStreamObserver);
-    }
-    @Override
-    public void removeObserver() {
-        //this should complete the two way streaming
-        requestStreamObserver.onCompleted();
-    }
-    private class PacketInObserver implements StreamObserver<Object> {
-        @Override
-        public void onNext(Object o) {
-            log.info("onNext: {}", o.toString());
-        }
-        @Override
-        public void onError(Throwable throwable) {
-        }
-        @Override
-        public void onCompleted() {
-        }
-    }
diff --git a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2PacketProgrammable.java b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2PacketProgrammable.java
index 4b76067..7bf2258 100644
--- a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2PacketProgrammable.java
+++ b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2PacketProgrammable.java
@@ -16,40 +16,32 @@
 package org.onosproject.drivers.bmv2;
-import io.grpc.stub.StreamObserver;
-import org.onosproject.grpc.api.GrpcChannelId;
-import org.onosproject.grpc.api.GrpcController;
-import org.onosproject.grpc.api.GrpcObserverHandler;
-import org.onosproject.grpc.api.GrpcServiceId;
-import org.onosproject.grpc.api.GrpcStreamObserverId;
-import org.onosproject.net.DeviceId;
 import org.onosproject.net.driver.AbstractHandlerBehaviour;
-import org.onosproject.net.driver.DriverHandler;
 import org.onosproject.net.packet.OutboundPacket;
 import org.onosproject.net.packet.PacketProgrammable;
-import java.util.Optional;
  * Packet Programmable behaviour for BMv2 devices.
 public class Bmv2PacketProgrammable extends AbstractHandlerBehaviour implements PacketProgrammable {
     public void emit(OutboundPacket packet) {
-        DriverHandler handler = handler();
-        GrpcController controller = handler.get(GrpcController.class);
-        DeviceId deviceId = handler.data().deviceId();
-        GrpcChannelId channelId = GrpcChannelId.of(deviceId, "bmv2");
-        GrpcServiceId serviceId = GrpcServiceId.of(channelId, "p4runtime");
-        GrpcStreamObserverId observerId = GrpcStreamObserverId.of(serviceId,
-                this.getClass().getSimpleName());
-        Optional<GrpcObserverHandler> manager = controller.getObserverManager(observerId);
-        if (!manager.isPresent()) {
-            //this is the first time the behaviour is called
-            controller.addObserver(observerId, new Bmv2PacketInObserverHandler());
-        }
-        //other already registered the observer for us.
-        Optional<StreamObserver> observer = manager.get().requestStreamObserver();
-        observer.ifPresent(objectStreamObserver -> objectStreamObserver.onNext(packet));
+        // TODO: implement using P4runtime client.
+        // DriverHandler handler = handler();
+        // GrpcController controller = handler.get(GrpcController.class);
+        // DeviceId deviceId = handler.data().deviceId();
+        // GrpcChannelId channelId = GrpcChannelId.of(deviceId, "bmv2");
+        // GrpcServiceId serviceId = GrpcServiceId.of(channelId, "p4runtime");
+        // GrpcStreamObserverId observerId = GrpcStreamObserverId.of(serviceId,
+        //         this.getClass().getSimpleName());
+        // Optional<GrpcObserverHandler> manager = controller.getObserverManager(observerId);
+        // if (!manager.isPresent()) {
+        //     //this is the first time the behaviour is called
+        //     controller.addObserver(observerId, new Bmv2PacketInObserverHandler());
+        // }
+        // //other already registered the observer for us.
+        // Optional<StreamObserver> observer = manager.get().requestStreamObserver();
+        // observer.ifPresent(objectStreamObserver -> objectStreamObserver.onNext(packet));
diff --git a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2PipelineProgrammable.java b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2PipelineProgrammable.java
new file mode 100644
index 0000000..735c142
--- /dev/null
+++ b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/Bmv2PipelineProgrammable.java
@@ -0,0 +1,107 @@
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.drivers.bmv2;
+import org.onlab.util.SharedExecutors;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.driver.AbstractHandlerBehaviour;
+import org.onosproject.net.pi.model.PiPipeconf;
+import org.onosproject.net.pi.model.PiPipelineProgrammable;
+import org.onosproject.p4runtime.api.P4RuntimeClient;
+import org.onosproject.p4runtime.api.P4RuntimeController;
+import org.slf4j.Logger;
+import java.io.InputStream;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType.BMV2_JSON;
+import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType.P4_INFO_TEXT;
+import static org.slf4j.LoggerFactory.getLogger;
+ * Implementation of the PiPipelineProgrammable for BMv2.
+ */
+public class Bmv2PipelineProgrammable extends AbstractHandlerBehaviour implements PiPipelineProgrammable {
+    private static final PiPipeconf DEFAULT_PIPECONF = new Bmv2DefaultPipeconf();
+    private final Logger log = getLogger(getClass());
+    @Override
+    public CompletableFuture<Boolean> deployPipeconf(PiPipeconf pipeconf) {
+        CompletableFuture<Boolean> result = new CompletableFuture<>();
+        SharedExecutors.getPoolThreadExecutor().submit(() -> result.complete(doDeployConfig(pipeconf)));
+        return result;
+    }
+    private boolean doDeployConfig(PiPipeconf pipeconf) {
+        P4RuntimeController controller = handler().get(P4RuntimeController.class);
+        DeviceId deviceId = handler().data().deviceId();
+        if (!controller.hasClient(deviceId)) {
+            log.warn("Unable to find client for {}, aborting pipeconf deploy", deviceId);
+            return false;
+        }
+        P4RuntimeClient client = controller.getClient(deviceId);
+        if (!pipeconf.extension(BMV2_JSON).isPresent()) {
+            log.warn("Missing BMv2 JSON config in pipeconf {}, aborting pipeconf deploy", pipeconf.id());
+            return false;
+        }
+        if (!pipeconf.extension(P4_INFO_TEXT).isPresent()) {
+            log.warn("Missing P4Info in pipeconf {}, aborting pipeconf deploy", pipeconf.id());
+            return false;
+        }
+        InputStream p4InfoStream = pipeconf.extension(P4_INFO_TEXT).get();
+        InputStream jsonStream = pipeconf.extension(BMV2_JSON).get();
+        try {
+            if (!client.setPipelineConfig(p4InfoStream, jsonStream).get()) {
+                log.warn("Unable to deploy pipeconf {} to {}", pipeconf.id(), deviceId);
+                return false;
+            }
+            // It would be more logical to have this performed at device handshake, but P4runtime would reject any
+            // command if a P4info has not been set first.
+            if (!client.initStreamChannel().get()) {
+                log.warn("Unable to init stream channel to {}.", deviceId);
+                return false;
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+        return true;
+    }
+    @Override
+    public Optional<PiPipeconf> getDefaultPipeconf() {
+        return Optional.of(DEFAULT_PIPECONF);
+    }