Rely on anti-entropy protocol to resolve missing flows after mastership change in ECFlowRuleStore

Change-Id: Ia4cf91d8441e2ed8d539dc29b22cbbb7e49dd66b
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index fed6643..2919f08 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -28,6 +28,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -97,7 +98,6 @@
 import static org.onlab.util.Tools.get;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
-import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_ANTI_ENTROPY;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
@@ -872,6 +872,8 @@
         private final Map<BucketId, Long> lastUpdateTimes = Maps.newConcurrentMap();
         private final Set<BackupOperation> inFlightUpdates = Sets.newConcurrentHashSet();
 
+        private final AtomicLong currentTimestamp = new AtomicLong();
+
         @Override
         public void event(ReplicaInfoEvent event) {
             eventHandler.execute(() -> handleEvent(event));
@@ -893,15 +895,7 @@
                 }
                 return;
             }
-
-            // If this is a master changed event, record update timestamps for the device to ensure tables are
-            // replicated to backups.
-            if (event.type() == MASTER_CHANGED) {
-                for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
-                    recordUpdate(new BucketId(deviceId, bucket));
-                }
-            }
-            backupSenderExecutor.execute(this::backup);
+            backupSenderExecutor.execute(this::runAntiEntropy);
         }
 
         /**
@@ -997,7 +991,11 @@
         }
 
         private void recordUpdate(BucketId bucketId) {
-            lastUpdateTimes.put(bucketId, System.currentTimeMillis());
+            recordUpdate(bucketId, currentTimestamp.accumulateAndGet(System.currentTimeMillis(), Math::max));
+        }
+
+        private void recordUpdate(BucketId bucketId, long timestamp) {
+            lastUpdateTimes.put(bucketId, timestamp);
         }
 
         public void add(FlowEntry rule) {
@@ -1246,7 +1244,7 @@
                         .removeIf(entry -> isInBucket(entry.getKey(), flowBucket.bucketId().bucket())
                             && !flowBucket.table().containsKey(entry.getKey()));
                     backedupFlows.addAll(flowBucket.table().keySet());
-                    lastUpdateTimes.put(flowBucket.bucketId(), flowBucket.timestamp());
+                    recordUpdate(flowBucket.bucketId(), flowBucket.timestamp());
                 }
             } catch (Exception e) {
                 log.warn("Failure processing backup request", e);