Fix reactive forwarding failing in a
distributed setting.

Change-Id: I992d62bbbd3d873bc8715419592951704903c49d

Make the ECHostStore (renamed to DistributedHostStore) respect the sequential ordering of events.

Change-Id: I14fa65fc78742c3ea7d417cddefef9f171472246
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
similarity index 84%
rename from core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java
rename to core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
index 2012457..836a3c2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
@@ -15,26 +15,8 @@
  */
 package org.onosproject.store.host.impl;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-import static org.onosproject.net.DefaultAnnotations.merge;
-import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED;
-import static org.onosproject.net.host.HostEvent.Type.HOST_MOVED;
-import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED;
-import static org.onosproject.net.host.HostEvent.Type.HOST_UPDATED;
-import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
-import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -60,22 +42,34 @@
 import org.onosproject.net.provider.ProviderId;
 import org.onosproject.store.AbstractStore;
 import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.EventuallyConsistentMap;
-import org.onosproject.store.service.EventuallyConsistentMapEvent;
-import org.onosproject.store.service.EventuallyConsistentMapListener;
-import org.onosproject.store.service.LogicalClockService;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
 import org.slf4j.Logger;
 
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static org.onosproject.net.DefaultAnnotations.merge;
+import static org.onosproject.net.host.HostEvent.Type.*;
+import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * Manages the inventory of hosts using a {@code EventuallyConsistentMap}.
  */
 @Component(immediate = true)
 @Service
-public class ECHostStore
+public class DistributedHostStore
     extends AbstractStore<HostEvent, HostStoreDelegate>
     implements HostStore {
 
@@ -84,15 +78,13 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected StorageService storageService;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected LogicalClockService clockService;
-
-    private EventuallyConsistentMap<HostId, DefaultHost> hosts;
+    private ConsistentMap<HostId, DefaultHost> host;
+    private Map<HostId, DefaultHost> hosts;
 
     private final ConcurrentHashMap<HostId, DefaultHost> prevHosts =
             new ConcurrentHashMap<>();
 
-    private EventuallyConsistentMapListener<HostId, DefaultHost> hostLocationTracker =
+    private MapEventListener<HostId, DefaultHost> hostLocationTracker =
             new HostLocationTracker();
 
     @Activate
@@ -100,21 +92,22 @@
         KryoNamespace.Builder hostSerializer = KryoNamespace.newBuilder()
                 .register(KryoNamespaces.API);
 
-        hosts = storageService.<HostId, DefaultHost>eventuallyConsistentMapBuilder()
+        host = storageService.<HostId, DefaultHost>consistentMapBuilder()
                 .withName("onos-hosts")
-                .withSerializer(hostSerializer)
-                .withTimestampProvider((k, v) -> clockService.getTimestamp())
+                .withRelaxedReadConsistency()
+                .withSerializer(Serializer.using(hostSerializer.build()))
                 .build();
 
-        hosts.addListener(hostLocationTracker);
+        hosts = host.asJavaMap();
+
+        host.addListener(hostLocationTracker);
 
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
-        hosts.removeListener(hostLocationTracker);
-        hosts.destroy();
+        host.removeListener(hostLocationTracker);
         prevHosts.clear();
 
         log.info("Stopped");
@@ -249,11 +242,11 @@
         return collection.stream().filter(predicate).collect(Collectors.toSet());
     }
 
-    private class HostLocationTracker implements EventuallyConsistentMapListener<HostId, DefaultHost> {
+    private class HostLocationTracker implements MapEventListener<HostId, DefaultHost> {
         @Override
-        public void event(EventuallyConsistentMapEvent<HostId, DefaultHost> event) {
-            DefaultHost host = checkNotNull(event.value());
-            if (event.type() == PUT) {
+        public void event(MapEvent<HostId, DefaultHost> event) {
+            DefaultHost host = checkNotNull(event.value().value());
+            if (event.type() == MapEvent.Type.INSERT) {
                 Host prevHost = prevHosts.put(host.id(), host);
                 if (prevHost == null) {
                     notifyDelegate(new HostEvent(HOST_ADDED, host));
@@ -262,7 +255,7 @@
                 } else if (!Objects.equals(prevHost, host)) {
                     notifyDelegate(new HostEvent(HOST_UPDATED, host, prevHost));
                 }
-            } else if (event.type() == REMOVE) {
+            } else if (event.type() == MapEvent.Type.REMOVE) {
                 if (prevHosts.remove(host.id()) != null) {
                     notifyDelegate(new HostEvent(HOST_REMOVED, host));
                 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java b/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java
index 851185b..4d7e7f3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java
@@ -113,7 +113,7 @@
 
     @Override
     public void forward(ConnectPoint outPort, Host subject, ByteBuffer packet) {
-        NodeId nodeId = mastershipService.getMasterFor(outPort.deviceId());
+        /*NodeId nodeId = mastershipService.getMasterFor(outPort.deviceId());
         if (nodeId.equals(localNodeId)) {
             if (delegate != null) {
                 delegate.emitResponse(outPort, packet);
@@ -122,7 +122,10 @@
             log.info("Forwarding ARP response from {} to {}", subject.id(), outPort);
             commService.unicast(new ArpResponseMessage(outPort, subject, packet.array()),
                                 ARP_RESPONSE_MESSAGE, serializer::encode, nodeId);
-        }
+        }*/
+        // FIXME: The commented-out code above may be unnecessary; if so, cluster
+        // messaging and pendingMessages could be pruned as well.
+        delegate.emitResponse(outPort, packet);
     }
 
     @Override