Extend the DeviceProvider interface with a triggerDisconnect method.
- extended the interface with a default method implementation
- modified DeviceManager to exploit the new provider feature
- refactored a number of device providers to use the new method
  instead of relying on indirect DEVICE_REMOVED events
Change-Id: Ib315357ef06463012fcf26bbe937c8cdccbf3a94
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index f735006..48d8007 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -85,6 +85,7 @@
import java.util.Collections;
import java.util.Dictionary;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -173,6 +174,8 @@
private Set<DeviceId> driverConfigured = new CopyOnWriteArraySet<>();
private Set<DeviceId> pipelineConfigured = new CopyOnWriteArraySet<>();
+ private final Map<DeviceId, DeviceHandshaker> handshakers = Maps.newConcurrentMap();
+
protected ScheduledExecutorService connectionExecutor
= newScheduledThreadPool(CORE_POOL_SIZE,
@@ -208,6 +211,7 @@
cfgService.registerConfigFactory(factory);
cfgService.addListener(cfgListener);
deviceService.addListener(deviceListener);
+ handshakers.clear();
//This will fail if ONOS has CFG and drivers which depend on this provider
// are activated, failing due to not finding the driver.
cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
@@ -239,10 +243,8 @@
scheduledTasks.put(deviceId, schedulePolling(deviceId, true));
});
}
-
}
-
@Deactivate
public void deactivate() {
portStatsExecutor.shutdown();
@@ -254,6 +256,7 @@
connectionExecutor.shutdown();
deviceService.removeListener(deviceListener);
providerRegistry.unregister(this);
+ handshakers.clear();
providerService = null;
cfgService.unregisterConfigFactory(factory);
log.info("Stopped");
@@ -266,7 +269,7 @@
@Override
public void triggerProbe(DeviceId deviceId) {
- //TODO Really don't see the point of this in non OF Context,
+ // TODO Really don't see the point of this in non OF Context,
// for now testing reachability, can be moved to no-op
log.debug("Triggering probe equals testing reachability on device {}", deviceId);
isReachable(deviceId);
@@ -330,10 +333,18 @@
}
}
+ @Override
+ public void triggerDisconnect(DeviceId deviceId) {
+ connectionExecutor.execute(() -> disconnectDevice(deviceId));
+ }
+
private DeviceHandshaker getHandshaker(DeviceId deviceId) {
- Driver driver = getDriver(deviceId);
- return getBehaviour(driver, DeviceHandshaker.class,
- new DefaultDriverData(driver, deviceId));
+ return handshakers.computeIfAbsent(deviceId, id -> {
+ Driver driver = getDriver(deviceId);
+ return driver == null ? null :
+ getBehaviour(driver, DeviceHandshaker.class,
+ new DefaultDriverData(driver, deviceId));
+ });
}
private PortAdmin getPortAdmin(DeviceId deviceId) {
@@ -358,23 +369,6 @@
return driver;
}
- //Distinguishing from getDriver to not impose everywhere the overhead to get the whole device.
- // This is what the driverService does with the getDriver(deviceId) method.
- // A redundant method here is needed because the driverService returns null when the device is not in the store
- // as happens during disconnection.
- // The whole device object is needed only in disconnection.
- private Driver getDriverFromAnnotations(Device device) {
- String driverName = device.annotations().value(DRIVER);
- if (driverName != null) {
- try {
- return driverService.getDriver(driverName);
- } catch (ItemNotFoundException e) {
- log.warn("Specified driver {} not found, falling back.", driverName);
- }
- }
- return null;
- }
-
//needed since the device manager will not return the driver through implementation()
// method since the device is not pushed to the core so for the connectDevice
// we need to work around that in order to test before calling
@@ -403,23 +397,14 @@
} else {
log.info("Connecting to device {} with driver {}", deviceId, basicDeviceConfig.driver());
- Driver driver;
- try {
- driver = driverService.getDriver(basicDeviceConfig.driver());
- } catch (ItemNotFoundException e) {
- log.warn("The driver of {} is not found : {}", deviceId, e.getMessage());
- return;
- }
-
- DriverData driverData = new DefaultDriverData(driver, deviceId);
- DeviceHandshaker handshaker = getBehaviour(driver, DeviceHandshaker.class, driverData);
+ DeviceHandshaker handshaker = getHandshaker(deviceId);
if (handshaker == null) {
- log.error("Device {}, with driver {} does not support DeviceHandshaker " +
- "behaviour, {}", deviceId, driver.name(), driver.behaviours());
+ log.error("Device {} does not support DeviceHandshaker behaviour", deviceId);
return;
}
+ Driver driver = handshaker.handler().driver();
- addConfigData(providerConfig, driverData);
+ addConfigData(providerConfig, handshaker.data());
//Connecting to the device
CompletableFuture<Boolean> connected = handshaker.connect();
@@ -442,7 +427,7 @@
List<PortDescription> ports = new ArrayList<>();
DeviceDescriptionDiscovery deviceDiscovery = getBehaviour(driver,
- DeviceDescriptionDiscovery.class, driverData);
+ DeviceDescriptionDiscovery.class, handshaker.data());
if (deviceDiscovery != null) {
DeviceDescription newdescription = deviceDiscovery.discoverDeviceDetails();
if (newdescription != null) {
@@ -454,7 +439,7 @@
"description or ports.", deviceId);
}
- if (!handlePipeconf(deviceId, driver, driverData, true)) {
+ if (!handlePipeconf(deviceId, driver, handshaker.data(), true)) {
// Something went wrong during handling of pipeconf.
// We already logged the error.
handshaker.disconnect();
@@ -471,27 +456,21 @@
}
private void connectStandbyDevice(DeviceId deviceId) {
-
- //if device is pipeline programmable we merge pipeconf + base driver for every other role
+ // if device is pipeline programmable we merge pipeconf + base driver for every other role
GeneralProviderDeviceConfig providerConfig =
cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class);
- Driver driver = getDriver(deviceId);
-
-
- DriverData driverData = new DefaultDriverData(driver, deviceId);
- DeviceHandshaker handshaker = getBehaviour(driver, DeviceHandshaker.class, driverData);
+ DeviceHandshaker handshaker = getHandshaker(deviceId);
if (handshaker == null) {
- log.error("Device {}, with driver {} does not support DeviceHandshaker " +
- "behaviour, supported behaviours={}", deviceId, driver.name(), driver.behaviours());
+ log.error("Device {} does not support DeviceHandshaker behaviour", deviceId);
return;
}
- addConfigData(providerConfig, driverData);
+ addConfigData(providerConfig, handshaker.data());
//Connecting to the device
handshaker.connect().thenAcceptAsync(result -> {
if (result) {
- handlePipeconf(deviceId, driver, driverData, false);
+ handlePipeconf(deviceId, handshaker.handler().driver(), handshaker.data(), false);
}
});
}
@@ -501,9 +480,8 @@
* device can be registered to the core, false otherwise.
*/
private boolean handlePipeconf(DeviceId deviceId, Driver driver, DriverData driverData, boolean deployPipeconf) {
-
- PiPipelineProgrammable pipelineProg = getBehaviour(driver, PiPipelineProgrammable.class,
- driverData);
+ PiPipelineProgrammable pipelineProg =
+ getBehaviour(driver, PiPipelineProgrammable.class, driverData);
if (pipelineProg == null) {
// Device is not pipeline programmable.
@@ -513,14 +491,13 @@
PiPipeconf pipeconf = getPipeconf(deviceId, pipelineProg);
if (pipeconf != null) {
-
PiPipeconfId pipeconfId = pipeconf.id();
try {
if (deployPipeconf) {
if (!pipelineProg.deployPipeconf(pipeconf).get()) {
log.error("Unable to deploy pipeconf {} to {}, aborting device discovery",
- pipeconfId, deviceId);
+ pipeconfId, deviceId);
return false;
}
}
@@ -531,7 +508,7 @@
try {
if (!piPipeconfService.bindToDevice(pipeconfId, deviceId).get()) {
log.error("Unable to merge driver {} for device {} with pipeconf {}, aborting device discovery",
- driver.name(), deviceId, pipeconfId);
+ driver.name(), deviceId, pipeconfId);
return false;
}
} catch (InterruptedException | ExecutionException e) {
@@ -568,7 +545,6 @@
return null;
}
-
return piPipeconfService.getPipeconf(pipeconfId).get();
}
@@ -577,41 +553,29 @@
providerService.updatePorts(deviceId, ports);
}
- private void disconnectDevice(Device device) {
- DeviceId deviceId = device.id();
+ private void disconnectDevice(DeviceId deviceId) {
log.info("Disconnecting for device {}", deviceId);
- //The driver service will return a null driver for the given deviceId
- //since it's already removed form the device store, we leverage the device object from the DEVICE_REMOVED
- //event to get the driver.
- Driver driver = getDriverFromAnnotations(device);
- if (driver != null) {
- DeviceHandshaker handshaker = getBehaviour(driver, DeviceHandshaker.class,
- new DefaultDriverData(driver, deviceId));
- if (handshaker != null) {
- CompletableFuture<Boolean> disconnect = handshaker.disconnect();
- disconnect.thenAcceptAsync(result -> {
- if (result) {
- log.info("Disconnected device {}", deviceId);
- providerService.deviceDisconnected(deviceId);
- } else {
- log.warn("Device {} was unable to disconnect", deviceId);
- }
- });
- } else {
- //gracefully ignoring.
- log.warn("No DeviceHandshaker for device {}, no guarantees of complete " +
- "shutdown of communication", deviceId);
- }
+ DeviceHandshaker handshaker = handshakers.remove(deviceId);
+ if (handshaker != null) {
+ CompletableFuture<Boolean> disconnect = handshaker.disconnect();
+ disconnect.thenAcceptAsync(result -> {
+ if (result) {
+ log.info("Disconnected device {}", deviceId);
+ providerService.deviceDisconnected(deviceId);
+ } else {
+ log.warn("Device {} was unable to disconnect", deviceId);
+ }
+ });
} else {
//gracefully ignoring.
- log.warn("Can't find driver for device {}, no guarantees of complete shutdown of communication", deviceId);
+ log.warn("No DeviceHandshaker for device {}, no guarantees of complete " +
+ "shutdown of communication", deviceId);
}
ScheduledFuture<?> pollingStatisticsTask = scheduledTasks.get(deviceId);
if (pollingStatisticsTask != null) {
pollingStatisticsTask.cancel(true);
}
-
}
//Needed to catch the exception in the executors since are not rethrown otherwise.
@@ -649,7 +613,6 @@
*/
private class InternalNetworkConfigListener implements NetworkConfigListener {
-
@Override
public void event(NetworkConfigEvent event) {
DeviceId deviceId = (DeviceId) event.subject();
@@ -681,7 +644,6 @@
cleanUpConfigInfo(deviceId);
}
}
-
}
private boolean processEvent(NetworkConfigEvent event, DeviceId deviceId) {
@@ -786,28 +748,20 @@
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
- Type type = event.type();
DeviceId deviceId = event.subject().id();
- if (type.equals((Type.DEVICE_ADDED))) {
+ // FIXME handling for mastership change scenario missing?
- // FIXME handling for mastership change scenario missing?
-
- //For now this is scheduled periodically, when streaming API will
- // be available we check and base it on the streaming API (e.g. gNMI)
- if (mastershipService.isLocalMaster(deviceId)) {
- scheduledTasks.put(deviceId, schedulePolling(deviceId, false));
- }
-
- } else if (type.equals(Type.DEVICE_REMOVED)) {
-
- //Passing the whole device object to get driver information
- connectionExecutor.execute(() -> disconnectDevice(event.subject()));
+ // For now this is scheduled periodically; once a streaming API is
+ // available, we will check and base it on that API (e.g. gNMI)
+ if (mastershipService.isLocalMaster(deviceId)) {
+ scheduledTasks.put(deviceId, schedulePolling(deviceId, false));
}
}
@Override
public boolean isRelevant(DeviceEvent event) {
- return event.subject().id().toString().startsWith(URI_SCHEME.toLowerCase());
+ return event.type() == Type.DEVICE_ADDED &&
+ event.subject().id().toString().startsWith(URI_SCHEME.toLowerCase());
}
}
}