Implement anti-entropy protocol for detecting and resolving missing flows in the ECFlowRuleStore

Change-Id: I90e1243a40b90328b9e4fdbeb36f830c20f18b5c
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index b89bd58..9dc2f60 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -30,8 +30,10 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.collect.Streams;
@@ -97,6 +99,7 @@
 import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
 import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS;
+import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_ANTI_ENTROPY;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY;
@@ -119,6 +122,7 @@
     private static final int DEFAULT_MAX_BACKUP_COUNT = 2;
     private static final boolean DEFAULT_PERSISTENCE_ENABLED = false;
     private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
+    private static final int DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS = 5000;
     private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
     private static final int NUM_BUCKETS = 1024;
 
@@ -129,6 +133,11 @@
     @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
             label = "Delay in ms between successive backup runs")
     private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
+
+    @Property(name = "antiEntropyPeriod", intValue = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS,
+            label = "Delay in ms between anti-entropy runs")
+    private int antiEntropyPeriod = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS;
+
     @Property(name = "persistenceEnabled", boolValue = false,
             label = "Indicates whether or not changes in the flow table should be persisted to disk.")
     private boolean persistenceEnabled = DEFAULT_PERSISTENCE_ENABLED;
@@ -168,6 +177,7 @@
     private ExecutorService eventHandler;
 
     private ScheduledFuture<?> backupTask;
+    private ScheduledFuture<?> antiEntropyTask;
     private final ScheduledExecutorService backupSenderExecutor =
             Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender", log));
 
@@ -215,6 +225,11 @@
                 0,
                 backupPeriod,
                 TimeUnit.MILLISECONDS);
+        antiEntropyTask = backupSenderExecutor.scheduleWithFixedDelay(
+                flowTable::runAntiEntropy,
+                0,
+                antiEntropyPeriod,
+                TimeUnit.MILLISECONDS);
 
         flowCounts = storageService.<BucketId, Integer>eventuallyConsistentMapBuilder()
                 .withName("onos-flow-counts")
@@ -262,6 +277,7 @@
         int newPoolSize;
         int newBackupPeriod;
         int newBackupCount;
+        int newAntiEntropyPeriod;
         try {
             String s = get(properties, "msgHandlerPoolSize");
             newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
@@ -271,18 +287,29 @@
 
             s = get(properties, "backupCount");
             newBackupCount = isNullOrEmpty(s) ? backupCount : Integer.parseInt(s.trim());
+
+            s = get(properties, "antiEntropyPeriod");
+            newAntiEntropyPeriod = isNullOrEmpty(s) ? antiEntropyPeriod : Integer.parseInt(s.trim());
         } catch (NumberFormatException | ClassCastException e) {
             newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
             newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
             newBackupCount = DEFAULT_MAX_BACKUP_COUNT;
+            newAntiEntropyPeriod = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS;
         }
 
         boolean restartBackupTask = false;
+        boolean restartAntiEntropyTask = false;
 
         if (newBackupPeriod != backupPeriod) {
             backupPeriod = newBackupPeriod;
             restartBackupTask = true;
         }
+
+        if (newAntiEntropyPeriod != antiEntropyPeriod) {
+            antiEntropyPeriod = newAntiEntropyPeriod;
+            restartAntiEntropyTask = true;
+        }
+
         if (restartBackupTask) {
             if (backupTask != null) {
                 // cancel previously running task
@@ -294,6 +321,19 @@
                     backupPeriod,
                     TimeUnit.MILLISECONDS);
         }
+
+        if (restartAntiEntropyTask) {
+            if (antiEntropyTask != null) {
+                // cancel previously running task
+                antiEntropyTask.cancel(false);
+            }
+            antiEntropyTask = backupSenderExecutor.scheduleWithFixedDelay(
+                    flowTable::runAntiEntropy,
+                    0,
+                    antiEntropyPeriod,
+                    TimeUnit.MILLISECONDS);
+        }
+
         if (newPoolSize != msgHandlerPoolSize) {
             msgHandlerPoolSize = newPoolSize;
             ExecutorService oldMsgHandler = messageHandlingExecutor;
@@ -304,6 +344,7 @@
             registerMessageHandlers(messageHandlingExecutor);
             oldMsgHandler.shutdown();
         }
+
         if (backupCount != newBackupCount) {
             backupCount = newBackupCount;
         }
@@ -323,6 +364,8 @@
                 REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
         clusterCommunicator.addSubscriber(
                 FLOW_TABLE_BACKUP, serializer::decode, flowTable::onBackup, serializer::encode, executor);
+        clusterCommunicator.addSubscriber(
+                FLOW_TABLE_ANTI_ENTROPY, serializer::decode, flowTable::onAntiEntropy, serializer::encode, executor);
     }
 
     private void unregisterMessageHandlers() {
@@ -332,6 +375,7 @@
         clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
         clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
         clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
+        clusterCommunicator.removeSubscriber(FLOW_TABLE_ANTI_ENTROPY);
     }
 
     private void logConfig(String prefix) {
@@ -733,6 +777,7 @@
     private class FlowBucket {
         private final BucketId bucketId;
         private final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table;
+        private final long timestamp;
 
         BucketId bucketId() {
             return bucketId;
@@ -742,9 +787,78 @@
             return table;
         }
 
-        FlowBucket(BucketId bucketId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table) {
+        long timestamp() {
+            return timestamp;
+        }
+
+        FlowBucket(BucketId bucketId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table, long timestamp) {
             this.bucketId = bucketId;
             this.table = table;
+            this.timestamp = timestamp;
+        }
+    }
+
+    /**
+     * Device digest.
+     */
+    private class DeviceDigest {
+        private final DeviceId deviceId;
+        private final Set<FlowBucketDigest> digests;
+
+        DeviceDigest(DeviceId deviceId, Set<FlowBucketDigest> digests) {
+            this.deviceId = deviceId;
+            this.digests = digests;
+        }
+
+        DeviceId deviceId() {
+            return deviceId;
+        }
+
+        Set<FlowBucketDigest> digests() {
+            return digests;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(deviceId, digests);
+        }
+
+        @Override
+        public boolean equals(Object object) {
+            return object instanceof DeviceDigest
+                && ((DeviceDigest) object).deviceId.equals(deviceId);
+        }
+    }
+
+    /**
+     * Flow bucket digest.
+     */
+    private class FlowBucketDigest {
+        private final BucketId bucketId;
+        private final long timestamp;
+
+        FlowBucketDigest(BucketId bucketId, long timestamp) {
+            this.bucketId = bucketId;
+            this.timestamp = timestamp;
+        }
+
+        BucketId bucketId() {
+            return bucketId;
+        }
+
+        long timestamp() {
+            return timestamp;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(bucketId);
+        }
+
+        @Override
+        public boolean equals(Object object) {
+            return object instanceof FlowBucketDigest
+                && ((FlowBucketDigest) object).bucketId.equals(bucketId);
         }
     }
 
@@ -777,6 +891,30 @@
         }
 
         /**
+         * Returns the set of devices in the flow table.
+         *
+         * @return the set of devices in the flow table
+         */
+        private Set<DeviceId> getDevices() {
+            return flowEntries.keySet();
+        }
+
+        /**
+         * Returns the digests for all buckets in the flow table for the given device.
+         *
+         * @param deviceId the device for which to return digests
+         * @return the set of digests for all buckets for the given device
+         */
+        private Set<FlowBucketDigest> getDigests(DeviceId deviceId) {
+            return IntStream.range(0, NUM_BUCKETS)
+                .mapToObj(bucket -> {
+                    BucketId bucketId = new BucketId(deviceId, bucket);
+                    long timestamp = lastUpdateTimes.getOrDefault(bucketId, 0L);
+                    return new FlowBucketDigest(bucketId, timestamp);
+                }).collect(Collectors.toSet());
+        }
+
+        /**
          * Returns the flow table for specified device.
          *
          * @param deviceId identifier of the device
@@ -801,11 +939,13 @@
         }
 
         private FlowBucket getFlowBucket(BucketId bucketId) {
+            long timestamp = lastUpdateTimes.getOrDefault(bucketId, 0L);
             return new FlowBucket(bucketId, getFlowTable(bucketId.deviceId())
                 .entrySet()
                 .stream()
                 .filter(entry -> isInBucket(entry.getKey(), bucketId.bucket()))
-                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)),
+                timestamp);
         }
 
         private Map<StoredFlowEntry, StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
@@ -927,7 +1067,7 @@
          * Backs up all devices to all backup nodes.
          */
         private void backup() {
-            for (DeviceId deviceId : flowEntries.keySet()) {
+            for (DeviceId deviceId : getDevices()) {
                 backup(deviceId);
             }
         }
@@ -1036,8 +1176,8 @@
             FlowBucket flowBucket = getFlowBucket(operation.bucketId());
 
             CompletableFuture<Boolean> future = new CompletableFuture<>();
-            clusterCommunicator.<FlowBucket, Set<FlowId>>
-                sendAndReceive(flowBucket,
+            clusterCommunicator.<FlowBucket, Set<FlowId>>sendAndReceive(
+                flowBucket,
                 FLOW_TABLE_BACKUP,
                 serializer::encode,
                 serializer::decode,
@@ -1080,6 +1220,7 @@
                         .removeIf(entry -> isInBucket(entry.getKey(), flowBucket.bucketId().bucket())
                             && !flowBucket.table().containsKey(entry.getKey()));
                     backedupFlows.addAll(flowBucket.table().keySet());
+                    lastUpdateTimes.put(flowBucket.bucketId(), flowBucket.timestamp());
                 }
             } catch (Exception e) {
                 log.warn("Failure processing backup request", e);
@@ -1088,6 +1229,97 @@
         }
 
         /**
+         * Runs the anti-entropy protocol.
+         */
+        private void runAntiEntropy() {
+            for (DeviceId deviceId : getDevices()) {
+                runAntiEntropy(deviceId);
+            }
+        }
+
+        /**
+         * Runs the anti-entropy protocol for the given device.
+         *
+         * @param deviceId the device for which to run the anti-entropy protocol
+         */
+        private void runAntiEntropy(DeviceId deviceId) {
+            if (!isMasterNode(deviceId)) {
+                return;
+            }
+
+            // Get the set of digests for the node.
+            Set<FlowBucketDigest> digests = getDigests(deviceId);
+
+            // Get a list of backup nodes for the device and compute the real backup count.
+            List<NodeId> backupNodes = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
+            int availableBackupCount = Math.min(backupCount, backupNodes.size());
+
+            // Iterate through backup nodes and run the anti-entropy protocol.
+            for (int index = 0; index < availableBackupCount; index++) {
+                NodeId backupNode = backupNodes.get(index);
+                try {
+                    runAntiEntropy(deviceId, backupNode, digests);
+                } catch (Exception e) {
+                    log.error("Anti-entropy for " + deviceId + " to " + backupNode + " failed", e);
+                }
+            }
+        }
+
+        /**
+         * Sends an anti-entropy advertisement to the given node.
+         *
+         * @param deviceId the device ID for which to send the advertisement
+         * @param nodeId the node to which to send the advertisement
+         * @param digests the digests to send to the given node
+         */
+        private void runAntiEntropy(DeviceId deviceId, NodeId nodeId, Set<FlowBucketDigest> digests) {
+            log.trace("Sending anti-entropy advertisement for device {} to {}", deviceId, nodeId);
+            clusterCommunicator.<Set<FlowBucketDigest>, Set<BucketId>>sendAndReceive(
+                digests,
+                FLOW_TABLE_ANTI_ENTROPY,
+                serializer::encode,
+                serializer::decode,
+                nodeId)
+                .whenComplete((missingBuckets, error) -> {
+                    if (error == null) {
+                        log.debug("Detected {} missing buckets on node {} for device {}",
+                            missingBuckets.size(), nodeId, deviceId);
+                    } else {
+                        log.trace("Anti-entropy advertisement for device {} to {} failed", deviceId, nodeId, error);
+                    }
+                });
+        }
+
+        /**
+         * Handles a device anti-entropy request from a remote peer.
+         *
+         * @param digest the device digest
+         * @return the set of flow buckets to update
+         */
+        private Set<BucketId> onAntiEntropy(DeviceDigest digest) {
+            // If the local node is the master, reject the anti-entropy request.
+            // TODO: We really should be using mastership terms in anti-entropy requests to determine whether
+            // this node is a newer master, but that would only reduce the time it takes to resolve missing flows
+            // as a later anti-entropy request will still succeed once this node recognizes it's no longer the master.
+            NodeId master = replicaInfoManager.getReplicaInfoFor(digest.deviceId())
+                .master()
+                .orElse(null);
+            if (Objects.equals(master, local)) {
+                return ImmutableSet.of();
+            }
+
+            // Compute a set of missing BucketIds based on digest times and send them back to the master.
+            Set<BucketId> missingBuckets = new HashSet<>();
+            for (FlowBucketDigest flowBucketDigest : digest.digests()) {
+                long lastUpdated = lastUpdateTimes.getOrDefault(flowBucketDigest.bucketId(), 0L);
+                if (lastUpdated < flowBucketDigest.timestamp()) {
+                    missingBuckets.add(flowBucketDigest.bucketId());
+                }
+            }
+            return missingBuckets;
+        }
+
+        /**
          * Updates all flow counts for the given device.
          *
          * @param deviceId the device for which to update flow counts
@@ -1115,8 +1347,7 @@
     }
 
     @Override
-    public FlowRuleEvent updateTableStatistics(DeviceId deviceId,
-                                               List<TableStatisticsEntry> tableStats) {
+    public FlowRuleEvent updateTableStatistics(DeviceId deviceId, List<TableStatisticsEntry> tableStats) {
         deviceTableStats.put(deviceId, tableStats);
         return null;
     }
@@ -1152,4 +1383,4 @@
             //TODO: Generate an event to listeners (do we need?)
         }
     }
-}
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
index fd4f6d7..254f5df 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
@@ -40,4 +40,7 @@
 
     public static final MessageSubject FLOW_TABLE_BACKUP
         = new MessageSubject("peer-flow-table-backup");
+
+    public static final MessageSubject FLOW_TABLE_ANTI_ENTROPY
+        = new MessageSubject("peer-flow-table-anti-entropy");
 }