Refactor P4Runtime subsystem to implement async connection procedure

This patch is an attempt to solve issues observed when restarting both
switches and ONOS nodes. Most of the issues seemed to depend on a
brittle mastership handling when deploying the pipeline.

With this patch, GDP registers devices to the core with available=false
(i.e. offline) and marks them online only when the P4 pipeline has been
deployed to the device. A new PiPipeconfWatchdogService takes care of
deploying pipelines and producing event when devices are ready.

Moreover, we fix a race condition where pipeconf-related behaviors
were not found. This was caused by GDP enforcing the merged
driver name in the network config, while external entities (e.g.
Mininet) were pushing a JSON blob with the base driver name. This patch
removes the need to rely on such a trick and instead uses
pipeconf-aware logic directly in the driver manager (change #19622).

Finally, we fix issues in P4RuntimeClientImpl that were causing the
stream channel not detecting unreachable devices. The solution is to
follow gRPC APIs and re-instantiate a new channel once the first fails.

Change-Id: I6fbc91859c0fb58a6db3bc197b7081a8fe9f97f7
diff --git a/core/api/src/main/java/org/onosproject/net/behaviour/PiPipelineProgrammable.java b/core/api/src/main/java/org/onosproject/net/behaviour/PiPipelineProgrammable.java
index 2373af4..22367ba 100644
--- a/core/api/src/main/java/org/onosproject/net/behaviour/PiPipelineProgrammable.java
+++ b/core/api/src/main/java/org/onosproject/net/behaviour/PiPipelineProgrammable.java
@@ -24,21 +24,40 @@
 import java.util.concurrent.CompletableFuture;
 
 /**
- * Behavior to program the pipeline of a device that supports protocol-independence.
+ * Behavior to program the pipeline of a device that supports
+ * protocol-independence.
  */
 @Beta
 public interface PiPipelineProgrammable extends HandlerBehaviour {
     /**
-     * Deploys the given pipeconf to the device.
+     * Writes the given pipeconf to the device, returns a completable future
+     * with true is the operations was successful, false otherwise.
+     * <p>
+     * After the future has been completed, the device is expected to process
+     * data plane packets according to the written pipeconf.
      *
      * @param pipeconf pipeconf
-     * @return true if the operation was successful, false otherwise
+     * @return completable future set to true if the operation was successful,
+     * false otherwise
      */
     // TODO: return an explanation of why things went wrong, and the status of the device.
-    CompletableFuture<Boolean> deployPipeconf(PiPipeconf pipeconf);
+    CompletableFuture<Boolean> setPipeconf(PiPipeconf pipeconf);
 
     /**
-     * Returns the default pipeconf for ths device, to be used when any other pipeconf is not available.
+     * Returns true if the device is configured with the given pipeconf, false
+     * otherwise.
+     * <p>
+     * This method is expected to always return true after successfully calling
+     * {@link #setPipeconf(PiPipeconf)} with the given pipeconf.
+     *
+     * @param pipeconf pipeconf
+     * @return true if the device has the given pipeconf set, false otherwise
+     */
+    boolean isPipeconfSet(PiPipeconf pipeconf);
+
+    /**
+     * Returns the default pipeconf for this device, to be used when any other
+     * pipeconf is not available.
      *
      * @return optional pipeconf
      */
diff --git a/core/api/src/main/java/org/onosproject/net/device/DeviceHandshaker.java b/core/api/src/main/java/org/onosproject/net/device/DeviceHandshaker.java
index bc870d3..bceaeff 100644
--- a/core/api/src/main/java/org/onosproject/net/device/DeviceHandshaker.java
+++ b/core/api/src/main/java/org/onosproject/net/device/DeviceHandshaker.java
@@ -19,6 +19,7 @@
 import com.google.common.annotations.Beta;
 import org.onosproject.net.MastershipRole;
 import org.onosproject.net.driver.DeviceConnect;
+import org.onosproject.net.provider.ProviderId;
 
 import java.util.concurrent.CompletableFuture;
 
@@ -48,21 +49,32 @@
     void roleChanged(MastershipRole newRole);
 
     /**
-     * Adds a device agent listener.
+     * Returns the last known mastership role agreed by the device for this
+     * node.
      *
-     * @param listener device agent listener
+     * @return mastership role
      */
-    default void addDeviceAgentListener(DeviceAgentListener listener) {
+    MastershipRole getRole();
+
+    /**
+     * Adds a device agent listener for the given provider ID.
+     *
+     * @param providerId provider ID
+     * @param listener   device agent listener
+     */
+    default void addDeviceAgentListener(
+            ProviderId providerId, DeviceAgentListener listener) {
         throw new UnsupportedOperationException(
                 "Device agent listener registration not supported");
     }
 
     /**
-     * Removes a device agent listener.
+     * Removes a device agent listener previously registered for the given
+     * provider ID.
      *
-     * @param listener device agent listener
+     * @param providerId provider ID
      */
-    default void removeDeviceAgentListener(DeviceAgentListener listener) {
+    default void removeDeviceAgentListener(ProviderId providerId) {
         throw new UnsupportedOperationException(
                 "Device agent listener removal not supported");
     }
diff --git a/core/api/src/main/java/org/onosproject/net/driver/DeviceConnect.java b/core/api/src/main/java/org/onosproject/net/driver/DeviceConnect.java
index 2f53620..f30386d 100644
--- a/core/api/src/main/java/org/onosproject/net/driver/DeviceConnect.java
+++ b/core/api/src/main/java/org/onosproject/net/driver/DeviceConnect.java
@@ -20,27 +20,38 @@
 import java.util.concurrent.CompletableFuture;
 
 /**
- * Abstraction of handler behaviour used to set-up and tear-down
- * connection with a device.
+ * Abstraction of handler behaviour used to set-up and tear-down connections
+ * with a device.
  */
 @Beta
 public interface DeviceConnect extends HandlerBehaviour {
 
     /**
-     * Connects to the device.
-     * It's supposed to initiate the transport sessions, channel and also,
-     * if applicable, store them in the proper protocol specific
-     * controller (e.g. GrpcController).
+     * Connects to the device, for example by opening the transport session that
+     * will be later used to send control messages. Returns true if the
+     * connection was initiated successfully, false otherwise.
+     * <p>
+     * Calling multiple times this method while a connection to the device is
+     * open should result in a no-op.
      *
      * @return CompletableFuture with true if the operation was successful
      */
     CompletableFuture<Boolean> connect();
 
     /**
-     * Disconnects from the device.
-     * It's supposed to destroy the transport sessions and channel and also,
-     * if applicable, remove them in the proper protocol specific
-     * controller (e.g. GrpcController).
+     * Returns true if a connection to the device is open, false otherwise.
+     *
+     * @return true if the connection is open, false otherwise
+     */
+    boolean isConnected();
+
+    /**
+     * Disconnects from the device, for example closing the transport session
+     * previously opened. Returns true if the disconnection procedure was
+     * successful, false otherwise.
+     * <p>
+     * Calling multiple times this method while a connection to the device is
+     * closed should result in a no-op.
      *
      * @return CompletableFuture with true if the operation was successful
      */
diff --git a/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfWatchdogEvent.java b/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfWatchdogEvent.java
new file mode 100644
index 0000000..0d34dd6
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfWatchdogEvent.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.pi.service;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.event.AbstractEvent;
+import org.onosproject.net.DeviceId;
+
+/**
+ * Event representing changes in the status of a device pipeline.
+ */
+@Beta
+public class PiPipeconfWatchdogEvent
+        extends AbstractEvent<PiPipeconfWatchdogEvent.Type, DeviceId> {
+
+    /**
+     * Type of event.
+     */
+    public enum Type {
+        PIPELINE_READY,
+        PIPELINE_UNKNOWN
+    }
+
+    /**
+     * Creates a new event for the given device.
+     *
+     * @param type type
+     * @param subject device ID
+     */
+    public PiPipeconfWatchdogEvent(PiPipeconfWatchdogEvent.Type type, DeviceId subject) {
+        super(type, subject);
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfWatchdogListener.java b/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfWatchdogListener.java
new file mode 100644
index 0000000..ad6a899
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfWatchdogListener.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.pi.service;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.event.EventListener;
+
+/**
+ * Listener of pipeline status change events produced by {@link
+ * PiPipeconfWatchdogService}.
+ */
+@Beta
+public interface PiPipeconfWatchdogListener
+        extends EventListener<PiPipeconfWatchdogEvent> {
+}
diff --git a/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfWatchdogService.java b/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfWatchdogService.java
new file mode 100644
index 0000000..9462569
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfWatchdogService.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.pi.service;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.event.ListenerService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.pi.model.PiPipeconfId;
+
+/**
+ * Service that periodically probes pipeline programmable devices, to check that
+ * their pipeline is configured with the expected pipeconf. It emits events
+ * about pipeline status changes.
+ */
+@Beta
+public interface PiPipeconfWatchdogService
+        extends ListenerService<PiPipeconfWatchdogEvent, PiPipeconfWatchdogListener> {
+
+    /**
+     * Status of a device pipeline.
+     */
+    enum PipelineStatus {
+        /**
+         * The device pipeline is ready to process packets.
+         */
+        READY,
+        /**
+         * The status is unknown and the device might not be able to process
+         * packets yet.
+         */
+        UNKNOWN,
+    }
+
+    /**
+     * Asynchronously triggers a probe task that checks the device pipeline
+     * status and, if required, configures it with the pipeconf associated to
+     * this device (via {@link PiPipeconfService#bindToDevice(PiPipeconfId,
+     * DeviceId)}).
+     *
+     * @param deviceId device to probe
+     */
+    void triggerProbe(DeviceId deviceId);
+
+    /**
+     * Returns the last known pipeline status of the given device.
+     *
+     * @param deviceId device ID
+     * @return pipeline status
+     */
+    PipelineStatus getStatus(DeviceId deviceId);
+}