[ONOS-6564] Adding PiPeconf behaviours to driver for device.
Initial implementation of PiPipeconfService.
Tests for Initial implementation.

Change-Id: I9dea6fb3015788b8b61060c7f88395c3d45e6ed7
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index 8909591..1b3c459 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -17,6 +17,8 @@
 package org.onosproject.provider.general.device.impl;
 
 import com.google.common.annotations.Beta;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -58,7 +60,9 @@
 import org.onosproject.net.driver.Driver;
 import org.onosproject.net.driver.DriverData;
 import org.onosproject.net.driver.DriverService;
-import org.onosproject.net.key.DeviceKeyAdminService;
+import org.onosproject.net.pi.model.PiPipeconfId;
+import org.onosproject.net.pi.runtime.PiPipeconfConfig;
+import org.onosproject.net.pi.runtime.PiPipeconfService;
 import org.onosproject.net.provider.AbstractProvider;
 import org.onosproject.net.provider.ProviderId;
 import org.onosproject.provider.general.device.api.GeneralProviderDeviceConfig;
@@ -66,12 +70,19 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static java.util.concurrent.Executors.newScheduledThreadPool;
 import static org.onlab.util.Tools.groupedThreads;
@@ -87,6 +98,7 @@
 @Component(immediate = true)
 public class GeneralDeviceProvider extends AbstractProvider
         implements DeviceProvider {
+    public static final String DRIVER = "driver";
     private final Logger log = getLogger(getClass());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -105,7 +117,7 @@
     protected DriverService driverService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected DeviceKeyAdminService deviceKeyAdminService;
+    protected PiPipeconfService piPipeconfService;
 
     protected static final String APP_NAME = "org.onosproject.generaldeviceprovider";
     protected static final String URI_SCHEME = "device";
@@ -114,6 +126,15 @@
     private static final int CORE_POOL_SIZE = 10;
     private static final String UNKNOWN = "unknown";
     private static final int PORT_STATS_PERIOD_SECONDS = 10;
+    //FIXME this will be removed when the configuration is synced at the source.
+    private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS = ImmutableSet.of("p4runtime");
+    private static final ConcurrentMap<DeviceId, Lock> ENTRY_LOCKS = Maps.newConcurrentMap();
+
+    //FIXME to be removed when netcfg will issue device events in a bundle or
+    //ensures all configuration needed is present
+    private Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
+    private Set<DeviceId> driverConfigured = new CopyOnWriteArraySet<>();
+    private Set<DeviceId> pipelineConfigured = new CopyOnWriteArraySet<>();
 
 
     protected ScheduledExecutorService connectionExecutor
@@ -279,7 +300,7 @@
             log.error("Configuration is NULL: basic config {}, general provider " +
                     "config {}", basicDeviceConfig, providerConfig);
         } else {
-            log.info("Connecting to device {}", deviceId);
+            log.info("Connecting to device {} with driver {}", deviceId, basicDeviceConfig.driver());
 
             Driver driver = driverService.getDriver(basicDeviceConfig.driver());
             DriverData driverData = new DefaultDriverData(driver, deviceId);
@@ -287,62 +308,85 @@
             DeviceHandshaker handshaker =
                     getBehaviour(driver, DeviceHandshaker.class, driverData);
 
-            if (handshaker != null) {
-
-                //Storing deviceKeyId and all other config values
-                // as data in the driver with protocol_<info>
-                // name as the key. e.g protocol_ip
-                providerConfig.protocolsInfo()
-                        .forEach((protocol, deviceInfoConfig) -> {
-                            deviceInfoConfig.configValues()
-                                    .forEach((k, v) -> driverData.set(protocol + "_" + k, v));
-                            driverData.set(protocol + "_key", deviceInfoConfig.deviceKeyId());
-                        });
-
-                //Connecting to the device
-                CompletableFuture<Boolean> connected = handshaker.connect();
-
-                connected.thenAcceptAsync(result -> {
-                    if (result) {
-
-                        //Populated with the default values obtained by the driver
-                        ChassisId cid = new ChassisId();
-                        SparseAnnotations annotations = DefaultAnnotations.builder()
-                                .set(AnnotationKeys.PROTOCOL,
-                                        providerConfig.protocolsInfo().keySet().toString())
-                                .build();
-                        DeviceDescription description =
-                                new DefaultDeviceDescription(deviceId.uri(), Device.Type.SWITCH,
-                                        driver.manufacturer(), driver.hwVersion(),
-                                        driver.swVersion(), UNKNOWN,
-                                        cid, false, annotations);
-                        //Empty list of ports
-                        List<PortDescription> ports = new ArrayList<>();
-
-                        if (driver.hasBehaviour(DeviceDescriptionDiscovery.class)) {
-                            DeviceDescriptionDiscovery deviceDiscovery = driver
-                                    .createBehaviour(driverData, DeviceDescriptionDiscovery.class);
-
-                            DeviceDescription newdescription = deviceDiscovery.discoverDeviceDetails();
-                            if (newdescription != null) {
-                                description = newdescription;
-                            }
-                            ports = deviceDiscovery.discoverPortDetails();
-                        }
-                        providerService.deviceConnected(deviceId, description);
-                        providerService.updatePorts(deviceId, ports);
-
-                    } else {
-                        log.warn("Can't connect to device {}", deviceId);
-                    }
-                });
-            } else {
+            if (handshaker == null) {
                 log.error("Device {}, with driver {} does not support DeviceHandshaker " +
                         "behaviour, {}", deviceId, driver.name(), driver.behaviours());
+                return;
             }
+            //Storing deviceKeyId and all other config values
+            // as data in the driver with protocol_<info>
+            // name as the key. e.g protocol_ip
+            providerConfig.protocolsInfo()
+                    .forEach((protocol, deviceInfoConfig) -> {
+                        deviceInfoConfig.configValues()
+                                .forEach((k, v) -> driverData.set(protocol + "_" + k, v));
+                        driverData.set(protocol + "_key", deviceInfoConfig.deviceKeyId());
+                    });
+
+            //Connecting to the device
+            CompletableFuture<Boolean> connected = handshaker.connect();
+
+            connected.thenAcceptAsync(result -> {
+                if (result) {
+
+                    //Populated with the default values obtained by the driver
+                    ChassisId cid = new ChassisId();
+                    SparseAnnotations annotations = DefaultAnnotations.builder()
+                            .set(AnnotationKeys.PROTOCOL,
+                                    providerConfig.protocolsInfo().keySet().toString())
+                            .build();
+                    DeviceDescription description =
+                            new DefaultDeviceDescription(deviceId.uri(), Device.Type.SWITCH,
+                                    driver.manufacturer(), driver.hwVersion(),
+                                    driver.swVersion(), UNKNOWN,
+                                    cid, false, annotations);
+                    //Empty list of ports
+                    List<PortDescription> ports = new ArrayList<>();
+
+                    if (driver.hasBehaviour(DeviceDescriptionDiscovery.class)) {
+                        DeviceDescriptionDiscovery deviceDiscovery = driver
+                                .createBehaviour(driverData, DeviceDescriptionDiscovery.class);
+
+                        DeviceDescription newdescription = deviceDiscovery.discoverDeviceDetails();
+                        if (newdescription != null) {
+                            description = newdescription;
+                        }
+                        ports = deviceDiscovery.discoverPortDetails();
+                    }
+
+                    Optional<PiPipeconfId> pipeconfId = piPipeconfService.ofDevice(deviceId);
+                    //Apply the Pipeline configuration and then connect the device
+                    if (pipeconfId.isPresent()) {
+                        DeviceDescription finalDescription = description;
+                        List<PortDescription> finalPorts = ports;
+                        piPipeconfService.bindToDevice(pipeconfId.get(), deviceId).whenComplete((success, ex) -> {
+                            if (success) {
+                                advertiseDevice(deviceId, finalDescription, finalPorts);
+                            } else {
+                                log.error("Can't merge driver {} with pipeconf {} for device {}, " +
+                                                "not reporting it to the device manager",
+                                        driver.name(), pipeconfId.get(), deviceId);
+                            }
+                        }).exceptionally(ex -> {
+                            throw new IllegalStateException(ex);
+                        });
+                    } else {
+                        //No other operation is needed, advertise the device to the core.
+                        advertiseDevice(deviceId, description, ports);
+                    }
+
+                } else {
+                    log.warn("Can't connect to device {}", deviceId);
+                }
+            });
         }
     }
 
+    private void advertiseDevice(DeviceId deviceId, DeviceDescription description, List<PortDescription> ports) {
+        providerService.deviceConnected(deviceId, description);
+        providerService.updatePorts(deviceId, ports);
+    }
+
     private void disconnectDevice(DeviceId deviceId) {
         log.info("Disconnecting for device {}", deviceId);
         DeviceHandshaker handshaker = getHandshaker(deviceId);
@@ -396,21 +440,78 @@
                 log.debug("{} is not my scheme, skipping", deviceId);
                 return;
             }
-            if (deviceService.getDevice(deviceId) == null || !deviceService.isAvailable(deviceId)) {
-                connectionExecutor.submit(exceptionSafe(() -> connectDevice(deviceId)));
-            } else {
+            if (deviceService.getDevice(deviceId) != null || deviceService.isAvailable(deviceId)) {
                 log.info("Device {} is already connected to ONOS and is available", deviceId);
+                return;
+            }
+            //FIXME to be removed when netcfg will issue device events in a bundle or
+            // ensure all configuration needed is present
+            Lock lock = ENTRY_LOCKS.computeIfAbsent(deviceId, key -> new ReentrantLock());
+            lock.lock();
+            try {
+                if (event.configClass().equals(GeneralProviderDeviceConfig.class)) {
+                    //FIXME we currently assume that p4runtime devices are pipeline configurable.
+                    //If we want to connect a p4runtime device with no pipeline
+                    if (event.config().isPresent() &&
+                            Collections.disjoint(ImmutableSet.copyOf(event.config().get().node().fieldNames()),
+                                    PIPELINE_CONFIGURABLE_PROTOCOLS)) {
+                        pipelineConfigured.add(deviceId);
+                    }
+                    deviceConfigured.add(deviceId);
+                } else if (event.configClass().equals(BasicDeviceConfig.class)) {
+                    if (event.config().isPresent() && event.config().get().node().has(DRIVER)) {
+                        //TODO add check for pipeline and add it to the pipeline list if no
+                        // p4runtime is present.
+                        driverConfigured.add(deviceId);
+                    }
+                } else if (event.configClass().equals(PiPipeconfConfig.class)) {
+                    if (event.config().isPresent()
+                            && event.config().get().node().has(PiPipeconfConfig.PIPIPECONFID)) {
+                        pipelineConfigured.add(deviceId);
+                    }
+                }
+                //if the device has no "pipeline configurable protocol it will be present
+                // in the pipelineConfigured
+                if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)
+                        && pipelineConfigured.contains(deviceId)) {
+                    checkAndSubmitDeviceTask(deviceId);
+                } else {
+                    if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
+                        log.debug("Waiting for pipeline configuration for device {}", deviceId);
+                    } else if (pipelineConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
+                        log.debug("Waiting for device configuration for device {}", deviceId);
+                    } else if (pipelineConfigured.contains(deviceId) && deviceConfigured.contains(deviceId)) {
+                        log.debug("Waiting for driver configuration for device {}", deviceId);
+                    } else if (driverConfigured.contains(deviceId)) {
+                        log.debug("Only driver configuration for device {}", deviceId);
+                    } else if (deviceConfigured.contains(deviceId)) {
+                        log.debug("Only device configuration for device {}", deviceId);
+                    }
+                }
+            } finally {
+                lock.unlock();
             }
         }
 
         @Override
         public boolean isRelevant(NetworkConfigEvent event) {
-            return event.configClass().equals(GeneralProviderDeviceConfig.class) &&
+            return (event.configClass().equals(GeneralProviderDeviceConfig.class) ||
+                    event.configClass().equals(BasicDeviceConfig.class) ||
+                    event.configClass().equals(PiPipeconfConfig.class)) &&
                     (event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
                             event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED);
         }
     }
 
+    private void checkAndSubmitDeviceTask(DeviceId deviceId) {
+        connectionExecutor.submit(exceptionSafe(() -> connectDevice(deviceId)));
+        //FIXME this will be removed when configuration is synced.
+        deviceConfigured.remove(deviceId);
+        driverConfigured.remove(deviceId);
+        pipelineConfigured.remove(deviceId);
+
+    }
+
     /**
      * Listener for core device events.
      */