ONOS-2280: Fix NPE in hosts EC map

Change-Id: I4cb74d7c9526dc0e836e1e2790748324f60183f5
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 6d8daaa..c20016b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -47,6 +47,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.Timer;
@@ -312,7 +313,9 @@
     public V remove(K key) {
         checkState(!destroyed, destroyedMessage);
         checkNotNull(key, ERROR_NULL_KEY);
-        return removeInternal(key, Optional.empty());
+        MapValue<V> tombstone = new MapValue<>(null, timestampProvider.apply(key, null));
+        MapValue<V> previousValue = removeInternal(key, Optional.empty(), tombstone);
+        return previousValue != null ? previousValue.get() : null;
     }
 
     @Override
@@ -320,34 +323,39 @@
         checkState(!destroyed, destroyedMessage);
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(value, ERROR_NULL_VALUE);
-        removeInternal(key, Optional.of(value));
+        MapValue<V> tombstone = new MapValue<>(null, timestampProvider.apply(key, null));
+        removeInternal(key, Optional.of(value), tombstone);
     }
 
-    private V removeInternal(K key, Optional<V> value) {
+    private MapValue<V> removeInternal(K key, Optional<V> value, MapValue<V> tombstone) {
         checkState(!destroyed, destroyedMessage);
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(value, ERROR_NULL_VALUE);
 
-        MapValue<V> newValue = new MapValue<>(null, timestampProvider.apply(key, value.orElse(null)));
+        checkState(tombstone.isTombstone());
         AtomicBoolean updated = new AtomicBoolean(false);
-        AtomicReference<V> previousValue = new AtomicReference<>();
+        AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
         items.compute(key, (k, existing) -> {
-            if (existing != null && existing.isAlive()) {
-                updated.set(!value.isPresent() ||  value.get().equals(existing.get()));
-                previousValue.set(existing.get());
+            boolean valueMatches = true;
+            if (value.isPresent() && existing != null && existing.isAlive()) {
+                valueMatches = Objects.equals(value.get(), existing.get());
             }
-            updated.set(existing == null || newValue.isNewerThan(existing));
-            return updated.get() ? newValue : existing;
+            updated.set(valueMatches && (existing == null || tombstone.isNewerThan(existing)));
+            if (updated.get()) {
+                previousValue.set(existing);
+            }
+            return updated.get() ? tombstone : existing;
         });
         if (updated.get()) {
-            notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, previousValue.get()));
-            notifyListeners(new EventuallyConsistentMapEvent<>(REMOVE, key, previousValue.get()));
-            if (persistent) {
-                persistentStore.update(key, newValue);
+            notifyPeers(new UpdateEntry<>(key, tombstone), peerUpdateFunction.apply(key, null));
+            if (previousValue.get() != null && previousValue.get().isAlive()) {
+                notifyListeners(new EventuallyConsistentMapEvent<>(REMOVE, key, previousValue.get().get()));
             }
-            return previousValue.get();
+            if (persistent) {
+                persistentStore.update(key, tombstone);
+            }
         }
-        return null;
+        return previousValue.get();
     }
 
     @Override
@@ -540,12 +548,13 @@
             if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
                 // local value is more recent, push to sender
                 queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
-            } else {
-                if (remoteValueDigest.isTombstone()
-                        && remoteValueDigest.timestamp().isNewerThan(localValue.timestamp())) {
-                    if (updateInternal(key, new MapValue<>(null, remoteValueDigest.timestamp()))) {
-                        externalEvents.add(new EventuallyConsistentMapEvent<>(REMOVE, key, null));
-                    }
+            }
+            if (remoteValueDigest != null && remoteValueDigest.isTombstone()) {
+                MapValue<V> previousValue = removeInternal(key,
+                                                           Optional.empty(),
+                                                           new MapValue<>(null, remoteValueDigest.timestamp()));
+                if (previousValue != null && previousValue.isAlive()) {
+                    externalEvents.add(new EventuallyConsistentMapEvent<>(REMOVE, key, previousValue.get()));
                 }
             }
         });
@@ -559,10 +568,13 @@
         updates.forEach(update -> {
             final K key = update.key();
             final MapValue<V> value = update.value();
-
-            if (updateInternal(key, value)) {
-                final EventuallyConsistentMapEvent.Type type = value.isTombstone() ? REMOVE : PUT;
-                notifyListeners(new EventuallyConsistentMapEvent<>(type, key, value.get()));
+            if (value.isTombstone()) {
+                MapValue<V> previousValue = removeInternal(key, Optional.empty(), value);
+                if (previousValue != null && previousValue.get() != null) {
+                    notifyListeners(new EventuallyConsistentMapEvent<>(REMOVE, key, previousValue.get()));
+                }
+            } else if (updateInternal(key, value)) {
+                notifyListeners(new EventuallyConsistentMapEvent<>(PUT, key, value.get()));
             }
         });
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java
index 637444c..f4071ef 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java
@@ -1,5 +1,6 @@
 package org.onosproject.store.host.impl;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static org.onosproject.net.DefaultAnnotations.merge;
 import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED;
 import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED;
@@ -11,7 +12,6 @@
 import java.util.Collections;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -50,13 +50,9 @@
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
 import com.google.common.collect.SetMultimap;
 import com.google.common.collect.Sets;
-import static com.google.common.collect.Multimaps.newSetMultimap;
-import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
-import static com.google.common.collect.Sets.newConcurrentHashSet;
 
 /**
  * Manages the inventory of hosts using a {@code EventuallyConsistentMap}.
@@ -76,9 +72,10 @@
     protected LogicalClockService clockService;
 
     // Hosts tracked by their location
-    private final Multimap<ConnectPoint, Host> locations
-        = synchronizedSetMultimap(newSetMultimap(new ConcurrentHashMap<>(),
-                                                 () -> newConcurrentHashSet()));
+    private final SetMultimap<ConnectPoint, Host> locations =
+            Multimaps.synchronizedSetMultimap(
+                    HashMultimap.<ConnectPoint, Host>create());
+
     private final SetMultimap<ConnectPoint, PortAddresses> portAddresses =
             Multimaps.synchronizedSetMultimap(
                     HashMultimap.<ConnectPoint, PortAddresses>create());
@@ -252,7 +249,7 @@
 
         @Override
         public void event(EventuallyConsistentMapEvent<HostId, DefaultHost> event) {
-            DefaultHost host = event.value();
+            DefaultHost host = checkNotNull(event.value());
             if (event.type() == PUT) {
                 locations.put(host.location(), host);
             } else if (event.type() == REMOVE) {
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index 57943ad..9a65630 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -327,8 +327,6 @@
         listener.event(new EventuallyConsistentMapEvent<>(
                 EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
         listener.event(new EventuallyConsistentMapEvent<>(
-                EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
-        listener.event(new EventuallyConsistentMapEvent<>(
                 EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
         listener.event(new EventuallyConsistentMapEvent<>(
                 EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));