Reduce frequency of flow store replication

Back up flow buckets individually on per-bucket schedules, using a shared
scheduler to trigger backups and an ordered executor to run them, instead
of replicating the entire flow table at a fixed rate. Also reduce the
bucket count from 1024 to 128 and run a full backup before each
anti-entropy round so digests reflect replicated state.

Change-Id: I32528623f7dbbf0e9ec4b091394126f28ac5ffe2
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
index 4edd2f3..0008ac1 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
@@ -22,6 +22,7 @@
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -61,7 +62,7 @@
  * device, allowing mastership to be reassigned to non-backup nodes.
  */
 public class DeviceFlowTable {
-    private static final int NUM_BUCKETS = 1024;
+    private static final int NUM_BUCKETS = 128;
     private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
         .register(KryoNamespaces.API)
         .register(BucketId.class)
@@ -80,7 +81,8 @@
     private final DeviceId deviceId;
     private final ClusterCommunicationService clusterCommunicator;
     private final LifecycleManager lifecycleManager;
-    private final ScheduledExecutorService executorService;
+    private final ScheduledExecutorService scheduler;
+    private final Executor executor;
     private final NodeId localNodeId;
 
     private final LogicalClock clock = new LogicalClock();
@@ -88,14 +90,15 @@
     private volatile DeviceReplicaInfo replicaInfo;
     private volatile long activeTerm;
 
+    private long backupPeriod;
+
     private final LifecycleEventListener lifecycleEventListener = new LifecycleEventListener() {
         @Override
         public void event(LifecycleEvent event) {
-            executorService.execute(() -> onLifecycleEvent(event));
+            executor.execute(() -> onLifecycleEvent(event));
         }
     };
 
-    private ScheduledFuture<?> backupFuture;
     private ScheduledFuture<?> antiEntropyFuture;
 
     private final Map<Integer, Queue<Runnable>> flowTasks = Maps.newConcurrentMap();
@@ -109,13 +112,15 @@
         ClusterService clusterService,
         ClusterCommunicationService clusterCommunicator,
         LifecycleManager lifecycleManager,
-        ScheduledExecutorService executorService,
+        ScheduledExecutorService scheduler,
+        Executor executor,
         long backupPeriod,
         long antiEntropyPeriod) {
         this.deviceId = deviceId;
         this.clusterCommunicator = clusterCommunicator;
         this.lifecycleManager = lifecycleManager;
-        this.executorService = executorService;
+        this.scheduler = scheduler;
+        this.executor = executor;
         this.localNodeId = clusterService.getLocalNode().id();
 
         addListeners();
@@ -133,6 +138,7 @@
         registerSubscribers();
 
         startTerm(lifecycleManager.getReplicaInfo());
+        scheduleBackups();
     }
 
     /**
@@ -141,12 +147,7 @@
      * @param backupPeriod the flow table backup period in milliseconds
      */
     synchronized void setBackupPeriod(long backupPeriod) {
-        ScheduledFuture<?> backupFuture = this.backupFuture;
-        if (backupFuture != null) {
-            backupFuture.cancel(false);
-        }
-        this.backupFuture = executorService.scheduleAtFixedRate(
-            this::backup, backupPeriod, backupPeriod, TimeUnit.MILLISECONDS);
+        this.backupPeriod = backupPeriod;
     }
 
     /**
@@ -159,8 +160,11 @@
         if (antiEntropyFuture != null) {
             antiEntropyFuture.cancel(false);
         }
-        this.antiEntropyFuture = executorService.scheduleAtFixedRate(
-            this::runAntiEntropy, antiEntropyPeriod, antiEntropyPeriod, TimeUnit.MILLISECONDS);
+        this.antiEntropyFuture = scheduler.scheduleAtFixedRate(
+                () -> executor.execute(this::runAntiEntropy),
+                antiEntropyPeriod,
+                antiEntropyPeriod,
+                TimeUnit.MILLISECONDS);
     }
 
     /**
@@ -307,86 +311,114 @@
      * @return a future to be completed with the function result once it has been run
      */
     private <T> CompletableFuture<T> runInTerm(FlowId flowId, BiFunction<FlowBucket, Long, T> function) {
-        DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
-        if (!replicaInfo.isMaster(localNodeId)) {
-            return Tools.exceptionalFuture(new IllegalStateException());
-        }
-
-        FlowBucket bucket = getBucket(flowId);
-
-        // If the master's term is not currently active (has not been synchronized with prior replicas), enqueue
-        // the change to be executed once the master has been synchronized.
-        final long term = replicaInfo.term();
-        if (activeTerm < term) {
-            log.debug("Enqueueing operation for device {}", deviceId);
-            synchronized (flowTasks) {
-                // Double checked lock on the active term.
-                if (activeTerm < term) {
-                    CompletableFuture<T> future = new CompletableFuture<>();
-                    flowTasks.computeIfAbsent(bucket.bucketId().bucket(), b -> new LinkedList<>())
-                        .add(() -> future.complete(function.apply(bucket, term)));
-                    return future;
-                }
+        CompletableFuture<T> future = new CompletableFuture<>();
+        executor.execute(() -> {
+            DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
+            if (!replicaInfo.isMaster(localNodeId)) {
+                future.completeExceptionally(new IllegalStateException());
+                return;
             }
-        }
-        return CompletableFuture.completedFuture(function.apply(bucket, term));
+
+            FlowBucket bucket = getBucket(flowId);
+
+            // If the master's term is not currently active (has not been synchronized with prior replicas), enqueue
+            // the change to be executed once the master has been synchronized.
+            final long term = replicaInfo.term();
+            if (activeTerm < term) {
+                log.debug("Enqueueing operation for device {}", deviceId);
+                flowTasks.computeIfAbsent(bucket.bucketId().bucket(), b -> new LinkedList<>())
+                        .add(() -> future.complete(function.apply(bucket, term)));
+            } else {
+                future.complete(function.apply(bucket, term));
+            }
+        });
+        return future;
+    }
+
+    /**
+     * Runs an immediate backup of every bucket, then schedules a follow-up backup for each bucket.
+     */
+    private void scheduleBackups() {
+        flowBuckets.values().forEach(bucket -> backupBucket(bucket).whenComplete((result, error) -> {
+            scheduleBackup(bucket);
+        }));
+    }
+
+    /**
+     * Schedules a single backup of the given bucket after the backup period. NOTE(review): nothing shown here re-schedules after that backup runs — confirm backups recur.
+     *
+     * @param bucket the bucket for which to schedule the backup
+     */
+    private void scheduleBackup(FlowBucket bucket) {
+        scheduler.schedule(
+                () -> executor.execute(() -> backupBucket(bucket)),
+                backupPeriod,
+                TimeUnit.MILLISECONDS);
     }
 
     /**
      * Backs up all buckets in the given device to the given node.
      */
-    private void backup() {
-        DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
-
-        // If the local node is not currently the master, skip the backup.
-        if (!replicaInfo.isMaster(localNodeId)) {
-            return;
-        }
-
-        // Otherwise, iterate through backup nodes and backup the device.
-        for (NodeId nodeId : replicaInfo.backups()) {
-            try {
-                backup(nodeId, replicaInfo.term());
-            } catch (Exception e) {
-                log.error("Backup of " + deviceId + " to " + nodeId + " failed", e);
-            }
-        }
+    private CompletableFuture<Void> backupAll() {
+        CompletableFuture<?>[] futures = flowBuckets.values()
+                .stream()
+                .map(bucket -> backupBucket(bucket))
+                .toArray(CompletableFuture[]::new);
+        return CompletableFuture.allOf(futures);
     }
 
     /**
-     * Backs up all buckets for the device to the given node.
+     * Backs up the given flow bucket.
      *
-     * @param nodeId the node to which to back up the device
-     * @param term   the term for which to backup to the node
+     * @param bucket the flow bucket to back up
      */
-    private void backup(NodeId nodeId, long term) {
-        for (FlowBucket bucket : flowBuckets.values()) {
-            // If the bucket is not in the current term, skip it. This forces synchronization of the bucket
-            // to occur prior to the new master replicating changes in the bucket to backups.
-            if (bucket.term() != term) {
-                continue;
-            }
+    private CompletableFuture<Void> backupBucket(FlowBucket bucket) {
+        DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
 
-            // Record the logical timestamp from the bucket to keep track of the highest logical time replicated.
-            LogicalTimestamp timestamp = bucket.timestamp();
-
-            // If the backup can be run (no concurrent backup to the node in progress) then run it.
-            BackupOperation operation = new BackupOperation(nodeId, bucket.bucketId().bucket());
-            if (startBackup(operation, timestamp)) {
-                backup(bucket.copy(), nodeId).whenCompleteAsync((succeeded, error) -> {
-                    if (error != null) {
-                        log.debug("Backup operation {} failed", operation, error);
-                        failBackup(operation);
-                    } else if (succeeded) {
-                        succeedBackup(operation, timestamp);
-                        backup(nodeId, term);
-                    } else {
-                        log.debug("Backup operation {} failed: term mismatch", operation);
-                        failBackup(operation);
-                    }
-                }, executorService);
-            }
+        // Only replicate if the bucket's term matches the replica term and the local node is the current master.
+        // This ensures that the bucket has been synchronized prior to a new master replicating changes to backups.
+        // Otherwise, skip this cycle; a later backup will replicate the bucket once it is synchronized.
+        if (bucket.term() == replicaInfo.term() && replicaInfo.isMaster(localNodeId)) {
+            // Replicate the bucket to each of the backup nodes.
+            CompletableFuture<?>[] futures = replicaInfo.backups()
+                    .stream()
+                    .map(nodeId -> backupBucketToNode(bucket, nodeId))
+                    .toArray(CompletableFuture[]::new);
+            return CompletableFuture.allOf(futures);
         }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    /**
+     * Backs up the given flow bucket to the given node.
+     *
+     * @param bucket the bucket to back up
+     * @param nodeId the node to which to back up the bucket
+     * @return a future to be completed once the bucket has been backed up
+     */
+    private CompletableFuture<Void> backupBucketToNode(FlowBucket bucket, NodeId nodeId) {
+        // Record the logical timestamp from the bucket to keep track of the highest logical time replicated.
+        LogicalTimestamp timestamp = bucket.timestamp();
+
+        // If the backup can be run (no concurrent backup to the node in progress) then run it.
+        BackupOperation operation = new BackupOperation(nodeId, bucket.bucketId().bucket());
+        if (startBackup(operation, timestamp)) {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            backup(bucket, nodeId).whenCompleteAsync((succeeded, error) -> {
+                if (error != null) {
+                    log.debug("Backup operation {} failed", operation, error);
+                    failBackup(operation);
+                } else if (succeeded) {
+                    succeedBackup(operation, timestamp);
+                } else {
+                    log.debug("Backup operation {} failed: term mismatch", operation);
+                    failBackup(operation);
+                }
+                future.complete(null);
+            }, executor);
+            return future;
+        }
+        return CompletableFuture.completedFuture(null);
     }
 
     /**
@@ -502,17 +534,19 @@
      * @param nodeId the node with which to execute the anti-entropy protocol
      */
     private void runAntiEntropy(NodeId nodeId) {
-        requestDigests(nodeId).thenAcceptAsync((digests) -> {
-            // Compute a set of missing BucketIds based on digest times and send them back to the master.
-            for (FlowBucketDigest remoteDigest : digests) {
-                FlowBucket localBucket = getBucket(remoteDigest.bucket());
-                if (localBucket.getDigest().isNewerThan(remoteDigest)) {
-                    log.debug("Detected missing flow entries on node {} in bucket {}/{}",
-                        nodeId, deviceId, remoteDigest.bucket());
-                    resetBackup(new BackupOperation(nodeId, remoteDigest.bucket()));
+        backupAll().whenCompleteAsync((result, error) -> {
+            requestDigests(nodeId).thenAcceptAsync((digests) -> {
+                // Compute a set of missing BucketIds based on digest times and send them back to the master.
+                for (FlowBucketDigest remoteDigest : digests) {
+                    FlowBucket localBucket = getBucket(remoteDigest.bucket());
+                    if (localBucket.getDigest().isNewerThan(remoteDigest)) {
+                        log.debug("Detected missing flow entries on node {} in bucket {}/{}",
+                                nodeId, deviceId, remoteDigest.bucket());
+                        resetBackup(new BackupOperation(nodeId, remoteDigest.bucket()));
+                    }
                 }
-            }
-        }, executorService);
+            }, executor);
+        }, executor);
     }
 
     /**
@@ -556,7 +590,7 @@
                 } else {
                     activateMaster(newReplicaInfo);
                 }
-            }, executorService);
+            }, executor);
     }
 
     /**
@@ -576,7 +610,7 @@
                     log.debug("Failed to synchronize flows on previous backup nodes {}", backups, error);
                 }
                 activateMaster(newReplicaInfo);
-            }, executorService);
+            }, executor);
     }
 
     /**
@@ -621,7 +655,7 @@
             .thenAcceptAsync(flowBucket -> {
                 flowBuckets.compute(flowBucket.bucketId().bucket(),
                     (id, bucket) -> flowBucket.getDigest().isNewerThan(bucket.getDigest()) ? flowBucket : bucket);
-            }, executorService);
+            }, executor);
     }
 
     /**
@@ -666,10 +700,7 @@
      * @param bucket the bucket number to activate
      */
     private void activateBucket(int bucket) {
-        Queue<Runnable> tasks;
-        synchronized (flowTasks) {
-            tasks = flowTasks.remove(bucket);
-        }
+        Queue<Runnable> tasks = flowTasks.remove(bucket);
         if (tasks != null) {
             log.debug("Completing enqueued operations for device {}", deviceId);
             tasks.forEach(task -> task.run());
@@ -780,7 +811,7 @@
         clusterCommunicator.<Timestamped<M>, Timestamped<R>>addSubscriber(subject, SERIALIZER::decode, request -> {
             clock.tick(request.timestamp());
             return clock.timestamp(function.apply(request.value()));
-        }, SERIALIZER::encode, executorService);
+        }, SERIALIZER::encode, executor);
     }
 
     /**
@@ -819,11 +850,6 @@
      * Cancels recurrent scheduled futures.
      */
     private synchronized void cancelFutures() {
-        ScheduledFuture<?> backupFuture = this.backupFuture;
-        if (backupFuture != null) {
-            backupFuture.cancel(false);
-        }
-
         ScheduledFuture<?> antiEntropyFuture = this.antiEntropyFuture;
         if (antiEntropyFuture != null) {
             antiEntropyFuture.cancel(false);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index dad9351..ceaf12e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -19,6 +19,7 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Streams;
 import org.onlab.util.KryoNamespace;
+import org.onlab.util.OrderedExecutor;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cluster.ClusterService;
@@ -93,6 +94,8 @@
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.lang.Math.max;
+import static java.lang.Math.min;
 import static org.onlab.util.Tools.get;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
@@ -174,8 +177,11 @@
     private ExecutorService messageHandlingExecutor;
     private ExecutorService eventHandler;
 
-    private final ScheduledExecutorService backupSenderExecutor =
-        Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender", log));
+    private final ScheduledExecutorService backupScheduler = Executors.newSingleThreadScheduledExecutor(
+                groupedThreads("onos/flow", "backup-scheduler", log));
+    private final ExecutorService backupExecutor = Executors.newFixedThreadPool(
+            max(min(Runtime.getRuntime().availableProcessors() * 2, 16), 4),
+            groupedThreads("onos/flow", "backup-%d", log));
 
     private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
     private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
@@ -244,7 +250,8 @@
         deviceTableStats.destroy();
         eventHandler.shutdownNow();
         messageHandlingExecutor.shutdownNow();
-        backupSenderExecutor.shutdownNow();
+        backupScheduler.shutdownNow();
+        backupExecutor.shutdownNow();
         log.info("Stopped");
     }
 
@@ -691,7 +698,8 @@
                 clusterService,
                 clusterCommunicator,
                 new InternalLifecycleManager(id),
-                backupSenderExecutor,
+                backupScheduler,
+                new OrderedExecutor(backupExecutor),
                 backupPeriod,
                 antiEntropyPeriod));
         }
@@ -727,7 +735,8 @@
                 clusterService,
                 clusterCommunicator,
                 new InternalLifecycleManager(deviceId),
-                backupSenderExecutor,
+                backupScheduler,
+                new OrderedExecutor(backupExecutor),
                 backupPeriod,
                 antiEntropyPeriod));
         }
@@ -941,7 +950,7 @@
             if (replicaInfo != null && replicaInfo.term() == term) {
                 NodeId master = replicaInfo.master().orElse(null);
                 List<NodeId> backups = replicaInfo.backups()
-                    .subList(0, Math.min(replicaInfo.backups().size(), backupCount));
+                    .subList(0, min(replicaInfo.backups().size(), backupCount));
                 listenerRegistry.process(new LifecycleEvent(
                     LifecycleEvent.Type.TERM_ACTIVE,
                     new DeviceReplicaInfo(term, master, backups)));
@@ -975,7 +984,7 @@
         private DeviceReplicaInfo toDeviceReplicaInfo(ReplicaInfo replicaInfo) {
             NodeId master = replicaInfo.master().orElse(null);
             List<NodeId> backups = replicaInfo.backups()
-                .subList(0, Math.min(replicaInfo.backups().size(), backupCount));
+                .subList(0, min(replicaInfo.backups().size(), backupCount));
             return new DeviceReplicaInfo(replicaInfo.term(), master, backups);
         }