Rely on anti-entropy protocol to resolve missing flows after mastership change in ECFlowRuleStore
Change-Id: Ia4cf91d8441e2ed8d539dc29b22cbbb7e49dd66b
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index fed6643..2919f08 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -28,6 +28,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -97,7 +98,6 @@
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
-import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_ANTI_ENTROPY;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
@@ -872,6 +872,8 @@
private final Map<BucketId, Long> lastUpdateTimes = Maps.newConcurrentMap();
private final Set<BackupOperation> inFlightUpdates = Sets.newConcurrentHashSet();
+ private final AtomicLong currentTimestamp = new AtomicLong();
+
@Override
public void event(ReplicaInfoEvent event) {
eventHandler.execute(() -> handleEvent(event));
@@ -893,15 +895,7 @@
}
return;
}
-
- // If this is a master changed event, record update timestamps for the device to ensure tables are
- // replicated to backups.
- if (event.type() == MASTER_CHANGED) {
- for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
- recordUpdate(new BucketId(deviceId, bucket));
- }
- }
- backupSenderExecutor.execute(this::backup);
+ backupSenderExecutor.execute(this::runAntiEntropy);
}
/**
@@ -997,7 +991,11 @@
}
private void recordUpdate(BucketId bucketId) {
- lastUpdateTimes.put(bucketId, System.currentTimeMillis());
+ recordUpdate(bucketId, currentTimestamp.accumulateAndGet(System.currentTimeMillis(), Math::max));
+ }
+
+ private void recordUpdate(BucketId bucketId, long timestamp) {
+ lastUpdateTimes.put(bucketId, timestamp);
}
public void add(FlowEntry rule) {
@@ -1246,7 +1244,7 @@
.removeIf(entry -> isInBucket(entry.getKey(), flowBucket.bucketId().bucket())
&& !flowBucket.table().containsKey(entry.getKey()));
backedupFlows.addAll(flowBucket.table().keySet());
- lastUpdateTimes.put(flowBucket.bucketId(), flowBucket.timestamp());
+ recordUpdate(flowBucket.bucketId(), flowBucket.timestamp());
}
} catch (Exception e) {
log.warn("Failure processing backup request", e);