[ONOS-6556] Implementation of PiPeconfService
Change-Id: I0b40f1808e459a4a7cb83cde50010f6d38b04771
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
new file mode 100644
index 0000000..85d3fc3
--- /dev/null
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
@@ -0,0 +1,253 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.pi.impl;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.annotations.Beta;
+import com.google.common.collect.ImmutableSet;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.ItemNotFoundException;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.config.ConfigFactory;
+import org.onosproject.net.config.NetworkConfigEvent;
+import org.onosproject.net.config.NetworkConfigListener;
+import org.onosproject.net.config.NetworkConfigRegistry;
+import org.onosproject.net.config.basics.BasicDeviceConfig;
+import org.onosproject.net.config.basics.SubjectFactories;
+import org.onosproject.net.driver.Behaviour;
+import org.onosproject.net.driver.DefaultDriver;
+import org.onosproject.net.driver.Driver;
+import org.onosproject.net.driver.DriverAdminService;
+import org.onosproject.net.driver.DriverProvider;
+import org.onosproject.net.driver.DriverService;
+import org.onosproject.net.pi.model.PiPipeconf;
+import org.onosproject.net.pi.model.PiPipeconfId;
+import org.onosproject.net.pi.runtime.PiPipeconfConfig;
+import org.onosproject.net.pi.runtime.PiPipeconfService;
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+
+/**
+ * Implementation of the PiPipeconfService.
+ */
+@Component(immediate = true)
+@Service
+@Beta
+public class PiPipeconfManager implements PiPipeconfService {
+
+ private final Logger log = getLogger(getClass());
+
+ private static final String DRIVER = "driver";
+ private static final String CFG_SCHEME = "piPipeconf";
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected NetworkConfigRegistry cfgService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DriverService driverService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DriverAdminService driverAdminService;
+
+ //TODO move to replicated map
+ protected ConcurrentHashMap<PiPipeconfId, PiPipeconf> piPipeconfs = new ConcurrentHashMap<>();
+ //TODO move to replicated map
+ protected ConcurrentHashMap<DeviceId, PiPipeconfId> devicesToPipeconf = new ConcurrentHashMap<>();
+
+ protected ExecutorService executor =
+ Executors.newFixedThreadPool(5, groupedThreads("onos/pipipeconfservice",
+ "pipeline-to-device-%d", log));
+
+ protected final ConfigFactory factory =
+ new ConfigFactory<DeviceId, PiPipeconfConfig>(
+ SubjectFactories.DEVICE_SUBJECT_FACTORY,
+ PiPipeconfConfig.class, CFG_SCHEME) {
+ @Override
+ public PiPipeconfConfig createConfig() {
+ return new PiPipeconfConfig();
+ }
+ };
+
+ protected final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
+
+ @Activate
+ public void activate() {
+ cfgService.registerConfigFactory(factory);
+ cfgService.addListener(cfgListener);
+ cfgService.getSubjects(DeviceId.class, PiPipeconfConfig.class)
+ .forEach(this::addPipeconfFromCfg);
+ log.info("Started");
+ }
+
+
+ @Deactivate
+ public void deactivate() {
+ executor.shutdown();
+ cfgService.removeListener(cfgListener);
+ cfgService.unregisterConfigFactory(factory);
+ piPipeconfs.clear();
+ devicesToPipeconf.clear();
+ cfgService = null;
+ driverAdminService = null;
+ driverService = null;
+ log.info("Stopped");
+ }
+
+ @Override
+ public void register(PiPipeconf pipeconf) throws IllegalStateException {
+ log.warn("Currently using local maps, needs to be moved to a distributed store");
+ piPipeconfs.put(pipeconf.id(), pipeconf);
+ }
+
+ @Override
+ public Iterable<PiPipeconf> getPipeconfs() {
+ return piPipeconfs.values();
+ }
+
+ @Override
+ public Optional<PiPipeconf> getPipeconf(PiPipeconfId id) {
+ return Optional.ofNullable(piPipeconfs.get(id));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> bindToDevice(PiPipeconfId pipeconfId, DeviceId deviceId) {
+ CompletableFuture<Boolean> operationResult = new CompletableFuture<>();
+
+ executor.execute(() -> {
+ BasicDeviceConfig basicDeviceConfig =
+ cfgService.getConfig(deviceId, BasicDeviceConfig.class);
+ Driver baseDriver = driverService.getDriver(basicDeviceConfig.driver());
+
+ String completeDriverName = baseDriver.name() + ":" + pipeconfId;
+ PiPipeconf piPipeconf = piPipeconfs.get(pipeconfId);
+ if (piPipeconf == null) {
+ log.warn("Pipeconf {} is not present", pipeconfId);
+ operationResult.complete(false);
+ } else {
+ //if driver exists already we don't create a new one.
+ //needs to be done via exception catching due to DriverRegistry throwing it on a null return from
+ //the driver map.
+ try {
+ driverService.getDriver(completeDriverName);
+ } catch (ItemNotFoundException e) {
+
+ log.debug("First time pipeconf {} is used with base driver {}, merging the two",
+ pipeconfId, baseDriver);
+ //extract the behaviours from the pipipeconf.
+ Map<Class<? extends Behaviour>, Class<? extends Behaviour>> behaviours = new HashMap<>();
+ piPipeconf.behaviours().forEach(b -> {
+ behaviours.put(b, piPipeconf.implementation(b).get());
+ });
+
+ Driver piPipeconfDriver = new DefaultDriver(completeDriverName, baseDriver.parents(),
+ baseDriver.manufacturer(), baseDriver.hwVersion(), baseDriver.swVersion(),
+ behaviours, new HashMap<>());
+ //we take the base driver created with the behaviours of the PiPeconf and
+ // merge it with the base driver that was assigned to the device
+ Driver completeDriver = piPipeconfDriver.merge(baseDriver);
+
+ //This might lead to explosion of number of providers in the core,
+ // due to 1:1:1 pipeconf:driver:provider maybe find better way
+ DriverProvider provider = new PiPipeconfDriverProviderInternal(completeDriver);
+
+ //we register to the dirver susbystem the driver provider containing the merged driver
+ driverAdminService.registerProvider(provider);
+ }
+
+ //Changing the configuration for the device to enforce the full driver with pipipeconf
+ // and base behaviours
+ ObjectNode newCfg = (ObjectNode) basicDeviceConfig.node();
+ newCfg = newCfg.put(DRIVER, completeDriverName);
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode newCfgNode = mapper.convertValue(newCfg, JsonNode.class);
+ cfgService.applyConfig(deviceId, BasicDeviceConfig.class, newCfgNode);
+ //Completable future is needed for when this method will also apply the pipeline to the device.
+ operationResult.complete(true);
+ }
+ });
+ return operationResult;
+ }
+
+ @Override
+ public Optional<PiPipeconfId> ofDevice(DeviceId deviceId) {
+ return Optional.ofNullable(devicesToPipeconf.get(deviceId));
+ }
+
+ private class PiPipeconfDriverProviderInternal implements DriverProvider {
+
+ Driver driver;
+
+ PiPipeconfDriverProviderInternal(Driver driver) {
+ this.driver = driver;
+ }
+
+ @Override
+ public Set<Driver> getDrivers() {
+ return ImmutableSet.of(driver);
+ }
+ }
+
+ private void addPipeconfFromCfg(DeviceId deviceId) {
+ PiPipeconfConfig pipeconfConfig =
+ cfgService.getConfig(deviceId, PiPipeconfConfig.class);
+ PiPipeconfId id = pipeconfConfig.piPipeconfId();
+ if (id.id().equals("")) {
+ log.warn("Not adding empty pipeconfId for device {}", deviceId);
+ } else {
+ devicesToPipeconf.put(deviceId, pipeconfConfig.piPipeconfId());
+ }
+ }
+
+ /**
+ * Listener for configuration events.
+ */
+ private class InternalNetworkConfigListener implements NetworkConfigListener {
+
+
+ @Override
+ public void event(NetworkConfigEvent event) {
+ DeviceId deviceId = (DeviceId) event.subject();
+ addPipeconfFromCfg(deviceId);
+ }
+
+ @Override
+ public boolean isRelevant(NetworkConfigEvent event) {
+ return event.configClass().equals(PiPipeconfConfig.class) &&
+ (event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
+ event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED);
+ }
+ }
+}