Refactor P4Runtime subsystem to implement async connection procedure

This patch is an attempt to solve issues observed when restarting both
switches and ONOS nodes. Most of the issues seemed to depend on
brittle mastership handling when deploying the pipeline.

With this patch, GDP registers devices to the core with available=false
(i.e. offline) and marks them online only when the P4 pipeline has been
deployed to the device. A new PiPipeconfWatchdogService takes care of
deploying pipelines and producing events when devices are ready.

Moreover, we fix a race condition where pipeconf-related behaviors
were not found. This was caused by GDP enforcing the merged
driver name in the network config, while external entities (e.g.
Mininet) were pushing a JSON blob with the base driver name. This patch
removes the need to rely on such a trick and instead uses
pipeconf-aware logic directly in the driver manager (change #19622).

Finally, we fix issues in P4RuntimeClientImpl that prevented the
stream channel from detecting unreachable devices. The solution is to
follow gRPC APIs and instantiate a new channel once the first fails.

Change-Id: I6fbc91859c0fb58a6db3bc197b7081a8fe9f97f7
diff --git a/protocols/p4runtime/ctl/BUCK b/protocols/p4runtime/ctl/BUCK
index 46540b3..343d3b2 100644
--- a/protocols/p4runtime/ctl/BUCK
+++ b/protocols/p4runtime/ctl/BUCK
@@ -1,27 +1,28 @@
-GRPC_VER = '1.3.1'
-PROTOBUF_VER = '3.2.0'
-
-COMPILE_DEPS = [
-    '//lib:CORE_DEPS',
-    '//lib:KRYO',
-    '//protocols/grpc/api:onos-protocols-grpc-api',
-    '//protocols/p4runtime/api:onos-protocols-p4runtime-api',
-    '//protocols/p4runtime/proto:onos-protocols-p4runtime-proto',
-    '//incubator/grpc-dependencies:grpc-core-repkg-' + GRPC_VER,
-    '//lib:grpc-stub-' + GRPC_VER,
-    '//lib:grpc-netty-' + GRPC_VER,
-    '//lib:protobuf-java-' + PROTOBUF_VER,
-    '//core/store/serializers:onos-core-serializers',
-]
-
-TEST_DEPS = [
-    '//lib:TEST',
-    '//lib:GRPC_TEST_1.3',
-    '//lib:minimal-json',
-    '//lib:grpc-protobuf-lite-' + GRPC_VER,
-]
-
-osgi_jar_with_tests(
-    deps = COMPILE_DEPS,
-    test_deps = TEST_DEPS,
-)
+# Buck build of P4-related modules is no longer supported, please use Bazel
+# GRPC_VER = '1.3.1'
+# PROTOBUF_VER = '3.2.0'
+#
+# COMPILE_DEPS = [
+#     '//lib:CORE_DEPS',
+#     '//lib:KRYO',
+#     '//protocols/grpc/api:onos-protocols-grpc-api',
+#     '//protocols/p4runtime/api:onos-protocols-p4runtime-api',
+#     '//protocols/p4runtime/proto:onos-protocols-p4runtime-proto',
+#     '//incubator/grpc-dependencies:grpc-core-repkg-' + GRPC_VER,
+#     '//lib:grpc-stub-' + GRPC_VER,
+#     '//lib:grpc-netty-' + GRPC_VER,
+#     '//lib:protobuf-java-' + PROTOBUF_VER,
+#     '//core/store/serializers:onos-core-serializers',
+# ]
+#
+# TEST_DEPS = [
+#     '//lib:TEST',
+#     '//lib:GRPC_TEST_1.3',
+#     '//lib:minimal-json',
+#     '//lib:grpc-protobuf-lite-' + GRPC_VER,
+# ]
+#
+# osgi_jar_with_tests(
+#     deps = COMPILE_DEPS,
+#     test_deps = TEST_DEPS,
+# )
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index 5483edc..443514f 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -26,6 +26,7 @@
 import io.grpc.ManagedChannel;
 import io.grpc.Status;
 import io.grpc.StatusRuntimeException;
+import io.grpc.stub.ClientCallStreamObserver;
 import io.grpc.stub.StreamObserver;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.onlab.osgi.DefaultServiceDirectory;
@@ -59,6 +60,8 @@
 import p4.v1.P4RuntimeOuterClass.ActionProfileMember;
 import p4.v1.P4RuntimeOuterClass.Entity;
 import p4.v1.P4RuntimeOuterClass.ForwardingPipelineConfig;
+import p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigRequest;
+import p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigResponse;
 import p4.v1.P4RuntimeOuterClass.MasterArbitrationUpdate;
 import p4.v1.P4RuntimeOuterClass.MulticastGroupEntry;
 import p4.v1.P4RuntimeOuterClass.PacketReplicationEngineEntry;
@@ -73,6 +76,7 @@
 import p4.v1.P4RuntimeOuterClass.WriteRequest;
 
 import java.math.BigInteger;
+import java.net.ConnectException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
@@ -86,6 +90,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
@@ -132,11 +137,13 @@
     private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
     private final ExecutorService executorService;
     private final Executor contextExecutor;
-    private final StreamObserver<StreamMessageRequest> streamRequestObserver;
+    private StreamChannelManager streamChannelManager;
 
     // Used by this client for write requests.
     private Uint128 clientElectionId = Uint128.newBuilder().setLow(1).build();
 
+    private final AtomicBoolean isClientMaster = new AtomicBoolean(false);
+
     /**
      * Default constructor.
      *
@@ -155,8 +162,7 @@
         this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
         //TODO Investigate use of stub deadlines instead of timeout in supplyInContext
         this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
-        this.streamRequestObserver = P4RuntimeGrpc.newStub(channel)
-                .streamChannel(new StreamChannelResponseObserver());
+        this.streamChannelManager = new StreamChannelManager(channel);
     }
 
     /**
@@ -206,7 +212,7 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> start() {
+    public CompletableFuture<Boolean> startStreamChannel() {
         return supplyInContext(() -> sendMasterArbitrationUpdate(false),
                                "start-initStreamChannel");
     }
@@ -224,11 +230,26 @@
     }
 
     @Override
+    public boolean isMaster() {
+        return streamChannelManager.isOpen() && isClientMaster.get();
+    }
+
+    @Override
+    public boolean isStreamChannelOpen() {
+        return streamChannelManager.isOpen();
+    }
+
+    @Override
     public CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
         return supplyInContext(() -> doSetPipelineConfig(pipeconf, deviceData), "setPipelineConfig");
     }
 
     @Override
+    public boolean isPipelineConfigSet(PiPipeconf pipeconf, ByteBuffer deviceData) {
+        return doIsPipelineConfigSet(pipeconf, deviceData);
+    }
+
+    @Override
     public CompletableFuture<Boolean> writeTableEntries(Collection<PiTableEntry> piTableEntries,
                                                         WriteOperationType opType, PiPipeconf pipeconf) {
         return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf),
@@ -338,24 +359,94 @@
         final Uint128 idMsg = bigIntegerToUint128(
                 controller.newMasterElectionId(deviceId));
 
-        log.info("Sending arbitration update to {}... electionId={}",
-                 deviceId, newId);
-        try {
-            streamRequestObserver.onNext(
-                    StreamMessageRequest.newBuilder()
-                            .setArbitration(
-                                    MasterArbitrationUpdate
-                                            .newBuilder()
-                                            .setDeviceId(p4DeviceId)
-                                            .setElectionId(idMsg)
-                                            .build())
-                            .build());
-            clientElectionId = idMsg;
-            return true;
-        } catch (StatusRuntimeException e) {
-            log.error("Unable to perform arbitration update on {}: {}", deviceId, e.getMessage());
+        log.debug("Sending arbitration update to {}... electionId={}",
+                  deviceId, newId);
+
+        streamChannelManager.send(
+                StreamMessageRequest.newBuilder()
+                        .setArbitration(
+                                MasterArbitrationUpdate
+                                        .newBuilder()
+                                        .setDeviceId(p4DeviceId)
+                                        .setElectionId(idMsg)
+                                        .build())
+                        .build());
+        clientElectionId = idMsg;
+        return true;
+    }
+
+    private ForwardingPipelineConfig getPipelineConfig(
+            PiPipeconf pipeconf, ByteBuffer deviceData) {
+        P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
+        if (p4Info == null) {
+            // Problem logged by PipeconfHelper.
+            return null;
         }
-        return false;
+
+        // FIXME: This is specific to PI P4Runtime implementation.
+        P4Config.P4DeviceConfig p4DeviceConfigMsg = P4Config.P4DeviceConfig
+                .newBuilder()
+                .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
+                .setReassign(true)
+                .setDeviceData(ByteString.copyFrom(deviceData))
+                .build();
+
+        return ForwardingPipelineConfig
+                .newBuilder()
+                .setP4Info(p4Info)
+                .setP4DeviceConfig(p4DeviceConfigMsg.toByteString())
+                .build();
+    }
+
+    private boolean doIsPipelineConfigSet(PiPipeconf pipeconf, ByteBuffer deviceData) {
+
+        GetForwardingPipelineConfigRequest request = GetForwardingPipelineConfigRequest
+                .newBuilder()
+                .setDeviceId(p4DeviceId)
+                .build();
+
+        GetForwardingPipelineConfigResponse resp;
+        try {
+            resp = this.blockingStub
+                    .getForwardingPipelineConfig(request);
+        } catch (StatusRuntimeException ex) {
+            checkGrpcException(ex);
+            // FAILED_PRECONDITION means that a pipeline config was not set in
+            // the first place. Don't bother logging.
+            if (!ex.getStatus().getCode()
+                    .equals(Status.FAILED_PRECONDITION.getCode())) {
+                log.warn("Unable to get pipeline config from {}: {}",
+                         deviceId, ex.getMessage());
+            }
+            return false;
+        }
+
+        ForwardingPipelineConfig expectedConfig = getPipelineConfig(
+                pipeconf, deviceData);
+
+        if (expectedConfig == null) {
+            return false;
+        }
+        if (!resp.hasConfig()) {
+            log.warn("{} returned GetForwardingPipelineConfigResponse " +
+                             "with 'config' field unset",
+                     deviceId);
+            return false;
+        }
+        if (resp.getConfig().getP4DeviceConfig().isEmpty()
+                && !expectedConfig.getP4DeviceConfig().isEmpty()) {
+            // Don't bother with a warn or error since we don't really allow
+            // updating the pipeline to a different one. So the P4Info should be
+            // enough for us.
+            log.debug("{} returned GetForwardingPipelineConfigResponse " +
+                              "with empty 'p4_device_config' field, " +
+                              "equality will be based only on P4Info",
+                      deviceId);
+            return resp.getConfig().getP4Info().equals(
+                    expectedConfig.getP4Info());
+        } else {
+            return resp.getConfig().equals(expectedConfig);
+        }
     }
 
     private boolean doSetPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
@@ -364,25 +455,13 @@
 
         checkNotNull(deviceData, "deviceData cannot be null");
 
-        P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
-        if (p4Info == null) {
-            // Problem logged by PipeconfHelper.
+        ForwardingPipelineConfig pipelineConfig = getPipelineConfig(pipeconf, deviceData);
+
+        if (pipelineConfig == null) {
+            // Error logged in getPipelineConfig()
             return false;
         }
 
-        P4Config.P4DeviceConfig p4DeviceConfigMsg = P4Config.P4DeviceConfig
-                .newBuilder()
-                .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
-                .setReassign(true)
-                .setDeviceData(ByteString.copyFrom(deviceData))
-                .build();
-
-        ForwardingPipelineConfig pipelineConfig = ForwardingPipelineConfig
-                .newBuilder()
-                .setP4Info(p4Info)
-                .setP4DeviceConfig(p4DeviceConfigMsg.toByteString())
-                .build();
-
         SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
                 .newBuilder()
                 .setDeviceId(p4DeviceId)
@@ -392,9 +471,11 @@
                 .build();
 
         try {
+            //noinspection ResultOfMethodCallIgnored
             this.blockingStub.setForwardingPipelineConfig(request);
             return true;
         } catch (StatusRuntimeException ex) {
+            checkGrpcException(ex);
             log.warn("Unable to set pipeline config on {}: {}", deviceId, ex.getMessage());
             return false;
         }
@@ -462,6 +543,7 @@
         try {
             responses = blockingStub.read(requestMsg);
         } catch (StatusRuntimeException e) {
+            checkGrpcException(e);
             log.warn("Unable to dump table {} from {}: {}", piTableId, deviceId, e.getMessage());
             return Collections.emptyList();
         }
@@ -490,7 +572,7 @@
                     .newBuilder().setPacket(packetOut).build();
 
             //Send the request
-            streamRequestObserver.onNext(packetOutRequest);
+            streamChannelManager.send(packetOutRequest);
 
         } catch (P4InfoBrowser.NotFoundException e) {
             log.error("Cant find expected metadata in p4Info file. {}", e.getMessage());
@@ -534,12 +616,14 @@
         if (!msg.hasElectionId() || !msg.hasStatus()) {
             return;
         }
-        final boolean isMaster = msg.getStatus().getCode() == Status.OK.getCode().value();
-        log.info("Received arbitration update from {}: isMaster={}, electionId={}",
-                 deviceId, isMaster, uint128ToBigInteger(msg.getElectionId()));
+        final boolean isMaster =
+                msg.getStatus().getCode() == Status.OK.getCode().value();
+        log.debug("Received arbitration update from {}: isMaster={}, electionId={}",
+                  deviceId, isMaster, uint128ToBigInteger(msg.getElectionId()));
         controller.postEvent(new P4RuntimeEvent(
                 P4RuntimeEvent.Type.ARBITRATION_RESPONSE,
                 new ArbitrationResponse(deviceId, isMaster)));
+        isClientMaster.set(isMaster);
     }
 
     private Collection<PiCounterCellData> doReadAllCounterCells(
@@ -572,6 +656,7 @@
         try {
             responses = () -> blockingStub.read(request);
         } catch (StatusRuntimeException e) {
+            checkGrpcException(e);
             log.warn("Unable to read counter cells from {}: {}", deviceId, e.getMessage());
             return Collections.emptyList();
         }
@@ -654,6 +739,7 @@
         try {
             groupResponses = blockingStub.read(groupRequestMsg);
         } catch (StatusRuntimeException e) {
+            checkGrpcException(e);
             log.warn("Unable to dump action profile {} from {}: {}", piActionProfileId, deviceId, e.getMessage());
             return Collections.emptySet();
         }
@@ -702,6 +788,7 @@
         try {
             memberResponses = blockingStub.read(memberRequestMsg);
         } catch (StatusRuntimeException e) {
+            checkGrpcException(e);
             log.warn("Unable to read members of action profile {} from {}: {}",
                      piActionProfileId, deviceId, e.getMessage());
             return Collections.emptyList();
@@ -794,6 +881,7 @@
         try {
             responses = () -> blockingStub.read(request);
         } catch (StatusRuntimeException e) {
+            checkGrpcException(e);
             log.warn("Unable to read meter cells: {}", e.getMessage());
             log.debug("exception", e);
             return Collections.emptyList();
@@ -866,6 +954,7 @@
         try {
             responses = blockingStub.read(req);
         } catch (StatusRuntimeException e) {
+            checkGrpcException(e);
             log.warn("Unable to read multicast group entries from {}: {}", deviceId, e.getMessage());
             return Collections.emptyList();
         }
@@ -894,6 +983,8 @@
                                                WriteOperationType opType,
                                                String entryType) {
         try {
+
+            //noinspection ResultOfMethodCallIgnored
             blockingStub.write(writeRequest(updates));
             return true;
         } catch (StatusRuntimeException e) {
@@ -911,18 +1002,11 @@
     }
 
     private Void doShutdown() {
-        log.info("Shutting down client for {}...", deviceId);
-        if (streamRequestObserver != null) {
-            try {
-                streamRequestObserver.onCompleted();
-            } catch (IllegalStateException e) {
-                // Thrown if stream channel is already completed. Can ignore.
-                log.debug("Ignored expection: {}", e);
-            }
-            cancellableContext.cancel(new InterruptedException(
-                    "Requested client shutdown"));
-        }
-        this.executorService.shutdown();
+        log.debug("Shutting down client for {}...", deviceId);
+        streamChannelManager.complete();
+        cancellableContext.cancel(new InterruptedException(
+                "Requested client shutdown"));
+        this.executorService.shutdownNow();
         try {
             executorService.awaitTermination(5, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
@@ -1080,13 +1164,111 @@
     }
 
     /**
+     * A manager for the P4Runtime stream channel that opportunistically creates
+     * new stream RPC stubs (e.g. when one fails because of errors) and posts
+     * channel events via the P4Runtime controller.
+     */
+    private final class StreamChannelManager {
+
+        private final ManagedChannel channel;
+        private final AtomicBoolean open;
+        private final StreamObserver<StreamMessageResponse> responseObserver;
+        private ClientCallStreamObserver<StreamMessageRequest> requestObserver;
+
+        private StreamChannelManager(ManagedChannel channel) {
+            this.channel = channel;
+            this.responseObserver = new InternalStreamResponseObserver(this);
+            this.open = new AtomicBoolean(false);
+        }
+
+        private void initIfRequired() {
+            if (requestObserver == null) {
+                log.debug("Creating new stream channel for {}...", deviceId);
+                requestObserver =
+                        (ClientCallStreamObserver<StreamMessageRequest>)
+                                P4RuntimeGrpc.newStub(channel)
+                                        .streamChannel(responseObserver);
+                open.set(false);
+            }
+        }
+
+        public boolean send(StreamMessageRequest value) {
+            synchronized (this) {
+                initIfRequired();
+                try {
+                    requestObserver.onNext(value);
+                    // FIXME
+                    // signalOpen();
+                    return true;
+                } catch (Throwable ex) {
+                    if (ex instanceof StatusRuntimeException) {
+                        log.warn("Unable to send {} to {}: {}",
+                                 value.getUpdateCase().toString(), deviceId, ex.getMessage());
+                    } else {
+                        log.warn(format(
+                                "Exception while sending %s to %s",
+                                value.getUpdateCase().toString(), deviceId), ex);
+                    }
+                    complete();
+                    return false;
+                }
+            }
+        }
+
+        public void complete() {
+            synchronized (this) {
+                signalClosed();
+                if (requestObserver != null) {
+                    requestObserver.onCompleted();
+                    requestObserver.cancel("Terminated", null);
+                    requestObserver = null;
+                }
+            }
+        }
+
+        void signalOpen() {
+            synchronized (this) {
+                final boolean wasOpen = open.getAndSet(true);
+                if (!wasOpen) {
+                    controller.postEvent(new P4RuntimeEvent(
+                            P4RuntimeEvent.Type.CHANNEL_EVENT,
+                            new ChannelEvent(deviceId, ChannelEvent.Type.OPEN)));
+                }
+            }
+        }
+
+        void signalClosed() {
+            synchronized (this) {
+                final boolean wasOpen = open.getAndSet(false);
+                if (wasOpen) {
+                    controller.postEvent(new P4RuntimeEvent(
+                            P4RuntimeEvent.Type.CHANNEL_EVENT,
+                            new ChannelEvent(deviceId, ChannelEvent.Type.CLOSED)));
+                }
+            }
+        }
+
+        public boolean isOpen() {
+            return open.get();
+        }
+    }
+
+    /**
      * Handles messages received from the device on the stream channel.
      */
-    private class StreamChannelResponseObserver
+    private final class InternalStreamResponseObserver
             implements StreamObserver<StreamMessageResponse> {
 
+        private final StreamChannelManager streamChannelManager;
+
+        private InternalStreamResponseObserver(
+                StreamChannelManager streamChannelManager) {
+            this.streamChannelManager = streamChannelManager;
+        }
+
         @Override
         public void onNext(StreamMessageResponse message) {
+            streamChannelManager.signalOpen();
             executorService.submit(() -> doNext(message));
         }
 
@@ -1113,19 +1295,26 @@
 
         @Override
         public void onError(Throwable throwable) {
-            log.warn("Error on stream channel for {}: {}",
-                     deviceId, Status.fromThrowable(throwable));
-            controller.postEvent(new P4RuntimeEvent(
-                    P4RuntimeEvent.Type.CHANNEL_EVENT,
-                    new ChannelEvent(deviceId, ChannelEvent.Type.ERROR)));
+            if (throwable instanceof StatusRuntimeException) {
+                StatusRuntimeException sre = (StatusRuntimeException) throwable;
+                if (sre.getStatus().getCause() instanceof ConnectException) {
+                    log.warn("Device {} is unreachable ({})",
+                             deviceId, sre.getCause().getMessage());
+                } else {
+                    log.warn("Received error on stream channel for {}: {}",
+                             deviceId, throwable.getMessage());
+                }
+            } else {
+                log.warn(format("Received exception on stream channel for %s",
+                                deviceId), throwable);
+            }
+            streamChannelManager.complete();
         }
 
         @Override
         public void onCompleted() {
             log.warn("Stream channel for {} has completed", deviceId);
-            controller.postEvent(new P4RuntimeEvent(
-                    P4RuntimeEvent.Type.CHANNEL_EVENT,
-                    new ChannelEvent(deviceId, ChannelEvent.Type.CLOSED)));
+            streamChannelManager.complete();
         }
     }
 }
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
index 1e7326d..4ca08f1 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
@@ -33,6 +33,7 @@
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.device.DeviceAgentEvent;
 import org.onosproject.net.device.DeviceAgentListener;
+import org.onosproject.net.provider.ProviderId;
 import org.onosproject.p4runtime.api.P4RuntimeClient;
 import org.onosproject.p4runtime.api.P4RuntimeController;
 import org.onosproject.p4runtime.api.P4RuntimeEvent;
@@ -42,10 +43,8 @@
 
 import java.io.IOException;
 import java.math.BigInteger;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.locks.Lock;
 import java.util.function.Supplier;
 
@@ -68,7 +67,8 @@
     private final Map<ClientKey, P4RuntimeClient> clients = Maps.newHashMap();
     private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
 
-    private final ConcurrentMap<DeviceId, List<DeviceAgentListener>> deviceAgentListeners = Maps.newConcurrentMap();
+    private final ConcurrentMap<DeviceId, ConcurrentMap<ProviderId, DeviceAgentListener>>
+            deviceAgentListeners = Maps.newConcurrentMap();
     private final Striped<Lock> stripedLocks = Striped.lock(30);
 
     private DistributedElectionIdGenerator electionIdGenerator;
@@ -120,13 +120,16 @@
         if (clientKeys.containsKey(deviceId)) {
             final ClientKey existingKey = clientKeys.get(deviceId);
             if (clientKey.equals(existingKey)) {
-                log.info("Not creating client for {} as it already exists (server={}:{}, p4DeviceId={})...",
+                log.debug("Not creating client for {} as it already exists (server={}:{}, p4DeviceId={})...",
                          deviceId, serverAddr, serverPort, p4DeviceId);
                 return true;
             } else {
-                throw new IllegalStateException(
-                        "A client for the same device ID but different " +
-                                "server endpoints already exists");
+                log.info("Requested client for {} with new " +
+                                 "endpoint, removing old client (server={}:{}, " +
+                                 "p4DeviceId={})...",
+                         deviceId, existingKey.serverAddr(),
+                         existingKey.serverPort(), existingKey.p4DeviceId());
+                doRemoveClient(deviceId);
             }
         }
 
@@ -218,19 +221,20 @@
     }
 
     @Override
-    public void addDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener) {
+    public void addDeviceAgentListener(DeviceId deviceId, ProviderId providerId, DeviceAgentListener listener) {
         checkNotNull(deviceId, "deviceId cannot be null");
+        checkNotNull(providerId, "providerId cannot be null");
         checkNotNull(listener, "listener cannot be null");
-        deviceAgentListeners.putIfAbsent(deviceId, new CopyOnWriteArrayList<>());
-        deviceAgentListeners.get(deviceId).add(listener);
+        deviceAgentListeners.putIfAbsent(deviceId, Maps.newConcurrentMap());
+        deviceAgentListeners.get(deviceId).put(providerId, listener);
     }
 
     @Override
-    public void removeDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener) {
+    public void removeDeviceAgentListener(DeviceId deviceId, ProviderId providerId) {
         checkNotNull(deviceId, "deviceId cannot be null");
-        checkNotNull(listener, "listener cannot be null");
+        checkNotNull(providerId, "providerId cannot be null");
         deviceAgentListeners.computeIfPresent(deviceId, (did, listeners) -> {
-            listeners.remove(listener);
+            listeners.remove(providerId);
             return listeners;
         });
     }
@@ -298,7 +302,7 @@
 
     private void postDeviceAgentEvent(DeviceId deviceId, DeviceAgentEvent event) {
         if (deviceAgentListeners.containsKey(deviceId)) {
-            deviceAgentListeners.get(deviceId).forEach(l -> l.event(event));
+            deviceAgentListeners.get(deviceId).values().forEach(l -> l.event(event));
         }
     }
 }