Prevent link event type aliasing and proper switching between flicker = on/off.
Also scale number of threads with number of available cores.
Change-Id: I438d92ab9c3df5c478f451c353135a9a64f5183e
diff --git a/providers/null/link/src/main/java/org/onosproject/provider/nil/link/impl/NullLinkProvider.java b/providers/null/link/src/main/java/org/onosproject/provider/nil/link/impl/NullLinkProvider.java
index 0fddce8..38c5a57 100644
--- a/providers/null/link/src/main/java/org/onosproject/provider/nil/link/impl/NullLinkProvider.java
+++ b/providers/null/link/src/main/java/org/onosproject/provider/nil/link/impl/NullLinkProvider.java
@@ -49,7 +49,6 @@
import org.slf4j.Logger;
import java.util.Dictionary;
-import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
@@ -65,7 +64,6 @@
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.onlab.util.Tools.groupedThreads;
import static org.onlab.util.Tools.toHex;
-import static org.onosproject.net.MastershipRole.MASTER;
import static org.onosproject.net.Link.Type.DIRECT;
import static org.slf4j.LoggerFactory.getLogger;
@@ -83,11 +81,12 @@
// default topology file location and name.
private static final String CFG_PATH = "/opt/onos/apache-karaf-3.0.2/etc/linkGraph.cfg";
// default number of workers. Eventually make this tunable
- private static final int THREADS = 8;
+ private static final int THREADS = (int) Math.max(1, Runtime.getRuntime().availableProcessors() * 0.8);
- private static final int CHECK_DURATION = 10;
- private static final int DEFAULT_RATE = 0; // usec
- private static final int REFRESH_RATE = 3; // sec
+ private static final int CHECK_DURATION = 10; // sec
+ private static final int DEFAULT_RATE = 0; // usec
+ private static final int REFRESH_RATE = 3; // sec
+ // Fake device used for non-flickering thread in deviceMap
private static final DeviceId DEFAULT = DeviceId.deviceId("null:ffffffffffffffff");
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -109,14 +108,14 @@
private final InternalLinkProvider linkProvider = new InternalLinkProvider();
- // True for device with Driver, false otherwise.
+ // Mapping between device and drivers that advertise links from device
private final ConcurrentMap<DeviceId, Set<LinkDriver>> driverMap = Maps
.newConcurrentMap();
// Link descriptions
private final List<LinkDescription> linkDescrs = Lists.newArrayList();
- // Thread to description map
+ // Thread to description map for dividing links amongst threads in flicker mode
private final List<List<LinkDescription>> linkTasks = Lists.newArrayList();
private ScheduledExecutorService linkDriver =
@@ -131,7 +130,7 @@
private String cfgFile = CFG_PATH;
// flag checked to create a LinkDriver, if rate is non-zero.
- private boolean flicker = false;
+ private volatile boolean flicker = false;
public NullLinkProvider() {
super(new ProviderId("null", "org.onosproject.provider.nil"));
@@ -142,28 +141,6 @@
cfgService.registerProperties(getClass());
providerService = providerRegistry.register(this);
modified(context);
-
- if (flicker) {
- for (int i = 0; i < linkTasks.size(); i++) {
- List<LinkDescription> links = linkTasks.get(i);
- LinkDriver driver = new LinkDriver(links);
- links.forEach(v -> {
- DeviceId d = v.src().deviceId();
- Set<LinkDriver> s = driverMap.getOrDefault(d, Sets.newConcurrentHashSet());
- s.add(driver);
- driverMap.put(d, s);
- });
- try {
- linkDriver.schedule(driver, eventRate, TimeUnit.MICROSECONDS);
- } catch (Exception e) {
- log.warn(e.getMessage());
- }
- }
- } else {
- LinkDriver driver = new LinkDriver(linkDescrs);
- driverMap.put(DEFAULT, Sets.newHashSet(driver));
- linkDriver.schedule(driver, 3, TimeUnit.SECONDS);
- }
log.info("started");
}
@@ -195,7 +172,7 @@
String newPath;
try {
String s = (String) properties.get("eventRate");
- newRate = isNullOrEmpty(s) ? eventRate : Integer.parseInt(s.trim());
+ newRate = isNullOrEmpty(s) ? DEFAULT_RATE : Integer.parseInt(s.trim());
s = (String) properties.get("cfgFile");
newPath = s.trim();
} catch (NumberFormatException | ClassCastException e) {
@@ -209,17 +186,45 @@
cfgFile = newPath;
}
readGraph(cfgFile, nodeService.getLocalNode().id());
-
- // test mode configuration
- if (eventRate != newRate && newRate > 0) {
- eventRate = newRate;
- flicker = true;
- allocateLinks();
- } else if (newRate == 0) {
+ if (newRate != eventRate) {
+ if (eventRate < 0) {
+ log.warn("Invalid rate, ignoring and using default");
+ eventRate = DEFAULT_RATE;
+ } else {
+ eventRate = newRate;
+ }
+ }
+ if (eventRate > 0) {
+ if (!flicker) { // previously not flickering
+ flicker = true;
+ allocateLinks();
+ driverMap.remove(DEFAULT);
+ for (int i = 0; i < linkTasks.size(); i++) {
+ List<LinkDescription> links = linkTasks.get(i);
+ LinkDriver driver = new LinkDriver(links);
+ links.forEach(v -> {
+ DeviceId sd = v.src().deviceId();
+ DeviceId dd = v.src().deviceId();
+ driverMap.computeIfAbsent(sd, k -> Sets.newConcurrentHashSet()).add(driver);
+ driverMap.computeIfAbsent(dd, k -> Sets.newConcurrentHashSet()).add(driver);
+ });
+ try {
+ linkDriver.schedule(driver, eventRate, TimeUnit.MICROSECONDS);
+ } catch (Exception e) {
+ log.warn(e.getMessage());
+ }
+ }
+ }
+ } else {
+ if (flicker) {
+ driverMap.forEach((dev, lds) -> lds.forEach(l -> l.deviceRemoved(dev)));
+ driverMap.clear();
+ linkTasks.clear();
+ }
flicker = false;
- // reconfigure driver - dumb but should work.
- driverMap.getOrDefault(DEFAULT, Sets.newHashSet()).forEach(
- v -> v.setTasks(linkDescrs));
+ LinkDriver driver = new LinkDriver(linkDescrs);
+ driverMap.put(DEFAULT, Sets.newHashSet(driver));
+ linkDriver.schedule(driver, REFRESH_RATE, TimeUnit.SECONDS);
}
log.info("Using settings: eventRate={}, topofile={}", eventRate, cfgFile);
@@ -374,10 +379,8 @@
// TODO: wait for all devices to stop core from balking
break;
case DEVICE_REMOVED:
- if (MASTER.equals(roleService.getLocalRole(dev.id()))) {
- for (LinkDriver d : driverMap.get(dev.id())) {
- d.deviceRemoved(dev.id());
- }
+ for (LinkDriver d : driverMap.get(dev.id())) {
+ d.deviceRemoved(dev.id());
}
providerService.linksVanished(dev.id());
break;
@@ -393,10 +396,11 @@
*/
private class LinkDriver implements Runnable {
// List to actually work off of
- List<LinkDescription> tasks = Lists.newArrayList();
+ List<LinkDescription> tasks = Lists.newCopyOnWriteArrayList();
float effLoad = 0;
Long counter = 0L;
int next = 0;
+ boolean up = true;
long startTime;
@@ -416,15 +420,16 @@
private void flicker() {
if ((!linkDriver.isShutdown() || !tasks.isEmpty())) {
- log.info("next: {}, count: {}", next, counter);
- if (counter <= CHECK_DURATION * 1000000 / eventRate) {
- if ((counter % 2) == 0) {
- providerService.linkVanished(tasks.get(next++));
- } else {
+ log.trace("next: {}, count: {}", next, counter);
+ if (counter <= CHECK_DURATION * 1_000_000 / eventRate) {
+ if (up) {
providerService.linkDetected(tasks.get(next++));
+ } else {
+ providerService.linkVanished(tasks.get(next++));
}
- if (next == tasks.size()) {
+ if (next >= tasks.size()) {
next = 0;
+ up = !up;
}
counter++;
} else {
@@ -442,7 +447,7 @@
private void refresh() {
if (!linkDriver.isShutdown() || !tasks.isEmpty()) {
- log.info("iter {} refresh_links", counter);
+ log.trace("iter {} refresh_links", counter);
for (LinkDescription desc : tasks) {
providerService.linkDetected(desc);
@@ -454,31 +459,27 @@
}
public void deviceRemoved(DeviceId did) {
- synchronized (tasks) {
- Iterator<LinkDescription> it = tasks.iterator();
- while (it.hasNext()) {
- LinkDescription ld = it.next();
- if (did.equals(ld.dst().deviceId())
- || (did.equals(ld.src().deviceId()))) {
- it.remove();
- }
+ List<LinkDescription> rm = Lists.newArrayList();
+ for (LinkDescription ld : tasks) {
+ if (did.equals(ld.dst().deviceId())
+ || (did.equals(ld.src().deviceId()))) {
+ rm.add(ld);
}
}
+ tasks.removeAll(rm);
}
public void setTasks(List<LinkDescription> links) {
HashMultimap<ConnectPoint, ConnectPoint> nm = HashMultimap.create();
links.forEach(v -> nm.put(v.src(), v.dst()));
// remove and send linkVanished for stale links.
- synchronized (this) {
- for (LinkDescription l : tasks) {
- if (!nm.containsEntry(l.src(), l.dst())) {
- providerService.linkVanished(l);
- }
+ for (LinkDescription l : tasks) {
+ if (!nm.containsEntry(l.src(), l.dst())) {
+ providerService.linkVanished(l);
}
- tasks.clear();
- tasks.addAll(links);
}
+ tasks.clear();
+ tasks.addAll(links);
}
}