More improvements and bugfixes in P4Runtime subsystem

Most notably, we fix a bug in which some nodes were not able to find
pipeconf-specific behaviors for a given device. The problem is not
completelly solved but it's mitigated.

There's a race condition caused by the fact that the GDP updates the cfg
with the merged driver name before advertising the device to the core.
Some nodes might receive the cfg update after the device has been
advertised. We mitigate the problem by performing the pipeline deploy
(slow operation) after the cfg update, giving more time for nodes
to catch up. Perhaps we should listen for cfg update events before
advertising the device to the core?

Also:
- NPE when getting P4Runtime client
- Detect if a base driver is already merged in pipeconf manager
- Longer timeouts in P4Runtime driver and protocol (for slow networks)
- Configurable timeout in P4Runtime driver and GDP
- NPE when adding/removing device agent listeners in P4Rtunime handshaker
- Various exceptions due to race conditions in GDP when disconnecting
devices (by serializing disconnect tasks per device)
- NPE when cancelling polling tasks in GDP
- Refactored PipeconfService to distinguish between driver merge,
pipeconf map update, and cfg update (now performed in the GDP)
- Fixed PipeconfManagerTest, not testing driver behaviours
- Use Guava striped locks when possible (more memory-efficient than maps,
and with strict atomicity guarantees w.r.t. to caches).

Change-Id: I30f3887541ba0fd44439a86885e9821ac565b64c
diff --git a/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfService.java b/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfService.java
index 1e4088e..b574b5b 100644
--- a/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfService.java
+++ b/core/api/src/main/java/org/onosproject/net/pi/service/PiPipeconfService.java
@@ -22,7 +22,6 @@
 import org.onosproject.net.pi.model.PiPipeconfId;
 
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 
 /**
  * A service to manage the configurations of protocol-independent pipelines.
@@ -36,17 +35,21 @@
      * Registers the given pipeconf.
      *
      * @param pipeconf a pipeconf
-     * @throws IllegalStateException if the same pipeconf identifier is already registered.
+     * @throws IllegalStateException if the same pipeconf identifier is already
+     *                               registered.
      */
     void register(PiPipeconf pipeconf) throws IllegalStateException;
 
     /**
-     * Unregisters the Pipeconf identified by the given PiPipeconfId. Unregistering a Pipeconf removes it from the ONOS
-     * controller, thus making it un-capable of controlling (e.g installing flow rules) the devices that have the
-     * pipeconf's P4 program deployed. For now this method DOES NOT remove the P4 program from the devices.
+     * Unregisters the Pipeconf identified by the given PiPipeconfId.
+     * Unregistering a Pipeconf removes it from the ONOS controller, thus making
+     * it un-capable of controlling (e.g installing flow rules) the devices that
+     * have the pipeconf's P4 program deployed. For now this method DOES NOT
+     * remove the P4 program from the devices.
      *
      * @param pipeconfId a pipeconfId
-     * @throws IllegalStateException if the same pipeconf identifier is already registered.
+     * @throws IllegalStateException if the same pipeconf identifier is already
+     *                               registered.
      */
     void remove(PiPipeconfId pipeconfId) throws IllegalStateException;
 
@@ -58,8 +61,9 @@
     Iterable<PiPipeconf> getPipeconfs();
 
     /**
-     * Returns the pipeconf instance associated with the given identifier, if present. If not present, it means that no
-     * pipeconf with such identifier has been registered so far.
+     * Returns the pipeconf instance associated with the given identifier, if
+     * present. If not present, it means that no pipeconf with such identifier
+     * has been registered so far.
      *
      * @param id a pipeconf identifier
      * @return an optional pipeconf
@@ -67,21 +71,39 @@
     Optional<PiPipeconf> getPipeconf(PiPipeconfId id);
 
     /**
-     * Binds the given pipeconf to the given infrastructure device. As a result of this method call, if the given
-     * pipeconf exposes any pipeline-specific behaviours, those will be merged to the device's driver. Returns a
-     * completable future to provide async methods with a boolean if the merge of the drivers succeeded.
+     * Signals that the given pipeconf is associated to the given infrastructure
+     * device. As a result of this method, the pipeconf for the given device can
+     * be later retrieved using {@link #ofDevice(DeviceId)}
+     *
+     * @param pipeconfId a pipeconf identifier
+     * @param deviceId   a device identifier
+     */
+    void bindToDevice(PiPipeconfId pipeconfId, DeviceId deviceId);
+
+    /**
+     * Returns the name of a driver that is equivalent to the base driver of the
+     * given device plus all the pipeline-specific behaviors exposed by the
+     * given pipeconf (previously registered using {@link
+     * #register(PiPipeconf)}). If such driver does not exist, this method
+     * creates one and registers is with all necessary ONOS subsystems, such
+     * that the returned name can be used to retrieve the driver instance using
+     * {@link org.onosproject.net.driver.DriverService#getDriver(String)}.
+     * <p>
+     * This method needs to be called on all nodes of the cluster that wants to
+     * use such merged driver.
+     * <p>
+     * Returns null if such merged driver cannot be created.
      *
      * @param deviceId   a device identifier
      * @param pipeconfId a pipeconf identifier
-     * @return a CompletableFuture with a boolean, true if operation succeeded
+     * @return driver name or null.
      */
-    // TODO: This service doesn't make any effort in deploying the configuration to the device.
-    // Someone else should do that.
-    CompletableFuture<Boolean> bindToDevice(PiPipeconfId pipeconfId, DeviceId deviceId);
+    String mergeDriver(DeviceId deviceId, PiPipeconfId pipeconfId);
 
     /**
-     * Returns the pipeconf identifier currently associated with the given device identifier, if present. If not
-     * present, it means no pipeconf has been associated with that device so far.
+     * Returns the pipeconf identifier currently associated with the given
+     * device identifier, if present. If not present, it means no pipeconf has
+     * been associated with that device so far.
      *
      * @param deviceId device identifier
      * @return an optional pipeconf identifier
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
index ba16601..5e07333 100644
--- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
@@ -16,10 +16,8 @@
 
 package org.onosproject.net.pi.impl;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.annotations.Beta;
+import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableSet;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -29,7 +27,6 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.util.ItemNotFoundException;
 import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.LeadershipService;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.config.ConfigFactory;
 import org.onosproject.net.config.NetworkConfigRegistry;
@@ -52,7 +49,6 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
@@ -73,19 +69,14 @@
 
     private final Logger log = getLogger(getClass());
 
+    private static final String MERGED_DRIVER_SEPARATOR = ":";
     private static final String DRIVER = "driver";
     private static final String CFG_SCHEME = "piPipeconf";
 
-    private static final String DRIVER_MERGE_TOPIC =
-            PiPipeconfManager.class.getSimpleName() + "-driver-merge-";
-
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected NetworkConfigRegistry cfgService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private LeadershipService leadershipService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DriverService driverService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -165,10 +156,13 @@
     }
 
     @Override
-    public CompletableFuture<Boolean> bindToDevice(
-            PiPipeconfId pipeconfId, DeviceId deviceId) {
-        return CompletableFuture.supplyAsync(() -> doMergeDriver(
-                deviceId, pipeconfId), executor);
+    public void bindToDevice(PiPipeconfId pipeconfId, DeviceId deviceId) {
+        pipeconfMappingStore.createOrUpdateBinding(deviceId, pipeconfId);
+    }
+
+    @Override
+    public String mergeDriver(DeviceId deviceId, PiPipeconfId pipeconfId) {
+        return doMergeDriver(deviceId, pipeconfId);
     }
 
     @Override
@@ -176,55 +170,79 @@
         return Optional.ofNullable(pipeconfMappingStore.getPipeconfId(deviceId));
     }
 
-    private boolean doMergeDriver(DeviceId deviceId, PiPipeconfId pipeconfId) {
-        // Perform the following operations:
-        // 1) ALL nodes: create and register new merged driver (pipeconf + base driver)
-        // 2) ONE node (leader): updated netcfg with new driver
-        log.warn("Starting device driver merge of {} with {}...", deviceId, pipeconfId);
+    private String doMergeDriver(DeviceId deviceId, PiPipeconfId pipeconfId) {
+        log.debug("Starting device driver merge of {} with {}...", deviceId, pipeconfId);
         final BasicDeviceConfig basicDeviceConfig = cfgService.getConfig(
                 deviceId, BasicDeviceConfig.class);
-        final Driver baseDriver = driverService.getDriver(
-                basicDeviceConfig.driver());
-        final String newDriverName = baseDriver.name() + ":" + pipeconfId;
-        if (baseDriver.name().equals(newDriverName)) {
-            log.warn("Requested to merge {} driver with {} for {}, "
-                             + "but current device driver is already merged",
-                     baseDriver.name(), pipeconfId, deviceId);
-            return true;
+        if (basicDeviceConfig == null) {
+            log.warn("Unable to get basic device config for {}, " +
+                             "aborting pipeconf driver merge");
+            return null;
         }
-        final PiPipeconf pipeconf = pipeconfs.get(pipeconfId);
-        if (pipeconf == null) {
-            log.error("Pipeconf {} is not registered", pipeconfId);
-            return false;
+        String baseDriverName = basicDeviceConfig.driver();
+        if (baseDriverName.endsWith(mergedDriverSuffix(pipeconfId))) {
+            // The config already has driver name that is a merged one. We still
+            // need to make sure an instance of that merged driver is present in
+            // this node.
+            log.debug("Base driver of {} ({}) has been already merged with {}",
+                      deviceId, baseDriverName, pipeconfId);
+            baseDriverName = getBaseDriverNameFromMerged(baseDriverName);
         }
-        // 1) if merged driver exists already we don't create a new one.
+
+        final String newDriverName = mergedDriverName(baseDriverName, pipeconfId);
+        // If merged driver exists already we don't create a new one.
         try {
             driverService.getDriver(newDriverName);
-            log.info("Found existing merged driver {}, re-using that", newDriverName);
+            return newDriverName;
         } catch (ItemNotFoundException e) {
             log.info("Creating merged driver {}...", newDriverName);
-            createMergedDriver(pipeconf, baseDriver, newDriverName);
         }
-        // 2) Updating device cfg to enforce the merged driver (one node only)
-        final boolean isLeader = leadershipService
-                .runForLeadership(DRIVER_MERGE_TOPIC + deviceId.toString())
-                .leaderNodeId()
-                .equals(clusterService.getLocalNode().id());
-        if (isLeader) {
-            // FIXME: this binding should be updated by the same entity
-            // deploying the pipeconf.
-            pipeconfMappingStore.createOrUpdateBinding(deviceId, pipeconfId);
-            if (!basicDeviceConfig.driver().equals(newDriverName)) {
-                log.info("Applying new driver {} for device {} via cfg...",
-                         newDriverName, deviceId);
-                setDriverViaCfg(deviceId, newDriverName, basicDeviceConfig);
-            }
+        final Driver mergedDriver = buildMergedDriver(
+                pipeconfId, baseDriverName, newDriverName);
+        if (mergedDriver == null) {
+            // Error logged by buildMergedDriver
+            return null;
         }
-        return true;
+        registerMergedDriver(mergedDriver);
+        return newDriverName;
     }
 
-    private void createMergedDriver(PiPipeconf pipeconf, Driver baseDriver,
-                                    String newDriverName) {
+    private String mergedDriverSuffix(PiPipeconfId pipeconfId) {
+        return MERGED_DRIVER_SEPARATOR + pipeconfId.id();
+    }
+
+    private String mergedDriverName(String baseDriverName, PiPipeconfId pipeconfId) {
+        return baseDriverName + mergedDriverSuffix(pipeconfId);
+    }
+
+    private String getBaseDriverNameFromMerged(String mergedDriverName) {
+        final String[] pieces = mergedDriverName.split(MERGED_DRIVER_SEPARATOR);
+        if (pieces.length != 2) {
+            log.error("Unrecognized merged driver name format '{}', cannot " +
+                              "extract base driver name", mergedDriverName);
+            return null;
+        }
+        return pieces[0];
+    }
+
+    private Driver buildMergedDriver(PiPipeconfId pipeconfId, String baseDriverName,
+                                     String newDriverName) {
+        final Driver baseDriver;
+        try {
+            baseDriver = driverService.getDriver(baseDriverName);
+        } catch (ItemNotFoundException e) {
+            log.error("Base driver {} not found, cannot build a merged one",
+                      baseDriverName);
+            return null;
+        }
+
+        final PiPipeconf pipeconf = pipeconfs.get(pipeconfId);
+        if (pipeconf == null) {
+            log.error("Pipeconf {} is not registered, cannot build a merged driver",
+                      pipeconfId);
+            return null;
+        }
+
         // extract the behaviours from the pipipeconf.
         final Map<Class<? extends Behaviour>, Class<? extends Behaviour>> behaviours =
                 new HashMap<>();
@@ -236,29 +254,27 @@
                 baseDriver.swVersion(), behaviours, new HashMap<>());
         // take the base driver created with the behaviours of the PiPeconf and
         // merge it with the base driver that was assigned to the device
-        final Driver completeDriver = piPipeconfDriver.merge(baseDriver);
-        // This might lead to explosion of number of providers in the core,
-        // due to 1:1:1 pipeconf:driver:provider maybe find better way
-        final DriverProvider provider = new PiPipeconfDriverProviderInternal(
-                completeDriver);
-        // register the merged driver
+        return piPipeconfDriver.merge(baseDriver);
+    }
+
+    private void registerMergedDriver(Driver driver) {
+        final DriverProvider provider = new InternalDriverProvider(driver);
+        if (driverAdminService.getProviders().contains(provider)) {
+            // A provider for this driver already exist.
+            return;
+        }
         driverAdminService.registerProvider(provider);
     }
 
-    private void setDriverViaCfg(DeviceId deviceId, String driverName,
-                                 BasicDeviceConfig basicDeviceConfig) {
-        ObjectNode newCfg = (ObjectNode) basicDeviceConfig.node();
-        newCfg = newCfg.put(DRIVER, driverName);
-        ObjectMapper mapper = new ObjectMapper();
-        JsonNode newCfgNode = mapper.convertValue(newCfg, JsonNode.class);
-        cfgService.applyConfig(deviceId, BasicDeviceConfig.class, newCfgNode);
-    }
-
-    private class PiPipeconfDriverProviderInternal implements DriverProvider {
+    /**
+     * Internal driver provider used to register merged pipeconf drivers in the
+     * core.
+     */
+    private class InternalDriverProvider implements DriverProvider {
 
         Driver driver;
 
-        PiPipeconfDriverProviderInternal(Driver driver) {
+        InternalDriverProvider(Driver driver) {
             this.driver = driver;
         }
 
@@ -266,5 +282,22 @@
         public Set<Driver> getDrivers() {
             return ImmutableSet.of(driver);
         }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            InternalDriverProvider that = (InternalDriverProvider) o;
+            return Objects.equal(driver.name(), that.driver.name());
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(driver.name());
+        }
     }
 }
diff --git a/core/net/src/test/java/org/onosproject/net/pi/impl/PiPipeconfManagerTest.java b/core/net/src/test/java/org/onosproject/net/pi/impl/PiPipeconfManagerTest.java
index 4d29e69..a418461 100644
--- a/core/net/src/test/java/org/onosproject/net/pi/impl/PiPipeconfManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/pi/impl/PiPipeconfManagerTest.java
@@ -20,11 +20,11 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import org.junit.Before;
 import org.junit.Test;
 import org.onlab.util.ItemNotFoundException;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.behaviour.Pipeliner;
 import org.onosproject.net.config.Config;
 import org.onosproject.net.config.ConfigApplyDelegate;
 import org.onosproject.net.config.ConfigFactory;
@@ -46,9 +46,7 @@
 import org.onosproject.net.driver.DriverServiceAdapter;
 import org.onosproject.net.pi.model.PiPipeconf;
 import org.onosproject.net.pi.model.PiPipeconfId;
-import org.onosproject.net.pi.model.PiPipelineInterpreter;
 import org.onosproject.net.pi.service.PiPipeconfConfig;
-import org.onosproject.pipelines.basic.PipeconfLoader;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -61,6 +59,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.onosproject.pipelines.basic.PipeconfLoader.BASIC_PIPECONF;
 
 
 /**
@@ -70,15 +69,12 @@
 
     private static final DeviceId DEVICE_ID = DeviceId.deviceId("test:test");
     private static final String BASE_DRIVER = "baseDriver";
-    private static final Set<Class<? extends Behaviour>> EXPECTED_BEHAVIOURS =
-            ImmutableSet.of(DeviceDescriptionDiscovery.class, Pipeliner.class, PiPipelineInterpreter.class);
 
     //Mock util sets and classes
     private final NetworkConfigRegistry cfgService = new MockNetworkConfigRegistry();
     private final DriverService driverService = new MockDriverService();
     private final DriverAdminService driverAdminService = new MockDriverAdminService();
     private Driver baseDriver = new MockDriver();
-    private String completeDriverName;
 
     private final Set<ConfigFactory> cfgFactories = new HashSet<>();
     private final Set<NetworkConfigListener> netCfgListeners = new HashSet<>();
@@ -99,8 +95,7 @@
     @Before
     public void setUp() throws IOException {
         piPipeconfService = new PiPipeconfManager();
-        piPipeconf = PipeconfLoader.BASIC_PIPECONF;
-        completeDriverName = BASE_DRIVER + ":" + piPipeconf.id();
+        piPipeconf = BASIC_PIPECONF;
         piPipeconfService.cfgService = cfgService;
         piPipeconfService.driverService = driverService;
         piPipeconfService.driverAdminService = driverAdminService;
@@ -147,7 +142,7 @@
 
 
     @Test
-    public void bindToDevice() throws Exception {
+    public void mergeDriver() {
         PiPipeconfId piPipeconfId = cfgService.getConfig(DEVICE_ID, PiPipeconfConfig.class).piPipeconfId();
         assertEquals(piPipeconf.id(), piPipeconfId);
 
@@ -158,25 +153,24 @@
         assertEquals("Returned PiPipeconf is not correct", piPipeconf,
                      piPipeconfService.getPipeconf(piPipeconf.id()).get());
 
-        piPipeconfService.bindToDevice(piPipeconfId, DEVICE_ID).whenComplete((booleanResult, ex) -> {
+        String mergedDriverName = piPipeconfService.mergeDriver(DEVICE_ID, piPipeconfId);
 
-            //we assume that the provider is 1 and that it contains 1 driver
-            //we also assume that everything after driverAdminService.registerProvider(provider); has been tested.
-            assertTrue("Provider should be registered", providers.size() != 0);
+        //we assume that the provider is 1 and that it contains 1 driver
+        //we also assume that everything after driverAdminService.registerProvider(provider); has been tested.
+        assertTrue("Provider should be registered", providers.size() == 1);
 
-            assertTrue("Boolean Result of method should be True", booleanResult);
+        assertTrue("Merged driver name should be valid",
+                   mergedDriverName != null && !mergedDriverName.isEmpty());
 
-            providers.forEach(p -> {
-                assertTrue("Provider should contain a driver", p.getDrivers().size() != 0);
-                p.getDrivers().forEach(driver -> {
-                    assertEquals("The driver has wrong name", driver.name(), completeDriverName);
-                    assertEquals("The driver contains wrong behaviours", EXPECTED_BEHAVIOURS, driver.behaviours());
+        DriverProvider provider = providers.iterator().next();
+        assertTrue("Provider should contain one driver", provider.getDrivers().size() == 1);
 
-                });
-            });
-        }).exceptionally(ex -> {
-            throw new IllegalStateException(ex);
-        });
+        Driver driver = provider.getDrivers().iterator().next();
+
+        Set<Class<? extends Behaviour>> expectedBehaviours = Sets.newHashSet();
+        expectedBehaviours.addAll(BASIC_PIPECONF.behaviours());
+        expectedBehaviours.addAll(baseDriver.behaviours());
+        assertEquals("The driver contains wrong behaviours", expectedBehaviours, driver.behaviours());
     }
 
     private class MockNetworkConfigRegistry extends NetworkConfigRegistryAdapter {
@@ -230,6 +224,11 @@
         public void registerProvider(DriverProvider provider) {
             providers.add(provider);
         }
+
+        @Override
+        public Set<DriverProvider> getProviders() {
+            return providers;
+        }
     }
 
     private class MockDelegate implements ConfigApplyDelegate {