ONOS-2322: Support for periodic purging of ECMap tombstones
Change-Id: I6fe5475a472c383c4a51bd61446fba8f1dba1d37
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/AntiEntropyAdvertisement.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/AntiEntropyAdvertisement.java
index 276eb4e..3914ffd 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/AntiEntropyAdvertisement.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/AntiEntropyAdvertisement.java
@@ -17,10 +17,10 @@
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableMap;
-
import org.onosproject.cluster.NodeId;
import java.util.Map;
+
import static com.google.common.base.Preconditions.checkNotNull;
/**
@@ -28,6 +28,7 @@
*/
public class AntiEntropyAdvertisement<K> {
+ private final long creationTime;
private final NodeId sender;
private final Map<K, MapValue.Digest> digest;
@@ -39,11 +40,21 @@
*/
public AntiEntropyAdvertisement(NodeId sender,
Map<K, MapValue.Digest> digest) {
+ this.creationTime = System.currentTimeMillis();
this.sender = checkNotNull(sender);
this.digest = ImmutableMap.copyOf(checkNotNull(digest));
}
/**
+ * Returns the ad creation time, in local wall-clock milliseconds.
+ *
+ * @return ad creation time ({@link System#currentTimeMillis()} at construction)
+ */
+ public long creationTime() {
+ return creationTime;
+ }
+
+ /**
* Returns the sender's node ID.
*
* @return the sender's node ID
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/AntiEntropyResponse.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/AntiEntropyResponse.java
new file mode 100644
index 0000000..f6a03b5
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/AntiEntropyResponse.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+/**
+ * Status of an anti-entropy exchange, returned by the receiver of an advertisement.
+ * The sender records a successful exchange only for {@link #PROCESSED}.
+ */
+public enum AntiEntropyResponse {
+ /**
+ * Signifies a successfully processed anti-entropy message.
+ */
+ PROCESSED,
+
+ /**
+ * Signifies an unexpected failure (an exception) during anti-entropy message processing.
+ */
+ FAILED,
+
+ /**
+ * Signifies an ignored anti-entropy message, e.g. because the receiver is under high load or its map was destroyed.
+ */
+ IGNORED
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
index 9bd9c50..205a24b 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
@@ -102,6 +102,9 @@
private final ExecutorService communicationExecutor;
private final Map<NodeId, EventAccumulator> senderPending;
+ private long previousTombstonePurgeTime;
+ private final Map<NodeId, Long> antiEntropyTimes = Maps.newConcurrentMap();
+
private final String mapName;
private volatile boolean destroyed = false;
@@ -250,8 +253,15 @@
clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
serializer::decode,
this::handleAntiEntropyAdvertisement,
+ serializer::encode,
this.backgroundExecutor);
+ previousTombstonePurgeTime = 0;
+ this.backgroundExecutor.scheduleWithFixedDelay(this::purgeTombstones,
+ initialDelaySec,
+ antiEntropyPeriod,
+ TimeUnit.SECONDS);
+
this.tombstonesDisabled = tombstonesDisabled;
this.lightweightAntiEntropy = !convergeFaster;
}
@@ -267,6 +277,7 @@
.register(LogicalTimestamp.class)
.register(WallClockTimestamp.class)
.register(AntiEntropyAdvertisement.class)
+ .register(AntiEntropyResponse.class)
.register(UpdateEntry.class)
.register(MapValue.class)
.register(MapValue.Digest.class)
@@ -563,13 +574,17 @@
}
private void sendAdvertisementToPeer(NodeId peer) {
- clusterCommunicator.unicast(createAdvertisement(),
+ AntiEntropyAdvertisement<K> ad = createAdvertisement();
+ clusterCommunicator.sendAndReceive(ad,
antiEntropyAdvertisementSubject,
serializer::encode,
+ serializer::decode,
peer)
.whenComplete((result, error) -> {
if (error != null) {
log.debug("Failed to send anti-entropy advertisement to {}", peer, error);
+ } else if (result == AntiEntropyResponse.PROCESSED) {
+ antiEntropyTimes.put(peer, ad.creationTime());
}
});
}
@@ -579,9 +594,9 @@
ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest)));
}
- private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
+ private AntiEntropyResponse handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
if (destroyed || underHighLoad()) {
- return;
+ return AntiEntropyResponse.IGNORED;
}
try {
if (log.isTraceEnabled()) {
@@ -600,7 +615,9 @@
}
} catch (Exception e) {
log.warn("Error handling anti-entropy advertisement", e);
+ return AntiEntropyResponse.FAILED;
}
+ return AntiEntropyResponse.PROCESSED;
}
/**
@@ -634,13 +651,37 @@
return externalEvents;
}
+ private void purgeTombstones() {
+ /*
+ * In order to mitigate the resource exhaustion that can ensue due to an ever-growing set
+ * of tombstones we employ the following heuristic to purge old tombstones periodically.
+ * First, we record the time (local system time) at which each peer last acknowledged one of
+ * our AE advertisements as PROCESSED. The smallest (oldest) such time across *all* peers is
+ * regarded as the time at or before which tombstones (by local creation time) are safe to purge.
+ */
+ if (tombstonesDisabled || antiEntropyTimes.size() != clusterService.getNodes().size() - 1) { // wait until every peer has acked; NOTE(review): assumes getNodes() includes this node
+ return; // NOTE(review): this patch never removes entries for departed peers, so a shrinking cluster may block purging forever
+ }
+ long currentSafeTombstonePurgeTime = antiEntropyTimes.values().stream().reduce(Math::min).orElse(0L); // oldest acked ad time; 0 for a single-node cluster
+ if (currentSafeTombstonePurgeTime == previousTombstonePurgeTime) { // no newer ack since last purge (also skips the initial 0 value)
+ return;
+ }
+ List<Map.Entry<K, MapValue<V>>> tombStonesToDelete = items.entrySet()
+ .stream()
+ .filter(e -> e.getValue().isTombstone())
+ .filter(e -> e.getValue().creationTime() <= currentSafeTombstonePurgeTime) // local creation time, not the logical update timestamp
+ .collect(Collectors.toList());
+ previousTombstonePurgeTime = currentSafeTombstonePurgeTime;
+ tombStonesToDelete.forEach(entry -> items.remove(entry.getKey(), entry.getValue())); // remove(k, v): skip keys re-written since the snapshot
+ }
+
private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
if (destroyed) {
return;
}
updates.forEach(update -> {
final K key = update.key();
- final MapValue<V> value = update.value();
+ final MapValue<V> value = update.value() == null ? null : update.value().copy();
if (value == null || value.isTombstone()) {
MapValue<V> previousValue = removeInternal(key, Optional.empty(), Optional.ofNullable(value));
if (previousValue != null && previousValue.isAlive()) {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MapValue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MapValue.java
index 8e13d03..457caeb 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MapValue.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MapValue.java
@@ -30,6 +30,7 @@
public class MapValue<V> implements Comparable<MapValue<V>> {
private final Timestamp timestamp;
private final V value;
+ private long creationTime;
/**
* Creates a tombstone value with the specified timestamp.
@@ -39,12 +40,35 @@
* @param <U> value type
*/
public static <U> MapValue<U> tombstone(Timestamp timestamp) {
- return new MapValue<>(null, timestamp);
+ return new MapValue<>(null, timestamp, System.currentTimeMillis());
}
public MapValue(V value, Timestamp timestamp) {
+ this(value, timestamp, System.currentTimeMillis());
+ }
+
+ /**
+ * Creates a map value with an explicit local creation time.
+ *
+ * @param value value (null for a tombstone)
+ * @param timestamp value timestamp; must not be null
+ * @param creationTime the system time (on local instance) of construction
+ */
+ public MapValue(V value, Timestamp timestamp, long creationTime) {
this.value = value;
this.timestamp = checkNotNull(timestamp, "Timestamp cannot be null");
+ this.creationTime = creationTime;
+ }
+
+ /**
+ * Creates a copy of MapValue.
+ * <p>
+ * The copy will have an updated creation time corresponding to when the copy was constructed.
+ *
+ * @return MapValue copy
+ */
+ public MapValue<V> copy() {
+ return new MapValue<>(this.value, this.timestamp, System.currentTimeMillis());
}
public boolean isTombstone() {
@@ -63,6 +87,10 @@
return value;
}
+ public long creationTime() {
+ return creationTime;
+ }
+
@Override
public int compareTo(MapValue<V> o) {
return this.timestamp.compareTo(o.timestamp);