MastershipLoadBalancer: listen to RegionEvents

Simplify logic around scheduling a balancing task and listen
to RegionEvents. Region Membership updates and Region Updates
should also trigger a rebalance.

Now, it is possible to queue up another balancing task even
if there is one running. They'll still never run in parallel
and will still run with at least a few seconds in between (30),
but this way we don't ever risk missing an event that makes it
necessary to rebalance - not even if we were rebalancing exactly
when that event fired.

Change-Id: I64e1c6fc5e87f2b1fffbefb54c96303dac55d1d1
diff --git a/apps/mlb/src/main/java/org/onosproject/mlb/MastershipLoadBalancer.java b/apps/mlb/src/main/java/org/onosproject/mlb/MastershipLoadBalancer.java
index 0dfa0b6..5494a48 100644
--- a/apps/mlb/src/main/java/org/onosproject/mlb/MastershipLoadBalancer.java
+++ b/apps/mlb/src/main/java/org/onosproject/mlb/MastershipLoadBalancer.java
@@ -16,10 +16,6 @@
 
 package org.onosproject.mlb;
 
-import com.google.common.util.concurrent.ListenableScheduledFuture;
-import com.google.common.util.concurrent.ListeningScheduledExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -39,15 +35,19 @@
 import org.onosproject.mastership.MastershipListener;
 import org.onosproject.mastership.MastershipService;
 import org.osgi.service.component.ComponentContext;
+import org.onosproject.net.region.RegionEvent;
+import org.onosproject.net.region.RegionListener;
+import org.onosproject.net.region.RegionService;
 import org.slf4j.Logger;
 
 import java.util.Dictionary;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -62,7 +62,7 @@
 
     private final Logger log = getLogger(getClass());
 
-    private static final int DEFAULT_SCHEDULE_PERIOD = 5;
+    private static final int DEFAULT_SCHEDULE_PERIOD = 30;
     @Property(name = "schedulePeriod", intValue = DEFAULT_SCHEDULE_PERIOD,
             label = "Period to schedule balancing the mastership to be shared as evenly as by all online instances.")
     private int schedulePeriod = DEFAULT_SCHEDULE_PERIOD;
@@ -85,6 +85,9 @@
     protected LeadershipService leadershipService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected RegionService regionService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -92,15 +95,24 @@
 
     private InnerLeadershipListener leadershipListener = new InnerLeadershipListener();
 
-    /* This listener is used to trigger balancing for any mastership event which will include switches changing state
-    between active and inactive states as well as the same variety of event occurring with ONOS nodes. Must
-    use a listenable executor to ensure events are triggered with no frequency greater than once every 30 seconds.
+    /* This listener is used to trigger balancing for any mastership event
+     * which will include switches changing state between active and inactive
+     * states as well as the same variety of event occurring with ONOS nodes.
      */
     private InnerMastershipListener mastershipListener = new InnerMastershipListener();
 
-    //Ensures that all executions do not interfere with one another (single thread)
-    private ListeningScheduledExecutorService executorService = MoreExecutors.
-            listeningDecorator(newSingleThreadScheduledExecutor(groupedThreads("MastershipLoadBalancer", "%d", log)));
+    /* Used to trigger balancing on region events where there was either a
+     * change on the master sets of a given region or a change on the devices
+     * that belong to a region.
+     */
+    private InnerRegionListener regionEventListener = new InnerRegionListener();
+
+    /* Ensures that all executions do not interfere with one another (single
+     * thread) and that they are apart from each other by at least what is
+     * defined as the schedulePeriod.
+     */
+    private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(
+            groupedThreads("MastershipLoadBalancer", "%d", log));
 
     @Activate
     public void activate(ComponentContext context) {
@@ -110,6 +122,7 @@
         localId = clusterService.getLocalNode().id();
         leadershipService.addListener(leadershipListener);
         leadershipService.runForLeadership(REBALANCE_MASTERSHIP);
+        regionService.addListener(regionEventListener);
         log.info("Started");
     }
 
@@ -119,6 +132,7 @@
         mastershipService.removeListener(mastershipListener);
         leadershipService.withdraw(REBALANCE_MASTERSHIP);
         leadershipService.removeListener(leadershipListener);
+        regionService.removeListener(regionEventListener);
         cancelBalance();
         executorService.shutdown();
         log.info("Stopped");
@@ -143,23 +157,34 @@
         }
     }
 
+    // Sets flag at execution to indicate there is currently a scheduled
+    // rebalancing. As soon as it starts running, the flag is set back to
+    // null and another rebalancing can be queued.
     private void scheduleBalance() {
         if (isLeader.get() && nextTask.get() == null) {
 
-            ListenableScheduledFuture task =
-                    executorService.schedule(mastershipAdminService::balanceRoles,
-                            schedulePeriod, TimeUnit.SECONDS);
-            task.addListener(() -> {
-                        log.info("Completed balance roles");
-                        nextTask.set(null);
-                    }, MoreExecutors.directExecutor()
-            );
+            Future task = executorService.schedule(new BalanceTask(),
+                    schedulePeriod, TimeUnit.SECONDS);
+
             if (!nextTask.compareAndSet(null, task)) {
                 task.cancel(false);
             }
         }
     }
 
+    private class BalanceTask implements Runnable {
+
+        @Override
+        public void run() {
+            // nextTask is now running, free the spot so that it is possible
+            // to queue up another upcoming task.
+            nextTask.set(null);
+
+            mastershipAdminService.balanceRoles();
+            log.info("Completed balance roles");
+        }
+    }
+
     private void cancelBalance() {
         Future task = nextTask.getAndSet(null);
         if (task != null) {
@@ -191,7 +216,6 @@
 
         @Override
         public void event(MastershipEvent event) {
-            //Sets flag at execution to indicate there is currently a scheduled rebalancing, reverts upon completion
             scheduleBalance();
         }
     }
@@ -207,4 +231,18 @@
             processLeaderChange(event.subject().leaderNodeId());
         }
     }
+
+    private class InnerRegionListener implements RegionListener {
+        @Override
+        public void event(RegionEvent event) {
+            switch (event.type()) {
+                case REGION_MEMBERSHIP_CHANGED:
+                case REGION_UPDATED:
+                    scheduleBalance();
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
 }