Refactor P4Runtime subsystem to implement async connection procedure

This patch is an attempt to solve issues observed when restarting both
switches and ONOS nodes. Most of the issues seemed to stem from brittle
mastership handling when deploying the pipeline.

With this patch, GDP registers devices to the core with available=false
(i.e. offline) and marks them online only when the P4 pipeline has been
deployed to the device. A new PiPipeconfWatchdogService takes care of
deploying pipelines and producing events when devices are ready.

Moreover, we fix a race condition where pipeconf-related behaviors
were not found. This was caused by GDP enforcing the merged
driver name in the network config, while external entities (e.g.
Mininet) were pushing a JSON blob with the base driver name. This patch
removes the need to rely on such a trick and instead uses
pipeconf-aware logic directly in the driver manager (change #19622).

Finally, we fix issues in P4RuntimeClientImpl that prevented the stream
channel from detecting unreachable devices. The solution is to follow
gRPC APIs and instantiate a new channel once the first one fails.

Change-Id: I6fbc91859c0fb58a6db3bc197b7081a8fe9f97f7
diff --git a/core/api/src/main/java/org/onosproject/net/behaviour/PiPipelineProgrammable.java b/core/api/src/main/java/org/onosproject/net/behaviour/PiPipelineProgrammable.java
index 2373af4..22367ba 100644
--- a/core/api/src/main/java/org/onosproject/net/behaviour/PiPipelineProgrammable.java
+++ b/core/api/src/main/java/org/onosproject/net/behaviour/PiPipelineProgrammable.java
@@ -24,21 +24,40 @@
 import java.util.concurrent.CompletableFuture;
 
 /**
- * Behavior to program the pipeline of a device that supports protocol-independence.
+ * Behavior to program the pipeline of a device that supports
+ * protocol-independence.
  */
 @Beta
 public interface PiPipelineProgrammable extends HandlerBehaviour {
     /**
-     * Deploys the given pipeconf to the device.
+     * Writes the given pipeconf to the device, returns a completable future
+     * with true if the operation was successful, false otherwise.
+     * <p>
+     * After the future has been completed, the device is expected to process
+     * data plane packets according to the written pipeconf.
      *
      * @param pipeconf pipeconf
-     * @return true if the operation was successful, false otherwise
+     * @return completable future set to true if the operation was successful,
+     * false otherwise
      */
     // TODO: return an explanation of why things went wrong, and the status of the device.
-    CompletableFuture<Boolean> deployPipeconf(PiPipeconf pipeconf);
+    CompletableFuture<Boolean> setPipeconf(PiPipeconf pipeconf);
 
     /**
-     * Returns the default pipeconf for ths device, to be used when any other pipeconf is not available.
+     * Returns true if the device is configured with the given pipeconf, false
+     * otherwise.
+     * <p>
+     * This method is expected to always return true after successfully calling
+     * {@link #setPipeconf(PiPipeconf)} with the given pipeconf.
+     *
+     * @param pipeconf pipeconf
+     * @return true if the device has the given pipeconf set, false otherwise
+     */
+    boolean isPipeconfSet(PiPipeconf pipeconf);
+
+    /**
+     * Returns the default pipeconf for this device, to be used when any other
+     * pipeconf is not available.
      *
      * @return optional pipeconf
      */
diff --git a/core/api/src/main/java/org/onosproject/net/device/DeviceHandshaker.java b/core/api/src/main/java/org/onosproject/net/device/DeviceHandshaker.java
index bc870d3..bceaeff 100644
--- a/core/api/src/main/java/org/onosproject/net/device/DeviceHandshaker.java
+++ b/core/api/src/main/java/org/onosproject/net/device/DeviceHandshaker.java
@@ -19,6 +19,7 @@
 import com.google.common.annotations.Beta;
 import org.onosproject.net.MastershipRole;
 import org.onosproject.net.driver.DeviceConnect;
+import org.onosproject.net.provider.ProviderId;
 
 import java.util.concurrent.CompletableFuture;
 
@@ -48,21 +49,32 @@
     void roleChanged(MastershipRole newRole);
 
     /**
-     * Adds a device agent listener.
+     * Returns the last known mastership role agreed by the device for this
+     * node.
      *
-     * @param listener device agent listener
+     * @return mastership role
      */
-    default void addDeviceAgentListener(DeviceAgentListener listener) {
+    MastershipRole getRole();
+
+    /**
+     * Adds a device agent listener for the given provider ID.
+     *
+     * @param providerId provider ID
+     * @param listener   device agent listener
+     */
+    default void addDeviceAgentListener(
+            ProviderId providerId, DeviceAgentListener listener) {
         throw new UnsupportedOperationException(
                 "Device agent listener registration not supported");
     }
 
     /**
-     * Removes a device agent listener.
+     * Removes a device agent listener previously registered for the given
+     * provider ID.
      *
-     * @param listener device agent listener
+     * @param providerId provider ID
      */
-    default void removeDeviceAgentListener(DeviceAgentListener listener) {
+    default void removeDeviceAgentListener(ProviderId providerId) {
         throw new UnsupportedOperationException(
                 "Device agent listener removal not supported");
     }
diff --git a/core/api/src/main/java/org/onosproject/net/driver/DeviceConnect.java b/core/api/src/main/java/org/onosproject/net/driver/DeviceConnect.java
index 2f53620..f30386d 100644
--- a/core/api/src/main/java/org/onosproject/net/driver/DeviceConnect.java
+++ b/core/api/src/main/java/org/onosproject/net/driver/DeviceConnect.java
@@ -20,27 +20,38 @@
 import java.util.concurrent.CompletableFuture;
 
 /**
- * Abstraction of handler behaviour used to set-up and tear-down
- * connection with a device.
+ * Abstraction of handler behaviour used to set-up and tear-down connections
+ * with a device.
  */
 @Beta
 public interface DeviceConnect extends HandlerBehaviour {
 
     /**
-     * Connects to the device.
-     * It's supposed to initiate the transport sessions, channel and also,
-     * if applicable, store them in the proper protocol specific
-     * controller (e.g. GrpcController).
+     * Connects to the device, for example by opening the transport session that
+     * will be later used to send control messages. Returns true if the
+     * connection was initiated successfully, false otherwise.
+     * <p>
+     * Calling this method multiple times while a connection to the device is
+     * open should result in a no-op.
      *
      * @return CompletableFuture with true if the operation was successful
      */
     CompletableFuture<Boolean> connect();
 
     /**
-     * Disconnects from the device.
-     * It's supposed to destroy the transport sessions and channel and also,
-     * if applicable, remove them in the proper protocol specific
-     * controller (e.g. GrpcController).
+     * Returns true if a connection to the device is open, false otherwise.
+     *
+     * @return true if the connection is open, false otherwise
+     */
+    boolean isConnected();
+
+    /**
+     * Disconnects from the device, for example closing the transport session
+     * previously opened. Returns true if the disconnection procedure was
+     * successful, false otherwise.
+     * <p>
+     * Calling this method multiple times while a connection to the device is
+     * closed should result in a no-op.
      *
      * @return CompletableFuture with true if the operation was successful
      */
diff --git a/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfWatchdogEvent.java b/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfWatchdogEvent.java
new file mode 100644
index 0000000..0d34dd6
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfWatchdogEvent.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.pi.service;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.event.AbstractEvent;
+import org.onosproject.net.DeviceId;
+
+/**
+ * Event representing changes in the status of a device pipeline.
+ */
+@Beta
+public class PiPipeconfWatchdogEvent
+        extends AbstractEvent<PiPipeconfWatchdogEvent.Type, DeviceId> {
+
+    /**
+     * Type of event.
+     */
+    public enum Type {
+        PIPELINE_READY,
+        PIPELINE_UNKNOWN
+    }
+
+    /**
+     * Creates a new event for the given device.
+     *
+     * @param type type
+     * @param subject device ID
+     */
+    public PiPipeconfWatchdogEvent(PiPipeconfWatchdogEvent.Type type, DeviceId subject) {
+        super(type, subject);
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfWatchdogListener.java b/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfWatchdogListener.java
new file mode 100644
index 0000000..ad6a899
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfWatchdogListener.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.pi.service;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.event.EventListener;
+
+/**
+ * Listener of pipeline status change events produced by {@link
+ * PiPipeconfWatchdogService}.
+ */
+@Beta
+public interface PiPipeconfWatchdogListener
+        extends EventListener<PiPipeconfWatchdogEvent> {
+}
diff --git a/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfWatchdogService.java b/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfWatchdogService.java
new file mode 100644
index 0000000..9462569
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfWatchdogService.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.pi.service;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.event.ListenerService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.pi.model.PiPipeconfId;
+
+/**
+ * Service that periodically probes pipeline programmable devices, to check that
+ * their pipeline is configured with the expected pipeconf. It emits events
+ * about pipeline status changes.
+ */
+@Beta
+public interface PiPipeconfWatchdogService
+        extends ListenerService<PiPipeconfWatchdogEvent, PiPipeconfWatchdogListener> {
+
+    /**
+     * Status of a device pipeline.
+     */
+    enum PipelineStatus {
+        /**
+         * The device pipeline is ready to process packets.
+         */
+        READY,
+        /**
+         * The status is unknown and the device might not be able to process
+         * packets yet.
+         */
+        UNKNOWN,
+    }
+
+    /**
+     * Asynchronously triggers a probe task that checks the device pipeline
+     * status and, if required, configures it with the pipeconf associated to
+     * this device (via {@link PiPipeconfService#bindToDevice(PiPipeconfId,
+     * DeviceId)}).
+     *
+     * @param deviceId device to probe
+     */
+    void triggerProbe(DeviceId deviceId);
+
+    /**
+     * Returns the last known pipeline status of the given device.
+     *
+     * @param deviceId device ID
+     * @return pipeline status
+     */
+    PipelineStatus getStatus(DeviceId deviceId);
+}
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
index 0e2ea82..a90149f 100644
--- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
@@ -181,6 +181,11 @@
             return null;
         }
         String baseDriverName = basicDeviceConfig.driver();
+        if (baseDriverName == null) {
+            log.warn("Missing driver from basic device config for {}, " +
+                             "cannot produce merged driver", deviceId);
+            return null;
+        }
         if (isMergedDriverName(baseDriverName)) {
             // The config already has driver name that is a merged one. We still
             // need to make sure an instance of that merged driver is present in
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java
new file mode 100644
index 0000000..8376295
--- /dev/null
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java
@@ -0,0 +1,401 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.pi.impl;
+
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Striped;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.event.AbstractListenerManager;
+import org.onosproject.mastership.MastershipInfo;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
+import org.onosproject.net.behaviour.PiPipelineProgrammable;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceHandshaker;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.pi.model.PiPipeconf;
+import org.onosproject.net.pi.model.PiPipeconfId;
+import org.onosproject.net.pi.service.PiPipeconfMappingStore;
+import org.onosproject.net.pi.service.PiPipeconfService;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
+import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Lock;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Implementation of PiPipeconfWatchdogService that implements a periodic
+ * pipeline probe task and listens for device events to update the status of the
+ * pipeline.
+ */
+@Component(immediate = true)
+@Service
+public class PiPipeconfWatchdogManager
+        extends AbstractListenerManager<PiPipeconfWatchdogEvent, PiPipeconfWatchdogListener>
+        implements PiPipeconfWatchdogService {
+
+    private final Logger log = getLogger(getClass());
+
+    private static final long SECONDS = 1000L;
+    // Long enough to allow for network delay (e.g. to transfer large pipeline
+    // binaries over slow network).
+    private static final long PIPECONF_SET_TIMEOUT = 60; // Seconds.
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private PiPipeconfMappingStore pipeconfMappingStore;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private MastershipService mastershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected PiPipeconfService pipeconfService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private ComponentConfigService componentConfigService;
+
+    private static final String PROBE_INTERVAL = "probeInterval";
+    private static final int DEFAULT_PROBE_INTERVAL = 30;
+    @Property(name = PROBE_INTERVAL, intValue = DEFAULT_PROBE_INTERVAL,
+            label = "Configure interval in seconds for device pipeconf probing")
+    private int probeInterval = DEFAULT_PROBE_INTERVAL;
+
+    protected ExecutorService executor = Executors.newFixedThreadPool(
+            30, groupedThreads("onos/pipeconf-watchdog", "%d", log));
+
+    private final InternalDeviceListener deviceListener = new InternalDeviceListener();
+
+    private Timer timer;
+    private TimerTask task;
+
+    private final Striped<Lock> locks = Striped.lock(30);
+
+    private EventuallyConsistentMap<DeviceId, PipelineStatus> statusMap;
+    private Map<DeviceId, PipelineStatus> localStatusMap;
+
+    @Activate
+    public void activate() {
+        eventDispatcher.addSink(PiPipeconfWatchdogEvent.class, listenerRegistry);
+        localStatusMap = Maps.newConcurrentMap();
+        // Init distributed status map.
+        KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(PipelineStatus.class);
+        statusMap = storageService.<DeviceId, PipelineStatus>eventuallyConsistentMapBuilder()
+                .withName("onos-pipeconf-status-table")
+                .withSerializer(serializer)
+                .withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
+        statusMap.addListener(new StatusMapListener());
+        // Register component configurable properties.
+        componentConfigService.registerProperties(getClass());
+        // Start periodic watchdog task.
+        timer = new Timer();
+        startProbeTask();
+        // Add device listener.
+        deviceService.addListener(deviceListener);
+        log.info("Started");
+    }
+
+    @Modified
+    public void modified(ComponentContext context) {
+        if (context == null) {
+            return;
+        }
+
+        Dictionary<?, ?> properties = context.getProperties();
+        final int oldProbeInterval = probeInterval;
+        probeInterval = Tools.getIntegerProperty(
+                properties, PROBE_INTERVAL, DEFAULT_PROBE_INTERVAL);
+        log.info("Configured. {} is configured to {} seconds",
+                 PROBE_INTERVAL, probeInterval);
+
+        if (oldProbeInterval != probeInterval) {
+            rescheduleProbeTask();
+        }
+    }
+
+    @Deactivate
+    public void deactivate() {
+        eventDispatcher.removeSink(PiPipeconfWatchdogEvent.class);
+        deviceService.removeListener(deviceListener);
+        stopProbeTask();
+        timer.cancel(); // End the timer thread now; only dropping the reference defers that to GC.
+        statusMap = null;
+        localStatusMap = null;
+        log.info("Stopped");
+    }
+
+    @Override
+    public void triggerProbe(DeviceId deviceId) {
+        final Device device = deviceService.getDevice(deviceId);
+        if (device != null) {
+            filterAndTriggerTasks(Collections.singleton(device));
+        }
+    }
+
+    @Override
+    public PipelineStatus getStatus(DeviceId deviceId) {
+        final PipelineStatus status = statusMap.get(deviceId);
+        return status == null ? PipelineStatus.UNKNOWN : status;
+    }
+
+    private void triggerCheckAllDevices() {
+        filterAndTriggerTasks(deviceService.getDevices());
+    }
+
+    private void filterAndTriggerTasks(Iterable<Device> devices) {
+        devices.forEach(device -> {
+            if (!isLocalMaster(device)) {
+                return;
+            }
+
+            final PiPipeconfId pipeconfId = pipeconfMappingStore.getPipeconfId(device.id());
+            if (pipeconfId == null
+                    || !device.is(PiPipelineProgrammable.class)) {
+                return;
+            }
+
+            // Look the pipeconf up once, since it may be unregistered between two calls.
+            final PiPipeconf pipeconf = pipeconfService.getPipeconf(pipeconfId).orElse(null);
+            if (pipeconf == null) {
+                log.error("Pipeconf {} is not registered", pipeconfId);
+                return;
+            }
+
+            if (!device.is(DeviceHandshaker.class)) {
+                log.error("Missing DeviceHandshaker behavior for {}", device.id());
+                return;
+            }
+
+            // Trigger task with per-device lock.
+            executor.execute(withLock(() -> {
+                final boolean success = doSetPipeconfIfRequired(device, pipeconf);
+                if (success) {
+                    signalStatusReady(device.id());
+                } else {
+                    signalStatusUnknown(device.id());
+                }
+            }, device.id()));
+        });
+    }
+
+    /**
+     * Returns true if the given device is known to be configured with the given
+     * pipeline, false otherwise. If necessary, this method enforces setting the
+     * given pipeconf using drivers.
+     *
+     * @param device   device
+     * @param pipeconf pipeconf
+     * @return boolean
+     */
+    private boolean doSetPipeconfIfRequired(Device device, PiPipeconf pipeconf) {
+        log.debug("Starting watchdog task for {} ({})", device.id(), pipeconf.id());
+        final PiPipelineProgrammable pipelineProg = device.as(PiPipelineProgrammable.class);
+        final DeviceHandshaker handshaker = device.as(DeviceHandshaker.class);
+        if (!handshaker.isConnected()) {
+            return false;
+        }
+        if (pipelineProg.isPipeconfSet(pipeconf)) {
+            log.debug("Pipeconf {} already configured on {}",
+                      pipeconf.id(), device.id());
+            return true;
+        }
+        try {
+            return pipelineProg.setPipeconf(pipeconf)
+                    .get(PIPECONF_SET_TIMEOUT, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            log.error("Thread interrupted while setting pipeconf on {}",
+                      device.id());
+            Thread.currentThread().interrupt();
+        } catch (ExecutionException e) {
+            log.error("Exception while setting pipeconf on {}",
+                      device.id(), e.getCause());
+        } catch (TimeoutException e) {
+            log.error("Operation TIMEOUT while setting pipeconf on {}",
+                      device.id());
+        }
+        return false;
+    }
+
+    private Runnable withLock(Runnable task, Object object) {
+        return () -> {
+            final Lock lock = locks.get(object);
+            lock.lock();
+            try {
+                task.run();
+            } finally {
+                lock.unlock();
+            }
+        };
+    }
+
+    private void signalStatusUnknown(DeviceId deviceId) {
+        statusMap.remove(deviceId);
+    }
+
+    private void signalStatusReady(DeviceId deviceId) {
+        statusMap.put(deviceId, PipelineStatus.READY);
+    }
+
+    private boolean isLocalMaster(Device device) {
+        if (mastershipService.isLocalMaster(device.id())) {
+            return true;
+        }
+        // The device might have no master (e.g. after it has been disconnected
+        // from core), hence we use device mastership state.
+        final MastershipInfo info = mastershipService.getMastershipFor(device.id());
+        return !info.master().isPresent() &&
+                device.is(DeviceHandshaker.class) &&
+                device.as(DeviceHandshaker.class).getRole()
+                        .equals(MastershipRole.MASTER);
+    }
+
+    private void startProbeTask() {
+        synchronized (timer) {
+            log.info("Starting pipeline probe thread with {} seconds interval...", probeInterval);
+            task = new InternalTimerTask();
+            timer.scheduleAtFixedRate(task, probeInterval * SECONDS,
+                                      probeInterval * SECONDS);
+        }
+    }
+
+
+    private void stopProbeTask() {
+        synchronized (timer) {
+            log.info("Stopping pipeline probe thread...");
+            task.cancel();
+            task = null;
+        }
+    }
+
+
+    private synchronized void rescheduleProbeTask() {
+        synchronized (timer) {
+            stopProbeTask();
+            startProbeTask();
+        }
+    }
+
+    private class InternalTimerTask extends TimerTask {
+        @Override
+        public void run() {
+            triggerCheckAllDevices();
+        }
+    }
+
+    /**
+     * Listener of device events used to update the pipeline status.
+     */
+    private class InternalDeviceListener implements DeviceListener {
+
+        @Override
+        public void event(DeviceEvent event) {
+            final Device device = event.subject();
+            switch (event.type()) {
+                case DEVICE_ADDED:
+                case DEVICE_UPDATED:
+                case DEVICE_AVAILABILITY_CHANGED:
+                    if (!deviceService.isAvailable(device.id())) {
+                        signalStatusUnknown(device.id());
+                    }
+                    break;
+                case DEVICE_REMOVED:
+                case DEVICE_SUSPENDED:
+                    signalStatusUnknown(device.id());
+                    break;
+                case PORT_ADDED:
+                case PORT_UPDATED:
+                case PORT_REMOVED:
+                case PORT_STATS_UPDATED:
+                default:
+                    break;
+            }
+        }
+    }
+
+    private class StatusMapListener
+            implements EventuallyConsistentMapListener<DeviceId, PipelineStatus> {
+
+        @Override
+        public void event(EventuallyConsistentMapEvent<DeviceId, PipelineStatus> event) {
+            final DeviceId deviceId = event.key();
+            final PipelineStatus status = event.value();
+            switch (event.type()) {
+                case PUT:
+                    postStatusEvent(deviceId, status);
+                    break;
+                case REMOVE:
+                    postStatusEvent(deviceId, PipelineStatus.UNKNOWN);
+                    break;
+                default:
+                    log.error("Unknown map event type {}", event.type());
+            }
+        }
+
+        private void postStatusEvent(DeviceId deviceId, PipelineStatus newStatus) {
+            PipelineStatus oldStatus = localStatusMap.put(deviceId, newStatus);
+            oldStatus = oldStatus == null ? PipelineStatus.UNKNOWN : oldStatus;
+            final PiPipeconfWatchdogEvent.Type eventType =
+                    newStatus == PipelineStatus.READY
+                            ? PiPipeconfWatchdogEvent.Type.PIPELINE_READY
+                            : PiPipeconfWatchdogEvent.Type.PIPELINE_UNKNOWN;
+            if (newStatus != oldStatus) {
+                log.info("Pipeline status of {} is {}", deviceId, newStatus);
+                post(new PiPipeconfWatchdogEvent(eventType, deviceId));
+            }
+        }
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStore.java b/core/store/dist/src/main/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStore.java
index 64c23d8..092b8eb 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStore.java
@@ -16,14 +16,16 @@
 
 package org.onosproject.store.pi.impl;
 
-import com.google.common.collect.Sets;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.SetMultimap;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.KryoNamespace;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.pi.model.PiPipeconfId;
 import org.onosproject.net.pi.service.PiPipeconfDeviceMappingEvent;
@@ -31,24 +33,19 @@
 import org.onosproject.net.pi.service.PiPipeconfMappingStoreDelegate;
 import org.onosproject.store.AbstractStore;
 import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.EventuallyConsistentMap;
-import org.onosproject.store.service.EventuallyConsistentMapEvent;
-import org.onosproject.store.service.EventuallyConsistentMapListener;
-import org.onosproject.store.service.MultiValuedTimestamp;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
-import org.onosproject.store.service.WallClockTimestamp;
 import org.slf4j.Logger;
 
-import java.util.Collections;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
- * Manages information of pipeconf to device binding using gossip protocol to distribute
- * information.
+ * Manages the bindings between devices and pipeconfs, stored in a {@code ConsistentMap}.
  */
 @Component(immediate = true)
 @Service
@@ -61,22 +58,20 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected StorageService storageService;
 
-    protected EventuallyConsistentMap<DeviceId, PiPipeconfId> deviceToPipeconf;
+    protected ConsistentMap<DeviceId, PiPipeconfId> deviceToPipeconf;
 
-    protected final EventuallyConsistentMapListener<DeviceId, PiPipeconfId> pipeconfListener =
+    protected final MapEventListener<DeviceId, PiPipeconfId> pipeconfListener =
             new InternalPiPipeconfListener();
 
-    protected ConcurrentMap<PiPipeconfId, Set<DeviceId>> pipeconfToDevices = new ConcurrentHashMap<>();
+    protected SetMultimap<PiPipeconfId, DeviceId> pipeconfToDevices =
+            Multimaps.synchronizedSetMultimap(HashMultimap.create());
 
     @Activate
     public void activate() {
-        KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
-                .register(KryoNamespaces.API)
-                .register(MultiValuedTimestamp.class);
-        deviceToPipeconf = storageService.<DeviceId, PiPipeconfId>eventuallyConsistentMapBuilder()
+        deviceToPipeconf = storageService.<DeviceId, PiPipeconfId>consistentMapBuilder()
                 .withName("onos-pipeconf-table")
-                .withSerializer(serializer)
-                .withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
+                .withSerializer(Serializer.using(KryoNamespaces.API))
+                .build();
         deviceToPipeconf.addListener(pipeconfListener);
         log.info("Started");
     }
@@ -91,12 +86,15 @@
 
     @Override
     public PiPipeconfId getPipeconfId(DeviceId deviceId) {
-        return deviceToPipeconf.get(deviceId);
+        // One lookup only: containsKey() followed by get() can NPE if the binding is removed in between.
+        final org.onosproject.store.service.Versioned<PiPipeconfId> binding =
+                deviceToPipeconf.get(deviceId);
+        return binding == null ? null : binding.value();
     }
 
     @Override
     public Set<DeviceId> getDevices(PiPipeconfId pipeconfId) {
-        return pipeconfToDevices.getOrDefault(pipeconfId, Collections.emptySet());
+        return ImmutableSet.copyOf(pipeconfToDevices.get(pipeconfId));
     }
 
     @Override
@@ -109,35 +107,47 @@
         deviceToPipeconf.remove(deviceId);
     }
 
-    private class InternalPiPipeconfListener implements EventuallyConsistentMapListener<DeviceId, PiPipeconfId> {
+    /**
+     * Mirrors changes of the device-to-pipeconf map into the reverse
+     * pipeconf-to-devices index and notifies the store delegate.
+     */
+    private class InternalPiPipeconfListener implements MapEventListener<DeviceId, PiPipeconfId> {
 
         @Override
-        public void event(EventuallyConsistentMapEvent<DeviceId, PiPipeconfId> mapEvent) {
-            final PiPipeconfDeviceMappingEvent.Type type;
+        public void event(MapEvent<DeviceId, PiPipeconfId> mapEvent) {
+            PiPipeconfDeviceMappingEvent.Type eventType = null;
             final DeviceId deviceId = mapEvent.key();
-            final PiPipeconfId pipeconfId = mapEvent.value();
+            final PiPipeconfId newPipeconfId = mapEvent.newValue() != null
+                    ? mapEvent.newValue().value() : null;
+            final PiPipeconfId oldPipeconfId = mapEvent.oldValue() != null
+                    ? mapEvent.oldValue().value() : null;
             switch (mapEvent.type()) {
-                case PUT:
-                    type = PiPipeconfDeviceMappingEvent.Type.CREATED;
-                    pipeconfToDevices.compute(pipeconfId, (pipeconf, devices) -> {
-                        if (devices == null) {
-                            devices = Sets.newConcurrentHashSet();
+                case INSERT:
+                case UPDATE:
+                    if (newPipeconfId != null) {
+                        if (!newPipeconfId.equals(oldPipeconfId)) {
+                            eventType = PiPipeconfDeviceMappingEvent.Type.CREATED;
+                            if (oldPipeconfId != null) {
+                                // Device re-bound to another pipeconf: drop the
+                                // stale entry, or getDevices() keeps returning it.
+                                pipeconfToDevices.remove(oldPipeconfId, deviceId);
+                            }
                         }
-                        devices.add(deviceId);
-                        return devices;
-                    });
+                        pipeconfToDevices.put(newPipeconfId, deviceId);
+                    }
                     break;
                 case REMOVE:
-                    type = PiPipeconfDeviceMappingEvent.Type.REMOVED;
-                    pipeconfToDevices.computeIfPresent(pipeconfId, (pipeconf, devices) -> {
-                        devices.remove(deviceId);
-                        return devices;
-                    });
+                    if (oldPipeconfId != null) {
+                        eventType = PiPipeconfDeviceMappingEvent.Type.REMOVED;
+                        pipeconfToDevices.remove(oldPipeconfId, deviceId);
+                    }
                     break;
                 default:
                     throw new IllegalArgumentException("Wrong event type " + mapEvent.type());
             }
-            notifyDelegate(new PiPipeconfDeviceMappingEvent(type, deviceId));
+            if (eventType != null) {
+                notifyDelegate(new PiPipeconfDeviceMappingEvent(eventType, deviceId));
+            }
         }
     }
 }
diff --git a/core/store/dist/src/test/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStoreTest.java
index 9a31b0c..875bb2a 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStoreTest.java
@@ -22,7 +22,11 @@
 import org.onosproject.net.pi.model.PiPipeconfId;
 import org.onosproject.store.service.TestStorageService;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test class for the Distributed Device to Pipeconf store.
@@ -72,10 +76,14 @@
     @Test
     public void createOrUpdatePipeconfToDeviceBinding() {
         store.createOrUpdateBinding(DEVICE_ID, PIPECONF_ID);
-        assertTrue("Value should be in the map", store.deviceToPipeconf.containsKey(DEVICE_ID));
-        assertTrue("Value should be in the map", store.deviceToPipeconf.containsValue(PIPECONF_ID));
-        assertTrue("Value should be in the map", store.pipeconfToDevices.containsKey(PIPECONF_ID));
-        assertTrue("Value should be in the map", store.pipeconfToDevices.containsValue(ImmutableSet.of(DEVICE_ID)));
+        assertTrue("Value should be in the map",
+                   store.deviceToPipeconf.containsKey(DEVICE_ID));
+        assertEquals("Value should be in the map",
+                     PIPECONF_ID, store.deviceToPipeconf.get(DEVICE_ID).value());
+        assertTrue("Value should be in the map",
+                   store.pipeconfToDevices.containsKey(PIPECONF_ID));
+        assertTrue("Value should be in the map",
+                   store.pipeconfToDevices.get(PIPECONF_ID).contains(DEVICE_ID));
     }
 
     /**