ONOS-1981: Move partition manager rebalance activity off of the event loop thread
Change-Id: I32241a53be683dbf2611069072f80269655baba8
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
index bb3d602..b341165 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
@@ -40,6 +40,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
/**
@@ -57,9 +58,12 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
+ protected final AtomicBoolean rebalanceScheduled = new AtomicBoolean(false);
+
static final int NUM_PARTITIONS = 14;
private static final int BACKOFF_TIME = 2;
- private static final int CHECK_PERIOD = 10;
+ private static final int CHECK_PARTITION_BALANCE_PERIOD_SEC = 10;
+ private static final int RETRY_AFTER_DELAY_SEC = 5;
private static final String ELECTION_PREFIX = "intent-partition-";
@@ -78,8 +82,8 @@
leadershipService.runForLeadership(getPartitionPath(i));
}
- executor.scheduleAtFixedRate(this::doRelinquish, 0,
- CHECK_PERIOD, TimeUnit.SECONDS);
+ executor.scheduleAtFixedRate(() -> scheduleRebalance(0), 0,
+ CHECK_PARTITION_BALANCE_PERIOD_SEC, TimeUnit.SECONDS);
}
@Deactivate
@@ -129,11 +133,13 @@
return leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey)));
}
- private void doRelinquish() {
+ protected void doRebalance() {
+ rebalanceScheduled.set(false);
try {
- relinquish();
+ rebalance();
} catch (Exception e) {
- log.warn("Exception caught during relinquish task", e);
+ log.warn("Exception caught during rebalance task. Will retry in " + RETRY_AFTER_DELAY_SEC + " seconds", e);
+ scheduleRebalance(RETRY_AFTER_DELAY_SEC);
}
}
@@ -142,11 +148,10 @@
* so, relinquish leadership of some of them for a little while to let
* other instances take over.
*/
- private void relinquish() {
+ private void rebalance() {
int activeNodes = (int) clusterService.getNodes()
.stream()
- .filter(n -> clusterService.getState(n.id())
- == ControllerNode.State.ACTIVE)
+ .filter(node -> ControllerNode.State.ACTIVE == clusterService.getState(node.id()))
.count();
int myShare = (int) Math.ceil((double) NUM_PARTITIONS / activeNodes);
@@ -172,6 +177,12 @@
}
}
+ private void scheduleRebalance(int afterDelaySec) {
+ if (rebalanceScheduled.compareAndSet(false, true)) {
+ executor.schedule(this::doRebalance, afterDelaySec, TimeUnit.SECONDS);
+ }
+ }
+
/**
* Try and recontest for leadership of a partition.
*
@@ -191,7 +202,7 @@
leadership.topic().startsWith(ELECTION_PREFIX)) {
// See if we need to let some partitions go
- relinquish();
+ scheduleRebalance(0);
}
}
}
@@ -201,7 +212,7 @@
@Override
public void event(ClusterEvent event) {
- relinquish();
+ scheduleRebalance(0);
}
}
}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java
index 9655b2d..3b091c0 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java
@@ -172,41 +172,60 @@
/**
* Tests sending in LeadershipServiceEvents in the case when we have
* too many partitions. The event will trigger the partition manager to
- * reassess how many partitions it has and relinquish some.
+ * schedule a rebalancing activity.
*/
@Test
- public void testRelinquish() {
+ public void testRebalanceScheduling() {
// We have all the partitions so we'll need to relinquish some
setUpLeadershipService(PartitionManager.NUM_PARTITIONS);
- expect(leadershipService.withdraw(anyString()))
- .andReturn(CompletableFuture.completedFuture(null))
- .times(7);
-
replay(leadershipService);
partitionManager.activate();
// Send in the event
leaderListener.event(event);
+ assertTrue(partitionManager.rebalanceScheduled.get());
+
verify(leadershipService);
}
/**
- * Tests sending in LeadershipServiceEvents in the case when we have the
- * right amount or too many partitions. These events will not trigger any
- * partition reassignments.
+ * Tests that rebalance will trigger the right number of leadership withdraw calls.
*/
@Test
- public void testNoRelinquish() {
+ public void testRebalance() {
+ // We have all the partitions so we'll need to relinquish some
+ setUpLeadershipService(PartitionManager.NUM_PARTITIONS);
+
+ expect(leadershipService.withdraw(anyString()))
+ .andReturn(CompletableFuture.completedFuture(null))
+ .times(7);
+
+ replay(leadershipService);
+
+ partitionManager.activate();
+
+ // trigger rebalance
+ partitionManager.doRebalance();
+
+ verify(leadershipService);
+ }
+
+ /**
+ * Tests that attempts to rebalance when the partitions are already
+ * evenly distributed do not result in any relinquish attempts.
+ */
+ @Test
+ public void testNoRebalance() {
// Partitions are already perfectly balanced among the two active instances
setUpLeadershipService(PartitionManager.NUM_PARTITIONS / 2);
replay(leadershipService);
partitionManager.activate();
- // Send in the event
- leaderListener.event(event);
+ // trigger rebalance
+ partitionManager.doRebalance();
verify(leadershipService);
@@ -215,8 +234,8 @@
setUpLeadershipService(PartitionManager.NUM_PARTITIONS / 2 - 1);
replay(leadershipService);
- // Send in the event
- leaderListener.event(event);
+ // trigger rebalance
+ partitionManager.doRebalance();
verify(leadershipService);
}