Clean up implementation of buckets for flow rule replication

Replace per-device batch backups with per-bucket replication: each
device's flow table is partitioned into NUM_BUCKETS hash buckets keyed
by flow ID, update and backup timestamps are tracked per bucket, and
only stale buckets are sent to backup nodes.

Change-Id: Ifc6aaed08c3a74e8b8f8fd9d69d5802351e5f47c
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index f998dad..3080615 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -22,6 +22,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
@@ -87,7 +88,6 @@
 import org.slf4j.Logger;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Futures;
@@ -121,8 +121,7 @@
     private static final boolean DEFAULT_PERSISTENCE_ENABLED = false;
     private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
     private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
-    // number of devices whose flow entries will be backed up in one communication round
-    private static final int FLOW_TABLE_BACKUP_BATCH_SIZE = 1;
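+    // each device's flow table is partitioned into this many hash buckets; a bucket is the unit of backup replication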
+    private static final int NUM_BUCKETS = 1024;
 
     @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
             label = "Number of threads in the message handler pool")
@@ -180,13 +179,16 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected StorageService storageService;
 
-    protected final Serializer serializer = Serializer.using(KryoNamespaces.API);
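+    // FlowBucket instances are sent over the cluster communication service,
+    // so the namespace must register the class in addition to the core API types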
+    protected final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
+        .register(KryoNamespaces.API)
+        .register(FlowBucket.class)
+        .build());
 
     protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
             .register(KryoNamespaces.API)
             .register(MastershipBasedTimestamp.class);
 
-    private EventuallyConsistentMap<DeviceId, Integer> flowCounts;
+    private EventuallyConsistentMap<DeviceId, Map<Integer, Integer>> flowCounts;
 
     private IdGenerator idGenerator;
     private NodeId local;
@@ -213,7 +215,7 @@
                 backupPeriod,
                 TimeUnit.MILLISECONDS);
 
-        flowCounts = storageService.<DeviceId, Integer>eventuallyConsistentMapBuilder()
+        flowCounts = storageService.<DeviceId, Map<Integer, Integer>>eventuallyConsistentMapBuilder()
                 .withName("onos-flow-counts")
                 .withSerializer(serializerBuilder)
                 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
@@ -348,9 +350,11 @@
 
     @Override
     public int getFlowRuleCount(DeviceId deviceId) {
-        Integer count = flowCounts.get(deviceId);
-        return count != null ? count : flowTable.flowEntries.get(deviceId) != null ?
-                flowTable.flowEntries.get(deviceId).keySet().size() : 0;
+        Map<Integer, Integer> counts = flowCounts.get(deviceId);
+        if (counts != null) {
+            return counts.values().stream().mapToInt(v -> v).sum();
+        }
+        Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> deviceFlows = flowTable.flowEntries.get(deviceId);
+        return deviceFlows != null ? deviceFlows.size() : 0;
     }
 
     @Override
@@ -657,15 +661,17 @@
     private class BackupOperation {
         private final NodeId nodeId;
         private final DeviceId deviceId;
+        private final int bucket;
 
-        public BackupOperation(NodeId nodeId, DeviceId deviceId) {
+        public BackupOperation(NodeId nodeId, DeviceId deviceId, int bucket) {
             this.nodeId = nodeId;
             this.deviceId = deviceId;
+            this.bucket = bucket;
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(nodeId, deviceId);
+            return Objects.hash(nodeId, deviceId, bucket);
         }
 
         @Override
@@ -673,13 +679,26 @@
             if (other != null && other instanceof BackupOperation) {
                 BackupOperation that = (BackupOperation) other;
                 return this.nodeId.equals(that.nodeId) &&
-                        this.deviceId.equals(that.deviceId);
+                        this.deviceId.equals(that.deviceId) &&
+                        this.bucket == that.bucket;
             } else {
                 return false;
             }
         }
     }
 
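+    /**
+     * A single hash bucket of a device's flow table: the subset of flow
+     * entries whose flow ID maps to {@code bucket}. Buckets are the unit of
+     * replication to backup nodes.
+     */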
+    private static class FlowBucket {
+        private final DeviceId deviceId;
+        private final int bucket;
+        private final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table;
+
+        FlowBucket(DeviceId deviceId, int bucket, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table) {
+            this.deviceId = deviceId;
+            this.bucket = bucket;
+            this.table = table;
+        }
+    }
+
     private class InternalFlowTable implements ReplicaInfoEventListener {
 
         //TODO replace the Map<V,V> with ExtendedSet
@@ -687,7 +706,8 @@
                 flowEntries = Maps.newConcurrentMap();
 
         private final Map<BackupOperation, Long> lastBackupTimes = Maps.newConcurrentMap();
-        private final Map<DeviceId, Long> lastUpdateTimes = Maps.newConcurrentMap();
+        private final Map<DeviceId, Map<Integer, Long>> lastUpdateTimes = Maps.newConcurrentMap();
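+        // devices with a backup currently in flight per node; prevents concurrent backups of the same device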
+        private final Map<NodeId, Set<DeviceId>> inFlightUpdates = Maps.newConcurrentMap();
 
         @Override
         public void event(ReplicaInfoEvent event) {
@@ -700,54 +720,48 @@
                 return;
             }
             if (event.type() == MASTER_CHANGED) {
-                lastUpdateTimes.put(deviceId, System.currentTimeMillis());
+                for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
+                    recordUpdate(deviceId, bucket);
+                }
             }
-            backupSenderExecutor.schedule(this::backup, 0, TimeUnit.SECONDS);
+            backupSenderExecutor.execute(this::backup);
         }
 
-        private void sendBackups(NodeId nodeId, Set<DeviceId> deviceIds) {
-            // split up the devices into smaller batches and send them separately.
-            Iterables.partition(deviceIds, FLOW_TABLE_BACKUP_BATCH_SIZE)
-                     .forEach(ids -> backupFlowEntries(nodeId, Sets.newHashSet(ids)));
-        }
-
-        private void backupFlowEntries(NodeId nodeId, Set<DeviceId> deviceIds) {
-            if (deviceIds.isEmpty()) {
-                return;
-            }
-            log.debug("Sending flowEntries for devices {} to {} for backup.", deviceIds, nodeId);
-            Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
-                    deviceFlowEntries = Maps.newConcurrentMap();
-            deviceIds.forEach(id -> {
-                Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> copy = getFlowTableCopy(id);
-                int flowCount = copy.entrySet().stream()
-                        .mapToInt(e -> e.getValue().values().size()).sum();
-                flowCounts.put(id, flowCount);
-                deviceFlowEntries.put(id, copy);
+        private CompletableFuture<Void> backupFlowEntries(
+            NodeId nodeId, DeviceId deviceId, int bucket, long timestamp) {
+            log.debug("Sending flowEntries in bucket {} for device {} to {} for backup.", bucket, deviceId, nodeId);
+            FlowBucket flowBucket = getFlowBucket(deviceId, bucket);
+            int flowCount = flowBucket.table.entrySet().stream()
+                .mapToInt(e -> e.getValue().values().size()).sum();
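+            // publish the per-bucket count so getFlowRuleCount can sum across buckets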
+            flowCounts.compute(deviceId, (id, counts) -> {
+                if (counts == null) {
+                    counts = Maps.newConcurrentMap();
+                }
+                counts.put(bucket, flowCount);
+                return counts;
             });
-            clusterCommunicator.<Map<DeviceId,
-                                 Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>,
-                                 Set<DeviceId>>
-                    sendAndReceive(deviceFlowEntries,
+
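+            // the returned future completes whether or not the backup
+            // succeeded, so the caller can always schedule the next bucket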
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            clusterCommunicator.<FlowBucket, Set<FlowId>>
+                    sendAndReceive(flowBucket,
                                    FLOW_TABLE_BACKUP,
                                    serializer::encode,
                                    serializer::decode,
                                    nodeId)
-                    .whenComplete((backedupDevices, error) -> {
-                        Set<DeviceId> devicesNotBackedup = error != null ?
-                            deviceFlowEntries.keySet() :
-                            Sets.difference(deviceFlowEntries.keySet(), backedupDevices);
-                        if (devicesNotBackedup.size() > 0) {
-                            log.warn("Failed to backup devices: {}. Reason: {}, Node: {}",
-                                     devicesNotBackedup, error != null ? error.getMessage() : "none",
-                                     nodeId);
+                    .whenComplete((backedupFlows, error) -> {
+                        Set<FlowId> flowsNotBackedUp = error != null ?
+                            flowBucket.table.keySet() :
+                            Sets.difference(flowBucket.table.keySet(), backedupFlows);
+                        if (!flowsNotBackedUp.isEmpty()) {
+                            log.warn("Failed to back up flows: {}. Reason: {}, Node: {}",
+                                     flowsNotBackedUp, error != null ? error.getMessage() : "none", nodeId);
                         }
-                        if (backedupDevices != null) {
-                            backedupDevices.forEach(id -> {
-                                lastBackupTimes.put(new BackupOperation(nodeId, id), System.currentTimeMillis());
-                            });
+                        if (backedupFlows != null) {
+                            lastBackupTimes.put(new BackupOperation(nodeId, deviceId, bucket), timestamp);
                         }
+                        future.complete(null);
                     });
+            return future;
         }
 
         /**
@@ -783,34 +797,27 @@
             }
         }
 
-        private Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> getFlowTableCopy(DeviceId deviceId) {
-            Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> copy = Maps.newHashMap();
+        private FlowBucket getFlowBucket(DeviceId deviceId, int bucket) {
             if (persistenceEnabled) {
-                return flowEntries.computeIfAbsent(deviceId, id -> persistenceService
-                        .<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>persistentMapBuilder()
+                return new FlowBucket(deviceId, bucket, flowEntries.computeIfAbsent(deviceId, id ->
+                    persistenceService.<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>persistentMapBuilder()
                         .withName("FlowTable:" + deviceId.toString())
-                        .withSerializer(new Serializer() {
-                            @Override
-                            public <T> byte[] encode(T object) {
-                                return serializer.encode(object);
-                            }
-
-                            @Override
-                            public <T> T decode(byte[] bytes) {
-                                return serializer.decode(bytes);
-                            }
-
-                            @Override
-                            public <T> T copy(T object) {
-                                return serializer.copy(object);
-                            }
-                        })
-                        .build());
+                        .withSerializer(serializer)
+                        .build())
+                    .entrySet()
+                    .stream()
+                    .filter(entry -> isBucket(entry.getKey(), bucket))
+                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
             } else {
-                flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap()).forEach((k, v) -> {
-                    copy.put(k, Maps.newHashMap(v));
-                });
-                return copy;
+                Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> copy = Maps.newHashMap();
+                flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap())
+                    .entrySet()
+                    .stream()
+                    .filter(entry -> isBucket(entry.getKey(), bucket))
+                    .forEach(entry -> {
+                        copy.put(entry.getKey(), Maps.newHashMap(entry.getValue()));
+                    });
+                return new FlowBucket(deviceId, bucket, copy);
             }
         }
 
@@ -836,12 +843,25 @@
             return getFlowEntriesInternal(deviceId);
         }
 
+        private boolean isBucket(FlowId flowId, int bucket) {
+            return bucket(flowId) == bucket;
+        }
+
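+        // flow IDs are assigned to buckets by non-negative modulo so every
+        // flow maps to a bucket index in [0, NUM_BUCKETS)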
+        private int bucket(FlowId flowId) {
+            return (int) Math.floorMod(flowId.id(), NUM_BUCKETS);
+        }
+
+        private void recordUpdate(DeviceId deviceId, int bucket) {
+            lastUpdateTimes.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap())
+                .put(bucket, System.currentTimeMillis());
+        }
+
         public void add(FlowEntry rule) {
             getFlowEntriesInternal(rule.deviceId(), rule.id())
                     .compute((StoredFlowEntry) rule, (k, stored) -> {
                         return (StoredFlowEntry) rule;
                     });
-            lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
+            recordUpdate(rule.deviceId(), bucket(rule.id()));
         }
 
         public void update(FlowEntry rule) {
@@ -852,7 +872,7 @@
                         if (stored instanceof DefaultFlowEntry) {
                             DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
                             if (updated.created() >= storedEntry.created()) {
-                                lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
+                                recordUpdate(rule.deviceId(), bucket(rule.id()));
                                 return updated;
                             } else {
                                 log.debug("Trying to update more recent flow entry {} (stored: {})", updated, stored);
@@ -887,7 +907,7 @@
             });
 
             if (removedRule.get() != null) {
-                lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
+                recordUpdate(rule.deviceId(), bucket(rule.id()));
                 return removedRule.get();
             } else {
                 return null;
@@ -902,53 +922,79 @@
             flowEntries.clear();
         }
 
-        private List<NodeId> getBackupNodes(DeviceId deviceId) {
-            // The returned backup node list is in the order of preference i.e. next likely master first.
+        private boolean isMasterNode(DeviceId deviceId) {
+            NodeId master = replicaInfoManager.getReplicaInfoFor(deviceId).master().orElse(null);
+            return Objects.equals(master, clusterService.getLocalNode().id());
+        }
+
+        private boolean isBackupNode(NodeId nodeId, DeviceId deviceId) {
             List<NodeId> allPossibleBackupNodes = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
-            return ImmutableList.copyOf(allPossibleBackupNodes)
-                                .subList(0, Math.min(allPossibleBackupNodes.size(), backupCount));
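+            // indexOf returns -1 when nodeId is not a backup for this device at all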
+            int index = allPossibleBackupNodes.indexOf(nodeId);
+            return index != -1 && index < backupCount;
         }
 
         private void backup() {
-            try {
-                // compute a mapping from node to the set of devices whose flow entries it should backup
-                Map<NodeId, Set<DeviceId>> devicesToBackupByNode = Maps.newHashMap();
-                flowEntries.keySet().forEach(deviceId -> {
-                    List<NodeId> backupNodes = getBackupNodes(deviceId);
-                    backupNodes.forEach(backupNode -> {
-                            if (lastBackupTimes.getOrDefault(new BackupOperation(backupNode, deviceId), 0L)
-                                    < lastUpdateTimes.getOrDefault(deviceId, 0L)) {
-                                devicesToBackupByNode.computeIfAbsent(backupNode,
-                                                                      nodeId -> Sets.newHashSet()).add(deviceId);
-                            }
-                    });
+            clusterService.getNodes().stream()
+                .filter(node -> !node.id().equals(clusterService.getLocalNode().id()))
+                .forEach(node -> {
+                    try {
+                        backup(node.id());
+                    } catch (Exception e) {
+                        log.error("Backup failed.", e);
+                    }
                 });
-                // send the device flow entries to their respective backup nodes
-                devicesToBackupByNode.forEach(this::sendBackups);
-            } catch (Exception e) {
-                log.error("Backup failed.", e);
+        }
+
+        private void backup(NodeId nodeId) {
+            for (DeviceId deviceId : flowEntries.keySet()) {
+                if (isMasterNode(deviceId) && isBackupNode(nodeId, deviceId)) {
+                    backup(nodeId, deviceId);
+                }
             }
         }
 
-        private Set<DeviceId> onBackupReceipt(Map<DeviceId,
-                Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>> flowTables) {
-            log.debug("Received flowEntries for {} to backup", flowTables.keySet());
-            Set<DeviceId> backedupDevices = Sets.newHashSet();
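+        // backs up at most one stale bucket per invocation; once the backup
+        // RPC completes, the device scan is re-run to pick up the next stale bucket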
+        private void backup(NodeId nodeId, DeviceId deviceId) {
+            final long timestamp = System.currentTimeMillis();
+            for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
+                long lastBackupTime = lastBackupTimes.getOrDefault(new BackupOperation(nodeId, deviceId, bucket), 0L);
+                long lastUpdateTime = lastUpdateTimes.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap())
+                    .getOrDefault(bucket, 0L);
+                if (lastBackupTime < lastUpdateTime && startBackup(nodeId, deviceId)) {
+                    backupFlowEntries(nodeId, deviceId, bucket, timestamp)
+                        .thenRunAsync(() -> {
+                            finishBackup(nodeId, deviceId);
+                            backup(nodeId);
+                        }, backupSenderExecutor);
+                    return;
+                }
+            }
+        }
+
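+        // marks a device as having an in-flight backup to the given node;
+        // returns false if one is already running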
+        private boolean startBackup(NodeId nodeId, DeviceId deviceId) {
+            return inFlightUpdates.computeIfAbsent(nodeId, id -> Sets.newConcurrentHashSet()).add(deviceId);
+        }
+
+        private void finishBackup(NodeId nodeId, DeviceId deviceId) {
+            inFlightUpdates.computeIfAbsent(nodeId, id -> Sets.newConcurrentHashSet()).remove(deviceId);
+        }
+
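+        // applies a received bucket: entries in the bucket are added to the
+        // local backup table, and stale entries in the same bucket that are
+        // absent from the received copy are removed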
+        private Set<FlowId> onBackupReceipt(FlowBucket bucket) {
+            log.debug("Received flowEntries for {} bucket {} to backup", bucket.deviceId, bucket.bucket);
+            Set<FlowId> backedupFlows = Sets.newHashSet();
             try {
-                flowTables.forEach((deviceId, deviceFlowTable) -> {
-                    // Only process those devices are that not managed by the local node.
-                    if (!Objects.equals(local, mastershipService.getMasterFor(deviceId))) {
-                        Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> backupFlowTable =
-                                getFlowTable(deviceId);
-                        backupFlowTable.clear();
-                        backupFlowTable.putAll(deviceFlowTable);
-                        backedupDevices.add(deviceId);
-                    }
-                });
+                // Only process buckets for devices that are not managed by the local node.
+                NodeId master = replicaInfoManager.getReplicaInfoFor(bucket.deviceId).master().orElse(null);
+                if (!Objects.equals(local, master)) {
+                    Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> backupFlowTable = getFlowTable(bucket.deviceId);
+                    backupFlowTable.putAll(bucket.table);
+                    backupFlowTable.entrySet()
+                        .removeIf(entry -> isBucket(entry.getKey(), bucket.bucket)
+                            && !bucket.table.containsKey(entry.getKey()));
+                    backedupFlows.addAll(bucket.table.keySet());
+                }
             } catch (Exception e) {
                 log.warn("Failure processing backup request", e);
             }
-            return backedupDevices;
+            return backedupFlows;
         }
     }