Improve scalability of P4Runtime subsystem

The P4Runtime client was hanging (deadlocked) on a master arbitration
request. As a result, all other requests (e.g. table writes) were blocked
waiting for the client's request lock to become available.

Apart from fixing those deadlocks, this patch brings a number of
improvements that together make it possible to run networks of 100+
P4Runtime devices on a single ONOS instance (previously only ~20 devices).

Includes:
- Asynchronous mastership handling in DeviceHandshaker (as defined in
the P4Runtime and OpenFlow specs)
- Refactored arbitration handling in the P4RuntimeClient
to be consistent with the P4Runtime spec
- Report suspect deadlocks in P4RuntimeClientImpl
- Exploit write errors in P4RuntimeClient to quickly report
channel/mastership errors to upper layers
- Complete all futures with deadlines in P4Runtime driver
- Dump all tables in one request
- Re-purposed ChannelEvent as DeviceAgentEvent to also carry mastership
response events
- Fixed IntelliJ warnings
- Various code and log clean-ups

Change-Id: I9376793a9fe69d8eddf7e8ac2ef0ee4c14fbd198
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index 550ca59..2b31b91 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -50,9 +50,9 @@
 import org.onosproject.net.config.NetworkConfigRegistry;
 import org.onosproject.net.config.basics.BasicDeviceConfig;
 import org.onosproject.net.config.basics.SubjectFactories;
-import org.onosproject.net.device.ChannelEvent;
-import org.onosproject.net.device.ChannelListener;
 import org.onosproject.net.device.DefaultDeviceDescription;
+import org.onosproject.net.device.DeviceAgentEvent;
+import org.onosproject.net.device.DeviceAgentListener;
 import org.onosproject.net.device.DeviceDescription;
 import org.onosproject.net.device.DeviceDescriptionDiscovery;
 import org.onosproject.net.device.DeviceEvent;
@@ -95,6 +95,7 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -102,6 +103,7 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import static java.util.concurrent.Executors.newFixedThreadPool;
 import static java.util.concurrent.Executors.newScheduledThreadPool;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.net.device.DeviceEvent.Type;
@@ -116,45 +118,48 @@
 @Component(immediate = true)
 public class GeneralDeviceProvider extends AbstractProvider
         implements DeviceProvider {
-    public static final String DRIVER = "driver";
-    public static final int REACHABILITY_TIMEOUT = 10;
-    public static final String DEPLOY = "deploy-";
-    public static final String PIPECONF_TOPIC = "-pipeconf";
-    public static final String CHECK = "check-";
-    public static final String CONNECTION = "-connection";
+
+    // Timeout in seconds for operations on devices.
+    private static final int DEVICE_OP_TIMEOUT = 10;
+
+    private static final String DRIVER = "driver";
+    private static final String DEPLOY = "deploy-";
+    private static final String PIPECONF_TOPIC = "-pipeconf";
+    private static final String CHECK = "check-";
+    private static final String CONNECTION = "-connection";
     private static final String POLL_FREQUENCY = "pollFrequency";
 
     private final Logger log = getLogger(getClass());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected DeviceProviderRegistry providerRegistry;
+    private DeviceProviderRegistry providerRegistry;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ComponentConfigService componentConfigService;
+    private ComponentConfigService componentConfigService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected NetworkConfigRegistry cfgService;
+    private NetworkConfigRegistry cfgService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected CoreService coreService;
+    private CoreService coreService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected DeviceService deviceService;
+    private DeviceService deviceService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected DriverService driverService;
+    private DriverService driverService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected MastershipService mastershipService;
+    private MastershipService mastershipService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected PiPipeconfService piPipeconfService;
+    private PiPipeconfService piPipeconfService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ClusterService clusterService;
+    private ClusterService clusterService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected LeadershipService leadershipService;
+    private LeadershipService leadershipService;
 
     private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 10;
     @Property(name = POLL_FREQUENCY, intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
@@ -168,9 +173,9 @@
                     "default is 10 sec")
     private int deviceAvailabilityPollFrequency = DEVICE_AVAILABILITY_POLL_FREQUENCY_SECONDS;
 
-    protected static final String APP_NAME = "org.onosproject.generaldeviceprovider";
-    protected static final String URI_SCHEME = "device";
-    protected static final String CFG_SCHEME = "generalprovider";
+    private static final String APP_NAME = "org.onosproject.generaldeviceprovider";
+    private static final String URI_SCHEME = "device";
+    private static final String CFG_SCHEME = "generalprovider";
     private static final String DEVICE_PROVIDER_PACKAGE = "org.onosproject.general.provider.device";
     private static final int CORE_POOL_SIZE = 10;
     private static final String UNKNOWN = "unknown";
@@ -187,25 +192,24 @@
 
     private final Map<DeviceId, DeviceHandshaker> handshakers = Maps.newConcurrentMap();
 
+    private final Map<DeviceId, MastershipRole> requestedRoles = Maps.newConcurrentMap();
 
-    protected ScheduledExecutorService connectionExecutor
-            = newScheduledThreadPool(CORE_POOL_SIZE,
-            groupedThreads("onos/generaldeviceprovider-device",
-                    "connection-executor-%d", log));
-    protected ScheduledExecutorService portStatsExecutor
-            = newScheduledThreadPool(CORE_POOL_SIZE,
-            groupedThreads("onos/generaldeviceprovider-port-stats",
-                    "port-stats-executor-%d", log));
-    protected ScheduledExecutorService availabilityCheckExecutor
-            = newScheduledThreadPool(CORE_POOL_SIZE,
-            groupedThreads("onos/generaldeviceprovider-availability-check",
-                    "availability-check-executor-%d", log));
-    protected ConcurrentMap<DeviceId, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
 
-    protected DeviceProviderService providerService;
+    private ExecutorService connectionExecutor
+            = newFixedThreadPool(CORE_POOL_SIZE, groupedThreads(
+                    "onos/generaldeviceprovider-device-connect", "%d", log));
+    private ScheduledExecutorService portStatsExecutor
+            = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
+                    "onos/generaldeviceprovider-port-stats", "%d", log));
+    private ScheduledExecutorService availabilityCheckExecutor
+            = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
+                    "onos/generaldeviceprovider-availability-check", "%d", log));
+    private ConcurrentMap<DeviceId, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
+
+    private DeviceProviderService providerService;
     private InternalDeviceListener deviceListener = new InternalDeviceListener();
 
-    protected final ConfigFactory factory =
+    private final ConfigFactory factory =
             new ConfigFactory<DeviceId, GeneralProviderDeviceConfig>(
                     SubjectFactories.DEVICE_SUBJECT_FACTORY,
                     GeneralProviderDeviceConfig.class, CFG_SCHEME) {
@@ -215,8 +219,8 @@
                 }
             };
 
-    protected final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
-    private ChannelListener channelListener = new InternalChannelListener();
+    private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
+    private final DeviceAgentListener deviceAgentListener = new InternalDeviceAgentListener();
 
 
     @Activate
@@ -233,7 +237,8 @@
         cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
                 .forEach(did -> connectionExecutor.execute(() -> connectDevice(did)));
         //Initiating a periodic check to see if any device is available again and reconnect it.
-        availabilityCheckExecutor.scheduleAtFixedRate(this::scheduleDevicePolling, deviceAvailabilityPollFrequency,
+        availabilityCheckExecutor.scheduleAtFixedRate(
+                this::scheduleDevicePolling, deviceAvailabilityPollFrequency,
                 deviceAvailabilityPollFrequency, TimeUnit.SECONDS);
         modified(context);
         log.info("Started");
@@ -255,12 +260,12 @@
             Set<DeviceId> deviceSubjects =
                     cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class);
             deviceSubjects.forEach(deviceId -> {
-                if (!compareScheme(deviceId)) {
+                if (notMyScheme(deviceId)) {
                     // not under my scheme, skipping
                     log.debug("{} is not my scheme, skipping", deviceId);
                     return;
                 }
-                scheduledTasks.put(deviceId, scheduleStatistcsPolling(deviceId, true));
+                scheduledTasks.put(deviceId, scheduleStatsPolling(deviceId, true));
             });
         }
     }
@@ -299,17 +304,18 @@
     @Override
     public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
         log.info("Received role {} for device {}", newRole, deviceId);
-        CompletableFuture<MastershipRole> roleReply = getHandshaker(deviceId).roleChanged(newRole);
-        roleReply.thenAcceptAsync(mastership -> {
-            providerService.receivedRoleReply(deviceId, newRole, mastership);
-            if (!mastership.equals(MastershipRole.MASTER) && scheduledTasks.get(deviceId) != null) {
-                scheduledTasks.get(deviceId).cancel(false);
-                scheduledTasks.remove(deviceId);
-            } else if (mastership.equals(MastershipRole.MASTER) && scheduledTasks.get(deviceId) == null) {
-                scheduledTasks.put(deviceId, scheduleStatistcsPolling(deviceId, false));
-                updatePortStatistics(deviceId);
-            }
-        });
+        requestedRoles.put(deviceId, newRole);
+        connectionExecutor.execute(() -> doRoleChanged(deviceId, newRole));
+    }
+
+    private void doRoleChanged(DeviceId deviceId, MastershipRole newRole) {
+        DeviceHandshaker handshaker = getHandshaker(deviceId);
+        if (handshaker == null) {
+            log.warn("Null handshaker. Unable to notify new role {} to {}",
+                     newRole, deviceId);
+            return;
+        }
+        handshaker.roleChanged(newRole);
     }
 
     @Override
@@ -321,9 +327,9 @@
             return false;
         }
 
-        CompletableFuture<Boolean> reachable = handshaker.isReachable();
         try {
-            return reachable.get(REACHABILITY_TIMEOUT, TimeUnit.SECONDS);
+            return handshaker.isReachable()
+                    .get(DEVICE_OP_TIMEOUT, TimeUnit.SECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             log.warn("Device {} is not reachable {}", deviceId, e.getMessage());
             log.debug("Exception", e);
@@ -358,11 +364,8 @@
     @Override
     public void triggerDisconnect(DeviceId deviceId) {
         log.debug("Triggering disconnection of device {}", deviceId);
-        connectionExecutor.execute(() -> {
-            disconnectDevice(deviceId).whenComplete((success, ex) -> {
-                checkAndConnect(deviceId);
-            });
-        });
+        connectionExecutor.execute(() -> disconnectDevice(deviceId)
+                .whenComplete((success, ex) -> checkAndConnect(deviceId)));
     }
 
     private DeviceHandshaker getHandshaker(DeviceId deviceId) {
@@ -438,7 +441,7 @@
 
             connected.thenAcceptAsync(result -> {
                 if (result) {
-                    handshaker.addChannelListener(channelListener);
+                    handshaker.addDeviceAgentListener(deviceAgentListener);
                     //Populated with the default values obtained by the driver
                     ChassisId cid = new ChassisId();
                     SparseAnnotations annotations = DefaultAnnotations.builder()
@@ -496,7 +499,7 @@
         //Connecting to the device
         handshaker.connect().thenAcceptAsync(result -> {
             if (result) {
-                handshaker.addChannelListener(channelListener);
+                handshaker.addDeviceAgentListener(deviceAgentListener);
                 handlePipeconf(deviceId, handshaker.handler().driver(), handshaker.data(), false);
             }
         });
@@ -583,32 +586,29 @@
     private CompletableFuture<Boolean> disconnectDevice(DeviceId deviceId) {
         log.info("Disconnecting for device {}", deviceId);
 
-        CompletableFuture<Boolean> disconnectError = new CompletableFuture<>();
+        if (scheduledTasks.containsKey(deviceId)) {
+            scheduledTasks.remove(deviceId).cancel(true);
+        }
 
         DeviceHandshaker handshaker = handshakers.remove(deviceId);
-        if (handshaker != null) {
-            handshaker.disconnect().thenAcceptAsync(result -> {
-                if (result) {
-                    log.info("Disconnected device {}", deviceId);
-                    providerService.deviceDisconnected(deviceId);
-                    disconnectError.complete(true);
-                } else {
-                    disconnectError.complete(false);
-                    log.warn("Device {} was unable to disconnect", deviceId);
-                }
-            });
-        } else {
-            //gracefully ignoring.
+
+        if (handshaker == null) {
+            // gracefully ignoring.
             log.warn("No DeviceHandshaker for device {}, no guarantees of complete " +
-                    "shutdown of communication", deviceId);
-            disconnectError.complete(false);
+                             "shutdown of communication", deviceId);
+            return CompletableFuture.completedFuture(false);
         }
-        ScheduledFuture<?> pollingStatisticsTask = scheduledTasks.get(deviceId);
-        if (pollingStatisticsTask != null) {
-            pollingStatisticsTask.cancel(true);
-            scheduledTasks.remove(deviceId);
-        }
-        return disconnectError;
+
+        return handshaker.disconnect()
+                .thenApplyAsync(result -> {
+                    if (result) {
+                        log.info("Disconnected device {}", deviceId);
+                        providerService.deviceDisconnected(deviceId);
+                    } else {
+                        log.warn("Device {} was unable to disconnect", deviceId);
+                    }
+                    return result;
+                });
     }
 
     //Needed to catch the exception in the executors since are not rethrown otherwise.
@@ -637,8 +637,8 @@
         }
     }
 
-    private boolean compareScheme(DeviceId deviceId) {
-        return deviceId.uri().getScheme().equals(URI_SCHEME);
+    private boolean notMyScheme(DeviceId deviceId) {
+        return !deviceId.uri().getScheme().equals(URI_SCHEME);
     }
 
     /**
@@ -650,7 +650,7 @@
         public void event(NetworkConfigEvent event) {
             DeviceId deviceId = (DeviceId) event.subject();
             //Assuming that the deviceId comes with uri 'device:'
-            if (!compareScheme(deviceId)) {
+            if (notMyScheme(deviceId)) {
                 // not under my scheme, skipping
                 log.debug("{} is not my scheme, skipping", deviceId);
                 return;
@@ -765,7 +765,7 @@
         pipelineConfigured.remove(deviceId);
     }
 
-    private ScheduledFuture<?> scheduleStatistcsPolling(DeviceId deviceId, boolean randomize) {
+    private ScheduledFuture<?> scheduleStatsPolling(DeviceId deviceId, boolean randomize) {
         int delay = 0;
         if (randomize) {
             delay = new SecureRandom().nextInt(10);
@@ -826,6 +826,34 @@
         return present;
     }
 
+    private void handleChannelClosed(DeviceId deviceId) {
+        disconnectDevice(deviceId).thenRunAsync(() -> {
+            // If master, notifies disconnection to the core.
+            if (mastershipService.isLocalMaster(deviceId)) {
+                log.info("Disconnecting device {}, due to channel closed event",
+                         deviceId);
+                providerService.deviceDisconnected(deviceId);
+            }
+        });
+    }
+
+    private void handleMastershipResponse(DeviceId deviceId, MastershipRole response) {
+        //Notify core about response.
+        if (!requestedRoles.containsKey(deviceId)) {
+            return;
+        }
+        providerService.receivedRoleReply(deviceId, requestedRoles.get(deviceId), response);
+        // If not master, cancel polling tasks, otherwise start them.
+        if (!response.equals(MastershipRole.MASTER)
+                && scheduledTasks.get(deviceId) != null) {
+            scheduledTasks.remove(deviceId).cancel(false);
+        } else if (response.equals(MastershipRole.MASTER)
+                && scheduledTasks.get(deviceId) == null) {
+            scheduledTasks.put(deviceId, scheduleStatsPolling(deviceId, false));
+            updatePortStatistics(deviceId);
+        }
+    }
+
     /**
      * Listener for core device events.
      */
@@ -834,11 +862,10 @@
         public void event(DeviceEvent event) {
             DeviceId deviceId = event.subject().id();
             // FIXME handling for mastership change scenario missing?
-
             // For now this is scheduled periodically, when streaming API will
             // be available we check and base it on the streaming API (e.g. gNMI)
             if (mastershipService.isLocalMaster(deviceId)) {
-                scheduledTasks.put(deviceId, scheduleStatistcsPolling(deviceId, false));
+                scheduledTasks.put(deviceId, scheduleStatsPolling(deviceId, false));
             }
         }
 
@@ -850,26 +877,37 @@
     }
 
     /**
-     * Listener for device channel events.
+     * Listener for device agent events.
      */
-    private class InternalChannelListener implements ChannelListener {
+    private class InternalDeviceAgentListener implements DeviceAgentListener {
 
         @Override
-        public void event(ChannelEvent event) {
+        public void event(DeviceAgentEvent event) {
             DeviceId deviceId = event.subject();
-            if (event.type().equals(ChannelEvent.Type.CHANNEL_DISCONNECTED)) {
-                //let's properly handle device disconnection
-                CompletableFuture<Boolean> disconnection = disconnectDevice(deviceId);
-                disconnection.thenAcceptAsync(result -> {
-                    //If master notifying of disconnection to the core.
-                    if (mastershipService.isLocalMaster(deviceId)) {
-                        log.info("Disconnecting unreachable device {}, due to error on channel", deviceId);
-                        providerService.deviceDisconnected(deviceId);
-                    }
-                });
-
+            switch (event.type()) {
+                case CHANNEL_OPEN:
+                    // Ignore.
+                    break;
+                case CHANNEL_CLOSED:
+                    handleChannelClosed(deviceId);
+                    break;
+                case CHANNEL_ERROR:
+                    // TODO evaluate other reaction to channel error.
+                    log.warn("Received CHANNEL_ERROR from {}. Is the channel still open?",
+                             deviceId);
+                    break;
+                case ROLE_MASTER:
+                    handleMastershipResponse(deviceId, MastershipRole.MASTER);
+                    break;
+                case ROLE_STANDBY:
+                    handleMastershipResponse(deviceId, MastershipRole.STANDBY);
+                    break;
+                case ROLE_NONE:
+                    handleMastershipResponse(deviceId, MastershipRole.NONE);
+                    break;
+                default:
+                    log.warn("Unrecognized device agent event {}", event.type());
             }
-            //TODO evaluate other type of reactions.
         }
 
     }