Improve scalability of P4Runtime subsystem

The P4Runtime client was hanging (deadlocked) on a master arbitration
request. As a result, all other requests (e.g. table writes) were left
waiting for the client's request lock to become available.

Apart from fixing those deadlocks, this patch brings a number of
improvements that together make it possible to run networks of 100+
P4Runtime devices on a single ONOS instance (previously only ~20 devices).

Includes:
- Asynchronous mastership handling in DeviceHandshaker (as defined in
the P4Runtime and OpenFlow specs)
- Refactored arbitration handling in the P4RuntimeClient
to be consistent with the P4Runtime spec
- Report suspected deadlocks in P4RuntimeClientImpl
- Exploit write errors in P4RuntimeClient to quickly report
channel/mastership errors to upper layers
- Complete all futures with deadlines in P4Runtime driver
- Dump all tables in one request
- Repurposed ChannelEvent as DeviceAgentEvent so that it also carries
mastership response events
- Fixed IntelliJ warnings
- Various code and log clean-ups
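
As a rough illustration of the "complete all futures with deadlines"
item above (a minimal sketch, not the code in this patch): the class
DeadlineFutures, the helper withDeadline and its fallback parameter are
hypothetical names used only for this example. The idea is that a
future is always completed, either by the device response or by a timer
that supplies a fallback value, so callers never block indefinitely.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the P4Runtime driver.
public class DeadlineFutures {

    /**
     * Ensures the given future completes within the given timeout. If it
     * has not completed by then, it is completed with the fallback value.
     */
    public static <T> CompletableFuture<T> withDeadline(
            CompletableFuture<T> future, T fallback,
            long timeout, TimeUnit unit,
            ScheduledExecutorService scheduler) {
        // complete() is a no-op if the future has already completed.
        ScheduledFuture<?> timer = scheduler.schedule(
                () -> { future.complete(fallback); }, timeout, unit);
        // Cancel the timer as soon as the future completes by any means.
        future.whenComplete((result, error) -> timer.cancel(false));
        return future;
    }
}
```

With such a helper, a request future wrapped with a fallback of false
would report failure to upper layers once the deadline expires instead
of holding a lock while waiting for a response that never arrives.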

Change-Id: I9376793a9fe69d8eddf7e8ac2ef0ee4c14fbd198
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
index 0bf61f2..a9a6e08 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeClient.java
@@ -53,99 +53,135 @@
     }
 
     /**
-     * Sets the device pipeline according to the given pipeconf, and for the given byte buffer representing the
-     * target-specific data to be used in the P4Runtime's SetPipelineConfig message. This method should be called
+     * Starts the client by starting the Stream RPC with the device.
+     *
+     * @return completable future containing true if the operation was
+     * successful, false otherwise.
+     */
+    CompletableFuture<Boolean> start();
+
+    /**
+     * Shuts down the client by terminating any active RPC such as the Stream
+     * one.
+     *
+     * @return a completable future to signal the completion of the shutdown
+     * procedure
+     */
+    CompletableFuture<Void> shutdown();
+
+    /**
+     * Sends a master arbitration update to the device with a new election ID
+     * that is guaranteed to be the highest value among all clients.
+     *
+     * @return completable future containing true if the operation was
+     * successful; false otherwise
+     */
+    CompletableFuture<Boolean> becomeMaster();
+
+    /**
+     * Sets the device pipeline according to the given pipeconf, and for the
+     * given byte buffer representing the target-specific data to be used in the
+     * P4Runtime's SetPipelineConfig message. This method should be called
      * before any other method of this client.
      *
      * @param pipeconf   pipeconf
      * @param deviceData target-specific data
-     * @return a completable future of a boolean, true if the operations was successful, false otherwise.
+     * @return a completable future of a boolean, true if the operation was
+     * successful, false otherwise.
      */
-    CompletableFuture<Boolean> setPipelineConfig(PiPipeconf pipeconf, ByteBuffer deviceData);
+    CompletableFuture<Boolean> setPipelineConfig(
+            PiPipeconf pipeconf, ByteBuffer deviceData);
 
     /**
-     * Initializes the stream channel, after which all messages received from the device will be notified using the
-     * {@link P4RuntimeController} event listener.
-     *
-     * @return a completable future of a boolean, true if the operations was successful, false otherwise.
-     */
-    CompletableFuture<Boolean> initStreamChannel();
-
-    /**
-     * Performs the given write operation for the given table entries and pipeconf.
+     * Performs the given write operation for the given table entries and
+     * pipeconf.
      *
      * @param entries  table entries
      * @param opType   operation type
      * @param pipeconf pipeconf currently deployed on the device
      * @return true if the operation was successful, false otherwise.
      */
-    CompletableFuture<Boolean> writeTableEntries(Collection<PiTableEntry> entries, WriteOperationType opType,
-                                                 PiPipeconf pipeconf);
+    CompletableFuture<Boolean> writeTableEntries(
+            Collection<PiTableEntry> entries, WriteOperationType opType,
+            PiPipeconf pipeconf);
 
     /**
-     * Dumps all entries currently installed in the given table, for the given pipeconf.
+     * Dumps all entries currently installed in the given table, for the given
+     * pipeconf.
      *
      * @param tableId  table identifier
      * @param pipeconf pipeconf currently deployed on the device
      * @return completable future of a collection of table entries
      */
-    CompletableFuture<Collection<PiTableEntry>> dumpTable(PiTableId tableId, PiPipeconf pipeconf);
+    CompletableFuture<Collection<PiTableEntry>> dumpTable(
+            PiTableId tableId, PiPipeconf pipeconf);
+
+    /**
+     * Dumps entries from all tables, for the given pipeconf.
+     *
+     * @param pipeconf pipeconf currently deployed on the device
+     * @return completable future of a collection of table entries
+     */
+    CompletableFuture<Collection<PiTableEntry>> dumpAllTables(PiPipeconf pipeconf);
 
     /**
      * Executes a packet-out operation for the given pipeconf.
      *
      * @param packet   packet-out operation to be performed by the device
      * @param pipeconf pipeconf currently deployed on the device
-     * @return a completable future of a boolean, true if the operations was successful, false otherwise.
+     * @return a completable future of a boolean, true if the operation was
+     * successful, false otherwise.
      */
-    CompletableFuture<Boolean> packetOut(PiPacketOperation packet, PiPipeconf pipeconf);
+    CompletableFuture<Boolean> packetOut(
+            PiPacketOperation packet, PiPipeconf pipeconf);
 
     /**
-     * Returns the value of all counter cells for the given set of counter identifiers and pipeconf.
+     * Returns the value of all counter cells for the given set of counter
+     * identifiers and pipeconf.
      *
      * @param counterIds counter identifiers
      * @param pipeconf   pipeconf
      * @return collection of counter data
      */
-    CompletableFuture<Collection<PiCounterCellData>> readAllCounterCells(Set<PiCounterId> counterIds,
-                                                                         PiPipeconf pipeconf);
+    CompletableFuture<Collection<PiCounterCellData>> readAllCounterCells(
+            Set<PiCounterId> counterIds, PiPipeconf pipeconf);
 
     /**
-     * Returns a collection of counter data corresponding to the given set of counter cell identifiers, for the given
-     * pipeconf.
+     * Returns a collection of counter data corresponding to the given set of
+     * counter cell identifiers, for the given pipeconf.
      *
      * @param cellIds  set of counter cell identifiers
      * @param pipeconf pipeconf
      * @return collection of counter data
      */
-    CompletableFuture<Collection<PiCounterCellData>> readCounterCells(Set<PiCounterCellId> cellIds,
-                                                                      PiPipeconf pipeconf);
+    CompletableFuture<Collection<PiCounterCellData>> readCounterCells(
+            Set<PiCounterCellId> cellIds, PiPipeconf pipeconf);
 
     /**
-     * Performs the given write operation for the given action group members and pipeconf.
+     * Performs the given write operation for the given action group members and
+     * pipeconf.
      *
-     * @param profileId  action group profile ID
-     * @param members    action group members
-     * @param opType   write operation type
-     * @param pipeconf the pipeconf currently deployed on the device
+     * @param profileId action group profile ID
+     * @param members   action group members
+     * @param opType    write operation type
+     * @param pipeconf  the pipeconf currently deployed on the device
      * @return true if the operation was successful, false otherwise
      */
-    CompletableFuture<Boolean> writeActionGroupMembers(PiActionProfileId profileId,
-                                                       Collection<PiActionGroupMember> members,
-                                                       WriteOperationType opType,
-                                                       PiPipeconf pipeconf);
+    CompletableFuture<Boolean> writeActionGroupMembers(
+            PiActionProfileId profileId, Collection<PiActionGroupMember> members,
+            WriteOperationType opType, PiPipeconf pipeconf);
 
     /**
-     * Performs the given write operation for the given action group and pipeconf.
+     * Performs the given write operation for the given action group and
+     * pipeconf.
      *
      * @param group    the action group
      * @param opType   write operation type
      * @param pipeconf the pipeconf currently deployed on the device
      * @return true if the operation was successful, false otherwise
      */
-    CompletableFuture<Boolean> writeActionGroup(PiActionGroup group,
-                                                WriteOperationType opType,
-                                                PiPipeconf pipeconf);
+    CompletableFuture<Boolean> writeActionGroup(
+            PiActionGroup group, WriteOperationType opType, PiPipeconf pipeconf);
 
     /**
      * Dumps all groups currently installed for the given action profile.
@@ -154,50 +190,39 @@
      * @param pipeconf        the pipeconf currently deployed on the device
      * @return completable future of a collection of groups
      */
-    CompletableFuture<Collection<PiActionGroup>> dumpGroups(PiActionProfileId actionProfileId,
-                                                            PiPipeconf pipeconf);
+    CompletableFuture<Collection<PiActionGroup>> dumpGroups(
+            PiActionProfileId actionProfileId, PiPipeconf pipeconf);
 
     /**
-     * Returns the configuration of all meter cells for the given set of meter identifiers and pipeconf.
+     * Returns the configuration of all meter cells for the given set of meter
+     * identifiers and pipeconf.
      *
-     * @param meterIds   meter identifiers
-     * @param pipeconf   pipeconf
+     * @param meterIds meter identifiers
+     * @param pipeconf pipeconf
      * @return collection of meter configurations
      */
-    CompletableFuture<Collection<PiMeterCellConfig>> readAllMeterCells(Set<PiMeterId> meterIds,
-                                                                       PiPipeconf pipeconf);
+    CompletableFuture<Collection<PiMeterCellConfig>> readAllMeterCells(
+            Set<PiMeterId> meterIds, PiPipeconf pipeconf);
 
     /**
-     * Returns a collection of meter configurations corresponding to the given set of meter cell identifiers,
-     * for the given pipeconf.
+     * Returns a collection of meter configurations corresponding to the given
+     * set of meter cell identifiers, for the given pipeconf.
      *
-     * @param cellIds    set of meter cell identifiers
-     * @param pipeconf   pipeconf
+     * @param cellIds  set of meter cell identifiers
+     * @param pipeconf pipeconf
      * @return collection of meter configrations
      */
-    CompletableFuture<Collection<PiMeterCellConfig>> readMeterCells(Set<PiMeterCellId> cellIds,
-                                                                    PiPipeconf pipeconf);
+    CompletableFuture<Collection<PiMeterCellConfig>> readMeterCells(
+            Set<PiMeterCellId> cellIds, PiPipeconf pipeconf);
 
     /**
-     * Performs a write operation for the given meter configurations and pipeconf.
+     * Performs a write operation for the given meter configurations and
+     * pipeconf.
      *
-     * @param cellConfigs  meter cell configurations
-     * @param pipeconf     pipeconf currently deployed on the device
+     * @param cellConfigs meter cell configurations
+     * @param pipeconf    pipeconf currently deployed on the device
      * @return true if the operation was successful, false otherwise.
      */
-    CompletableFuture<Boolean> writeMeterCells(Collection<PiMeterCellConfig> cellConfigs, PiPipeconf pipeconf);
-
-    /**
-     * Shutdown the client by terminating any active RPC such as the stream channel.
-     */
-    void shutdown();
-
-    /**
-     * Sends a master arbitration update to the device.
-     *
-     * @return a completable future containing true if the operation was successful; false otherwise
-     */
-    CompletableFuture<Boolean> sendMasterArbitrationUpdate();
-
-    // TODO: work in progress.
+    CompletableFuture<Boolean> writeMeterCells(
+            Collection<PiMeterCellConfig> cellConfigs, PiPipeconf pipeconf);
 }
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
index 3cd81f2..fc76a23 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeController.java
@@ -19,13 +19,14 @@
 import com.google.common.annotations.Beta;
 import org.onosproject.event.ListenerService;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.ChannelListener;
+import org.onosproject.net.device.DeviceAgentListener;
 
 /**
  * Controller of P4Runtime devices.
  */
 @Beta
-public interface P4RuntimeController extends ListenerService<P4RuntimeEvent, P4RuntimeEventListener> {
+public interface P4RuntimeController
+        extends ListenerService<P4RuntimeEvent, P4RuntimeEventListener> {
 
     /**
      * Instantiates a new client to operate on a P4Runtime device identified by
@@ -58,20 +59,22 @@
      *
      * @param deviceId device identifier
      * @return client instance
-     * @throws IllegalStateException if no client exists for the given device identifier
+     * @throws IllegalStateException if no client exists for the given device
+     *                               identifier
      */
     P4RuntimeClient getClient(DeviceId deviceId);
 
     /**
-     * Removes the client for the given device. If no client exists for the given device identifier, the
-     * result is a no-op.
+     * Removes the client for the given device. If no client exists for the
+     * given device identifier, the result is a no-op.
      *
      * @param deviceId device identifier
      */
     void removeClient(DeviceId deviceId);
 
     /**
-     * Returns true if a client exists for the given device identifier, false otherwise.
+     * Returns true if a client exists for the given device identifier, false
+     * otherwise.
      *
      * @param deviceId device identifier
      * @return true if client exists, false otherwise.
@@ -79,37 +82,31 @@
     boolean hasClient(DeviceId deviceId);
 
     /**
-     * Returns true if the P4Runtime server running on the given device is reachable, i.e. the channel is open and the
-     * server is able to respond to RPCs, false otherwise. Reachability can be tested only if a client was previously
-     * created using {@link #createClient(DeviceId, String, int, long)}, otherwise this method returns false.
+     * Returns true if the P4Runtime server running on the given device is
+     * reachable, i.e. the channel is open and the server is able to respond to
+     * RPCs, false otherwise. Reachability can be tested only if a client was
+     * previously created using {@link #createClient(DeviceId, String, int,
+     * long)}, otherwise this method returns false.
      *
      * @param deviceId device identifier.
-     * @return true if a client was created and is able to contact the P4Runtime server, false otherwise.
+     * @return true if a client was created and is able to contact the P4Runtime
+     * server, false otherwise.
      */
     boolean isReacheable(DeviceId deviceId);
 
     /**
-     * Gets new election id for device arbitration request.
-     *
-     * @return the election id
-     */
-    long getNewMasterElectionId();
-
-    /**
-     * Adds a listener for P4Runtime client-server channel events.
-     * If the channel for the device is not present and/or established the listener will get notified
-     * only after channel setup.
+     * Adds a listener for device agent events.
      *
      * @param deviceId device identifier
-     * @param listener the channel listener
+     * @param listener the device agent listener
      */
-    void addChannelListener(DeviceId deviceId, ChannelListener listener);
+    void addDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener);
 
     /**
-     * Removes the listener for P4Runtime client-server channel events.
+     * Removes the listener for device agent events.
      *
      * @param deviceId device identifier
-     * @param listener the channel listener
+     * @param listener the device agent listener
      */
-    void removeChannelListener(DeviceId deviceId, ChannelListener listener);
+    void removeDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener);
 }
diff --git a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java
index ae29097..0316ed4 100644
--- a/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java
+++ b/protocols/p4runtime/api/src/main/java/org/onosproject/p4runtime/api/P4RuntimeEvent.java
@@ -37,7 +37,7 @@
         /**
          * Arbitration reply.
          */
-        ARBITRATION,
+        ARBITRATION_RESPONSE,
 
         /**
          * Channel Event.