[ONOS-6564] Adding PiPeconf behaviours to driver for device.
Initial implementation of PiPipeconfService.
Tests for Initial implementation.
Change-Id: I9dea6fb3015788b8b61060c7f88395c3d45e6ed7
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index 8909591..1b3c459 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -17,6 +17,8 @@
package org.onosproject.provider.general.device.impl;
import com.google.common.annotations.Beta;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -58,7 +60,9 @@
import org.onosproject.net.driver.Driver;
import org.onosproject.net.driver.DriverData;
import org.onosproject.net.driver.DriverService;
-import org.onosproject.net.key.DeviceKeyAdminService;
+import org.onosproject.net.pi.model.PiPipeconfId;
+import org.onosproject.net.pi.runtime.PiPipeconfConfig;
+import org.onosproject.net.pi.runtime.PiPipeconfService;
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.provider.general.device.api.GeneralProviderDeviceConfig;
@@ -66,12 +70,19 @@
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
+import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static org.onlab.util.Tools.groupedThreads;
@@ -87,6 +98,7 @@
@Component(immediate = true)
public class GeneralDeviceProvider extends AbstractProvider
implements DeviceProvider {
+ public static final String DRIVER = "driver";
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -105,7 +117,7 @@
protected DriverService driverService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected DeviceKeyAdminService deviceKeyAdminService;
+ protected PiPipeconfService piPipeconfService;
protected static final String APP_NAME = "org.onosproject.generaldeviceprovider";
protected static final String URI_SCHEME = "device";
@@ -114,6 +126,15 @@
private static final int CORE_POOL_SIZE = 10;
private static final String UNKNOWN = "unknown";
private static final int PORT_STATS_PERIOD_SECONDS = 10;
+ //FIXME this will be removed when the configuration is synced at the source.
+ private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS = ImmutableSet.of("p4runtime");
+ private static final ConcurrentMap<DeviceId, Lock> ENTRY_LOCKS = Maps.newConcurrentMap();
+
+ //FIXME to be removed when netcfg will issue device events in a bundle or
+ //ensures all configuration needed is present
+ private Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
+ private Set<DeviceId> driverConfigured = new CopyOnWriteArraySet<>();
+ private Set<DeviceId> pipelineConfigured = new CopyOnWriteArraySet<>();
protected ScheduledExecutorService connectionExecutor
@@ -279,7 +300,7 @@
log.error("Configuration is NULL: basic config {}, general provider " +
"config {}", basicDeviceConfig, providerConfig);
} else {
- log.info("Connecting to device {}", deviceId);
+ log.info("Connecting to device {} with driver {}", deviceId, basicDeviceConfig.driver());
Driver driver = driverService.getDriver(basicDeviceConfig.driver());
DriverData driverData = new DefaultDriverData(driver, deviceId);
@@ -287,62 +308,85 @@
DeviceHandshaker handshaker =
getBehaviour(driver, DeviceHandshaker.class, driverData);
- if (handshaker != null) {
-
- //Storing deviceKeyId and all other config values
- // as data in the driver with protocol_<info>
- // name as the key. e.g protocol_ip
- providerConfig.protocolsInfo()
- .forEach((protocol, deviceInfoConfig) -> {
- deviceInfoConfig.configValues()
- .forEach((k, v) -> driverData.set(protocol + "_" + k, v));
- driverData.set(protocol + "_key", deviceInfoConfig.deviceKeyId());
- });
-
- //Connecting to the device
- CompletableFuture<Boolean> connected = handshaker.connect();
-
- connected.thenAcceptAsync(result -> {
- if (result) {
-
- //Populated with the default values obtained by the driver
- ChassisId cid = new ChassisId();
- SparseAnnotations annotations = DefaultAnnotations.builder()
- .set(AnnotationKeys.PROTOCOL,
- providerConfig.protocolsInfo().keySet().toString())
- .build();
- DeviceDescription description =
- new DefaultDeviceDescription(deviceId.uri(), Device.Type.SWITCH,
- driver.manufacturer(), driver.hwVersion(),
- driver.swVersion(), UNKNOWN,
- cid, false, annotations);
- //Empty list of ports
- List<PortDescription> ports = new ArrayList<>();
-
- if (driver.hasBehaviour(DeviceDescriptionDiscovery.class)) {
- DeviceDescriptionDiscovery deviceDiscovery = driver
- .createBehaviour(driverData, DeviceDescriptionDiscovery.class);
-
- DeviceDescription newdescription = deviceDiscovery.discoverDeviceDetails();
- if (newdescription != null) {
- description = newdescription;
- }
- ports = deviceDiscovery.discoverPortDetails();
- }
- providerService.deviceConnected(deviceId, description);
- providerService.updatePorts(deviceId, ports);
-
- } else {
- log.warn("Can't connect to device {}", deviceId);
- }
- });
- } else {
+ if (handshaker == null) {
log.error("Device {}, with driver {} does not support DeviceHandshaker " +
"behaviour, {}", deviceId, driver.name(), driver.behaviours());
+ return;
}
+ //Storing deviceKeyId and all other config values
+ // as data in the driver with protocol_<info>
+ // name as the key. e.g protocol_ip
+ providerConfig.protocolsInfo()
+ .forEach((protocol, deviceInfoConfig) -> {
+ deviceInfoConfig.configValues()
+ .forEach((k, v) -> driverData.set(protocol + "_" + k, v));
+ driverData.set(protocol + "_key", deviceInfoConfig.deviceKeyId());
+ });
+
+ //Connecting to the device
+ CompletableFuture<Boolean> connected = handshaker.connect();
+
+ connected.thenAcceptAsync(result -> {
+ if (result) {
+
+ //Populated with the default values obtained by the driver
+ ChassisId cid = new ChassisId();
+ SparseAnnotations annotations = DefaultAnnotations.builder()
+ .set(AnnotationKeys.PROTOCOL,
+ providerConfig.protocolsInfo().keySet().toString())
+ .build();
+ DeviceDescription description =
+ new DefaultDeviceDescription(deviceId.uri(), Device.Type.SWITCH,
+ driver.manufacturer(), driver.hwVersion(),
+ driver.swVersion(), UNKNOWN,
+ cid, false, annotations);
+ //Empty list of ports
+ List<PortDescription> ports = new ArrayList<>();
+
+ if (driver.hasBehaviour(DeviceDescriptionDiscovery.class)) {
+ DeviceDescriptionDiscovery deviceDiscovery = driver
+ .createBehaviour(driverData, DeviceDescriptionDiscovery.class);
+
+ DeviceDescription newdescription = deviceDiscovery.discoverDeviceDetails();
+ if (newdescription != null) {
+ description = newdescription;
+ }
+ ports = deviceDiscovery.discoverPortDetails();
+ }
+
+ Optional<PiPipeconfId> pipeconfId = piPipeconfService.ofDevice(deviceId);
+ //Apply the Pipeline configuration and then connect the device
+ if (pipeconfId.isPresent()) {
+ DeviceDescription finalDescription = description;
+ List<PortDescription> finalPorts = ports;
+ piPipeconfService.bindToDevice(pipeconfId.get(), deviceId).whenComplete((success, ex) -> {
+ if (success) {
+ advertiseDevice(deviceId, finalDescription, finalPorts);
+ } else {
+ log.error("Can't merge driver {} with pipeconf {} for device {}, " +
+ "not reporting it to the device manager",
+ driver.name(), pipeconfId.get(), deviceId);
+ }
+ }).exceptionally(ex -> {
+ throw new IllegalStateException(ex);
+ });
+ } else {
+ //No other operation is needed, advertise the device to the core.
+ advertiseDevice(deviceId, description, ports);
+ }
+
+ } else {
+ log.warn("Can't connect to device {}", deviceId);
+ }
+ });
}
}
+ private void advertiseDevice(DeviceId deviceId, DeviceDescription description, List<PortDescription> ports) {
+ providerService.deviceConnected(deviceId, description);
+ providerService.updatePorts(deviceId, ports);
+ }
+
private void disconnectDevice(DeviceId deviceId) {
log.info("Disconnecting for device {}", deviceId);
DeviceHandshaker handshaker = getHandshaker(deviceId);
@@ -396,21 +440,78 @@
log.debug("{} is not my scheme, skipping", deviceId);
return;
}
- if (deviceService.getDevice(deviceId) == null || !deviceService.isAvailable(deviceId)) {
- connectionExecutor.submit(exceptionSafe(() -> connectDevice(deviceId)));
- } else {
+ if (deviceService.getDevice(deviceId) != null || deviceService.isAvailable(deviceId)) {
log.info("Device {} is already connected to ONOS and is available", deviceId);
+ return;
+ }
+ //FIXME to be removed when netcfg will issue device events in a bundle or
+ // ensure all configuration needed is present
+ Lock lock = ENTRY_LOCKS.computeIfAbsent(deviceId, key -> new ReentrantLock());
+ lock.lock();
+ try {
+ if (event.configClass().equals(GeneralProviderDeviceConfig.class)) {
+ //FIXME we currently assume that p4runtime devices are pipeline configurable.
+ //If we want to connect a p4runtime device with no pipeline
+ if (event.config().isPresent() &&
+ Collections.disjoint(ImmutableSet.copyOf(event.config().get().node().fieldNames()),
+ PIPELINE_CONFIGURABLE_PROTOCOLS)) {
+ pipelineConfigured.add(deviceId);
+ }
+ deviceConfigured.add(deviceId);
+ } else if (event.configClass().equals(BasicDeviceConfig.class)) {
+ if (event.config().isPresent() && event.config().get().node().has(DRIVER)) {
+ //TODO add check for pipeline and add it to the pipeline list if no
+ // p4runtime is present.
+ driverConfigured.add(deviceId);
+ }
+ } else if (event.configClass().equals(PiPipeconfConfig.class)) {
+ if (event.config().isPresent()
+ && event.config().get().node().has(PiPipeconfConfig.PIPIPECONFID)) {
+ pipelineConfigured.add(deviceId);
+ }
+ }
+ //if the device has no "pipeline configurable protocol it will be present
+ // in the pipelineConfigured
+ if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)
+ && pipelineConfigured.contains(deviceId)) {
+ checkAndSubmitDeviceTask(deviceId);
+ } else {
+ if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
+ log.debug("Waiting for pipeline configuration for device {}", deviceId);
+ } else if (pipelineConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
+ log.debug("Waiting for device configuration for device {}", deviceId);
+ } else if (pipelineConfigured.contains(deviceId) && deviceConfigured.contains(deviceId)) {
+ log.debug("Waiting for driver configuration for device {}", deviceId);
+ } else if (driverConfigured.contains(deviceId)) {
+ log.debug("Only driver configuration for device {}", deviceId);
+ } else if (deviceConfigured.contains(deviceId)) {
+ log.debug("Only device configuration for device {}", deviceId);
+ }
+ }
+ } finally {
+ lock.unlock();
}
}
@Override
public boolean isRelevant(NetworkConfigEvent event) {
- return event.configClass().equals(GeneralProviderDeviceConfig.class) &&
+ return (event.configClass().equals(GeneralProviderDeviceConfig.class) ||
+ event.configClass().equals(BasicDeviceConfig.class) ||
+ event.configClass().equals(PiPipeconfConfig.class)) &&
(event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED);
}
}
+ private void checkAndSubmitDeviceTask(DeviceId deviceId) {
+ connectionExecutor.submit(exceptionSafe(() -> connectDevice(deviceId)));
+ //FIXME this will be removed when configuration is synced.
+ deviceConfigured.remove(deviceId);
+ driverConfigured.remove(deviceId);
+ pipelineConfigured.remove(deviceId);
+
+ }
+
/**
* Listener for core device events.
*/