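Fixing various issues and re-tuning: mastership balancing, event-batch error handling, topology batching defaults, and event-driven GUI summary updates.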
Change-Id: I8822fcf77cfa507788241c5bda98ef4741b284b4
diff --git a/cli/src/main/java/org/onlab/onos/cli/BalanceMastersCommand.java b/cli/src/main/java/org/onlab/onos/cli/BalanceMastersCommand.java
index a3f3c23..77d0c16 100644
--- a/cli/src/main/java/org/onlab/onos/cli/BalanceMastersCommand.java
+++ b/cli/src/main/java/org/onlab/onos/cli/BalanceMastersCommand.java
@@ -23,13 +23,15 @@
import org.onlab.onos.mastership.MastershipAdminService;
import org.onlab.onos.mastership.MastershipService;
import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.net.MastershipRole;
+import org.onlab.onos.net.device.DeviceService;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import static com.google.common.collect.Lists.newArrayList;
+import static org.onlab.onos.net.MastershipRole.MASTER;
/**
* Forces device mastership rebalancing.
@@ -50,73 +52,62 @@
// Create buckets reflecting current ownership.
for (ControllerNode node : nodes) {
- controllerDevices.putAll(node, mastershipService.getDevicesOf(node.id()));
+ Set<DeviceId> devicesOf = mastershipService.getDevicesOf(node.id());
+ controllerDevices.putAll(node, devicesOf);
+ print("Node %s has %d devices.", node.id(), devicesOf.size());
}
- int bucketCount = nodes.size();
- for (int i = 0; i < bucketCount / 2; i++) {
+ int rounds = nodes.size();
+ for (int i = 0; i < rounds; i++) {
// Iterate over the buckets and find the smallest and the largest.
- ControllerNode smallest = findSmallestBucket(controllerDevices);
- ControllerNode largest = findLargestBucket(controllerDevices);
- balanceBuckets(smallest, largest, controllerDevices,
- mastershipService, adminService);
+ ControllerNode smallest = findBucket(true, nodes, controllerDevices);
+ ControllerNode largest = findBucket(false, nodes, controllerDevices);
+ balanceBuckets(smallest, largest, controllerDevices, adminService);
}
}
- private ControllerNode findSmallestBucket(Multimap<ControllerNode, DeviceId> controllerDevices) {
- int minSize = Integer.MAX_VALUE;
- ControllerNode minNode = null;
- for (ControllerNode node : controllerDevices.keySet()) {
+ private ControllerNode findBucket(boolean min, Collection<ControllerNode> nodes,
+ Multimap<ControllerNode, DeviceId> controllerDevices) {
+ int xSize = min ? Integer.MAX_VALUE : -1;
+ ControllerNode xNode = null;
+ for (ControllerNode node : nodes) {
int size = controllerDevices.get(node).size();
- if (size < minSize) {
- minSize = size;
- minNode = node;
+ if ((min && size < xSize) || (!min && size > xSize)) {
+ xSize = size;
+ xNode = node;
}
}
- return minNode;
- }
-
- private ControllerNode findLargestBucket(Multimap<ControllerNode, DeviceId> controllerDevices) {
- int maxSize = -1;
- ControllerNode maxNode = null;
- for (ControllerNode node : controllerDevices.keySet()) {
- int size = controllerDevices.get(node).size();
- if (size >= maxSize) {
- maxSize = size;
- maxNode = node;
- }
- }
- return maxNode;
+ return xNode;
}
// FIXME: enhance to better handle cases where smallest cannot take any of the devices from largest
private void balanceBuckets(ControllerNode smallest, ControllerNode largest,
Multimap<ControllerNode, DeviceId> controllerDevices,
- MastershipService mastershipService,
MastershipAdminService adminService) {
Collection<DeviceId> minBucket = controllerDevices.get(smallest);
Collection<DeviceId> maxBucket = controllerDevices.get(largest);
+ int bucketCount = controllerDevices.keySet().size();
+ int deviceCount = get(DeviceService.class).getDeviceCount();
int delta = (maxBucket.size() - minBucket.size()) / 2;
+ delta = Math.min(deviceCount / bucketCount, delta);
- print("Attempting to move %d nodes from %s to %s...",
- delta, largest.id(), smallest.id());
+ if (delta > 0) {
+ print("Attempting to move %d nodes from %s to %s...",
+ delta, largest.id(), smallest.id());
- int i = 0;
- Iterator<DeviceId> it = maxBucket.iterator();
- while (it.hasNext() && i < delta) {
- DeviceId deviceId = it.next();
-
- // Check that the transfer can happen for the current element.
- if (mastershipService.getNodesFor(deviceId).backups().contains(smallest.id())) {
- print("Setting %s as the new master for %s", smallest.id(), deviceId);
- adminService.setRole(smallest.id(), deviceId, MastershipRole.MASTER);
+ int i = 0;
+ Iterator<DeviceId> it = maxBucket.iterator();
+ while (it.hasNext() && i < delta) {
+ DeviceId deviceId = it.next();
+ print("Setting %s as the master for %s", smallest.id(), deviceId);
+ adminService.setRole(smallest.id(), deviceId, MASTER);
+ controllerDevices.put(smallest, deviceId);
+ it.remove();
i++;
}
}
-
- controllerDevices.removeAll(smallest);
}
}
diff --git a/core/api/src/main/java/org/onlab/onos/event/AbstractEventAccumulator.java b/core/api/src/main/java/org/onlab/onos/event/AbstractEventAccumulator.java
index f866633..39c1103 100644
--- a/core/api/src/main/java/org/onlab/onos/event/AbstractEventAccumulator.java
+++ b/core/api/src/main/java/org/onlab/onos/event/AbstractEventAccumulator.java
@@ -16,6 +16,8 @@
package org.onlab.onos.event;
import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Timer;
@@ -31,6 +33,8 @@
*/
public abstract class AbstractEventAccumulator implements EventAccumulator {
+ private Logger log = LoggerFactory.getLogger(AbstractEventAccumulator.class);
+
private final Timer timer;
private final int maxEvents;
private final int maxBatchMillis;
@@ -104,9 +108,13 @@
private class ProcessorTask extends TimerTask {
@Override
public void run() {
- idleTask = cancelIfActive(idleTask);
- maxTask = cancelIfActive(maxTask);
- processEvents(finalizeCurrentBatch());
+ try {
+ idleTask = cancelIfActive(idleTask);
+ maxTask = cancelIfActive(maxTask);
+ processEvents(finalizeCurrentBatch());
+ } catch (Exception e) {
+ log.warn("Unable to process batch due to {}", e.getMessage());
+ }
}
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java b/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java
index ac9fd00..3f6e285 100644
--- a/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java
+++ b/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java
@@ -66,13 +66,13 @@
implements TopologyProvider {
private static final int MAX_THREADS = 8;
- private static final int DEFAULT_MAX_EVENTS = 100;
- private static final int DEFAULT_MAX_BATCH_MS = 50;
- private static final int DEFAULT_MAX_IDLE_MS = 5;
+ private static final int DEFAULT_MAX_EVENTS = 200;
+ private static final int DEFAULT_MAX_BATCH_MS = 60;
+ private static final int DEFAULT_MAX_IDLE_MS = 30;
// FIXME: Replace with a system-wide timer instance;
// TODO: Convert to use HashedWheelTimer or produce a variant of that; then decide which we want to adopt
- private static final Timer TIMER = new Timer();
+ private static final Timer TIMER = new Timer("topo-event-batching");
@Property(name = "maxEvents", intValue = DEFAULT_MAX_EVENTS,
label = "Maximum number of events to accumulate")
@@ -122,6 +122,9 @@
deviceService.addListener(deviceListener);
linkService.addListener(linkListener);
+ log.info("Configured with maxEvents = {}; maxBatchMs = {}; maxIdleMs = {}",
+ maxEvents, maxBatchMs, maxIdleMs);
+
isStarted = true;
triggerRecompute();
log.info("Started");
diff --git a/tools/test/bin/onos-topo-cfg-all b/tools/test/bin/onos-topo-cfg-all
new file mode 100755
index 0000000..adf84c1
--- /dev/null
+++ b/tools/test/bin/onos-topo-cfg-all
@@ -0,0 +1,15 @@
+#!/bin/bash
+# -----------------------------------------------------------------------------
+# ONOS topology configuration uploader for all nodes listed in OC[0-9]+ variables.
+# -----------------------------------------------------------------------------
+
+[ ! -d "$ONOS_ROOT" ] && echo "ONOS_ROOT is not defined" >&2 && exit 1
+. $ONOS_ROOT/tools/build/envDefaults
+
+nodes=$(env | sort | egrep "OC[0-9]+" | cut -d= -f2)
+
+for node in $nodes; do
+ printf "$node..."
+ onos-topo-cfg $node $1
+done
+printf "\n"
diff --git a/web/gui/src/main/java/org/onlab/onos/gui/TopologyViewWebSocket.java b/web/gui/src/main/java/org/onlab/onos/gui/TopologyViewWebSocket.java
index 57781de..d624d04 100644
--- a/web/gui/src/main/java/org/onlab/onos/gui/TopologyViewWebSocket.java
+++ b/web/gui/src/main/java/org/onlab/onos/gui/TopologyViewWebSocket.java
@@ -24,6 +24,9 @@
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.core.ApplicationId;
import org.onlab.onos.core.CoreService;
+import org.onlab.onos.event.AbstractEventAccumulator;
+import org.onlab.onos.event.Event;
+import org.onlab.onos.event.EventAccumulator;
import org.onlab.onos.mastership.MastershipEvent;
import org.onlab.onos.mastership.MastershipListener;
import org.onlab.onos.net.ConnectPoint;
@@ -36,6 +39,8 @@
import org.onlab.onos.net.device.DeviceListener;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.DefaultTrafficTreatment;
+import org.onlab.onos.net.flow.FlowRuleEvent;
+import org.onlab.onos.net.flow.FlowRuleListener;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.host.HostEvent;
@@ -84,8 +89,7 @@
private static final String APP_ID = "org.onlab.onos.gui";
- private static final long SUMMARY_FREQUENCY_SEC = 3000;
- private static final long TRAFFIC_FREQUENCY_SEC = 1500;
+ private static final long TRAFFIC_FREQUENCY_SEC = 2000;
private static final Comparator<? super ControllerNode> NODE_COMPARATOR =
new Comparator<ControllerNode>() {
@@ -95,6 +99,13 @@
}
};
+
+ private final Timer timer = new Timer("topology-view");
+
+ private static final int MAX_EVENTS = 500;
+ private static final int MAX_BATCH_MS = 1000;
+ private static final int MAX_IDLE_MS = 500;
+
private final ApplicationId appId;
private Connection connection;
@@ -106,16 +117,14 @@
private final LinkListener linkListener = new InternalLinkListener();
private final HostListener hostListener = new InternalHostListener();
private final IntentListener intentListener = new InternalIntentListener();
+ private final FlowRuleListener flowListener = new InternalFlowListener();
- // Timers and objects being monitored
- private final Timer timer = new Timer("topology-view");
+ private final EventAccumulator eventAccummulator = new InternalEventAccummulator();
+ private boolean summaryEnabled = true;
private TimerTask trafficTask;
private ObjectNode trafficEvent;
- private TimerTask summaryTask;
- private ObjectNode summaryEvent;
-
private long lastActive = System.currentTimeMillis();
private boolean listenersRemoved = false;
@@ -128,7 +137,6 @@
*/
public TopologyViewWebSocket(ServiceDirectory directory) {
super(directory);
-
intentFilter = new TopologyViewIntentFilter(intentService, deviceService,
hostService, linkService);
appId = directory.get(CoreService.class).registerApplication(APP_ID);
@@ -431,21 +439,13 @@
// Subscribes for summary messages.
private synchronized void requestSummary(ObjectNode event) {
- if (summaryTask == null) {
- summaryEvent = event;
- summaryTask = new SummaryMonitor();
- timer.schedule(summaryTask, SUMMARY_FREQUENCY_SEC, SUMMARY_FREQUENCY_SEC);
- }
+ summaryEnabled = true;
sendMessage(summmaryMessage(number(event, "sid")));
}
// Cancels sending summary messages.
private synchronized void cancelSummary(ObjectNode event) {
- if (summaryTask != null) {
- summaryTask.cancel();
- summaryTask = null;
- summaryEvent = null;
- }
+ summaryEnabled = false;
}
@@ -457,6 +457,7 @@
linkService.addListener(linkListener);
hostService.addListener(hostListener);
intentService.addListener(intentListener);
+ flowService.addListener(flowListener);
}
// Removes all internal listeners.
@@ -469,6 +470,7 @@
linkService.removeListener(linkListener);
hostService.removeListener(hostListener);
intentService.removeListener(intentListener);
+ flowService.removeListener(flowListener);
}
}
@@ -495,6 +497,7 @@
@Override
public void event(DeviceEvent event) {
sendMessage(deviceMessage(event));
+ eventAccummulator.add(event);
}
}
@@ -503,6 +506,7 @@
@Override
public void event(LinkEvent event) {
sendMessage(linkMessage(event));
+ eventAccummulator.add(event);
}
}
@@ -511,6 +515,7 @@
@Override
public void event(HostEvent event) {
sendMessage(hostMessage(event));
+ eventAccummulator.add(event);
}
}
@@ -521,33 +526,55 @@
if (trafficEvent != null) {
requestTraffic(trafficEvent);
}
+ eventAccummulator.add(event);
+ }
+ }
+
+ // Flow rule event listener.
+ private class InternalFlowListener implements FlowRuleListener {
+ @Override
+ public void event(FlowRuleEvent event) {
+ eventAccummulator.add(event);
}
}
private class TrafficMonitor extends TimerTask {
@Override
public void run() {
- if (trafficEvent != null) {
- String type = string(trafficEvent, "event", "unknown");
- if (type.equals("requestAllTraffic")) {
- requestAllTraffic(trafficEvent);
- } else if (type.equals("requestDeviceLinkFlows")) {
- requestDeviceLinkFlows(trafficEvent);
- } else {
- requestTraffic(trafficEvent);
+ try {
+ if (trafficEvent != null) {
+ String type = string(trafficEvent, "event", "unknown");
+ if (type.equals("requestAllTraffic")) {
+ requestAllTraffic(trafficEvent);
+ } else if (type.equals("requestDeviceLinkFlows")) {
+ requestDeviceLinkFlows(trafficEvent);
+ } else {
+ requestTraffic(trafficEvent);
+ }
}
+ } catch (Exception e) {
+ log.warn("Unable to handle traffic request due to {}", e.getMessage());
}
}
}
- private class SummaryMonitor extends TimerTask {
+ // Accumulates events to drive methodical updates of the summary pane.
+ private class InternalEventAccummulator extends AbstractEventAccumulator {
+ protected InternalEventAccummulator() {
+ super(new Timer("topo-summary"), MAX_EVENTS, MAX_BATCH_MS, MAX_IDLE_MS);
+ }
+
@Override
- public void run() {
- if (summaryEvent != null) {
- requestSummary(summaryEvent);
+ public void processEvents(List<Event> events) {
+ try {
+ if (summaryEnabled) {
+ sendMessage(summmaryMessage(0));
+ }
+ } catch (Exception e) {
+ log.warn("Unable to handle summary request due to {}", e.getMessage());
}
+
}
}
-
}
diff --git a/web/gui/src/main/webapp/topo.js b/web/gui/src/main/webapp/topo.js
index eb05dbf..e212f95 100644
--- a/web/gui/src/main/webapp/topo.js
+++ b/web/gui/src/main/webapp/topo.js
@@ -356,6 +356,9 @@
hideInstances();
} else if (summaryPane.isVisible()) {
cancelSummary();
+ stopAntTimer();
+ } else {
+ hoverMode = hoverModeFlows;
}
}