Refactor channel and mastership handling in P4Runtime
This (big) change aims at solving the issue observed with mastership flapping
and device connection/disconnection with P4Runtime.
Channel handling is now based on the underlying gRPC channel state. Before,
channel events (open/close/error) were generated as a consequence of P4Runtime
StreamChannel events, making device availability dependent on mastership. Now
StreamChannel events only affect mastership (MASTER/STANDBY or NONE when the
StreamChannel RPC is not active).
Mastership handling has been refactored to generate P4Runtime election IDs that
are compatible with the mastership preference decided by the MastershipService.
GeneralDeviceProvider has been re-implemented to support in-order
device event processing and to reduce implementation complexity. Stats polling
has been moved to a separate component, and netcfg handling updated to only
depend on BasicDeviceConfig, augmented with a pipeconf field, and re-using the
managementAddress field to set the gRPC server endpoints (e.g.
grpc://myswitch.local:50051). Previously it depended on three different config
classes, making it hard to detect changes.
Finally, this change affects some core interfaces:
- Adds a method to DeviceProvider and DeviceHandshaker to check for device
availability, making the meaning of availability device-specific. This is needed
in cases where the device manager needs to change the availability state of a
device (as in change #20842)
- Supports device providers not capable of reconciling mastership role responses
with requests (like P4Runtime).
- Clarifies the meaning of "connection" in the DeviceConnect behavior.
- Allows driver-based providers to check devices for reachability and
availability without probing the device via the network.
Change-Id: I7ff30d29f5d02ad938e3171536e54ae2916629a2
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/api/GeneralProviderDeviceConfig.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/api/GeneralProviderDeviceConfig.java
deleted file mode 100644
index 0675a4b..0000000
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/api/GeneralProviderDeviceConfig.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.provider.general.device.api;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.annotations.Beta;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.config.Config;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Configuration for General device provider.
- */
-@Beta
-public class GeneralProviderDeviceConfig extends Config<DeviceId> {
-
- private static final String DEVICEKEYID = "deviceKeyId";
-
-
- @Override
- public boolean isValid() {
- return true;
- }
-
- /**
- * Gets the information of all protocols associated to the device.
- *
- * @return map of protocol name and relative information
- */
- public Map<String, DeviceInfoConfig> protocolsInfo() {
- return getProtocolInfoMap();
- }
-
- private Map<String, DeviceInfoConfig> getProtocolInfoMap() {
- Map<String, DeviceInfoConfig> deviceMap = new HashMap<>();
- node.fieldNames().forEachRemaining(name -> {
-
- Map<String, String> configMap = new HashMap<>();
- JsonNode protocol = node.get(name);
- protocol.fieldNames().forEachRemaining(info -> configMap.put(info, protocol.get(info).asText()));
-
- String deviceKeyId = "";
- if (protocol.has(DEVICEKEYID)) {
- deviceKeyId = protocol.get(DEVICEKEYID).asText("");
- }
-
- deviceMap.put(name, new DeviceInfoConfig(configMap, deviceKeyId));
- });
- return deviceMap;
- }
-
-}
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/DeviceTaskExecutor.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/DeviceTaskExecutor.java
new file mode 100644
index 0000000..6f90256
--- /dev/null
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/DeviceTaskExecutor.java
@@ -0,0 +1,256 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.provider.general.device.impl;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Striped;
+import org.onosproject.net.DeviceId;
+import org.slf4j.Logger;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.String.format;
+import static java.lang.System.currentTimeMillis;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Allows submitting tasks related to a specific device. Takes care of executing
+ * pending tasks sequentially for each device in a FIFO order, while using a
+ * delegate executor. It also avoids executing duplicate tasks when arriving
+ * back-to-back.
+ *
+ * @param <T> enum describing the type of task
+ */
+class DeviceTaskExecutor<T extends Enum<T>> {
+
+    /**
+     * Minimum interval between duplicate back-to-back tasks.
+     */
+    private static final int DUPLICATE_MIN_INTERVAL_MILLIS = 1000;
+
+    private final Logger log = getLogger(getClass());
+
+    /** Executor on which the tasks are actually run. */
+    private final ExecutorService delegate;
+    /** Set by {@link #cancel()}; never reset. */
+    private final AtomicBoolean canceled = new AtomicBoolean(false);
+    /** Devices for which a task is being executed right now. */
+    private final Set<DeviceId> busyDevices = Sets.newConcurrentHashSet();
+    /** Devices whose queue holds at least one task waiting to be executed. */
+    private final Set<DeviceId> pendingDevices = Sets.newConcurrentHashSet();
+    /** Locks guarding the queue and the busy/pending state of each device. */
+    private final Striped<Lock> deviceLocks = Striped.lock(30);
+    /**
+     * Per-device task queues, created on first access and expired one minute
+     * after the last access. A queue that is evicted while still holding
+     * tasks is only reported in the log: those tasks will not be executed.
+     */
+    private final LoadingCache<DeviceId, TaskQueue> taskQueues = CacheBuilder.newBuilder()
+            .expireAfterAccess(1, TimeUnit.MINUTES)
+            .removalListener((RemovalListener<DeviceId, TaskQueue>) notification -> {
+                if (!notification.getValue().isEmpty()) {
+                    log.warn("Cache evicted non-empty task queue for {} ({} pending tasks)",
+                             notification.getKey(), notification.getValue().size());
+                }
+            })
+            .build(new CacheLoader<DeviceId, TaskQueue>() {
+                @SuppressWarnings("NullableProblems")
+                @Override
+                public TaskQueue load(DeviceId deviceId) {
+                    return new TaskQueue();
+                }
+            });
+
+    /**
+     * Creates a new executor with the given delegate executor service.
+     *
+     * @param delegate executor service
+     */
+    DeviceTaskExecutor(ExecutorService delegate) {
+        checkNotNull(delegate);
+        this.delegate = delegate;
+    }
+
+    /**
+     * Submits a task. The task is dropped (and only logged) when this
+     * executor was cancelled, or when a task of the same type was queued for
+     * the same device less than {@link #DUPLICATE_MIN_INTERVAL_MILLIS} ago.
+     *
+     * @param deviceId device associated with the task
+     * @param type     type of task (used to remove eventual back-to-back
+     *                 duplicates)
+     * @param runnable runnable to execute
+     */
+    void submit(DeviceId deviceId, T type, Runnable runnable) {
+        checkNotNull(deviceId);
+        checkNotNull(type);
+        checkNotNull(runnable);
+
+        if (canceled.get()) {
+            log.warn("Executor was cancelled, cannot submit task {} for {}",
+                     type, deviceId);
+            return;
+        }
+
+        final DeviceTask task = new DeviceTask(deviceId, type, runnable);
+        // Resolve the lock once so that lock() and unlock() are guaranteed to
+        // operate on the same instance.
+        final Lock lock = deviceLocks.get(deviceId);
+        lock.lock();
+        try {
+            final TaskQueue queue = taskQueues.get(deviceId);
+            if (queue.isBackToBackDuplicate(type)) {
+                log.debug("Dropping back-to-back duplicate task {} for {}",
+                          type, deviceId);
+                return;
+            }
+            if (queue.offer(task)) {
+                pendingDevices.add(deviceId);
+                if (!busyDevices.contains(deviceId)) {
+                    // The task was submitted to the queue and we are not
+                    // performing any other task for this device. There is at
+                    // least one task that is ready to be executed.
+                    delegate.execute(this::performTaskIfAny);
+                }
+            } else {
+                log.warn("Unable to submit task {} for {}",
+                         task.type, task.deviceId);
+            }
+        } catch (ExecutionException e) {
+            log.warn("Exception while accessing task queue cache", e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Prevents the executor from executing any more tasks.
+     */
+    void cancel() {
+        canceled.set(true);
+    }
+
+    /**
+     * Executes the next available task, if any, and then schedules itself
+     * again on the delegate to pick up whatever became ready in the meantime.
+     */
+    private void performTaskIfAny() {
+        final DeviceTask task = pollTask();
+        if (task == null) {
+            // No tasks.
+            return;
+        }
+        try {
+            if (canceled.get()) {
+                log.warn("Executor was cancelled, dropping task {} for {}",
+                         task.type, task.deviceId);
+                return;
+            }
+            runTask(task);
+        } finally {
+            // pollTask() marked the device as busy: always undo that, also
+            // when the task is dropped because of cancellation.
+            busyDevices.remove(task.deviceId);
+        }
+        // Do not hand more work to the delegate once cancelled: the owner is
+        // expected to shut it down, and execute() would then be rejected.
+        if (!canceled.get()) {
+            delegate.execute(this::performTaskIfAny);
+        }
+    }
+
+    /**
+     * Runs the given task, logging (never propagating) any failure.
+     *
+     * @param task task to run
+     */
+    private void runTask(DeviceTask task) {
+        if (log.isTraceEnabled()) {
+            log.trace("STARTING task {} for {}...", task.type.name(), task.deviceId);
+        }
+        try {
+            task.runnable.run();
+        } catch (DeviceTaskException e) {
+            log.error("Unable to complete task {} for {}: {}",
+                      task.type, task.deviceId, e.getMessage());
+        } catch (Throwable t) {
+            log.error(format(
+                    "Uncaught exception when executing task %s for %s",
+                    task.type, task.deviceId), t);
+        }
+        if (log.isTraceEnabled()) {
+            log.trace("COMPLETED task {} for {}", task.type.name(), task.deviceId);
+        }
+    }
+
+    /**
+     * Returns the next task of the first pending device that is not busy,
+     * marking that device as busy, or null if there is no such task.
+     *
+     * @return task to execute, or null
+     */
+    private DeviceTask pollTask() {
+        for (DeviceId deviceId : pendingDevices) {
+            final Lock lock = deviceLocks.get(deviceId);
+            lock.lock();
+            try {
+                if (busyDevices.contains(deviceId)) {
+                    // Next device.
+                    continue;
+                }
+                final TaskQueue queue = taskQueues.get(deviceId);
+                final DeviceTask task = queue.poll();
+                if (queue.isEmpty()) {
+                    // Nothing left for this device. This also drops stale
+                    // entries whose queue turned out to be empty (e.g. it was
+                    // evicted from the cache), which would otherwise be
+                    // scanned forever.
+                    pendingDevices.remove(deviceId);
+                }
+                if (task == null) {
+                    // Next device.
+                    continue;
+                }
+                busyDevices.add(deviceId);
+                return task;
+            } catch (ExecutionException e) {
+                log.warn("Exception while accessing task queue cache", e);
+            } finally {
+                lock.unlock();
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Device task as stored in the task queue.
+     */
+    private class DeviceTask {
+
+        private final DeviceId deviceId;
+        private final T type;
+        private final Runnable runnable;
+
+        DeviceTask(DeviceId deviceId, T type, Runnable runnable) {
+            this.deviceId = deviceId;
+            this.type = type;
+            this.runnable = runnable;
+        }
+    }
+
+    /**
+     * A queue that keeps track of the last task added to detect back-to-back
+     * duplicates.
+     */
+    private class TaskQueue extends ConcurrentLinkedQueue<DeviceTask> {
+
+        // Written by offer() and read by isBackToBackDuplicate(); both are
+        // called from submit() while holding the device lock.
+        private T lastTaskAdded;
+        private long lastAddedMillis;
+
+        @Override
+        public boolean offer(DeviceTask deviceTask) {
+            lastTaskAdded = deviceTask.type;
+            lastAddedMillis = currentTimeMillis();
+            return super.offer(deviceTask);
+        }
+
+        /**
+         * Tells whether a task of the given type was the last one added to
+         * this queue, no more than {@link #DUPLICATE_MIN_INTERVAL_MILLIS} ago.
+         *
+         * @param taskType type of the task about to be added
+         * @return true if adding it would be a back-to-back duplicate
+         */
+        boolean isBackToBackDuplicate(T taskType) {
+            return lastTaskAdded != null
+                    && lastTaskAdded.equals(taskType)
+                    && (currentTimeMillis() - lastAddedMillis) <= DUPLICATE_MIN_INTERVAL_MILLIS;
+        }
+    }
+
+    /**
+     * Signals an error that prevented normal execution of the task.
+     */
+    static class DeviceTaskException extends RuntimeException {
+
+        /**
+         * Creates a new exception.
+         *
+         * @param message explanation
+         */
+        DeviceTaskException(String message) {
+            super(message);
+        }
+    }
+}
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index 1eaa808..481a310 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -16,18 +16,17 @@
package org.onosproject.provider.general.device.impl;
-import com.google.common.annotations.Beta;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.Striped;
import org.onlab.packet.ChassisId;
import org.onlab.util.ItemNotFoundException;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
import org.onosproject.core.CoreService;
import org.onosproject.gnmi.api.GnmiController;
+import org.onosproject.mastership.MastershipInfo;
import org.onosproject.mastership.MastershipService;
-import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
@@ -35,27 +34,21 @@
import org.onosproject.net.PortNumber;
import org.onosproject.net.behaviour.PiPipelineProgrammable;
import org.onosproject.net.behaviour.PortAdmin;
-import org.onosproject.net.config.ConfigFactory;
import org.onosproject.net.config.NetworkConfigEvent;
import org.onosproject.net.config.NetworkConfigListener;
import org.onosproject.net.config.NetworkConfigRegistry;
import org.onosproject.net.config.basics.BasicDeviceConfig;
-import org.onosproject.net.config.basics.SubjectFactories;
import org.onosproject.net.device.DefaultDeviceDescription;
import org.onosproject.net.device.DeviceAgentEvent;
import org.onosproject.net.device.DeviceAgentListener;
import org.onosproject.net.device.DeviceDescription;
import org.onosproject.net.device.DeviceDescriptionDiscovery;
-import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceHandshaker;
-import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceProvider;
import org.onosproject.net.device.DeviceProviderRegistry;
import org.onosproject.net.device.DeviceProviderService;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.device.PortDescription;
-import org.onosproject.net.device.PortStatistics;
-import org.onosproject.net.device.PortStatisticsDiscovery;
import org.onosproject.net.driver.Behaviour;
import org.onosproject.net.driver.DefaultDriverData;
import org.onosproject.net.driver.DefaultDriverHandler;
@@ -64,14 +57,13 @@
import org.onosproject.net.driver.DriverService;
import org.onosproject.net.pi.model.PiPipeconf;
import org.onosproject.net.pi.model.PiPipeconfId;
-import org.onosproject.net.pi.service.PiPipeconfConfig;
import org.onosproject.net.pi.service.PiPipeconfService;
import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
-import org.onosproject.provider.general.device.api.GeneralProviderDeviceConfig;
+import org.onosproject.provider.general.device.impl.DeviceTaskExecutor.DeviceTaskException;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
@@ -81,50 +73,43 @@
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
-import java.security.SecureRandom;
-import java.util.Collection;
import java.util.Collections;
import java.util.Dictionary;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.StringJoiner;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.Lock;
-import java.util.function.Supplier;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.lang.System.currentTimeMillis;
import static java.util.concurrent.Executors.newFixedThreadPool;
-import static java.util.concurrent.Executors.newScheduledThreadPool;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.net.device.DeviceEvent.Type;
import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.OP_TIMEOUT_SHORT;
import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.OP_TIMEOUT_SHORT_DEFAULT;
-import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.PROBE_FREQUENCY;
-import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.PROBE_FREQUENCY_DEFAULT;
-import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.STATS_POLL_FREQUENCY;
-import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.STATS_POLL_FREQUENCY_DEFAULT;
+import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.PROBE_INTERVAL;
+import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.PROBE_INTERVAL_DEFAULT;
+import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.STATS_POLL_INTERVAL;
+import static org.onosproject.provider.general.device.impl.OsgiPropertyConstants.STATS_POLL_INTERVAL_DEFAULT;
import static org.slf4j.LoggerFactory.getLogger;
/**
- * Provider which uses drivers to detect device and do initial handshake and
- * channel establishment with devices. Any other provider specific operation is
- * also delegated to the DeviceHandshaker driver.
+ * Provider which uses drivers to discover devices, perform initial handshake,
+ * and notify the core of disconnection events. The implementation listens for
+ * events from netcfg or the drivers (via {@link DeviceAgentListener}) and
+ * schedules a task for each event.
*/
-@Beta
@Component(immediate = true,
property = {
- STATS_POLL_FREQUENCY + ":Integer=" + STATS_POLL_FREQUENCY_DEFAULT,
- PROBE_FREQUENCY + ":Integer=" + PROBE_FREQUENCY_DEFAULT,
+ PROBE_INTERVAL + ":Integer=" + PROBE_INTERVAL_DEFAULT,
+ STATS_POLL_INTERVAL + ":Integer=" + STATS_POLL_INTERVAL_DEFAULT,
OP_TIMEOUT_SHORT + ":Integer=" + OP_TIMEOUT_SHORT_DEFAULT,
})
public class GeneralDeviceProvider extends AbstractProvider
@@ -132,16 +117,12 @@
private final Logger log = getLogger(getClass());
- private static final String APP_NAME = "org.onosproject.gdp";
+ private static final String APP_NAME = "org.onosproject.generaldeviceprovider";
private static final String URI_SCHEME = "device";
- private static final String CFG_SCHEME = "generalprovider";
private static final String DEVICE_PROVIDER_PACKAGE =
"org.onosproject.general.provider.device";
private static final int CORE_POOL_SIZE = 10;
private static final String UNKNOWN = "unknown";
- private static final String DRIVER = "driver";
- private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS =
- ImmutableSet.of("p4runtime");
@Reference(cardinality = ReferenceCardinality.MANDATORY)
private DeviceProviderRegistry providerRegistry;
@@ -165,6 +146,9 @@
private MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
+ private ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
private PiPipeconfService pipeconfService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -180,68 +164,55 @@
private GnmiDeviceStateSubscriber gnmiDeviceStateSubscriber;
- /**
- * Configure poll frequency for port status and statistics; default is 10 sec.
- */
- private int statsPollFrequency = STATS_POLL_FREQUENCY_DEFAULT;
+ /** Configure interval for checking device availability; default is 10 sec. */
+ private int probeInterval = PROBE_INTERVAL_DEFAULT;
- /**
- * Configure probe frequency for checking device availability; default is 10 sec.
- */
- private int probeFrequency = PROBE_FREQUENCY_DEFAULT;
+ /** Configure poll frequency for port status and stats; default is 10 sec. */
+ private int statsPollInterval = STATS_POLL_INTERVAL_DEFAULT;
- /**
- * Configure timeout in seconds for device operations that are supposed to take a short time
- * (e.g. checking device reachability); default is 10 seconds.
- */
+ /** Configure timeout in seconds for device operations; default is 10 sec. */
private int opTimeoutShort = OP_TIMEOUT_SHORT_DEFAULT;
- //FIXME to be removed when netcfg will issue device events in a bundle or
- //ensures all configuration needed is present
- private final Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
- private final Set<DeviceId> driverConfigured = new CopyOnWriteArraySet<>();
- private final Set<DeviceId> pipelineConfigured = new CopyOnWriteArraySet<>();
-
- private final Map<DeviceId, MastershipRole> requestedRoles = Maps.newConcurrentMap();
- private final ConcurrentMap<DeviceId, ScheduledFuture<?>> statsPollingTasks = new ConcurrentHashMap<>();
private final Map<DeviceId, DeviceHandshaker> handshakersWithListeners = Maps.newConcurrentMap();
- private final InternalDeviceListener deviceListener = new InternalDeviceListener();
+ private final Map<DeviceId, Long> lastProbedAvailability = Maps.newConcurrentMap();
private final InternalPipeconfWatchdogListener pipeconfWatchdogListener = new InternalPipeconfWatchdogListener();
private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
private final DeviceAgentListener deviceAgentListener = new InternalDeviceAgentListener();
- private final ConfigFactory factory = new InternalConfigFactory();
- private final Striped<Lock> deviceLocks = Striped.lock(30);
- private ExecutorService connectionExecutor;
- private ScheduledExecutorService statsExecutor;
+ private ExecutorService mainExecutor;
+ private DeviceTaskExecutor<TaskType> taskExecutor;
private ScheduledExecutorService probeExecutor;
private ScheduledFuture<?> probeTask;
+ private StatsPoller statsPoller;
private DeviceProviderService providerService;
public GeneralDeviceProvider() {
super(new ProviderId(URI_SCHEME, DEVICE_PROVIDER_PACKAGE));
}
+ /**
+ * Returns the provider service obtained from the provider registry when
+ * this component was activated.
+ *
+ * @return device provider service; null once the component is deactivated
+ */
+ protected DeviceProviderService providerService() {
+ return providerService;
+ }
+
@Activate
public void activate(ComponentContext context) {
- connectionExecutor = newFixedThreadPool(CORE_POOL_SIZE, groupedThreads(
- "onos/gdp-connect", "%d", log));
- statsExecutor = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
- "onos/gdp-stats", "%d", log));
+ // A fixed pool runs the device tasks; taskExecutor uses it as its delegate
+ // and serializes the tasks of each device.
+ mainExecutor = newFixedThreadPool(CORE_POOL_SIZE, groupedThreads(
+ "onos/gdp-task", "%d", log));
+ taskExecutor = new DeviceTaskExecutor<>(mainExecutor);
probeExecutor = newSingleThreadScheduledExecutor(groupedThreads(
"onos/gdp-probe", "%d", log));
providerService = providerRegistry.register(this);
componentConfigService.registerProperties(getClass());
coreService.registerApplication(APP_NAME);
- cfgService.registerConfigFactory(factory);
cfgService.addListener(cfgListener);
- deviceService.addListener(deviceListener);
pipeconfWatchdogService.addListener(pipeconfWatchdogListener);
- rescheduleProbeTask(false);
- modified(context);
- gnmiDeviceStateSubscriber = new GnmiDeviceStateSubscriber(gnmiController,
- deviceService, mastershipService, providerService);
+ gnmiDeviceStateSubscriber = new GnmiDeviceStateSubscriber(
+ gnmiController, deviceService, mastershipService, providerService);
gnmiDeviceStateSubscriber.activate();
+ startOrRescheduleProbeTask();
+ statsPoller = new StatsPoller(deviceService, mastershipService, providerService);
+ statsPoller.activate(statsPollInterval);
+ // Apply the component configuration last: modified() may reschedule the
+ // probe task and the stats poller, so both must exist by now.
+ modified(context);
log.info("Started");
}
@@ -252,55 +223,35 @@
}
Dictionary<?, ?> properties = context.getProperties();
- final int oldStatsPollFrequency = statsPollFrequency;
- statsPollFrequency = Tools.getIntegerProperty(
- properties, STATS_POLL_FREQUENCY, STATS_POLL_FREQUENCY_DEFAULT);
+ final int oldProbeFrequency = probeInterval;
+ probeInterval = Tools.getIntegerProperty(
+ properties, PROBE_INTERVAL, PROBE_INTERVAL_DEFAULT);
log.info("Configured. {} is configured to {} seconds",
- STATS_POLL_FREQUENCY, statsPollFrequency);
- final int oldProbeFrequency = probeFrequency;
- probeFrequency = Tools.getIntegerProperty(
- properties, PROBE_FREQUENCY, PROBE_FREQUENCY_DEFAULT);
+ PROBE_INTERVAL, probeInterval);
+ final int oldStatsPollFrequency = statsPollInterval;
+ statsPollInterval = Tools.getIntegerProperty(
+ properties, STATS_POLL_INTERVAL, STATS_POLL_INTERVAL_DEFAULT);
log.info("Configured. {} is configured to {} seconds",
- PROBE_FREQUENCY, probeFrequency);
+ STATS_POLL_INTERVAL, statsPollInterval);
opTimeoutShort = Tools.getIntegerProperty(
properties, OP_TIMEOUT_SHORT, OP_TIMEOUT_SHORT_DEFAULT);
log.info("Configured. {} is configured to {} seconds",
OP_TIMEOUT_SHORT, opTimeoutShort);
- if (oldStatsPollFrequency != statsPollFrequency) {
- rescheduleStatsPollingTasks();
+ if (oldProbeFrequency != probeInterval) {
+ startOrRescheduleProbeTask();
}
- if (oldProbeFrequency != probeFrequency) {
- rescheduleProbeTask(true);
- }
- }
-
- private void rescheduleProbeTask(boolean deelay) {
- synchronized (this) {
- if (probeTask != null) {
- probeTask.cancel(false);
- }
- probeTask = probeExecutor.scheduleAtFixedRate(
- this::triggerProbeAllDevices,
- deelay ? probeFrequency : 0,
- probeFrequency,
- TimeUnit.SECONDS);
+ if (oldStatsPollFrequency != statsPollInterval) {
+ statsPoller.reschedule(statsPollInterval);
}
}
@Deactivate
public void deactivate() {
- // Shutdown stats polling tasks.
- statsPollingTasks.keySet().forEach(this::cancelStatsPolling);
- statsPollingTasks.clear();
- statsExecutor.shutdownNow();
- try {
- statsExecutor.awaitTermination(5, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- log.warn("statsExecutor not terminated properly");
- }
- statsExecutor = null;
+ // Shutdown stats poller.
+ statsPoller.deactivate();
+ statsPoller = null;
// Shutdown probe executor.
probeTask.cancel(true);
probeTask = null;
@@ -311,87 +262,125 @@
log.warn("probeExecutor not terminated properly");
}
probeExecutor = null;
- // Shutdown connection executor.
- connectionExecutor.shutdownNow();
+ // Shutdown main and task executor.
+ taskExecutor.cancel();
+ taskExecutor = null;
+ mainExecutor.shutdownNow();
try {
- connectionExecutor.awaitTermination(5, TimeUnit.SECONDS);
+ mainExecutor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.warn("connectionExecutor not terminated properly");
}
- connectionExecutor = null;
+ mainExecutor = null;
// Remove all device agent listeners
handshakersWithListeners.values().forEach(h -> h.removeDeviceAgentListener(id()));
handshakersWithListeners.clear();
// Other cleanup.
+ lastProbedAvailability.clear();
componentConfigService.unregisterProperties(getClass(), false);
cfgService.removeListener(cfgListener);
- deviceService.removeListener(deviceListener);
pipeconfWatchdogService.removeListener(pipeconfWatchdogListener);
providerRegistry.unregister(this);
providerService = null;
- cfgService.unregisterConfigFactory(factory);
gnmiDeviceStateSubscriber.deactivate();
gnmiDeviceStateSubscriber = null;
log.info("Stopped");
}
-
@Override
public void triggerProbe(DeviceId deviceId) {
- connectionExecutor.execute(withDeviceLock(
- () -> doDeviceProbe(deviceId), deviceId));
+ // The probe is not performed inline: it is handed to submitTask() as a
+ // PROBE_AVAILABILITY task for this device.
+ checkNotNull(deviceId);
+ submitTask(deviceId, TaskType.PROBE_AVAILABILITY);
}
@Override
public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
- log.info("Notifying role {} to device {}", newRole, deviceId);
- requestedRoles.put(deviceId, newRole);
- connectionExecutor.execute(() -> doRoleChanged(deviceId, newRole));
- }
- private void doRoleChanged(DeviceId deviceId, MastershipRole newRole) {
+ // Notifies the device of the role of the local node. The role reported by
+ // the mastership service takes precedence over the requested one. For
+ // MASTER and STANDBY a preference value (0 for MASTER, 1-based position
+ // among the backups for STANDBY) is passed to the driver together with
+ // the mastership term; NONE is passed to the driver as is.
+ final MastershipInfo mastershipInfo = mastershipService.getMastershipFor(deviceId);
+ final NodeId localNodeId = clusterService.getLocalNode().id();
+
+ if (!mastershipInfo.getRole(localNodeId).equals(newRole)) {
+ log.warn("Inconsistent mastership info for {}! Requested {}, but " +
+ "mastership service reports {}, will apply the latter...",
+ deviceId, newRole, mastershipInfo.getRole(localNodeId));
+ newRole = mastershipInfo.getRole(localNodeId);
+ }
+
+ // The handshaker is needed by every branch below, including NONE, so it
+ // is resolved before deriving the preference.
final DeviceHandshaker handshaker = getBehaviour(
deviceId, DeviceHandshaker.class);
if (handshaker == null) {
- log.error("Null handshaker. Unable to notify new role {} to {}",
+ log.error("Null handshaker. Unable to notify role {} to {}",
newRole, deviceId);
return;
}
- handshaker.roleChanged(newRole);
+
+ // Derive preference value.
+ final int preference;
+ switch (newRole) {
+ case MASTER:
+ preference = 0;
+ break;
+ case STANDBY:
+ preference = mastershipInfo.backups().indexOf(localNodeId) + 1;
+ if (preference == 0) {
+ // Not found in list.
+ log.error("Unable to derive mastership preference for {}, " +
+ "requested role {} but local node ID was " +
+ "not found among list of backup nodes " +
+ "reported by mastership service",
+ deviceId, newRole);
+ return;
+ }
+ break;
+ case NONE:
+ // No preference for NONE, apply as is. This must go to the driver:
+ // calling this method again would recurse without end.
+ log.info("Notifying role {} to {}", newRole, deviceId);
+ handshaker.roleChanged(newRole);
+ return;
+ default:
+ log.error("Unrecognized mastership role {}", newRole);
+ return;
+ }
+
+ log.info("Notifying role {} (preference {}) for term {} to {}",
+ newRole, preference, mastershipInfo.term(), deviceId);
+
+ try {
+ handshaker.roleChanged(preference, mastershipInfo.term());
+ } catch (UnsupportedOperationException e) {
+ // Preference-based method not supported.
+ handshaker.roleChanged(newRole);
+ }
}
@Override
public boolean isReachable(DeviceId deviceId) {
- log.debug("Testing reachability for device {}", deviceId);
final DeviceHandshaker handshaker = getBehaviour(
deviceId, DeviceHandshaker.class);
if (handshaker == null) {
+ // No handshaker behaviour for this device: report it as unreachable.
return false;
}
- return getFutureWithDeadline(
- handshaker.isReachable(), "checking reachability",
- deviceId, false, opTimeoutShort);
+ // The driver answers directly; the result is no longer a future that has
+ // to be waited on with a deadline.
+ return handshaker.isReachable();
}
- private boolean isConnected(DeviceId deviceId) {
- log.debug("Testing connection to device {}", deviceId);
+ @Override
+ public boolean isAvailable(DeviceId deviceId) {
+ // A device without a handshaker behaviour is reported as not available.
final DeviceHandshaker handshaker = getBehaviour(
deviceId, DeviceHandshaker.class);
if (handshaker == null) {
return false;
}
- return handshaker.isConnected();
+ try {
+ // Try without probing the device...
+ return handshaker.isAvailable();
+ } catch (UnsupportedOperationException e) {
+ // The driver does not implement isAvailable(): fall back to
+ // probeAvailability().
+ return probeAvailability(handshaker);
+ }
}
@Override
public void changePortState(DeviceId deviceId, PortNumber portNumber,
boolean enable) {
- connectionExecutor.execute(
- () -> doChangePortState(deviceId, portNumber, enable));
- }
-
- private void doChangePortState(DeviceId deviceId, PortNumber portNumber,
- boolean enable) {
if (!deviceService.getDevice(deviceId).is(PortAdmin.class)) {
log.warn("Missing PortAdmin behaviour on {}, aborting port state change",
deviceId);
@@ -409,9 +398,450 @@
@Override
public void triggerDisconnect(DeviceId deviceId) {
- log.debug("Triggering disconnection of device {}", deviceId);
- connectionExecutor.execute(withDeviceLock(
- () -> doDisconnectDevice(deviceId), deviceId));
+ // The disconnection is not performed inline: it is handed to submitTask()
+ // as a CONNECTION_TEARDOWN task for this device.
+ checkNotNull(deviceId);
+ log.info("Triggering disconnection of device {}", deviceId);
+ submitTask(deviceId, TaskType.CONNECTION_TEARDOWN);
+ }
+
+ /**
+ * Listener for configuration events.
+ */
+ private class InternalNetworkConfigListener implements NetworkConfigListener {
+ @Override
+ public void event(NetworkConfigEvent event) {
+ DeviceId deviceId = (DeviceId) event.subject();
+ switch (event.type()) {
+ case CONFIG_ADDED:
+ if (configIsComplete(deviceId)) {
+ submitTask(deviceId, TaskType.CONNECTION_SETUP);
+ }
+ break;
+ case CONFIG_UPDATED:
+ if (configIsComplete(deviceId) && mgmtAddrUpdated(event)) {
+ submitTask(deviceId, TaskType.CONNECTION_UPDATE);
+ }
+ break;
+ case CONFIG_REMOVED:
+ if (event.configClass().equals(BasicDeviceConfig.class)) {
+ submitTask(deviceId, TaskType.CONNECTION_TEARDOWN);
+ }
+ break;
+ default:
+ // Ignore
+ break;
+ }
+ }
+
+ private boolean mgmtAddrUpdated(NetworkConfigEvent event) {
+ if (!event.prevConfig().isPresent() || !event.config().isPresent()) {
+ return false;
+ }
+ final BasicDeviceConfig prev = (BasicDeviceConfig) event.prevConfig().get();
+ final BasicDeviceConfig current = (BasicDeviceConfig) event.config().get();
+ return !Objects.equals(prev.managementAddress(), current.managementAddress());
+ }
+
+ @Override
+ public boolean isRelevant(NetworkConfigEvent event) {
+ return event.configClass().equals(BasicDeviceConfig.class) &&
+ (event.subject() instanceof DeviceId) &&
+ myScheme((DeviceId) event.subject());
+ }
+ }
+
+ /**
+ * Listener for device agent events.
+ */
+ private class InternalDeviceAgentListener implements DeviceAgentListener {
+ @Override
+ public void event(DeviceAgentEvent event) {
+ DeviceId deviceId = event.subject();
+ switch (event.type()) {
+ case CHANNEL_OPEN:
+ submitTask(deviceId, TaskType.CHANNEL_OPEN);
+ break;
+ case CHANNEL_CLOSED:
+ case CHANNEL_ERROR:
+ submitTask(deviceId, TaskType.CHANNEL_CLOSED);
+ break;
+ case ROLE_MASTER:
+ submitTask(deviceId, TaskType.ROLE_MASTER);
+ break;
+ case ROLE_STANDBY:
+ submitTask(deviceId, TaskType.ROLE_STANDBY);
+ break;
+ case ROLE_NONE:
+ submitTask(deviceId, TaskType.ROLE_NONE);
+ break;
+ case NOT_MASTER:
+ submitTask(deviceId, TaskType.NOT_MASTER);
+ break;
+ default:
+ log.warn("Unrecognized device agent event {}", event.type());
+ }
+ }
+ }
+
+ /**
+ * Pipeline event listener.
+ */
+ private class InternalPipeconfWatchdogListener implements PiPipeconfWatchdogListener {
+ @Override
+ public void event(PiPipeconfWatchdogEvent event) {
+ final DeviceId deviceId = event.subject();
+ switch (event.type()) {
+ case PIPELINE_READY:
+ submitTask(deviceId, TaskType.PIPELINE_READY);
+ break;
+ case PIPELINE_UNKNOWN:
+ submitTask(deviceId, TaskType.PIPELINE_NOT_READY);
+ break;
+ default:
+ break;
+ }
+ }
+
+ @Override
+ public boolean isRelevant(PiPipeconfWatchdogEvent event) {
+ return myScheme(event.subject());
+ }
+ }
+
+ private void startOrRescheduleProbeTask() {
+ synchronized (this) {
+ if (probeTask != null) {
+ probeTask.cancel(false);
+ }
+ probeTask = probeExecutor.scheduleAtFixedRate(
+ this::submitProbeTasks,
+ 0,
+ probeInterval,
+ TimeUnit.SECONDS);
+ }
+ }
+
+ private void submitProbeTasks() {
+ // Async trigger a task for all devices in the cfg.
+ log.debug("Starting probing for all devices");
+ cfgService.getSubjects(DeviceId.class).stream()
+ .filter(GeneralDeviceProvider::myScheme)
+ .forEach(this::submitProbeTask);
+ }
+
+ private void submitProbeTask(DeviceId deviceId) {
+ final DeviceHandshaker handshaker = handshakersWithListeners.get(deviceId);
+
+ if (handshaker == null) {
+ if (configIsComplete(deviceId)) {
+ // Device in config but we have not initiated a connection.
+ // Perhaps we missed the config event?
+ submitTask(deviceId, TaskType.CONNECTION_SETUP);
+ }
+ return;
+ }
+
+ if (!handshaker.isConnected()) {
+ // Device is in the core, but driver reports there is NOT a
+ // connection to it. Perhaps the netcfg changed and we didn't
+ // pick up the event?
+ log.warn("Re-establishing lost connection to {}", deviceId);
+ submitTask(deviceId, TaskType.CONNECTION_TEARDOWN);
+ submitTask(deviceId, TaskType.CONNECTION_SETUP);
+ return;
+ }
+
+ // On probing offline devices, while we expect them to signal
+ // availability via CHANNEL_OPEN or similar events, periodic probing
+ // might be needed to stimulate some channel activity. We might consider
+ // requiring active probing of closed channels in the protocol layer.
+
+ final Long lastProbe = lastProbedAvailability.get(deviceId);
+ if (lastProbe != null &&
+ (currentTimeMillis() - lastProbe) < (probeInterval * 1000 / 3)) {
+ // This avoids overload of probe tasks which might involve sending
+ // messages over the network. We require a minimum interval of 1/3
+ // of the configured probeInterval between consecutive probe tasks.
+ if (log.isDebugEnabled()) {
+ log.debug("Dropping probe task for {} as it happened recently",
+ deviceId);
+ }
+ return;
+ }
+
+ submitTask(deviceId, TaskType.PROBE_AVAILABILITY);
+ }
+
+ /**
+ * Type of tasks performed by this provider.
+ */
+ enum TaskType {
+ CONNECTION_SETUP,
+ CONNECTION_UPDATE,
+ CONNECTION_TEARDOWN,
+ PIPELINE_READY,
+ CHANNEL_OPEN,
+ CHANNEL_CLOSED,
+ PIPELINE_NOT_READY,
+ PROBE_AVAILABILITY,
+ ROLE_MASTER,
+ ROLE_NONE,
+ ROLE_STANDBY,
+ NOT_MASTER,
+ }
+
+ private void submitTask(DeviceId deviceId, TaskType taskType) {
+ taskExecutor.submit(deviceId, taskType, taskRunnable(deviceId, taskType));
+ }
+
+ private Runnable taskRunnable(DeviceId deviceId, TaskType taskType) {
+ switch (taskType) {
+ case CONNECTION_SETUP:
+ return () -> handleConnectionSetup(deviceId);
+ case CONNECTION_UPDATE:
+ return () -> handleConnectionUpdate(deviceId);
+ case CONNECTION_TEARDOWN:
+ return () -> handleConnectionTeardown(deviceId);
+ case CHANNEL_OPEN:
+ return () -> handleProbeAvailability(deviceId);
+ case CHANNEL_CLOSED:
+ return () -> markOfflineIfNeeded(deviceId);
+ case PIPELINE_NOT_READY:
+ return () -> markOfflineIfNeeded(deviceId);
+ case PIPELINE_READY:
+ return () -> handleProbeAvailability(deviceId);
+ case PROBE_AVAILABILITY:
+ return () -> handleProbeAvailability(deviceId);
+ case ROLE_MASTER:
+ return () -> handleMastershipResponse(deviceId, MastershipRole.MASTER);
+ case ROLE_STANDBY:
+ return () -> handleMastershipResponse(deviceId, MastershipRole.STANDBY);
+ case ROLE_NONE:
+ return () -> handleMastershipResponse(deviceId, MastershipRole.NONE);
+ case NOT_MASTER:
+ return () -> handleNotMaster(deviceId);
+ default:
+ throw new IllegalArgumentException("Unrecognized task type " + taskType);
+ }
+ }
+
+ private void handleConnectionSetup(DeviceId deviceId) {
+ assertConfig(deviceId);
+ // Bind pipeconf (if any and if device is capable).
+ bindPipeconfIfRequired(deviceId);
+ // Get handshaker.
+ final DeviceHandshaker handshaker = handshakerOrFail(deviceId);
+ if (handshaker.isConnected() || handshakersWithListeners.containsKey(deviceId)) {
+ throw new DeviceTaskException("connection already exists");
+ }
+ // Add device agent listener.
+ handshaker.addDeviceAgentListener(id(), deviceAgentListener);
+ handshakersWithListeners.put(deviceId, handshaker);
+ // Start connection via handshaker.
+ final Boolean connectSuccess = getFutureWithDeadline(
+ handshaker.connect(), "initiating connection",
+ deviceId, false, opTimeoutShort);
+ if (!connectSuccess) {
+ // Failed! Remove listeners.
+ handshaker.removeDeviceAgentListener(id());
+ handshakersWithListeners.remove(deviceId);
+ throw new DeviceTaskException("connection failed");
+ }
+ createOrUpdateDevice(deviceId, false);
+ final List<PortDescription> ports = getPortDetails(deviceId);
+ providerService.updatePorts(deviceId, ports);
+ // From here we expect a CHANNEL_OPEN event to update availability.
+ }
+
+ private void handleConnectionUpdate(DeviceId deviceId) {
+ assertConfig(deviceId);
+ final DeviceHandshaker handshaker = handshakerOrFail(deviceId);
+ if (!handshaker.isConnected()) {
+ // If driver reports that a connection still exists, perhaps the
+ // part of the netcfg that changed does not affect the connection.
+ // Otherwise, remove any previous connection state from the old
+ // netcfg and create a new one.
+ log.warn("Detected change of connection endpoints for {}, will " +
+ "tear down existing connection and set up a new one...",
+ deviceId);
+ handleConnectionTeardown(deviceId);
+ handleConnectionSetup(deviceId);
+ }
+ }
+
+ private void createOrUpdateDevice(DeviceId deviceId, boolean available) {
+ if (deviceService.getDevice(deviceId) != null
+ && deviceService.isAvailable(deviceId) == available) {
+ // Other nodes might have advertised this device before us.
+ return;
+ }
+ assertConfig(deviceId);
+ providerService.deviceConnected(deviceId, getDeviceDescription(
+ deviceId, available));
+ }
+
+ private boolean probeAvailability(DeviceHandshaker handshaker) {
+ lastProbedAvailability.put(handshaker.data().deviceId(), currentTimeMillis());
+ return getFutureWithDeadline(
+ handshaker.probeAvailability(), "probing availability",
+ handshaker.data().deviceId(), false, opTimeoutShort);
+ }
+
+ private boolean probeReachability(DeviceHandshaker handshaker) {
+ lastProbedAvailability.put(handshaker.data().deviceId(), currentTimeMillis());
+ return getFutureWithDeadline(
+ handshaker.probeReachability(), "probing reachability",
+ handshaker.data().deviceId(), false, opTimeoutShort);
+ }
+
+ private void markOfflineIfNeeded(DeviceId deviceId) {
+ assertDeviceRegistered(deviceId);
+ if (deviceService.isAvailable(deviceId)) {
+ providerService.deviceDisconnected(deviceId);
+ }
+ }
+
+ private void handleProbeAvailability(DeviceId deviceId) {
+ assertDeviceRegistered(deviceId);
+
+ // Make sure device has a valid mastership role.
+ final DeviceHandshaker handshaker = handshakerOrFail(deviceId);
+ final MastershipRole deviceRole = handshaker.getRole();
+ final MastershipRole expectedRole = mastershipService.getLocalRole(deviceId);
+ if (expectedRole == MastershipRole.NONE || expectedRole != deviceRole) {
+ // Device does NOT have a valid role...
+ if (!handshaker.isReachable() && !probeReachability(handshaker)) {
+ // ...but is not reachable. There isn't much we can do.
+ markOfflineIfNeeded(deviceId);
+ return;
+ }
+ // ...and is reachable, re-assert role.
+ roleChanged(deviceId, expectedRole == MastershipRole.NONE
+ ? mastershipService.requestRoleForSync(deviceId)
+ : expectedRole);
+ try {
+ // Wait for role to be notified and reachability state to be
+ // updated. This should be roughly equivalent to one RTT.
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+
+ // Check and update availability.
+ if (probeAvailability(handshakerOrFail(deviceId))) {
+ // Device ready to do its job.
+ createOrUpdateDevice(deviceId, true);
+ } else {
+ markOfflineIfNeeded(deviceId);
+ if (handshaker.isReachable() && isPipelineProgrammable(deviceId)) {
+ // If reachable, but not available, and pipeline programmable, there
+ // is a high chance it's because the pipeline is not READY
+ // (independently from what the pipeconf watchdog reports, as the
+ // status there might be outdated). Encourage pipeconf watchdog to
+ // perform a pipeline probe ASAP.
+ pipeconfWatchdogService.triggerProbe(deviceId);
+ }
+ }
+ }
+
+ private void handleMastershipResponse(DeviceId deviceId, MastershipRole response) {
+ assertDeviceRegistered(deviceId);
+ log.debug("Device {} asserted role {}", deviceId, response);
+ providerService.receivedRoleReply(deviceId, response);
+ }
+
+ private void handleNotMaster(DeviceId deviceId) {
+ assertDeviceRegistered(deviceId);
+ if (mastershipService.isLocalMaster(deviceId)) {
+ log.warn("Device {} notified that this node is not master, " +
+ "relinquishing mastership...", deviceId);
+ mastershipService.relinquishMastership(deviceId);
+ }
+ }
+
+ private void assertDeviceRegistered(DeviceId deviceId) {
+ if (deviceService.getDevice(deviceId) == null) {
+ throw new DeviceTaskException("device not registered in the core");
+ }
+ }
+
+ private void handleConnectionTeardown(DeviceId deviceId) {
+ if (deviceService.getDevice(deviceId) != null
+ && deviceService.isAvailable(deviceId)) {
+ providerService.deviceDisconnected(deviceId);
+ }
+ final DeviceHandshaker handshaker = handshakerOrFail(deviceId);
+ handshaker.removeDeviceAgentListener(id());
+ handshakersWithListeners.remove(deviceId);
+ handshaker.disconnect();
+ lastProbedAvailability.remove(deviceId);
+ }
+
+ private void bindPipeconfIfRequired(DeviceId deviceId) {
+ if (pipeconfService.getPipeconf(deviceId).isPresent()
+ || !isPipelineProgrammable(deviceId)) {
+ // Nothing to do.
+ // Device already has a pipeconf or is not programmable.
+ return;
+ }
+ // Get pipeconf from netcfg or driver (default one).
+ final PiPipelineProgrammable pipelineProg = getBehaviour(
+ deviceId, PiPipelineProgrammable.class);
+ final PiPipeconfId pipeconfId = getPipeconfId(deviceId, pipelineProg);
+ if (pipeconfId == null) {
+ throw new DeviceTaskException("unable to find pipeconf");
+ }
+ // Store binding in pipeconf service.
+ pipeconfService.bindToDevice(pipeconfId, deviceId);
+ }
+
+ private PiPipeconfId getPipeconfId(DeviceId deviceId, PiPipelineProgrammable pipelineProg) {
+ // Places to look for a pipeconf ID (in priority order):
+ // 1) netcfg
+ // 2) device driver (default one)
+ final PiPipeconfId pipeconfId = getPipeconfFromCfg(deviceId);
+ if (pipeconfId != null && !pipeconfId.id().isEmpty()) {
+ return pipeconfId;
+ }
+ if (pipelineProg != null
+ && pipelineProg.getDefaultPipeconf().isPresent()) {
+ final PiPipeconf defaultPipeconf = pipelineProg.getDefaultPipeconf().get();
+ log.info("Using default pipeconf {} for {}", defaultPipeconf.id(), deviceId);
+ return defaultPipeconf.id();
+ }
+ return null;
+ }
+
+ private PiPipeconfId getPipeconfFromCfg(DeviceId deviceId) {
+ BasicDeviceConfig config = cfgService.getConfig(
+ deviceId, BasicDeviceConfig.class);
+ if (config == null) {
+ return null;
+ }
+ return config.pipeconf() != null
+ ? new PiPipeconfId(config.pipeconf()) : null;
+ }
+
+ private DeviceHandshaker handshakerOrFail(DeviceId deviceId) {
+ final DeviceHandshaker handshaker = getBehaviour(
+ deviceId, DeviceHandshaker.class);
+ if (handshaker == null) {
+ throw new DeviceTaskException("missing handshaker behavior");
+ }
+ return handshaker;
+ }
+
+ private boolean configIsComplete(DeviceId deviceId) {
+ final BasicDeviceConfig basicDeviceCfg = cfgService.getConfig(
+ deviceId, BasicDeviceConfig.class);
+ return basicDeviceCfg != null && !isNullOrEmpty(basicDeviceCfg.driver());
+ }
+
+ private void assertConfig(DeviceId deviceId) {
+ if (!configIsComplete(deviceId)) {
+ throw new DeviceTaskException("configuration is not complete");
+ }
}
private Driver getDriver(DeviceId deviceId) {
@@ -425,8 +855,6 @@
}
private <T extends Behaviour> T getBehaviour(DeviceId deviceId, Class<T> type) {
- // Get handshaker.
-
Driver driver = getDriver(deviceId);
if (driver == null) {
return null;
@@ -435,83 +863,16 @@
return null;
}
final DriverData data = new DefaultDriverData(driver, deviceId);
- // Storing deviceKeyId and all other config values as data in the driver
- // with protocol_<info> name as the key. e.g protocol_ip.
- final GeneralProviderDeviceConfig providerConfig = cfgService.getConfig(
- deviceId, GeneralProviderDeviceConfig.class);
- if (providerConfig != null) {
- providerConfig.protocolsInfo().forEach((protocol, info) -> {
- info.configValues().forEach(
- (k, v) -> data.set(protocol + "_" + k, v));
- data.set(protocol + "_key", info.deviceKeyId());
- });
- }
final DefaultDriverHandler handler = new DefaultDriverHandler(data);
return driver.createBehaviour(handler, type);
}
- private void doConnectDevice(DeviceId deviceId) {
- log.debug("Initiating connection to device {}...", deviceId);
- // Retrieve config
- if (configIsMissing(deviceId)) {
- return;
+ private boolean hasBehaviour(DeviceId deviceId, Class<? extends Behaviour> type) {
+ Driver driver = getDriver(deviceId);
+ if (driver == null) {
+ return false;
}
- // Bind pipeconf (if any and if device is capable).
- if (!bindPipeconfIfRequired(deviceId)) {
- // We already logged the error.
- return;
- }
- // Get handshaker.
- final DeviceHandshaker handshaker = getBehaviour(
- deviceId, DeviceHandshaker.class);
- if (handshaker == null) {
- log.error("Missing handshaker behavior for {}, aborting connection",
- deviceId);
- return;
- }
- // Add device agent listener.
- handshaker.addDeviceAgentListener(id(), deviceAgentListener);
- handshakersWithListeners.put(deviceId, handshaker);
- // Start connection via handshaker.
- final Boolean connectSuccess = getFutureWithDeadline(
- handshaker.connect(), "initiating connection",
- deviceId, false, opTimeoutShort);
- if (!connectSuccess) {
- log.warn("Unable to connect to {}", deviceId);
- }
- }
-
- private void triggerAdvertiseDevice(DeviceId deviceId) {
- connectionExecutor.execute(withDeviceLock(
- () -> doAdvertiseDevice(deviceId), deviceId));
- }
-
- private void doAdvertiseDevice(DeviceId deviceId) {
- // Retrieve config
- if (configIsMissing(deviceId)) {
- return;
- }
- // Obtain device and port description.
- final boolean isPipelineReady = isPipelineReady(deviceId);
- final DeviceDescription description = getDeviceDescription(
- deviceId, isPipelineReady);
- final List<PortDescription> ports = getPortDetails(deviceId);
- // Advertise to core.
- if (deviceService.getDevice(deviceId) == null ||
- (description.isDefaultAvailable() &&
- !deviceService.isAvailable(deviceId))) {
- if (!isPipelineReady) {
- log.info("Advertising device {} to core with available={} as " +
- "device pipeline is not ready yet",
- deviceId, description.isDefaultAvailable());
- }
- providerService.deviceConnected(deviceId, description);
- }
- providerService.updatePorts(deviceId, ports);
- // If pipeline is not ready, encourage watchdog to perform probe ASAP.
- if (!isPipelineReady) {
- pipeconfWatchdogService.triggerProbe(deviceId);
- }
+ return driver.hasBehaviour(type);
}
private DeviceDescription getDeviceDescription(
@@ -548,18 +909,6 @@
deviceId, DeviceHandshaker.class);
final Driver driver = handshaker != null
? handshaker.handler().driver() : null;
- final GeneralProviderDeviceConfig cfg = cfgService.getConfig(
- deviceId, GeneralProviderDeviceConfig.class);
- final DefaultAnnotations.Builder annBuilder = DefaultAnnotations.builder();
- // If device is pipeline programmable, let this provider decide when the
- // device can be marked online.
- annBuilder.set(AnnotationKeys.PROVIDER_MARK_ONLINE,
- String.valueOf(isPipelineProgrammable(deviceId)));
- if (cfg != null) {
- StringJoiner protoStringBuilder = new StringJoiner(", ");
- cfg.protocolsInfo().keySet().forEach(protoStringBuilder::add);
- annBuilder.set(AnnotationKeys.PROTOCOL, protoStringBuilder.toString());
- }
return new DefaultDeviceDescription(
deviceId.uri(),
Device.Type.SWITCH,
@@ -569,351 +918,15 @@
UNKNOWN,
new ChassisId(),
defaultAvailable,
- annBuilder.build());
+ DefaultAnnotations.EMPTY);
}
- private void triggerMarkAvailable(DeviceId deviceId) {
- connectionExecutor.execute(withDeviceLock(
- () -> doMarkAvailable(deviceId), deviceId));
- }
-
- private void doMarkAvailable(DeviceId deviceId) {
- if (deviceService.isAvailable(deviceId)) {
- return;
- }
- final DeviceDescription descr = getDeviceDescription(deviceId, true);
- // It has been observed that devices that were marked offline (e.g.
- // after device disconnection) might end up with no master. Here we
- // trigger a new master election (if device has no master).
- mastershipService.requestRoleForSync(deviceId);
- providerService.deviceConnected(deviceId, descr);
- }
-
- private boolean bindPipeconfIfRequired(DeviceId deviceId) {
- if (pipeconfService.ofDevice(deviceId).isPresent()
- || !isPipelineProgrammable(deviceId)) {
- // Nothing to do, all good.
- return true;
- }
- // Get pipeconf from netcfg or driver (default one).
- final PiPipelineProgrammable pipelineProg = getBehaviour(
- deviceId, PiPipelineProgrammable.class);
- final PiPipeconfId pipeconfId = getPipeconfId(deviceId, pipelineProg);
- if (pipeconfId == null) {
- return false;
- }
- // Store binding in pipeconf service.
- pipeconfService.bindToDevice(pipeconfId, deviceId);
- return true;
- }
-
- private PiPipeconfId getPipeconfId(DeviceId deviceId, PiPipelineProgrammable pipelineProg) {
- // Places to look for a pipeconf ID (in priority order)):
- // 1) netcfg
- // 2) device driver (default one)
- final PiPipeconfId pipeconfId = getPipeconfFromCfg(deviceId);
- if (pipeconfId != null && !pipeconfId.id().isEmpty()) {
- return pipeconfId;
- }
- if (pipelineProg != null
- && pipelineProg.getDefaultPipeconf().isPresent()) {
- final PiPipeconf defaultPipeconf = pipelineProg.getDefaultPipeconf().get();
- log.info("Using default pipeconf {} for {}", defaultPipeconf.id(), deviceId);
- return defaultPipeconf.id();
- } else {
- log.warn("Unable to associate a pipeconf to {}", deviceId);
- return null;
- }
- }
-
- private void doDisconnectDevice(DeviceId deviceId) {
- log.debug("Initiating disconnection from {}...", deviceId);
- final DeviceHandshaker handshaker = getBehaviour(
- deviceId, DeviceHandshaker.class);
- final boolean isAvailable = deviceService.isAvailable(deviceId);
- // Signal disconnection to core (if master).
- if (isAvailable && mastershipService.isLocalMaster(deviceId)) {
- providerService.deviceDisconnected(deviceId);
- }
- // Cancel tasks.
- cancelStatsPolling(deviceId);
- // Disconnect device.
- if (handshaker == null) {
- if (isAvailable) {
- // If not available don't bother logging. We are probably
- // invoking this method multiple times for the same device.
- log.warn("Missing DeviceHandshaker behavior for {}, " +
- "no guarantees of complete disconnection",
- deviceId);
- }
- return;
- }
- handshaker.removeDeviceAgentListener(id());
- handshakersWithListeners.remove(deviceId);
- final boolean disconnectSuccess = getFutureWithDeadline(
- handshaker.disconnect(), "performing disconnection",
- deviceId, false, opTimeoutShort);
- if (!disconnectSuccess) {
- log.warn("Unable to disconnect from {}", deviceId);
- }
- }
-
- // Needed to catch the exception in the executors since are not rethrown otherwise.
- private Runnable exceptionSafe(Runnable runnable) {
- return () -> {
- try {
- runnable.run();
- } catch (Exception e) {
- log.error("Unhandled Exception", e);
- }
- };
- }
-
- private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
- final Lock lock = deviceLocks.get(deviceId);
- lock.lock();
- try {
- return task.get();
- } finally {
- lock.unlock();
- }
- }
-
- private Runnable withDeviceLock(Runnable task, DeviceId deviceId) {
- // Wrapper of withDeviceLock(Supplier, ...) for void tasks.
- return () -> withDeviceLock(() -> {
- task.run();
- return null;
- }, deviceId);
- }
-
- private void updatePortStatistics(DeviceId deviceId) {
- Device device = deviceService.getDevice(deviceId);
- if (device != null && deviceService.isAvailable(deviceId) &&
- device.is(PortStatisticsDiscovery.class)) {
- Collection<PortStatistics> statistics = device.as(PortStatisticsDiscovery.class)
- .discoverPortStatistics();
- //updating statistcs only if not empty
- if (!statistics.isEmpty()) {
- providerService.updatePortStatistics(deviceId, statistics);
- }
- } else {
- log.debug("Can't update port statistics for device {}", deviceId);
- }
- }
-
- private boolean notMyScheme(DeviceId deviceId) {
- return !deviceId.uri().getScheme().equals(URI_SCHEME);
- }
-
- private void triggerConnect(DeviceId deviceId) {
- connectionExecutor.execute(withDeviceLock(
- () -> doConnectDevice(deviceId), deviceId));
+ static boolean myScheme(DeviceId deviceId) {
+ return deviceId.uri().getScheme().equals(URI_SCHEME);
}
private boolean isPipelineProgrammable(DeviceId deviceId) {
- final GeneralProviderDeviceConfig providerConfig = cfgService.getConfig(
- deviceId, GeneralProviderDeviceConfig.class);
- if (providerConfig == null) {
- return false;
- }
- return !Collections.disjoint(
- ImmutableSet.copyOf(providerConfig.node().fieldNames()),
- PIPELINE_CONFIGURABLE_PROTOCOLS);
- }
-
- /**
- * Listener for configuration events.
- */
- private class InternalNetworkConfigListener implements NetworkConfigListener {
-
- @Override
- public void event(NetworkConfigEvent event) {
- connectionExecutor.execute(() -> consumeConfigEvent(event));
- }
-
- @Override
- public boolean isRelevant(NetworkConfigEvent event) {
- return (event.configClass().equals(GeneralProviderDeviceConfig.class) ||
- event.configClass().equals(BasicDeviceConfig.class) ||
- event.configClass().equals(PiPipeconfConfig.class)) &&
- (event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
- event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED);
- }
-
- private void consumeConfigEvent(NetworkConfigEvent event) {
- DeviceId deviceId = (DeviceId) event.subject();
- //Assuming that the deviceId comes with uri 'device:'
- if (notMyScheme(deviceId)) {
- // not under my scheme, skipping
- log.debug("{} is not my scheme, skipping", deviceId);
- return;
- }
- final boolean configComplete = withDeviceLock(
- () -> isDeviceConfigComplete(event, deviceId), deviceId);
- if (!configComplete) {
- // Still waiting for some configuration.
- return;
- }
- // Good to go.
- triggerConnect(deviceId);
- cleanUpConfigInfo(deviceId);
- }
-
- private boolean isDeviceConfigComplete(NetworkConfigEvent event, DeviceId deviceId) {
- // FIXME to be removed when netcfg will issue device events in a bundle or
- // ensure all configuration needed is present
- if (event.configClass().equals(GeneralProviderDeviceConfig.class)) {
- //FIXME we currently assume that p4runtime devices are pipeline configurable.
- //If we want to connect a p4runtime device with no pipeline
- if (event.config().isPresent()) {
- deviceConfigured.add(deviceId);
- final boolean isNotPipelineConfigurable = Collections.disjoint(
- ImmutableSet.copyOf(event.config().get().node().fieldNames()),
- PIPELINE_CONFIGURABLE_PROTOCOLS);
- if (isNotPipelineConfigurable) {
- // Skip waiting for a pipeline if we can't support it.
- pipelineConfigured.add(deviceId);
- }
- }
- } else if (event.configClass().equals(BasicDeviceConfig.class)) {
- if (event.config().isPresent() && event.config().get().node().has(DRIVER)) {
- driverConfigured.add(deviceId);
- }
- } else if (event.configClass().equals(PiPipeconfConfig.class)) {
- if (event.config().isPresent()
- && event.config().get().node().has(PiPipeconfConfig.PIPIPECONFID)) {
- pipelineConfigured.add(deviceId);
- }
- }
-
- if (deviceConfigured.contains(deviceId)
- && driverConfigured.contains(deviceId)
- && pipelineConfigured.contains(deviceId)) {
- return true;
- } else {
- if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
- log.debug("Waiting for pipeline configuration for device {}", deviceId);
- } else if (pipelineConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
- log.debug("Waiting for device configuration for device {}", deviceId);
- } else if (pipelineConfigured.contains(deviceId) && deviceConfigured.contains(deviceId)) {
- log.debug("Waiting for driver configuration for device {}", deviceId);
- } else if (driverConfigured.contains(deviceId)) {
- log.debug("Only driver configuration for device {}", deviceId);
- } else if (deviceConfigured.contains(deviceId)) {
- log.debug("Only device configuration for device {}", deviceId);
- }
- }
- return false;
- }
- }
-
- private boolean isPipelineReady(DeviceId deviceId) {
- final boolean isPipelineProg = isPipelineProgrammable(deviceId);
- final boolean isPipeconfReady = pipeconfWatchdogService
- .getStatus(deviceId)
- .equals(PiPipeconfWatchdogService.PipelineStatus.READY);
- return !isPipelineProg || isPipeconfReady;
- }
-
- private void cleanUpConfigInfo(DeviceId deviceId) {
- deviceConfigured.remove(deviceId);
- driverConfigured.remove(deviceId);
- pipelineConfigured.remove(deviceId);
- }
-
- private void startStatsPolling(DeviceId deviceId, boolean withRandomDelay) {
- statsPollingTasks.compute(deviceId, (did, oldTask) -> {
- if (oldTask != null) {
- oldTask.cancel(false);
- }
- final int delay = withRandomDelay
- ? new SecureRandom().nextInt(10) : 0;
- return statsExecutor.scheduleAtFixedRate(
- exceptionSafe(() -> updatePortStatistics(deviceId)),
- delay, statsPollFrequency, TimeUnit.SECONDS);
- });
- }
-
- private void cancelStatsPolling(DeviceId deviceId) {
- statsPollingTasks.computeIfPresent(deviceId, (did, task) -> {
- task.cancel(false);
- return null;
- });
- }
-
- private void rescheduleStatsPollingTasks() {
- statsPollingTasks.keySet().forEach(deviceId -> {
- // startStatsPolling cancels old one if present.
- startStatsPolling(deviceId, true);
- });
- }
-
- private void triggerProbeAllDevices() {
- // Async trigger a task for all devices in the cfg.
- cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
- .forEach(this::triggerDeviceProbe);
- }
-
- private PiPipeconfId getPipeconfFromCfg(DeviceId deviceId) {
- PiPipeconfConfig config = cfgService.getConfig(
- deviceId, PiPipeconfConfig.class);
- if (config == null) {
- return null;
- }
- return config.piPipeconfId();
- }
-
- private void triggerDeviceProbe(DeviceId deviceId) {
- connectionExecutor.execute(withDeviceLock(
- () -> doDeviceProbe(deviceId), deviceId));
- }
-
- private void doDeviceProbe(DeviceId deviceId) {
- log.debug("Probing device {}...", deviceId);
- if (configIsMissing(deviceId)) {
- return;
- }
- if (!isConnected(deviceId)) {
- if (deviceService.isAvailable(deviceId)) {
- providerService.deviceDisconnected(deviceId);
- }
- triggerConnect(deviceId);
- }
- }
-
- private boolean configIsMissing(DeviceId deviceId) {
- final boolean present =
- cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class) != null
- && cfgService.getConfig(deviceId, BasicDeviceConfig.class) != null;
- if (!present) {
- log.warn("Configuration for device {} is not complete", deviceId);
- }
- return !present;
- }
-
- private void handleMastershipResponse(DeviceId deviceId, MastershipRole response) {
- // Notify core about mastership response.
- final MastershipRole request = requestedRoles.get(deviceId);
- final boolean isAvailable = deviceService.isAvailable(deviceId);
- if (request == null || !isAvailable) {
- return;
- }
- log.debug("Device {} asserted role {} (requested was {})",
- deviceId, response, request);
- providerService.receivedRoleReply(deviceId, request, response);
- // FIXME: this should be based on assigned mastership, not what returned by device
- if (response.equals(MastershipRole.MASTER)) {
- startStatsPolling(deviceId, false);
- } else {
- cancelStatsPolling(deviceId);
- }
- }
-
- private void handleNotMaster(DeviceId deviceId) {
- log.warn("Device {} notified that this node is not master, " +
- "relinquishing mastership...", deviceId);
- mastershipService.relinquishMastership(deviceId);
+ return hasBehaviour(deviceId, PiPipelineProgrammable.class);
}
private <U> U getFutureWithDeadline(CompletableFuture<U> future, String opDescription,
@@ -930,86 +943,4 @@
}
return defaultValue;
}
-
- /**
- * Listener for core device events.
- */
- private class InternalDeviceListener implements DeviceListener {
- @Override
- public void event(DeviceEvent event) {
- DeviceId deviceId = event.subject().id();
- // For now this is scheduled periodically, when streaming API will
- // be available we check and base it on the streaming API (e.g. gNMI)
- if (mastershipService.isLocalMaster(deviceId)) {
- startStatsPolling(deviceId, true);
- }
- }
-
- @Override
- public boolean isRelevant(DeviceEvent event) {
- return event.type() == Type.DEVICE_ADDED &&
- event.subject().id().toString().startsWith(URI_SCHEME.toLowerCase());
- }
- }
-
- /**
- * Listener for device agent events.
- */
- private class InternalDeviceAgentListener implements DeviceAgentListener {
-
- @Override
- public void event(DeviceAgentEvent event) {
- DeviceId deviceId = event.subject();
- switch (event.type()) {
- case CHANNEL_OPEN:
- triggerAdvertiseDevice(deviceId);
- break;
- case CHANNEL_CLOSED:
- case CHANNEL_ERROR:
- triggerDeviceProbe(deviceId);
- break;
- case ROLE_MASTER:
- handleMastershipResponse(deviceId, MastershipRole.MASTER);
- break;
- case ROLE_STANDBY:
- handleMastershipResponse(deviceId, MastershipRole.STANDBY);
- break;
- case ROLE_NONE:
- handleMastershipResponse(deviceId, MastershipRole.NONE);
- break;
- case NOT_MASTER:
- handleNotMaster(deviceId);
- break;
- default:
- log.warn("Unrecognized device agent event {}", event.type());
- }
- }
-
- }
-
- private class InternalPipeconfWatchdogListener implements PiPipeconfWatchdogListener {
- @Override
- public void event(PiPipeconfWatchdogEvent event) {
- triggerMarkAvailable(event.subject());
- }
-
- @Override
- public boolean isRelevant(PiPipeconfWatchdogEvent event) {
- return event.type().equals(PiPipeconfWatchdogEvent.Type.PIPELINE_READY);
- }
- }
-
- private class InternalConfigFactory
- extends ConfigFactory<DeviceId, GeneralProviderDeviceConfig> {
-
- InternalConfigFactory() {
- super(SubjectFactories.DEVICE_SUBJECT_FACTORY,
- GeneralProviderDeviceConfig.class, CFG_SCHEME);
- }
-
- @Override
- public GeneralProviderDeviceConfig createConfig() {
- return new GeneralProviderDeviceConfig();
- }
- }
}
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/OsgiPropertyConstants.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/OsgiPropertyConstants.java
index 84419f4..358ce5f 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/OsgiPropertyConstants.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/OsgiPropertyConstants.java
@@ -23,11 +23,11 @@
private OsgiPropertyConstants() {}
- public static final String STATS_POLL_FREQUENCY = "deviceStatsPollFrequency";
- public static final int STATS_POLL_FREQUENCY_DEFAULT = 10;
+ public static final String STATS_POLL_INTERVAL = "deviceStatsPollInterval";
+ public static final int STATS_POLL_INTERVAL_DEFAULT = 10;
- public static final String PROBE_FREQUENCY = "deviceProbeFrequency";
- public static final int PROBE_FREQUENCY_DEFAULT = 10;
+ public static final String PROBE_INTERVAL = "deviceProbeInterval";
+ public static final int PROBE_INTERVAL_DEFAULT = 10;
public static final String OP_TIMEOUT_SHORT = "deviceOperationTimeoutShort";
public static final int OP_TIMEOUT_SHORT_DEFAULT = 10;
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/StatsPoller.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/StatsPoller.java
new file mode 100644
index 0000000..1426946
--- /dev/null
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/StatsPoller.java
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.provider.general.device.impl;
+
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Striped;
+import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipListener;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceProviderService;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.device.PortStatistics;
+import org.onosproject.net.device.PortStatisticsDiscovery;
+import org.slf4j.Logger;
+
+import java.security.SecureRandom;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.concurrent.Executors.newScheduledThreadPool;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.provider.general.device.impl.GeneralDeviceProvider.myScheme;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Component devoted to polling port statistics from devices managed by the
+ * GeneralDeviceProvider.
+ * <p>
+ * A periodic polling task is kept for each device that is known to the device
+ * service, available, mastered by the local node, and that supports the
+ * {@link PortStatisticsDiscovery} behaviour. The need for a task is
+ * re-evaluated on relevant device and mastership events, and when the polling
+ * interval is changed via {@link #reschedule(int)}.
+ */
+public class StatsPoller {
+
+    private static final int CORE_POOL_SIZE = 5;
+    // Seconds to wait for the executor to terminate when deactivating.
+    private static final int TERMINATION_TIMEOUT_SECONDS = 5;
+
+    private final Logger log = getLogger(getClass());
+
+    private final DeviceService deviceService;
+    private final MastershipService mastershipService;
+    private final DeviceProviderService providerService;
+
+    private final InternalDeviceListener deviceListener = new InternalDeviceListener();
+    private final MastershipListener mastershipListener = new InternalMastershipListener();
+    private final Striped<Lock> deviceLocks = Striped.lock(30);
+    // Shared generator used to randomize the initial delay of polling tasks.
+    private final SecureRandom random = new SecureRandom();
+
+    private ScheduledExecutorService statsExecutor;
+    // Concurrent maps: locks are striped per device, hence entries of
+    // different devices can be updated by different threads at the same time.
+    private Map<DeviceId, ScheduledFuture<?>> statsPollingTasks;
+    private Map<DeviceId, Integer> pollFrequencies;
+    // Written by activate()/reschedule(), read by listener threads.
+    private volatile int statsPollInterval;
+
+    /**
+     * Creates a new poller. The poller does nothing until
+     * {@link #activate(int)} is called.
+     *
+     * @param deviceService     device service
+     * @param mastershipService mastership service
+     * @param providerService   provider service to which port statistics are
+     *                          reported
+     */
+    StatsPoller(DeviceService deviceService, MastershipService mastershipService,
+                DeviceProviderService providerService) {
+        this.deviceService = deviceService;
+        this.mastershipService = mastershipService;
+        this.providerService = providerService;
+    }
+
+    /**
+     * Starts the poller: creates the executor, evaluates all devices currently
+     * known to the device service and registers the event listeners.
+     *
+     * @param statsPollInterval polling interval in seconds
+     * @throws IllegalArgumentException if the interval is not greater than 0
+     */
+    void activate(int statsPollInterval) {
+        checkArgument(statsPollInterval > 0, "statsPollInterval must be greater than 0");
+        this.statsPollInterval = statsPollInterval;
+        statsExecutor = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
+                "onos/gdp-stats", "%d", log));
+        statsPollingTasks = Maps.newConcurrentMap();
+        pollFrequencies = Maps.newConcurrentMap();
+        deviceService.getDevices().forEach(d -> updatePollingTask(d.id()));
+        deviceService.addListener(deviceListener);
+        mastershipService.addListener(mastershipListener);
+        log.info("Started");
+    }
+
+    /**
+     * Changes the polling interval and re-evaluates all existing polling
+     * tasks so that they use the new value.
+     *
+     * @param statsPollInterval new polling interval in seconds
+     * @throws IllegalArgumentException if the interval is not greater than 0
+     */
+    void reschedule(int statsPollInterval) {
+        checkArgument(statsPollInterval > 0, "statsPollInterval must be greater than 0");
+        this.statsPollInterval = statsPollInterval;
+        // Iterate over a snapshot of the keys: updatePollingTask() removes and
+        // re-inserts entries of the map being traversed.
+        for (DeviceId deviceId : statsPollingTasks.keySet().toArray(new DeviceId[0])) {
+            updatePollingTask(deviceId);
+        }
+    }
+
+    /**
+     * Stops the poller: unregisters the listeners, cancels all polling tasks
+     * and shuts down the executor.
+     */
+    void deactivate() {
+        deviceService.removeListener(deviceListener);
+        mastershipService.removeListener(mastershipListener);
+
+        statsPollingTasks.values().forEach(t -> t.cancel(false));
+        statsPollingTasks.clear();
+        pollFrequencies.clear();
+        statsPollingTasks = null;
+        pollFrequencies = null;
+
+        statsExecutor.shutdownNow();
+        try {
+            if (!statsExecutor.awaitTermination(TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+                log.warn("statsExecutor not terminated properly");
+            }
+        } catch (InterruptedException e) {
+            log.warn("statsExecutor not terminated properly");
+            // Preserve the interrupt status for the caller.
+            Thread.currentThread().interrupt();
+        }
+        statsExecutor = null;
+
+        log.info("Stopped");
+    }
+
+    /**
+     * Creates, replaces or cancels the polling task of the given device,
+     * depending on the current state of the device and on the current polling
+     * interval. If a task already exists with the current interval and it is
+     * still needed, it is left untouched.
+     *
+     * @param deviceId device ID
+     */
+    private void updatePollingTask(DeviceId deviceId) {
+        final Lock lock = deviceLocks.get(deviceId);
+        lock.lock();
+        try {
+            // Read once so that the whole update uses a consistent value.
+            final int interval = statsPollInterval;
+            final Device device = deviceService.getDevice(deviceId);
+            // myScheme() is also checked here (not only in the listeners)
+            // since activate() evaluates every device of the device service.
+            final boolean shouldHaveTask = myScheme(deviceId)
+                    && device != null
+                    && deviceService.isAvailable(deviceId)
+                    && mastershipService.isLocalMaster(deviceId)
+                    && device.is(PortStatisticsDiscovery.class);
+            final boolean pollFrequencyChanged = !Objects.equals(
+                    pollFrequencies.get(deviceId), interval);
+
+            ScheduledFuture<?> existingTask = statsPollingTasks.get(deviceId);
+            if (existingTask != null && (!shouldHaveTask || pollFrequencyChanged)) {
+                existingTask.cancel(false);
+                statsPollingTasks.remove(deviceId);
+                pollFrequencies.remove(deviceId);
+                log.info("Cancelled polling task for {}", deviceId);
+                existingTask = null;
+            }
+
+            // Schedule only if there is no valid task left, otherwise the
+            // previous future would be overwritten in the map without being
+            // cancelled and would keep polling the device forever.
+            if (shouldHaveTask && existingTask == null) {
+                // Random initial delay, so that tasks of different devices do
+                // not all fire at the same time.
+                final int delay = random.nextInt(interval);
+                statsPollingTasks.put(deviceId, statsExecutor.scheduleAtFixedRate(
+                        exceptionSafe(() -> updatePortStatistics(deviceId)),
+                        delay, interval, TimeUnit.SECONDS));
+                pollFrequencies.put(deviceId, interval);
+                log.info("Started polling task for {} with interval {} seconds",
+                         deviceId, interval);
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Collects port statistics from the given device and, if any, reports
+     * them to the provider service.
+     *
+     * @param deviceId device ID
+     */
+    private void updatePortStatistics(DeviceId deviceId) {
+        final Device device = deviceService.getDevice(deviceId);
+        if (device == null || !device.is(PortStatisticsDiscovery.class)) {
+            // The device might have been removed or updated right before the
+            // task was cancelled, nothing to poll.
+            log.debug("Skipping stats polling for {}, device not found or " +
+                              "port statistics discovery not supported", deviceId);
+            return;
+        }
+        final Collection<PortStatistics> statistics = device.as(
+                PortStatisticsDiscovery.class).discoverPortStatistics();
+        if (!statistics.isEmpty()) {
+            providerService.updatePortStatistics(deviceId, statistics);
+        }
+    }
+
+    /**
+     * Wraps the given runnable so that exceptions are logged instead of being
+     * propagated. An exception thrown by a task scheduled at fixed rate
+     * suppresses all its subsequent executions.
+     *
+     * @param runnable runnable to wrap
+     * @return runnable that never throws an {@link Exception}
+     */
+    private Runnable exceptionSafe(Runnable runnable) {
+        return () -> {
+            try {
+                runnable.run();
+            } catch (Exception e) {
+                log.error("Unhandled exception in stats poller", e);
+            }
+        };
+    }
+
+    /**
+     * Listener for mastership events. Re-evaluates the polling task of a
+     * device when its master changes.
+     */
+    private class InternalMastershipListener implements MastershipListener {
+
+        @Override
+        public void event(MastershipEvent event) {
+            updatePollingTask(event.subject());
+        }
+
+        @Override
+        public boolean isRelevant(MastershipEvent event) {
+            return event.type() == MastershipEvent.Type.MASTER_CHANGED
+                    && myScheme(event.subject());
+        }
+    }
+
+    /**
+     * Listener for core device events. Re-evaluates the polling task of a
+     * device when it is added, updated, removed, suspended or when its
+     * availability changes.
+     */
+    private class InternalDeviceListener implements DeviceListener {
+        @Override
+        public void event(DeviceEvent event) {
+            updatePollingTask(event.subject().id());
+        }
+
+        @Override
+        public boolean isRelevant(DeviceEvent event) {
+            switch (event.type()) {
+                case DEVICE_ADDED:
+                case DEVICE_UPDATED:
+                case DEVICE_AVAILABILITY_CHANGED:
+                case DEVICE_REMOVED:
+                case DEVICE_SUSPENDED:
+                    return myScheme(event.subject().id());
+                default:
+                    return false;
+            }
+        }
+    }
+}