Improve scalability of P4Runtime subsystem

The P4Runtime client was hanging (deadlock) on a master arbitration
request. As such, all other requests (e.g. table write) were waiting
for the client's request lock to become available.

Apart from fixing those deadlocks, this patch brings a number of
improvements that together make it possible to run networks of 100+
P4Runtime devices on a single ONOS instance (previously only ~20).

Includes:
- Asynchronous mastership handling in DeviceHandshaker (as defined in
the P4Runtime and OpenFlow specs)
- Refactored arbitration handling in the P4RuntimeClient
to be consistent with the P4Runtime spec
- Report suspected deadlocks in P4RuntimeClientImpl
- Exploit write errors in P4RuntimeClient to quickly report
channel/mastership errors to upper layers
- Complete all futures with deadlines in P4Runtime driver
- Dump all tables in one request
- Re-purposed ChannelEvent as DeviceAgentEvent so that it also carries
mastership response events
- Fixed IntelliJ warnings
- Various code and log clean-ups

Change-Id: I9376793a9fe69d8eddf7e8ac2ef0ee4c14fbd198
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
index 987356b..d2773b2 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
@@ -35,20 +35,21 @@
 import org.onosproject.grpc.api.GrpcChannelId;
 import org.onosproject.grpc.api.GrpcController;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.ChannelEvent;
-import org.onosproject.net.device.ChannelListener;
+import org.onosproject.net.device.DeviceAgentEvent;
+import org.onosproject.net.device.DeviceAgentListener;
 import org.onosproject.p4runtime.api.P4RuntimeClient;
 import org.onosproject.p4runtime.api.P4RuntimeController;
 import org.onosproject.p4runtime.api.P4RuntimeEvent;
 import org.onosproject.p4runtime.api.P4RuntimeEventListener;
-import org.onosproject.store.service.AtomicCounter;
 import org.onosproject.store.service.StorageService;
 import org.slf4j.Logger;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.math.BigInteger;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -65,14 +66,13 @@
         extends AbstractListenerManager<P4RuntimeEvent, P4RuntimeEventListener>
         implements P4RuntimeController {
 
-    private static final String P4R_ELECTION = "p4runtime-election";
     private static final int DEVICE_LOCK_EXPIRE_TIME_IN_MIN = 10;
     private final Logger log = getLogger(getClass());
     private final NameResolverProvider nameResolverProvider = new DnsNameResolverProvider();
     private final Map<DeviceId, ClientKey> deviceIdToClientKey = Maps.newHashMap();
     private final Map<ClientKey, P4RuntimeClient> clientKeyToClient = Maps.newHashMap();
     private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
-    private final Map<DeviceId, List<ChannelListener>> channelListeners = Maps.newConcurrentMap();
+    private final ConcurrentMap<DeviceId, List<DeviceAgentListener>> deviceAgentListeners = Maps.newConcurrentMap();
     private final LoadingCache<DeviceId, ReadWriteLock> deviceLocks = CacheBuilder.newBuilder()
             .expireAfterAccess(DEVICE_LOCK_EXPIRE_TIME_IN_MIN, TimeUnit.MINUTES)
             .build(new CacheLoader<DeviceId, ReadWriteLock>() {
@@ -81,8 +81,7 @@
                     return new ReentrantReadWriteLock();
                 }
             });
-
-    private AtomicCounter electionIdGenerator;
+    private DistributedElectionIdGenerator electionIdGenerator;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private GrpcController grpcController;
@@ -93,8 +92,7 @@
     @Activate
     public void activate() {
         eventDispatcher.addSink(P4RuntimeEvent.class, listenerRegistry);
-        electionIdGenerator = storageService.getAtomicCounter(P4R_ELECTION);
-
+        electionIdGenerator = new DistributedElectionIdGenerator(storageService); // replaces the AtomicCounter
         log.info("Started");
     }
 
@@ -102,6 +100,8 @@
     @Deactivate
     public void deactivate() {
         grpcController = null;
+        electionIdGenerator.destroy(); // presumably releases the generator's backing state - TODO confirm
+        electionIdGenerator = null; // dropped so a later activate() always builds a fresh generator
         eventDispatcher.removeSink(P4RuntimeEvent.class);
         log.info("Stopped");
     }
@@ -119,13 +119,13 @@
                 .usePlaintext(true);
 
         deviceLocks.getUnchecked(deviceId).writeLock().lock();
-        log.info("Creating client for {} (server={}:{}, p4DeviceId={})...",
-                 deviceId, serverAddr, serverPort, p4DeviceId);
 
         try {
             if (deviceIdToClientKey.containsKey(deviceId)) {
                 final ClientKey existingKey = deviceIdToClientKey.get(deviceId);
                 if (newKey.equals(existingKey)) {
+                    log.info("Not creating client for {} as it already exists (server={}:{}, p4DeviceId={})...",
+                             deviceId, serverAddr, serverPort, p4DeviceId);
                     return true;
                 } else {
                     throw new IllegalStateException(
@@ -133,6 +133,8 @@
                                     "server endpoints already exists");
                 }
             } else {
+                log.info("Creating client for {} (server={}:{}, p4DeviceId={})...",
+                         deviceId, serverAddr, serverPort, p4DeviceId);
                 return doCreateClient(newKey, channelBuilder);
             }
         } finally {
@@ -187,12 +189,11 @@
     public void removeClient(DeviceId deviceId) {
 
         deviceLocks.getUnchecked(deviceId).writeLock().lock();
-
         try {
             if (deviceIdToClientKey.containsKey(deviceId)) {
                 final ClientKey clientKey = deviceIdToClientKey.get(deviceId);
-                grpcController.disconnectChannel(channelIds.get(deviceId));
                 clientKeyToClient.remove(clientKey).shutdown();
+                grpcController.disconnectChannel(channelIds.get(deviceId));
                 deviceIdToClientKey.remove(deviceId);
                 channelIds.remove(deviceId);
             }
@@ -222,7 +223,7 @@
                 log.debug("No client for {}, can't check for reachability", deviceId);
                 return false;
             }
-
+            // FIXME: we're not checking for a P4Runtime server, it could be any gRPC service
             return grpcController.isChannelOpen(channelIds.get(deviceId));
         } finally {
             deviceLocks.getUnchecked(deviceId).readLock().unlock();
@@ -230,67 +231,73 @@
     }
 
     @Override
-    public long getNewMasterElectionId() {
-        return electionIdGenerator.incrementAndGet();
+    public void addDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener) {
+        // Create the per-device list only on first registration (no throwaway allocation, one lookup).
+        deviceAgentListeners.computeIfAbsent(deviceId, k -> new CopyOnWriteArrayList<>()).add(listener);
     }
 
     @Override
-    public void addChannelListener(DeviceId deviceId, ChannelListener listener) {
-        channelListeners.compute(deviceId, (devId, listeners) -> {
-            List<ChannelListener> newListeners;
-            if (listeners != null) {
-                newListeners = listeners;
-            } else {
-                newListeners = new ArrayList<>();
-            }
-            newListeners.add(listener);
-            return newListeners;
+    public void removeDeviceAgentListener(DeviceId deviceId, DeviceAgentListener listener) {
+        deviceAgentListeners.computeIfPresent(deviceId, (did, listeners) -> { // no-op for unknown devices
+            listeners.remove(listener);
+            return listeners; // non-null result keeps the (possibly empty) list mapped
         });
     }
 
-    @Override
-    public void removeChannelListener(DeviceId deviceId, ChannelListener listener) {
-        channelListeners.compute(deviceId, (devId, listeners) -> {
-            if (listeners != null) {
-                listeners.remove(listener);
-                return listeners;
-            } else {
-                log.debug("Device {} has no listener registered", deviceId);
-                return null;
-            }
-        });
+    BigInteger newMasterElectionId(DeviceId deviceId) { // package-private; IDs are requested per device
+        return electionIdGenerator.generate(deviceId);
     }
 
     void postEvent(P4RuntimeEvent event) {
-        if (event.type().equals(P4RuntimeEvent.Type.CHANNEL_EVENT)) {
-            DefaultChannelEvent channelError = (DefaultChannelEvent) event.subject();
-            DeviceId deviceId = event.subject().deviceId();
-            ChannelEvent channelEvent = null;
-            //If disconnection is already known we propagate it.
-            if (channelError.type().equals(ChannelEvent.Type.CHANNEL_DISCONNECTED)) {
-                channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_DISCONNECTED, channelError.deviceId(),
-                        channelError.throwable());
-            } else if (channelError.type().equals(ChannelEvent.Type.CHANNEL_ERROR)) {
-                //If we don't know what the error is we check for reachability
-                if (!isReacheable(deviceId)) {
-                    //if false the channel has disconnected
-                    channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_DISCONNECTED, channelError.deviceId(),
-                            channelError.throwable());
-                } else {
-                    // else we propagate the event.
-                    channelEvent = new ChannelEvent(ChannelEvent.Type.CHANNEL_ERROR, channelError.deviceId(),
-                            channelError.throwable());
-                }
-            }
-            //Ignoring CHANNEL_CONNECTED
-            if (channelEvent != null && channelListeners.get(deviceId) != null) {
-                for (ChannelListener listener : channelListeners.get(deviceId)) {
-                    listener.event(channelEvent);
-                }
-            }
-        } else {
-            post(event);
+        switch (event.type()) { // routes each event by type; only the default case reaches post()
+            case CHANNEL_EVENT: // handed to the per-device agent listeners only
+                handleChannelEvent(event);
+                break;
+            case ARBITRATION_RESPONSE: // likewise surfaced to agent listeners as a role event
+                handleArbitrationReply(event);
+                break;
+            default: // all other event types are passed to post() unchanged
+                post(event);
+                break;
         }
     }
 
+    private void handleChannelEvent(P4RuntimeEvent event) {
+        final ChannelEvent chEvent = (ChannelEvent) event.subject();
+        final DeviceId did = chEvent.deviceId();
+        final DeviceAgentEvent.Type type;
+        switch (chEvent.type()) {
+            case OPEN:
+                type = DeviceAgentEvent.Type.CHANNEL_OPEN;
+                break;
+            case CLOSED:
+                type = DeviceAgentEvent.Type.CHANNEL_CLOSED;
+                break;
+            case ERROR:
+                // An error on a device that is no longer reachable is reported as a closed channel.
+                type = isReacheable(did)
+                        ? DeviceAgentEvent.Type.CHANNEL_ERROR : DeviceAgentEvent.Type.CHANNEL_CLOSED;
+                break;
+            default:
+                log.warn("Unrecognized channel event type {}", chEvent.type());
+                return;
+        }
+        postDeviceAgentEvent(did, new DeviceAgentEvent(type, did));
+    }
+
+    private void handleArbitrationReply(P4RuntimeEvent event) {
+        final DeviceId did = event.subject().deviceId();
+        final ArbitrationResponse reply = (ArbitrationResponse) event.subject();
+        // The arbitration outcome maps one-to-one onto a role event for the agent listeners.
+        final DeviceAgentEvent agentEvent = new DeviceAgentEvent(
+                reply.isMaster() ? DeviceAgentEvent.Type.ROLE_MASTER : DeviceAgentEvent.Type.ROLE_STANDBY,
+                reply.deviceId());
+        postDeviceAgentEvent(did, agentEvent);
+    }
+
+    private void postDeviceAgentEvent(DeviceId deviceId, DeviceAgentEvent event) {
+        if (deviceAgentListeners.containsKey(deviceId)) { // devices without listeners are skipped silently
+            deviceAgentListeners.get(deviceId).forEach(l -> l.event(event)); // synchronous, in list order
+        }
+    }
 }