Fix case where multiple stats poller tasks are created for the same device
Change-Id: I5d4602cbf557be458b52983d00c5d670fd3a3f2c
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/StatsPoller.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/StatsPoller.java
index 1426946..c0ac32d 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/StatsPoller.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/StatsPoller.java
@@ -17,6 +17,7 @@
package org.onosproject.provider.general.device.impl;
import com.google.common.collect.Maps;
+import com.google.common.collect.Streams;
import com.google.common.util.concurrent.Striped;
import org.onosproject.mastership.MastershipEvent;
import org.onosproject.mastership.MastershipListener;
@@ -33,12 +34,13 @@
import java.security.SecureRandom;
import java.util.Collection;
-import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
+import java.util.stream.StreamSupport;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.concurrent.Executors.newScheduledThreadPool;
@@ -65,8 +67,8 @@
private final Striped<Lock> deviceLocks = Striped.lock(30);
private ScheduledExecutorService statsExecutor;
- private Map<DeviceId, ScheduledFuture<?>> statsPollingTasks;
- private Map<DeviceId, Integer> pollFrequencies;
+ private ConcurrentMap<DeviceId, ScheduledFuture<?>> statsPollingTasks;
+ private ConcurrentMap<DeviceId, Integer> pollFrequencies;
private int statsPollInterval;
StatsPoller(DeviceService deviceService, MastershipService mastershipService,
@@ -79,12 +81,11 @@
void activate(int statsPollInterval) {
checkArgument(statsPollInterval > 0, "statsPollInterval must be greater than 0");
- this.statsPollInterval = statsPollInterval;
statsExecutor = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
"onos/gdp-stats", "%d", log));
- statsPollingTasks = Maps.newHashMap();
- pollFrequencies = Maps.newHashMap();
- deviceService.getDevices().forEach(d -> updatePollingTask(d.id()));
+ statsPollingTasks = Maps.newConcurrentMap();
+ pollFrequencies = Maps.newConcurrentMap();
+ reschedule(statsPollInterval);
deviceService.addListener(deviceListener);
mastershipService.addListener(mastershipListener);
log.info("Started");
@@ -93,7 +94,15 @@
void reschedule(int statsPollInterval) {
checkArgument(statsPollInterval > 0, "statsPollInterval must be greater than 0");
this.statsPollInterval = statsPollInterval;
- statsPollingTasks.keySet().forEach(this::updatePollingTask);
+ // Consider all devices in the store, plus those of existing tasks
+ // (which for some reason might disappear from the store and so we want
+ // to cancel).
+ Streams.concat(
+ StreamSupport.stream(deviceService.getDevices().spliterator(), false)
+ .map(Device::id),
+ statsPollingTasks.keySet().stream())
+ .distinct()
+ .forEach(this::updatePollingTask);
}
void deactivate() {
@@ -122,14 +131,15 @@
deviceLocks.get(deviceId).lock();
try {
final ScheduledFuture<?> existingTask = statsPollingTasks.get(deviceId);
- final boolean shouldHaveTask = deviceService.getDevice(deviceId) != null
+ final boolean shouldHaveTask = myScheme(deviceId)
+ && deviceService.getDevice(deviceId) != null
&& deviceService.isAvailable(deviceId)
&& mastershipService.isLocalMaster(deviceId)
&& deviceService.getDevice(deviceId).is(PortStatisticsDiscovery.class);
- final boolean pollFrequencyChanged = !Objects.equals(
+ final boolean pollIntervalChanged = !Objects.equals(
pollFrequencies.get(deviceId), statsPollInterval);
- if (existingTask != null && (!shouldHaveTask || pollFrequencyChanged)) {
+ if (existingTask != null && (!shouldHaveTask || pollIntervalChanged)) {
existingTask.cancel(false);
statsPollingTasks.remove(deviceId);
pollFrequencies.remove(deviceId);
@@ -137,6 +147,10 @@
}
if (shouldHaveTask) {
+ if (statsPollingTasks.containsKey(deviceId)) {
+ // There's already a task, with the same interval.
+ return;
+ }
final int delay = new SecureRandom().nextInt(statsPollInterval);
statsPollingTasks.put(deviceId, statsExecutor.scheduleAtFixedRate(
exceptionSafe(() -> updatePortStatistics(deviceId)),
@@ -152,6 +166,9 @@
private void updatePortStatistics(DeviceId deviceId) {
final Device device = deviceService.getDevice(deviceId);
+ if (!device.is(PortStatisticsDiscovery.class)) {
+ log.error("Missing PortStatisticsDiscovery behaviour for {}", deviceId);
+ }
final Collection<PortStatistics> statistics = device.as(
PortStatisticsDiscovery.class).discoverPortStatistics();
if (!statistics.isEmpty()) {
@@ -178,8 +195,7 @@
@Override
public boolean isRelevant(MastershipEvent event) {
- return event.type() == MastershipEvent.Type.MASTER_CHANGED
- && myScheme(event.subject());
+ return event.type() == MastershipEvent.Type.MASTER_CHANGED;
}
}
@@ -200,7 +216,7 @@
case DEVICE_AVAILABILITY_CHANGED:
case DEVICE_REMOVED:
case DEVICE_SUSPENDED:
- return myScheme(event.subject().id());
+ return true;
default:
return false;
}