ONOS-1981: Move partition manager rebalance activity off of the event loop thread

Change-Id: I32241a53be683dbf2611069072f80269655baba8
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
index bb3d602..b341165 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
@@ -40,6 +40,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 /**
@@ -57,9 +58,12 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
+    protected final AtomicBoolean rebalanceScheduled = new AtomicBoolean(false);
+
     static final int NUM_PARTITIONS = 14;
     private static final int BACKOFF_TIME = 2;
-    private static final int CHECK_PERIOD = 10;
+    private static final int CHECK_PARTITION_BALANCE_PERIOD_SEC = 10;
+    private static final int RETRY_AFTER_DELAY_SEC = 5;
 
     private static final String ELECTION_PREFIX = "intent-partition-";
 
@@ -78,8 +82,8 @@
             leadershipService.runForLeadership(getPartitionPath(i));
         }
 
-        executor.scheduleAtFixedRate(this::doRelinquish, 0,
-                                     CHECK_PERIOD, TimeUnit.SECONDS);
+        executor.scheduleAtFixedRate(() -> scheduleRebalance(0), 0,
+                                     CHECK_PARTITION_BALANCE_PERIOD_SEC, TimeUnit.SECONDS);
     }
 
     @Deactivate
@@ -129,11 +133,13 @@
         return leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey)));
     }
 
-    private void doRelinquish() {
+    protected void doRebalance() {
+        rebalanceScheduled.set(false);
         try {
-            relinquish();
+            rebalance();
         } catch (Exception e) {
-            log.warn("Exception caught during relinquish task", e);
+            log.warn("Exception caught during rebalance task. Will retry in " + RETRY_AFTER_DELAY_SEC + " seconds", e);
+            scheduleRebalance(RETRY_AFTER_DELAY_SEC);
         }
     }
 
@@ -142,11 +148,10 @@
      * so, relinquish leadership of some of them for a little while to let
      * other instances take over.
      */
-    private void relinquish() {
+    private void rebalance() {
         int activeNodes = (int) clusterService.getNodes()
                 .stream()
-                .filter(n -> clusterService.getState(n.id())
-                        == ControllerNode.State.ACTIVE)
+                .filter(node -> ControllerNode.State.ACTIVE == clusterService.getState(node.id()))
                 .count();
 
         int myShare = (int) Math.ceil((double) NUM_PARTITIONS / activeNodes);
@@ -172,6 +177,12 @@
         }
     }
 
+    private void scheduleRebalance(int afterDelaySec) {
+        if (rebalanceScheduled.compareAndSet(false, true)) {
+            executor.schedule(this::doRebalance, afterDelaySec, TimeUnit.SECONDS);
+        }
+    }
+
     /**
      * Try and recontest for leadership of a partition.
      *
@@ -191,7 +202,7 @@
                     leadership.topic().startsWith(ELECTION_PREFIX)) {
 
                 // See if we need to let some partitions go
-                relinquish();
+                scheduleRebalance(0);
             }
         }
     }
@@ -201,7 +212,7 @@
 
         @Override
         public void event(ClusterEvent event) {
-            relinquish();
+            scheduleRebalance(0);
         }
     }
 }
diff --git a/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java
index 9655b2d..3b091c0 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java
@@ -172,41 +172,60 @@
     /**
      * Tests sending in LeadershipServiceEvents in the case when we have
      * too many partitions. The event will trigger the partition manager to
-     * reassess how many partitions it has and relinquish some.
+     * schedule a rebalancing activity.
      */
     @Test
-    public void testRelinquish() {
+    public void testRebalanceScheduling() {
         // We have all the partitions so we'll need to relinquish some
         setUpLeadershipService(PartitionManager.NUM_PARTITIONS);
 
-        expect(leadershipService.withdraw(anyString()))
-                                .andReturn(CompletableFuture.completedFuture(null))
-                                .times(7);
-
         replay(leadershipService);
 
         partitionManager.activate();
         // Send in the event
         leaderListener.event(event);
 
+        assertTrue(partitionManager.rebalanceScheduled.get());
+
         verify(leadershipService);
     }
 
     /**
-     * Tests sending in LeadershipServiceEvents in the case when we have the
-     * right amount or too many partitions. These events will not trigger any
-     * partition reassignments.
+     * Tests rebalance will trigger the right now of leadership withdraw calls.
      */
     @Test
-    public void testNoRelinquish() {
+    public void testRebalance() {
+        // We have all the partitions so we'll need to relinquish some
+        setUpLeadershipService(PartitionManager.NUM_PARTITIONS);
+
+        expect(leadershipService.withdraw(anyString()))
+                                 .andReturn(CompletableFuture.completedFuture(null))
+                                 .times(7);
+
+        replay(leadershipService);
+
+        partitionManager.activate();
+
+        // trigger rebalance
+        partitionManager.doRebalance();
+
+        verify(leadershipService);
+    }
+
+    /**
+     * Tests that attempts to rebalance when the paritions are already
+     * evenly distributed does not result in any relinquish attempts.
+     */
+    @Test
+    public void testNoRebalance() {
         // Partitions are already perfectly balanced among the two active instances
         setUpLeadershipService(PartitionManager.NUM_PARTITIONS / 2);
         replay(leadershipService);
 
         partitionManager.activate();
 
-        // Send in the event
-        leaderListener.event(event);
+        // trigger rebalance
+        partitionManager.doRebalance();
 
         verify(leadershipService);
 
@@ -215,8 +234,8 @@
         setUpLeadershipService(PartitionManager.NUM_PARTITIONS / 2 - 1);
         replay(leadershipService);
 
-        // Send in the event
-        leaderListener.event(event);
+        // trigger rebalance
+        partitionManager.doRebalance();
 
         verify(leadershipService);
     }