MastershipLoadBalancer: listen to RegionEvents
Simplify logic around scheduling a balancing task and listen
to RegionEvents. Region Membership updates and Region Updates
should also trigger a rebalance.
Now, it is possible to queue up another balancing task even
if there is one running. They'll still never run in parallel
and will still run with at least a few seconds in between (30),
but this way we don't ever risk missing an event that makes it
necessary to rebalance - not even if we were rebalancing exactly
when that event fired.
Change-Id: I64e1c6fc5e87f2b1fffbefb54c96303dac55d1d1
diff --git a/apps/mlb/src/main/java/org/onosproject/mlb/MastershipLoadBalancer.java b/apps/mlb/src/main/java/org/onosproject/mlb/MastershipLoadBalancer.java
index 0dfa0b6..5494a48 100644
--- a/apps/mlb/src/main/java/org/onosproject/mlb/MastershipLoadBalancer.java
+++ b/apps/mlb/src/main/java/org/onosproject/mlb/MastershipLoadBalancer.java
@@ -16,10 +16,6 @@
package org.onosproject.mlb;
-import com.google.common.util.concurrent.ListenableScheduledFuture;
-import com.google.common.util.concurrent.ListeningScheduledExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -39,15 +35,19 @@
import org.onosproject.mastership.MastershipListener;
import org.onosproject.mastership.MastershipService;
import org.osgi.service.component.ComponentContext;
+import org.onosproject.net.region.RegionEvent;
+import org.onosproject.net.region.RegionListener;
+import org.onosproject.net.region.RegionService;
import org.slf4j.Logger;
import java.util.Dictionary;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
@@ -62,7 +62,7 @@
private final Logger log = getLogger(getClass());
- private static final int DEFAULT_SCHEDULE_PERIOD = 5;
+ private static final int DEFAULT_SCHEDULE_PERIOD = 30;
@Property(name = "schedulePeriod", intValue = DEFAULT_SCHEDULE_PERIOD,
label = "Period to schedule balancing the mastership to be shared as evenly as by all online instances.")
private int schedulePeriod = DEFAULT_SCHEDULE_PERIOD;
@@ -85,6 +85,9 @@
protected LeadershipService leadershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected RegionService regionService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -92,15 +95,24 @@
private InnerLeadershipListener leadershipListener = new InnerLeadershipListener();
- /* This listener is used to trigger balancing for any mastership event which will include switches changing state
- between active and inactive states as well as the same variety of event occurring with ONOS nodes. Must
- use a listenable executor to ensure events are triggered with no frequency greater than once every 30 seconds.
+ /* This listener is used to trigger balancing for any mastership event
+ * which will include switches changing state between active and inactive
+ * states as well as the same variety of event occurring with ONOS nodes.
*/
private InnerMastershipListener mastershipListener = new InnerMastershipListener();
- //Ensures that all executions do not interfere with one another (single thread)
- private ListeningScheduledExecutorService executorService = MoreExecutors.
- listeningDecorator(newSingleThreadScheduledExecutor(groupedThreads("MastershipLoadBalancer", "%d", log)));
+ /* Used to trigger balancing on region events where there was either a
+ * change on the master sets of a given region or a change on the devices
+ * that belong to a region.
+ */
+ private InnerRegionListener regionEventListener = new InnerRegionListener();
+
+ /* Ensures that all executions do not interfere with one another (single
+ * thread) and that they are apart from each other by at least what is
+ * defined as the schedulePeriod.
+ */
+ private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(
+ groupedThreads("MastershipLoadBalancer", "%d", log));
@Activate
public void activate(ComponentContext context) {
@@ -110,6 +122,7 @@
localId = clusterService.getLocalNode().id();
leadershipService.addListener(leadershipListener);
leadershipService.runForLeadership(REBALANCE_MASTERSHIP);
+ regionService.addListener(regionEventListener);
log.info("Started");
}
@@ -119,6 +132,7 @@
mastershipService.removeListener(mastershipListener);
leadershipService.withdraw(REBALANCE_MASTERSHIP);
leadershipService.removeListener(leadershipListener);
+ regionService.removeListener(regionEventListener);
cancelBalance();
executorService.shutdown();
log.info("Stopped");
@@ -143,23 +157,34 @@
}
}
+ // Sets flag at execution to indicate there is currently a scheduled
+ // rebalancing. As soon as it starts running, the flag is set back to
+ // null and another rebalancing can be queued.
private void scheduleBalance() {
if (isLeader.get() && nextTask.get() == null) {
- ListenableScheduledFuture task =
- executorService.schedule(mastershipAdminService::balanceRoles,
- schedulePeriod, TimeUnit.SECONDS);
- task.addListener(() -> {
- log.info("Completed balance roles");
- nextTask.set(null);
- }, MoreExecutors.directExecutor()
- );
+ Future task = executorService.schedule(new BalanceTask(),
+ schedulePeriod, TimeUnit.SECONDS);
+
if (!nextTask.compareAndSet(null, task)) {
task.cancel(false);
}
}
}
+ private class BalanceTask implements Runnable {
+
+ @Override
+ public void run() {
+ // nextTask is now running, free the spot so that it is possible
+ // to queue up another upcoming task.
+ nextTask.set(null);
+
+ mastershipAdminService.balanceRoles();
+ log.info("Completed balance roles");
+ }
+ }
+
private void cancelBalance() {
Future task = nextTask.getAndSet(null);
if (task != null) {
@@ -191,7 +216,6 @@
@Override
public void event(MastershipEvent event) {
- //Sets flag at execution to indicate there is currently a scheduled rebalancing, reverts upon completion
scheduleBalance();
}
}
@@ -207,4 +231,18 @@
processLeaderChange(event.subject().leaderNodeId());
}
}
+
+ private class InnerRegionListener implements RegionListener {
+ @Override
+ public void event(RegionEvent event) {
+ switch (event.type()) {
+ case REGION_MEMBERSHIP_CHANGED:
+ case REGION_UPDATED:
+ scheduleBalance();
+ break;
+ default:
+ break;
+ }
+ }
+ }
}