Prevent link event type aliasing and proper switching between flicker = on/off.
Also scale number of threads with number of available cores.

Change-Id: I438d92ab9c3df5c478f451c353135a9a64f5183e
diff --git a/providers/null/link/src/main/java/org/onosproject/provider/nil/link/impl/NullLinkProvider.java b/providers/null/link/src/main/java/org/onosproject/provider/nil/link/impl/NullLinkProvider.java
index 0fddce8..38c5a57 100644
--- a/providers/null/link/src/main/java/org/onosproject/provider/nil/link/impl/NullLinkProvider.java
+++ b/providers/null/link/src/main/java/org/onosproject/provider/nil/link/impl/NullLinkProvider.java
@@ -49,7 +49,6 @@
 import org.slf4j.Logger;
 
 import java.util.Dictionary;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
@@ -65,7 +64,6 @@
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onlab.util.Tools.toHex;
-import static org.onosproject.net.MastershipRole.MASTER;
 import static org.onosproject.net.Link.Type.DIRECT;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -83,11 +81,12 @@
     // default topology file location and name.
     private static final String CFG_PATH = "/opt/onos/apache-karaf-3.0.2/etc/linkGraph.cfg";
     // default number of workers. Eventually make this tunable
-    private static final int THREADS = 8;
+    private static final int THREADS = (int) Math.max(1, Runtime.getRuntime().availableProcessors() * 0.8);
 
-    private static final int CHECK_DURATION = 10;
-    private static final int DEFAULT_RATE = 0; // usec
-    private static final int REFRESH_RATE = 3; // sec
+    private static final int CHECK_DURATION = 10;   // sec
+    private static final int DEFAULT_RATE = 0;      // usec
+    private static final int REFRESH_RATE = 3;      // sec
+    // Fake device used for non-flickering thread in deviceMap
     private static final DeviceId DEFAULT = DeviceId.deviceId("null:ffffffffffffffff");
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -109,14 +108,14 @@
 
     private final InternalLinkProvider linkProvider = new InternalLinkProvider();
 
-    // True for device with Driver, false otherwise.
+    // Mapping between device and drivers that advertise links from device
     private final ConcurrentMap<DeviceId, Set<LinkDriver>> driverMap = Maps
             .newConcurrentMap();
 
     // Link descriptions
     private final List<LinkDescription> linkDescrs = Lists.newArrayList();
 
-    // Thread to description map
+    // Thread to description map for dividing links amongst threads in flicker mode
     private final List<List<LinkDescription>> linkTasks = Lists.newArrayList();
 
     private ScheduledExecutorService linkDriver =
@@ -131,7 +130,7 @@
     private String cfgFile = CFG_PATH;
 
     // flag checked to create a LinkDriver, if rate is non-zero.
-    private boolean flicker = false;
+    private volatile boolean flicker = false;
 
     public NullLinkProvider() {
         super(new ProviderId("null", "org.onosproject.provider.nil"));
@@ -142,28 +141,6 @@
         cfgService.registerProperties(getClass());
         providerService = providerRegistry.register(this);
         modified(context);
-
-        if (flicker) {
-            for (int i = 0; i < linkTasks.size(); i++) {
-                List<LinkDescription> links = linkTasks.get(i);
-                LinkDriver driver = new LinkDriver(links);
-                links.forEach(v -> {
-                    DeviceId d = v.src().deviceId();
-                    Set<LinkDriver> s = driverMap.getOrDefault(d, Sets.newConcurrentHashSet());
-                    s.add(driver);
-                    driverMap.put(d, s);
-                });
-                try {
-                    linkDriver.schedule(driver, eventRate, TimeUnit.MICROSECONDS);
-                } catch (Exception e) {
-                    log.warn(e.getMessage());
-                }
-            }
-        } else {
-            LinkDriver driver = new LinkDriver(linkDescrs);
-            driverMap.put(DEFAULT, Sets.newHashSet(driver));
-            linkDriver.schedule(driver, 3, TimeUnit.SECONDS);
-        }
         log.info("started");
     }
 
@@ -195,7 +172,7 @@
         String newPath;
         try {
             String s = (String) properties.get("eventRate");
-            newRate = isNullOrEmpty(s) ? eventRate : Integer.parseInt(s.trim());
+            newRate = isNullOrEmpty(s) ? DEFAULT_RATE : Integer.parseInt(s.trim());
             s = (String) properties.get("cfgFile");
             newPath = s.trim();
         } catch (NumberFormatException | ClassCastException e) {
@@ -209,17 +186,45 @@
             cfgFile = newPath;
         }
         readGraph(cfgFile, nodeService.getLocalNode().id());
-
-        // test mode configuration
-        if (eventRate != newRate && newRate > 0) {
-            eventRate = newRate;
-            flicker = true;
-            allocateLinks();
-        } else if (newRate == 0) {
+        if (newRate != eventRate) {
+            if (eventRate < 0) {
+                log.warn("Invalid rate, ignoring and using default");
+                eventRate = DEFAULT_RATE;
+            } else {
+                eventRate = newRate;
+            }
+        }
+        if (eventRate > 0) {
+            if (!flicker) { // previously not flickering
+                flicker = true;
+                allocateLinks();
+                driverMap.remove(DEFAULT);
+                for (int i = 0; i < linkTasks.size(); i++) {
+                    List<LinkDescription> links = linkTasks.get(i);
+                    LinkDriver driver = new LinkDriver(links);
+                    links.forEach(v -> {
+                        DeviceId sd = v.src().deviceId();
+                        DeviceId dd = v.src().deviceId();
+                        driverMap.computeIfAbsent(sd, k -> Sets.newConcurrentHashSet()).add(driver);
+                        driverMap.computeIfAbsent(dd, k -> Sets.newConcurrentHashSet()).add(driver);
+                    });
+                    try {
+                        linkDriver.schedule(driver, eventRate, TimeUnit.MICROSECONDS);
+                    } catch (Exception e) {
+                        log.warn(e.getMessage());
+                    }
+                }
+            }
+        } else {
+            if (flicker) {
+                driverMap.forEach((dev, lds) -> lds.forEach(l -> l.deviceRemoved(dev)));
+                driverMap.clear();
+                linkTasks.clear();
+            }
             flicker = false;
-            // reconfigure driver - dumb but should work.
-            driverMap.getOrDefault(DEFAULT, Sets.newHashSet()).forEach(
-                    v -> v.setTasks(linkDescrs));
+            LinkDriver driver = new LinkDriver(linkDescrs);
+            driverMap.put(DEFAULT, Sets.newHashSet(driver));
+            linkDriver.schedule(driver, REFRESH_RATE, TimeUnit.SECONDS);
         }
 
         log.info("Using settings: eventRate={}, topofile={}", eventRate, cfgFile);
@@ -374,10 +379,8 @@
                 // TODO: wait for all devices to stop core from balking
                 break;
             case DEVICE_REMOVED:
-                if (MASTER.equals(roleService.getLocalRole(dev.id()))) {
-                    for (LinkDriver d : driverMap.get(dev.id())) {
-                        d.deviceRemoved(dev.id());
-                    }
+                for (LinkDriver d : driverMap.get(dev.id())) {
+                    d.deviceRemoved(dev.id());
                 }
                 providerService.linksVanished(dev.id());
                 break;
@@ -393,10 +396,11 @@
      */
     private class LinkDriver implements Runnable {
         // List to actually work off of
-        List<LinkDescription> tasks = Lists.newArrayList();
+        List<LinkDescription> tasks = Lists.newCopyOnWriteArrayList();
         float effLoad = 0;
         Long counter = 0L;
         int next = 0;
+        boolean up = true;
 
         long startTime;
 
@@ -416,15 +420,16 @@
 
         private void flicker() {
             if ((!linkDriver.isShutdown() || !tasks.isEmpty())) {
-                log.info("next: {}, count: {}", next, counter);
-                if (counter <= CHECK_DURATION * 1000000 / eventRate) {
-                    if ((counter % 2) == 0) {
-                        providerService.linkVanished(tasks.get(next++));
-                    } else {
+                log.trace("next: {}, count: {}", next, counter);
+                if (counter <= CHECK_DURATION * 1_000_000 / eventRate) {
+                    if (up) {
                         providerService.linkDetected(tasks.get(next++));
+                    } else {
+                        providerService.linkVanished(tasks.get(next++));
                     }
-                    if (next == tasks.size()) {
+                    if (next >= tasks.size()) {
                         next = 0;
+                        up = !up;
                     }
                     counter++;
                 } else {
@@ -442,7 +447,7 @@
 
         private void refresh() {
             if (!linkDriver.isShutdown() || !tasks.isEmpty()) {
-                log.info("iter {} refresh_links", counter);
+                log.trace("iter {} refresh_links", counter);
 
                 for (LinkDescription desc : tasks) {
                     providerService.linkDetected(desc);
@@ -454,31 +459,27 @@
         }
 
         public void deviceRemoved(DeviceId did) {
-            synchronized (tasks) {
-                Iterator<LinkDescription> it = tasks.iterator();
-                while (it.hasNext()) {
-                    LinkDescription ld = it.next();
-                    if (did.equals(ld.dst().deviceId())
-                            || (did.equals(ld.src().deviceId()))) {
-                        it.remove();
-                    }
+            List<LinkDescription> rm = Lists.newArrayList();
+            for (LinkDescription ld : tasks) {
+                if (did.equals(ld.dst().deviceId())
+                        || (did.equals(ld.src().deviceId()))) {
+                    rm.add(ld);
                 }
             }
+            tasks.removeAll(rm);
         }
 
         public void setTasks(List<LinkDescription> links) {
             HashMultimap<ConnectPoint, ConnectPoint> nm = HashMultimap.create();
             links.forEach(v -> nm.put(v.src(), v.dst()));
             // remove and send linkVanished for stale links.
-            synchronized (this) {
-                for (LinkDescription l : tasks) {
-                    if (!nm.containsEntry(l.src(), l.dst())) {
-                        providerService.linkVanished(l);
-                    }
+            for (LinkDescription l : tasks) {
+                if (!nm.containsEntry(l.src(), l.dst())) {
+                    providerService.linkVanished(l);
                 }
-                tasks.clear();
-                tasks.addAll(links);
             }
+            tasks.clear();
+            tasks.addAll(links);
         }
     }