More improvements and bugfixes in P4Runtime subsystem

Most notably, we fix a bug in which some nodes were not able to find
pipeconf-specific behaviors for a given device. The problem is not
completely solved, but it's mitigated.

There's a race condition caused by the fact that the GDP updates the cfg
with the merged driver name before advertising the device to the core.
Some nodes might receive the cfg update after the device has been
advertised. We mitigate the problem by performing the pipeline deploy
(slow operation) after the cfg update, giving more time for nodes
to catch up. Perhaps we should listen for cfg update events before
advertising the device to the core?

Also:
- NPE when getting P4Runtime client
- Detect if a base driver is already merged in pipeconf manager
- Longer timeouts in P4Runtime driver and protocol (for slow networks)
- Configurable timeout in P4Runtime driver and GDP
- NPE when adding/removing device agent listeners in P4Runtime handshaker
- Various exceptions due to race conditions in GDP when disconnecting
devices (by serializing disconnect tasks per device)
- NPE when cancelling polling tasks in GDP
- Refactored PipeconfService to distinguish between driver merge,
pipeconf map update, and cfg update (now performed in the GDP)
- Fixed PipeconfManagerTest, not testing driver behaviours
- Use Guava striped locks when possible (more memory-efficient than maps,
and with strict atomicity guarantees w.r.t. caches).

Change-Id: I30f3887541ba0fd44439a86885e9821ac565b64c
diff --git a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
index d2773b2..b35b999 100644
--- a/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
+++ b/protocols/p4runtime/ctl/src/main/java/org/onosproject/p4runtime/ctl/P4RuntimeControllerImpl.java
@@ -16,10 +16,8 @@
 
 package org.onosproject.p4runtime.ctl;
 
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Striped;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import io.grpc.NameResolverProvider;
@@ -50,10 +48,10 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.Lock;
+import java.util.function.Supplier;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -66,21 +64,16 @@
         extends AbstractListenerManager<P4RuntimeEvent, P4RuntimeEventListener>
         implements P4RuntimeController {
 
-    private static final int DEVICE_LOCK_EXPIRE_TIME_IN_MIN = 10;
     private final Logger log = getLogger(getClass());
     private final NameResolverProvider nameResolverProvider = new DnsNameResolverProvider();
-    private final Map<DeviceId, ClientKey> deviceIdToClientKey = Maps.newHashMap();
-    private final Map<ClientKey, P4RuntimeClient> clientKeyToClient = Maps.newHashMap();
+
+    private final Map<DeviceId, ClientKey> clientKeys = Maps.newHashMap();
+    private final Map<ClientKey, P4RuntimeClient> clients = Maps.newHashMap();
     private final Map<DeviceId, GrpcChannelId> channelIds = Maps.newHashMap();
+
     private final ConcurrentMap<DeviceId, List<DeviceAgentListener>> deviceAgentListeners = Maps.newConcurrentMap();
-    private final LoadingCache<DeviceId, ReadWriteLock> deviceLocks = CacheBuilder.newBuilder()
-            .expireAfterAccess(DEVICE_LOCK_EXPIRE_TIME_IN_MIN, TimeUnit.MINUTES)
-            .build(new CacheLoader<DeviceId, ReadWriteLock>() {
-                @Override
-                public ReadWriteLock load(DeviceId deviceId) {
-                    return new ReentrantReadWriteLock();
-                }
-            });
+    private final Striped<Lock> stripedLocks = Striped.lock(30);
+
     private DistributedElectionIdGenerator electionIdGenerator;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -99,6 +92,11 @@
 
     @Deactivate
     public void deactivate() {
+        clientKeys.keySet().forEach(this::removeClient);
+        clientKeys.clear();
+        clients.clear();
+        channelIds.clear();
+        deviceAgentListeners.clear();
         grpcController = null;
         electionIdGenerator.destroy();
         electionIdGenerator = null;
@@ -111,44 +109,40 @@
                                 int serverPort, long p4DeviceId) {
         checkNotNull(deviceId);
         checkNotNull(serverAddr);
+        checkArgument(serverPort > 0, "Invalid server port");
 
-        ClientKey newKey = new ClientKey(deviceId, serverAddr, serverPort, p4DeviceId);
+        return withDeviceLock(() -> doCreateClient(
+                deviceId, serverAddr, serverPort, p4DeviceId), deviceId);
+    }
+
+    private boolean doCreateClient(DeviceId deviceId, String serverAddr,
+                                   int serverPort, long p4DeviceId) {
+
+        ClientKey clientKey = new ClientKey(deviceId, serverAddr, serverPort, p4DeviceId);
+
+        if (clientKeys.containsKey(deviceId)) {
+            final ClientKey existingKey = clientKeys.get(deviceId);
+            if (clientKey.equals(existingKey)) {
+                log.info("Not creating client for {} as it already exists (server={}:{}, p4DeviceId={})...",
+                         deviceId, serverAddr, serverPort, p4DeviceId);
+                return true;
+            } else {
+                throw new IllegalStateException(
+                        "A client for the same device ID but different " +
+                                "server endpoints already exists");
+            }
+        }
+
+        log.info("Creating client for {} (server={}:{}, p4DeviceId={})...",
+                 deviceId, serverAddr, serverPort, p4DeviceId);
+
+        GrpcChannelId channelId = GrpcChannelId.of(
+                clientKey.deviceId(), "p4runtime-" + clientKey);
 
         ManagedChannelBuilder channelBuilder = NettyChannelBuilder
                 .forAddress(serverAddr, serverPort)
-                .usePlaintext(true);
-
-        deviceLocks.getUnchecked(deviceId).writeLock().lock();
-
-        try {
-            if (deviceIdToClientKey.containsKey(deviceId)) {
-                final ClientKey existingKey = deviceIdToClientKey.get(deviceId);
-                if (newKey.equals(existingKey)) {
-                    log.info("Not creating client for {} as it already exists (server={}:{}, p4DeviceId={})...",
-                             deviceId, serverAddr, serverPort, p4DeviceId);
-                    return true;
-                } else {
-                    throw new IllegalStateException(
-                            "A client for the same device ID but different " +
-                                    "server endpoints already exists");
-                }
-            } else {
-                log.info("Creating client for {} (server={}:{}, p4DeviceId={})...",
-                         deviceId, serverAddr, serverPort, p4DeviceId);
-                return doCreateClient(newKey, channelBuilder);
-            }
-        } finally {
-            deviceLocks.getUnchecked(deviceId).writeLock().unlock();
-        }
-    }
-
-    private boolean doCreateClient(ClientKey clientKey, ManagedChannelBuilder channelBuilder) {
-
-        GrpcChannelId channelId = GrpcChannelId.of(clientKey.deviceId(),
-                                                   "p4runtime-" + clientKey.p4DeviceId());
-
-        // Channel defaults.
-        channelBuilder.nameResolverFactory(nameResolverProvider);
+                .usePlaintext(true)
+                .nameResolverFactory(nameResolverProvider);
 
         ManagedChannel channel;
         try {
@@ -162,72 +156,69 @@
         P4RuntimeClient client = new P4RuntimeClientImpl(
                 clientKey.deviceId(), clientKey.p4DeviceId(), channel, this);
 
+        clientKeys.put(clientKey.deviceId(), clientKey);
+        clients.put(clientKey, client);
         channelIds.put(clientKey.deviceId(), channelId);
-        deviceIdToClientKey.put(clientKey.deviceId(), clientKey);
-        clientKeyToClient.put(clientKey, client);
 
         return true;
     }
 
     @Override
     public P4RuntimeClient getClient(DeviceId deviceId) {
+        if (deviceId == null) {
+            return null;
+        }
+        return withDeviceLock(() -> doGetClient(deviceId), deviceId);
+    }
 
-        deviceLocks.getUnchecked(deviceId).readLock().lock();
-
-        try {
-            if (!deviceIdToClientKey.containsKey(deviceId)) {
-                return null;
-            } else {
-                return clientKeyToClient.get(deviceIdToClientKey.get(deviceId));
-            }
-        } finally {
-            deviceLocks.getUnchecked(deviceId).readLock().unlock();
+    private P4RuntimeClient doGetClient(DeviceId deviceId) {
+        if (!clientKeys.containsKey(deviceId)) {
+            return null;
+        } else {
+            return clients.get(clientKeys.get(deviceId));
         }
     }
 
     @Override
     public void removeClient(DeviceId deviceId) {
-
-        deviceLocks.getUnchecked(deviceId).writeLock().lock();
-        try {
-            if (deviceIdToClientKey.containsKey(deviceId)) {
-                final ClientKey clientKey = deviceIdToClientKey.get(deviceId);
-                clientKeyToClient.remove(clientKey).shutdown();
-                grpcController.disconnectChannel(channelIds.get(deviceId));
-                deviceIdToClientKey.remove(deviceId);
-                channelIds.remove(deviceId);
-            }
-        } finally {
-           deviceLocks.getUnchecked(deviceId).writeLock().unlock();
+        if (deviceId == null) {
+            return;
         }
+        withDeviceLock(() -> doRemoveClient(deviceId), deviceId);
+    }
+
+    private Void doRemoveClient(DeviceId deviceId) {
+        if (clientKeys.containsKey(deviceId)) {
+            final ClientKey clientKey = clientKeys.get(deviceId);
+            clients.get(clientKey).shutdown();
+            grpcController.disconnectChannel(channelIds.get(deviceId));
+            clientKeys.remove(deviceId);
+            clients.remove(clientKey);
+            channelIds.remove(deviceId);
+        }
+        return null;
     }
 
     @Override
     public boolean hasClient(DeviceId deviceId) {
-        deviceLocks.getUnchecked(deviceId).readLock().lock();
-
-        try {
-            return deviceIdToClientKey.containsKey(deviceId);
-        } finally {
-            deviceLocks.getUnchecked(deviceId).readLock().unlock();
-        }
+        return clientKeys.containsKey(deviceId);
     }
 
     @Override
-    public boolean isReacheable(DeviceId deviceId) {
-
-        deviceLocks.getUnchecked(deviceId).readLock().lock();
-
-        try {
-            if (!deviceIdToClientKey.containsKey(deviceId)) {
-                log.debug("No client for {}, can't check for reachability", deviceId);
-                return false;
-            }
-            // FIXME: we're not checking for a P4Runtime server, it could be any gRPC service
-            return grpcController.isChannelOpen(channelIds.get(deviceId));
-        } finally {
-            deviceLocks.getUnchecked(deviceId).readLock().unlock();
+    public boolean isReachable(DeviceId deviceId) {
+        if (deviceId == null) {
+            return false;
         }
+        return withDeviceLock(() -> doIsReacheable(deviceId), deviceId);
+    }
+
+    private boolean doIsReacheable(DeviceId deviceId) {
+        // FIXME: we're not checking for a P4Runtime server, it could be any gRPC service
+        if (!clientKeys.containsKey(deviceId)) {
+            log.debug("No client for {}, can't check for reachability", deviceId);
+            return false;
+        }
+        return grpcController.isChannelOpen(channelIds.get(deviceId));
     }
 
     @Override
@@ -244,6 +235,16 @@
         });
     }
 
+    private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
+        final Lock lock = stripedLocks.get(deviceId);
+        lock.lock();
+        try {
+            return task.get();
+        } finally {
+            lock.unlock();
+        }
+    }
+
     BigInteger newMasterElectionId(DeviceId deviceId) {
         return electionIdGenerator.generate(deviceId);
     }
@@ -274,7 +275,7 @@
                 agentEventType = DeviceAgentEvent.Type.CHANNEL_CLOSED;
                 break;
             case ERROR:
-                agentEventType = !isReacheable(deviceId)
+                agentEventType = !isReachable(deviceId)
                         ? DeviceAgentEvent.Type.CHANNEL_CLOSED
                         : DeviceAgentEvent.Type.CHANNEL_ERROR;
                 break;