Improve scalability of P4Runtime subsystem

The P4Runtime client was hanging (deadlock) on a master arbitration
request. As such, all other requests (e.g. table write) were waiting
for the client's request lock to become available.

Apart from fixing those deadlocks, this patch brings a number of
improvements that together make it possible to run networks of 100+
P4Runtime devices on a single ONOS instance (previously only ~20).

Includes:
- Asynchronous mastership handling in DeviceHandshaker (as defined in
the P4Runtime and OpenFlow specs)
- Refactored arbitration handling in the P4RuntimeClient
to be consistent with the P4Runtime spec
- Report suspected deadlocks in P4RuntimeClientImpl
- Exploit write errors in P4RuntimeClient to quickly report
channel/mastership errors to upper layers
- Complete all futures with deadlines in P4Runtime driver
- Dump all tables in one request
- Repurposed ChannelEvent as DeviceAgentEvent to also carry mastership
response events
- Fixed IntelliJ warnings
- Various code and log clean-ups
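
As an illustration of "complete all futures with deadlines", here is a
minimal, self-contained sketch of what a helper like the
getFutureWithDeadline call in the diff below might do. The helper body,
the DeadlineSketch class and the 200 ms deadline are assumptions made
for this example; the actual implementation is not part of the diff
shown here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public final class DeadlineSketch {

    // Hypothetical deadline, chosen only for this example.
    private static final long DEADLINE_MILLIS = 200;

    /**
     * Waits for the future up to the deadline. Returns its value, or
     * defaultValue if the wait times out, fails or is interrupted, so
     * the caller never blocks indefinitely.
     */
    static <U> U getFutureWithDeadline(CompletableFuture<U> future,
                                       String opDescription,
                                       U defaultValue) {
        try {
            return future.get(DEADLINE_MILLIS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
            System.err.println("Interrupted while " + opDescription);
        } catch (ExecutionException e) {
            System.err.println("Exception while " + opDescription
                                       + ": " + e.getCause());
        } catch (TimeoutException e) {
            System.err.println("Deadline exceeded while " + opDescription);
        }
        return defaultValue;
    }

    public static void main(String[] args) {
        CompletableFuture<Boolean> done = CompletableFuture.completedFuture(true);
        CompletableFuture<Boolean> hung = new CompletableFuture<>(); // never completed

        boolean first = getFutureWithDeadline(done, "sending packet-out", false);
        boolean second = getFutureWithDeadline(hung, "sending packet-out", false);

        // The completed future yields its value; the hung one falls back
        // to the default once the deadline expires.
        System.out.println(first + " " + second);
    }
}
```

With a bounded wait, a request that never completes costs the caller at
most one deadline instead of holding it (and any lock it owns) forever.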

Change-Id: I9376793a9fe69d8eddf7e8ac2ef0ee4c14fbd198
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java
index 6c61148..cc79cc1 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimePacketProgrammable.java
@@ -26,7 +26,9 @@
 /**
  * Implementation of PacketProgrammable behaviour for P4Runtime.
  */
-public class P4RuntimePacketProgrammable extends AbstractP4RuntimeHandlerBehaviour implements PacketProgrammable {
+public class P4RuntimePacketProgrammable
+        extends AbstractP4RuntimeHandlerBehaviour
+        implements PacketProgrammable {
 
     @Override
     public void emit(OutboundPacket packet) {
@@ -38,7 +40,8 @@
         final PiPipelineInterpreter interpreter = device.is(PiPipelineInterpreter.class)
                 ? device.as(PiPipelineInterpreter.class) : null;
         if (interpreter == null) {
-            log.warn("Device {} with pipeconf {} has no interpreter, aborting emit operation", deviceId, pipeconf.id());
+            log.warn("Unable to get interpreter for {} with pipeconf {}, aborting emit operation",
+                     deviceId, pipeconf.id());
             return;
         }
 
@@ -46,11 +49,13 @@
             Collection<PiPacketOperation> operations = interpreter.mapOutboundPacket(packet);
             operations.forEach(piPacketOperation -> {
                 log.debug("Doing PiPacketOperation {}", piPacketOperation);
-                client.packetOut(piPacketOperation, pipeconf);
+                getFutureWithDeadline(
+                        client.packetOut(piPacketOperation, pipeconf),
+                        "sending packet-out", false);
             });
         } catch (PiPipelineInterpreter.PiInterpreterException e) {
-            log.error("Interpreter of pipeconf {} was unable to translate outbound packet: {}",
-                    pipeconf.id(), e.getMessage());
+            log.error("Unable to translate outbound packet for {} with pipeconf {}: {}",
+                      deviceId, pipeconf.id(), e.getMessage());
         }
     }
 }