Reduce frequency of flow store replication
Change-Id: I32528623f7dbbf0e9ec4b091394126f28ac5ffe2
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/
index ec2532b..6154768 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/
@@ -22,6 +22,7 @@
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -61,7 +62,7 @@
* device, allowing mastership to be reassigned to non-backup nodes.
public class DeviceFlowTable {
- private static final int NUM_BUCKETS = 1024;
+ private static final int NUM_BUCKETS = 128;
private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
@@ -80,7 +81,8 @@
private final DeviceId deviceId;
private final ClusterCommunicationService clusterCommunicator;
private final LifecycleManager lifecycleManager;
- private final ScheduledExecutorService executorService;
+ private final ScheduledExecutorService scheduler;
+ private final Executor executor;
private final NodeId localNodeId;
private final LogicalClock clock = new LogicalClock();
@@ -88,14 +90,15 @@
private volatile DeviceReplicaInfo replicaInfo;
private volatile long activeTerm;
+ private long backupPeriod;
private final LifecycleEventListener lifecycleEventListener = new LifecycleEventListener() {
public void event(LifecycleEvent event) {
- executorService.execute(() -> onLifecycleEvent(event));
+ executor.execute(() -> onLifecycleEvent(event));
- private ScheduledFuture<?> backupFuture;
private ScheduledFuture<?> antiEntropyFuture;
private final Map<Integer, Queue<Runnable>> flowTasks = Maps.newConcurrentMap();
@@ -109,13 +112,15 @@
ClusterService clusterService,
ClusterCommunicationService clusterCommunicator,
LifecycleManager lifecycleManager,
- ScheduledExecutorService executorService,
+ ScheduledExecutorService scheduler,
+ Executor executor,
long backupPeriod,
long antiEntropyPeriod) {
this.deviceId = deviceId;
this.clusterCommunicator = clusterCommunicator;
this.lifecycleManager = lifecycleManager;
- this.executorService = executorService;
+ this.scheduler = scheduler;
+ this.executor = executor;
this.localNodeId = clusterService.getLocalNode().id();
@@ -133,6 +138,7 @@
+ scheduleBackups();
@@ -141,12 +147,7 @@
* @param backupPeriod the flow table backup period in milliseconds
synchronized void setBackupPeriod(long backupPeriod) {
- ScheduledFuture<?> backupFuture = this.backupFuture;
- if (backupFuture != null) {
- backupFuture.cancel(false);
- }
- this.backupFuture = executorService.scheduleAtFixedRate(
- this::backup, backupPeriod, backupPeriod, TimeUnit.MILLISECONDS);
+ this.backupPeriod = backupPeriod;
@@ -159,8 +160,11 @@
if (antiEntropyFuture != null) {
- this.antiEntropyFuture = executorService.scheduleAtFixedRate(
- this::runAntiEntropy, antiEntropyPeriod, antiEntropyPeriod, TimeUnit.MILLISECONDS);
+ this.antiEntropyFuture = scheduler.scheduleAtFixedRate(
+ () -> executor.execute(this::runAntiEntropy),
+ antiEntropyPeriod,
+ antiEntropyPeriod,
@@ -307,86 +311,114 @@
* @return a future to be completed with the function result once it has been run
private <T> CompletableFuture<T> runInTerm(FlowId flowId, BiFunction<FlowBucket, Long, T> function) {
- DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
- if (!replicaInfo.isMaster(localNodeId)) {
- return Tools.exceptionalFuture(new IllegalStateException());
- }
- FlowBucket bucket = getBucket(flowId);
- // If the master's term is not currently active (has not been synchronized with prior replicas), enqueue
- // the change to be executed once the master has been synchronized.
- final long term = replicaInfo.term();
- if (activeTerm < term) {
- log.debug("Enqueueing operation for device {}", deviceId);
- synchronized (flowTasks) {
- // Double checked lock on the active term.
- if (activeTerm < term) {
- CompletableFuture<T> future = new CompletableFuture<>();
- flowTasks.computeIfAbsent(bucket.bucketId().bucket(), b -> new LinkedList<>())
- .add(() -> future.complete(function.apply(bucket, term)));
- return future;
- }
+ CompletableFuture<T> future = new CompletableFuture<>();
+ executor.execute(() -> {
+ DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
+ if (!replicaInfo.isMaster(localNodeId)) {
+ future.completeExceptionally(new IllegalStateException());
+ return;
- }
- return CompletableFuture.completedFuture(function.apply(bucket, term));
+ FlowBucket bucket = getBucket(flowId);
+ // If the master's term is not currently active (has not been synchronized with prior replicas), enqueue
+ // the change to be executed once the master has been synchronized.
+ final long term = replicaInfo.term();
+ if (activeTerm < term) {
+ log.debug("Enqueueing operation for device {}", deviceId);
+ flowTasks.computeIfAbsent(bucket.bucketId().bucket(), b -> new LinkedList<>())
+ .add(() -> future.complete(function.apply(bucket, term)));
+ } else {
+ future.complete(function.apply(bucket, term));
+ }
+ });
+ return future;
+ }
+ /**
+ * Schedules bucket backups.
+ */
+ private void scheduleBackups() {
+ flowBuckets.values().forEach(bucket -> backupBucket(bucket).whenComplete((result, error) -> {
+ scheduleBackup(bucket);
+ }));
+ }
+ /**
+ * Schedules a backup for the given bucket.
+ *
+ * @param bucket the bucket for which to schedule the backup
+ */
+ private void scheduleBackup(FlowBucket bucket) {
+ scheduler.schedule(
+ () -> executor.execute(() -> backupBucket(bucket)),
+ backupPeriod,
* Backs up all buckets in the given device to the given node.
- private void backup() {
- DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
- // If the local node is not currently the master, skip the backup.
- if (!replicaInfo.isMaster(localNodeId)) {
- return;
- }
- // Otherwise, iterate through backup nodes and backup the device.
- for (NodeId nodeId : replicaInfo.backups()) {
- try {
- backup(nodeId, replicaInfo.term());
- } catch (Exception e) {
- log.error("Backup of " + deviceId + " to " + nodeId + " failed", e);
- }
- }
+ private CompletableFuture<Void> backupAll() {
+ CompletableFuture<?>[] futures = flowBuckets.values()
+ .stream()
+ .map(bucket -> backupBucket(bucket))
+ .toArray(CompletableFuture[]::new);
+ return CompletableFuture.allOf(futures);
- * Backs up all buckets for the device to the given node.
+ * Backs up the given flow bucket.
- * @param nodeId the node to which to back up the device
- * @param term the term for which to backup to the node
+ * @param bucket the flow bucket to backup
- private void backup(NodeId nodeId, long term) {
- for (FlowBucket bucket : flowBuckets.values()) {
- // If the bucket is not in the current term, skip it. This forces synchronization of the bucket
- // to occur prior to the new master replicating changes in the bucket to backups.
- if (bucket.term() != term) {
- continue;
- }
+ private CompletableFuture<Void> backupBucket(FlowBucket bucket) {
+ DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
- // Record the logical timestamp from the bucket to keep track of the highest logical time replicated.
- LogicalTimestamp timestamp = bucket.timestamp();
- // If the backup can be run (no concurrent backup to the node in progress) then run it.
- BackupOperation operation = new BackupOperation(nodeId, bucket.bucketId().bucket());
- if (startBackup(operation, timestamp)) {
- backup(bucket.copy(), nodeId).whenCompleteAsync((succeeded, error) -> {
- if (error != null) {
- log.debug("Backup operation {} failed", operation, error);
- failBackup(operation);
- } else if (succeeded) {
- succeedBackup(operation, timestamp);
- backup(nodeId, term);
- } else {
- log.debug("Backup operation {} failed: term mismatch", operation);
- failBackup(operation);
- }
- }, executorService);
- }
+ // Only replicate if the bucket's term matches the replica term and the local node is the current master.
+ // This ensures that the bucket has been synchronized prior to a new master replicating changes to backups.
+ // Only replicate if the local node is the current master.
+ if (bucket.term() == replicaInfo.term() && replicaInfo.isMaster(localNodeId)) {
+ // Replicate the bucket to each of the backup nodes.
+ CompletableFuture<?>[] futures = replicaInfo.backups()
+ .stream()
+ .map(nodeId -> backupBucketToNode(bucket, nodeId))
+ .toArray(CompletableFuture[]::new);
+ return CompletableFuture.allOf(futures);
+ return CompletableFuture.completedFuture(null);
+ }
+ /**
+ * Backs up the given flow bucket to the given node.
+ *
+ * @param bucket the bucket to backup
+ * @param nodeId the node to which to back up the bucket
+ * @return a future to be completed once the bucket has been backed up
+ */
+ private CompletableFuture<Void> backupBucketToNode(FlowBucket bucket, NodeId nodeId) {
+ // Record the logical timestamp from the bucket to keep track of the highest logical time replicated.
+ LogicalTimestamp timestamp = bucket.timestamp();
+ // If the backup can be run (no concurrent backup to the node in progress) then run it.
+ BackupOperation operation = new BackupOperation(nodeId, bucket.bucketId().bucket());
+ if (startBackup(operation, timestamp)) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ backup(bucket, nodeId).whenCompleteAsync((succeeded, error) -> {
+ if (error != null) {
+ log.debug("Backup operation {} failed", operation, error);
+ failBackup(operation);
+ } else if (succeeded) {
+ succeedBackup(operation, timestamp);
+ } else {
+ log.debug("Backup operation {} failed: term mismatch", operation);
+ failBackup(operation);
+ }
+ future.complete(null);
+ }, executor);
+ return future;
+ }
+ return CompletableFuture.completedFuture(null);
@@ -502,17 +534,19 @@
* @param nodeId the node with which to execute the anti-entropy protocol
private void runAntiEntropy(NodeId nodeId) {
- requestDigests(nodeId).thenAcceptAsync((digests) -> {
- // Compute a set of missing BucketIds based on digest times and send them back to the master.
- for (FlowBucketDigest remoteDigest : digests) {
- FlowBucket localBucket = getBucket(remoteDigest.bucket());
- if (localBucket.getDigest().isNewerThan(remoteDigest)) {
- log.debug("Detected missing flow entries on node {} in bucket {}/{}",
- nodeId, deviceId, remoteDigest.bucket());
- resetBackup(new BackupOperation(nodeId, remoteDigest.bucket()));
+ backupAll().whenCompleteAsync((result, error) -> {
+ requestDigests(nodeId).thenAcceptAsync((digests) -> {
+ // Compute a set of missing BucketIds based on digest times and send them back to the master.
+ for (FlowBucketDigest remoteDigest : digests) {
+ FlowBucket localBucket = getBucket(remoteDigest.bucket());
+ if (localBucket.getDigest().isNewerThan(remoteDigest)) {
+ log.debug("Detected missing flow entries on node {} in bucket {}/{}",
+ nodeId, deviceId, remoteDigest.bucket());
+ resetBackup(new BackupOperation(nodeId, remoteDigest.bucket()));
+ }
- }
- }, executorService);
+ }, executor);
+ }, executor);
@@ -556,7 +590,7 @@
} else {
- }, executorService);
+ }, executor);
@@ -576,7 +610,7 @@
log.debug("Failed to synchronize flows on previous backup nodes {}", backups, error);
- }, executorService);
+ }, executor);
@@ -621,7 +655,7 @@
.thenAcceptAsync(flowBucket -> {
(id, bucket) -> flowBucket.getDigest().isNewerThan(bucket.getDigest()) ? flowBucket : bucket);
- }, executorService);
+ }, executor);
@@ -666,10 +700,7 @@
* @param bucket the bucket number to activate
private void activateBucket(int bucket) {
- Queue<Runnable> tasks;
- synchronized (flowTasks) {
- tasks = flowTasks.remove(bucket);
- }
+ Queue<Runnable> tasks = flowTasks.remove(bucket);
if (tasks != null) {
log.debug("Completing enqueued operations for device {}", deviceId);
tasks.forEach(task ->;
@@ -780,7 +811,7 @@
clusterCommunicator.<Timestamped<M>, Timestamped<R>>addSubscriber(subject, SERIALIZER::decode, request -> {
return clock.timestamp(function.apply(request.value()));
- }, SERIALIZER::encode, executorService);
+ }, SERIALIZER::encode, executor);
@@ -819,11 +850,6 @@
* Cancels recurrent scheduled futures.
private synchronized void cancelFutures() {
- ScheduledFuture<?> backupFuture = this.backupFuture;
- if (backupFuture != null) {
- backupFuture.cancel(false);
- }
ScheduledFuture<?> antiEntropyFuture = this.antiEntropyFuture;
if (antiEntropyFuture != null) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/
index 90c503b..8382ad2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/
@@ -42,6 +42,7 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
+import org.onlab.util.OrderedExecutor;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
@@ -95,6 +96,8 @@
import org.slf4j.Logger;
import static;
+import static java.lang.Math.max;
+import static java.lang.Math.min;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static;
@@ -175,8 +178,11 @@
private ExecutorService messageHandlingExecutor;
private ExecutorService eventHandler;
- private final ScheduledExecutorService backupSenderExecutor =
- Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender", log));
+ private final ScheduledExecutorService backupScheduler = Executors.newSingleThreadScheduledExecutor(
+ groupedThreads("onos/flow", "backup-scheduler", log));
+ private final ExecutorService backupExecutor = Executors.newFixedThreadPool(
+ max(min(Runtime.getRuntime().availableProcessors() * 2, 16), 4),
+ groupedThreads("onos/flow", "backup-%d", log));
private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
@@ -245,7 +251,8 @@
- backupSenderExecutor.shutdownNow();
+ backupScheduler.shutdownNow();
+ backupExecutor.shutdownNow();"Stopped");
@@ -692,7 +699,8 @@
new InternalLifecycleManager(id),
- backupSenderExecutor,
+ backupScheduler,
+ new OrderedExecutor(backupExecutor),
@@ -728,7 +736,8 @@
new InternalLifecycleManager(deviceId),
- backupSenderExecutor,
+ backupScheduler,
+ new OrderedExecutor(backupExecutor),
@@ -933,7 +942,7 @@
if (replicaInfo != null && replicaInfo.term() == term) {
NodeId master = replicaInfo.master().orElse(null);
List<NodeId> backups = replicaInfo.backups()
- .subList(0, Math.min(replicaInfo.backups().size(), backupCount));
+ .subList(0, min(replicaInfo.backups().size(), backupCount));
listenerRegistry.process(new LifecycleEvent(
new DeviceReplicaInfo(term, master, backups)));
@@ -967,7 +976,7 @@
private DeviceReplicaInfo toDeviceReplicaInfo(ReplicaInfo replicaInfo) {
NodeId master = replicaInfo.master().orElse(null);
List<NodeId> backups = replicaInfo.backups()
- .subList(0, Math.min(replicaInfo.backups().size(), backupCount));
+ .subList(0, min(replicaInfo.backups().size(), backupCount));
return new DeviceReplicaInfo(replicaInfo.term(), master, backups);