ONOS-2322: Support for periodic purging of ECMap tombstones

Change-Id: I6fe5475a472c383c4a51bd61446fba8f1dba1d37
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/AntiEntropyAdvertisement.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/AntiEntropyAdvertisement.java
index 276eb4e..3914ffd 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/AntiEntropyAdvertisement.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/AntiEntropyAdvertisement.java
@@ -17,10 +17,10 @@
 
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableMap;
-
 import org.onosproject.cluster.NodeId;
 
 import java.util.Map;
+
 import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
@@ -28,6 +28,7 @@
  */
 public class AntiEntropyAdvertisement<K> {
 
+    private final long creationTime;
     private final NodeId sender;
     private final Map<K, MapValue.Digest> digest;
 
@@ -39,11 +40,21 @@
      */
     public AntiEntropyAdvertisement(NodeId sender,
                                     Map<K, MapValue.Digest> digest) {
+        this.creationTime = System.currentTimeMillis();
         this.sender = checkNotNull(sender);
         this.digest = ImmutableMap.copyOf(checkNotNull(digest));
     }
 
     /**
+     * Returns the ad creation time.
+     *
+     * @return ad creation time
+     */
+    public long creationTime() {
+        return creationTime;
+    }
+
+    /**
      * Returns the sender's node ID.
      *
      * @return the sender's node ID
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/AntiEntropyResponse.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/AntiEntropyResponse.java
new file mode 100644
index 0000000..f6a03b5
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/AntiEntropyResponse.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+/**
+ * Status of anti-entropy exchange, returned by the receiver.
+ *
+ */
+public enum AntiEntropyResponse {
+    /**
+     * Signifies a successfully processed anti-entropy message.
+     */
+    PROCESSED,
+
+    /**
+     * Signifies an unexpected failure during anti-entropy message processing.
+     */
+    FAILED,
+
+    /**
+     * Signifies an ignored anti-entropy message, potentially due to the receiver operating under high load.
+     */
+    IGNORED
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
index 9bd9c50..205a24b 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
@@ -102,6 +102,9 @@
     private final ExecutorService communicationExecutor;
     private final Map<NodeId, EventAccumulator> senderPending;
 
+    private long previousTombstonePurgeTime;
+    private final Map<NodeId, Long> antiEntropyTimes = Maps.newConcurrentMap();
+
     private final String mapName;
 
     private volatile boolean destroyed = false;
@@ -250,8 +253,15 @@
         clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
                                           serializer::decode,
                                           this::handleAntiEntropyAdvertisement,
+                                          serializer::encode,
                                           this.backgroundExecutor);
 
+        previousTombstonePurgeTime = 0;
+        this.backgroundExecutor.scheduleWithFixedDelay(this::purgeTombstones,
+                                                       initialDelaySec,
+                                                       antiEntropyPeriod,
+                                                       TimeUnit.SECONDS);
+
         this.tombstonesDisabled = tombstonesDisabled;
         this.lightweightAntiEntropy = !convergeFaster;
     }
@@ -267,6 +277,7 @@
                         .register(LogicalTimestamp.class)
                         .register(WallClockTimestamp.class)
                         .register(AntiEntropyAdvertisement.class)
+                        .register(AntiEntropyResponse.class)
                         .register(UpdateEntry.class)
                         .register(MapValue.class)
                         .register(MapValue.Digest.class)
@@ -563,13 +574,17 @@
     }
 
     private void sendAdvertisementToPeer(NodeId peer) {
-        clusterCommunicator.unicast(createAdvertisement(),
+        AntiEntropyAdvertisement<K> ad = createAdvertisement();
+        clusterCommunicator.sendAndReceive(ad,
                 antiEntropyAdvertisementSubject,
                 serializer::encode,
+                serializer::decode,
                 peer)
                 .whenComplete((result, error) -> {
                     if (error != null) {
                         log.debug("Failed to send anti-entropy advertisement to {}", peer, error);
+                    } else if (result == AntiEntropyResponse.PROCESSED) {
+                        antiEntropyTimes.put(peer, ad.creationTime());
                     }
                 });
     }
@@ -579,9 +594,9 @@
                 ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest)));
     }
 
-    private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
+    private AntiEntropyResponse handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
         if (destroyed || underHighLoad()) {
-            return;
+            return AntiEntropyResponse.IGNORED;
         }
         try {
             if (log.isTraceEnabled()) {
@@ -600,7 +615,9 @@
             }
         } catch (Exception e) {
             log.warn("Error handling anti-entropy advertisement", e);
+            return AntiEntropyResponse.FAILED;
         }
+        return AntiEntropyResponse.PROCESSED;
     }
 
     /**
@@ -634,13 +651,37 @@
         return externalEvents;
     }
 
+    private void purgeTombstones() {
+        /*
+         * In order to mitigate the resource exhaustion that can ensue due to an ever-growing set
+         * of tombstones we employ the following heuristic to purge old tombstones periodically.
+         * First, we keep track of the time (local system time) when we were able to have a successful
+         * anti-entropy exchange with each peer. The smallest (or oldest) such time across *all* peers is regarded
+         * as the time before which all tombstones are considered safe to purge.
+         */
+        if (tombstonesDisabled || antiEntropyTimes.size() != clusterService.getNodes().size() - 1) {
+            return;
+        }
+        long currentSafeTombstonePurgeTime = antiEntropyTimes.values().stream().reduce(Math::min).orElse(0L);
+        if (currentSafeTombstonePurgeTime == previousTombstonePurgeTime) {
+            return;
+        }
+        List<Map.Entry<K, MapValue<V>>> tombStonesToDelete = items.entrySet()
+                                          .stream()
+                                          .filter(e -> e.getValue().isTombstone())
+                                          .filter(e -> e.getValue().creationTime() <= currentSafeTombstonePurgeTime)
+                                          .collect(Collectors.toList());
+        previousTombstonePurgeTime = currentSafeTombstonePurgeTime;
+        tombStonesToDelete.forEach(entry -> items.remove(entry.getKey(), entry.getValue()));
+    }
+
     private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
         if (destroyed) {
             return;
         }
         updates.forEach(update -> {
             final K key = update.key();
-            final MapValue<V> value = update.value();
+            final MapValue<V> value = update.value() == null ? null : update.value().copy();
             if (value == null || value.isTombstone()) {
                 MapValue<V> previousValue = removeInternal(key, Optional.empty(), Optional.ofNullable(value));
                 if (previousValue != null && previousValue.isAlive()) {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MapValue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MapValue.java
index 8e13d03..457caeb 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MapValue.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MapValue.java
@@ -30,6 +30,7 @@
 public class MapValue<V> implements Comparable<MapValue<V>> {
     private final Timestamp timestamp;
     private final V value;
+    private long creationTime;
 
     /**
      * Creates a tombstone value with the specified timestamp.
@@ -39,12 +40,35 @@
      * @param <U> value type
      */
     public static <U> MapValue<U> tombstone(Timestamp timestamp) {
-        return new MapValue<>(null, timestamp);
+        return new MapValue<>(null, timestamp, System.currentTimeMillis());
     }
 
     public MapValue(V value, Timestamp timestamp) {
+        this(value, timestamp, System.currentTimeMillis());
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param value value
+     * @param timestamp value timestamp.
+     * @param creationTime the system time (on local instance) of construction
+     */
+    public MapValue(V value, Timestamp timestamp, long creationTime) {
         this.value = value;
         this.timestamp = checkNotNull(timestamp, "Timestamp cannot be null");
+        this.creationTime = creationTime;
+    }
+
+    /**
+     * Creates a copy of MapValue.
+     * <p>
+     * The copy will have an updated creation time corresponding to when the copy was constructed.
+     *
+     * @return MapValue copy
+     */
+    public MapValue<V> copy() {
+        return new MapValue<>(this.value, this.timestamp, System.currentTimeMillis());
     }
 
     public boolean isTombstone() {
@@ -63,6 +87,10 @@
         return value;
     }
 
+    public long creationTime() {
+        return creationTime;
+    }
+
     @Override
     public int compareTo(MapValue<V> o) {
         return this.timestamp.compareTo(o.timestamp);