ONOS-1981: Move partition manager rebalance activity off of the event loop thread
Change-Id: I32241a53be683dbf2611069072f80269655baba8
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
index bb3d602..b341165 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
@@ -40,6 +40,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
/**
@@ -57,9 +58,12 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
+ protected final AtomicBoolean rebalanceScheduled = new AtomicBoolean(false);
+
static final int NUM_PARTITIONS = 14;
private static final int BACKOFF_TIME = 2;
- private static final int CHECK_PERIOD = 10;
+ private static final int CHECK_PARTITION_BALANCE_PERIOD_SEC = 10;
+ private static final int RETRY_AFTER_DELAY_SEC = 5;
private static final String ELECTION_PREFIX = "intent-partition-";
@@ -78,8 +82,8 @@
leadershipService.runForLeadership(getPartitionPath(i));
}
- executor.scheduleAtFixedRate(this::doRelinquish, 0,
- CHECK_PERIOD, TimeUnit.SECONDS);
+ executor.scheduleAtFixedRate(() -> scheduleRebalance(0), 0,
+ CHECK_PARTITION_BALANCE_PERIOD_SEC, TimeUnit.SECONDS);
}
@Deactivate
@@ -129,11 +133,13 @@
return leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey)));
}
- private void doRelinquish() {
+ protected void doRebalance() { // task body submitted to the executor by scheduleRebalance()
+ rebalanceScheduled.set(false); // clear the flag first so scheduleRebalance() can queue another run from here on
try {
- relinquish();
+ rebalance();
} catch (Exception e) {
- log.warn("Exception caught during relinquish task", e);
+ log.warn("Exception caught during rebalance task. Will retry in " + RETRY_AFTER_DELAY_SEC + " seconds", e);
+ scheduleRebalance(RETRY_AFTER_DELAY_SEC); // retry request; it is a no-op if another rebalance is already pending
}
}
@@ -142,11 +148,10 @@
* so, relinquish leadership of some of them for a little while to let
* other instances take over.
*/
- private void relinquish() {
+ private void rebalance() {
int activeNodes = (int) clusterService.getNodes()
.stream()
- .filter(n -> clusterService.getState(n.id())
- == ControllerNode.State.ACTIVE)
+ .filter(node -> ControllerNode.State.ACTIVE == clusterService.getState(node.id()))
.count();
int myShare = (int) Math.ceil((double) NUM_PARTITIONS / activeNodes);
@@ -172,6 +177,12 @@
}
}
+ private void scheduleRebalance(int afterDelaySec) { // queue one doRebalance() run after afterDelaySec seconds
+ if (rebalanceScheduled.compareAndSet(false, true)) { // only the caller that flips false -> true submits; other callers return without scheduling
+ executor.schedule(this::doRebalance, afterDelaySec, TimeUnit.SECONDS); // doRebalance() resets the flag when it starts
+ }
+ }
+
/**
* Try and recontest for leadership of a partition.
*
@@ -191,7 +202,7 @@
leadership.topic().startsWith(ELECTION_PREFIX)) {
// See if we need to let some partitions go
- relinquish();
+ scheduleRebalance(0);
}
}
}
@@ -201,7 +212,7 @@
@Override
public void event(ClusterEvent event) {
- relinquish();
+ scheduleRebalance(0);
}
}
}