Refactor P4Runtime subsystem to implement async connection procedure

This patch is an attempt to solve issues observed when restarting both
switches and ONOS nodes. Most of the issues seemed to stem from
brittle mastership handling when deploying the pipeline.

With this patch, GDP registers devices to the core with available=false
(i.e. offline) and marks them online only when the P4 pipeline has been
deployed to the device. A new PiPipeconfWatchdogService takes care of
deploying pipelines and producing events when devices are ready.

Moreover, we fix a race condition where pipeconf-related behaviors
were not found. This was caused by GDP enforcing the merged
driver name in the network config, while external entities (e.g.
Mininet) were pushing a JSON blob with the base driver name. This patch
removes the need to rely on such a trick and instead uses
pipeconf-aware logic directly in the driver manager (change #19622).

Finally, we fix issues in P4RuntimeClientImpl that prevented the stream
channel from detecting unreachable devices. The solution is to follow
gRPC APIs and create a new stream channel once the previous one fails.

Change-Id: I6fbc91859c0fb58a6db3bc197b7081a8fe9f97f7
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
index 32b1f9f..886a39d 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
@@ -54,13 +54,19 @@
     }
 
     /**
-     * Starts the client by starting the Stream RPC with the device. The
-     * implementation of this method is equivalent to {@link #becomeMaster()}.
+     * Starts the Stream RPC with the device.
      *
      * @return completable future containing true if the operation was
      * successful, false otherwise.
      */
-    CompletableFuture<Boolean> start();
+    CompletableFuture<Boolean> startStreamChannel();
+
+    /**
+     * Returns true if the stream RPC is active, false otherwise.
+     *
+     * @return boolean
+     */
+    boolean isStreamChannelOpen();
 
     /**
      * Shutdowns the client by terminating any active RPC such as the Stream
@@ -81,6 +87,13 @@
     CompletableFuture<Boolean> becomeMaster();
 
     /**
+     * Returns true if this client is master for the device, false otherwise.
+     *
+     * @return boolean
+     */
+    boolean isMaster();
+
+    /**
      * Sets the device pipeline according to the given pipeconf, and for the
      * given byte buffer representing the target-specific data to be used in the
      * P4Runtime's SetPipelineConfig message. This method should be called
@@ -95,6 +108,21 @@
             PiPipeconf pipeconf, ByteBuffer deviceData);
 
     /**
+     * Returns true if the device has the given pipeconf set, false otherwise.
+     * Equality is based on the P4Info extension of the pipeconf as well as the
+     * given device data byte buffer.
+     * <p>
+     * This method is expected to return {@code true} if invoked after calling
+     * {@link #setPipelineConfig(PiPipeconf, ByteBuffer)} with the same
+     * parameters.
+     *
+     * @param pipeconf   pipeconf
+     * @param deviceData target-specific data
+     * @return boolean
+     */
+    boolean isPipelineConfigSet(PiPipeconf pipeconf, ByteBuffer deviceData);
+
+    /**
      * Performs the given write operation for the given table entries and
      * pipeconf.
      *
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
index 836ed7a..159e313 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
@@ -20,6 +20,7 @@
 import org.onosproject.event.ListenerService;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.device.DeviceAgentListener;
+import org.onosproject.net.provider.ProviderId;
 
 /**
  * Controller of P4Runtime devices.
@@ -94,18 +95,21 @@
     boolean isReachable(DeviceId deviceId);
 
     /**
-     * Adds a listener for device agent events.
+     * Adds a listener for device agent events for the given provider.
      *
      * @param deviceId device identifier
+     * @param providerId provider ID
      * @param listener the device agent listener
      */
-    void addDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener);
+    void addDeviceAgentListener(DeviceId deviceId, ProviderId providerId,
+                                DeviceAgentListener listener);
 
     /**
-     * Removes the listener for device agent events.
+     * Removes the listener for device agent events that was previously
+     * registered for the given provider.
      *
-     * @param deviceId device identifier
-     * @param listener the device agent listener
+     * @param deviceId   device identifier
+     * @param providerId the provider ID
      */
-    void removeDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener);
+    void removeDeviceAgentListener(DeviceId deviceId, ProviderId providerId);
 }
diff --git a/protocols/p4runtime/ctl/BUCK b/protocols/p4runtime/ctl/BUCK
index 46540b3..343d3b2 100644
--- a/protocols/p4runtime/ctl/BUCK
+++ b/protocols/p4runtime/ctl/BUCK
@@ -1,27 +1,28 @@
-GRPC_VER = '1.3.1'
-PROTOBUF_VER = '3.2.0'
-
-COMPILE_DEPS = [
-    '//lib:CORE_DEPS',
-    '//lib:KRYO',
-    '//protocols/grpc/api:onos-protocols-grpc-api',
-    '//protocols/p4runtime/api:onos-protocols-p4runtime-api',
-    '//protocols/p4runtime/proto:onos-protocols-p4runtime-proto',
-    '//incubator/grpc-dependencies:grpc-core-repkg-' + GRPC_VER,
-    '//lib:grpc-stub-' + GRPC_VER,
-    '//lib:grpc-netty-' + GRPC_VER,
-    '//lib:protobuf-java-' + PROTOBUF_VER,
-    '//core/store/serializers:onos-core-serializers',
-]
-
-TEST_DEPS = [
-    '//lib:TEST',
-    '//lib:GRPC_TEST_1.3',
-    '//lib:minimal-json',
-    '//lib:grpc-protobuf-lite-' + GRPC_VER,
-]
-
-osgi_jar_with_tests(
-    deps = COMPILE_DEPS,
-    test_deps = TEST_DEPS,
-)
+# Buck build of P4-related modules is no longer supported, please use Bazel
+# GRPC_VER = '1.3.1'
+# PROTOBUF_VER = '3.2.0'
+#
+# COMPILE_DEPS = [
+#     '//lib:CORE_DEPS',
+#     '//lib:KRYO',
+#     '//protocols/grpc/api:onos-protocols-grpc-api',
+#     '//protocols/p4runtime/api:onos-protocols-p4runtime-api',
+#     '//protocols/p4runtime/proto:onos-protocols-p4runtime-proto',
+#     '//incubator/grpc-dependencies:grpc-core-repkg-' + GRPC_VER,
+#     '//lib:grpc-stub-' + GRPC_VER,
+#     '//lib:grpc-netty-' + GRPC_VER,
+#     '//lib:protobuf-java-' + PROTOBUF_VER,
+#     '//core/store/serializers:onos-core-serializers',
+# ]
+#
+# TEST_DEPS = [
+#     '//lib:TEST',
+#     '//lib:GRPC_TEST_1.3',
+#     '//lib:minimal-json',
+#     '//lib:grpc-protobuf-lite-' + GRPC_VER,
+# ]
+#
+# osgi_jar_with_tests(
+#     deps = COMPILE_DEPS,
+#     test_deps = TEST_DEPS,
+# )
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index 5483edc..443514f 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -26,6 +26,7 @@
 import io.grpc.ManagedChannel;
 import io.grpc.Status;
 import io.grpc.StatusRuntimeException;
+import io.grpc.stub.ClientCallStreamObserver;
 import io.grpc.stub.StreamObserver;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.onlab.osgi.DefaultServiceDirectory;
@@ -59,6 +60,8 @@
 import p4.v1.P4RuntimeOuterClass.ActionProfileMember;
 import p4.v1.P4RuntimeOuterClass.Entity;
 import p4.v1.P4RuntimeOuterClass.ForwardingPipelineConfig;
+import p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigRequest;
+import p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigResponse;
 import p4.v1.P4RuntimeOuterClass.MasterArbitrationUpdate;
 import p4.v1.P4RuntimeOuterClass.MulticastGroupEntry;
 import p4.v1.P4RuntimeOuterClass.PacketReplicationEngineEntry;
@@ -73,6 +76,7 @@
 import p4.v1.P4RuntimeOuterClass.WriteRequest;
 
 import java.math.BigInteger;
+import java.net.ConnectException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
@@ -86,6 +90,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
@@ -132,11 +137,13 @@
     private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
     private final ExecutorService executorService;
     private final Executor contextExecutor;
-    private final StreamObserver<StreamMessageRequest> streamRequestObserver;
+    private StreamChannelManager streamChannelManager;
 
     // Used by this client for write requests.
     private Uint128 clientElectionId = Uint128.newBuilder().setLow(1).build();
 
+    private final AtomicBoolean isClientMaster = new AtomicBoolean(false);
+
     /**
      * Default constructor.
      *
@@ -155,8 +162,7 @@
         this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
         //TODO Investigate use of stub deadlines instead of timeout in supplyInContext
         this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
-        this.streamRequestObserver = P4RuntimeGrpc.newStub(channel)
-                .streamChannel(new StreamChannelResponseObserver());
+        this.streamChannelManager = new StreamChannelManager(channel);
     }
 
     /**
@@ -206,7 +212,7 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> start() {
+    public CompletableFuture<Boolean> startStreamChannel() {
         return supplyInContext(() -> sendMasterArbitrationUpdate(false),
                                "start-initStreamChannel");
     }
@@ -224,11 +230,26 @@
     }
 
     @Override
+    public boolean isMaster() {
+        return streamChannelManager.isOpen() && isClientMaster.get();
+    }
+
+    @Override
+    public boolean isStreamChannelOpen() {
+        return streamChannelManager.isOpen();
+    }
+
+    @Override
     public CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
         return supplyInContext(() -> doSetPipelineConfig(pipeconf, deviceData), "setPipelineConfig");
     }
 
     @Override
+    public boolean isPipelineConfigSet(PiPipeconf pipeconf, ByteBuffer deviceData) {
+        return doIsPipelineConfigSet(pipeconf, deviceData);
+    }
+
+    @Override
     public CompletableFuture<Boolean> writeTableEntries(Collection<PiTableEntry> piTableEntries,
                                                         WriteOperationType opType, PiPipeconf pipeconf) {
         return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf),
@@ -338,24 +359,94 @@
         final Uint128 idMsg = bigIntegerToUint128(
                 controller.newMasterElectionId(deviceId));
 
-        log.info("Sending arbitration update to {}... electionId={}",
-                 deviceId, newId);
-        try {
-            streamRequestObserver.onNext(
-                    StreamMessageRequest.newBuilder()
-                            .setArbitration(
-                                    MasterArbitrationUpdate
-                                            .newBuilder()
-                                            .setDeviceId(p4DeviceId)
-                                            .setElectionId(idMsg)
-                                            .build())
-                            .build());
-            clientElectionId = idMsg;
-            return true;
-        } catch (StatusRuntimeException e) {
-            log.error("Unable to perform arbitration update on {}: {}", deviceId, e.getMessage());
+        log.debug("Sending arbitration update to {}... electionId={}",
+                  deviceId, newId);
+
+        streamChannelManager.send(
+                StreamMessageRequest.newBuilder()
+                        .setArbitration(
+                                MasterArbitrationUpdate
+                                        .newBuilder()
+                                        .setDeviceId(p4DeviceId)
+                                        .setElectionId(idMsg)
+                                        .build())
+                        .build());
+        clientElectionId = idMsg;
+        return true;
+    }
+
+    private ForwardingPipelineConfig getPipelineConfig(
+            PiPipeconf pipeconf, ByteBuffer deviceData) {
+        P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
+        if (p4Info == null) {
+            // Problem logged by PipeconfHelper.
+            return null;
         }
-        return false;
+
+        // FIXME: This is specific to PI P4Runtime implementation.
+        P4Config.P4DeviceConfig p4DeviceConfigMsg = P4Config.P4DeviceConfig
+                .newBuilder()
+                .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
+                .setReassign(true)
+                .setDeviceData(ByteString.copyFrom(deviceData))
+                .build();
+
+        return ForwardingPipelineConfig
+                .newBuilder()
+                .setP4Info(p4Info)
+                .setP4DeviceConfig(p4DeviceConfigMsg.toByteString())
+                .build();
+    }
+
+    private boolean doIsPipelineConfigSet(PiPipeconf pipeconf, ByteBuffer deviceData) {
+
+        GetForwardingPipelineConfigRequest request = GetForwardingPipelineConfigRequest
+                .newBuilder()
+                .setDeviceId(p4DeviceId)
+                .build();
+
+        GetForwardingPipelineConfigResponse resp;
+        try {
+            resp = this.blockingStub
+                    .getForwardingPipelineConfig(request);
+        } catch (StatusRuntimeException ex) {
+            checkGrpcException(ex);
+            // FAILED_PRECONDITION means that a pipeline config was not set in
+            // the first place. Don't bother logging.
+            if (!ex.getStatus().getCode()
+                    .equals(Status.FAILED_PRECONDITION.getCode())) {
+                log.warn("Unable to get pipeline config from {}: {}",
+                         deviceId, ex.getMessage());
+            }
+            return false;
+        }
+
+        ForwardingPipelineConfig expectedConfig = getPipelineConfig(
+                pipeconf, deviceData);
+
+        if (expectedConfig == null) {
+            return false;
+        }
+        if (!resp.hasConfig()) {
+            log.warn("{} returned GetForwardingPipelineConfigResponse " +
+                             "with 'config' field unset",
+                     deviceId);
+            return false;
+        }
+        if (resp.getConfig().getP4DeviceConfig().isEmpty()
+                && !expectedConfig.getP4DeviceConfig().isEmpty()) {
+            // Don't bother with a warn or error since we don't really allow
+            // updating the pipeline to a different one. So the P4Info should be
+            // enough for us.
+            log.debug("{} returned GetForwardingPipelineConfigResponse " +
+                              "with empty 'p4_device_config' field, " +
+                              "equality will be based only on P4Info",
+                      deviceId);
+            return resp.getConfig().getP4Info().equals(
+                    expectedConfig.getP4Info());
+        } else {
+            return resp.getConfig().equals(expectedConfig);
+        }
     }
 
     private boolean doSetPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
@@ -364,25 +455,13 @@
 
         checkNotNull(deviceData, "deviceData cannot be null");
 
-        P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
-        if (p4Info == null) {
-            // Problem logged by PipeconfHelper.
+        ForwardingPipelineConfig pipelineConfig = getPipelineConfig(pipeconf, deviceData);
+
+        if (pipelineConfig == null) {
+            // Error logged in getPipelineConfig()
             return false;
         }
 
-        P4Config.P4DeviceConfig p4DeviceConfigMsg = P4Config.P4DeviceConfig
-                .newBuilder()
-                .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
-                .setReassign(true)
-                .setDeviceData(ByteString.copyFrom(deviceData))
-                .build();
-
-        ForwardingPipelineConfig pipelineConfig = ForwardingPipelineConfig
-                .newBuilder()
-                .setP4Info(p4Info)
-                .setP4DeviceConfig(p4DeviceConfigMsg.toByteString())
-                .build();
-
         SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
                 .newBuilder()
                 .setDeviceId(p4DeviceId)
@@ -392,9 +471,11 @@
                 .build();
 
         try {
+            //noinspection ResultOfMethodCallIgnored
             this.blockingStub.setForwardingPipelineConfig(request);
             return true;
         } catch (StatusRuntimeException ex) {
+            checkGrpcException(ex);
             log.warn("Unable to set pipeline config on {}: {}", deviceId, ex.getMessage());
             return false;
         }
@@ -462,6 +543,7 @@
         try {
             responses = blockingStub.read(requestMsg);
         } catch (StatusRuntimeException e) {
+            checkGrpcException(e);
             log.warn("Unable to dump table {} from {}: {}", piTableId, deviceId, e.getMessage());
             return Collections.emptyList();
         }
@@ -490,7 +572,7 @@
                     .newBuilder().setPacket(packetOut).build();
 
             //Send the request
-            streamRequestObserver.onNext(packetOutRequest);
+            streamChannelManager.send(packetOutRequest);
 
         } catch (P4InfoBrowser.NotFoundException e) {
             log.error("Cant find expected metadata in p4Info file. {}", e.getMessage());
@@ -534,12 +616,14 @@
         if (!msg.hasElectionId() || !msg.hasStatus()) {
             return;
         }
-        final boolean isMaster = msg.getStatus().getCode() == Status.OK.getCode().value();
-        log.info("Received arbitration update from {}: isMaster={}, electionId={}",
-                 deviceId, isMaster, uint128ToBigInteger(msg.getElectionId()));
+        final boolean isMaster =
+                msg.getStatus().getCode() == Status.OK.getCode().value();
+        log.debug("Received arbitration update from {}: isMaster={}, electionId={}",
+                  deviceId, isMaster, uint128ToBigInteger(msg.getElectionId()));
         controller.postEvent(new P4RuntimeEvent(
                 P4RuntimeEvent.Type.ARBITRATION_RESPONSE,
                 new ArbitrationResponse(deviceId, isMaster)));
+        isClientMaster.set(isMaster);
     }
 
     private Collection<PiCounterCellData> doReadAllCounterCells(
@@ -572,6 +656,7 @@
         try {
             responses = () -> blockingStub.read(request);
         } catch (StatusRuntimeException e) {
+            checkGrpcException(e);
             log.warn("Unable to read counter cells from {}: {}", deviceId, e.getMessage());
             return Collections.emptyList();
         }
@@ -654,6 +739,7 @@
         try {
             groupResponses = blockingStub.read(groupRequestMsg);
         } catch (StatusRuntimeException e) {
+            checkGrpcException(e);
             log.warn("Unable to dump action profile {} from {}: {}", piActionProfileId, deviceId, e.getMessage());
             return Collections.emptySet();
         }
@@ -702,6 +788,7 @@
         try {
             memberResponses = blockingStub.read(memberRequestMsg);
         } catch (StatusRuntimeException e) {
+            checkGrpcException(e);
             log.warn("Unable to read members of action profile {} from {}: {}",
                      piActionProfileId, deviceId, e.getMessage());
             return Collections.emptyList();
@@ -794,6 +881,7 @@
         try {
             responses = () -> blockingStub.read(request);
         } catch (StatusRuntimeException e) {
+            checkGrpcException(e);
             log.warn("Unable to read meter cells: {}", e.getMessage());
             log.debug("exception", e);
             return Collections.emptyList();
@@ -866,6 +954,7 @@
         try {
             responses = blockingStub.read(req);
         } catch (StatusRuntimeException e) {
+            checkGrpcException(e);
             log.warn("Unable to read multicast group entries from {}: {}", deviceId, e.getMessage());
             return Collections.emptyList();
         }
@@ -894,6 +983,8 @@
                                                WriteOperationType opType,
                                                String entryType) {
         try {
+
+            //noinspection ResultOfMethodCallIgnored
             blockingStub.write(writeRequest(updates));
             return true;
         } catch (StatusRuntimeException e) {
@@ -911,18 +1002,11 @@
     }
 
     private Void doShutdown() {
-        log.info("Shutting down client for {}...", deviceId);
-        if (streamRequestObserver != null) {
-            try {
-                streamRequestObserver.onCompleted();
-            } catch (IllegalStateException e) {
-                // Thrown if stream channel is already completed. Can ignore.
-                log.debug("Ignored expection: {}", e);
-            }
-            cancellableContext.cancel(new InterruptedException(
-                    "Requested client shutdown"));
-        }
-        this.executorService.shutdown();
+        log.debug("Shutting down client for {}...", deviceId);
+        streamChannelManager.complete();
+        cancellableContext.cancel(new InterruptedException(
+                "Requested client shutdown"));
+        this.executorService.shutdownNow();
         try {
             executorService.awaitTermination(5, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
@@ -1080,13 +1164,111 @@
     }
 
     /**
+     * A manager for the P4Runtime stream channel that opportunistically creates
+     * new stream RPC stubs (e.g. when one fails because of errors) and posts
+     * channel events via the P4Runtime controller.
+     */
+    private final class StreamChannelManager {
+
+        private final ManagedChannel channel;
+        private final AtomicBoolean open;
+        private final StreamObserver<StreamMessageResponse> responseObserver;
+        private ClientCallStreamObserver<StreamMessageRequest> requestObserver;
+
+        private StreamChannelManager(ManagedChannel channel) {
+            this.channel = channel;
+            this.responseObserver = new InternalStreamResponseObserver(this);
+            this.open = new AtomicBoolean(false);
+        }
+
+        private void initIfRequired() {
+            if (requestObserver == null) {
+                log.debug("Creating new stream channel for {}...", deviceId);
+                requestObserver =
+                        (ClientCallStreamObserver<StreamMessageRequest>)
+                                P4RuntimeGrpc.newStub(channel)
+                                        .streamChannel(responseObserver);
+                open.set(false);
+            }
+        }
+
+        public boolean send(StreamMessageRequest value) {
+            synchronized (this) {
+                initIfRequired();
+                try {
+                    requestObserver.onNext(value);
+                    // FIXME
+                    // signalOpen();
+                    return true;
+                } catch (Throwable ex) {
+                    if (ex instanceof StatusRuntimeException) {
+                        log.warn("Unable to send {} to {}: {}",
+                                 value.getUpdateCase().toString(), deviceId, ex.getMessage());
+                    } else {
+                        log.warn(format(
+                                "Exception while sending %s to %s",
+                                value.getUpdateCase().toString(), deviceId), ex);
+                    }
+                    complete();
+                    return false;
+                }
+            }
+        }
+
+        public void complete() {
+            synchronized (this) {
+                signalClosed();
+                if (requestObserver != null) {
+                    requestObserver.onCompleted();
+                    requestObserver.cancel("Terminated", null);
+                    requestObserver = null;
+                }
+            }
+        }
+
+        void signalOpen() {
+            synchronized (this) {
+                final boolean wasOpen = open.getAndSet(true);
+                if (!wasOpen) {
+                    controller.postEvent(new P4RuntimeEvent(
+                            P4RuntimeEvent.Type.CHANNEL_EVENT,
+                            new ChannelEvent(deviceId, ChannelEvent.Type.OPEN)));
+                }
+            }
+        }
+
+        void signalClosed() {
+            synchronized (this) {
+                final boolean wasOpen = open.getAndSet(false);
+                if (wasOpen) {
+                    controller.postEvent(new P4RuntimeEvent(
+                            P4RuntimeEvent.Type.CHANNEL_EVENT,
+                            new ChannelEvent(deviceId, ChannelEvent.Type.CLOSED)));
+                }
+            }
+        }
+
+        public boolean isOpen() {
+            return open.get();
+        }
+    }
+
+    /**
      * Handles messages received from the device on the stream channel.
      */
-    private class StreamChannelResponseObserver
+    private final class InternalStreamResponseObserver
             implements StreamObserver<StreamMessageResponse> {
 
+        private final StreamChannelManager streamChannelManager;
+
+        private InternalStreamResponseObserver(
+                StreamChannelManager streamChannelManager) {
+            this.streamChannelManager = streamChannelManager;
+        }
+
         @Override
         public void onNext(StreamMessageResponse message) {
+            streamChannelManager.signalOpen();
             executorService.submit(() -> doNext(message));
         }
 
@@ -1113,19 +1295,26 @@
 
         @Override
         public void onError(Throwable throwable) {
-            log.warn("Error on stream channel for {}: {}",
-                     deviceId, Status.fromThrowable(throwable));
-            controller.postEvent(new P4RuntimeEvent(
-                    P4RuntimeEvent.Type.CHANNEL_EVENT,
-                    new ChannelEvent(deviceId, ChannelEvent.Type.ERROR)));
+            if (throwable instanceof StatusRuntimeException) {
+                StatusRuntimeException sre = (StatusRuntimeException) throwable;
+                if (sre.getStatus().getCause() instanceof ConnectException) {
+                    log.warn("Device {} is unreachable ({})",
+                             deviceId, sre.getCause().getMessage());
+                } else {
+                    log.warn("Received error on stream channel for {}: {}",
+                             deviceId, throwable.getMessage());
+                }
+            } else {
+                log.warn(format("Received exception on stream channel for %s",
+                                deviceId), throwable);
+            }
+            streamChannelManager.complete();
         }
 
         @Override
         public void onCompleted() {
             log.warn("Stream channel for {} has completed", deviceId);
-            controller.postEvent(new P4RuntimeEvent(
-                    P4RuntimeEvent.Type.CHANNEL_EVENT,
-                    new ChannelEvent(deviceId, ChannelEvent.Type.CLOSED)));
+            streamChannelManager.complete();
         }
     }
 }
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
index 1e7326d..4ca08f1 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
@@ -33,6 +33,7 @@
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.device.DeviceAgentEvent;
 import org.onosproject.net.device.DeviceAgentListener;
+import org.onosproject.net.provider.ProviderId;
 import org.onosproject.p4runtime.api.P4RuntimeClient;
 import org.onosproject.p4runtime.api.P4RuntimeController;
 import org.onosproject.p4runtime.api.P4RuntimeEvent;
@@ -42,10 +43,8 @@
 
 import java.io.IOException;
 import java.math.BigInteger;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.locks.Lock;
 import java.util.function.Supplier;
 
@@ -68,7 +67,8 @@
     private final Map<ClientKey, P4RuntimeClient> clients = Maps.newHashMap();
     private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
 
-    private final ConcurrentMap<DeviceId, List<DeviceAgentListener>> deviceAgentListeners = Maps.newConcurrentMap();
+    private final ConcurrentMap<DeviceId, ConcurrentMap<ProviderId, DeviceAgentListener>>
+            deviceAgentListeners = Maps.newConcurrentMap();
     private final Striped<Lock> stripedLocks = Striped.lock(30);
 
     private DistributedElectionIdGenerator electionIdGenerator;
@@ -120,13 +120,16 @@
         if (clientKeys.containsKey(deviceId)) {
             final ClientKey existingKey = clientKeys.get(deviceId);
             if (clientKey.equals(existingKey)) {
-                log.info("Not creating client for {} as it already exists (server={}:{}, p4DeviceId={})...",
+                log.debug("Not creating client for {} as it already exists (server={}:{}, p4DeviceId={})...",
                          deviceId, serverAddr, serverPort, p4DeviceId);
                 return true;
             } else {
-                throw new IllegalStateException(
-                        "A client for the same device ID but different " +
-                                "server endpoints already exists");
+                log.info("Requested client for {} with new " +
+                                 "endpoint, removing old client (server={}:{}, " +
+                                 "p4DeviceId={})...",
+                         deviceId, existingKey.serverAddr(),
+                         existingKey.serverPort(), existingKey.p4DeviceId());
+                doRemoveClient(deviceId);
             }
         }
 
@@ -218,19 +221,20 @@
     }
 
     @Override
-    public void addDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener) {
+    public void addDeviceAgentListener(DeviceId deviceId, ProviderId providerId, DeviceAgentListener listener) {
         checkNotNull(deviceId, "deviceId cannot be null");
+        checkNotNull(providerId, "providerId cannot be null");
         checkNotNull(listener, "listener cannot be null");
-        deviceAgentListeners.putIfAbsent(deviceId, new CopyOnWriteArrayList<>());
-        deviceAgentListeners.get(deviceId).add(listener);
+        deviceAgentListeners.putIfAbsent(deviceId, Maps.newConcurrentMap());
+        deviceAgentListeners.get(deviceId).put(providerId, listener);
     }
 
     @Override
-    public void removeDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener) {
+    public void removeDeviceAgentListener(DeviceId deviceId, ProviderId providerId) {
         checkNotNull(deviceId, "deviceId cannot be null");
-        checkNotNull(listener, "listener cannot be null");
+        checkNotNull(providerId, "providerId cannot be null");
         deviceAgentListeners.computeIfPresent(deviceId, (did, listeners) -> {
-            listeners.remove(listener);
+            listeners.remove(providerId);
             return listeners;
         });
     }
@@ -298,7 +302,7 @@
 
     private void postDeviceAgentEvent(DeviceId deviceId, DeviceAgentEvent event) {
         if (deviceAgentListeners.containsKey(deviceId)) {
-            deviceAgentListeners.get(deviceId).forEach(l -> l.event(event));
+            deviceAgentListeners.get(deviceId).values().forEach(l -> l.event(event));
         }
     }
 }