rehash with ScheduledExecutors
Change-Id: I37c377781a4478250ce5805fd22eb5c589af6bae
diff --git a/providers/null/link/src/main/java/org/onosproject/provider/nil/link/impl/NullLinkProvider.java b/providers/null/link/src/main/java/org/onosproject/provider/nil/link/impl/NullLinkProvider.java
index 3e85b4e..ad5a03f 100644
--- a/providers/null/link/src/main/java/org/onosproject/provider/nil/link/impl/NullLinkProvider.java
+++ b/providers/null/link/src/main/java/org/onosproject/provider/nil/link/impl/NullLinkProvider.java
@@ -15,10 +15,10 @@
*/
package org.onosproject.provider.nil.link.impl;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import org.apache.commons.lang3.concurrent.ConcurrentUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -47,10 +47,13 @@
import org.slf4j.Logger;
import java.util.Dictionary;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.io.BufferedReader;
import java.io.FileReader;
@@ -76,11 +79,14 @@
private final Logger log = getLogger(getClass());
+ // default topology file location and name.
private static final String CFG_PATH = "/opt/onos/apache-karaf-3.0.2/etc/linkGraph.cfg";
+ // default number of workers. Eventually make this tunable
+ private static final int THREADS = 8;
private static final int CHECK_DURATION = 10;
- private static final int DEFAULT_RATE = 0;
- private static final int REFRESH_RATE = 3000000; // in us
+ private static final int DEFAULT_RATE = 0; // usec
+ private static final int REFRESH_RATE = 3; // sec
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
@@ -99,15 +105,17 @@
private final InternalLinkProvider linkProvider = new InternalLinkProvider();
// True for device with Driver, false otherwise.
- private final ConcurrentMap<DeviceId, Boolean> driverMap = Maps
+ private final ConcurrentMap<DeviceId, Set<LinkDriver>> driverMap = Maps
.newConcurrentMap();
// Link descriptions
- private final ConcurrentMap<DeviceId, Set<LinkDescription>> linkDescrs = Maps
- .newConcurrentMap();
+ private final Set<LinkDescription> linkDescrs = Sets.newConcurrentHashSet();
- private ExecutorService linkDriver =
- Executors.newCachedThreadPool(groupedThreads("onos/null", "link-driver-%d"));
+ // Thread to description map
+ private final List<Set<LinkDescription>> linkTasks = new ArrayList<>(THREADS);
+
+ private ScheduledExecutorService linkDriver =
+ Executors.newScheduledThreadPool(THREADS, groupedThreads("onos/null", "link-driver-%d"));
// For flicker = true, duration between events in msec.
@Property(name = "eventRate", value = "0", label = "Duration between Link Event")
@@ -130,8 +138,27 @@
public void activate(ComponentContext context) {
providerService = providerRegistry.register(this);
modified(context);
- deviceService.addListener(linkProvider);
+ if (flicker) {
+ allocateLinks();
+ for (int i = 0; i < linkTasks.size(); i++) {
+ Set<LinkDescription> links = linkTasks.get(i);
+ LinkDriver driver = new LinkDriver(links, i);
+ links.forEach(v -> {
+ DeviceId d = v.src().deviceId();
+ Set<LinkDriver> s = driverMap.getOrDefault(d, Sets.newConcurrentHashSet());
+ s.add(driver);
+ driverMap.put(d, s);
+ });
+ try {
+ linkDriver.schedule(driver, eventRate, TimeUnit.MICROSECONDS);
+ } catch (Exception e) {
+ log.warn(e.getMessage());
+ }
+ }
+ } else {
+ linkDriver.schedule(new LinkDriver(linkDescrs, 0), 3, TimeUnit.SECONDS);
+ }
log.info("started");
}
@@ -179,24 +206,13 @@
// test mode configuration
if (eventRate != newRate && newRate > 0) {
- driverMap.replaceAll((k, v) -> false);
eventRate = newRate;
flicker = true;
} else if (newRate == 0) {
- driverMap.replaceAll((k, v) -> false);
flicker = false;
}
log.info("Using settings: eventRate={}, topofile={}", eventRate, cfgFile);
- for (Device dev : deviceService.getDevices()) {
- DeviceId did = dev.id();
- synchronized (this) {
- if (driverMap.get(did) == null || !driverMap.get(did)) {
- driverMap.put(dev.id(), true);
- linkDriver.submit(new LinkDriver(dev));
- }
- }
- }
}
// parse simplified dot-like topology graph
@@ -266,7 +282,6 @@
String[] cp1 = linkArr[0].split(":");
String[] cp2 = linkArr[2].split(":");
- log.debug("cp1:{} cp2:{}", cp1, cp2);
if (cp1.length != 2 && (cp2.length != 2 || cp2.length != 3)) {
log.warn("Malformed endpoint descriptor(s):"
+ "endpoint format should be DeviceId:port or DeviceId:port:NodeId,"
@@ -285,18 +300,15 @@
DeviceId ddev = (adj == null) ? recover(cp2[0], me) : recover(cp2[0], adj);
ConnectPoint src = new ConnectPoint(sdev, PortNumber.portNumber(cp1[1]));
ConnectPoint dst = new ConnectPoint(ddev, PortNumber.portNumber(cp2[1]));
-
+ // both link types have incoming half-link
+ LinkDescription in = new DefaultLinkDescription(dst, src, DIRECT);
+ linkDescrs.add(in);
if (op.equals("--")) {
- // bidirectional - within our node's island
+ // bidirectional - within our node's island, make outbound link
LinkDescription out = new DefaultLinkDescription(src, dst, DIRECT);
- LinkDescription in = new DefaultLinkDescription(dst, src, DIRECT);
- addLdesc(sdev, out);
- addLdesc(ddev, in);
- log.info("Created bidirectional link: {}", out);
+ linkDescrs.add(out);
+ log.info("Created bidirectional link: {}, {}", out, in);
} else if (op.equals("->")) {
- // unidirectional - likely from another island
- LinkDescription in = new DefaultLinkDescription(dst, src, DIRECT);
- addLdesc(ddev, in);
log.info("Created unidirectional link: {}", in);
} else {
log.warn("Unknown link descriptor operand:"
@@ -309,7 +321,6 @@
private DeviceId recover(String base, NodeId node) {
long hash = node.hashCode() << 16;
int dev = Integer.valueOf(base);
- log.debug("hash: {}, dev: {}, {}", hash, dev, toHex(hash | dev));
try {
return DeviceId.deviceId(new URI("null", toHex(hash | dev), null));
} catch (URISyntaxException e) {
@@ -318,11 +329,19 @@
}
}
- // add LinkDescriptions to map
- private boolean addLdesc(DeviceId did, LinkDescription ldesc) {
- Set<LinkDescription> ldescs = ConcurrentUtils.putIfAbsent(
- linkDescrs, did, Sets.newConcurrentHashSet());
- return ldescs.add(ldesc);
+ // adds a LinkDescription to a worker's to-be queue, for flickering
+ private void allocateLinks() {
+ int index, lcount = 0;
+ for (LinkDescription ld : linkDescrs) {
+ index = (lcount % THREADS);
+ log.info("allocation: total={}, index={}", linkDescrs.size(), lcount, index);
+ if (linkTasks.size() <= index) {
+ linkTasks.add(index, Sets.newHashSet(ld));
+ } else {
+ linkTasks.get(index).add(ld);
+ }
+ lcount++;
+ }
}
/**
@@ -335,19 +354,14 @@
Device dev = event.subject();
switch (event.type()) {
case DEVICE_ADDED:
- synchronized (this) {
- if (!driverMap.getOrDefault(dev.id(), false)) {
- driverMap.put(dev.id(), true);
- linkDriver.submit(new LinkDriver(dev));
- }
- }
+ // TODO: wait for all devices to stop core from balking
break;
case DEVICE_REMOVED:
- driverMap.put(dev.id(), false);
- if (!MASTER.equals(roleService.getLocalRole(dev.id()))) {
- return;
+ if (MASTER.equals(roleService.getLocalRole(dev.id()))) {
+ for (LinkDriver d : driverMap.get(dev.id())) {
+ d.deviceRemoved(dev.id());
+ }
}
- // no need to remove static links, just stop advertising them
providerService.linksVanished(dev.id());
break;
default:
@@ -358,16 +372,27 @@
/**
* Generates link events using fake links.
+ * TODO: stats collection should be its own thing.
*/
private class LinkDriver implements Runnable {
- Device myDev;
- LinkDriver(Device dev) {
- myDev = dev;
+ // List to actually work off of
+ List<LinkDescription> tasks = Lists.newArrayList();
+ float effLoad = 0;
+ Long counter = 0L;
+ int next = 0;
+
+ long startTime;
+
+ LinkDriver(Set<LinkDescription> links, int index) {
+ for (LinkDescription link : links) {
+ tasks.add(link);
+ }
+ startTime = System.currentTimeMillis();
}
@Override
public void run() {
- log.info("Thread started for dev {}", myDev.id());
+ log.info("Thread started for links {}", tasks);
if (flicker) {
flicker();
} else {
@@ -376,53 +401,54 @@
}
private void flicker() {
- long startTime = System.currentTimeMillis();
- long countEvent = 0;
- float effLoad = 0;
-
- while (!linkDriver.isShutdown() && driverMap.get(myDev.id())) {
- if (!flicker) {
- break;
- }
- //Assuming eventRate is in microsecond unit
- if (countEvent <= CHECK_DURATION * 1000000 / eventRate) {
- for (LinkDescription desc : linkDescrs.get(myDev.id())) {
- providerService.linkVanished(desc);
- countEvent++;
- sleepFor(eventRate);
- providerService.linkDetected(desc);
- countEvent++;
- sleepFor(eventRate);
+ if ((!linkDriver.isShutdown() || !tasks.isEmpty())) {
+ log.info("next: {}, count: {}", next, counter);
+ if (counter <= CHECK_DURATION * 1000000 / eventRate) {
+ if ((counter % 2) == 0) {
+ providerService.linkVanished(tasks.get(next++));
+ } else {
+ providerService.linkDetected(tasks.get(next++));
}
+ if (next == tasks.size()) {
+ next = 0;
+ }
+ counter++;
} else {
// log in WARN the effective load generation rate in events/sec, every 10 seconds
- effLoad = (float) (countEvent * 1000.0 /
- (System.currentTimeMillis() - startTime));
+ effLoad = (float) (counter * 1000.0 / (System
+ .currentTimeMillis() - startTime));
log.warn("Effective Loading for thread is {} events/second",
String.valueOf(effLoad));
- countEvent = 0;
+ counter = 0L;
startTime = System.currentTimeMillis();
}
+ linkDriver.schedule(this, eventRate, TimeUnit.MICROSECONDS);
}
}
private void refresh() {
- while (!linkDriver.isShutdown() && driverMap.get(myDev.id())) {
- if (flicker) {
- break;
- }
- for (LinkDescription desc : linkDescrs.get(myDev.id())) {
+ if (!linkDriver.isShutdown() || !tasks.isEmpty()) {
+ log.info("iter {} refresh_links for {} links", counter, linkDescrs.size());
+
+ for (LinkDescription desc : linkDescrs) {
providerService.linkDetected(desc);
- sleepFor(REFRESH_RATE);
+ log.info("iteration {}, {}", counter, desc);
}
+ counter++;
+ linkDriver.schedule(this, REFRESH_RATE, TimeUnit.SECONDS);
}
}
- private void sleepFor(int time) {
- try {
- TimeUnit.MICROSECONDS.sleep(time);
- } catch (InterruptedException e) {
- log.warn(String.valueOf(e));
+ public void deviceRemoved(DeviceId did) {
+ synchronized (tasks) {
+ Iterator<LinkDescription> it = tasks.iterator();
+ while (it.hasNext()) {
+ LinkDescription ld = it.next();
+ if (did.equals(ld.dst().deviceId())
+ || (did.equals(ld.src().deviceId()))) {
+ it.remove();
+ }
+ }
}
}
}