ONOS-2322: Support for periodic purging of ECMap tombstones
Change-Id: I6fe5475a472c383c4a51bd61446fba8f1dba1d37
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
index 9bd9c50..205a24b 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
@@ -102,6 +102,9 @@
private final ExecutorService communicationExecutor;
private final Map<NodeId, EventAccumulator> senderPending;
+ private long previousTombstonePurgeTime;
+ private final Map<NodeId, Long> antiEntropyTimes = Maps.newConcurrentMap();
+
private final String mapName;
private volatile boolean destroyed = false;
@@ -250,8 +253,15 @@
clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
serializer::decode,
this::handleAntiEntropyAdvertisement,
+ serializer::encode,
this.backgroundExecutor);
+ previousTombstonePurgeTime = 0;
+ this.backgroundExecutor.scheduleWithFixedDelay(this::purgeTombstones,
+ initialDelaySec,
+ antiEntropyPeriod,
+ TimeUnit.SECONDS);
+
this.tombstonesDisabled = tombstonesDisabled;
this.lightweightAntiEntropy = !convergeFaster;
}
@@ -267,6 +277,7 @@
.register(LogicalTimestamp.class)
.register(WallClockTimestamp.class)
.register(AntiEntropyAdvertisement.class)
+ .register(AntiEntropyResponse.class)
.register(UpdateEntry.class)
.register(MapValue.class)
.register(MapValue.Digest.class)
@@ -563,13 +574,17 @@
}
private void sendAdvertisementToPeer(NodeId peer) {
- clusterCommunicator.unicast(createAdvertisement(),
+ AntiEntropyAdvertisement<K> ad = createAdvertisement();
+ clusterCommunicator.sendAndReceive(ad,
antiEntropyAdvertisementSubject,
serializer::encode,
+ serializer::decode,
peer)
.whenComplete((result, error) -> {
if (error != null) {
log.debug("Failed to send anti-entropy advertisement to {}", peer, error);
+ } else if (result == AntiEntropyResponse.PROCESSED) {
+ antiEntropyTimes.put(peer, ad.creationTime()); // read by purgeTombstones()
}
});
}
@@ -579,9 +594,9 @@
ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest)));
}
- private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
+ private AntiEntropyResponse handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
if (destroyed || underHighLoad()) {
- return;
+ return AntiEntropyResponse.IGNORED;
}
try {
if (log.isTraceEnabled()) {
@@ -600,7 +615,9 @@
}
} catch (Exception e) {
log.warn("Error handling anti-entropy advertisement", e);
+ return AntiEntropyResponse.FAILED;
}
+ return AntiEntropyResponse.PROCESSED;
}
/**
@@ -634,13 +651,37 @@
return externalEvents;
}
+ private void purgeTombstones() {
+ /*
+ * In order to mitigate the resource exhaustion that can ensue due to an ever-growing set
+ * of tombstones we employ the following heuristic to purge old tombstones periodically.
+ * First, we keep track of the time (local system time) when we were able to have a successful
+ * AE exchange with each peer. The smallest (or oldest) such time across *all* peers is regarded
+ * as the time before which all tombstones are considered safe to purge.
+ */
+ if (tombstonesDisabled || antiEntropyTimes.size() != clusterService.getNodes().size() - 1) {
+ return; // tombstones disabled, or not exactly (cluster size - 1) peers have a recorded AE time
+ }
+ long currentSafeTombstonePurgeTime = antiEntropyTimes.values().stream().reduce(Math::min).orElse(0L);
+ if (currentSafeTombstonePurgeTime == previousTombstonePurgeTime) {
+ return; // threshold unchanged since the previous run; skip the scan
+ }
+ List<Map.Entry<K, MapValue<V>>> tombStonesToDelete = items.entrySet()
+ .stream()
+ .filter(e -> e.getValue().isTombstone())
+ .filter(e -> e.getValue().creationTime() <= currentSafeTombstonePurgeTime)
+ .collect(Collectors.toList());
+ previousTombstonePurgeTime = currentSafeTombstonePurgeTime;
+ tombStonesToDelete.forEach(entry -> items.remove(entry.getKey(), entry.getValue()));
+ }
+
private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
if (destroyed) {
return;
}
updates.forEach(update -> {
final K key = update.key();
- final MapValue<V> value = update.value();
+ final MapValue<V> value = update.value() == null ? null : update.value().copy();
if (value == null || value.isTombstone()) {
MapValue<V> previousValue = removeInternal(key, Optional.empty(), Optional.ofNullable(value));
if (previousValue != null && previousValue.isAlive()) {