Misc fixes/improvements to ECMapImpl. Most notably:
- Fixed logic for picking a random peer to do anti-entropy (AE) with
- Fixed logic for when to do active sync if lightweight AE is disabled
- Fixed tracking of ECMap activity

Change-Id: I35da91d6ef684e16630be7bd5e518c8400debe14
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 18cb3bc..b91df44 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -15,7 +15,9 @@
  */
 package org.onosproject.store.ecmap;
 
+import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -97,6 +99,8 @@
     private final ExecutorService communicationExecutor;
     private final Map<NodeId, EventAccumulator> senderPending;
 
+    private final String mapName;
+
     private volatile boolean destroyed = false;
     private static final String ERROR_DESTROYED = " map is already destroyed";
     private final String destroyedMessage;
@@ -157,6 +161,7 @@
                                 TimeUnit antiEntropyTimeUnit,
                                 boolean convergeFaster,
                                 boolean persistent) {
+        this.mapName = mapName;
         items = Maps.newConcurrentMap();
         senderPending = Maps.newConcurrentMap();
         destroyedMessage = mapName + ERROR_DESTROYED;
@@ -284,7 +289,7 @@
         return items.values()
                     .stream()
                     .filter(MapValue::isAlive)
-                    .anyMatch(v -> v.get().equals(value));
+                    .anyMatch(v -> value.equals(v.get()));
     }
 
     @Override
@@ -303,7 +308,7 @@
         checkNotNull(value, ERROR_NULL_VALUE);
 
         MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
-        if (updateInternal(key, newValue)) {
+        if (putInternal(key, newValue)) {
             notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
             notifyListeners(new EventuallyConsistentMapEvent<>(PUT, key, value));
         }
@@ -313,9 +318,20 @@
     public V remove(K key) {
         checkState(!destroyed, destroyedMessage);
         checkNotNull(key, ERROR_NULL_KEY);
-        // TODO prevent calls here if value is important for timestamp
-        MapValue<V> tombstone = MapValue.tombstone(timestampProvider.apply(key, null));
-        MapValue<V> previousValue = removeInternal(key, Optional.empty(), tombstone);
+        return removeAndNotify(key, null);
+    }
+
+    @Override
+    public void remove(K key, V value) {
+        checkState(!destroyed, destroyedMessage);
+        checkNotNull(key, ERROR_NULL_KEY);
+        checkNotNull(value, ERROR_NULL_VALUE);
+        removeAndNotify(key, value);
+    }
+
+    private V removeAndNotify(K key, V value) {
+        MapValue<V> tombstone = MapValue.tombstone(timestampProvider.apply(key, value));
+        MapValue<V> previousValue = removeInternal(key, Optional.ofNullable(value), tombstone);
         if (previousValue != null) {
             notifyPeers(new UpdateEntry<>(key, tombstone), peerUpdateFunction.apply(key, previousValue.get()));
             if (previousValue.isAlive()) {
@@ -325,27 +341,13 @@
         return previousValue != null ? previousValue.get() : null;
     }
 
-    @Override
-    public void remove(K key, V value) {
-        checkState(!destroyed, destroyedMessage);
-        checkNotNull(key, ERROR_NULL_KEY);
-        checkNotNull(value, ERROR_NULL_VALUE);
-        MapValue<V> tombstone = MapValue.tombstone(timestampProvider.apply(key, value));
-        MapValue<V> previousValue = removeInternal(key, Optional.of(value), tombstone);
-        if (previousValue != null) {
-            notifyPeers(new UpdateEntry<>(key, tombstone), peerUpdateFunction.apply(key, previousValue.get()));
-            if (previousValue.isAlive()) {
-                notifyListeners(new EventuallyConsistentMapEvent<>(REMOVE, key, previousValue.get()));
-            }
-        }
-    }
-
     private MapValue<V> removeInternal(K key, Optional<V> value, MapValue<V> tombstone) {
         checkState(!destroyed, destroyedMessage);
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(value, ERROR_NULL_VALUE);
-
         checkState(tombstone.isTombstone());
+
+        counter.incrementCount();
         AtomicBoolean updated = new AtomicBoolean(false);
         AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
         items.compute(key, (k, existing) -> {
@@ -360,11 +362,19 @@
             if (updated.get()) {
                 previousValue.set(existing);
             }
-            return updated.get() ? tombstone : existing;
+            if (updated.get()) {
+                return tombstonesDisabled ? null : tombstone;
+            } else {
+                return existing;
+            }
         });
         if (updated.get()) {
             if (persistent) {
-                persistentStore.update(key, tombstone);
+                if (tombstonesDisabled) {
+                    persistentStore.remove(key);
+                } else {
+                    persistentStore.update(key, tombstone);
+                }
             }
         }
         return previousValue.get();
@@ -393,11 +403,7 @@
     @Override
     public Collection<V> values() {
         checkState(!destroyed, destroyedMessage);
-        return Maps.filterValues(items, MapValue::isAlive)
-                   .values()
-                   .stream()
-                   .map(MapValue::get)
-                   .collect(Collectors.toList());
+        return Collections2.transform(Maps.filterValues(items, MapValue::isAlive).values(), MapValue::get);
     }
 
     @Override
@@ -416,14 +422,16 @@
      * @param newValue proposed new value
      * @return true if update happened; false if map already contains a more recent value for the key
      */
-    private boolean updateInternal(K key, MapValue<V> newValue) {
+    private boolean putInternal(K key, MapValue<V> newValue) {
+        checkState(!destroyed, destroyedMessage);
+        checkNotNull(key, ERROR_NULL_KEY);
+        checkNotNull(newValue, ERROR_NULL_VALUE);
+        checkState(newValue.isAlive());
+        counter.incrementCount();
         AtomicBoolean updated = new AtomicBoolean(false);
         items.compute(key, (k, existing) -> {
             if (existing == null || newValue.isNewerThan(existing)) {
                 updated.set(true);
-                if (newValue.isTombstone()) {
-                    return tombstonesDisabled ? null : newValue;
-                }
                 return newValue;
             }
             return existing;
@@ -499,8 +507,8 @@
     private Optional<NodeId> pickRandomActivePeer() {
         List<NodeId> activePeers = clusterService.getNodes()
                 .stream()
-                .filter(node -> !localNodeId.equals(node))
-                 .map(ControllerNode::id)
+                .map(ControllerNode::id)
+                .filter(id -> !localNodeId.equals(id))
                 .filter(id -> clusterService.getState(id) == ControllerNode.State.ACTIVE)
                 .collect(Collectors.toList());
         Collections.shuffle(activePeers);
@@ -519,9 +527,9 @@
                 });
     }
 
-
     private AntiEntropyAdvertisement<K> createAdvertisement() {
-        return new AntiEntropyAdvertisement<K>(localNodeId, Maps.transformValues(items, MapValue::digest));
+        return new AntiEntropyAdvertisement<K>(localNodeId,
+                ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest)));
     }
 
     private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
@@ -529,13 +537,14 @@
             return;
         }
         try {
+            log.debug("Received anti-entropy advertisement from {} for {} with {} entries in it",
+                    mapName, ad.sender(), ad.digest().size());
             antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
 
             if (!lightweightAntiEntropy) {
-                Set<K> missingKeys = Sets.difference(items.keySet(), ad.digest().keySet());
-                // if remote ad has something unknown, actively sync
-                if (missingKeys.size() > 0) {
-                    // Send the advertisement back if this peer is out-of-sync
+                // if remote ad has any entries that the local copy is missing, actively sync
+                // TODO: Missing keys are not the only way the local copy can be behind.
+                if (Sets.difference(ad.digest().keySet(), items.keySet()).size() > 0) {
                     // TODO: Send ad for missing keys and for entries that are stale
                     sendAdvertisementToPeer(ad.sender());
                 }
@@ -561,7 +570,9 @@
                 // local value is more recent, push to sender
                 queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
             }
-            if (remoteValueDigest != null && remoteValueDigest.isTombstone()) {
+            if (remoteValueDigest != null
+                    && remoteValueDigest.isNewerThan(localValue.digest())
+                    && remoteValueDigest.isTombstone()) {
                 MapValue<V> previousValue = removeInternal(key,
                                                            Optional.empty(),
                                                            MapValue.tombstone(remoteValueDigest.timestamp()));
@@ -582,10 +593,10 @@
             final MapValue<V> value = update.value();
             if (value.isTombstone()) {
                 MapValue<V> previousValue = removeInternal(key, Optional.empty(), value);
-                if (previousValue != null && previousValue.get() != null) {
+                if (previousValue != null && previousValue.isAlive()) {
                     notifyListeners(new EventuallyConsistentMapEvent<>(REMOVE, key, previousValue.get()));
                 }
-            } else if (updateInternal(key, value)) {
+            } else if (putInternal(key, value)) {
                 notifyListeners(new EventuallyConsistentMapEvent<>(PUT, key, value.get()));
             }
         });