Ensure tombsone purge logic works correctly after a cluster scale down
Change-Id: I94a4c234982a9e8f44af5078b3cbcee13e4b93cb
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
index 205a24b..f4bb0d0 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
@@ -256,11 +256,13 @@
serializer::encode,
this.backgroundExecutor);
- previousTombstonePurgeTime = 0;
- this.backgroundExecutor.scheduleWithFixedDelay(this::purgeTombstones,
- initialDelaySec,
- antiEntropyPeriod,
- TimeUnit.SECONDS);
+ if (!tombstonesDisabled) {
+ previousTombstonePurgeTime = 0;
+ this.backgroundExecutor.scheduleWithFixedDelay(this::purgeTombstones,
+ initialDelaySec,
+ antiEntropyPeriod,
+ TimeUnit.SECONDS);
+ }
this.tombstonesDisabled = tombstonesDisabled;
this.lightweightAntiEntropy = !convergeFaster;
@@ -659,10 +661,13 @@
* AE exchange with each peer. The smallest (or oldest) such time across *all* peers is regarded
* as the time before which all tombstones are considered safe to purge.
*/
- if (tombstonesDisabled || antiEntropyTimes.size() != clusterService.getNodes().size() - 1) {
- return;
- }
- long currentSafeTombstonePurgeTime = antiEntropyTimes.values().stream().reduce(Math::min).orElse(0L);
+ long currentSafeTombstonePurgeTime = clusterService.getNodes()
+ .stream()
+ .map(ControllerNode::id)
+ .filter(id -> !id.equals(localNodeId))
+ .map(id -> antiEntropyTimes.getOrDefault(id, 0L))
+ .reduce(Math::min)
+ .orElse(0L);
if (currentSafeTombstonePurgeTime == previousTombstonePurgeTime) {
return;
}