[ONOS-7584] Adding Capability of re-connecting to a P4Runtime Device.
Also addresses ONOS-7359
Change-Id: I47ec4ed429af82feb225ab5ac180b94c91366a53
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index 48d8007..550ca59 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -50,6 +50,8 @@
import org.onosproject.net.config.NetworkConfigRegistry;
import org.onosproject.net.config.basics.BasicDeviceConfig;
import org.onosproject.net.config.basics.SubjectFactories;
+import org.onosproject.net.device.ChannelEvent;
+import org.onosproject.net.device.ChannelListener;
import org.onosproject.net.device.DefaultDeviceDescription;
import org.onosproject.net.device.DeviceDescription;
import org.onosproject.net.device.DeviceDescriptionDiscovery;
@@ -118,6 +120,9 @@
public static final int REACHABILITY_TIMEOUT = 10;
public static final String DEPLOY = "deploy-";
public static final String PIPECONF_TOPIC = "-pipeconf";
+ public static final String CHECK = "check-";
+ public static final String CONNECTION = "-connection";
+ private static final String POLL_FREQUENCY = "pollFrequency";
private final Logger log = getLogger(getClass());
@@ -152,11 +157,17 @@
protected LeadershipService leadershipService;
private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 10;
- @Property(name = "pollFrequency", intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
+ @Property(name = POLL_FREQUENCY, intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
label = "Configure poll frequency for port status and statistics; " +
"default is 10 sec")
private int pollFrequency = DEFAULT_POLL_FREQUENCY_SECONDS;
+ private static final int DEVICE_AVAILABILITY_POLL_FREQUENCY_SECONDS = 10;
+ @Property(name = "deviceAvailabilityPollFrequency", intValue = DEVICE_AVAILABILITY_POLL_FREQUENCY_SECONDS,
+ label = "Configure poll frequency for checking device availability; " +
+ "default is 10 sec")
+ private int deviceAvailabilityPollFrequency = DEVICE_AVAILABILITY_POLL_FREQUENCY_SECONDS;
+
protected static final String APP_NAME = "org.onosproject.generaldeviceprovider";
protected static final String URI_SCHEME = "device";
protected static final String CFG_SCHEME = "generalprovider";
@@ -185,6 +196,10 @@
= newScheduledThreadPool(CORE_POOL_SIZE,
groupedThreads("onos/generaldeviceprovider-port-stats",
"port-stats-executor-%d", log));
+ protected ScheduledExecutorService availabilityCheckExecutor
+ = newScheduledThreadPool(CORE_POOL_SIZE,
+ groupedThreads("onos/generaldeviceprovider-availability-check",
+ "availability-check-executor-%d", log));
protected ConcurrentMap<DeviceId, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
protected DeviceProviderService providerService;
@@ -201,10 +216,11 @@
};
protected final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
+ private ChannelListener channelListener = new InternalChannelListener();
@Activate
- public void activate() {
+ public void activate(ComponentContext context) {
providerService = providerRegistry.register(this);
componentConfigService.registerProperties(getClass());
coreService.registerApplication(APP_NAME);
@@ -216,6 +232,10 @@
// are activated, failing due to not finding the driver.
cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
.forEach(did -> connectionExecutor.execute(() -> connectDevice(did)));
+ //Initiating a periodic check to see if any device is available again and reconnect it.
+ availabilityCheckExecutor.scheduleAtFixedRate(this::scheduleDevicePolling, deviceAvailabilityPollFrequency,
+ deviceAvailabilityPollFrequency, TimeUnit.SECONDS);
+ modified(context);
log.info("Started");
}
@@ -223,7 +243,7 @@
public void modified(ComponentContext context) {
if (context != null) {
Dictionary<?, ?> properties = context.getProperties();
- pollFrequency = Tools.getIntegerProperty(properties, "pollFrequency",
+ pollFrequency = Tools.getIntegerProperty(properties, POLL_FREQUENCY,
DEFAULT_POLL_FREQUENCY_SECONDS);
log.info("Configured. Poll frequency is configured to {} seconds", pollFrequency);
}
@@ -240,7 +260,7 @@
log.debug("{} is not my scheme, skipping", deviceId);
return;
}
- scheduledTasks.put(deviceId, schedulePolling(deviceId, true));
+ scheduledTasks.put(deviceId, scheduleStatistcsPolling(deviceId, true));
});
}
}
@@ -248,6 +268,7 @@
@Deactivate
public void deactivate() {
portStatsExecutor.shutdown();
+ availabilityCheckExecutor.shutdown();
componentConfigService.unregisterProperties(getClass(), false);
cfgService.removeListener(cfgListener);
//Not Removing the device so they can still be used from other driver providers
@@ -285,7 +306,7 @@
scheduledTasks.get(deviceId).cancel(false);
scheduledTasks.remove(deviceId);
} else if (mastership.equals(MastershipRole.MASTER) && scheduledTasks.get(deviceId) == null) {
- scheduledTasks.put(deviceId, schedulePolling(deviceId, false));
+ scheduledTasks.put(deviceId, scheduleStatistcsPolling(deviceId, false));
updatePortStatistics(deviceId);
}
});
@@ -304,7 +325,8 @@
try {
return reachable.get(REACHABILITY_TIMEOUT, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
- log.error("Device {} is not reachable", deviceId, e);
+ log.warn("Device {} is not reachable {}", deviceId, e.getMessage());
+ log.debug("Exception", e);
return false;
}
}
@@ -335,7 +357,12 @@
@Override
public void triggerDisconnect(DeviceId deviceId) {
- connectionExecutor.execute(() -> disconnectDevice(deviceId));
+ log.debug("Triggering disconnection of device {}", deviceId);
+ connectionExecutor.execute(() -> {
+ disconnectDevice(deviceId).whenComplete((success, ex) -> {
+ checkAndConnect(deviceId);
+ });
+ });
}
private DeviceHandshaker getHandshaker(DeviceId deviceId) {
@@ -411,7 +438,7 @@
connected.thenAcceptAsync(result -> {
if (result) {
-
+ handshaker.addChannelListener(channelListener);
//Populated with the default values obtained by the driver
ChassisId cid = new ChassisId();
SparseAnnotations annotations = DefaultAnnotations.builder()
@@ -445,7 +472,6 @@
handshaker.disconnect();
return;
}
-
advertiseDevice(deviceId, description, ports);
} else {
@@ -470,6 +496,7 @@
//Connecting to the device
handshaker.connect().thenAcceptAsync(result -> {
if (result) {
+ handshaker.addChannelListener(channelListener);
handlePipeconf(deviceId, handshaker.handler().driver(), handshaker.data(), false);
}
});
@@ -553,17 +580,20 @@
providerService.updatePorts(deviceId, ports);
}
- private void disconnectDevice(DeviceId deviceId) {
+ private CompletableFuture<Boolean> disconnectDevice(DeviceId deviceId) {
log.info("Disconnecting for device {}", deviceId);
+ CompletableFuture<Boolean> disconnectError = new CompletableFuture<>();
+
DeviceHandshaker handshaker = handshakers.remove(deviceId);
if (handshaker != null) {
- CompletableFuture<Boolean> disconnect = handshaker.disconnect();
- disconnect.thenAcceptAsync(result -> {
+ handshaker.disconnect().thenAcceptAsync(result -> {
if (result) {
log.info("Disconnected device {}", deviceId);
providerService.deviceDisconnected(deviceId);
+ disconnectError.complete(true);
} else {
+ disconnectError.complete(false);
log.warn("Device {} was unable to disconnect", deviceId);
}
});
@@ -571,11 +601,14 @@
//gracefully ignoring.
log.warn("No DeviceHandshaker for device {}, no guarantees of complete " +
"shutdown of communication", deviceId);
+ disconnectError.complete(false);
}
ScheduledFuture<?> pollingStatisticsTask = scheduledTasks.get(deviceId);
if (pollingStatisticsTask != null) {
pollingStatisticsTask.cancel(true);
+ scheduledTasks.remove(deviceId);
}
+ return disconnectError;
}
//Needed to catch the exception in the executors since are not rethrown otherwise.
@@ -732,7 +765,7 @@
pipelineConfigured.remove(deviceId);
}
- private ScheduledFuture<?> schedulePolling(DeviceId deviceId, boolean randomize) {
+ private ScheduledFuture<?> scheduleStatistcsPolling(DeviceId deviceId, boolean randomize) {
int delay = 0;
if (randomize) {
delay = new SecureRandom().nextInt(10);
@@ -742,6 +775,57 @@
delay, pollFrequency, TimeUnit.SECONDS);
}
+ private void scheduleDevicePolling() {
+ cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class).forEach(this::checkAndConnect);
+ }
+
+ private void checkAndConnect(DeviceId deviceId) {
+ // Let's try and reconnect to a device which is stored in the net-cfg.
+ // One of the following conditions must be satisfied:
+ // 1) device is null in the store meaning that it was never connected or it was administratively removed
+ // 2) the device is not available and there is no MASTER instance, meaning the device lost
+ // its connection to ONOS at some point in the past.
+ // We also check that the general provider device config and the basic device config are present.
+ // We do not check for reachability using isReachable(deviceId) since the behaviour of this method
+ // can vary depending on protocol nuances. We leave this check to the device handshaker
+ // at later stages of the connection process.
+ // If the conditions are not met but instead the device is present in the store, available and this instance is
+ // MASTER but is not reachable, we disconnect it and notify the provider service.
+
+ if ((deviceService.getDevice(deviceId) == null || (!deviceService.isAvailable(deviceId) &&
+ mastershipService.getMasterFor(deviceId) == null)) && configIsPresent(deviceId)) {
+ log.debug("Trying to re-connect to device {}", deviceId);
+ NodeId leaderNodeId = leadershipService.runForLeadership(CHECK + deviceId.toString() + CONNECTION)
+ .leader().nodeId();
+ NodeId localNodeId = clusterService.getLocalNode().id();
+ if (localNodeId.equals(leaderNodeId)) {
+ log.debug("{} is leader for {}, initiating the connection", leaderNodeId,
+ deviceId);
+ checkAndSubmitDeviceTask(deviceId);
+ } else {
+ log.debug("{} is not leader for {}, initiating connection, {} is LEADER",
+ localNodeId, deviceId, leaderNodeId);
+ connectionExecutor.submit(exceptionSafe(() -> connectStandbyDevice(deviceId)));
+ //FIXME this will be removed when config is synced
+ cleanUpConfigInfo(deviceId);
+ }
+ } else if ((deviceService.getDevice(deviceId) != null && deviceService.isAvailable(deviceId)
+ && mastershipService.isLocalMaster(deviceId) && !isReachable(deviceId) && configIsPresent(deviceId))) {
+ log.info("Removing available but unreachable device {}", deviceId);
+ disconnectDevice(deviceId);
+ providerService.deviceDisconnected(deviceId);
+ }
+ }
+
+ private boolean configIsPresent(DeviceId deviceId) {
+ boolean present = cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class) != null
+ && cfgService.getConfig(deviceId, BasicDeviceConfig.class) != null;
+ if (!present) {
+ log.warn("Configuration for device {} is not complete", deviceId);
+ }
+ return present;
+ }
+
/**
* Listener for core device events.
*/
@@ -754,7 +838,7 @@
// For now this is scheduled periodically, when streaming API will
// be available we check and base it on the streaming API (e.g. gNMI)
if (mastershipService.isLocalMaster(deviceId)) {
- scheduledTasks.put(deviceId, schedulePolling(deviceId, false));
+ scheduledTasks.put(deviceId, scheduleStatistcsPolling(deviceId, false));
}
}
@@ -764,4 +848,29 @@
event.subject().id().toString().startsWith(URI_SCHEME.toLowerCase());
}
}
+
+ /**
+ * Listener for device channel events; reacts to CHANNEL_DISCONNECTED by disconnecting the device.
+ */
+ private class InternalChannelListener implements ChannelListener {
+
+ @Override
+ public void event(ChannelEvent event) {
+ DeviceId deviceId = event.subject();
+ if (event.type().equals(ChannelEvent.Type.CHANNEL_DISCONNECTED)) {
+ //The channel is gone: run the regular disconnection procedure for this device.
+ CompletableFuture<Boolean> disconnection = disconnectDevice(deviceId);
+ disconnection.thenAcceptAsync(result -> {
+ //Once disconnection completes (result is ignored), the master notifies the provider service.
+ if (mastershipService.isLocalMaster(deviceId)) {
+ log.info("Disconnecting unreachable device {}, due to error on channel", deviceId);
+ providerService.deviceDisconnected(deviceId);
+ }
+ });
+
+ }
+ //TODO evaluate other types of reactions.
+ }
+
+ }
}