Ensure tombstone purge logic works correctly after a cluster scale down

Change-Id: I94a4c234982a9e8f44af5078b3cbcee13e4b93cb
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
index 205a24b..f4bb0d0 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
@@ -256,11 +256,13 @@
                                           serializer::encode,
                                           this.backgroundExecutor);
 
-        previousTombstonePurgeTime = 0;
-        this.backgroundExecutor.scheduleWithFixedDelay(this::purgeTombstones,
-                                                       initialDelaySec,
-                                                       antiEntropyPeriod,
-                                                       TimeUnit.SECONDS);
+        if (!tombstonesDisabled) {
+            previousTombstonePurgeTime = 0;
+            this.backgroundExecutor.scheduleWithFixedDelay(this::purgeTombstones,
+                                                           initialDelaySec,
+                                                           antiEntropyPeriod,
+                                                           TimeUnit.SECONDS);
+        }
 
         this.tombstonesDisabled = tombstonesDisabled;
         this.lightweightAntiEntropy = !convergeFaster;
@@ -659,10 +661,13 @@
          * AE exchange with each peer. The smallest (or oldest) such time across *all* peers is regarded
          * as the time before which all tombstones are considered safe to purge.
          */
-        if (tombstonesDisabled || antiEntropyTimes.size() != clusterService.getNodes().size() - 1) {
-            return;
-        }
-        long currentSafeTombstonePurgeTime = antiEntropyTimes.values().stream().reduce(Math::min).orElse(0L);
+        long currentSafeTombstonePurgeTime =  clusterService.getNodes()
+                                                            .stream()
+                                                            .map(ControllerNode::id)
+                                                            .filter(id -> !id.equals(localNodeId))
+                                                            .map(id -> antiEntropyTimes.getOrDefault(id, 0L))
+                                                            .reduce(Math::min)
+                                                            .orElse(0L);
         if (currentSafeTombstonePurgeTime == previousTombstonePurgeTime) {
             return;
         }