ONOS-6561 BMv2 handshaker via P4Runtime
+ support fort device-specific default pipeconf
+ improvements to P4runtime and gRPC protocol stuff
Change-Id: I8986fce3959df564454ea3d31859860f61eabcae
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index 1b3c459..ad48e1e 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -60,7 +60,9 @@
import org.onosproject.net.driver.Driver;
import org.onosproject.net.driver.DriverData;
import org.onosproject.net.driver.DriverService;
+import org.onosproject.net.pi.model.PiPipeconf;
import org.onosproject.net.pi.model.PiPipeconfId;
+import org.onosproject.net.pi.model.PiPipelineProgrammable;
import org.onosproject.net.pi.runtime.PiPipeconfConfig;
import org.onosproject.net.pi.runtime.PiPipeconfService;
import org.onosproject.net.provider.AbstractProvider;
@@ -72,7 +74,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
@@ -139,12 +140,12 @@
protected ScheduledExecutorService connectionExecutor
= newScheduledThreadPool(CORE_POOL_SIZE,
- groupedThreads("onos/generaldeviceprovider-device",
- "connection-executor-%d", log));
+ groupedThreads("onos/generaldeviceprovider-device",
+ "connection-executor-%d", log));
protected ScheduledExecutorService portStatsExecutor
= newScheduledThreadPool(CORE_POOL_SIZE,
- groupedThreads("onos/generaldeviceprovider-port-stats",
- "port-stats-executor-%d", log));
+ groupedThreads("onos/generaldeviceprovider-port-stats",
+ "port-stats-executor-%d", log));
protected DeviceProviderService providerService;
private InternalDeviceListener deviceListener = new InternalDeviceListener();
@@ -239,7 +240,7 @@
modified.thenAcceptAsync(result -> {
if (!result) {
log.warn("Your device {} port {} status can't be changed to {}",
- deviceId, portNumber, enable);
+ deviceId, portNumber, enable);
}
});
@@ -251,13 +252,13 @@
private DeviceHandshaker getHandshaker(DeviceId deviceId) {
Driver driver = getDriver(deviceId);
return getBehaviour(driver, DeviceHandshaker.class,
- new DefaultDriverData(driver, deviceId));
+ new DefaultDriverData(driver, deviceId));
}
private PortAdmin getPortAdmin(DeviceId deviceId) {
Driver driver = getDriver(deviceId);
return getBehaviour(driver, PortAdmin.class,
- new DefaultDriverData(driver, deviceId));
+ new DefaultDriverData(driver, deviceId));
}
@@ -267,7 +268,7 @@
driver = driverService.getDriver(deviceId);
} catch (ItemNotFoundException e) {
log.debug("Falling back to configuration to fetch driver " +
- "for device {}", deviceId);
+ "for device {}", deviceId);
driver = driverService.getDriver(
cfgService.getConfig(deviceId, BasicDeviceConfig.class).driver());
}
@@ -298,7 +299,7 @@
if (providerConfig == null || basicDeviceConfig == null) {
log.error("Configuration is NULL: basic config {}, general provider " +
- "config {}", basicDeviceConfig, providerConfig);
+ "config {}", basicDeviceConfig, providerConfig);
} else {
log.info("Connecting to device {} with driver {}", deviceId, basicDeviceConfig.driver());
@@ -310,7 +311,7 @@
if (handshaker == null) {
log.error("Device {}, with driver {} does not support DeviceHandshaker " +
- "behaviour, {}", deviceId, driver.name(), driver.behaviours());
+ "behaviour, {}", deviceId, driver.name(), driver.behaviours());
return;
}
//Storing deviceKeyId and all other config values
@@ -333,13 +334,13 @@
ChassisId cid = new ChassisId();
SparseAnnotations annotations = DefaultAnnotations.builder()
.set(AnnotationKeys.PROTOCOL,
- providerConfig.protocolsInfo().keySet().toString())
+ providerConfig.protocolsInfo().keySet().toString())
.build();
DeviceDescription description =
new DefaultDeviceDescription(deviceId.uri(), Device.Type.SWITCH,
- driver.manufacturer(), driver.hwVersion(),
- driver.swVersion(), UNKNOWN,
- cid, false, annotations);
+ driver.manufacturer(), driver.hwVersion(),
+ driver.swVersion(), UNKNOWN,
+ cid, false, annotations);
//Empty list of ports
List<PortDescription> ports = new ArrayList<>();
@@ -354,27 +355,15 @@
ports = deviceDiscovery.discoverPortDetails();
}
- Optional<PiPipeconfId> pipeconfId = piPipeconfService.ofDevice(deviceId);
- //Apply the Pipeline configuration and then connect the device
- if (pipeconfId.isPresent()) {
- DeviceDescription finalDescription = description;
- List<PortDescription> finalPorts = ports;
- piPipeconfService.bindToDevice(pipeconfId.get(), deviceId).whenComplete((success, ex) -> {
- if (success) {
- advertiseDevice(deviceId, finalDescription, finalPorts);
- } else {
- log.error("Can't merge driver {} with pipeconf {} for device {}, " +
- "not reporting it to the device manager",
- driver.name(), pipeconfId.get(), deviceId);
- }
- }).exceptionally(ex -> {
- throw new IllegalStateException(ex);
- });
- } else {
- //No other operation is needed, advertise the device to the core.
- advertiseDevice(deviceId, description, ports);
+ if (!handlePipeconf(deviceId, driver, driverData)) {
+ // Something went wrong during handling of pipeconf.
+ // We already logged the error.
+ handshaker.disconnect();
+ return;
}
+ advertiseDevice(deviceId, description, ports);
+
} else {
log.warn("Can't connect to device {}", deviceId);
}
@@ -382,6 +371,66 @@
}
}
+ /**
+ * Handles the case of a device that is pipeline programmable. Returns true if the operation wa successful and the
+ * device can be registered to the core, false otherwise.
+ */
+ private boolean handlePipeconf(DeviceId deviceId, Driver driver, DriverData driverData) {
+
+ PiPipelineProgrammable pipelineProg = getBehaviour(driver, PiPipelineProgrammable.class,
+ driverData);
+
+ if (pipelineProg == null) {
+ // Device is not pipeline programmable.
+ return true;
+ }
+
+ PiPipeconfId pipeconfId = piPipeconfService.ofDevice(deviceId).orElseGet(() -> {
+ // No pipeconf has been associated with this device.
+ // Check if device driver provides a default one.
+ if (pipelineProg.getDefaultPipeconf().isPresent()) {
+ PiPipeconf defaultPipeconf = pipelineProg.getDefaultPipeconf().get();
+ log.info("Using default pipeconf {} for {}", defaultPipeconf.id(), deviceId);
+ // Register default one if it is not.
+ // TODO: this should be performed at driver loading.
+ if (!piPipeconfService.getPipeconf(defaultPipeconf.id()).isPresent()) {
+ piPipeconfService.register(defaultPipeconf);
+ }
+ return defaultPipeconf.id();
+ } else {
+ return null;
+ }
+ });
+
+ if (pipeconfId == null) {
+ log.warn("Device {} is pipeline programmable but no pipeconf can be associated to it.", deviceId);
+ return true;
+ }
+
+
+ PiPipeconf pipeconf = piPipeconfService.getPipeconf(pipeconfId).orElseThrow(
+ () -> new IllegalStateException("Pipeconf is not registered: " + pipeconfId)
+ );
+
+
+ try {
+ if (!pipelineProg.deployPipeconf(pipeconf).get()) {
+ log.error("Unable to deploy pipeconf {} to {}, aborting device discovery", pipeconfId, deviceId);
+ return false;
+ }
+
+ if (!piPipeconfService.bindToDevice(pipeconfId, deviceId).get()) {
+ log.error("Unable to merge driver {} for device {} with pipeconf {}, aborting device discovery",
+ driver.name(), deviceId, pipeconfId);
+ return false;
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IllegalStateException(e);
+ }
+
+ return true;
+ }
+
private void advertiseDevice(DeviceId deviceId, DeviceDescription description, List<PortDescription> ports) {
providerService.deviceConnected(deviceId, description);
providerService.updatePorts(deviceId, ports);
@@ -454,7 +503,7 @@
//If we want to connect a p4runtime device with no pipeline
if (event.config().isPresent() &&
Collections.disjoint(ImmutableSet.copyOf(event.config().get().node().fieldNames()),
- PIPELINE_CONFIGURABLE_PROTOCOLS)) {
+ PIPELINE_CONFIGURABLE_PROTOCOLS)) {
pipelineConfigured.add(deviceId);
}
deviceConfigured.add(deviceId);
@@ -526,15 +575,15 @@
// be available we check and base it on the streaming API (e.g. gNMI)
if (deviceService.getDevice(event.subject().id()).
is(PortStatisticsDiscovery.class)) {
- portStatsExecutor.scheduleAtFixedRate(exceptionSafe(() ->
- updatePortStatistics(event.subject().id())),
+ portStatsExecutor.scheduleAtFixedRate(
+ exceptionSafe(() -> updatePortStatistics(event.subject().id())),
0, PORT_STATS_PERIOD_SECONDS, TimeUnit.SECONDS);
updatePortStatistics(event.subject().id());
}
} else if (type.equals(Type.DEVICE_REMOVED)) {
connectionExecutor.submit(exceptionSafe(() ->
- disconnectDevice(event.subject().id())));
+ disconnectDevice(event.subject().id())));
}
}