ONOS-2322: Support for periodic purging of ECMap tombstones

Change-Id: I6fe5475a472c383c4a51bd61446fba8f1dba1d37
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
index 9bd9c50..205a24b 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
@@ -102,6 +102,9 @@
     private final ExecutorService communicationExecutor;
     private final Map<NodeId, EventAccumulator> senderPending;
 
+    private long previousTombstonePurgeTime;
+    private final Map<NodeId, Long> antiEntropyTimes = Maps.newConcurrentMap();
+
     private final String mapName;
 
     private volatile boolean destroyed = false;
@@ -250,8 +253,15 @@
         clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
                                           serializer::decode,
                                           this::handleAntiEntropyAdvertisement,
+                                          serializer::encode,
                                           this.backgroundExecutor);
 
+        previousTombstonePurgeTime = 0;
+        this.backgroundExecutor.scheduleWithFixedDelay(this::purgeTombstones,
+                                                       initialDelaySec,
+                                                       antiEntropyPeriod,
+                                                       TimeUnit.SECONDS);
+
         this.tombstonesDisabled = tombstonesDisabled;
         this.lightweightAntiEntropy = !convergeFaster;
     }
@@ -267,6 +277,7 @@
                         .register(LogicalTimestamp.class)
                         .register(WallClockTimestamp.class)
                         .register(AntiEntropyAdvertisement.class)
+                        .register(AntiEntropyResponse.class)
                         .register(UpdateEntry.class)
                         .register(MapValue.class)
                         .register(MapValue.Digest.class)
@@ -563,13 +574,17 @@
     }
 
     private void sendAdvertisementToPeer(NodeId peer) {
-        clusterCommunicator.unicast(createAdvertisement(),
+        AntiEntropyAdvertisement<K> ad = createAdvertisement();
+        clusterCommunicator.sendAndReceive(ad,
                 antiEntropyAdvertisementSubject,
                 serializer::encode,
+                serializer::decode,
                 peer)
                 .whenComplete((result, error) -> {
                     if (error != null) {
                         log.debug("Failed to send anti-entropy advertisement to {}", peer, error);
+                    } else if (result == AntiEntropyResponse.PROCESSED) {
+                        antiEntropyTimes.merge(peer, ad.creationTime(), Math::max); // late replies never regress it
                     }
                 });
     }
@@ -579,9 +594,9 @@
                 ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest)));
     }
 
-    private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
+    private AntiEntropyResponse handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
         if (destroyed || underHighLoad()) {
-            return;
+            return AntiEntropyResponse.IGNORED;
         }
         try {
             if (log.isTraceEnabled()) {
@@ -600,7 +615,9 @@
             }
         } catch (Exception e) {
             log.warn("Error handling anti-entropy advertisement", e);
+            return AntiEntropyResponse.FAILED;
         }
+        return AntiEntropyResponse.PROCESSED;
     }
 
     /**
@@ -634,13 +651,37 @@
         return externalEvents;
     }
 
+    private void purgeTombstones() {
+        /*
+         * Purge old tombstones periodically to mitigate resource exhaustion. The oldest time (local
+         * system time) of a successful AE exchange across *all* current peers is the time before which
+         * tombstones are safe to purge; entries for nodes that left the cluster are ignored.
+         */
+        NodeId localNodeId = clusterService.getLocalNode().id();
+        List<Long> peerTimes = clusterService.getNodes().stream().map(node -> node.id())
+                .filter(id -> !id.equals(localNodeId)).map(antiEntropyTimes::get).collect(Collectors.toList());
+        // A null time means some current peer has not yet completed a successful AE exchange with us.
+        if (tombstonesDisabled || peerTimes.contains(null)) {
+            return;
+        }
+        long currentSafeTombstonePurgeTime = peerTimes.stream().mapToLong(Long::longValue).min().orElse(0L);
+        if (currentSafeTombstonePurgeTime == previousTombstonePurgeTime) {
+            return;
+        }
+        previousTombstonePurgeTime = currentSafeTombstonePurgeTime;
+        // Snapshot the candidates first, then remove each only if it is still the same tombstone.
+        items.entrySet().stream().filter(e -> e.getValue().isTombstone()
+                && e.getValue().creationTime() <= currentSafeTombstonePurgeTime)
+                .collect(Collectors.toList()).forEach(e -> items.remove(e.getKey(), e.getValue()));
+    }
+
     private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
         if (destroyed) {
             return;
         }
         updates.forEach(update -> {
             final K key = update.key();
-            final MapValue<V> value = update.value();
+            final MapValue<V> value = update.value() == null ? null : update.value().copy();
             if (value == null || value.isTombstone()) {
                 MapValue<V> previousValue = removeInternal(key, Optional.empty(), Optional.ofNullable(value));
                 if (previousValue != null && previousValue.isAlive()) {