More improvements and bugfixes in P4Runtime subsystem

Most notably, we fix a bug in which some nodes were not able to find
pipeconf-specific behaviors for a given device. The problem is not
completely solved, but it is mitigated.

There's a race condition caused by the fact that the GDP updates the cfg
with the merged driver name before advertising the device to the core.
Some nodes might receive the cfg update after the device has been
advertised. We mitigate the problem by performing the pipeline deploy
(slow operation) after the cfg update, giving more time for nodes
to catch up. Perhaps we should listen for cfg update events before
advertising the device to the core?

Also:
- NPE when getting P4Runtime client
- Detect if a base driver is already merged in pipeconf manager
- Longer timeouts in P4Runtime driver and protocol (for slow networks)
- Configurable timeout in P4Runtime driver and GDP
- NPE when adding/removing device agent listeners in P4Runtime handshaker
- Various exceptions due to race conditions in GDP when disconnecting
devices (by serializing disconnect tasks per device)
- NPE when cancelling polling tasks in GDP
- Refactored PipeconfService to distinguish between driver merge,
pipeconf map update, and cfg update (now performed in the GDP)
- Fixed PipeconfManagerTest, not testing driver behaviours
- Use Guava striped locks when possible (more memory-efficient than maps,
and with strict atomicity guarantees w.r.t. caches).

Change-Id: I30f3887541ba0fd44439a86885e9821ac565b64c
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
index ba16601..5e07333 100644
--- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
@@ -16,10 +16,8 @@
 
 package org.onosproject.net.pi.impl;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.annotations.Beta;
+import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableSet;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -29,7 +27,6 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.util.ItemNotFoundException;
 import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.LeadershipService;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.config.ConfigFactory;
 import org.onosproject.net.config.NetworkConfigRegistry;
@@ -52,7 +49,6 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
@@ -73,19 +69,14 @@
 
     private final Logger log = getLogger(getClass());
 
+    private static final String MERGED_DRIVER_SEPARATOR = ":";
     private static final String DRIVER = "driver";
     private static final String CFG_SCHEME = "piPipeconf";
 
-    private static final String DRIVER_MERGE_TOPIC =
-            PiPipeconfManager.class.getSimpleName() + "-driver-merge-";
-
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected NetworkConfigRegistry cfgService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private LeadershipService leadershipService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DriverService driverService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -165,10 +156,13 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> bindToDevice(
-            PiPipeconfId pipeconfId, DeviceId deviceId) {
-        return CompletableFuture.supplyAsync(() -> doMergeDriver(
-                deviceId, pipeconfId), executor);
+    public void bindToDevice(PiPipeconfId pipeconfId, DeviceId deviceId) {
+        pipeconfMappingStore.createOrUpdateBinding(deviceId, pipeconfId);
+    }
+
+    @Override
+    public String mergeDriver(DeviceId deviceId, PiPipeconfId pipeconfId) {
+        return doMergeDriver(deviceId, pipeconfId);
     }
 
     @Override
@@ -176,55 +170,79 @@
         return Optional.ofNullable(pipeconfMappingStore.getPipeconfId(deviceId));
     }
 
-    private boolean doMergeDriver(DeviceId deviceId, PiPipeconfId pipeconfId) {
-        // Perform the following operations:
-        // 1) ALL nodes: create and register new merged driver (pipeconf + base driver)
-        // 2) ONE node (leader): updated netcfg with new driver
-        log.warn("Starting device driver merge of {} with {}...", deviceId, pipeconfId);
+    private String doMergeDriver(DeviceId deviceId, PiPipeconfId pipeconfId) {
+        log.debug("Starting device driver merge of {} with {}...", deviceId, pipeconfId);
         final BasicDeviceConfig basicDeviceConfig = cfgService.getConfig(
                 deviceId, BasicDeviceConfig.class);
-        final Driver baseDriver = driverService.getDriver(
-                basicDeviceConfig.driver());
-        final String newDriverName = baseDriver.name() + ":" + pipeconfId;
-        if (baseDriver.name().equals(newDriverName)) {
-            log.warn("Requested to merge {} driver with {} for {}, "
-                             + "but current device driver is already merged",
-                     baseDriver.name(), pipeconfId, deviceId);
-            return true;
+        if (basicDeviceConfig == null) {
+            log.warn("Unable to get basic device config for {}, " +
+                             "aborting pipeconf driver merge");
+            return null;
         }
-        final PiPipeconf pipeconf = pipeconfs.get(pipeconfId);
-        if (pipeconf == null) {
-            log.error("Pipeconf {} is not registered", pipeconfId);
-            return false;
+        String baseDriverName = basicDeviceConfig.driver();
+        if (baseDriverName.endsWith(mergedDriverSuffix(pipeconfId))) {
+            // The config already has driver name that is a merged one. We still
+            // need to make sure an instance of that merged driver is present in
+            // this node.
+            log.debug("Base driver of {} ({}) has been already merged with {}",
+                      deviceId, baseDriverName, pipeconfId);
+            baseDriverName = getBaseDriverNameFromMerged(baseDriverName);
         }
-        // 1) if merged driver exists already we don't create a new one.
+
+        final String newDriverName = mergedDriverName(baseDriverName, pipeconfId);
+        // If merged driver exists already we don't create a new one.
         try {
             driverService.getDriver(newDriverName);
-            log.info("Found existing merged driver {}, re-using that", newDriverName);
+            return newDriverName;
         } catch (ItemNotFoundException e) {
             log.info("Creating merged driver {}...", newDriverName);
-            createMergedDriver(pipeconf, baseDriver, newDriverName);
         }
-        // 2) Updating device cfg to enforce the merged driver (one node only)
-        final boolean isLeader = leadershipService
-                .runForLeadership(DRIVER_MERGE_TOPIC + deviceId.toString())
-                .leaderNodeId()
-                .equals(clusterService.getLocalNode().id());
-        if (isLeader) {
-            // FIXME: this binding should be updated by the same entity
-            // deploying the pipeconf.
-            pipeconfMappingStore.createOrUpdateBinding(deviceId, pipeconfId);
-            if (!basicDeviceConfig.driver().equals(newDriverName)) {
-                log.info("Applying new driver {} for device {} via cfg...",
-                         newDriverName, deviceId);
-                setDriverViaCfg(deviceId, newDriverName, basicDeviceConfig);
-            }
+        final Driver mergedDriver = buildMergedDriver(
+                pipeconfId, baseDriverName, newDriverName);
+        if (mergedDriver == null) {
+            // Error logged by buildMergedDriver
+            return null;
         }
-        return true;
+        registerMergedDriver(mergedDriver);
+        return newDriverName;
     }
 
-    private void createMergedDriver(PiPipeconf pipeconf, Driver baseDriver,
-                                    String newDriverName) {
+    private String mergedDriverSuffix(PiPipeconfId pipeconfId) {
+        return MERGED_DRIVER_SEPARATOR + pipeconfId.id();
+    }
+
+    private String mergedDriverName(String baseDriverName, PiPipeconfId pipeconfId) {
+        return baseDriverName + mergedDriverSuffix(pipeconfId);
+    }
+
+    private String getBaseDriverNameFromMerged(String mergedDriverName) {
+        final String[] pieces = mergedDriverName.split(MERGED_DRIVER_SEPARATOR);
+        if (pieces.length != 2) {
+            log.error("Unrecognized merged driver name format '{}', cannot " +
+                              "extract base driver name", mergedDriverName);
+            return null;
+        }
+        return pieces[0];
+    }
+
+    private Driver buildMergedDriver(PiPipeconfId pipeconfId, String baseDriverName,
+                                     String newDriverName) {
+        final Driver baseDriver;
+        try {
+            baseDriver = driverService.getDriver(baseDriverName);
+        } catch (ItemNotFoundException e) {
+            log.error("Base driver {} not found, cannot build a merged one",
+                      baseDriverName);
+            return null;
+        }
+
+        final PiPipeconf pipeconf = pipeconfs.get(pipeconfId);
+        if (pipeconf == null) {
+            log.error("Pipeconf {} is not registered, cannot build a merged driver",
+                      pipeconfId);
+            return null;
+        }
+
         // extract the behaviours from the pipipeconf.
         final Map<Class<? extends Behaviour>, Class<? extends Behaviour>> behaviours =
                 new HashMap<>();
@@ -236,29 +254,27 @@
                 baseDriver.swVersion(), behaviours, new HashMap<>());
         // take the base driver created with the behaviours of the PiPeconf and
         // merge it with the base driver that was assigned to the device
-        final Driver completeDriver = piPipeconfDriver.merge(baseDriver);
-        // This might lead to explosion of number of providers in the core,
-        // due to 1:1:1 pipeconf:driver:provider maybe find better way
-        final DriverProvider provider = new PiPipeconfDriverProviderInternal(
-                completeDriver);
-        // register the merged driver
+        return piPipeconfDriver.merge(baseDriver);
+    }
+
+    private void registerMergedDriver(Driver driver) {
+        final DriverProvider provider = new InternalDriverProvider(driver);
+        if (driverAdminService.getProviders().contains(provider)) {
+            // A provider for this driver already exist.
+            return;
+        }
         driverAdminService.registerProvider(provider);
     }
 
-    private void setDriverViaCfg(DeviceId deviceId, String driverName,
-                                 BasicDeviceConfig basicDeviceConfig) {
-        ObjectNode newCfg = (ObjectNode) basicDeviceConfig.node();
-        newCfg = newCfg.put(DRIVER, driverName);
-        ObjectMapper mapper = new ObjectMapper();
-        JsonNode newCfgNode = mapper.convertValue(newCfg, JsonNode.class);
-        cfgService.applyConfig(deviceId, BasicDeviceConfig.class, newCfgNode);
-    }
-
-    private class PiPipeconfDriverProviderInternal implements DriverProvider {
+    /**
+     * Internal driver provider used to register merged pipeconf drivers in the
+     * core.
+     */
+    private class InternalDriverProvider implements DriverProvider {
 
         Driver driver;
 
-        PiPipeconfDriverProviderInternal(Driver driver) {
+        InternalDriverProvider(Driver driver) {
             this.driver = driver;
         }
 
@@ -266,5 +282,22 @@
         public Set<Driver> getDrivers() {
             return ImmutableSet.of(driver);
         }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            InternalDriverProvider that = (InternalDriverProvider) o;
+            return Objects.equal(driver.name(), that.driver.name());
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(driver.name());
+        }
     }
 }