[ONOS-6564] Adding PiPeconf behaviours to driver for device.
Initial implementation of PiPipeconfService.
Tests for Initial implementation.

Change-Id: I9dea6fb3015788b8b61060c7f88395c3d45e6ed7
diff --git a/core/api/src/main/java/org/onosproject/net/pi/model/PiPipeconf.java b/core/api/src/main/java/org/onosproject/net/pi/model/PiPipeconf.java
index 385c490..99754a8 100644
--- a/core/api/src/main/java/org/onosproject/net/pi/model/PiPipeconf.java
+++ b/core/api/src/main/java/org/onosproject/net/pi/model/PiPipeconf.java
@@ -45,7 +45,7 @@
     PiPipelineModel pipelineModel();
 
     /**
-     * Returns all pipeline-specific behaviours defined by this configuration.
+     * Returns all pipeline-specific behaviour interfaces defined by this configuration.
      *
      * @return a collection of behaviours
      */
diff --git a/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfConfig.java b/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfConfig.java
new file mode 100644
index 0000000..c476740
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfConfig.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.pi.runtime;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.config.Config;
+import org.onosproject.net.pi.model.PiPipeconfId;
+
+/**
+ * Configuration fot the PiPipeconf susbystem.
+ */
+@Beta
+public class PiPipeconfConfig extends Config<DeviceId> {
+
+    public static final String PIPIPECONFID = "piPipeconfId";
+
+    @Override
+    public boolean isValid() {
+        return hasOnlyFields(PIPIPECONFID);
+        //TODO will reinstate after synchonization of events
+        //&& !piPipeconfId().id().equals("");
+    }
+
+    public PiPipeconfId piPipeconfId() {
+        return new PiPipeconfId(get(PIPIPECONFID, ""));
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfService.java b/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfService.java
index c037eb6..34b5174 100644
--- a/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfService.java
+++ b/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfService.java
@@ -22,6 +22,7 @@
 import org.onosproject.net.pi.model.PiPipeconfId;
 
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * A service to manage the configurations of protocol-independent pipelines.
@@ -58,14 +59,16 @@
     /**
      * Binds the given pipeconf to the given infrastructure device. As a result of this method call,
      * if the given pipeconf exposes any pipeline-specific behaviours, those will be merged to the
-     * device's driver.
+     * device's driver. Returns a completable future to provide async methods with a boolean if the merge
+     * of the drivers succeeded.
      *
      * @param deviceId a device identifier
-     * @param pipeconf a pipeconf identifier
+     * @param pipeconfId a pipeconf identifier
+     * @return a CompletableFuture with a boolean, true if operation succeeded
      */
     // TODO: This service doesn't make any effort in deploying the configuration to the device.
     // Someone else should do that.
-    void bindToDevice(PiPipeconfId pipeconf, DeviceId deviceId);
+    CompletableFuture<Boolean> bindToDevice(PiPipeconfId pipeconfId, DeviceId deviceId);
 
     /**
      * Returns the pipeconf identifier currently associated with the given device identifier, if
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfServiceImpl.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfServiceImpl.java
new file mode 100644
index 0000000..1e66d7f
--- /dev/null
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfServiceImpl.java
@@ -0,0 +1,251 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.pi.impl;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.ImmutableSet;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.ItemNotFoundException;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.config.ConfigFactory;
+import org.onosproject.net.config.NetworkConfigEvent;
+import org.onosproject.net.config.NetworkConfigListener;
+import org.onosproject.net.config.NetworkConfigRegistry;
+import org.onosproject.net.config.basics.BasicDeviceConfig;
+import org.onosproject.net.config.basics.SubjectFactories;
+import org.onosproject.net.driver.Behaviour;
+import org.onosproject.net.driver.DefaultDriver;
+import org.onosproject.net.driver.Driver;
+import org.onosproject.net.driver.DriverAdminService;
+import org.onosproject.net.driver.DriverProvider;
+import org.onosproject.net.driver.DriverService;
+import org.onosproject.net.pi.model.PiPipeconf;
+import org.onosproject.net.pi.model.PiPipeconfId;
+import org.onosproject.net.pi.runtime.PiPipeconfConfig;
+import org.onosproject.net.pi.runtime.PiPipeconfService;
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+
+/**
+ * Implementation of the PiPipeconfService.
+ */
+@Component(immediate = true)
+@Service
+public class PiPipeconfServiceImpl implements PiPipeconfService {
+
+    private final Logger log = getLogger(getClass());
+
+    private static final String DRIVER = "driver";
+    private static final String CFG_SCHEME = "piPipeconf";
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected NetworkConfigRegistry cfgService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DriverService driverService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DriverAdminService driverAdminService;
+
+    //TODO move to replicated map
+    protected ConcurrentHashMap<PiPipeconfId, PiPipeconf> piPipeconfs = new ConcurrentHashMap<>();
+    //TODO move to replicated map
+    protected ConcurrentHashMap<DeviceId, PiPipeconfId> devicesToPipeconf = new ConcurrentHashMap<>();
+
+    protected ExecutorService executor =
+            Executors.newFixedThreadPool(5, groupedThreads("onos/pipipeconfservice",
+                    "pipeline-to-device-%d", log));
+
+    protected final ConfigFactory factory =
+            new ConfigFactory<DeviceId, PiPipeconfConfig>(
+                    SubjectFactories.DEVICE_SUBJECT_FACTORY,
+                    PiPipeconfConfig.class, CFG_SCHEME) {
+                @Override
+                public PiPipeconfConfig createConfig() {
+                    return new PiPipeconfConfig();
+                }
+            };
+
+    protected final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
+
+    @Activate
+    public void activate() {
+        cfgService.registerConfigFactory(factory);
+        cfgService.addListener(cfgListener);
+        cfgService.getSubjects(DeviceId.class, PiPipeconfConfig.class)
+                .forEach(this::addPipeconfFromCfg);
+        log.info("Started");
+    }
+
+
+    @Deactivate
+    public void deactivate() {
+        executor.shutdown();
+        cfgService.removeListener(cfgListener);
+        cfgService.unregisterConfigFactory(factory);
+        piPipeconfs.clear();
+        devicesToPipeconf.clear();
+        cfgService = null;
+        driverAdminService = null;
+        driverService = null;
+        log.info("Stopped");
+    }
+
+    @Override
+    public void register(PiPipeconf pipeconf) throws IllegalStateException {
+        log.warn("Currently using local maps, needs to be moved to a distributed store");
+        piPipeconfs.put(pipeconf.id(), pipeconf);
+    }
+
+    @Override
+    public Iterable<PiPipeconf> getPipeconfs() {
+        throw new UnsupportedOperationException("Currently unsupported");
+    }
+
+    @Override
+    public Optional<PiPipeconf> getPipeconf(PiPipeconfId id) {
+        return Optional.ofNullable(piPipeconfs.get(id));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> bindToDevice(PiPipeconfId pipeconfId, DeviceId deviceId) {
+        CompletableFuture<Boolean> operationResult = new CompletableFuture<>();
+
+        executor.execute(() -> {
+            BasicDeviceConfig basicDeviceConfig =
+                    cfgService.getConfig(deviceId, BasicDeviceConfig.class);
+            Driver baseDriver = driverService.getDriver(basicDeviceConfig.driver());
+
+            String completeDriverName = baseDriver.name() + ":" + pipeconfId;
+            PiPipeconf piPipeconf = piPipeconfs.get(pipeconfId);
+            if (piPipeconf == null) {
+                log.warn("Pipeconf {} is not present", pipeconfId);
+                operationResult.complete(false);
+            } else {
+                //if driver exists already we don't create a new one.
+                //needs to be done via exception catching due to DriverRegistry throwing it on a null return from
+                //the driver map.
+                try {
+                    driverService.getDriver(completeDriverName);
+                } catch (ItemNotFoundException e) {
+
+                    log.debug("First time pipeconf {} is used with base driver {}, merging the two",
+                            pipeconfId, baseDriver);
+                    //extract the behaviours from the pipipeconf.
+                    Map<Class<? extends Behaviour>, Class<? extends Behaviour>> behaviours = new HashMap<>();
+                    piPipeconf.behaviours().forEach(b -> {
+                        behaviours.put(b, piPipeconf.implementation(b).get());
+                    });
+
+                    Driver piPipeconfDriver = new DefaultDriver(completeDriverName, baseDriver.parents(),
+                            baseDriver.manufacturer(), baseDriver.hwVersion(), baseDriver.swVersion(),
+                            behaviours, new HashMap<>());
+                    //we take the base driver created with the behaviours of the PiPeconf and
+                    // merge it with the base driver that was assigned to the device
+                    Driver completeDriver = piPipeconfDriver.merge(baseDriver);
+
+                    //This might lead to explosion of number of providers in the core,
+                    // due to 1:1:1 pipeconf:driver:provider maybe find better way
+                    DriverProvider provider = new PiPipeconfDriverProviderInternal(completeDriver);
+
+                    //we register to the dirver susbystem the driver provider containing the merged driver
+                    driverAdminService.registerProvider(provider);
+                }
+
+                //Changing the configuration for the device to enforce the full driver with pipipeconf
+                // and base behaviours
+                ObjectNode newCfg = (ObjectNode) basicDeviceConfig.node();
+                newCfg = newCfg.put(DRIVER, completeDriverName);
+                ObjectMapper mapper = new ObjectMapper();
+                JsonNode newCfgNode = mapper.convertValue(newCfg, JsonNode.class);
+                cfgService.applyConfig(deviceId, BasicDeviceConfig.class, newCfgNode);
+                //Completable future is needed for when this method will also apply the pipeline to the device.
+                operationResult.complete(true);
+            }
+        });
+        return operationResult;
+    }
+
+    @Override
+    public Optional<PiPipeconfId> ofDevice(DeviceId deviceId) {
+        return Optional.ofNullable(devicesToPipeconf.get(deviceId));
+    }
+
+    private class PiPipeconfDriverProviderInternal implements DriverProvider {
+
+        Driver driver;
+
+        PiPipeconfDriverProviderInternal(Driver driver) {
+            this.driver = driver;
+        }
+
+        @Override
+        public Set<Driver> getDrivers() {
+            return ImmutableSet.of(driver);
+        }
+    }
+
+    private void addPipeconfFromCfg(DeviceId deviceId) {
+        PiPipeconfConfig pipeconfConfig =
+                cfgService.getConfig(deviceId, PiPipeconfConfig.class);
+        PiPipeconfId id = pipeconfConfig.piPipeconfId();
+        if (id.id().equals("")) {
+            log.warn("Not adding empty pipeconfId for device {}", deviceId);
+        } else {
+            devicesToPipeconf.put(deviceId, pipeconfConfig.piPipeconfId());
+        }
+    }
+
+    /**
+     * Listener for configuration events.
+     */
+    private class InternalNetworkConfigListener implements NetworkConfigListener {
+
+
+        @Override
+        public void event(NetworkConfigEvent event) {
+            DeviceId deviceId = (DeviceId) event.subject();
+            addPipeconfFromCfg(deviceId);
+        }
+
+        @Override
+        public boolean isRelevant(NetworkConfigEvent event) {
+            return event.configClass().equals(PiPipeconfConfig.class) &&
+                    (event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
+                            event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED);
+        }
+    }
+}
diff --git a/core/net/src/test/java/org/onosproject/net/pi/impl/MockPipeconf.java b/core/net/src/test/java/org/onosproject/net/pi/impl/MockPipeconf.java
index 6694215..c5f90eb 100644
--- a/core/net/src/test/java/org/onosproject/net/pi/impl/MockPipeconf.java
+++ b/core/net/src/test/java/org/onosproject/net/pi/impl/MockPipeconf.java
@@ -43,7 +43,7 @@
 
     private final PiPipeconfId id;
     private final PiPipelineModel pipelineModel;
-    private final Map<Class<? extends Behaviour>, Class<? extends Behaviour>> behaviours;
+    protected final Map<Class<? extends Behaviour>, Class<? extends Behaviour>> behaviours;
 
     public MockPipeconf() throws IOException {
         this.id = new PiPipeconfId(PIPECONF_ID);
@@ -70,7 +70,7 @@
 
     @Override
     public Collection<Class<? extends Behaviour>> behaviours() {
-        return behaviours.values();
+        return behaviours.keySet();
     }
 
     @Override
diff --git a/core/net/src/test/java/org/onosproject/net/pi/impl/PiPipeconfServiceImplTest.java b/core/net/src/test/java/org/onosproject/net/pi/impl/PiPipeconfServiceImplTest.java
new file mode 100644
index 0000000..e9a91d3
--- /dev/null
+++ b/core/net/src/test/java/org/onosproject/net/pi/impl/PiPipeconfServiceImplTest.java
@@ -0,0 +1,299 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.pi.impl;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.util.ItemNotFoundException;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.behaviour.Pipeliner;
+import org.onosproject.net.behaviour.PipelinerAdapter;
+import org.onosproject.net.config.Config;
+import org.onosproject.net.config.ConfigApplyDelegate;
+import org.onosproject.net.config.ConfigFactory;
+import org.onosproject.net.config.NetworkConfigListener;
+import org.onosproject.net.config.NetworkConfigRegistry;
+import org.onosproject.net.config.NetworkConfigRegistryAdapter;
+import org.onosproject.net.config.basics.BasicDeviceConfig;
+import org.onosproject.net.device.DeviceDescription;
+import org.onosproject.net.device.DeviceDescriptionDiscovery;
+import org.onosproject.net.device.PortDescription;
+import org.onosproject.net.driver.AbstractHandlerBehaviour;
+import org.onosproject.net.driver.Behaviour;
+import org.onosproject.net.driver.Driver;
+import org.onosproject.net.driver.DriverAdapter;
+import org.onosproject.net.driver.DriverAdminService;
+import org.onosproject.net.driver.DriverAdminServiceAdapter;
+import org.onosproject.net.driver.DriverProvider;
+import org.onosproject.net.driver.DriverService;
+import org.onosproject.net.driver.DriverServiceAdapter;
+import org.onosproject.net.pi.model.PiPipeconfId;
+import org.onosproject.net.pi.model.PiPipelineInterpreter;
+import org.onosproject.net.pi.runtime.PiPipeconfConfig;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Unit Test Class for PiPipeconfServiceImpl.
+ */
+public class PiPipeconfServiceImplTest {
+
+    private static final DeviceId DEVICE_ID = DeviceId.deviceId("test:test");
+    private static final String BASE_DRIVER = "baseDriver";
+    private static final Set<Class<? extends Behaviour>> EXPECTED_BEHAVIOURS =
+            ImmutableSet.of(DeviceDescriptionDiscovery.class, Pipeliner.class, PiPipelineInterpreter.class);
+
+    //Mock util sets and classes
+    private final NetworkConfigRegistry cfgService = new MockNetworkConfigRegistry();
+    private final DriverService driverService = new MockDriverService();
+    private final DriverAdminService driverAdminService = new MockDriverAdminService();
+    private Driver baseDriver = new MockDriver();
+    private String completeDriverName;
+
+    private final Set<ConfigFactory> cfgFactories = new HashSet<>();
+    private final Set<NetworkConfigListener> netCfgListeners = new HashSet<>();
+    private final Set<DriverProvider> providers = new HashSet<>();
+
+    private final PiPipeconfConfig piPipeconfConfig = new PiPipeconfConfig();
+    private final InputStream jsonStream = PiPipeconfServiceImplTest.class
+            .getResourceAsStream("/org/onosproject/net/pi/impl/piPipeconfId.json");
+    private final BasicDeviceConfig basicDeviceConfig = new BasicDeviceConfig();
+    private final InputStream jsonStreamBasic = PiPipeconfServiceImplTest.class
+            .getResourceAsStream("/org/onosproject/net/pi/impl/basic.json");
+
+
+    //Services
+    private PiPipeconfServiceImpl piPipeconfService;
+    private MockPipeconf piPipeconf;
+
+    @Before
+    public void setUp() throws IOException {
+        piPipeconfService = new PiPipeconfServiceImpl();
+        piPipeconf = new MockPipeconf();
+        completeDriverName = BASE_DRIVER + ":" + piPipeconf.id();
+        piPipeconf.behaviours.put(Pipeliner.class, PipelinerAdapter.class);
+        piPipeconfService.cfgService = cfgService;
+        piPipeconfService.driverService = driverService;
+        piPipeconfService.driverAdminService = driverAdminService;
+        String key = "piPipeconf";
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode jsonNode = mapper.readTree(jsonStream);
+        ConfigApplyDelegate delegate = new MockDelegate();
+        piPipeconfConfig.init(DEVICE_ID, key, jsonNode, mapper, delegate);
+        String keyBasic = "basic";
+        JsonNode jsonNodeBasic = mapper.readTree(jsonStreamBasic);
+        basicDeviceConfig.init(DEVICE_ID, keyBasic, jsonNodeBasic, mapper, delegate);
+        piPipeconfService.activate();
+    }
+
+    @Test
+    public void activate() {
+        assertEquals("Incorrect driver service", driverService, piPipeconfService.driverService);
+        assertEquals("Incorrect driverAdminService service", driverAdminService, piPipeconfService.driverAdminService);
+        assertEquals("Incorrect configuration service", cfgService, piPipeconfService.cfgService);
+        assertTrue("Incorrect config factory", cfgFactories.contains(piPipeconfService.factory));
+        assertTrue("Incorrect network configuration listener", netCfgListeners.contains(piPipeconfService.cfgListener));
+    }
+
+    @Test
+    public void deactivate() {
+        piPipeconfService.deactivate();
+        assertEquals("Incorrect driver service", null, piPipeconfService.driverService);
+        assertEquals("Incorrect driverAdminService service", null, piPipeconfService.driverAdminService);
+        assertEquals("Incorrect configuration service", null, piPipeconfService.cfgService);
+        assertFalse("Config factory should be unregistered", cfgFactories.contains(piPipeconfService.factory));
+        assertFalse("Network configuration listener should be unregistered",
+                netCfgListeners.contains(piPipeconfService.cfgListener));
+    }
+
+    @Test
+    public void register() {
+        piPipeconfService.register(piPipeconf);
+        assertTrue("PiPipeconf should be registered", piPipeconfService.piPipeconfs.contains(piPipeconf));
+    }
+
+    @Test
+    public void getPipeconf() {
+        piPipeconfService.register(piPipeconf);
+        assertEquals("Returned PiPipeconf is not correct", piPipeconf,
+                piPipeconfService.getPipeconf(piPipeconf.id()).get());
+    }
+
+
+    @Test
+    public void bindToDevice() throws Exception {
+        PiPipeconfId piPipeconfId = cfgService.getConfig(DEVICE_ID, PiPipeconfConfig.class).piPipeconfId();
+        assertEquals(piPipeconf.id(), piPipeconfId);
+
+        String baseDriverName = cfgService.getConfig(DEVICE_ID, BasicDeviceConfig.class).driver();
+        assertEquals(BASE_DRIVER, baseDriverName);
+
+        piPipeconfService.register(piPipeconf);
+        assertEquals("Returned PiPipeconf is not correct", piPipeconf,
+                piPipeconfService.getPipeconf(piPipeconf.id()).get());
+
+        piPipeconfService.bindToDevice(piPipeconfId, DEVICE_ID).whenComplete((booleanResult, ex) -> {
+
+            //we assume that the provider is 1 and that it contains 1 driver
+            //we also assume that everything after driverAdminService.registerProvider(provider); has been tested.
+            assertTrue("Provider should be registered", providers.size() != 0);
+
+            assertTrue("Boolean Result of method should be True", booleanResult);
+
+            providers.forEach(p -> {
+                assertTrue("Provider should contain a driver", p.getDrivers().size() != 0);
+                p.getDrivers().forEach(driver -> {
+                    assertEquals("The driver has wrong name", driver.name(), completeDriverName);
+                    assertEquals("The driver contains wrong behaviours", EXPECTED_BEHAVIOURS, driver.behaviours());
+
+                });
+            });
+        }).exceptionally(ex -> {
+            throw new IllegalStateException(ex);
+        });
+    }
+
+    private class MockNetworkConfigRegistry extends NetworkConfigRegistryAdapter {
+        @Override
+        public void registerConfigFactory(ConfigFactory configFactory) {
+            cfgFactories.add(configFactory);
+        }
+
+        @Override
+        public void unregisterConfigFactory(ConfigFactory configFactory) {
+            cfgFactories.remove(configFactory);
+        }
+
+        @Override
+        public void addListener(NetworkConfigListener listener) {
+            netCfgListeners.add(listener);
+        }
+
+        @Override
+        public void removeListener(NetworkConfigListener listener) {
+            netCfgListeners.remove(listener);
+        }
+
+        @Override
+        public <S, C extends Config<S>> C getConfig(S subject, Class<C> configClass) {
+            DeviceId did = (DeviceId) subject;
+            if (configClass.equals(PiPipeconfConfig.class)
+                    && did.equals(DEVICE_ID)) {
+                return (C) piPipeconfConfig;
+            } else if (configClass.equals(BasicDeviceConfig.class)
+                    && did.equals(DEVICE_ID)) {
+                return (C) basicDeviceConfig;
+            }
+            return null;
+        }
+    }
+
+    private class MockDriverService extends DriverServiceAdapter {
+        @Override
+        public Driver getDriver(String driverName) {
+            if (driverName.equals(BASE_DRIVER)) {
+                return baseDriver;
+            }
+            throw new ItemNotFoundException("Driver not found");
+        }
+    }
+
+    private class MockDriverAdminService extends DriverAdminServiceAdapter {
+
+        @Override
+        public void registerProvider(DriverProvider provider) {
+            providers.add(provider);
+        }
+    }
+
+    private class MockDelegate implements ConfigApplyDelegate {
+        @Override
+        public void onApply(Config configFile) {
+        }
+    }
+
+    private class MockDriver extends DriverAdapter {
+
+        @Override
+        public List<Driver> parents() {
+            return ImmutableList.of();
+        }
+
+        @Override
+        public String manufacturer() {
+            return "On.Lab";
+        }
+
+        @Override
+        public String hwVersion() {
+            return "testHW";
+        }
+
+        @Override
+        public Class<? extends Behaviour> implementation(Class<? extends Behaviour> behaviour) {
+            return MockDeviceDescriptionDiscovery.class;
+        }
+
+        @Override
+        public Map<String, String> properties() {
+            return new HashMap<>();
+        }
+
+        @Override
+        public Set<Class<? extends Behaviour>> behaviours() {
+            return ImmutableSet.of(DeviceDescriptionDiscovery.class);
+        }
+
+        @Override
+        public String swVersion() {
+            return "testSW";
+        }
+
+        @Override
+        public String name() {
+            return BASE_DRIVER;
+        }
+    }
+
+    private class MockDeviceDescriptionDiscovery extends AbstractHandlerBehaviour
+            implements DeviceDescriptionDiscovery {
+        @Override
+        public DeviceDescription discoverDeviceDetails() {
+            return null;
+        }
+
+        @Override
+        public List<PortDescription> discoverPortDetails() {
+            return null;
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/net/src/test/resources/org/onosproject/net/pi/impl/basic.json b/core/net/src/test/resources/org/onosproject/net/pi/impl/basic.json
new file mode 100644
index 0000000..42de9ad
--- /dev/null
+++ b/core/net/src/test/resources/org/onosproject/net/pi/impl/basic.json
@@ -0,0 +1,3 @@
+{
+    "driver": "baseDriver"
+}
\ No newline at end of file
diff --git a/core/net/src/test/resources/org/onosproject/net/pi/impl/piPipeconfId.json b/core/net/src/test/resources/org/onosproject/net/pi/impl/piPipeconfId.json
new file mode 100644
index 0000000..7f234b7
--- /dev/null
+++ b/core/net/src/test/resources/org/onosproject/net/pi/impl/piPipeconfId.json
@@ -0,0 +1,3 @@
+{
+    "piPipeconfId": "org.project.pipeconf.default"
+}
\ No newline at end of file
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index 8909591..1b3c459 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -17,6 +17,8 @@
 package org.onosproject.provider.general.device.impl;
 
 import com.google.common.annotations.Beta;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -58,7 +60,9 @@
 import org.onosproject.net.driver.Driver;
 import org.onosproject.net.driver.DriverData;
 import org.onosproject.net.driver.DriverService;
-import org.onosproject.net.key.DeviceKeyAdminService;
+import org.onosproject.net.pi.model.PiPipeconfId;
+import org.onosproject.net.pi.runtime.PiPipeconfConfig;
+import org.onosproject.net.pi.runtime.PiPipeconfService;
 import org.onosproject.net.provider.AbstractProvider;
 import org.onosproject.net.provider.ProviderId;
 import org.onosproject.provider.general.device.api.GeneralProviderDeviceConfig;
@@ -66,12 +70,19 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static java.util.concurrent.Executors.newScheduledThreadPool;
 import static org.onlab.util.Tools.groupedThreads;
@@ -87,6 +98,7 @@
 @Component(immediate = true)
 public class GeneralDeviceProvider extends AbstractProvider
         implements DeviceProvider {
+    public static final String DRIVER = "driver";
     private final Logger log = getLogger(getClass());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -105,7 +117,7 @@
     protected DriverService driverService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected DeviceKeyAdminService deviceKeyAdminService;
+    protected PiPipeconfService piPipeconfService;
 
     protected static final String APP_NAME = "org.onosproject.generaldeviceprovider";
     protected static final String URI_SCHEME = "device";
@@ -114,6 +126,15 @@
     private static final int CORE_POOL_SIZE = 10;
     private static final String UNKNOWN = "unknown";
     private static final int PORT_STATS_PERIOD_SECONDS = 10;
+    //FIXME this will be removed when the configuration is synced at the source.
+    private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS = ImmutableSet.of("p4runtime");
+    private static final ConcurrentMap<DeviceId, Lock> ENTRY_LOCKS = Maps.newConcurrentMap();
+
+    //FIXME to be removed when netcfg will issue device events in a bundle or
+    //ensures all configuration needed is present
+    private Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
+    private Set<DeviceId> driverConfigured = new CopyOnWriteArraySet<>();
+    private Set<DeviceId> pipelineConfigured = new CopyOnWriteArraySet<>();
 
 
     protected ScheduledExecutorService connectionExecutor
@@ -279,7 +300,7 @@
             log.error("Configuration is NULL: basic config {}, general provider " +
                     "config {}", basicDeviceConfig, providerConfig);
         } else {
-            log.info("Connecting to device {}", deviceId);
+            log.info("Connecting to device {} with driver {}", deviceId, basicDeviceConfig.driver());
 
             Driver driver = driverService.getDriver(basicDeviceConfig.driver());
             DriverData driverData = new DefaultDriverData(driver, deviceId);
@@ -287,62 +308,85 @@
             DeviceHandshaker handshaker =
                     getBehaviour(driver, DeviceHandshaker.class, driverData);
 
-            if (handshaker != null) {
-
-                //Storing deviceKeyId and all other config values
-                // as data in the driver with protocol_<info>
-                // name as the key. e.g protocol_ip
-                providerConfig.protocolsInfo()
-                        .forEach((protocol, deviceInfoConfig) -> {
-                            deviceInfoConfig.configValues()
-                                    .forEach((k, v) -> driverData.set(protocol + "_" + k, v));
-                            driverData.set(protocol + "_key", deviceInfoConfig.deviceKeyId());
-                        });
-
-                //Connecting to the device
-                CompletableFuture<Boolean> connected = handshaker.connect();
-
-                connected.thenAcceptAsync(result -> {
-                    if (result) {
-
-                        //Populated with the default values obtained by the driver
-                        ChassisId cid = new ChassisId();
-                        SparseAnnotations annotations = DefaultAnnotations.builder()
-                                .set(AnnotationKeys.PROTOCOL,
-                                        providerConfig.protocolsInfo().keySet().toString())
-                                .build();
-                        DeviceDescription description =
-                                new DefaultDeviceDescription(deviceId.uri(), Device.Type.SWITCH,
-                                        driver.manufacturer(), driver.hwVersion(),
-                                        driver.swVersion(), UNKNOWN,
-                                        cid, false, annotations);
-                        //Empty list of ports
-                        List<PortDescription> ports = new ArrayList<>();
-
-                        if (driver.hasBehaviour(DeviceDescriptionDiscovery.class)) {
-                            DeviceDescriptionDiscovery deviceDiscovery = driver
-                                    .createBehaviour(driverData, DeviceDescriptionDiscovery.class);
-
-                            DeviceDescription newdescription = deviceDiscovery.discoverDeviceDetails();
-                            if (newdescription != null) {
-                                description = newdescription;
-                            }
-                            ports = deviceDiscovery.discoverPortDetails();
-                        }
-                        providerService.deviceConnected(deviceId, description);
-                        providerService.updatePorts(deviceId, ports);
-
-                    } else {
-                        log.warn("Can't connect to device {}", deviceId);
-                    }
-                });
-            } else {
+            if (handshaker == null) {
                 log.error("Device {}, with driver {} does not support DeviceHandshaker " +
                         "behaviour, {}", deviceId, driver.name(), driver.behaviours());
+                return;
             }
+            //Storing deviceKeyId and all other config values
+            // as data in the driver with protocol_<info>
+            // name as the key. e.g protocol_ip
+            providerConfig.protocolsInfo()
+                    .forEach((protocol, deviceInfoConfig) -> {
+                        deviceInfoConfig.configValues()
+                                .forEach((k, v) -> driverData.set(protocol + "_" + k, v));
+                        driverData.set(protocol + "_key", deviceInfoConfig.deviceKeyId());
+                    });
+
+            //Connecting to the device
+            CompletableFuture<Boolean> connected = handshaker.connect();
+
+            connected.thenAcceptAsync(result -> {
+                if (result) {
+
+                    //Populated with the default values obtained by the driver
+                    ChassisId cid = new ChassisId();
+                    SparseAnnotations annotations = DefaultAnnotations.builder()
+                            .set(AnnotationKeys.PROTOCOL,
+                                    providerConfig.protocolsInfo().keySet().toString())
+                            .build();
+                    DeviceDescription description =
+                            new DefaultDeviceDescription(deviceId.uri(), Device.Type.SWITCH,
+                                    driver.manufacturer(), driver.hwVersion(),
+                                    driver.swVersion(), UNKNOWN,
+                                    cid, false, annotations);
+                    //Empty list of ports
+                    List<PortDescription> ports = new ArrayList<>();
+
+                    if (driver.hasBehaviour(DeviceDescriptionDiscovery.class)) {
+                        DeviceDescriptionDiscovery deviceDiscovery = driver
+                                .createBehaviour(driverData, DeviceDescriptionDiscovery.class);
+
+                        DeviceDescription newdescription = deviceDiscovery.discoverDeviceDetails();
+                        if (newdescription != null) {
+                            description = newdescription;
+                        }
+                        ports = deviceDiscovery.discoverPortDetails();
+                    }
+
+                    Optional<PiPipeconfId> pipeconfId = piPipeconfService.ofDevice(deviceId);
+                    //Apply the Pipeline configuration and then connect the device
+                    if (pipeconfId.isPresent()) {
+                        DeviceDescription finalDescription = description;
+                        List<PortDescription> finalPorts = ports;
+                        piPipeconfService.bindToDevice(pipeconfId.get(), deviceId).whenComplete((success, ex) -> {
+                            if (success) {
+                                advertiseDevice(deviceId, finalDescription, finalPorts);
+                            } else {
+                                log.error("Can't merge driver {} with pipeconf {} for device {}, " +
+                                                "not reporting it to the device manager",
+                                        driver.name(), pipeconfId.get(), deviceId);
+                            }
+                        }).exceptionally(ex -> {
+                            throw new IllegalStateException(ex);
+                        });
+                    } else {
+                        //No other operation is needed, advertise the device to the core.
+                        advertiseDevice(deviceId, description, ports);
+                    }
+
+                } else {
+                    log.warn("Can't connect to device {}", deviceId);
+                }
+            });
         }
     }
 
+    private void advertiseDevice(DeviceId deviceId, DeviceDescription description, List<PortDescription> ports) {
+        providerService.deviceConnected(deviceId, description);
+        providerService.updatePorts(deviceId, ports);
+    }
+
     private void disconnectDevice(DeviceId deviceId) {
         log.info("Disconnecting for device {}", deviceId);
         DeviceHandshaker handshaker = getHandshaker(deviceId);
@@ -396,21 +440,78 @@
                 log.debug("{} is not my scheme, skipping", deviceId);
                 return;
             }
-            if (deviceService.getDevice(deviceId) == null || !deviceService.isAvailable(deviceId)) {
-                connectionExecutor.submit(exceptionSafe(() -> connectDevice(deviceId)));
-            } else {
+            if (deviceService.getDevice(deviceId) != null || deviceService.isAvailable(deviceId)) {
                 log.info("Device {} is already connected to ONOS and is available", deviceId);
+                return;
+            }
+            //FIXME to be removed when netcfg will issue device events in a bundle or
+            // ensure all configuration needed is present
+            Lock lock = ENTRY_LOCKS.computeIfAbsent(deviceId, key -> new ReentrantLock());
+            lock.lock();
+            try {
+                if (event.configClass().equals(GeneralProviderDeviceConfig.class)) {
+                    //FIXME we currently assume that p4runtime devices are pipeline configurable.
+                    //If we want to connect a p4runtime device with no pipeline
+                    if (event.config().isPresent() &&
+                            Collections.disjoint(ImmutableSet.copyOf(event.config().get().node().fieldNames()),
+                                    PIPELINE_CONFIGURABLE_PROTOCOLS)) {
+                        pipelineConfigured.add(deviceId);
+                    }
+                    deviceConfigured.add(deviceId);
+                } else if (event.configClass().equals(BasicDeviceConfig.class)) {
+                    if (event.config().isPresent() && event.config().get().node().has(DRIVER)) {
+                        //TODO add check for pipeline and add it to the pipeline list if no
+                        // p4runtime is present.
+                        driverConfigured.add(deviceId);
+                    }
+                } else if (event.configClass().equals(PiPipeconfConfig.class)) {
+                    if (event.config().isPresent()
+                            && event.config().get().node().has(PiPipeconfConfig.PIPIPECONFID)) {
+                        pipelineConfigured.add(deviceId);
+                    }
+                }
+                //if the device has no "pipeline configurable protocol it will be present
+                // in the pipelineConfigured
+                if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)
+                        && pipelineConfigured.contains(deviceId)) {
+                    checkAndSubmitDeviceTask(deviceId);
+                } else {
+                    if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
+                        log.debug("Waiting for pipeline configuration for device {}", deviceId);
+                    } else if (pipelineConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
+                        log.debug("Waiting for device configuration for device {}", deviceId);
+                    } else if (pipelineConfigured.contains(deviceId) && deviceConfigured.contains(deviceId)) {
+                        log.debug("Waiting for driver configuration for device {}", deviceId);
+                    } else if (driverConfigured.contains(deviceId)) {
+                        log.debug("Only driver configuration for device {}", deviceId);
+                    } else if (deviceConfigured.contains(deviceId)) {
+                        log.debug("Only device configuration for device {}", deviceId);
+                    }
+                }
+            } finally {
+                lock.unlock();
             }
         }
 
         @Override
         public boolean isRelevant(NetworkConfigEvent event) {
-            return event.configClass().equals(GeneralProviderDeviceConfig.class) &&
+            return (event.configClass().equals(GeneralProviderDeviceConfig.class) ||
+                    event.configClass().equals(BasicDeviceConfig.class) ||
+                    event.configClass().equals(PiPipeconfConfig.class)) &&
                     (event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
                             event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED);
         }
     }
 
+    private void checkAndSubmitDeviceTask(DeviceId deviceId) {
+        connectionExecutor.submit(exceptionSafe(() -> connectDevice(deviceId)));
+        //FIXME this will be removed when configuration is synced.
+        deviceConfigured.remove(deviceId);
+        driverConfigured.remove(deviceId);
+        pipelineConfigured.remove(deviceId);
+
+    }
+
     /**
      * Listener for core device events.
      */
diff --git a/tools/test/configs/general-provider-cfg.json b/tools/test/configs/general-provider-cfg.json
index 7c4f147..6208903 100644
--- a/tools/test/configs/general-provider-cfg.json
+++ b/tools/test/configs/general-provider-cfg.json
@@ -13,6 +13,9 @@
           "deviceKeyId": "p4runtime:device:identifier"
         }
       },
+      "piPipeconf":{
+        "piPipeconfId": "pipipeconfTest"
+      },
       "basic": {
         "driver": "bmv2"
       }