rehash with ScheduledExecutors

Change-Id: I37c377781a4478250ce5805fd22eb5c589af6bae
diff --git a/providers/null/link/src/main/java/org/onosproject/provider/nil/link/impl/NullLinkProvider.java b/providers/null/link/src/main/java/org/onosproject/provider/nil/link/impl/NullLinkProvider.java
index 3e85b4e..ad5a03f 100644
--- a/providers/null/link/src/main/java/org/onosproject/provider/nil/link/impl/NullLinkProvider.java
+++ b/providers/null/link/src/main/java/org/onosproject/provider/nil/link/impl/NullLinkProvider.java
@@ -15,10 +15,10 @@
  */
 package org.onosproject.provider.nil.link.impl;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-import org.apache.commons.lang3.concurrent.ConcurrentUtils;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -47,10 +47,13 @@
 import org.slf4j.Logger;
 
 import java.util.Dictionary;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ArrayList;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.io.BufferedReader;
 import java.io.FileReader;
@@ -76,11 +79,14 @@
 
     private final Logger log = getLogger(getClass());
 
+    // default topology file location and name.
     private static final String CFG_PATH = "/opt/onos/apache-karaf-3.0.2/etc/linkGraph.cfg";
+    // default number of workers. Eventually make this tunable
+    private static final int THREADS = 8;
 
     private static final int CHECK_DURATION = 10;
-    private static final int DEFAULT_RATE = 0;
-    private static final int REFRESH_RATE = 3000000; // in us
+    private static final int DEFAULT_RATE = 0; // usec
+    private static final int REFRESH_RATE = 3; // sec
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DeviceService deviceService;
@@ -99,15 +105,17 @@
     private final InternalLinkProvider linkProvider = new InternalLinkProvider();
 
     // True for device with Driver, false otherwise.
-    private final ConcurrentMap<DeviceId, Boolean> driverMap = Maps
+    private final ConcurrentMap<DeviceId, Set<LinkDriver>> driverMap = Maps
             .newConcurrentMap();
 
     // Link descriptions
-    private final ConcurrentMap<DeviceId, Set<LinkDescription>> linkDescrs = Maps
-            .newConcurrentMap();
+    private final Set<LinkDescription> linkDescrs = Sets.newConcurrentHashSet();
 
-    private ExecutorService linkDriver =
-            Executors.newCachedThreadPool(groupedThreads("onos/null", "link-driver-%d"));
+    // Thread to description map
+    private final List<Set<LinkDescription>> linkTasks = new ArrayList<>(THREADS);
+
+    private ScheduledExecutorService linkDriver =
+            Executors.newScheduledThreadPool(THREADS, groupedThreads("onos/null", "link-driver-%d"));
 
     // For flicker = true, duration between events in msec.
     @Property(name = "eventRate", value = "0", label = "Duration between Link Event")
@@ -130,8 +138,27 @@
     public void activate(ComponentContext context) {
         providerService = providerRegistry.register(this);
         modified(context);
-        deviceService.addListener(linkProvider);
 
+        if (flicker) {
+            allocateLinks();
+            for (int i = 0; i < linkTasks.size(); i++) {
+                Set<LinkDescription> links = linkTasks.get(i);
+                LinkDriver driver = new LinkDriver(links, i);
+                links.forEach(v -> {
+                    DeviceId d = v.src().deviceId();
+                    Set<LinkDriver> s = driverMap.getOrDefault(d, Sets.newConcurrentHashSet());
+                    s.add(driver);
+                    driverMap.put(d, s);
+                });
+                try {
+                    linkDriver.schedule(driver, eventRate, TimeUnit.MICROSECONDS);
+                } catch (Exception e) {
+                    log.warn(e.getMessage());
+                }
+            }
+        } else {
+            linkDriver.schedule(new LinkDriver(linkDescrs, 0), 3, TimeUnit.SECONDS);
+        }
         log.info("started");
     }
 
@@ -179,24 +206,13 @@
 
         // test mode configuration
         if (eventRate != newRate && newRate > 0) {
-            driverMap.replaceAll((k, v) -> false);
             eventRate = newRate;
             flicker = true;
         } else if (newRate == 0) {
-            driverMap.replaceAll((k, v) -> false);
             flicker = false;
         }
 
         log.info("Using settings: eventRate={}, topofile={}", eventRate, cfgFile);
-        for (Device dev : deviceService.getDevices()) {
-            DeviceId did = dev.id();
-            synchronized (this) {
-                if (driverMap.get(did) == null || !driverMap.get(did)) {
-                    driverMap.put(dev.id(), true);
-                    linkDriver.submit(new LinkDriver(dev));
-                }
-            }
-        }
     }
 
     // parse simplified dot-like topology graph
@@ -266,7 +282,6 @@
         String[] cp1 = linkArr[0].split(":");
         String[] cp2 = linkArr[2].split(":");
 
-        log.debug("cp1:{} cp2:{}", cp1, cp2);
         if (cp1.length != 2 && (cp2.length != 2 || cp2.length != 3)) {
             log.warn("Malformed endpoint descriptor(s):"
                     + "endpoint format should be DeviceId:port or DeviceId:port:NodeId,"
@@ -285,18 +300,15 @@
         DeviceId ddev = (adj == null) ? recover(cp2[0], me) : recover(cp2[0], adj);
         ConnectPoint src = new ConnectPoint(sdev, PortNumber.portNumber(cp1[1]));
         ConnectPoint dst = new ConnectPoint(ddev, PortNumber.portNumber(cp2[1]));
-
+        // both link types have incoming half-link
+        LinkDescription in = new DefaultLinkDescription(dst, src, DIRECT);
+        linkDescrs.add(in);
         if (op.equals("--")) {
-            // bidirectional - within our node's island
+            // bidirectional - within our node's island, make outbound link
             LinkDescription out = new DefaultLinkDescription(src, dst, DIRECT);
-            LinkDescription in = new DefaultLinkDescription(dst, src, DIRECT);
-            addLdesc(sdev, out);
-            addLdesc(ddev, in);
-            log.info("Created bidirectional link: {}", out);
+            linkDescrs.add(out);
+            log.info("Created bidirectional link: {}, {}", out, in);
         } else if (op.equals("->")) {
-            // unidirectional - likely from another island
-            LinkDescription in = new DefaultLinkDescription(dst, src, DIRECT);
-            addLdesc(ddev, in);
             log.info("Created unidirectional link: {}", in);
         } else {
             log.warn("Unknown link descriptor operand:"
@@ -309,7 +321,6 @@
     private DeviceId recover(String base, NodeId node) {
         long hash = node.hashCode() << 16;
         int dev = Integer.valueOf(base);
-        log.debug("hash: {}, dev: {}, {}", hash, dev, toHex(hash | dev));
         try {
             return DeviceId.deviceId(new URI("null", toHex(hash | dev), null));
         } catch (URISyntaxException e) {
@@ -318,11 +329,19 @@
         }
     }
 
-    // add LinkDescriptions to map
-    private boolean addLdesc(DeviceId did, LinkDescription ldesc) {
-        Set<LinkDescription> ldescs = ConcurrentUtils.putIfAbsent(
-                linkDescrs, did, Sets.newConcurrentHashSet());
-        return ldescs.add(ldesc);
+    // adds a LinkDescription to a worker's to-be queue, for flickering
+    private void allocateLinks() {
+        int index, lcount = 0;
+        for (LinkDescription ld : linkDescrs) {
+            index = (lcount % THREADS);
+            log.info("allocation: total={}, index={}", linkDescrs.size(), lcount, index);
+            if (linkTasks.size() <= index) {
+                linkTasks.add(index, Sets.newHashSet(ld));
+            } else {
+                linkTasks.get(index).add(ld);
+            }
+            lcount++;
+        }
     }
 
     /**
@@ -335,19 +354,14 @@
             Device dev = event.subject();
             switch (event.type()) {
             case DEVICE_ADDED:
-                synchronized (this) {
-                    if (!driverMap.getOrDefault(dev.id(), false)) {
-                        driverMap.put(dev.id(), true);
-                        linkDriver.submit(new LinkDriver(dev));
-                    }
-                }
+                // TODO: wait for all devices to stop core from balking
                 break;
             case DEVICE_REMOVED:
-                driverMap.put(dev.id(), false);
-                if (!MASTER.equals(roleService.getLocalRole(dev.id()))) {
-                    return;
+                if (MASTER.equals(roleService.getLocalRole(dev.id()))) {
+                    for (LinkDriver d : driverMap.get(dev.id())) {
+                        d.deviceRemoved(dev.id());
+                    }
                 }
-                // no need to remove static links, just stop advertising them
                 providerService.linksVanished(dev.id());
                 break;
             default:
@@ -358,16 +372,27 @@
 
     /**
      * Generates link events using fake links.
+     * TODO: stats collection should be its own thing.
      */
     private class LinkDriver implements Runnable {
-        Device myDev;
-        LinkDriver(Device dev) {
-            myDev = dev;
+        // List to actually work off of
+        List<LinkDescription> tasks = Lists.newArrayList();
+        float effLoad = 0;
+        Long counter = 0L;
+        int next = 0;
+
+        long startTime;
+
+        LinkDriver(Set<LinkDescription> links, int index) {
+            for (LinkDescription link : links) {
+                tasks.add(link);
+            }
+            startTime = System.currentTimeMillis();
         }
 
         @Override
         public void run() {
-            log.info("Thread started for dev {}", myDev.id());
+            log.info("Thread started for links {}", tasks);
             if (flicker) {
                 flicker();
             } else {
@@ -376,53 +401,54 @@
         }
 
         private void flicker() {
-            long startTime = System.currentTimeMillis();
-            long countEvent = 0;
-            float effLoad = 0;
-
-            while (!linkDriver.isShutdown() && driverMap.get(myDev.id())) {
-                if (!flicker) {
-                    break;
-                }
-                //Assuming eventRate is in microsecond unit
-                if (countEvent <= CHECK_DURATION * 1000000 / eventRate) {
-                    for (LinkDescription desc : linkDescrs.get(myDev.id())) {
-                        providerService.linkVanished(desc);
-                        countEvent++;
-                        sleepFor(eventRate);
-                        providerService.linkDetected(desc);
-                        countEvent++;
-                        sleepFor(eventRate);
+            if ((!linkDriver.isShutdown() || !tasks.isEmpty())) {
+                log.info("next: {}, count: {}", next, counter);
+                if (counter <= CHECK_DURATION * 1000000 / eventRate) {
+                    if ((counter % 2) == 0) {
+                        providerService.linkVanished(tasks.get(next++));
+                    } else {
+                        providerService.linkDetected(tasks.get(next++));
                     }
+                    if (next == tasks.size()) {
+                        next = 0;
+                    }
+                    counter++;
                 } else {
                     // log in WARN the effective load generation rate in events/sec, every 10 seconds
-                    effLoad = (float) (countEvent * 1000.0 /
-                            (System.currentTimeMillis() - startTime));
+                    effLoad = (float) (counter * 1000.0 / (System
+                            .currentTimeMillis() - startTime));
                     log.warn("Effective Loading for thread is {} events/second",
                             String.valueOf(effLoad));
-                    countEvent = 0;
+                    counter = 0L;
                     startTime = System.currentTimeMillis();
                 }
+                linkDriver.schedule(this, eventRate, TimeUnit.MICROSECONDS);
             }
         }
 
         private void refresh() {
-            while (!linkDriver.isShutdown() && driverMap.get(myDev.id())) {
-                if (flicker) {
-                    break;
-                }
-                for (LinkDescription desc : linkDescrs.get(myDev.id())) {
+            if (!linkDriver.isShutdown() || !tasks.isEmpty()) {
+                log.info("iter {} refresh_links for {} links", counter, linkDescrs.size());
+
+                for (LinkDescription desc : linkDescrs) {
                     providerService.linkDetected(desc);
-                    sleepFor(REFRESH_RATE);
+                    log.info("iteration {}, {}", counter, desc);
                 }
+                counter++;
+                linkDriver.schedule(this, REFRESH_RATE, TimeUnit.SECONDS);
             }
         }
 
-        private void sleepFor(int time) {
-            try {
-                TimeUnit.MICROSECONDS.sleep(time);
-            } catch (InterruptedException e) {
-                log.warn(String.valueOf(e));
+        public void deviceRemoved(DeviceId did) {
+            synchronized (tasks) {
+                Iterator<LinkDescription> it = tasks.iterator();
+                while (it.hasNext()) {
+                    LinkDescription ld = it.next();
+                    if (did.equals(ld.dst().deviceId())
+                            || (did.equals(ld.src().deviceId()))) {
+                        it.remove();
+                    }
+                }
             }
         }
     }