Move flow count updates off write path in ECFlowRuleStore

Change-Id: I44c611625baec124a45524ddb39fbe74f4c3c907
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index 8efb40a..204f98f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -31,7 +31,11 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.common.collect.Streams;
+import com.google.common.util.concurrent.Futures;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -56,11 +60,6 @@
 import org.onosproject.net.flow.FlowEntry.FlowEntryState;
 import org.onosproject.net.flow.FlowId;
 import org.onosproject.net.flow.FlowRule;
-import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
-import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
-import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
-import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
-import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
 import org.onosproject.net.flow.FlowRuleEvent;
 import org.onosproject.net.flow.FlowRuleEvent.Type;
 import org.onosproject.net.flow.FlowRuleService;
@@ -68,6 +67,11 @@
 import org.onosproject.net.flow.FlowRuleStoreDelegate;
 import org.onosproject.net.flow.StoredFlowEntry;
 import org.onosproject.net.flow.TableStatisticsEntry;
+import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
+import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
+import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
+import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
+import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
 import org.onosproject.persistence.PersistenceService;
 import org.onosproject.store.AbstractStore;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
@@ -87,11 +91,6 @@
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Futures;
-
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static org.onlab.util.Tools.get;
 import static org.onlab.util.Tools.groupedThreads;
@@ -323,7 +322,7 @@
         clusterCommunicator.addSubscriber(
                 REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
         clusterCommunicator.addSubscriber(
-                FLOW_TABLE_BACKUP, serializer::decode, flowTable::onBackupReceipt, serializer::encode, executor);
+                FLOW_TABLE_BACKUP, serializer::decode, flowTable::onBackup, serializer::encode, executor);
     }
 
     private void unregisterMessageHandlers() {
@@ -851,12 +850,6 @@
 
         private void recordUpdate(BucketId bucketId) {
             lastUpdateTimes.put(bucketId, System.currentTimeMillis());
-            FlowBucket flowBucket = getFlowBucket(bucketId);
-            int flowCount = flowBucket.table().entrySet()
-                .stream()
-                .mapToInt(e -> e.getValue().values().size())
-                .sum();
-            flowCounts.put(bucketId, flowCount);
         }
 
         public void add(FlowEntry rule) {
@@ -925,37 +918,63 @@
             flowEntries.clear();
         }
 
+        /**
+         * Returns a boolean indicating whether the local node is the current master for the given device.
+         *
+         * @param deviceId the device for which to indicate whether the local node is the current master
+         * @return indicates whether the local node is the current master for the given device
+         */
         private boolean isMasterNode(DeviceId deviceId) {
             NodeId master = replicaInfoManager.getReplicaInfoFor(deviceId).master().orElse(null);
             return Objects.equals(master, clusterService.getLocalNode().id());
         }
 
-        private boolean isBackupNode(NodeId nodeId, DeviceId deviceId) {
-            List<NodeId> allPossibleBackupNodes = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
-            return allPossibleBackupNodes.indexOf(nodeId) < backupCount;
-        }
-
+        /**
+         * Backs up all devices to all backup nodes.
+         */
         private void backup() {
-            clusterService.getNodes().stream()
-                .filter(node -> !node.id().equals(clusterService.getLocalNode().id()))
-                .forEach(node -> {
-                    try {
-                        backup(node.id());
-                    } catch (Exception e) {
-                        log.error("Backup failed.", e);
-                    }
-                });
+            for (DeviceId deviceId : flowEntries.keySet()) {
+                backup(deviceId);
+            }
         }
 
-        private void backup(NodeId nodeId) {
-            for (DeviceId deviceId : flowEntries.keySet()) {
-                if (isMasterNode(deviceId) && isBackupNode(nodeId, deviceId)) {
-                    backup(nodeId, deviceId);
+        /**
+         * Backs up all buckets in the given device to the given node.
+         *
+         * @param deviceId the device to back up
+         */
+        private void backup(DeviceId deviceId) {
+            if (!isMasterNode(deviceId)) {
+                return;
+            }
+
+            // Get a list of backup nodes for the device.
+            List<NodeId> backupNodes = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
+            int availableBackupCount = Math.min(backupCount, backupNodes.size());
+
+            // If the list of backup nodes is empty, update the flow count.
+            if (availableBackupCount == 0) {
+                updateDeviceFlowCounts(deviceId);
+            } else {
+                // Otherwise, iterate through backup nodes and backup the device.
+                for (int index = 0; index < availableBackupCount; index++) {
+                    NodeId backupNode = backupNodes.get(index);
+                    try {
+                        backup(deviceId, backupNode);
+                    } catch (Exception e) {
+                        log.error("Backup of " + deviceId + " to " + backupNode + " failed", e);
+                    }
                 }
             }
         }
 
-        private void backup(NodeId nodeId, DeviceId deviceId) {
+        /**
+         * Backs up all buckets for the given device to the given node.
+         *
+         * @param deviceId the device to back up
+         * @param nodeId the node to which to back up the device
+         */
+        private void backup(DeviceId deviceId, NodeId nodeId) {
             final long timestamp = System.currentTimeMillis();
             for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
                 BucketId bucketId = new BucketId(deviceId, bucket);
@@ -967,27 +986,56 @@
                         } else {
                             failBackup(operation);
                         }
-                        backup(nodeId);
+                        backup(deviceId, nodeId);
                     }, backupSenderExecutor);
                 }
             }
         }
 
+        /**
+         * Returns a boolean indicating whether the given {@link BackupOperation} can be started.
+         * <p>
+         * The backup can be started if no backup for the same device/bucket/node is already in progress and changes
+         * are pending replication for the backup operation.
+         *
+         * @param operation the operation to start
+         * @return indicates whether the given backup operation should be started
+         */
         private boolean startBackup(BackupOperation operation) {
             long lastBackupTime = lastBackupTimes.getOrDefault(operation, 0L);
             long lastUpdateTime = lastUpdateTimes.getOrDefault(operation.bucketId(), 0L);
             return lastUpdateTime > 0 && lastBackupTime <= lastUpdateTime && inFlightUpdates.add(operation);
         }
 
+        /**
+         * Fails the given backup operation.
+         *
+         * @param operation the backup operation to fail
+         */
         private void failBackup(BackupOperation operation) {
             inFlightUpdates.remove(operation);
         }
 
+        /**
+         * Succeeds the given backup operation.
+         * <p>
+         * The last backup time for the operation will be updated and the operation will be removed from
+         * in-flight updates.
+         *
+         * @param operation the operation to succeed
+         * @param timestamp the timestamp at which the operation was <em>started</em>
+         */
         private void succeedBackup(BackupOperation operation, long timestamp) {
-            inFlightUpdates.remove(operation);
             lastBackupTimes.put(operation, timestamp);
+            inFlightUpdates.remove(operation);
         }
 
+        /**
+         * Performs the given backup operation.
+         *
+         * @param operation the operation to perform
+         * @return a future to be completed with a boolean indicating whether the backup operation was successful
+         */
         private CompletableFuture<Boolean> backup(BackupOperation operation) {
             log.debug("Sending flowEntries in bucket {} for device {} to {} for backup.",
                 operation.bucketId().bucket(), operation.bucketId().deviceId(), operation.nodeId());
@@ -1010,10 +1058,18 @@
                     }
                     future.complete(backedupFlows != null);
                 });
+
+            updateFlowCounts(flowBucket);
             return future;
         }
 
-        private Set<FlowId> onBackupReceipt(FlowBucket flowBucket) {
+        /**
+         * Handles a flow bucket backup from a remote peer.
+         *
+         * @param flowBucket the flow bucket to back up
+         * @return the set of flows that could not be backed up
+         */
+        private Set<FlowId> onBackup(FlowBucket flowBucket) {
             log.debug("Received flowEntries for {} bucket {} to backup",
                 flowBucket.bucketId().deviceId(), flowBucket.bucketId);
             Set<FlowId> backedupFlows = Sets.newHashSet();
@@ -1036,6 +1092,32 @@
             }
             return backedupFlows;
         }
+
+        /**
+         * Updates all flow counts for the given device.
+         *
+         * @param deviceId the device for which to update flow counts
+         */
+        private void updateDeviceFlowCounts(DeviceId deviceId) {
+            for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
+                BucketId bucketId = new BucketId(deviceId, bucket);
+                FlowBucket flowBucket = getFlowBucket(bucketId);
+                updateFlowCounts(flowBucket);
+            }
+        }
+
+        /**
+         * Updates the eventually consistent flow count for the given bucket.
+         *
+         * @param flowBucket the flow bucket for which to update flow counts
+         */
+        private void updateFlowCounts(FlowBucket flowBucket) {
+            int flowCount = flowBucket.table().entrySet()
+                .stream()
+                .mapToInt(e -> e.getValue().values().size())
+                .sum();
+            flowCounts.put(flowBucket.bucketId(), flowCount);
+        }
     }
 
     @Override