Fix case where multiple stats poller tasks are created for the same device

Change-Id: I5d4602cbf557be458b52983d00c5d670fd3a3f2c
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/StatsPoller.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/StatsPoller.java
index 1426946..c0ac32d 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/StatsPoller.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/StatsPoller.java
@@ -17,6 +17,7 @@
 package org.onosproject.provider.general.device.impl;
 
 import com.google.common.collect.Maps;
+import com.google.common.collect.Streams;
 import com.google.common.util.concurrent.Striped;
 import org.onosproject.mastership.MastershipEvent;
 import org.onosproject.mastership.MastershipListener;
@@ -33,12 +34,13 @@
 
 import java.security.SecureRandom;
 import java.util.Collection;
-import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
+import java.util.stream.StreamSupport;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.concurrent.Executors.newScheduledThreadPool;
@@ -65,8 +67,8 @@
     private final Striped<Lock> deviceLocks = Striped.lock(30);
 
     private ScheduledExecutorService statsExecutor;
-    private Map<DeviceId, ScheduledFuture<?>> statsPollingTasks;
-    private Map<DeviceId, Integer> pollFrequencies;
+    private ConcurrentMap<DeviceId, ScheduledFuture<?>> statsPollingTasks;
+    private ConcurrentMap<DeviceId, Integer> pollFrequencies;
     private int statsPollInterval;
 
     StatsPoller(DeviceService deviceService, MastershipService mastershipService,
@@ -79,12 +81,11 @@
 
     void activate(int statsPollInterval) {
         checkArgument(statsPollInterval > 0, "statsPollInterval must be greater than 0");
-        this.statsPollInterval = statsPollInterval;
         statsExecutor = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
                 "onos/gdp-stats", "%d", log));
-        statsPollingTasks = Maps.newHashMap();
-        pollFrequencies = Maps.newHashMap();
-        deviceService.getDevices().forEach(d -> updatePollingTask(d.id()));
+        statsPollingTasks = Maps.newConcurrentMap();
+        pollFrequencies = Maps.newConcurrentMap();
+        reschedule(statsPollInterval);
         deviceService.addListener(deviceListener);
         mastershipService.addListener(mastershipListener);
         log.info("Started");
@@ -93,7 +94,15 @@
     void reschedule(int statsPollInterval) {
         checkArgument(statsPollInterval > 0, "statsPollInterval must be greater than 0");
         this.statsPollInterval = statsPollInterval;
-        statsPollingTasks.keySet().forEach(this::updatePollingTask);
+        // Consider all devices in the store, plus those of existing tasks
+        // (which for some reason might disappear from the store and so we want
+        // to cancel).
+        Streams.concat(
+                StreamSupport.stream(deviceService.getDevices().spliterator(), false)
+                        .map(Device::id),
+                statsPollingTasks.keySet().stream())
+                .distinct()
+                .forEach(this::updatePollingTask);
     }
 
     void deactivate() {
@@ -122,14 +131,15 @@
         deviceLocks.get(deviceId).lock();
         try {
             final ScheduledFuture<?> existingTask = statsPollingTasks.get(deviceId);
-            final boolean shouldHaveTask = deviceService.getDevice(deviceId) != null
+            final boolean shouldHaveTask = myScheme(deviceId)
+                    && deviceService.getDevice(deviceId) != null
                     && deviceService.isAvailable(deviceId)
                     && mastershipService.isLocalMaster(deviceId)
                     && deviceService.getDevice(deviceId).is(PortStatisticsDiscovery.class);
-            final boolean pollFrequencyChanged = !Objects.equals(
+            final boolean pollIntervalChanged = !Objects.equals(
                     pollFrequencies.get(deviceId), statsPollInterval);
 
-            if (existingTask != null && (!shouldHaveTask || pollFrequencyChanged)) {
+            if (existingTask != null && (!shouldHaveTask || pollIntervalChanged)) {
                 existingTask.cancel(false);
                 statsPollingTasks.remove(deviceId);
                 pollFrequencies.remove(deviceId);
@@ -137,6 +147,10 @@
             }
 
             if (shouldHaveTask) {
+                if (statsPollingTasks.containsKey(deviceId)) {
+                    // There's already a task, with the same interval.
+                    return;
+                }
                 final int delay = new SecureRandom().nextInt(statsPollInterval);
                 statsPollingTasks.put(deviceId, statsExecutor.scheduleAtFixedRate(
                         exceptionSafe(() -> updatePortStatistics(deviceId)),
@@ -152,6 +166,9 @@
 
     private void updatePortStatistics(DeviceId deviceId) {
         final Device device = deviceService.getDevice(deviceId);
+        if (!device.is(PortStatisticsDiscovery.class)) {
+            log.error("Missing PortStatisticsDiscovery behaviour for {}", deviceId);
+        }
         final Collection<PortStatistics> statistics = device.as(
                 PortStatisticsDiscovery.class).discoverPortStatistics();
         if (!statistics.isEmpty()) {
@@ -178,8 +195,7 @@
 
         @Override
         public boolean isRelevant(MastershipEvent event) {
-            return event.type() == MastershipEvent.Type.MASTER_CHANGED
-                    && myScheme(event.subject());
+            return event.type() == MastershipEvent.Type.MASTER_CHANGED;
         }
     }
 
@@ -200,7 +216,7 @@
                 case DEVICE_AVAILABILITY_CHANGED:
                 case DEVICE_REMOVED:
                 case DEVICE_SUSPENDED:
-                    return myScheme(event.subject().id());
+                    return true;
                 default:
                     return false;
             }