Refactor P4Runtime subsystem to implement async connection procedure

This patch is an attempt to solve issues observed when restarting both
switches and ONOS nodes. Most of the issues seemed to depend on a
brittle mastership handling when deploying the pipeline.

With this patch, GDP registers devices to the core with available=false
(i.e. offline) and marks them online only when the P4 pipeline has been
deployed to the device. A new PiPipeconfWatchdogService takes care of
deploying pipelines and producing event when devices are ready.

Moreover, we fix a race condition where pipeconf-related behaviors
were not found. This was caused by GDP enforcing the merged
driver name in the network config, while external entities (e.g.
Mininet) were pushing a JSON blob with the base driver name. This patch
removes the need to rely on such a trick and instead uses
pipeconf-aware logic directly in the driver manager (change #19622).

Finally, we fix issues in P4RuntimeClientImpl that were causing the
stream channel not detecting unreachable devices. The solution is to
follow gRPC APIs and re-instantiate a new channel once the first fails.

Change-Id: I6fbc91859c0fb58a6db3bc197b7081a8fe9f97f7
diff --git a/core/api/src/main/java/org/onosproject/net/behaviour/PiPipelineProgrammable.java b/core/api/src/main/java/org/onosproject/net/behaviour/PiPipelineProgrammable.java
index 2373af4..22367ba 100644
--- a/core/api/src/main/java/org/onosproject/net/behaviour/PiPipelineProgrammable.java
+++ b/core/api/src/main/java/org/onosproject/net/behaviour/PiPipelineProgrammable.java
@@ -24,21 +24,40 @@
 import java.util.concurrent.CompletableFuture;
 
 /**
- * Behavior to program the pipeline of a device that supports protocol-independence.
+ * Behavior to program the pipeline of a device that supports
+ * protocol-independence.
  */
 @Beta
 public interface PiPipelineProgrammable extends HandlerBehaviour {
     /**
-     * Deploys the given pipeconf to the device.
+     * Writes the given pipeconf to the device, returns a completable future
+     * with true is the operations was successful, false otherwise.
+     * <p>
+     * After the future has been completed, the device is expected to process
+     * data plane packets according to the written pipeconf.
      *
      * @param pipeconf pipeconf
-     * @return true if the operation was successful, false otherwise
+     * @return completable future set to true if the operation was successful,
+     * false otherwise
      */
     // TODO: return an explanation of why things went wrong, and the status of the device.
-    CompletableFuture<Boolean> deployPipeconf(PiPipeconf pipeconf);
+    CompletableFuture<Boolean> setPipeconf(PiPipeconf pipeconf);
 
     /**
-     * Returns the default pipeconf for ths device, to be used when any other pipeconf is not available.
+     * Returns true if the device is configured with the given pipeconf, false
+     * otherwise.
+     * <p>
+     * This method is expected to always return true after successfully calling
+     * {@link #setPipeconf(PiPipeconf)} with the given pipeconf.
+     *
+     * @param pipeconf pipeconf
+     * @return true if the device has the given pipeconf set, false otherwise
+     */
+    boolean isPipeconfSet(PiPipeconf pipeconf);
+
+    /**
+     * Returns the default pipeconf for this device, to be used when any other
+     * pipeconf is not available.
      *
      * @return optional pipeconf
      */
diff --git a/core/api/src/main/java/org/onosproject/net/device/DeviceHandshaker.java b/core/api/src/main/java/org/onosproject/net/device/DeviceHandshaker.java
index bc870d3..bceaeff 100644
--- a/core/api/src/main/java/org/onosproject/net/device/DeviceHandshaker.java
+++ b/core/api/src/main/java/org/onosproject/net/device/DeviceHandshaker.java
@@ -19,6 +19,7 @@
 import com.google.common.annotations.Beta;
 import org.onosproject.net.MastershipRole;
 import org.onosproject.net.driver.DeviceConnect;
+import org.onosproject.net.provider.ProviderId;
 
 import java.util.concurrent.CompletableFuture;
 
@@ -48,21 +49,32 @@
     void roleChanged(MastershipRole newRole);
 
     /**
-     * Adds a device agent listener.
+     * Returns the last known mastership role agreed by the device for this
+     * node.
      *
-     * @param listener device agent listener
+     * @return mastership role
      */
-    default void addDeviceAgentListener(DeviceAgentListener listener) {
+    MastershipRole getRole();
+
+    /**
+     * Adds a device agent listener for the given provider ID.
+     *
+     * @param providerId provider ID
+     * @param listener   device agent listener
+     */
+    default void addDeviceAgentListener(
+            ProviderId providerId, DeviceAgentListener listener) {
         throw new UnsupportedOperationException(
                 "Device agent listener registration not supported");
     }
 
     /**
-     * Removes a device agent listener.
+     * Removes a device agent listener previously registered for the given
+     * provider ID.
      *
-     * @param listener device agent listener
+     * @param providerId provider ID
      */
-    default void removeDeviceAgentListener(DeviceAgentListener listener) {
+    default void removeDeviceAgentListener(ProviderId providerId) {
         throw new UnsupportedOperationException(
                 "Device agent listener removal not supported");
     }
diff --git a/core/api/src/main/java/org/onosproject/net/driver/DeviceConnect.java b/core/api/src/main/java/org/onosproject/net/driver/DeviceConnect.java
index 2f53620..f30386d 100644
--- a/core/api/src/main/java/org/onosproject/net/driver/DeviceConnect.java
+++ b/core/api/src/main/java/org/onosproject/net/driver/DeviceConnect.java
@@ -20,27 +20,38 @@
 import java.util.concurrent.CompletableFuture;
 
 /**
- * Abstraction of handler behaviour used to set-up and tear-down
- * connection with a device.
+ * Abstraction of handler behaviour used to set-up and tear-down connections
+ * with a device.
  */
 @Beta
 public interface DeviceConnect extends HandlerBehaviour {
 
     /**
-     * Connects to the device.
-     * It's supposed to initiate the transport sessions, channel and also,
-     * if applicable, store them in the proper protocol specific
-     * controller (e.g. GrpcController).
+     * Connects to the device, for example by opening the transport session that
+     * will be later used to send control messages. Returns true if the
+     * connection was initiated successfully, false otherwise.
+     * <p>
+     * Calling multiple times this method while a connection to the device is
+     * open should result in a no-op.
      *
      * @return CompletableFuture with true if the operation was successful
      */
     CompletableFuture<Boolean> connect();
 
     /**
-     * Disconnects from the device.
-     * It's supposed to destroy the transport sessions and channel and also,
-     * if applicable, remove them in the proper protocol specific
-     * controller (e.g. GrpcController).
+     * Returns true if a connection to the device is open, false otherwise.
+     *
+     * @return true if the connection is open, false otherwise
+     */
+    boolean isConnected();
+
+    /**
+     * Disconnects from the device, for example closing the transport session
+     * previously opened. Returns true if the disconnection procedure was
+     * successful, false otherwise.
+     * <p>
+     * Calling multiple times this method while a connection to the device is
+     * closed should result in a no-op.
      *
      * @return CompletableFuture with true if the operation was successful
      */
diff --git a/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfWatchdogEvent.java b/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfWatchdogEvent.java
new file mode 100644
index 0000000..0d34dd6
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfWatchdogEvent.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.pi.service;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.event.AbstractEvent;
+import org.onosproject.net.DeviceId;
+
+/**
+ * Event representing changes in the status of a device pipeline.
+ */
+@Beta
+public class PiPipeconfWatchdogEvent
+        extends AbstractEvent<PiPipeconfWatchdogEvent.Type, DeviceId> {
+
+    /**
+     * Type of event.
+     */
+    public enum Type {
+        PIPELINE_READY,
+        PIPELINE_UNKNOWN
+    }
+
+    /**
+     * Creates a new event for the given device.
+     *
+     * @param type type
+     * @param subject device ID
+     */
+    public PiPipeconfWatchdogEvent(PiPipeconfWatchdogEvent.Type type, DeviceId subject) {
+        super(type, subject);
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfWatchdogListener.java b/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfWatchdogListener.java
new file mode 100644
index 0000000..ad6a899
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfWatchdogListener.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.pi.service;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.event.EventListener;
+
+/**
+ * Listener of pipeline status change events produced by {@link
+ * PiPipeconfWatchdogService}.
+ */
+@Beta
+public interface PiPipeconfWatchdogListener
+        extends EventListener<PiPipeconfWatchdogEvent> {
+}
diff --git a/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfWatchdogService.java b/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfWatchdogService.java
new file mode 100644
index 0000000..9462569
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfWatchdogService.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.pi.service;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.event.ListenerService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.pi.model.PiPipeconfId;
+
+/**
+ * Service that periodically probes pipeline programmable devices, to check that
+ * their pipeline is configured with the expected pipeconf. It emits events
+ * about pipeline status changes.
+ */
+@Beta
+public interface PiPipeconfWatchdogService
+        extends ListenerService<PiPipeconfWatchdogEvent, PiPipeconfWatchdogListener> {
+
+    /**
+     * Status of a device pipeline.
+     */
+    enum PipelineStatus {
+        /**
+         * The device pipeline is ready to process packets.
+         */
+        READY,
+        /**
+         * The status is unknown and the device might not be able to process
+         * packets yet.
+         */
+        UNKNOWN,
+    }
+
+    /**
+     * Asynchronously triggers a probe task that checks the device pipeline
+     * status and, if required, configures it with the pipeconf associated to
+     * this device (via {@link PiPipeconfService#bindToDevice(PiPipeconfId,
+     * DeviceId)}).
+     *
+     * @param deviceId device to probe
+     */
+    void triggerProbe(DeviceId deviceId);
+
+    /**
+     * Returns the last known pipeline status of the given device.
+     *
+     * @param deviceId device ID
+     * @return pipeline status
+     */
+    PipelineStatus getStatus(DeviceId deviceId);
+}
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
index 0e2ea82..a90149f 100644
--- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
@@ -181,6 +181,11 @@
             return null;
         }
         String baseDriverName = basicDeviceConfig.driver();
+        if (baseDriverName == null) {
+            log.warn("Missing driver from basic device config for {}, " +
+                             "cannot produce merged driver", deviceId);
+            return null;
+        }
         if (isMergedDriverName(baseDriverName)) {
             // The config already has driver name that is a merged one. We still
             // need to make sure an instance of that merged driver is present in
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java
new file mode 100644
index 0000000..8376295
--- /dev/null
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java
@@ -0,0 +1,401 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.pi.impl;
+
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Striped;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.event.AbstractListenerManager;
+import org.onosproject.mastership.MastershipInfo;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
+import org.onosproject.net.behaviour.PiPipelineProgrammable;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceHandshaker;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.pi.model.PiPipeconf;
+import org.onosproject.net.pi.model.PiPipeconfId;
+import org.onosproject.net.pi.service.PiPipeconfMappingStore;
+import org.onosproject.net.pi.service.PiPipeconfService;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Lock;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Implementation of PiPipeconfWatchdogService that implements a periodic
+ * pipeline probe task and listens for device events to update the status of the
+ * pipeline.
+ */
+@Component(immediate = true)
+@Service
+public class PiPipeconfWatchdogManager
+        extends AbstractListenerManager<PiPipeconfWatchdogEvent, PiPipeconfWatchdogListener>
+        implements PiPipeconfWatchdogService {
+
+    private final Logger log = getLogger(getClass());
+
+    private static final long SECONDS = 1000L;
+    // Long enough to allow for network delay (e.g. to transfer large pipeline
+    // binaries over slow network).
+    private static final long PIPECONF_SET_TIMEOUT = 60; // Seconds.
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private PiPipeconfMappingStore pipeconfMappingStore;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private MastershipService mastershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected PiPipeconfService pipeconfService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private ComponentConfigService componentConfigService;
+
+    private static final String PROBE_INTERVAL = "probeInterval";
+    private static final int DEFAULT_PROBE_INTERVAL = 30;
+    @Property(name = PROBE_INTERVAL, intValue = DEFAULT_PROBE_INTERVAL,
+            label = "Configure interval in seconds for device pipeconf probing")
+    private int probeInterval = DEFAULT_PROBE_INTERVAL;
+
+    protected ExecutorService executor = Executors.newFixedThreadPool(
+            30, groupedThreads("onos/pipeconf-watchdog", "%d", log));
+
+    private final InternalDeviceListener deviceListener = new InternalDeviceListener();
+
+    private Timer timer;
+    private TimerTask task;
+
+    private final Striped<Lock> locks = Striped.lock(30);
+
+    private EventuallyConsistentMap<DeviceId, PipelineStatus> statusMap;
+    private Map<DeviceId, PipelineStatus> localStatusMap;
+
+    @Activate
+    public void activate() {
+        eventDispatcher.addSink(PiPipeconfWatchdogEvent.class, listenerRegistry);
+        localStatusMap = Maps.newConcurrentMap();
+        // Init distributed status map.
+        KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(PipelineStatus.class);
+        statusMap = storageService.<DeviceId, PipelineStatus>eventuallyConsistentMapBuilder()
+                .withName("onos-pipeconf-status-table")
+                .withSerializer(serializer)
+                .withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
+        statusMap.addListener(new StatusMapListener());
+        // Register component configurable properties.
+        componentConfigService.registerProperties(getClass());
+        // Start periodic watchdog task.
+        timer = new Timer();
+        startProbeTask();
+        // Add device listener.
+        deviceService.addListener(deviceListener);
+        log.info("Started");
+    }
+
+    @Modified
+    public void modified(ComponentContext context) {
+        if (context == null) {
+            return;
+        }
+
+        Dictionary<?, ?> properties = context.getProperties();
+        final int oldProbeInterval = probeInterval;
+        probeInterval = Tools.getIntegerProperty(
+                properties, PROBE_INTERVAL, DEFAULT_PROBE_INTERVAL);
+        log.info("Configured. {} is configured to {} seconds",
+                 PROBE_INTERVAL, probeInterval);
+
+        if (oldProbeInterval != probeInterval) {
+            rescheduleProbeTask();
+        }
+    }
+
+    @Deactivate
+    public void deactivate() {
+        eventDispatcher.removeSink(PiPipeconfWatchdogEvent.class);
+        deviceService.removeListener(deviceListener);
+        stopProbeTask();
+        timer = null;
+        statusMap = null;
+        localStatusMap = null;
+        log.info("Stopped");
+    }
+
+    @Override
+    public void triggerProbe(DeviceId deviceId) {
+        final Device device = deviceService.getDevice(deviceId);
+        if (device != null) {
+            filterAndTriggerTasks(Collections.singleton(device));
+        }
+    }
+
+    @Override
+    public PipelineStatus getStatus(DeviceId deviceId) {
+        final PipelineStatus status = statusMap.get(deviceId);
+        return status == null ? PipelineStatus.UNKNOWN : status;
+    }
+
+    private void triggerCheckAllDevices() {
+        filterAndTriggerTasks(deviceService.getDevices());
+    }
+
+    private void filterAndTriggerTasks(Iterable<Device> devices) {
+        devices.forEach(device -> {
+            if (!isLocalMaster(device)) {
+                return;
+            }
+
+            final PiPipeconfId pipeconfId = pipeconfMappingStore.getPipeconfId(device.id());
+            if (pipeconfId == null
+                    || !device.is(PiPipelineProgrammable.class)) {
+                return;
+            }
+
+            if (!pipeconfService.getPipeconf(pipeconfId).isPresent()) {
+                log.error("Pipeconf {} is not registered", pipeconfId);
+                return;
+            }
+
+            final PiPipeconf pipeconf = pipeconfService.getPipeconf(pipeconfId).get();
+
+            if (!device.is(DeviceHandshaker.class)) {
+                log.error("Missing DeviceHandshaker behavior for {}", device.id());
+                return;
+            }
+
+            // Trigger task with per-device lock.
+            executor.execute(withLock(() -> {
+                final boolean success = doSetPipeconfIfRequired(device, pipeconf);
+                if (success) {
+                    signalStatusReady(device.id());
+                } else {
+                    signalStatusUnknown(device.id());
+                }
+            }, device.id()));
+        });
+    }
+
+    /**
+     * Returns true if the given device is known to be configured with the given
+     * pipeline, false otherwise. If necessary, this method enforces setting the
+     * given pipeconf using drivers.
+     *
+     * @param device   device
+     * @param pipeconf pipeconf
+     * @return boolean
+     */
+    private boolean doSetPipeconfIfRequired(Device device, PiPipeconf pipeconf) {
+        log.debug("Starting watchdog task for {} ({})", device.id(), pipeconf.id());
+        final PiPipelineProgrammable pipelineProg = device.as(PiPipelineProgrammable.class);
+        final DeviceHandshaker handshaker = device.as(DeviceHandshaker.class);
+        if (!handshaker.isConnected()) {
+            return false;
+        }
+        if (pipelineProg.isPipeconfSet(pipeconf)) {
+            log.debug("Pipeconf {} already configured on {}",
+                      pipeconf.id(), device.id());
+            return true;
+        }
+        try {
+            return pipelineProg.setPipeconf(pipeconf)
+                    .get(PIPECONF_SET_TIMEOUT, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            log.error("Thread interrupted while setting pipeconf on {}",
+                      device.id());
+            Thread.currentThread().interrupt();
+        } catch (ExecutionException e) {
+            log.error("Exception while setting pipeconf on {}",
+                      device.id(), e.getCause());
+        } catch (TimeoutException e) {
+            log.error("Operation TIMEOUT while setting pipeconf on {}",
+                      device.id());
+        }
+        return false;
+    }
+
+    private Runnable withLock(Runnable task, Object object) {
+        return () -> {
+            final Lock lock = locks.get(object);
+            lock.lock();
+            try {
+                task.run();
+            } finally {
+                lock.unlock();
+            }
+        };
+    }
+
+    private void signalStatusUnknown(DeviceId deviceId) {
+        statusMap.remove(deviceId);
+    }
+
+    private void signalStatusReady(DeviceId deviceId) {
+        statusMap.put(deviceId, PipelineStatus.READY);
+    }
+
+    private boolean isLocalMaster(Device device) {
+        if (mastershipService.isLocalMaster(device.id())) {
+            return true;
+        }
+        // The device might have no master (e.g. after it has been disconnected
+        // from core), hence we use device mastership state.
+        final MastershipInfo info = mastershipService.getMastershipFor(device.id());
+        return !info.master().isPresent() &&
+                device.is(DeviceHandshaker.class) &&
+                device.as(DeviceHandshaker.class).getRole()
+                        .equals(MastershipRole.MASTER);
+    }
+
+    private void startProbeTask() {
+        synchronized (timer) {
+            log.info("Starting pipeline probe thread with {} seconds interval...", probeInterval);
+            task = new InternalTimerTask();
+            timer.scheduleAtFixedRate(task, probeInterval * SECONDS,
+                                      probeInterval * SECONDS);
+        }
+    }
+
+
+    private void stopProbeTask() {
+        synchronized (timer) {
+            log.info("Stopping pipeline probe thread...");
+            task.cancel();
+            task = null;
+        }
+    }
+
+
+    private synchronized void rescheduleProbeTask() {
+        synchronized (timer) {
+            stopProbeTask();
+            startProbeTask();
+        }
+    }
+
+    private class InternalTimerTask extends TimerTask {
+        @Override
+        public void run() {
+            triggerCheckAllDevices();
+        }
+    }
+
+    /**
+     * Listener of device events used to update the pipeline status.
+     */
+    private class InternalDeviceListener implements DeviceListener {
+
+        @Override
+        public void event(DeviceEvent event) {
+            final Device device = event.subject();
+            switch (event.type()) {
+                case DEVICE_ADDED:
+                case DEVICE_UPDATED:
+                case DEVICE_AVAILABILITY_CHANGED:
+                    if (!deviceService.isAvailable(device.id())) {
+                        signalStatusUnknown(device.id());
+                    }
+                    break;
+                case DEVICE_REMOVED:
+                case DEVICE_SUSPENDED:
+                    signalStatusUnknown(device.id());
+                    break;
+                case PORT_ADDED:
+                case PORT_UPDATED:
+                case PORT_REMOVED:
+                case PORT_STATS_UPDATED:
+                default:
+                    break;
+            }
+        }
+    }
+
+    private class StatusMapListener
+            implements EventuallyConsistentMapListener<DeviceId, PipelineStatus> {
+
+        @Override
+        public void event(EventuallyConsistentMapEvent<DeviceId, PipelineStatus> event) {
+            final DeviceId deviceId = event.key();
+            final PipelineStatus status = event.value();
+            switch (event.type()) {
+                case PUT:
+                    postStatusEvent(deviceId, status);
+                    break;
+                case REMOVE:
+                    postStatusEvent(deviceId, PipelineStatus.UNKNOWN);
+                    break;
+                default:
+                    log.error("Unknown map event type {}", event.type());
+            }
+        }
+
+        private void postStatusEvent(DeviceId deviceId, PipelineStatus newStatus) {
+            PipelineStatus oldStatus = localStatusMap.put(deviceId, newStatus);
+            oldStatus = oldStatus == null ? PipelineStatus.UNKNOWN : oldStatus;
+            final PiPipeconfWatchdogEvent.Type eventType =
+                    newStatus == PipelineStatus.READY
+                            ? PiPipeconfWatchdogEvent.Type.PIPELINE_READY
+                            : PiPipeconfWatchdogEvent.Type.PIPELINE_UNKNOWN;
+            if (newStatus != oldStatus) {
+                log.info("Pipeline status of {} is {}", deviceId, newStatus);
+                post(new PiPipeconfWatchdogEvent(eventType, deviceId));
+            }
+        }
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStore.java b/core/store/dist/src/main/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStore.java
index 64c23d8..092b8eb 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStore.java
@@ -16,14 +16,16 @@
 
 package org.onosproject.store.pi.impl;
 
-import com.google.common.collect.Sets;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.SetMultimap;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.KryoNamespace;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.pi.model.PiPipeconfId;
 import org.onosproject.net.pi.service.PiPipeconfDeviceMappingEvent;
@@ -31,24 +33,19 @@
 import org.onosproject.net.pi.service.PiPipeconfMappingStoreDelegate;
 import org.onosproject.store.AbstractStore;
 import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.EventuallyConsistentMap;
-import org.onosproject.store.service.EventuallyConsistentMapEvent;
-import org.onosproject.store.service.EventuallyConsistentMapListener;
-import org.onosproject.store.service.MultiValuedTimestamp;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
-import org.onosproject.store.service.WallClockTimestamp;
 import org.slf4j.Logger;
 
-import java.util.Collections;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
- * Manages information of pipeconf to device binding using gossip protocol to distribute
- * information.
+ * Manages information of pipeconf to device binding.
  */
 @Component(immediate = true)
 @Service
@@ -61,22 +58,20 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected StorageService storageService;
 
-    protected EventuallyConsistentMap<DeviceId, PiPipeconfId> deviceToPipeconf;
+    protected ConsistentMap<DeviceId, PiPipeconfId> deviceToPipeconf;
 
-    protected final EventuallyConsistentMapListener<DeviceId, PiPipeconfId> pipeconfListener =
+    protected final MapEventListener<DeviceId, PiPipeconfId> pipeconfListener =
             new InternalPiPipeconfListener();
 
-    protected ConcurrentMap<PiPipeconfId, Set<DeviceId>> pipeconfToDevices = new ConcurrentHashMap<>();
+    protected SetMultimap<PiPipeconfId, DeviceId> pipeconfToDevices =
+            Multimaps.synchronizedSetMultimap(HashMultimap.create());
 
     @Activate
     public void activate() {
-        KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
-                .register(KryoNamespaces.API)
-                .register(MultiValuedTimestamp.class);
-        deviceToPipeconf = storageService.<DeviceId, PiPipeconfId>eventuallyConsistentMapBuilder()
+        deviceToPipeconf = storageService.<DeviceId, PiPipeconfId>consistentMapBuilder()
                 .withName("onos-pipeconf-table")
-                .withSerializer(serializer)
-                .withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
+                .withSerializer(Serializer.using(KryoNamespaces.API))
+                .build();
         deviceToPipeconf.addListener(pipeconfListener);
         log.info("Started");
     }
@@ -91,12 +86,15 @@
 
     @Override
     public PiPipeconfId getPipeconfId(DeviceId deviceId) {
-        return deviceToPipeconf.get(deviceId);
+        if (!deviceToPipeconf.containsKey(deviceId)) {
+            return null;
+        }
+        return deviceToPipeconf.get(deviceId).value();
     }
 
     @Override
     public Set<DeviceId> getDevices(PiPipeconfId pipeconfId) {
-        return pipeconfToDevices.getOrDefault(pipeconfId, Collections.emptySet());
+        return ImmutableSet.copyOf(pipeconfToDevices.get(pipeconfId));
     }
 
     @Override
@@ -109,35 +107,38 @@
         deviceToPipeconf.remove(deviceId);
     }
 
-    private class InternalPiPipeconfListener implements EventuallyConsistentMapListener<DeviceId, PiPipeconfId> {
+    private class InternalPiPipeconfListener implements MapEventListener<DeviceId, PiPipeconfId> {
 
         @Override
-        public void event(EventuallyConsistentMapEvent<DeviceId, PiPipeconfId> mapEvent) {
-            final PiPipeconfDeviceMappingEvent.Type type;
+        public void event(MapEvent<DeviceId, PiPipeconfId> mapEvent) {
+            PiPipeconfDeviceMappingEvent.Type eventType = null;
             final DeviceId deviceId = mapEvent.key();
-            final PiPipeconfId pipeconfId = mapEvent.value();
+            final PiPipeconfId newPipeconfId = mapEvent.newValue() != null
+                    ? mapEvent.newValue().value() : null;
+            final PiPipeconfId oldPipeconfId = mapEvent.oldValue() != null
+                    ? mapEvent.oldValue().value() : null;
             switch (mapEvent.type()) {
-                case PUT:
-                    type = PiPipeconfDeviceMappingEvent.Type.CREATED;
-                    pipeconfToDevices.compute(pipeconfId, (pipeconf, devices) -> {
-                        if (devices == null) {
-                            devices = Sets.newConcurrentHashSet();
+                case INSERT:
+                case UPDATE:
+                    if (newPipeconfId != null) {
+                        if (!newPipeconfId.equals(oldPipeconfId)) {
+                            eventType = PiPipeconfDeviceMappingEvent.Type.CREATED;
                         }
-                        devices.add(deviceId);
-                        return devices;
-                    });
+                        pipeconfToDevices.put(newPipeconfId, deviceId);
+                    }
                     break;
                 case REMOVE:
-                    type = PiPipeconfDeviceMappingEvent.Type.REMOVED;
-                    pipeconfToDevices.computeIfPresent(pipeconfId, (pipeconf, devices) -> {
-                        devices.remove(deviceId);
-                        return devices;
-                    });
+                    if (oldPipeconfId != null) {
+                        eventType = PiPipeconfDeviceMappingEvent.Type.REMOVED;
+                        pipeconfToDevices.remove(oldPipeconfId, deviceId);
+                    }
                     break;
                 default:
                     throw new IllegalArgumentException("Wrong event type " + mapEvent.type());
             }
-            notifyDelegate(new PiPipeconfDeviceMappingEvent(type, deviceId));
+            if (eventType != null) {
+                notifyDelegate(new PiPipeconfDeviceMappingEvent(eventType, deviceId));
+            }
         }
     }
 }
diff --git a/core/store/dist/src/test/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStoreTest.java
index 9a31b0c..875bb2a 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStoreTest.java
@@ -22,7 +22,11 @@
 import org.onosproject.net.pi.model.PiPipeconfId;
 import org.onosproject.store.service.TestStorageService;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test class for the Distributed Device to Pipeconf store.
@@ -72,10 +76,14 @@
     @Test
     public void createOrUpdatePipeconfToDeviceBinding() {
         store.createOrUpdateBinding(DEVICE_ID, PIPECONF_ID);
-        assertTrue("Value should be in the map", store.deviceToPipeconf.containsKey(DEVICE_ID));
-        assertTrue("Value should be in the map", store.deviceToPipeconf.containsValue(PIPECONF_ID));
-        assertTrue("Value should be in the map", store.pipeconfToDevices.containsKey(PIPECONF_ID));
-        assertTrue("Value should be in the map", store.pipeconfToDevices.containsValue(ImmutableSet.of(DEVICE_ID)));
+        assertTrue("Value should be in the map",
+                   store.deviceToPipeconf.containsKey(DEVICE_ID));
+        assertEquals("Value should be in the map",
+                     PIPECONF_ID, store.deviceToPipeconf.get(DEVICE_ID).value());
+        assertTrue("Value should be in the map",
+                   store.pipeconfToDevices.containsKey(PIPECONF_ID));
+        assertTrue("Value should be in the map",
+                   store.pipeconfToDevices.get(PIPECONF_ID).contains(DEVICE_ID));
     }
 
     /**
diff --git a/drivers/bmv2/src/main/resources/bmv2-drivers.xml b/drivers/bmv2/src/main/resources/bmv2-drivers.xml
index 0575ee2..502d654 100644
--- a/drivers/bmv2/src/main/resources/bmv2-drivers.xml
+++ b/drivers/bmv2/src/main/resources/bmv2-drivers.xml
@@ -15,7 +15,7 @@
   ~ limitations under the License.
   -->
 <drivers>
-    <driver name="bmv2" manufacturer="p4.org" hwVersion="master" swVersion="master" extends="p4runtime, gnmi">
+    <driver name="bmv2" manufacturer="p4.org" hwVersion="master" swVersion="master" extends="p4runtime">
         <behaviour api="org.onosproject.net.behaviour.PiPipelineProgrammable"
                    impl="org.onosproject.drivers.bmv2.Bmv2PipelineProgrammable"/>
         <property name="tableDeleteBeforeUpdate">true</property>
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java
index 3e217c8..c7d76c0 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimePipelineProgrammable.java
@@ -48,7 +48,7 @@
     public abstract ByteBuffer createDeviceDataBuffer(PiPipeconf pipeconf);
 
     @Override
-    public CompletableFuture<Boolean> deployPipeconf(PiPipeconf pipeconf) {
+    public CompletableFuture<Boolean> setPipeconf(PiPipeconf pipeconf) {
         return CompletableFuture.supplyAsync(
                 () -> doDeployConfig(pipeconf),
                 SharedExecutors.getPoolThreadExecutor());
@@ -71,21 +71,6 @@
             return false;
         }
 
-        // We need to be master to perform write RPC to the device.
-        // FIXME: properly perform mastership handling in the device provider
-        // This would probably mean deploying the pipeline after the device as
-        // been notified to the core.
-        final Boolean masterSuccess = getFutureWithDeadline(
-                client.becomeMaster(),
-                "becoming master ", null);
-        if (masterSuccess == null) {
-            // Error already logged by getFutureWithDeadline()
-            return false;
-        } else if (!masterSuccess) {
-            log.warn("Unable to become master for {}, aborting pipeconf deploy", deviceId);
-            return false;
-        }
-
         final Boolean deploySuccess = getFutureWithDeadline(
                 client.setPipelineConfig(pipeconf, deviceDataBuffer),
                 "deploying pipeconf", null);
@@ -100,5 +85,25 @@
     }
 
     @Override
+    public boolean isPipeconfSet(PiPipeconf pipeconf) {
+        DeviceId deviceId = handler().data().deviceId();
+        P4RuntimeController controller = handler().get(P4RuntimeController.class);
+
+        P4RuntimeClient client = controller.getClient(deviceId);
+        if (client == null) {
+            log.warn("Unable to find client for {}, cannot check if pipeconf is set", deviceId);
+            return false;
+        }
+
+        ByteBuffer deviceDataBuffer = createDeviceDataBuffer(pipeconf);
+        if (deviceDataBuffer == null) {
+            // Hopefully the child class logged the problem.
+            return false;
+        }
+
+        return client.isPipelineConfigSet(pipeconf, deviceDataBuffer);
+    }
+
+    @Override
     public abstract Optional<PiPipeconf> getDefaultPipeconf();
 }
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
index c818135..15836b1 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeHandshaker.java
@@ -20,6 +20,7 @@
 import org.onosproject.net.MastershipRole;
 import org.onosproject.net.device.DeviceAgentListener;
 import org.onosproject.net.device.DeviceHandshaker;
+import org.onosproject.net.provider.ProviderId;
 import org.onosproject.p4runtime.api.P4RuntimeClient;
 import org.onosproject.p4runtime.api.P4RuntimeController;
 
@@ -38,11 +39,19 @@
                     if (client == null) {
                         return CompletableFuture.completedFuture(false);
                     }
-                    return client.start();
+                    return client.startStreamChannel();
                 });
     }
 
     @Override
+    public boolean isConnected() {
+        final P4RuntimeController controller = handler().get(P4RuntimeController.class);
+        final DeviceId deviceId = handler().data().deviceId();
+        final P4RuntimeClient client = controller.getClient(deviceId);
+        return client != null && client.isStreamChannelOpen();
+    }
+
+    @Override
     public CompletableFuture<Boolean> disconnect() {
         final P4RuntimeController controller = handler().get(P4RuntimeController.class);
         final DeviceId deviceId = handler().data().deviceId();
@@ -59,10 +68,18 @@
 
     @Override
     public CompletableFuture<Boolean> isReachable() {
-        return CompletableFuture.supplyAsync(() -> handler()
-                .get(P4RuntimeController.class)
-                .isReachable(handler().data().deviceId())
-        );
+        return CompletableFuture
+                // P4RuntimeController requires a client to be created to
+                // check for reachability.
+                .supplyAsync(super::createClient)
+                .thenApplyAsync(client -> {
+                    if (client == null) {
+                        return false;
+                    }
+                    return handler()
+                            .get(P4RuntimeController.class)
+                            .isReachable(handler().data().deviceId());
+                });
     }
 
     @Override
@@ -78,16 +95,27 @@
     }
 
     @Override
-    public void addDeviceAgentListener(DeviceAgentListener listener) {
-        // Don't use controller/deviceId class variables as they might be uninitialized.
-        handler().get(P4RuntimeController.class)
-                .addDeviceAgentListener(data().deviceId(), listener);
+    public MastershipRole getRole() {
+        final P4RuntimeController controller = handler().get(P4RuntimeController.class);
+        final DeviceId deviceId = handler().data().deviceId();
+        final P4RuntimeClient client = controller.getClient(deviceId);
+        if (client == null || !client.isStreamChannelOpen()) {
+            return MastershipRole.NONE;
+        }
+        return client.isMaster() ? MastershipRole.MASTER : MastershipRole.STANDBY;
     }
 
     @Override
-    public void removeDeviceAgentListener(DeviceAgentListener listener) {
+    public void addDeviceAgentListener(ProviderId providerId, DeviceAgentListener listener) {
+        // Don't use controller/deviceId class variables as they might be uninitialized.
+        handler().get(P4RuntimeController.class)
+                .addDeviceAgentListener(data().deviceId(), providerId, listener);
+    }
+
+    @Override
+    public void removeDeviceAgentListener(ProviderId providerId) {
         // Don't use controller/deviceId class variable as they might be uninitialized.
         handler().get(P4RuntimeController.class)
-                .removeDeviceAgentListener(data().deviceId(), listener);
+                .removeDeviceAgentListener(data().deviceId(), providerId);
     }
 }
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
index 32b1f9f..886a39d 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
@@ -54,13 +54,19 @@
     }
 
     /**
-     * Starts the client by starting the Stream RPC with the device. The
-     * implementation of this method is equivalent to {@link #becomeMaster()}.
+     * Starts the Stream RPC with the device.
      *
      * @return completable future containing true if the operation was
      * successful, false otherwise.
      */
-    CompletableFuture<Boolean> start();
+    CompletableFuture<Boolean> startStreamChannel();
+
+    /**
+     * Returns true if the stream RPC is active, false otherwise.
+     *
+     * @return boolean
+     */
+    boolean isStreamChannelOpen();
 
     /**
      * Shutdowns the client by terminating any active RPC such as the Stream
@@ -81,6 +87,13 @@
     CompletableFuture<Boolean> becomeMaster();
 
     /**
+     * Returns true if this client is master for the device, false otherwise.
+     *
+     * @return boolean
+     */
+    boolean isMaster();
+
+    /**
      * Sets the device pipeline according to the given pipeconf, and for the
      * given byte buffer representing the target-specific data to be used in the
      * P4Runtime's SetPipelineConfig message. This method should be called
@@ -95,6 +108,21 @@
             PiPipeconf pipeconf, ByteBuffer deviceData);
 
     /**
+     * Returns true if the device has the given pipeconf set, false otherwise.
+     * Equality is based on the P4Info extension of the pipeconf as well as the
+     * given device data byte buffer.
+     * <p>
+     * This method is expected to return {@code true} if invoked after calling
+     * {@link #setPipelineConfig(PiPipeconf, ByteBuffer)} with the same
+     * parameters.
+     *
+     * @param pipeconf   pipeconf
+     * @param deviceData target-specific data
+     * @return boolean
+     */
+    boolean isPipelineConfigSet(PiPipeconf pipeconf, ByteBuffer deviceData);
+
+    /**
      * Performs the given write operation for the given table entries and
      * pipeconf.
      *
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
index 836ed7a..159e313 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
@@ -20,6 +20,7 @@
 import org.onosproject.event.ListenerService;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.device.DeviceAgentListener;
+import org.onosproject.net.provider.ProviderId;
 
 /**
  * Controller of P4Runtime devices.
@@ -94,18 +95,21 @@
     boolean isReachable(DeviceId deviceId);
 
     /**
-     * Adds a listener for device agent events.
+     * Adds a listener for device agent events for the given provider.
      *
      * @param deviceId device identifier
+     * @param providerId provider ID
      * @param listener the device agent listener
      */
-    void addDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener);
+    void addDeviceAgentListener(DeviceId deviceId, ProviderId providerId,
+                                DeviceAgentListener listener);
 
     /**
-     * Removes the listener for device agent events.
+     * Removes the listener for device agent events that was previously
+     * registered for the given provider.
      *
-     * @param deviceId device identifier
-     * @param listener the device agent listener
+     * @param deviceId   device identifier
+     * @param providerId the provider ID
      */
-    void removeDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener);
+    void removeDeviceAgentListener(DeviceId deviceId, ProviderId providerId);
 }
diff --git a/protocols/p4runtime/ctl/BUCK b/protocols/p4runtime/ctl/BUCK
index 46540b3..343d3b2 100644
--- a/protocols/p4runtime/ctl/BUCK
+++ b/protocols/p4runtime/ctl/BUCK
@@ -1,27 +1,28 @@
-GRPC_VER = '1.3.1'
-PROTOBUF_VER = '3.2.0'
-
-COMPILE_DEPS = [
-    '//lib:CORE_DEPS',
-    '//lib:KRYO',
-    '//protocols/grpc/api:onos-protocols-grpc-api',
-    '//protocols/p4runtime/api:onos-protocols-p4runtime-api',
-    '//protocols/p4runtime/proto:onos-protocols-p4runtime-proto',
-    '//incubator/grpc-dependencies:grpc-core-repkg-' + GRPC_VER,
-    '//lib:grpc-stub-' + GRPC_VER,
-    '//lib:grpc-netty-' + GRPC_VER,
-    '//lib:protobuf-java-' + PROTOBUF_VER,
-    '//core/store/serializers:onos-core-serializers',
-]
-
-TEST_DEPS = [
-    '//lib:TEST',
-    '//lib:GRPC_TEST_1.3',
-    '//lib:minimal-json',
-    '//lib:grpc-protobuf-lite-' + GRPC_VER,
-]
-
-osgi_jar_with_tests(
-    deps = COMPILE_DEPS,
-    test_deps = TEST_DEPS,
-)
+# Buck build of P4-related modules is no longer supported, please use Bazel
+# GRPC_VER = '1.3.1'
+# PROTOBUF_VER = '3.2.0'
+#
+# COMPILE_DEPS = [
+#     '//lib:CORE_DEPS',
+#     '//lib:KRYO',
+#     '//protocols/grpc/api:onos-protocols-grpc-api',
+#     '//protocols/p4runtime/api:onos-protocols-p4runtime-api',
+#     '//protocols/p4runtime/proto:onos-protocols-p4runtime-proto',
+#     '//incubator/grpc-dependencies:grpc-core-repkg-' + GRPC_VER,
+#     '//lib:grpc-stub-' + GRPC_VER,
+#     '//lib:grpc-netty-' + GRPC_VER,
+#     '//lib:protobuf-java-' + PROTOBUF_VER,
+#     '//core/store/serializers:onos-core-serializers',
+# ]
+#
+# TEST_DEPS = [
+#     '//lib:TEST',
+#     '//lib:GRPC_TEST_1.3',
+#     '//lib:minimal-json',
+#     '//lib:grpc-protobuf-lite-' + GRPC_VER,
+# ]
+#
+# osgi_jar_with_tests(
+#     deps = COMPILE_DEPS,
+#     test_deps = TEST_DEPS,
+# )
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
index 5483edc..443514f 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeClientImpl.java
@@ -26,6 +26,7 @@
 import io.grpc.ManagedChannel;
 import io.grpc.Status;
 import io.grpc.StatusRuntimeException;
+import io.grpc.stub.ClientCallStreamObserver;
 import io.grpc.stub.StreamObserver;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.onlab.osgi.DefaultServiceDirectory;
@@ -59,6 +60,8 @@
 import p4.v1.P4RuntimeOuterClass.ActionProfileMember;
 import p4.v1.P4RuntimeOuterClass.Entity;
 import p4.v1.P4RuntimeOuterClass.ForwardingPipelineConfig;
+import p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigRequest;
+import p4.v1.P4RuntimeOuterClass.GetForwardingPipelineConfigResponse;
 import p4.v1.P4RuntimeOuterClass.MasterArbitrationUpdate;
 import p4.v1.P4RuntimeOuterClass.MulticastGroupEntry;
 import p4.v1.P4RuntimeOuterClass.PacketReplicationEngineEntry;
@@ -73,6 +76,7 @@
 import p4.v1.P4RuntimeOuterClass.WriteRequest;
 
 import java.math.BigInteger;
+import java.net.ConnectException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
@@ -86,6 +90,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
@@ -132,11 +137,13 @@
     private final P4RuntimeGrpc.P4RuntimeBlockingStub blockingStub;
     private final ExecutorService executorService;
     private final Executor contextExecutor;
-    private final StreamObserver<StreamMessageRequest> streamRequestObserver;
+    private StreamChannelManager streamChannelManager;
 
     // Used by this client for write requests.
     private Uint128 clientElectionId = Uint128.newBuilder().setLow(1).build();
 
+    private final AtomicBoolean isClientMaster = new AtomicBoolean(false);
+
     /**
      * Default constructor.
      *
@@ -155,8 +162,7 @@
         this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
         //TODO Investigate use of stub deadlines instead of timeout in supplyInContext
         this.blockingStub = P4RuntimeGrpc.newBlockingStub(channel);
-        this.streamRequestObserver = P4RuntimeGrpc.newStub(channel)
-                .streamChannel(new StreamChannelResponseObserver());
+        this.streamChannelManager = new StreamChannelManager(channel);
     }
 
     /**
@@ -206,7 +212,7 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> start() {
+    public CompletableFuture<Boolean> startStreamChannel() {
         return supplyInContext(() -> sendMasterArbitrationUpdate(false),
                                "start-initStreamChannel");
     }
@@ -224,11 +230,26 @@
     }
 
     @Override
+    public boolean isMaster() {
+        return streamChannelManager.isOpen() && isClientMaster.get();
+    }
+
+    @Override
+    public boolean isStreamChannelOpen() {
+        return streamChannelManager.isOpen();
+    }
+
+    @Override
     public CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
         return supplyInContext(() -> doSetPipelineConfig(pipeconf, deviceData), "setPipelineConfig");
     }
 
     @Override
+    public boolean isPipelineConfigSet(PiPipeconf pipeconf, ByteBuffer deviceData) {
+        return doIsPipelineConfigSet(pipeconf, deviceData);
+    }
+
+    @Override
     public CompletableFuture<Boolean> writeTableEntries(Collection<PiTableEntry> piTableEntries,
                                                         WriteOperationType opType, PiPipeconf pipeconf) {
         return supplyInContext(() -> doWriteTableEntries(piTableEntries, opType, pipeconf),
@@ -338,24 +359,94 @@
         final Uint128 idMsg = bigIntegerToUint128(
                 controller.newMasterElectionId(deviceId));
 
-        log.info("Sending arbitration update to {}... electionId={}",
-                 deviceId, newId);
-        try {
-            streamRequestObserver.onNext(
-                    StreamMessageRequest.newBuilder()
-                            .setArbitration(
-                                    MasterArbitrationUpdate
-                                            .newBuilder()
-                                            .setDeviceId(p4DeviceId)
-                                            .setElectionId(idMsg)
-                                            .build())
-                            .build());
-            clientElectionId = idMsg;
-            return true;
-        } catch (StatusRuntimeException e) {
-            log.error("Unable to perform arbitration update on {}: {}", deviceId, e.getMessage());
+        log.debug("Sending arbitration update to {}... electionId={}",
+                  deviceId, newId);
+
+        streamChannelManager.send(
+                StreamMessageRequest.newBuilder()
+                        .setArbitration(
+                                MasterArbitrationUpdate
+                                        .newBuilder()
+                                        .setDeviceId(p4DeviceId)
+                                        .setElectionId(idMsg)
+                                        .build())
+                        .build());
+        clientElectionId = idMsg;
+        return true;
+    }
+
+    private ForwardingPipelineConfig getPipelineConfig(
+            PiPipeconf pipeconf, ByteBuffer deviceData) {
+        P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
+        if (p4Info == null) {
+            // Problem logged by PipeconfHelper.
+            return null;
         }
-        return false;
+
+        // FIXME: This is specific to PI P4Runtime implementation.
+        P4Config.P4DeviceConfig p4DeviceConfigMsg = P4Config.P4DeviceConfig
+                .newBuilder()
+                .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
+                .setReassign(true)
+                .setDeviceData(ByteString.copyFrom(deviceData))
+                .build();
+
+        return ForwardingPipelineConfig
+                .newBuilder()
+                .setP4Info(p4Info)
+                .setP4DeviceConfig(p4DeviceConfigMsg.toByteString())
+                .build();
+    }
+
+    private boolean doIsPipelineConfigSet(PiPipeconf pipeconf, ByteBuffer deviceData) {
+
+        GetForwardingPipelineConfigRequest request = GetForwardingPipelineConfigRequest
+                .newBuilder()
+                .setDeviceId(p4DeviceId)
+                .build();
+
+        GetForwardingPipelineConfigResponse resp;
+        try {
+            resp = this.blockingStub
+                    .getForwardingPipelineConfig(request);
+        } catch (StatusRuntimeException ex) {
+            checkGrpcException(ex);
+            // FAILED_PRECONDITION means that a pipeline config was not set in
+            // the first place. Don't bother logging.
+            if (!ex.getStatus().getCode()
+                    .equals(Status.FAILED_PRECONDITION.getCode())) {
+                log.warn("Unable to get pipeline config from {}: {}",
+                         deviceId, ex.getMessage());
+            }
+            return false;
+        }
+
+        ForwardingPipelineConfig expectedConfig = getPipelineConfig(
+                pipeconf, deviceData);
+
+        if (expectedConfig == null) {
+            return false;
+        }
+        if (!resp.hasConfig()) {
+            log.warn("{} returned GetForwardingPipelineConfigResponse " +
+                             "with 'config' field unset",
+                     deviceId);
+            return false;
+        }
+        if (resp.getConfig().getP4DeviceConfig().isEmpty()
+                && !expectedConfig.getP4DeviceConfig().isEmpty()) {
+            // Don't bother with a warn or error since we don't really allow
+            // updating the pipeline to a different one. So the P4Info should be
+            // enough for us.
+            log.debug("{} returned GetForwardingPipelineConfigResponse " +
+                              "with empty 'p4_device_config' field, " +
+                              "equality will be based only on P4Info",
+                      deviceId);
+            return resp.getConfig().getP4Info().equals(
+                    expectedConfig.getP4Info());
+        } else {
+            return resp.getConfig().equals(expectedConfig);
+        }
     }
 
     private boolean doSetPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData) {
@@ -364,25 +455,13 @@
 
         checkNotNull(deviceData, "deviceData cannot be null");
 
-        P4Info p4Info = PipeconfHelper.getP4Info(pipeconf);
-        if (p4Info == null) {
-            // Problem logged by PipeconfHelper.
+        ForwardingPipelineConfig pipelineConfig = getPipelineConfig(pipeconf, deviceData);
+
+        if (pipelineConfig == null) {
+            // Error logged in getPipelineConfig()
             return false;
         }
 
-        P4Config.P4DeviceConfig p4DeviceConfigMsg = P4Config.P4DeviceConfig
-                .newBuilder()
-                .setExtras(P4Config.P4DeviceConfig.Extras.getDefaultInstance())
-                .setReassign(true)
-                .setDeviceData(ByteString.copyFrom(deviceData))
-                .build();
-
-        ForwardingPipelineConfig pipelineConfig = ForwardingPipelineConfig
-                .newBuilder()
-                .setP4Info(p4Info)
-                .setP4DeviceConfig(p4DeviceConfigMsg.toByteString())
-                .build();
-
         SetForwardingPipelineConfigRequest request = SetForwardingPipelineConfigRequest
                 .newBuilder()
                 .setDeviceId(p4DeviceId)
@@ -392,9 +471,11 @@
                 .build();
 
         try {
+            //noinspection ResultOfMethodCallIgnored
             this.blockingStub.setForwardingPipelineConfig(request);
             return true;
         } catch (StatusRuntimeException ex) {
+            checkGrpcException(ex);
             log.warn("Unable to set pipeline config on {}: {}", deviceId, ex.getMessage());
             return false;
         }
@@ -462,6 +543,7 @@
         try {
             responses = blockingStub.read(requestMsg);
         } catch (StatusRuntimeException e) {
+            checkGrpcException(e);
             log.warn("Unable to dump table {} from {}: {}", piTableId, deviceId, e.getMessage());
             return Collections.emptyList();
         }
@@ -490,7 +572,7 @@
                     .newBuilder().setPacket(packetOut).build();
 
             //Send the request
-            streamRequestObserver.onNext(packetOutRequest);
+            streamChannelManager.send(packetOutRequest);
 
         } catch (P4InfoBrowser.NotFoundException e) {
             log.error("Cant find expected metadata in p4Info file. {}", e.getMessage());
@@ -534,12 +616,14 @@
         if (!msg.hasElectionId() || !msg.hasStatus()) {
             return;
         }
-        final boolean isMaster = msg.getStatus().getCode() == Status.OK.getCode().value();
-        log.info("Received arbitration update from {}: isMaster={}, electionId={}",
-                 deviceId, isMaster, uint128ToBigInteger(msg.getElectionId()));
+        final boolean isMaster =
+                msg.getStatus().getCode() == Status.OK.getCode().value();
+        log.debug("Received arbitration update from {}: isMaster={}, electionId={}",
+                  deviceId, isMaster, uint128ToBigInteger(msg.getElectionId()));
         controller.postEvent(new P4RuntimeEvent(
                 P4RuntimeEvent.Type.ARBITRATION_RESPONSE,
                 new ArbitrationResponse(deviceId, isMaster)));
+        isClientMaster.set(isMaster);
     }
 
     private Collection<PiCounterCellData> doReadAllCounterCells(
@@ -572,6 +656,7 @@
         try {
             responses = () -> blockingStub.read(request);
         } catch (StatusRuntimeException e) {
+            checkGrpcException(e);
             log.warn("Unable to read counter cells from {}: {}", deviceId, e.getMessage());
             return Collections.emptyList();
         }
@@ -654,6 +739,7 @@
         try {
             groupResponses = blockingStub.read(groupRequestMsg);
         } catch (StatusRuntimeException e) {
+            checkGrpcException(e);
             log.warn("Unable to dump action profile {} from {}: {}", piActionProfileId, deviceId, e.getMessage());
             return Collections.emptySet();
         }
@@ -702,6 +788,7 @@
         try {
             memberResponses = blockingStub.read(memberRequestMsg);
         } catch (StatusRuntimeException e) {
+            checkGrpcException(e);
             log.warn("Unable to read members of action profile {} from {}: {}",
                      piActionProfileId, deviceId, e.getMessage());
             return Collections.emptyList();
@@ -794,6 +881,7 @@
         try {
             responses = () -> blockingStub.read(request);
         } catch (StatusRuntimeException e) {
+            checkGrpcException(e);
             log.warn("Unable to read meter cells: {}", e.getMessage());
             log.debug("exception", e);
             return Collections.emptyList();
@@ -866,6 +954,7 @@
         try {
             responses = blockingStub.read(req);
         } catch (StatusRuntimeException e) {
+            checkGrpcException(e);
             log.warn("Unable to read multicast group entries from {}: {}", deviceId, e.getMessage());
             return Collections.emptyList();
         }
@@ -894,6 +983,8 @@
                                                WriteOperationType opType,
                                                String entryType) {
         try {
+
+            //noinspection ResultOfMethodCallIgnored
             blockingStub.write(writeRequest(updates));
             return true;
         } catch (StatusRuntimeException e) {
@@ -911,18 +1002,11 @@
     }
 
     private Void doShutdown() {
-        log.info("Shutting down client for {}...", deviceId);
-        if (streamRequestObserver != null) {
-            try {
-                streamRequestObserver.onCompleted();
-            } catch (IllegalStateException e) {
-                // Thrown if stream channel is already completed. Can ignore.
-                log.debug("Ignored expection: {}", e);
-            }
-            cancellableContext.cancel(new InterruptedException(
-                    "Requested client shutdown"));
-        }
-        this.executorService.shutdown();
+        log.debug("Shutting down client for {}...", deviceId);
+        streamChannelManager.complete();
+        cancellableContext.cancel(new InterruptedException(
+                "Requested client shutdown"));
+        this.executorService.shutdownNow();
         try {
             executorService.awaitTermination(5, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
@@ -1080,13 +1164,111 @@
     }
 
     /**
+     * A manager for the P4Runtime stream channel that opportunistically creates
+     * new stream RCP stubs (e.g. when one fails because of errors) and posts
+     * channel events via the P4Runtime controller.
+     */
+    private final class StreamChannelManager {
+
+        private final ManagedChannel channel;
+        private final AtomicBoolean open;
+        private final StreamObserver<StreamMessageResponse> responseObserver;
+        private ClientCallStreamObserver<StreamMessageRequest> requestObserver;
+
+        private StreamChannelManager(ManagedChannel channel) {
+            this.channel = channel;
+            this.responseObserver = new InternalStreamResponseObserver(this);
+            this.open = new AtomicBoolean(false);
+        }
+
+        private void initIfRequired() {
+            if (requestObserver == null) {
+                log.debug("Creating new stream channel for {}...", deviceId);
+                requestObserver =
+                        (ClientCallStreamObserver<StreamMessageRequest>)
+                                P4RuntimeGrpc.newStub(channel)
+                                        .streamChannel(responseObserver);
+                open.set(false);
+            }
+        }
+
+        public boolean send(StreamMessageRequest value) {
+            synchronized (this) {
+                initIfRequired();
+                try {
+                    requestObserver.onNext(value);
+                    // FIXME
+                    // signalOpen();
+                    return true;
+                } catch (Throwable ex) {
+                    if (ex instanceof StatusRuntimeException) {
+                        log.warn("Unable to send {} to {}: {}",
+                                 value.getUpdateCase().toString(), deviceId, ex.getMessage());
+                    } else {
+                        log.warn(format(
+                                "Exception while sending %s to %s",
+                                value.getUpdateCase().toString(), deviceId), ex);
+                    }
+                    complete();
+                    return false;
+                }
+            }
+        }
+
+        public void complete() {
+            synchronized (this) {
+                signalClosed();
+                if (requestObserver != null) {
+                    requestObserver.onCompleted();
+                    requestObserver.cancel("Terminated", null);
+                    requestObserver = null;
+                }
+            }
+        }
+
+        void signalOpen() {
+            synchronized (this) {
+                final boolean wasOpen = open.getAndSet(true);
+                if (!wasOpen) {
+                    controller.postEvent(new P4RuntimeEvent(
+                            P4RuntimeEvent.Type.CHANNEL_EVENT,
+                            new ChannelEvent(deviceId, ChannelEvent.Type.OPEN)));
+                }
+            }
+        }
+
+        void signalClosed() {
+            synchronized (this) {
+                final boolean wasOpen = open.getAndSet(false);
+                if (wasOpen) {
+                    controller.postEvent(new P4RuntimeEvent(
+                            P4RuntimeEvent.Type.CHANNEL_EVENT,
+                            new ChannelEvent(deviceId, ChannelEvent.Type.CLOSED)));
+                }
+            }
+        }
+
+        public boolean isOpen() {
+            return open.get();
+        }
+    }
+
+    /**
      * Handles messages received from the device on the stream channel.
      */
-    private class StreamChannelResponseObserver
+    private final class InternalStreamResponseObserver
             implements StreamObserver<StreamMessageResponse> {
 
+        private final StreamChannelManager streamChannelManager;
+
+        private InternalStreamResponseObserver(
+                StreamChannelManager streamChannelManager) {
+            this.streamChannelManager = streamChannelManager;
+        }
+
         @Override
         public void onNext(StreamMessageResponse message) {
+            streamChannelManager.signalOpen();
             executorService.submit(() -> doNext(message));
         }
 
@@ -1113,19 +1295,26 @@
 
         @Override
         public void onError(Throwable throwable) {
-            log.warn("Error on stream channel for {}: {}",
-                     deviceId, Status.fromThrowable(throwable));
-            controller.postEvent(new P4RuntimeEvent(
-                    P4RuntimeEvent.Type.CHANNEL_EVENT,
-                    new ChannelEvent(deviceId, ChannelEvent.Type.ERROR)));
+            if (throwable instanceof StatusRuntimeException) {
+                StatusRuntimeException sre = (StatusRuntimeException) throwable;
+                if (sre.getStatus().getCause() instanceof ConnectException) {
+                    log.warn("Device {} is unreachable ({})",
+                             deviceId, sre.getCause().getMessage());
+                } else {
+                    log.warn("Received error on stream channel for {}: {}",
+                             deviceId, throwable.getMessage());
+                }
+            } else {
+                log.warn(format("Received exception on stream channel for %s",
+                                deviceId), throwable);
+            }
+            streamChannelManager.complete();
         }
 
         @Override
         public void onCompleted() {
             log.warn("Stream channel for {} has completed", deviceId);
-            controller.postEvent(new P4RuntimeEvent(
-                    P4RuntimeEvent.Type.CHANNEL_EVENT,
-                    new ChannelEvent(deviceId, ChannelEvent.Type.CLOSED)));
+            streamChannelManager.complete();
         }
     }
 }
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
index 1e7326d..4ca08f1 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
@@ -33,6 +33,7 @@
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.device.DeviceAgentEvent;
 import org.onosproject.net.device.DeviceAgentListener;
+import org.onosproject.net.provider.ProviderId;
 import org.onosproject.p4runtime.api.P4RuntimeClient;
 import org.onosproject.p4runtime.api.P4RuntimeController;
 import org.onosproject.p4runtime.api.P4RuntimeEvent;
@@ -42,10 +43,8 @@
 
 import java.io.IOException;
 import java.math.BigInteger;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.locks.Lock;
 import java.util.function.Supplier;
 
@@ -68,7 +67,8 @@
     private final Map<ClientKey, P4RuntimeClient> clients = Maps.newHashMap();
     private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
 
-    private final ConcurrentMap<DeviceId, List<DeviceAgentListener>> deviceAgentListeners = Maps.newConcurrentMap();
+    private final ConcurrentMap<DeviceId, ConcurrentMap<ProviderId, DeviceAgentListener>>
+            deviceAgentListeners = Maps.newConcurrentMap();
     private final Striped<Lock> stripedLocks = Striped.lock(30);
 
     private DistributedElectionIdGenerator electionIdGenerator;
@@ -120,13 +120,16 @@
         if (clientKeys.containsKey(deviceId)) {
             final ClientKey existingKey = clientKeys.get(deviceId);
             if (clientKey.equals(existingKey)) {
-                log.info("Not creating client for {} as it already exists (server={}:{}, p4DeviceId={})...",
+                log.debug("Not creating client for {} as it already exists (server={}:{}, p4DeviceId={})...",
                          deviceId, serverAddr, serverPort, p4DeviceId);
                 return true;
             } else {
-                throw new IllegalStateException(
-                        "A client for the same device ID but different " +
-                                "server endpoints already exists");
+                log.info("Requested client for {} with new " +
+                                 "endpoint, removing old client (server={}:{}, " +
+                                 "p4DeviceId={})...",
+                         deviceId, existingKey.serverAddr(),
+                         existingKey.serverPort(), existingKey.p4DeviceId());
+                doRemoveClient(deviceId);
             }
         }
 
@@ -218,19 +221,20 @@
     }
 
     @Override
-    public void addDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener) {
+    public void addDeviceAgentListener(DeviceId deviceId, ProviderId providerId, DeviceAgentListener listener) {
         checkNotNull(deviceId, "deviceId cannot be null");
+        checkNotNull(deviceId, "providerId cannot be null");
         checkNotNull(listener, "listener cannot be null");
-        deviceAgentListeners.putIfAbsent(deviceId, new CopyOnWriteArrayList<>());
-        deviceAgentListeners.get(deviceId).add(listener);
+        deviceAgentListeners.putIfAbsent(deviceId, Maps.newConcurrentMap());
+        deviceAgentListeners.get(deviceId).put(providerId, listener);
     }
 
     @Override
-    public void removeDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener) {
+    public void removeDeviceAgentListener(DeviceId deviceId, ProviderId providerId) {
         checkNotNull(deviceId, "deviceId cannot be null");
-        checkNotNull(listener, "listener cannot be null");
+        checkNotNull(providerId, "listener cannot be null");
         deviceAgentListeners.computeIfPresent(deviceId, (did, listeners) -> {
-            listeners.remove(listener);
+            listeners.remove(providerId);
             return listeners;
         });
     }
@@ -298,7 +302,7 @@
 
     private void postDeviceAgentEvent(DeviceId deviceId, DeviceAgentEvent event) {
         if (deviceAgentListeners.containsKey(deviceId)) {
-            deviceAgentListeners.get(deviceId).forEach(l -> l.event(event));
+            deviceAgentListeners.get(deviceId).values().forEach(l -> l.event(event));
         }
     }
 }
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index 21c6ae7..42faa0c 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -16,9 +16,6 @@
 
 package org.onosproject.provider.general.device.impl;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.annotations.Beta;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
@@ -34,8 +31,6 @@
 import org.onlab.util.ItemNotFoundException;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.LeadershipService;
 import org.onosproject.core.CoreService;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.AnnotationKeys;
@@ -77,6 +72,9 @@
 import org.onosproject.net.pi.model.PiPipeconfId;
 import org.onosproject.net.pi.service.PiPipeconfConfig;
 import org.onosproject.net.pi.service.PiPipeconfService;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
 import org.onosproject.net.provider.AbstractProvider;
 import org.onosproject.net.provider.ProviderId;
 import org.onosproject.provider.general.device.api.GeneralProviderDeviceConfig;
@@ -84,13 +82,13 @@
 import org.slf4j.Logger;
 
 import java.security.SecureRandom;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Dictionary;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.StringJoiner;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -121,10 +119,19 @@
 public class GeneralDeviceProvider extends AbstractProvider
         implements DeviceProvider {
 
-    private static final String DRIVER = "driver";
-
     private final Logger log = getLogger(getClass());
 
+    private static final String APP_NAME = "org.onosproject.gdp";
+    private static final String URI_SCHEME = "device";
+    private static final String CFG_SCHEME = "generalprovider";
+    private static final String DEVICE_PROVIDER_PACKAGE =
+            "org.onosproject.general.provider.device";
+    private static final int CORE_POOL_SIZE = 10;
+    private static final String UNKNOWN = "unknown";
+    private static final String DRIVER = "driver";
+    private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS =
+            ImmutableSet.of("p4runtime");
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private DeviceProviderRegistry providerRegistry;
 
@@ -147,13 +154,10 @@
     private MastershipService mastershipService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private PiPipeconfService piPipeconfService;
+    private PiPipeconfService pipeconfService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private ClusterService clusterService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private LeadershipService leadershipService;
+    private PiPipeconfWatchdogService pipeconfWatchdogService;
 
     private static final String STATS_POLL_FREQUENCY = "deviceStatsPollFrequency";
     private static final int DEFAULT_STATS_POLL_FREQUENCY = 10;
@@ -177,79 +181,48 @@
                     "(e.g. checking device reachability); default is 10 seconds")
     private int opTimeoutShort = DEFAULT_OP_TIMEOUT_SHORT;
 
-    private static final String OP_TIMEOUT_LONG = "deviceOperationTimeoutLong";
-    private static final int DEFAULT_OP_TIMEOUT_LONG = 60;
-    @Property(name = OP_TIMEOUT_LONG, intValue = DEFAULT_OP_TIMEOUT_LONG,
-            label = "Configure timeout in seconds for device operations " +
-                    "that are supposed to take a relatively long time " +
-                    "(e.g. pushing a large pipeline configuration with slow " +
-                    "network); default is 60 seconds")
-    private int opTimeoutLong = DEFAULT_OP_TIMEOUT_LONG;
-
-    private static final String APP_NAME = "org.onosproject.generaldeviceprovider";
-    private static final String URI_SCHEME = "device";
-    private static final String CFG_SCHEME = "generalprovider";
-    private static final String DEVICE_PROVIDER_PACKAGE = "org.onosproject.general.provider.device";
-    private static final int CORE_POOL_SIZE = 10;
-    private static final String UNKNOWN = "unknown";
-
-    //FIXME this will be removed when the configuration is synced at the source.
-    private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS = ImmutableSet.of("p4runtime");
-
     //FIXME to be removed when netcfg will issue device events in a bundle or
     //ensures all configuration needed is present
-    private Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
-    private Set<DeviceId> driverConfigured = new CopyOnWriteArraySet<>();
-    private Set<DeviceId> pipelineConfigured = new CopyOnWriteArraySet<>();
+    private final Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
+    private final Set<DeviceId> driverConfigured = new CopyOnWriteArraySet<>();
+    private final Set<DeviceId> pipelineConfigured = new CopyOnWriteArraySet<>();
 
-    private final Map<DeviceId, DeviceHandshaker> handshakers = Maps.newConcurrentMap();
     private final Map<DeviceId, MastershipRole> requestedRoles = Maps.newConcurrentMap();
-    private final Striped<Lock> deviceLocks = Striped.lock(30);
-
-    private ExecutorService connectionExecutor
-            = newFixedThreadPool(CORE_POOL_SIZE, groupedThreads(
-            "onos/generaldeviceprovider-device-connect", "%d", log));
-    private ScheduledExecutorService statsExecutor
-            = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
-            "onos/generaldeviceprovider-stats-poll", "%d", log));
-    private ConcurrentMap<DeviceId, ScheduledFuture<?>> statsPollingTasks = new ConcurrentHashMap<>();
-    private ScheduledExecutorService probeExecutor
-            = newSingleThreadScheduledExecutor(groupedThreads(
-            "onos/generaldeviceprovider-probe-", "%d", log));
-    private ScheduledFuture<?> probeTask = null;
-
-    private DeviceProviderService providerService;
-    private InternalDeviceListener deviceListener = new InternalDeviceListener();
-
-    private final ConfigFactory factory =
-            new ConfigFactory<DeviceId, GeneralProviderDeviceConfig>(
-                    SubjectFactories.DEVICE_SUBJECT_FACTORY,
-                    GeneralProviderDeviceConfig.class, CFG_SCHEME) {
-                @Override
-                public GeneralProviderDeviceConfig createConfig() {
-                    return new GeneralProviderDeviceConfig();
-                }
-            };
-
+    private final ConcurrentMap<DeviceId, ScheduledFuture<?>> statsPollingTasks = new ConcurrentHashMap<>();
+    private final Map<DeviceId, DeviceHandshaker> handshakersWithListeners = Maps.newConcurrentMap();
+    private final InternalDeviceListener deviceListener = new InternalDeviceListener();
+    private final InternalPipeconfWatchdogListener pipeconfWatchdogListener = new InternalPipeconfWatchdogListener();
     private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
     private final DeviceAgentListener deviceAgentListener = new InternalDeviceAgentListener();
+    private final ConfigFactory factory = new InternalConfigFactory();
+    private final Striped<Lock> deviceLocks = Striped.lock(30);
 
+    private ExecutorService connectionExecutor;
+    private ScheduledExecutorService statsExecutor;
+    private ScheduledExecutorService probeExecutor;
+    private ScheduledFuture<?> probeTask;
+    private DeviceProviderService providerService;
+
+    public GeneralDeviceProvider() {
+        super(new ProviderId(URI_SCHEME, DEVICE_PROVIDER_PACKAGE));
+    }
 
     @Activate
     public void activate(ComponentContext context) {
+        connectionExecutor = newFixedThreadPool(CORE_POOL_SIZE, groupedThreads(
+                "onos/gdp-connect", "%d", log));
+        statsExecutor = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
+                "onos/gdp-stats", "%d", log));
+        probeExecutor = newSingleThreadScheduledExecutor(groupedThreads(
+                "onos/gdp-probe", "%d", log));
         providerService = providerRegistry.register(this);
         componentConfigService.registerProperties(getClass());
         coreService.registerApplication(APP_NAME);
         cfgService.registerConfigFactory(factory);
         cfgService.addListener(cfgListener);
         deviceService.addListener(deviceListener);
-        handshakers.clear();
-        //This will fail if ONOS has CFG and drivers which depend on this provider
-        // are activated, failing due to not finding the driver.
-        cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
-                .forEach(this::triggerConnect);
-        //Initiating a periodic check to see if any device is available again and reconnect it.
-        rescheduleProbeTask();
+        pipeconfWatchdogService.addListener(pipeconfWatchdogListener);
+        rescheduleProbeTask(false);
         modified(context);
         log.info("Started");
     }
@@ -275,51 +248,73 @@
                 properties, OP_TIMEOUT_SHORT, DEFAULT_OP_TIMEOUT_SHORT);
         log.info("Configured. {} is configured to {} seconds",
                  OP_TIMEOUT_SHORT, opTimeoutShort);
-        opTimeoutLong = Tools.getIntegerProperty(
-                properties, OP_TIMEOUT_LONG, DEFAULT_OP_TIMEOUT_LONG);
-        log.info("Configured. {} is configured to {} seconds",
-                 OP_TIMEOUT_LONG, opTimeoutLong);
 
         if (oldStatsPollFrequency != statsPollFrequency) {
             rescheduleStatsPollingTasks();
         }
 
         if (oldProbeFrequency != probeFrequency) {
-            rescheduleProbeTask();
+            rescheduleProbeTask(true);
         }
     }
 
-    private synchronized void rescheduleProbeTask() {
-        if (probeTask != null) {
-            probeTask.cancel(false);
+    private void rescheduleProbeTask(boolean deelay) {
+        synchronized (this) {
+            if (probeTask != null) {
+                probeTask.cancel(false);
+            }
+            probeTask = probeExecutor.scheduleAtFixedRate(
+                    this::triggerProbeAllDevices,
+                    deelay ? probeFrequency : 0,
+                    probeFrequency,
+                    TimeUnit.SECONDS);
         }
-        probeTask = probeExecutor.scheduleAtFixedRate(
-                this::triggerProbeAllDevices, probeFrequency,
-                probeFrequency, TimeUnit.SECONDS);
     }
 
     @Deactivate
     public void deactivate() {
-        statsExecutor.shutdown();
-        probeExecutor.shutdown();
+        // Shutdown stats polling tasks.
+        statsPollingTasks.keySet().forEach(this::cancelStatsPolling);
+        statsPollingTasks.clear();
+        statsExecutor.shutdownNow();
+        try {
+            statsExecutor.awaitTermination(5, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            log.warn("statsExecutor not terminated properly");
+        }
+        statsExecutor = null;
+        // Shutdown probe executor.
+        probeTask.cancel(true);
+        probeTask = null;
+        probeExecutor.shutdownNow();
+        try {
+            probeExecutor.awaitTermination(5, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            log.warn("probeExecutor not terminated properly");
+        }
+        probeExecutor = null;
+        // Shutdown connection executor.
+        connectionExecutor.shutdownNow();
+        try {
+            connectionExecutor.awaitTermination(5, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            log.warn("connectionExecutor not terminated properly");
+        }
+        connectionExecutor = null;
+        // Remove all device agent listeners
+        handshakersWithListeners.values().forEach(h -> h.removeDeviceAgentListener(id()));
+        handshakersWithListeners.clear();
+        // Other cleanup.
         componentConfigService.unregisterProperties(getClass(), false);
         cfgService.removeListener(cfgListener);
-        //Not Removing the device so they can still be used from other driver providers
-        //cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
-        //          .forEach(did -> connectionExecutor.execute(() -> disconnectDevice(did)));
-        connectionExecutor.shutdown();
         deviceService.removeListener(deviceListener);
+        pipeconfWatchdogService.removeListener(pipeconfWatchdogListener);
         providerRegistry.unregister(this);
-        handshakers.clear();
         providerService = null;
         cfgService.unregisterConfigFactory(factory);
         log.info("Stopped");
     }
 
-    public GeneralDeviceProvider() {
-        super(new ProviderId(URI_SCHEME, DEVICE_PROVIDER_PACKAGE));
-    }
-
 
     @Override
     public void triggerProbe(DeviceId deviceId) {
@@ -329,13 +324,14 @@
 
     @Override
     public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
-        log.info("Received role {} for device {}", newRole, deviceId);
+        log.info("Notifying role {} to device {}", newRole, deviceId);
         requestedRoles.put(deviceId, newRole);
         connectionExecutor.execute(() -> doRoleChanged(deviceId, newRole));
     }
 
     private void doRoleChanged(DeviceId deviceId, MastershipRole newRole) {
-        final DeviceHandshaker handshaker = getHandshaker(deviceId);
+        final DeviceHandshaker handshaker = getBehaviour(
+                deviceId, DeviceHandshaker.class);
         if (handshaker == null) {
             log.error("Null handshaker. Unable to notify new role {} to {}",
                       newRole, deviceId);
@@ -347,7 +343,8 @@
     @Override
     public boolean isReachable(DeviceId deviceId) {
         log.debug("Testing reachability for device {}", deviceId);
-        final DeviceHandshaker handshaker = getHandshaker(deviceId);
+        final DeviceHandshaker handshaker = getBehaviour(
+                deviceId, DeviceHandshaker.class);
         if (handshaker == null) {
             return false;
         }
@@ -356,6 +353,16 @@
                 deviceId, false, opTimeoutShort);
     }
 
+    private boolean isConnected(DeviceId deviceId) {
+        log.debug("Testing connection to device {}", deviceId);
+        final DeviceHandshaker handshaker = getBehaviour(
+                deviceId, DeviceHandshaker.class);
+        if (handshaker == null) {
+            return false;
+        }
+        return handshaker.isConnected();
+    }
+
     @Override
     public void changePortState(DeviceId deviceId, PortNumber portNumber,
                                 boolean enable) {
@@ -387,15 +394,6 @@
                 () -> doDisconnectDevice(deviceId), deviceId));
     }
 
-    private DeviceHandshaker getHandshaker(DeviceId deviceId) {
-        return handshakers.computeIfAbsent(deviceId, id -> {
-            Driver driver = getDriver(deviceId);
-            return driver == null ? null : getBehaviour(
-                    driver, DeviceHandshaker.class,
-                    new DefaultDriverData(driver, deviceId));
-        });
-    }
-
     private Driver getDriver(DeviceId deviceId) {
         try {
             // DriverManager checks first using basic device config.
@@ -406,224 +404,209 @@
         }
     }
 
-    private <T extends Behaviour> T getBehaviour(Driver driver, Class<T> type,
-                                                 DriverData data) {
-        // Allows obtaining behavior implementations before the device is pushed
-        // to the core.
-        if (driver != null && driver.hasBehaviour(type)) {
-            DefaultDriverHandler handler = new DefaultDriverHandler(data);
-            return driver.createBehaviour(handler, type);
-        } else {
+    private <T extends Behaviour> T getBehaviour(DeviceId deviceId, Class<T> type) {
+        // Get handshaker.
+
+        Driver driver = getDriver(deviceId);
+        if (driver == null) {
             return null;
         }
+        if (!driver.hasBehaviour(type)) {
+            return null;
+        }
+        final DriverData data = new DefaultDriverData(driver, deviceId);
+        // Storing deviceKeyId and all other config values as data in the driver
+        // with protocol_<info> name as the key. e.g protocol_ip.
+        final GeneralProviderDeviceConfig providerConfig = cfgService.getConfig(
+                deviceId, GeneralProviderDeviceConfig.class);
+        if (providerConfig != null) {
+            providerConfig.protocolsInfo().forEach((protocol, info) -> {
+                info.configValues().forEach(
+                        (k, v) -> data.set(protocol + "_" + k, v));
+                data.set(protocol + "_key", info.deviceKeyId());
+            });
+        }
+        final DefaultDriverHandler handler = new DefaultDriverHandler(data);
+        return driver.createBehaviour(handler, type);
     }
 
     private void doConnectDevice(DeviceId deviceId) {
-        // Some operations can be performed by one node only.
-        final boolean isLocalLeader = leadershipService.runForLeadership(
-                GeneralProviderDeviceConfig.class.getName() + deviceId)
-                .leader().nodeId().equals(clusterService.getLocalNode().id());
-
-        if (deviceService.getDevice(deviceId) != null
-                && deviceService.isAvailable(deviceId)) {
-            log.info("Device {} is already connected to ONOS and is available",
-                     deviceId);
-            return;
-        }
+        log.debug("Initiating connection to device {}...", deviceId);
         // Retrieve config
-        final GeneralProviderDeviceConfig providerConfig = cfgService.getConfig(
-                deviceId, GeneralProviderDeviceConfig.class);
-        final BasicDeviceConfig basicDeviceConfig = cfgService.getConfig(
-                deviceId, BasicDeviceConfig.class);
-        if (providerConfig == null || basicDeviceConfig == null) {
-            log.error("Configuration missing, cannot connect to {}. " +
-                              "basicDeviceConfig={}, generalProvider={}",
-                      deviceId, basicDeviceConfig, providerConfig);
+        if (configIsMissing(deviceId)) {
             return;
         }
-        log.info("Initiating connection to device {} with driver {} ... asMaster={}",
-                 deviceId, basicDeviceConfig.driver(), isLocalLeader);
-        // Get handshaker, driver and driver data.
-        final DeviceHandshaker handshaker = getHandshaker(deviceId);
+        // Bind pipeconf (if any and if device is capable).
+        if (!bindPipeconfIfRequired(deviceId)) {
+            // We already logged the error.
+            return;
+        }
+        // Get handshaker.
+        final DeviceHandshaker handshaker = getBehaviour(
+                deviceId, DeviceHandshaker.class);
         if (handshaker == null) {
-            log.error("Missing DeviceHandshaker behavior for {}, aborting connection",
+            log.error("Missing handshaker behavior for {}, aborting connection",
                       deviceId);
             return;
         }
-        final Driver driver = handshaker.handler().driver();
-        // Enhance driver data with info in GDP config.
-        augmentConfigData(providerConfig, handshaker.data());
-        final DriverData driverData = handshaker.data();
+        // Add device agent listener.
+        handshaker.addDeviceAgentListener(id(), deviceAgentListener);
+        handshakersWithListeners.put(deviceId, handshaker);
         // Start connection via handshaker.
         final Boolean connectSuccess = getFutureWithDeadline(
                 handshaker.connect(), "initiating connection",
                 deviceId, null, opTimeoutShort);
-        if (connectSuccess == null) {
-            // Error logged by getFutureWithDeadline().
-            return;
-        } else if (!connectSuccess) {
+        if (!connectSuccess) {
             log.warn("Unable to connect to {}", deviceId);
+        }
+    }
+
+    private void triggerAdvertiseDevice(DeviceId deviceId) {
+        connectionExecutor.execute(withDeviceLock(
+                () -> doAdvertiseDevice(deviceId), deviceId));
+    }
+
+    private void doAdvertiseDevice(DeviceId deviceId) {
+        // Retrieve config
+        if (configIsMissing(deviceId)) {
             return;
         }
-        // Handle pipeconf (if device is capable)
-        if (!handlePipeconf(deviceId, driver, driverData, isLocalLeader)) {
-            // We already logged the error.
-            getFutureWithDeadline(
-                    handshaker.disconnect(), "performing disconnection",
-                    deviceId, null, opTimeoutShort);
-            return;
-        }
-        // Add device agent listener.
-        handshaker.addDeviceAgentListener(deviceAgentListener);
-        // All good. Notify core (if master).
-        if (isLocalLeader) {
-            advertiseDevice(deviceId, driver, providerConfig, driverData);
-        }
-    }
-
-
-    private void advertiseDevice(DeviceId deviceId, Driver driver,
-                                 GeneralProviderDeviceConfig providerConfig,
-                                 DriverData driverData) {
-        // Obtain device and port description and advertise device to core.
-        DeviceDescription description = null;
-        final List<PortDescription> ports;
-
-        final DeviceDescriptionDiscovery deviceDiscovery = getBehaviour(
-                driver, DeviceDescriptionDiscovery.class, driverData);
-
-        if (deviceDiscovery != null) {
-            description = deviceDiscovery.discoverDeviceDetails();
-            ports = deviceDiscovery.discoverPortDetails();
-        } else {
-            log.warn("Missing DeviceDescriptionDiscovery behavior for {}, " +
-                             "no update for description or ports.", deviceId);
-            ports = new ArrayList<>();
-        }
-
-        if (description == null) {
-            // Generate one here.
-            // FIXME: a behavior impl should not return a null description
-            // (e.g. as GnmiDeviceDescriptionDiscovery). This case should apply
-            // only if a the behavior is not available.
-            description = new DefaultDeviceDescription(
-                    deviceId.uri(), Device.Type.SWITCH,
-                    driver.manufacturer(), driver.hwVersion(),
-                    driver.swVersion(), UNKNOWN,
-                    new ChassisId(), true,
-                    DefaultAnnotations.builder()
-                            .set(AnnotationKeys.PROTOCOL,
-                                 providerConfig.protocolsInfo().keySet().toString())
-                            .build());
-        }
-
-        providerService.deviceConnected(deviceId, description);
-        providerService.updatePorts(deviceId, ports);
-    }
-
-    /**
-     * Handles the case of a device that is pipeline programmable. Returns true
-     * if the operation wa successful and the device can be registered to the
-     * core, false otherwise.
-     */
-    private boolean handlePipeconf(DeviceId deviceId, Driver driver,
-                                   DriverData driverData, boolean asMaster) {
-        final PiPipelineProgrammable pipelineProg = getBehaviour(
-                driver, PiPipelineProgrammable.class, driverData);
-        if (pipelineProg == null) {
-            // Device is not pipeline programmable.
-            return true;
-        }
-
-        final PiPipeconf pipeconf = getPipeconf(deviceId, pipelineProg);
-        if (pipeconf == null) {
-            return false;
-        }
-        final PiPipeconfId pipeconfId = pipeconf.id();
-
-        // To be removed in change #19606
-        // final String mergedDriverName = piPipeconfService.mergeDriver(
-        //         deviceId, pipeconfId);
-        // if (mergedDriverName == null) {
-        //     log.error("Unable to get merged driver for {} and {}, aborting device discovery",
-        //               deviceId, pipeconfId);
-        //     return false;
-        // }
-
-        if (!asMaster) {
-            // From now one only the master.
-            return true;
-        }
-
-        // if (!setDriverViaCfg(deviceId, mergedDriverName)) {
-        //     return false;
-        // }
-
-        // FIXME: we just introduced a race condition as it might happen that a
-        // node does not receive the new cfg (with the merged driver) before the
-        // device is advertised to the core. Perhaps we should be waiting for a
-        // NetworkConfig event signaling that the driver has been updated on all
-        // nodes? The effect is mitigated by deploying the pipeconf (slow
-        // operation), after calling setDriverViaCfg().
-
-        piPipeconfService.bindToDevice(pipeconfId, deviceId);
-
-        final Boolean deploySuccess = getFutureWithDeadline(
-                pipelineProg.deployPipeconf(pipeconf),
-                "deploying pipeconf", deviceId, null,
-                opTimeoutLong);
-        if (deploySuccess == null) {
-            // Error logged by getFutureWithDeadline().
-            return false;
-        } else if (!deploySuccess) {
-            log.error("Unable to deploy pipeconf {} to {}, aborting device discovery",
-                      pipeconfId, deviceId);
-            return false;
-        }
-
-        return true;
-    }
-
-    private boolean setDriverViaCfg(DeviceId deviceId, String driverName) {
-        BasicDeviceConfig cfg = cfgService.getConfig(deviceId, BasicDeviceConfig.class);
-        if (cfg == null) {
-            log.error("Unable to get basic device config for {}, aborting device discovery",
-                      deviceId);
-            return false;
-        }
-        ObjectNode newCfg = (ObjectNode) cfg.node();
-        newCfg = newCfg.put(DRIVER, driverName);
-        ObjectMapper mapper = new ObjectMapper();
-        JsonNode newCfgNode = mapper.convertValue(newCfg, JsonNode.class);
-        cfgService.applyConfig(deviceId, BasicDeviceConfig.class, newCfgNode);
-        return true;
-    }
-
-    private PiPipeconf getPipeconf(DeviceId deviceId, PiPipelineProgrammable pipelineProg) {
-        PiPipeconfId pipeconfId = getPipeconfFromCfg(deviceId);
-        if (pipeconfId == null || pipeconfId.id().isEmpty()) {
-            // No pipeconf has been provided in the cfg.
-            // Check if device driver provides a default one.
-            if (pipelineProg.getDefaultPipeconf().isPresent()) {
-                final PiPipeconf defaultPipeconf = pipelineProg.getDefaultPipeconf().get();
-                log.info("Using default pipeconf {} for {}", defaultPipeconf.id(), deviceId);
-                pipeconfId = defaultPipeconf.id();
-            } else {
-                log.warn("Device {} is pipeline programmable but no pipeconf can be associated to it", deviceId);
-                return null;
+        // Obtain device and port description.
+        final boolean isPipelineReady = isPipelineReady(deviceId);
+        final DeviceDescription description = getDeviceDescription(
+                deviceId, isPipelineReady);
+        final List<PortDescription> ports = getPortDetails(deviceId);
+        // Advertise to core.
+        if (deviceService.getDevice(deviceId) == null ||
+                (description.isDefaultAvailable() &&
+                        !deviceService.isAvailable(deviceId))) {
+            if (!isPipelineReady) {
+                log.info("Advertising device to core with available={} as " +
+                                 "device pipeline is not ready yet",
+                         description.isDefaultAvailable());
             }
+            providerService.deviceConnected(deviceId, description);
         }
-        // Check if registered
-        if (!piPipeconfService.getPipeconf(pipeconfId).isPresent()) {
-            log.warn("Pipeconf {} is not registered", pipeconfId);
+        providerService.updatePorts(deviceId, ports);
+        // If pipeline is not ready, encourage watchdog to perform probe ASAP.
+        if (!isPipelineReady) {
+            pipeconfWatchdogService.triggerProbe(deviceId);
+        }
+    }
+
+    private DeviceDescription getDeviceDescription(
+            DeviceId deviceId, boolean defaultAvailable) {
+        // Get one from driver or forge.
+        final DeviceDescriptionDiscovery deviceDiscovery = getBehaviour(
+                deviceId, DeviceDescriptionDiscovery.class);
+        if (deviceDiscovery != null) {
+            // Enforce defaultAvailable flag over the one obtained from driver.
+            final DeviceDescription d = deviceDiscovery.discoverDeviceDetails();
+            return new DefaultDeviceDescription(d, defaultAvailable, d.annotations());
+        } else {
+            return forgeDeviceDescription(deviceId, defaultAvailable);
+        }
+    }
+
+    private List<PortDescription> getPortDetails(DeviceId deviceId) {
+        final DeviceDescriptionDiscovery deviceDiscovery = getBehaviour(
+                deviceId, DeviceDescriptionDiscovery.class);
+        if (deviceDiscovery != null) {
+            return deviceDiscovery.discoverPortDetails();
+        } else {
+            return Collections.emptyList();
+        }
+    }
+
+    private DeviceDescription forgeDeviceDescription(
+            DeviceId deviceId, boolean defaultAvailable) {
+        // Uses handshaker and provider config to get driver data.
+        final DeviceHandshaker handshaker = getBehaviour(
+                deviceId, DeviceHandshaker.class);
+        final Driver driver = handshaker != null
+                ? handshaker.handler().driver() : null;
+        final GeneralProviderDeviceConfig cfg = cfgService.getConfig(
+                deviceId, GeneralProviderDeviceConfig.class);
+        final DefaultAnnotations.Builder annBuilder = DefaultAnnotations.builder();
+        // If device is pipeline programmable, let this provider decide when the
+        // device can be marked online.
+        annBuilder.set(AnnotationKeys.PROVIDER_MARK_ONLINE,
+                       String.valueOf(isPipelineProgrammable(deviceId)));
+        if (cfg != null) {
+            StringJoiner protoStringBuilder = new StringJoiner(", ");
+            cfg.protocolsInfo().keySet().forEach(protoStringBuilder::add);
+            annBuilder.set(AnnotationKeys.PROTOCOL, protoStringBuilder.toString());
+        }
+        return new DefaultDeviceDescription(
+                deviceId.uri(),
+                Device.Type.SWITCH,
+                driver != null ? driver.manufacturer() : UNKNOWN,
+                driver != null ? driver.hwVersion() : UNKNOWN,
+                driver != null ? driver.swVersion() : UNKNOWN,
+                UNKNOWN,
+                new ChassisId(),
+                defaultAvailable,
+                annBuilder.build());
+    }
+
+    private void triggerMarkAvailable(DeviceId deviceId) {
+        connectionExecutor.execute(withDeviceLock(
+                () -> doMarkAvailable(deviceId), deviceId));
+    }
+
+    private void doMarkAvailable(DeviceId deviceId) {
+        if (deviceService.isAvailable(deviceId)) {
+            return;
+        }
+        final DeviceDescription descr = getDeviceDescription(deviceId, true);
+        // It has been observed that devices that were marked offline (e.g.
+        // after device disconnection) might end up with no master. Here we
+        // trigger a new master election (if device has no master).
+        mastershipService.requestRoleForSync(deviceId);
+        providerService.deviceConnected(deviceId, descr);
+    }
+
+    private boolean bindPipeconfIfRequired(DeviceId deviceId) {
+        if (pipeconfService.ofDevice(deviceId).isPresent()
+                || !isPipelineProgrammable(deviceId)) {
+            // Nothing to do, all good.
+            return true;
+        }
+        // Get pipeconf from netcfg or driver (default one).
+        final PiPipelineProgrammable pipelineProg = getBehaviour(
+                deviceId, PiPipelineProgrammable.class);
+        final PiPipeconfId pipeconfId = getPipeconfId(deviceId, pipelineProg);
+        if (pipeconfId == null) {
+            return false;
+        }
+        // Store binding in pipeconf service.
+        pipeconfService.bindToDevice(pipeconfId, deviceId);
+        return true;
+    }
+
+    private PiPipeconfId getPipeconfId(DeviceId deviceId, PiPipelineProgrammable pipelineProg) {
+        // Places to look for a pipeconf ID (in priority order)):
+        // 1) netcfg
+        // 2) device driver (default one)
+        final PiPipeconfId pipeconfId = getPipeconfFromCfg(deviceId);
+        if (pipeconfId != null && !pipeconfId.id().isEmpty()) {
+            return pipeconfId;
+        }
+        if (pipelineProg != null
+                && pipelineProg.getDefaultPipeconf().isPresent()) {
+            final PiPipeconf defaultPipeconf = pipelineProg.getDefaultPipeconf().get();
+            log.info("Using default pipeconf {} for {}", defaultPipeconf.id(), deviceId);
+            return defaultPipeconf.id();
+        } else {
+            log.warn("Unable to associate a pipeconf to {}", deviceId);
             return null;
         }
-        return piPipeconfService.getPipeconf(pipeconfId).get();
     }
 
     private void doDisconnectDevice(DeviceId deviceId) {
         log.debug("Initiating disconnection from {}...", deviceId);
-        final DeviceHandshaker handshaker = handshakers.remove(deviceId);
+        final DeviceHandshaker handshaker = getBehaviour(
+                deviceId, DeviceHandshaker.class);
         final boolean isAvailable = deviceService.isAvailable(deviceId);
         // Signal disconnection to core (if master).
         if (isAvailable && mastershipService.isLocalMaster(deviceId)) {
@@ -642,7 +625,8 @@
             }
             return;
         }
-        handshaker.removeDeviceAgentListener(deviceAgentListener);
+        handshaker.removeDeviceAgentListener(id());
+        handshakersWithListeners.remove(deviceId);
         final boolean disconnectSuccess = getFutureWithDeadline(
                 handshaker.disconnect(), "performing disconnection",
                 deviceId, false, opTimeoutShort);
@@ -704,6 +688,17 @@
                 () -> doConnectDevice(deviceId), deviceId));
     }
 
+    private boolean isPipelineProgrammable(DeviceId deviceId) {
+        final GeneralProviderDeviceConfig providerConfig = cfgService.getConfig(
+                deviceId, GeneralProviderDeviceConfig.class);
+        if (providerConfig == null) {
+            return false;
+        }
+        return !Collections.disjoint(
+                ImmutableSet.copyOf(providerConfig.node().fieldNames()),
+                PIPELINE_CONFIGURABLE_PROTOCOLS);
+    }
+
     /**
      * Listener for configuration events.
      */
@@ -790,16 +785,12 @@
         }
     }
 
-    private void augmentConfigData(GeneralProviderDeviceConfig providerConfig, DriverData driverData) {
-        //Storing deviceKeyId and all other config values
-        // as data in the driver with protocol_<info>
-        // name as the key. e.g protocol_ip
-        providerConfig.protocolsInfo()
-                .forEach((protocol, deviceInfoConfig) -> {
-                    deviceInfoConfig.configValues()
-                            .forEach((k, v) -> driverData.set(protocol + "_" + k, v));
-                    driverData.set(protocol + "_key", deviceInfoConfig.deviceKeyId());
-                });
+    private boolean isPipelineReady(DeviceId deviceId) {
+        final boolean isPipelineProg = isPipelineProgrammable(deviceId);
+        final boolean isPipeconfReady = pipeconfWatchdogService
+                .getStatus(deviceId)
+                .equals(PiPipeconfWatchdogService.PipelineStatus.READY);
+        return !isPipelineProg || isPipeconfReady;
     }
 
     private void cleanUpConfigInfo(DeviceId deviceId) {
@@ -838,8 +829,7 @@
     private void triggerProbeAllDevices() {
         // Async trigger a task for all devices in the cfg.
         cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
-                .forEach(deviceId -> connectionExecutor.execute(withDeviceLock(
-                        () -> doDeviceProbe(deviceId), deviceId)));
+                .forEach(this::triggerDeviceProbe);
     }
 
     private PiPipeconfId getPipeconfFromCfg(DeviceId deviceId) {
@@ -851,53 +841,45 @@
         return config.piPipeconfId();
     }
 
+    private void triggerDeviceProbe(DeviceId deviceId) {
+        connectionExecutor.execute(withDeviceLock(
+                () -> doDeviceProbe(deviceId), deviceId));
+    }
+
     private void doDeviceProbe(DeviceId deviceId) {
-        if (!configIsPresent(deviceId)) {
+        log.debug("Probing device {}...", deviceId);
+        if (configIsMissing(deviceId)) {
             return;
         }
-        final boolean isAvailable = deviceService.getDevice(deviceId) != null
-                && deviceService.isAvailable(deviceId);
-        final boolean isLocalMaster = mastershipService.isLocalMaster(deviceId);
-        if (isAvailable) {
-            if (!isLocalMaster) {
-                return;
+        if (!isConnected(deviceId)) {
+            if (deviceService.isAvailable(deviceId)) {
+                providerService.deviceDisconnected(deviceId);
             }
-            if (!isReachable(deviceId)) {
-                log.info("Disconnecting available but unreachable device {}...",
-                         deviceId);
-                triggerDisconnect(deviceId);
-            }
-        } else {
-            // We do not check for reachability using isReachable()
-            // since the behaviour of this method can vary depending on protocol
-            // nuances. We leave this check to the device handshaker at later
-            // stages of the connection process.
             triggerConnect(deviceId);
         }
     }
 
-    private boolean configIsPresent(DeviceId deviceId) {
+    private boolean configIsMissing(DeviceId deviceId) {
         final boolean present =
                 cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class) != null
                         && cfgService.getConfig(deviceId, BasicDeviceConfig.class) != null;
         if (!present) {
             log.warn("Configuration for device {} is not complete", deviceId);
         }
-        return present;
-    }
-
-    private void handleChannelClosed(DeviceId deviceId) {
-        log.info("Disconnecting device {}, due to channel closed event",
-                 deviceId);
-        triggerDisconnect(deviceId);
+        return !present;
     }
 
     private void handleMastershipResponse(DeviceId deviceId, MastershipRole response) {
-        //Notify core about response.
-        if (!requestedRoles.containsKey(deviceId)) {
+        // Notify core about mastership response.
+        final MastershipRole request = requestedRoles.get(deviceId);
+        final boolean isAvailable = deviceService.isAvailable(deviceId);
+        if (request == null || !isAvailable) {
             return;
         }
-        providerService.receivedRoleReply(deviceId, requestedRoles.get(deviceId), response);
+        log.debug("Device {} asserted role {} (requested was {})",
+                  deviceId, response, request);
+        providerService.receivedRoleReply(deviceId, request, response);
+        // FIXME: this should be based on assigned mastership, not what returned by device
         if (response.equals(MastershipRole.MASTER)) {
             startStatsPolling(deviceId, false);
         } else {
@@ -951,16 +933,11 @@
             DeviceId deviceId = event.subject();
             switch (event.type()) {
                 case CHANNEL_OPEN:
-                    // Ignore.
+                    triggerAdvertiseDevice(deviceId);
                     break;
                 case CHANNEL_CLOSED:
-                    handleChannelClosed(deviceId);
-                    break;
                 case CHANNEL_ERROR:
-                    // TODO evaluate other reaction to channel error.
-                    log.warn("Received CHANNEL_ERROR from {}. Is the channel still open?",
-                             deviceId);
-                    handleChannelClosed(deviceId);
+                    triggerDeviceProbe(deviceId);
                     break;
                 case ROLE_MASTER:
                     handleMastershipResponse(deviceId, MastershipRole.MASTER);
@@ -977,4 +954,30 @@
         }
 
     }
+
+    private class InternalPipeconfWatchdogListener implements PiPipeconfWatchdogListener {
+        @Override
+        public void event(PiPipeconfWatchdogEvent event) {
+            triggerMarkAvailable(event.subject());
+        }
+
+        @Override
+        public boolean isRelevant(PiPipeconfWatchdogEvent event) {
+            return event.type().equals(PiPipeconfWatchdogEvent.Type.PIPELINE_READY);
+        }
+    }
+
+    private class InternalConfigFactory
+            extends ConfigFactory<DeviceId, GeneralProviderDeviceConfig> {
+
+        InternalConfigFactory() {
+            super(SubjectFactories.DEVICE_SUBJECT_FACTORY,
+                  GeneralProviderDeviceConfig.class, CFG_SCHEME);
+        }
+
+        @Override
+        public GeneralProviderDeviceConfig createConfig() {
+            return new GeneralProviderDeviceConfig();
+        }
+    }
 }