Improve scalability of P4Runtime subsystem

The P4Runtime client was hanging (deadlock) on a master arbitration
request. As a result, all other requests (e.g. table write) were waiting
for the client's request lock to become available.

Apart from fixing those deadlocks, this patch brings a number of
improvements that together make it possible to run networks of 100+
P4Runtime devices on a single ONOS instance (previously only ~20 devices).

Includes:
- Asynchronous mastership handling in DeviceHandshaker (as defined in
the P4Runtime and OpenFlow specs)
- Refactored arbitration handling in the P4RuntimeClient
to be consistent with the P4Runtime spec
- Report suspected deadlocks in P4RuntimeClientImpl
- Exploit write errors in P4RuntimeClient to quickly report
channel/mastership errors to upper layers
- Complete all futures with deadlines in P4Runtime driver
- Dump all tables in one request
- Re-purposed ChannelEvent to DeviceAgentEvent to carry also mastership
response events
- Fixed IntelliJ warnings
- Various code and log clean-ups

Change-Id: I9376793a9fe69d8eddf7e8ac2ef0ee4c14fbd198
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
index 8e2b54d..ac4856a 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/AbstractP4RuntimeHandlerBehaviour.java
@@ -16,11 +16,13 @@
 
 package org.onosproject.drivers.p4runtime;
 
+import io.grpc.StatusRuntimeException;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.driver.AbstractHandlerBehaviour;
 import org.onosproject.net.pi.model.PiPipeconf;
+import org.onosproject.net.pi.model.PiPipeconfId;
 import org.onosproject.net.pi.service.PiPipeconfService;
 import org.onosproject.net.pi.service.PiTranslationService;
 import org.onosproject.p4runtime.api.P4RuntimeClient;
@@ -28,6 +30,11 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
 import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
@@ -35,6 +42,10 @@
  */
 public class AbstractP4RuntimeHandlerBehaviour extends AbstractHandlerBehaviour {
 
+    // Timeout in seconds for device operations.
+    // TODO make configurable via driver properties
+    private static final int DEVICE_OP_TIMEOUT = 5;
+
     public static final String P4RUNTIME_SERVER_ADDR_KEY = "p4runtime_ip";
     public static final String P4RUNTIME_SERVER_PORT_KEY = "p4runtime_port";
     public static final String P4RUNTIME_DEVICE_ID_KEY = "p4runtime_deviceId";
@@ -76,12 +87,18 @@
         client = controller.getClient(deviceId);
 
         PiPipeconfService piPipeconfService = handler().get(PiPipeconfService.class);
-        if (!piPipeconfService.ofDevice(deviceId).isPresent() ||
-                !piPipeconfService.getPipeconf(piPipeconfService.ofDevice(deviceId).get()).isPresent()) {
-            log.warn("Unable to get the pipeconf of {}, aborting operation", deviceId);
+        if (!piPipeconfService.ofDevice(deviceId).isPresent()) {
+            log.warn("Unable to get assigned pipeconf for {} (mapping " +
+                             "missing in PiPipeconfService), aborting operation",
+                     deviceId);
             return false;
         }
-        pipeconf = piPipeconfService.getPipeconf(piPipeconfService.ofDevice(deviceId).get()).get();
+        PiPipeconfId pipeconfId = piPipeconfService.ofDevice(deviceId).get();
+        if (!piPipeconfService.getPipeconf(pipeconfId).isPresent()) {
+            log.warn("Cannot find any pipeconf with ID '{}' ({}), aborting operation", pipeconfId, deviceId);
+            return false;
+        }
+        pipeconf = piPipeconfService.getPipeconf(pipeconfId).get();
 
         piTranslationService = handler().get(PiTranslationService.class);
 
@@ -89,12 +106,12 @@
     }
 
     /**
-     * Create a P4Runtime client for this device. Returns true if the operation
-     * was successful, false otherwise.
+     * Returns a P4Runtime client for this device, null if such client cannot
+     * be created.
      *
-     * @return true if successful, false otherwise
+     * @return client or null
      */
-    protected boolean createClient() {
+     P4RuntimeClient createClient() {
         deviceId = handler().data().deviceId();
         controller = handler().get(P4RuntimeController.class);
 
@@ -105,24 +122,38 @@
         if (serverAddr == null || serverPortString == null || p4DeviceIdString == null) {
             log.warn("Unable to create client for {}, missing driver data key (required is {}, {}, and {})",
                      deviceId, P4RUNTIME_SERVER_ADDR_KEY, P4RUNTIME_SERVER_PORT_KEY, P4RUNTIME_DEVICE_ID_KEY);
-            return false;
+            return null;
         }
 
-        if (!controller.createClient(deviceId, serverAddr,
-                                     Integer.parseUnsignedInt(serverPortString),
-                                     Long.parseUnsignedLong(p4DeviceIdString))) {
+         final int serverPort;
+         final long p4DeviceId;
+
+         try {
+            serverPort = Integer.parseUnsignedInt(serverPortString);
+        } catch (NumberFormatException e) {
+            log.error("{} is not a valid P4Runtime port number", serverPortString);
+            return null;
+        }
+         try {
+             p4DeviceId = Long.parseUnsignedLong(p4DeviceIdString);
+         } catch (NumberFormatException e) {
+             log.error("{} is not a valid P4Runtime-internal device ID", p4DeviceIdString);
+             return null;
+         }
+
+        if (!controller.createClient(deviceId, serverAddr, serverPort, p4DeviceId)) {
             log.warn("Unable to create client for {}, aborting operation", deviceId);
-            return false;
+            return null;
         }
 
-        return true;
+        return controller.getClient(deviceId);
     }
 
     /**
-     * Returns the value of the given driver property, if present,
-     * otherwise returns the given default value.
+     * Returns the value of the given driver property, if present, otherwise
+     * returns the given default value.
      *
-     * @param propName property name
+     * @param propName   property name
      * @param defaultVal default value
      * @return boolean
      */
@@ -134,4 +165,36 @@
             return Boolean.parseBoolean(handler().driver().getProperty(propName));
         }
     }
+
+    /**
+     * Convenience method to get the result of a completable future while
+     * setting a timeout and checking for exceptions.
+     *
+     * @param future        completable future
+     * @param opDescription operation description to use in log messages. Should
+     *                      be a sentence starting with a verb ending in -ing,
+     *                      e.g. "reading...", "writing...", etc.
+     * @param defaultValue  value to return if operation fails
+     * @param <U>           type of returned value
+     * @return future result or default value
+     */
+    <U> U getFutureWithDeadline(CompletableFuture<U> future, String opDescription,
+                                U defaultValue) {
+        try {
+            return future.get(DEVICE_OP_TIMEOUT, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            log.error("Exception while {} on {}", opDescription, deviceId);
+        } catch (ExecutionException e) {
+            final Throwable cause = e.getCause();
+            if (cause instanceof StatusRuntimeException) {
+                final StatusRuntimeException grpcError = (StatusRuntimeException) cause;
+                log.warn("Error while {} on {}: {}", opDescription, deviceId, grpcError.getMessage());
+            } else {
+                log.error("Exception while {} on {}", opDescription, deviceId, e.getCause());
+            }
+        } catch (TimeoutException e) {
+            log.error("Operation TIMEOUT while {} on {}", opDescription, deviceId);
+        }
+        return defaultValue;
+    }
 }