fix for reactive forwarding failing in a
distributed setting.

Change-Id: I992d62bbbd3d873bc8715419592951704903c49d

making the ECHostStore respect sequentiality of events.

Change-Id: I14fa65fc78742c3ea7d417cddefef9f171472246
diff --git a/core/net/src/main/java/org/onosproject/net/edgeservice/impl/EdgeManager.java b/core/net/src/main/java/org/onosproject/net/edgeservice/impl/EdgeManager.java
index e992f7a..cd7335d 100644
--- a/core/net/src/main/java/org/onosproject/net/edgeservice/impl/EdgeManager.java
+++ b/core/net/src/main/java/org/onosproject/net/edgeservice/impl/EdgeManager.java
@@ -27,10 +27,10 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onosproject.event.AbstractListenerManager;
-import org.onosproject.event.Event;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.edge.EdgePortEvent;
 import org.onosproject.net.edge.EdgePortListener;
@@ -38,17 +38,16 @@
 import org.onosproject.net.flow.DefaultTrafficTreatment;
 import org.onosproject.net.flow.TrafficTreatment;
 import org.onosproject.net.link.LinkEvent;
+import org.onosproject.net.link.LinkListener;
+import org.onosproject.net.link.LinkService;
 import org.onosproject.net.packet.DefaultOutboundPacket;
 import org.onosproject.net.packet.OutboundPacket;
 import org.onosproject.net.packet.PacketService;
 import org.onosproject.net.topology.Topology;
-import org.onosproject.net.topology.TopologyEvent;
-import org.onosproject.net.topology.TopologyListener;
 import org.onosproject.net.topology.TopologyService;
 import org.slf4j.Logger;
 
 import java.nio.ByteBuffer;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -73,7 +72,9 @@
 
     private final Map<DeviceId, Set<ConnectPoint>> connectionPoints = Maps.newConcurrentMap();
 
-    private final TopologyListener topologyListener = new InnerTopologyListener();
+    private final LinkListener linkListener = new InnerLinkListener();
+
+    private final DeviceListener deviceListener = new InnerDeviceListener();
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected PacketService packetService;
@@ -84,17 +85,23 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected TopologyService topologyService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LinkService linkService;
+
     @Activate
     public void activate() {
         eventDispatcher.addSink(EdgePortEvent.class, listenerRegistry);
-        topologyService.addListener(topologyListener);
+        deviceService.addListener(deviceListener);
+        linkService.addListener(linkListener);
+        loadAllEdgePorts();
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
         eventDispatcher.removeSink(EdgePortEvent.class);
-        topologyService.removeListener(topologyListener);
+        deviceService.removeListener(deviceListener);
+        linkService.removeListener(linkListener);
         log.info("Stopped");
     }
 
@@ -142,31 +149,27 @@
         return new DefaultOutboundPacket(point.deviceId(), builder.build(), data);
     }
 
-    // Internal listener for topo events used to keep our edge-port cache
-    // up to date.
-    private class InnerTopologyListener implements TopologyListener {
+    private class InnerLinkListener implements LinkListener {
+
         @Override
-        public void event(TopologyEvent event) {
-            topology = event.subject();
-            List<Event> triggers = event.reasons();
-            if (triggers != null) {
-                triggers.forEach(reason -> {
-                    if (reason instanceof DeviceEvent) {
-                        processDeviceEvent((DeviceEvent) reason);
-                    } else if (reason instanceof LinkEvent) {
-                        processLinkEvent((LinkEvent) reason);
-                    }
-                });
-            } else {
-                //FIXME special case of preexisting edgeport & no triggerless events could cause this to never hit and
-                //never discover an edgeport that should have been discovered.
-                loadAllEdgePorts();
-            }
+        public void event(LinkEvent event) {
+            topology = topologyService.currentTopology();
+            processLinkEvent(event);
+        }
+    }
+
+    private class InnerDeviceListener implements DeviceListener {
+
+        @Override
+        public void event(DeviceEvent event) {
+            topology = topologyService.currentTopology();
+            processDeviceEvent(event);
         }
     }
 
     // Initial loading of the edge port cache.
     private void loadAllEdgePorts() {
+        topology = topologyService.currentTopology();
         deviceService.getAvailableDevices().forEach(d -> deviceService.getPorts(d.id())
                 .forEach(p -> addEdgePort(new ConnectPoint(d.id(), p.number()))));
     }
diff --git a/core/net/src/test/java/org/onosproject/net/edgeservice/impl/EdgeManagerTest.java b/core/net/src/test/java/org/onosproject/net/edgeservice/impl/EdgeManagerTest.java
index 319412f..70be5de 100644
--- a/core/net/src/test/java/org/onosproject/net/edgeservice/impl/EdgeManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/edgeservice/impl/EdgeManagerTest.java
@@ -22,7 +22,6 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.onosproject.common.event.impl.TestEventDispatcher;
-import org.onosproject.event.Event;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DefaultPort;
 import org.onosproject.net.Device;
@@ -31,15 +30,17 @@
 import org.onosproject.net.Port;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceServiceAdapter;
 import org.onosproject.net.edge.EdgePortEvent;
 import org.onosproject.net.edge.EdgePortListener;
 import org.onosproject.net.flow.TrafficTreatment;
 import org.onosproject.net.link.LinkEvent;
+import org.onosproject.net.link.LinkListener;
+import org.onosproject.net.link.LinkServiceAdapter;
 import org.onosproject.net.packet.OutboundPacket;
 import org.onosproject.net.packet.PacketServiceAdapter;
 import org.onosproject.net.topology.Topology;
-import org.onosproject.net.topology.TopologyEvent;
 import org.onosproject.net.topology.TopologyListener;
 import org.onosproject.net.topology.TopologyServiceAdapter;
 
@@ -58,7 +59,6 @@
 import static org.onosproject.net.edge.EdgePortEvent.Type.EDGE_PORT_REMOVED;
 import static org.onosproject.net.link.LinkEvent.Type.LINK_ADDED;
 import static org.onosproject.net.link.LinkEvent.Type.LINK_REMOVED;
-import static org.onosproject.net.topology.TopologyEvent.Type.TOPOLOGY_CHANGED;
 
 /**
  * Test of the edge port manager. Each device has ports '0' through 'numPorts - 1'
@@ -74,6 +74,8 @@
     private final Map<DeviceId, Device> devices = Maps.newConcurrentMap();
     private Set<OutboundPacket> packets = Sets.newConcurrentHashSet();
     private final EdgePortListener testListener = new TestListener(events);
+    private TestLinkManager testLinkManager;
+    private TestDeviceManager testDeviceManager;
     private TestTopologyManager testTopologyManager;
 
     @Before
@@ -82,8 +84,11 @@
         injectEventDispatcher(mgr, new TestEventDispatcher());
         testTopologyManager = new TestTopologyManager(infrastructurePorts);
         mgr.topologyService = testTopologyManager;
-        mgr.deviceService = new TestDeviceManager(devices);
+        testDeviceManager = new TestDeviceManager(devices);
+        mgr.deviceService = testDeviceManager;
         mgr.packetService = new TestPacketManager();
+        testLinkManager = new TestLinkManager();
+        mgr.linkService = testLinkManager;
         mgr.activate();
         mgr.addListener(testListener);
 
@@ -108,11 +113,11 @@
         assertFalse("no ports expected", mgr.getEdgePoints().iterator().hasNext());
 
         assertFalse("Expected isEdge to return false",
-                mgr.isEdgePoint(NetTestTools.connectPoint(Integer.toString(1), 1)));
+                    mgr.isEdgePoint(NetTestTools.connectPoint(Integer.toString(1), 1)));
 
         removeInfraPort(NetTestTools.connectPoint(Integer.toString(1), 1));
         assertTrue("Expected isEdge to return false",
-                mgr.isEdgePoint(NetTestTools.connectPoint(Integer.toString(1), 1)));
+                   mgr.isEdgePoint(NetTestTools.connectPoint(Integer.toString(1), 1)));
     }
 
     @Test
@@ -121,69 +126,57 @@
         ConnectPoint testPoint, referencePoint;
 
         //Testing link removal
-        List<Event> eventsToAdd = Lists.newArrayList();
-        eventsToAdd.add(new LinkEvent(LINK_REMOVED, NetTestTools.link("a", 1, "b", 2)));
-        TopologyEvent event = new TopologyEvent(TOPOLOGY_CHANGED, null, eventsToAdd);
-        testTopologyManager.listener.event(event);
+        testLinkManager.listener.event(new LinkEvent(LINK_REMOVED, NetTestTools.link("a", 1, "b", 2)));
 
         assertTrue("The list contained an unexpected number of events", events.size() == 2);
         assertTrue("The first element is of the wrong type.",
-                events.get(0).type() == EDGE_PORT_ADDED);
-        assertTrue("The second element is of the wrong type.",
-                events.get(1).type() == EDGE_PORT_ADDED);
+                   events.get(0).type() == EDGE_PORT_ADDED);
 
         testPoint = events.get(0).subject();
         referencePoint = NetTestTools.connectPoint("a", 1);
         assertTrue("The port numbers of the first element are incorrect",
-                testPoint.port().toLong() == referencePoint.port().toLong());
+                   testPoint.port().toLong() == referencePoint.port().toLong());
         assertTrue("The device id of the first element is incorrect.",
-                testPoint.deviceId().equals(referencePoint.deviceId()));
+                   testPoint.deviceId().equals(referencePoint.deviceId()));
 
         testPoint = events.get(1).subject();
         referencePoint = NetTestTools.connectPoint("b", 2);
         assertTrue("The port numbers of the second element are incorrect",
-                testPoint.port().toLong() == referencePoint.port().toLong());
+                   testPoint.port().toLong() == referencePoint.port().toLong());
         assertTrue("The device id of the second element is incorrect.",
-                testPoint.deviceId().equals(referencePoint.deviceId()));
+                   testPoint.deviceId().equals(referencePoint.deviceId()));
 
         //Rebroadcast event to ensure it results in no additional events
-        testTopologyManager.listener.event(event);
+        testLinkManager.listener.event(new LinkEvent(LINK_REMOVED, NetTestTools.link("a", 1, "b", 2)));
         assertTrue("The list contained an unexpected number of events", events.size() == 2);
 
         //Testing link adding when links to remove exist
-        eventsToAdd.clear();
         events.clear();
-        eventsToAdd.add(new LinkEvent(LINK_ADDED, NetTestTools.link("a", 1, "b", 2)));
-        event = new TopologyEvent(TOPOLOGY_CHANGED, null, eventsToAdd);
-        testTopologyManager.listener.event(event);
+        testLinkManager.listener.event(new LinkEvent(LINK_ADDED, NetTestTools.link("a", 1, "b", 2)));
 
         assertTrue("The list contained an unexpected number of events", events.size() == 2);
         assertTrue("The first element is of the wrong type.",
-                events.get(0).type() == EDGE_PORT_REMOVED);
+                   events.get(0).type() == EDGE_PORT_REMOVED);
         assertTrue("The second element is of the wrong type.",
-                events.get(1).type() == EDGE_PORT_REMOVED);
+                   events.get(1).type() == EDGE_PORT_REMOVED);
 
         testPoint = events.get(0).subject();
         referencePoint = NetTestTools.connectPoint("a", 1);
         assertTrue("The port numbers of the first element are incorrect",
-                testPoint.port().toLong() == referencePoint.port().toLong());
+                   testPoint.port().toLong() == referencePoint.port().toLong());
         assertTrue("The device id of the first element is incorrect.",
-                testPoint.deviceId().equals(referencePoint.deviceId()));
+                   testPoint.deviceId().equals(referencePoint.deviceId()));
 
         testPoint = events.get(1).subject();
         referencePoint = NetTestTools.connectPoint("b", 2);
         assertTrue("The port numbers of the second element are incorrect",
-                testPoint.port().toLong() == referencePoint.port().toLong());
+                   testPoint.port().toLong() == referencePoint.port().toLong());
         assertTrue("The device id of the second element is incorrect.",
-                testPoint.deviceId().equals(referencePoint.deviceId()));
+                   testPoint.deviceId().equals(referencePoint.deviceId()));
 
         //Apparent duplicate of previous method tests removal when the elements have already been removed
-        eventsToAdd.clear();
         events.clear();
-        eventsToAdd.add(new LinkEvent(LINK_ADDED, NetTestTools.link("a", 1, "b", 2)));
-        event = new TopologyEvent(TOPOLOGY_CHANGED, null, eventsToAdd);
-        testTopologyManager.listener.event(event);
-
+        testLinkManager.listener.event(new LinkEvent(LINK_ADDED, NetTestTools.link("a", 1, "b", 2)));
         assertTrue("The list should contain no events, the removed elements don't exist.", events.size() == 0);
     }
 
@@ -192,8 +185,7 @@
         //Setup
 
         Device referenceDevice;
-        TopologyEvent event;
-        List<Event> eventsToAdd = Lists.newArrayList();
+        DeviceEvent event;
         int numDevices = 10;
         int numInfraPorts = 5;
         totalPorts = 10;
@@ -201,14 +193,13 @@
 
         //Test response to device added events
         referenceDevice = NetTestTools.device("1");
-        eventsToAdd.add(new DeviceEvent(DEVICE_ADDED, referenceDevice,
-                new DefaultPort(referenceDevice, PortNumber.portNumber(1), true)));
-        event = new TopologyEvent(TOPOLOGY_CHANGED, null, eventsToAdd);
-        testTopologyManager.listener.event(event);
+        event = new DeviceEvent(DEVICE_ADDED, referenceDevice,
+                                new DefaultPort(referenceDevice, PortNumber.portNumber(1), true));
+        testDeviceManager.listener.event(event);
 
         //Check that ports were populated correctly
         assertTrue("Unexpected number of new ports added",
-                mgr.deviceService.getPorts(NetTestTools.did("1")).size() == 10);
+                   mgr.deviceService.getPorts(NetTestTools.did("1")).size() == 10);
 
         //Check that of the ten ports the half that are infrastructure ports aren't added
         assertEquals("Unexpected number of new edge ports added", (totalPorts - numInfraPorts), events.size());
@@ -219,15 +210,15 @@
         //Names here are irrelevant, the first 5 ports are populated as infrastructure, 6-10 are edge
         for (int index = 0; index < events.size(); index++) {
             assertEquals("Port added had unexpected port number.",
-                    events.get(index).subject().port(),
-                    NetTestTools.connectPoint("a", index + numInfraPorts + 1).port());
+                         events.get(index).subject().port(),
+                         NetTestTools.connectPoint("a", index + numInfraPorts + 1).port());
         }
         events.clear();
 
         //Repost the event to test repeated posts
-        testTopologyManager.listener.event(event);
+        testDeviceManager.listener.event(event);
         assertEquals("The redundant notification should not have created additional notifications.",
-                0, events.size());
+                     0, events.size());
         //Calculate the size of the returned iterable of edge points.
         Iterable<ConnectPoint> pts = mgr.getEdgePoints();
         Iterator pointIterator = pts.iterator();
@@ -238,45 +229,41 @@
         assertEquals("Unexpected number of edge points", totalPorts - numInfraPorts, count);
         //Testing device removal
         events.clear();
-        eventsToAdd.clear();
-        eventsToAdd.add(new DeviceEvent(DEVICE_REMOVED, referenceDevice,
-                new DefaultPort(referenceDevice, PortNumber.portNumber(1), true)));
-        event = new TopologyEvent(TOPOLOGY_CHANGED, null, eventsToAdd);
-        testTopologyManager.listener.event(event);
+        event = (new DeviceEvent(DEVICE_REMOVED, referenceDevice,
+                                 new DefaultPort(referenceDevice, PortNumber.portNumber(1), true)));
+        testDeviceManager.listener.event(event);
 
         assertEquals("There should be five new events from removal of edge points",
-                totalPorts - numInfraPorts, events.size());
+                     totalPorts - numInfraPorts, events.size());
         for (int index = 0; index < events.size(); index++) {
             //Assert that the correct port numbers were removed in the correct order
             assertEquals("Port removed had unexpected port number.",
-                    events.get(index).subject().port(),
-                    (NetTestTools.connectPoint("a", index + numInfraPorts + 1).port()));
+                         events.get(index).subject().port(),
+                         (NetTestTools.connectPoint("a", index + numInfraPorts + 1).port()));
             //Assert that the events are of the correct type
             assertEquals("Unexpected type of event", events.get(index).type(), EDGE_PORT_REMOVED);
         }
         events.clear();
         //Rebroadcast event to check that it triggers no new behavior
-        testTopologyManager.listener.event(event);
+        testDeviceManager.listener.event(event);
         assertEquals("Rebroadcast of removal event should not produce additional events",
-                0, events.size());
+                     0, events.size());
 
         //Testing device status change, changed from unavailable to available
         events.clear();
-        eventsToAdd.clear();
         //Make sure that the devicemanager shows the device as available.
         addDevice(referenceDevice, "1", 5);
         devices.put(referenceDevice.id(), referenceDevice);
 
-        eventsToAdd.add(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, referenceDevice));
-        event = new TopologyEvent(TOPOLOGY_CHANGED, null, eventsToAdd);
-        testTopologyManager.listener.event(event);
+        event = new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, referenceDevice);
+        testDeviceManager.listener.event(event);
         //An earlier setup set half of the reference device ports to infrastructure
         assertEquals("An unexpected number of events were generated.", totalPorts - numInfraPorts, events.size());
         for (int i = 0; i < 5; i++) {
             assertEquals("The event was not of the right type", events.get(i).type(), EDGE_PORT_ADDED);
         }
         events.clear();
-        testTopologyManager.listener.event(event);
+        testDeviceManager.listener.event(event);
         assertEquals("No events should have been generated for a set of existing ports.", 0, events.size());
 
         //Test removal when state changes when the device becomes unavailable
@@ -288,21 +275,20 @@
         no events will be generated since no ports will be provided in getPorts() to EdgeManager.
         */
         alwaysReturnPorts = true;
-        testTopologyManager.listener.event(event);
+        testDeviceManager.listener.event(event);
         alwaysReturnPorts = false;
         assertEquals("An unexpected number of events were created.", totalPorts - numInfraPorts, events.size());
         for (int i = 0; i < 5; i++) {
             EdgePortEvent edgeEvent = events.get(i);
             assertEquals("The event is of an unexpected type.",
-                    EdgePortEvent.Type.EDGE_PORT_REMOVED, edgeEvent.type());
+                         EdgePortEvent.Type.EDGE_PORT_REMOVED, edgeEvent.type());
             assertEquals("The event pertains to an unexpected port", PortNumber.portNumber(i + numInfraPorts + 1),
-                    edgeEvent.subject().port());
+                         edgeEvent.subject().port());
         }
     }
 
     @Test
     public void testInternalCache() {
-        List<Event> eventsToAdd = Lists.newArrayList();
         int numDevices = 10;
         //Number of infrastructure ports per device
         int numPorts = 5;
@@ -312,11 +298,8 @@
         for (int i = 0; i < numDevices; i++) {
             Device newDevice = NetTestTools.device(Integer.toString(i));
             devices.put(newDevice.id(), newDevice);
-            eventsToAdd.add(new DeviceEvent(DEVICE_ADDED, newDevice));
+            testDeviceManager.listener.event(new DeviceEvent(DEVICE_ADDED, newDevice));
         }
-        TopologyEvent event = new TopologyEvent(TOPOLOGY_CHANGED, null, eventsToAdd);
-        testTopologyManager.listener.event(event);
-
         //Check all ports have correct designations
         ConnectPoint testPoint;
         for (int deviceNum = 0; deviceNum < numDevices; deviceNum++) {
@@ -334,7 +317,7 @@
             count++;
         }
         assertEquals("There are an unexpeceted number of edge points returned.",
-                (totalPorts - numPorts) * numDevices, count);
+                     (totalPorts - numPorts) * numDevices, count);
         for (int deviceNumber = 0; deviceNumber < numDevices; deviceNumber++) {
             count = 0;
             for (ConnectPoint ignored : mgr.getEdgePoints(NetTestTools.did("1"))) {
@@ -349,8 +332,7 @@
     public void testEmit() {
         byte[] arr = new byte[10];
         Device referenceDevice;
-        TopologyEvent event;
-        List<Event> eventsToAdd = Lists.newArrayList();
+        DeviceEvent event;
         int numDevices = 10;
         int numInfraPorts = 5;
         totalPorts = 10;
@@ -360,16 +342,16 @@
         }
         for (int i = 0; i < numDevices; i++) {
             referenceDevice = NetTestTools.device(Integer.toString(i));
-            eventsToAdd.add(new DeviceEvent(DEVICE_ADDED, referenceDevice,
-                    new DefaultPort(referenceDevice, PortNumber.portNumber(1), true)));
+            testDeviceManager.listener.event(new DeviceEvent(DEVICE_ADDED, referenceDevice,
+                                                             new DefaultPort(referenceDevice,
+                                                                             PortNumber.portNumber(1),
+                                                                             true)));
         }
-        event = new TopologyEvent(TOPOLOGY_CHANGED, null, eventsToAdd);
-        testTopologyManager.listener.event(event);
 
         mgr.emitPacket(ByteBuffer.wrap(arr), Optional.<TrafficTreatment>empty());
 
         assertEquals("There were an unexpected number of emitted packets",
-                (totalPorts - numInfraPorts) * numDevices, packets.size());
+                     (totalPorts - numInfraPorts) * numDevices, packets.size());
         Iterator<OutboundPacket> packetIter = packets.iterator();
         OutboundPacket packet;
         while (packetIter.hasNext()) {
@@ -381,7 +363,7 @@
         mgr.emitPacket(NetTestTools.did(Integer.toString(1)), ByteBuffer.wrap(arr), Optional.<TrafficTreatment>empty());
 
         assertEquals("Unexpected number of outbound packets were emitted.",
-                totalPorts - numInfraPorts, packets.size());
+                     totalPorts - numInfraPorts, packets.size());
         packetIter = packets.iterator();
         while (packetIter.hasNext()) {
             packet = packetIter.next();
@@ -455,6 +437,7 @@
     }
 
     private class TestDeviceManager extends DeviceServiceAdapter {
+        private DeviceListener listener;
 
         private Map<DeviceId, Device> devices;
 
@@ -490,6 +473,17 @@
         public Iterable<Device> getAvailableDevices() {
             return devices.values();
         }
+
+
+        @Override
+        public void addListener(DeviceListener listener) {
+            this.listener = listener;
+        }
+
+        @Override
+        public void removeListener(DeviceListener listener) {
+            this.listener = null;
+        }
     }
 
     private class TestPacketManager extends PacketServiceAdapter {
@@ -499,6 +493,15 @@
         }
     }
 
+    private class TestLinkManager extends LinkServiceAdapter {
+        private LinkListener listener;
+
+        @Override
+        public void addListener(LinkListener listener) {
+            this.listener = listener;
+        }
+    }
+
     private class TestListener implements EdgePortListener {
         private List<EdgePortEvent> events;
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
similarity index 84%
rename from core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java
rename to core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
index 2012457..836a3c2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
@@ -15,26 +15,8 @@
  */
 package org.onosproject.store.host.impl;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-import static org.onosproject.net.DefaultAnnotations.merge;
-import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED;
-import static org.onosproject.net.host.HostEvent.Type.HOST_MOVED;
-import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED;
-import static org.onosproject.net.host.HostEvent.Type.HOST_UPDATED;
-import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
-import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -60,22 +42,34 @@
 import org.onosproject.net.provider.ProviderId;
 import org.onosproject.store.AbstractStore;
 import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.EventuallyConsistentMap;
-import org.onosproject.store.service.EventuallyConsistentMapEvent;
-import org.onosproject.store.service.EventuallyConsistentMapListener;
-import org.onosproject.store.service.LogicalClockService;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
 import org.slf4j.Logger;
 
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static org.onosproject.net.DefaultAnnotations.merge;
+import static org.onosproject.net.host.HostEvent.Type.*;
+import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * Manages the inventory of hosts using a {@code EventuallyConsistentMap}.
  */
 @Component(immediate = true)
 @Service
-public class ECHostStore
+public class DistributedHostStore
     extends AbstractStore<HostEvent, HostStoreDelegate>
     implements HostStore {
 
@@ -84,15 +78,13 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected StorageService storageService;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected LogicalClockService clockService;
-
-    private EventuallyConsistentMap<HostId, DefaultHost> hosts;
+    private ConsistentMap<HostId, DefaultHost> host;
+    private Map<HostId, DefaultHost> hosts;
 
     private final ConcurrentHashMap<HostId, DefaultHost> prevHosts =
             new ConcurrentHashMap<>();
 
-    private EventuallyConsistentMapListener<HostId, DefaultHost> hostLocationTracker =
+    private MapEventListener<HostId, DefaultHost> hostLocationTracker =
             new HostLocationTracker();
 
     @Activate
@@ -100,21 +92,22 @@
         KryoNamespace.Builder hostSerializer = KryoNamespace.newBuilder()
                 .register(KryoNamespaces.API);
 
-        hosts = storageService.<HostId, DefaultHost>eventuallyConsistentMapBuilder()
+        host = storageService.<HostId, DefaultHost>consistentMapBuilder()
                 .withName("onos-hosts")
-                .withSerializer(hostSerializer)
-                .withTimestampProvider((k, v) -> clockService.getTimestamp())
+                .withRelaxedReadConsistency()
+                .withSerializer(Serializer.using(hostSerializer.build()))
                 .build();
 
-        hosts.addListener(hostLocationTracker);
+        hosts = host.asJavaMap();
+
+        host.addListener(hostLocationTracker);
 
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
-        hosts.removeListener(hostLocationTracker);
-        hosts.destroy();
+        host.removeListener(hostLocationTracker);
         prevHosts.clear();
 
         log.info("Stopped");
@@ -249,11 +242,11 @@
         return collection.stream().filter(predicate).collect(Collectors.toSet());
     }
 
-    private class HostLocationTracker implements EventuallyConsistentMapListener<HostId, DefaultHost> {
+    private class HostLocationTracker implements MapEventListener<HostId, DefaultHost> {
         @Override
-        public void event(EventuallyConsistentMapEvent<HostId, DefaultHost> event) {
-            DefaultHost host = checkNotNull(event.value());
-            if (event.type() == PUT) {
+        public void event(MapEvent<HostId, DefaultHost> event) {
+            DefaultHost host = checkNotNull(event.value().value());
+            if (event.type() == MapEvent.Type.INSERT) {
                 Host prevHost = prevHosts.put(host.id(), host);
                 if (prevHost == null) {
                     notifyDelegate(new HostEvent(HOST_ADDED, host));
@@ -262,7 +255,7 @@
                 } else if (!Objects.equals(prevHost, host)) {
                     notifyDelegate(new HostEvent(HOST_UPDATED, host, prevHost));
                 }
-            } else if (event.type() == REMOVE) {
+            } else if (event.type() == MapEvent.Type.REMOVE) {
                 if (prevHosts.remove(host.id()) != null) {
                     notifyDelegate(new HostEvent(HOST_REMOVED, host));
                 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java b/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java
index 851185b..4d7e7f3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java
@@ -113,7 +113,7 @@
 
     @Override
     public void forward(ConnectPoint outPort, Host subject, ByteBuffer packet) {
-        NodeId nodeId = mastershipService.getMasterFor(outPort.deviceId());
+        /*NodeId nodeId = mastershipService.getMasterFor(outPort.deviceId());
         if (nodeId.equals(localNodeId)) {
             if (delegate != null) {
                 delegate.emitResponse(outPort, packet);
@@ -122,7 +122,10 @@
             log.info("Forwarding ARP response from {} to {}", subject.id(), outPort);
             commService.unicast(new ArpResponseMessage(outPort, subject, packet.array()),
                                 ARP_RESPONSE_MESSAGE, serializer::encode, nodeId);
-        }
+        }*/
+        //FIXME: Code above may be unnecessary and therefore cluster messaging
+        // and pendingMessages could be pruned as well.
+        delegate.emitResponse(outPort, packet);
     }
 
     @Override
diff --git a/core/store/dist/src/test/java/org/onosproject/store/host/impl/ECHostStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/host/impl/DistributedHostStoreTest.java
similarity index 83%
rename from core/store/dist/src/test/java/org/onosproject/store/host/impl/ECHostStoreTest.java
rename to core/store/dist/src/test/java/org/onosproject/store/host/impl/DistributedHostStoreTest.java
index a7077a8..0732126 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/host/impl/ECHostStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/host/impl/DistributedHostStoreTest.java
@@ -27,8 +27,6 @@
 import org.onosproject.net.host.DefaultHostDescription;
 import org.onosproject.net.host.HostDescription;
 import org.onosproject.net.provider.ProviderId;
-import org.onosproject.store.Timestamp;
-import org.onosproject.store.service.LogicalClockService;
 import org.onosproject.store.service.TestStorageService;
 
 import java.util.HashSet;
@@ -37,9 +35,9 @@
 /**
  * Tests for the ECHostStore.
  */
-public class ECHostStoreTest extends TestCase {
+public class DistributedHostStoreTest extends TestCase {
 
-    private ECHostStore ecXHostStore;
+    private DistributedHostStore ecXHostStore;
 
     private static final HostId HOSTID = HostId.hostId(MacAddress.valueOf("1a:1a:1a:1a:1a:1a"));
 
@@ -50,10 +48,9 @@
 
     @Before
     public void setUp() {
-        ecXHostStore = new ECHostStore();
+        ecXHostStore = new DistributedHostStore();
 
         ecXHostStore.storageService = new TestStorageService();
-        ecXHostStore.clockService = new TestLogicalClockService();
         ecXHostStore.activate();
     }
 
@@ -83,13 +80,4 @@
         assertTrue(host.ipAddresses().contains(IP2));
     }
 
-    /**
-     * Mocks the LogicalClockService class.
-     */
-    class TestLogicalClockService implements LogicalClockService {
-        @Override
-        public Timestamp getTimestamp() {
-            return null;
-        }
-    }
 }
\ No newline at end of file
diff --git a/tools/test/topos/attmplsfast.py b/tools/test/topos/attmplsfast.py
index 7f90446..cc7479b 100644
--- a/tools/test/topos/attmplsfast.py
+++ b/tools/test/topos/attmplsfast.py
@@ -15,19 +15,23 @@
 class AttMplsTopo( Topo ):
     "Internet Topology Zoo Specimen."
 
-    def build( self ):
+    def __init__( self ):
         "Create a topology."
 
+        # Initialize Topology
+        Topo.__init__( self )
+
         # add nodes, switches first...
+        NY54 = self.addSwitch( 's25' ) # 40.728270, -73.994483
         CMBR = self.addSwitch( 's1' )  # 42.373730, -71.109734
-        CHCG = self.addSwitch( 's2' )  # 41.877461, -87.642892
+        CHCG = self.addSwitch( 's2', protocols='OpenFlow13' )  # 41.877461, -87.642892
         CLEV = self.addSwitch( 's3' )  # 41.498928, -81.695217
         RLGH = self.addSwitch( 's4' )  # 35.780150, -78.644026
         ATLN = self.addSwitch( 's5' )  # 33.749017, -84.394168
         PHLA = self.addSwitch( 's6' )  # 39.952906, -75.172278
         WASH = self.addSwitch( 's7' )  # 38.906696, -77.035509
         NSVL = self.addSwitch( 's8' )  # 36.166410, -86.787305
-        STLS = self.addSwitch( 's9' )  # 38.626418, -90.198143
+        STLS = self.addSwitch( 's9', protocols='OpenFlow13' )  # 38.626418, -90.198143
         NWOR = self.addSwitch( 's10' ) # 29.951475, -90.078434
         HSTN = self.addSwitch( 's11' ) # 29.763249, -95.368332
         SNAN = self.addSwitch( 's12' ) # 29.424331, -98.491745
@@ -40,12 +44,12 @@
         PTLD = self.addSwitch( 's19' ) # 45.523317, -122.677768
         STTL = self.addSwitch( 's20' ) # 47.607326, -122.331786
         SLKC = self.addSwitch( 's21' ) # 40.759577, -111.895079
-        LA03 = self.addSwitch( 's22' ) # 34.056346, -118.235951
+        LA03 = self.addSwitch( 's22', protocols='OpenFlow13' ) # 34.056346, -118.235951
         SNDG = self.addSwitch( 's23' ) # 32.714564, -117.153528
         PHNX = self.addSwitch( 's24' ) # 33.448289, -112.076299
-        NY54 = self.addSwitch( 's25' ) # 40.728270, -73.994483
 
         # ... and now hosts
+        NY54_host = self.addHost( 'h25' )
         CMBR_host = self.addHost( 'h1' )
         CHCG_host = self.addHost( 'h2' )
         CLEV_host = self.addHost( 'h3' )
@@ -70,7 +74,6 @@
         LA03_host = self.addHost( 'h22' )
         SNDG_host = self.addHost( 'h23' )
         PHNX_host = self.addHost( 'h24' )
-        NY54_host = self.addHost( 'h25' )
 
         # add edges between switch and corresponding host
         self.addLink( NY54 , NY54_host )