fix for reactive forwarding failing in a
distributed setting.
Change-Id: I992d62bbbd3d873bc8715419592951704903c49d
making the ECHostStore respect sequentiality of events.
Change-Id: I14fa65fc78742c3ea7d417cddefef9f171472246
diff --git a/core/net/src/main/java/org/onosproject/net/edgeservice/impl/EdgeManager.java b/core/net/src/main/java/org/onosproject/net/edgeservice/impl/EdgeManager.java
index e992f7a..cd7335d 100644
--- a/core/net/src/main/java/org/onosproject/net/edgeservice/impl/EdgeManager.java
+++ b/core/net/src/main/java/org/onosproject/net/edgeservice/impl/EdgeManager.java
@@ -27,10 +27,10 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.event.AbstractListenerManager;
-import org.onosproject.event.Event;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.edge.EdgePortEvent;
import org.onosproject.net.edge.EdgePortListener;
@@ -38,17 +38,16 @@
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.link.LinkEvent;
+import org.onosproject.net.link.LinkListener;
+import org.onosproject.net.link.LinkService;
import org.onosproject.net.packet.DefaultOutboundPacket;
import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketService;
import org.onosproject.net.topology.Topology;
-import org.onosproject.net.topology.TopologyEvent;
-import org.onosproject.net.topology.TopologyListener;
import org.onosproject.net.topology.TopologyService;
import org.slf4j.Logger;
import java.nio.ByteBuffer;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -73,7 +72,9 @@
private final Map<DeviceId, Set<ConnectPoint>> connectionPoints = Maps.newConcurrentMap();
- private final TopologyListener topologyListener = new InnerTopologyListener();
+ private final LinkListener linkListener = new InnerLinkListener();
+
+ private final DeviceListener deviceListener = new InnerDeviceListener();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PacketService packetService;
@@ -84,17 +85,23 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected TopologyService topologyService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected LinkService linkService;
+
@Activate
public void activate() {
eventDispatcher.addSink(EdgePortEvent.class, listenerRegistry);
- topologyService.addListener(topologyListener);
+ deviceService.addListener(deviceListener);
+ linkService.addListener(linkListener);
+ loadAllEdgePorts();
log.info("Started");
}
@Deactivate
public void deactivate() {
eventDispatcher.removeSink(EdgePortEvent.class);
- topologyService.removeListener(topologyListener);
+ deviceService.removeListener(deviceListener);
+ linkService.removeListener(linkListener);
log.info("Stopped");
}
@@ -142,31 +149,27 @@
return new DefaultOutboundPacket(point.deviceId(), builder.build(), data);
}
- // Internal listener for topo events used to keep our edge-port cache
- // up to date.
- private class InnerTopologyListener implements TopologyListener {
+ private class InnerLinkListener implements LinkListener {
+
@Override
- public void event(TopologyEvent event) {
- topology = event.subject();
- List<Event> triggers = event.reasons();
- if (triggers != null) {
- triggers.forEach(reason -> {
- if (reason instanceof DeviceEvent) {
- processDeviceEvent((DeviceEvent) reason);
- } else if (reason instanceof LinkEvent) {
- processLinkEvent((LinkEvent) reason);
- }
- });
- } else {
- //FIXME special case of preexisting edgeport & no triggerless events could cause this to never hit and
- //never discover an edgeport that should have been discovered.
- loadAllEdgePorts();
- }
+ public void event(LinkEvent event) {
+ topology = topologyService.currentTopology();
+ processLinkEvent(event);
+ }
+ }
+
+ private class InnerDeviceListener implements DeviceListener {
+
+ @Override
+ public void event(DeviceEvent event) {
+ topology = topologyService.currentTopology();
+ processDeviceEvent(event);
}
}
// Initial loading of the edge port cache.
private void loadAllEdgePorts() {
+ topology = topologyService.currentTopology();
deviceService.getAvailableDevices().forEach(d -> deviceService.getPorts(d.id())
.forEach(p -> addEdgePort(new ConnectPoint(d.id(), p.number()))));
}
diff --git a/core/net/src/test/java/org/onosproject/net/edgeservice/impl/EdgeManagerTest.java b/core/net/src/test/java/org/onosproject/net/edgeservice/impl/EdgeManagerTest.java
index 319412f..70be5de 100644
--- a/core/net/src/test/java/org/onosproject/net/edgeservice/impl/EdgeManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/edgeservice/impl/EdgeManagerTest.java
@@ -22,7 +22,6 @@
import org.junit.Before;
import org.junit.Test;
import org.onosproject.common.event.impl.TestEventDispatcher;
-import org.onosproject.event.Event;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DefaultPort;
import org.onosproject.net.Device;
@@ -31,15 +30,17 @@
import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceServiceAdapter;
import org.onosproject.net.edge.EdgePortEvent;
import org.onosproject.net.edge.EdgePortListener;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.link.LinkEvent;
+import org.onosproject.net.link.LinkListener;
+import org.onosproject.net.link.LinkServiceAdapter;
import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketServiceAdapter;
import org.onosproject.net.topology.Topology;
-import org.onosproject.net.topology.TopologyEvent;
import org.onosproject.net.topology.TopologyListener;
import org.onosproject.net.topology.TopologyServiceAdapter;
@@ -58,7 +59,6 @@
import static org.onosproject.net.edge.EdgePortEvent.Type.EDGE_PORT_REMOVED;
import static org.onosproject.net.link.LinkEvent.Type.LINK_ADDED;
import static org.onosproject.net.link.LinkEvent.Type.LINK_REMOVED;
-import static org.onosproject.net.topology.TopologyEvent.Type.TOPOLOGY_CHANGED;
/**
* Test of the edge port manager. Each device has ports '0' through 'numPorts - 1'
@@ -74,6 +74,8 @@
private final Map<DeviceId, Device> devices = Maps.newConcurrentMap();
private Set<OutboundPacket> packets = Sets.newConcurrentHashSet();
private final EdgePortListener testListener = new TestListener(events);
+ private TestLinkManager testLinkManager;
+ private TestDeviceManager testDeviceManager;
private TestTopologyManager testTopologyManager;
@Before
@@ -82,8 +84,11 @@
injectEventDispatcher(mgr, new TestEventDispatcher());
testTopologyManager = new TestTopologyManager(infrastructurePorts);
mgr.topologyService = testTopologyManager;
- mgr.deviceService = new TestDeviceManager(devices);
+ testDeviceManager = new TestDeviceManager(devices);
+ mgr.deviceService = testDeviceManager;
mgr.packetService = new TestPacketManager();
+ testLinkManager = new TestLinkManager();
+ mgr.linkService = testLinkManager;
mgr.activate();
mgr.addListener(testListener);
@@ -108,11 +113,11 @@
assertFalse("no ports expected", mgr.getEdgePoints().iterator().hasNext());
assertFalse("Expected isEdge to return false",
- mgr.isEdgePoint(NetTestTools.connectPoint(Integer.toString(1), 1)));
+ mgr.isEdgePoint(NetTestTools.connectPoint(Integer.toString(1), 1)));
removeInfraPort(NetTestTools.connectPoint(Integer.toString(1), 1));
assertTrue("Expected isEdge to return false",
- mgr.isEdgePoint(NetTestTools.connectPoint(Integer.toString(1), 1)));
+ mgr.isEdgePoint(NetTestTools.connectPoint(Integer.toString(1), 1)));
}
@Test
@@ -121,69 +126,57 @@
ConnectPoint testPoint, referencePoint;
//Testing link removal
- List<Event> eventsToAdd = Lists.newArrayList();
- eventsToAdd.add(new LinkEvent(LINK_REMOVED, NetTestTools.link("a", 1, "b", 2)));
- TopologyEvent event = new TopologyEvent(TOPOLOGY_CHANGED, null, eventsToAdd);
- testTopologyManager.listener.event(event);
+ testLinkManager.listener.event(new LinkEvent(LINK_REMOVED, NetTestTools.link("a", 1, "b", 2)));
assertTrue("The list contained an unexpected number of events", events.size() == 2);
assertTrue("The first element is of the wrong type.",
- events.get(0).type() == EDGE_PORT_ADDED);
- assertTrue("The second element is of the wrong type.",
- events.get(1).type() == EDGE_PORT_ADDED);
+ events.get(0).type() == EDGE_PORT_ADDED);
testPoint = events.get(0).subject();
referencePoint = NetTestTools.connectPoint("a", 1);
assertTrue("The port numbers of the first element are incorrect",
- testPoint.port().toLong() == referencePoint.port().toLong());
+ testPoint.port().toLong() == referencePoint.port().toLong());
assertTrue("The device id of the first element is incorrect.",
- testPoint.deviceId().equals(referencePoint.deviceId()));
+ testPoint.deviceId().equals(referencePoint.deviceId()));
testPoint = events.get(1).subject();
referencePoint = NetTestTools.connectPoint("b", 2);
assertTrue("The port numbers of the second element are incorrect",
- testPoint.port().toLong() == referencePoint.port().toLong());
+ testPoint.port().toLong() == referencePoint.port().toLong());
assertTrue("The device id of the second element is incorrect.",
- testPoint.deviceId().equals(referencePoint.deviceId()));
+ testPoint.deviceId().equals(referencePoint.deviceId()));
//Rebroadcast event to ensure it results in no additional events
- testTopologyManager.listener.event(event);
+ testLinkManager.listener.event(new LinkEvent(LINK_REMOVED, NetTestTools.link("a", 1, "b", 2)));
assertTrue("The list contained an unexpected number of events", events.size() == 2);
//Testing link adding when links to remove exist
- eventsToAdd.clear();
events.clear();
- eventsToAdd.add(new LinkEvent(LINK_ADDED, NetTestTools.link("a", 1, "b", 2)));
- event = new TopologyEvent(TOPOLOGY_CHANGED, null, eventsToAdd);
- testTopologyManager.listener.event(event);
+ testLinkManager.listener.event(new LinkEvent(LINK_ADDED, NetTestTools.link("a", 1, "b", 2)));
assertTrue("The list contained an unexpected number of events", events.size() == 2);
assertTrue("The first element is of the wrong type.",
- events.get(0).type() == EDGE_PORT_REMOVED);
+ events.get(0).type() == EDGE_PORT_REMOVED);
assertTrue("The second element is of the wrong type.",
- events.get(1).type() == EDGE_PORT_REMOVED);
+ events.get(1).type() == EDGE_PORT_REMOVED);
testPoint = events.get(0).subject();
referencePoint = NetTestTools.connectPoint("a", 1);
assertTrue("The port numbers of the first element are incorrect",
- testPoint.port().toLong() == referencePoint.port().toLong());
+ testPoint.port().toLong() == referencePoint.port().toLong());
assertTrue("The device id of the first element is incorrect.",
- testPoint.deviceId().equals(referencePoint.deviceId()));
+ testPoint.deviceId().equals(referencePoint.deviceId()));
testPoint = events.get(1).subject();
referencePoint = NetTestTools.connectPoint("b", 2);
assertTrue("The port numbers of the second element are incorrect",
- testPoint.port().toLong() == referencePoint.port().toLong());
+ testPoint.port().toLong() == referencePoint.port().toLong());
assertTrue("The device id of the second element is incorrect.",
- testPoint.deviceId().equals(referencePoint.deviceId()));
+ testPoint.deviceId().equals(referencePoint.deviceId()));
//Apparent duplicate of previous method tests removal when the elements have already been removed
- eventsToAdd.clear();
events.clear();
- eventsToAdd.add(new LinkEvent(LINK_ADDED, NetTestTools.link("a", 1, "b", 2)));
- event = new TopologyEvent(TOPOLOGY_CHANGED, null, eventsToAdd);
- testTopologyManager.listener.event(event);
-
+ testLinkManager.listener.event(new LinkEvent(LINK_ADDED, NetTestTools.link("a", 1, "b", 2)));
assertTrue("The list should contain no events, the removed elements don't exist.", events.size() == 0);
}
@@ -192,8 +185,7 @@
//Setup
Device referenceDevice;
- TopologyEvent event;
- List<Event> eventsToAdd = Lists.newArrayList();
+ DeviceEvent event;
int numDevices = 10;
int numInfraPorts = 5;
totalPorts = 10;
@@ -201,14 +193,13 @@
//Test response to device added events
referenceDevice = NetTestTools.device("1");
- eventsToAdd.add(new DeviceEvent(DEVICE_ADDED, referenceDevice,
- new DefaultPort(referenceDevice, PortNumber.portNumber(1), true)));
- event = new TopologyEvent(TOPOLOGY_CHANGED, null, eventsToAdd);
- testTopologyManager.listener.event(event);
+ event = new DeviceEvent(DEVICE_ADDED, referenceDevice,
+ new DefaultPort(referenceDevice, PortNumber.portNumber(1), true));
+ testDeviceManager.listener.event(event);
//Check that ports were populated correctly
assertTrue("Unexpected number of new ports added",
- mgr.deviceService.getPorts(NetTestTools.did("1")).size() == 10);
+ mgr.deviceService.getPorts(NetTestTools.did("1")).size() == 10);
//Check that of the ten ports the half that are infrastructure ports aren't added
assertEquals("Unexpected number of new edge ports added", (totalPorts - numInfraPorts), events.size());
@@ -219,15 +210,15 @@
//Names here are irrelevant, the first 5 ports are populated as infrastructure, 6-10 are edge
for (int index = 0; index < events.size(); index++) {
assertEquals("Port added had unexpected port number.",
- events.get(index).subject().port(),
- NetTestTools.connectPoint("a", index + numInfraPorts + 1).port());
+ events.get(index).subject().port(),
+ NetTestTools.connectPoint("a", index + numInfraPorts + 1).port());
}
events.clear();
//Repost the event to test repeated posts
- testTopologyManager.listener.event(event);
+ testDeviceManager.listener.event(event);
assertEquals("The redundant notification should not have created additional notifications.",
- 0, events.size());
+ 0, events.size());
//Calculate the size of the returned iterable of edge points.
Iterable<ConnectPoint> pts = mgr.getEdgePoints();
Iterator pointIterator = pts.iterator();
@@ -238,45 +229,41 @@
assertEquals("Unexpected number of edge points", totalPorts - numInfraPorts, count);
//Testing device removal
events.clear();
- eventsToAdd.clear();
- eventsToAdd.add(new DeviceEvent(DEVICE_REMOVED, referenceDevice,
- new DefaultPort(referenceDevice, PortNumber.portNumber(1), true)));
- event = new TopologyEvent(TOPOLOGY_CHANGED, null, eventsToAdd);
- testTopologyManager.listener.event(event);
+ event = (new DeviceEvent(DEVICE_REMOVED, referenceDevice,
+ new DefaultPort(referenceDevice, PortNumber.portNumber(1), true)));
+ testDeviceManager.listener.event(event);
assertEquals("There should be five new events from removal of edge points",
- totalPorts - numInfraPorts, events.size());
+ totalPorts - numInfraPorts, events.size());
for (int index = 0; index < events.size(); index++) {
//Assert that the correct port numbers were removed in the correct order
assertEquals("Port removed had unexpected port number.",
- events.get(index).subject().port(),
- (NetTestTools.connectPoint("a", index + numInfraPorts + 1).port()));
+ events.get(index).subject().port(),
+ (NetTestTools.connectPoint("a", index + numInfraPorts + 1).port()));
//Assert that the events are of the correct type
assertEquals("Unexpected type of event", events.get(index).type(), EDGE_PORT_REMOVED);
}
events.clear();
//Rebroadcast event to check that it triggers no new behavior
- testTopologyManager.listener.event(event);
+ testDeviceManager.listener.event(event);
assertEquals("Rebroadcast of removal event should not produce additional events",
- 0, events.size());
+ 0, events.size());
//Testing device status change, changed from unavailable to available
events.clear();
- eventsToAdd.clear();
//Make sure that the devicemanager shows the device as available.
addDevice(referenceDevice, "1", 5);
devices.put(referenceDevice.id(), referenceDevice);
- eventsToAdd.add(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, referenceDevice));
- event = new TopologyEvent(TOPOLOGY_CHANGED, null, eventsToAdd);
- testTopologyManager.listener.event(event);
+ event = new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, referenceDevice);
+ testDeviceManager.listener.event(event);
//An earlier setup set half of the reference device ports to infrastructure
assertEquals("An unexpected number of events were generated.", totalPorts - numInfraPorts, events.size());
for (int i = 0; i < 5; i++) {
assertEquals("The event was not of the right type", events.get(i).type(), EDGE_PORT_ADDED);
}
events.clear();
- testTopologyManager.listener.event(event);
+ testDeviceManager.listener.event(event);
assertEquals("No events should have been generated for a set of existing ports.", 0, events.size());
//Test removal when state changes when the device becomes unavailable
@@ -288,21 +275,20 @@
no events will be generated since no ports will be provided in getPorts() to EdgeManager.
*/
alwaysReturnPorts = true;
- testTopologyManager.listener.event(event);
+ testDeviceManager.listener.event(event);
alwaysReturnPorts = false;
assertEquals("An unexpected number of events were created.", totalPorts - numInfraPorts, events.size());
for (int i = 0; i < 5; i++) {
EdgePortEvent edgeEvent = events.get(i);
assertEquals("The event is of an unexpected type.",
- EdgePortEvent.Type.EDGE_PORT_REMOVED, edgeEvent.type());
+ EdgePortEvent.Type.EDGE_PORT_REMOVED, edgeEvent.type());
assertEquals("The event pertains to an unexpected port", PortNumber.portNumber(i + numInfraPorts + 1),
- edgeEvent.subject().port());
+ edgeEvent.subject().port());
}
}
@Test
public void testInternalCache() {
- List<Event> eventsToAdd = Lists.newArrayList();
int numDevices = 10;
//Number of infrastructure ports per device
int numPorts = 5;
@@ -312,11 +298,8 @@
for (int i = 0; i < numDevices; i++) {
Device newDevice = NetTestTools.device(Integer.toString(i));
devices.put(newDevice.id(), newDevice);
- eventsToAdd.add(new DeviceEvent(DEVICE_ADDED, newDevice));
+ testDeviceManager.listener.event(new DeviceEvent(DEVICE_ADDED, newDevice));
}
- TopologyEvent event = new TopologyEvent(TOPOLOGY_CHANGED, null, eventsToAdd);
- testTopologyManager.listener.event(event);
-
//Check all ports have correct designations
ConnectPoint testPoint;
for (int deviceNum = 0; deviceNum < numDevices; deviceNum++) {
@@ -334,7 +317,7 @@
count++;
}
assertEquals("There are an unexpeceted number of edge points returned.",
- (totalPorts - numPorts) * numDevices, count);
+ (totalPorts - numPorts) * numDevices, count);
for (int deviceNumber = 0; deviceNumber < numDevices; deviceNumber++) {
count = 0;
for (ConnectPoint ignored : mgr.getEdgePoints(NetTestTools.did("1"))) {
@@ -349,8 +332,7 @@
public void testEmit() {
byte[] arr = new byte[10];
Device referenceDevice;
- TopologyEvent event;
- List<Event> eventsToAdd = Lists.newArrayList();
+ DeviceEvent event;
int numDevices = 10;
int numInfraPorts = 5;
totalPorts = 10;
@@ -360,16 +342,16 @@
}
for (int i = 0; i < numDevices; i++) {
referenceDevice = NetTestTools.device(Integer.toString(i));
- eventsToAdd.add(new DeviceEvent(DEVICE_ADDED, referenceDevice,
- new DefaultPort(referenceDevice, PortNumber.portNumber(1), true)));
+ testDeviceManager.listener.event(new DeviceEvent(DEVICE_ADDED, referenceDevice,
+ new DefaultPort(referenceDevice,
+ PortNumber.portNumber(1),
+ true)));
}
- event = new TopologyEvent(TOPOLOGY_CHANGED, null, eventsToAdd);
- testTopologyManager.listener.event(event);
mgr.emitPacket(ByteBuffer.wrap(arr), Optional.<TrafficTreatment>empty());
assertEquals("There were an unexpected number of emitted packets",
- (totalPorts - numInfraPorts) * numDevices, packets.size());
+ (totalPorts - numInfraPorts) * numDevices, packets.size());
Iterator<OutboundPacket> packetIter = packets.iterator();
OutboundPacket packet;
while (packetIter.hasNext()) {
@@ -381,7 +363,7 @@
mgr.emitPacket(NetTestTools.did(Integer.toString(1)), ByteBuffer.wrap(arr), Optional.<TrafficTreatment>empty());
assertEquals("Unexpected number of outbound packets were emitted.",
- totalPorts - numInfraPorts, packets.size());
+ totalPorts - numInfraPorts, packets.size());
packetIter = packets.iterator();
while (packetIter.hasNext()) {
packet = packetIter.next();
@@ -455,6 +437,7 @@
}
private class TestDeviceManager extends DeviceServiceAdapter {
+ private DeviceListener listener;
private Map<DeviceId, Device> devices;
@@ -490,6 +473,17 @@
public Iterable<Device> getAvailableDevices() {
return devices.values();
}
+
+
+ @Override
+ public void addListener(DeviceListener listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public void removeListener(DeviceListener listener) {
+ this.listener = null;
+ }
}
private class TestPacketManager extends PacketServiceAdapter {
@@ -499,6 +493,15 @@
}
}
+ private class TestLinkManager extends LinkServiceAdapter {
+ private LinkListener listener;
+
+ @Override
+ public void addListener(LinkListener listener) {
+ this.listener = listener;
+ }
+ }
+
private class TestListener implements EdgePortListener {
private List<EdgePortEvent> events;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
similarity index 84%
rename from core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java
rename to core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
index 2012457..836a3c2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
@@ -15,26 +15,8 @@
*/
package org.onosproject.store.host.impl;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-import static org.onosproject.net.DefaultAnnotations.merge;
-import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED;
-import static org.onosproject.net.host.HostEvent.Type.HOST_MOVED;
-import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED;
-import static org.onosproject.net.host.HostEvent.Type.HOST_UPDATED;
-import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
-import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -60,22 +42,34 @@
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.EventuallyConsistentMap;
-import org.onosproject.store.service.EventuallyConsistentMapEvent;
-import org.onosproject.store.service.EventuallyConsistentMapListener;
-import org.onosproject.store.service.LogicalClockService;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static org.onosproject.net.DefaultAnnotations.merge;
+import static org.onosproject.net.host.HostEvent.Type.*;
+import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages the inventory of hosts using a {@code EventuallyConsistentMap}.
*/
@Component(immediate = true)
@Service
-public class ECHostStore
+public class DistributedHostStore
extends AbstractStore<HostEvent, HostStoreDelegate>
implements HostStore {
@@ -84,15 +78,13 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected LogicalClockService clockService;
-
- private EventuallyConsistentMap<HostId, DefaultHost> hosts;
+ private ConsistentMap<HostId, DefaultHost> host;
+ private Map<HostId, DefaultHost> hosts;
private final ConcurrentHashMap<HostId, DefaultHost> prevHosts =
new ConcurrentHashMap<>();
- private EventuallyConsistentMapListener<HostId, DefaultHost> hostLocationTracker =
+ private MapEventListener<HostId, DefaultHost> hostLocationTracker =
new HostLocationTracker();
@Activate
@@ -100,21 +92,22 @@
KryoNamespace.Builder hostSerializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API);
- hosts = storageService.<HostId, DefaultHost>eventuallyConsistentMapBuilder()
+ host = storageService.<HostId, DefaultHost>consistentMapBuilder()
.withName("onos-hosts")
- .withSerializer(hostSerializer)
- .withTimestampProvider((k, v) -> clockService.getTimestamp())
+ .withRelaxedReadConsistency()
+ .withSerializer(Serializer.using(hostSerializer.build()))
.build();
- hosts.addListener(hostLocationTracker);
+ hosts = host.asJavaMap();
+
+ host.addListener(hostLocationTracker);
log.info("Started");
}
@Deactivate
public void deactivate() {
- hosts.removeListener(hostLocationTracker);
- hosts.destroy();
+ host.removeListener(hostLocationTracker);
prevHosts.clear();
log.info("Stopped");
@@ -249,11 +242,11 @@
return collection.stream().filter(predicate).collect(Collectors.toSet());
}
- private class HostLocationTracker implements EventuallyConsistentMapListener<HostId, DefaultHost> {
+ private class HostLocationTracker implements MapEventListener<HostId, DefaultHost> {
@Override
- public void event(EventuallyConsistentMapEvent<HostId, DefaultHost> event) {
- DefaultHost host = checkNotNull(event.value());
- if (event.type() == PUT) {
+ public void event(MapEvent<HostId, DefaultHost> event) {
+ DefaultHost host = checkNotNull(event.value().value());
+ if (event.type() == MapEvent.Type.INSERT) {
Host prevHost = prevHosts.put(host.id(), host);
if (prevHost == null) {
notifyDelegate(new HostEvent(HOST_ADDED, host));
@@ -262,7 +255,7 @@
} else if (!Objects.equals(prevHost, host)) {
notifyDelegate(new HostEvent(HOST_UPDATED, host, prevHost));
}
- } else if (event.type() == REMOVE) {
+ } else if (event.type() == MapEvent.Type.REMOVE) {
if (prevHosts.remove(host.id()) != null) {
notifyDelegate(new HostEvent(HOST_REMOVED, host));
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java b/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java
index 851185b..4d7e7f3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java
@@ -113,7 +113,7 @@
@Override
public void forward(ConnectPoint outPort, Host subject, ByteBuffer packet) {
- NodeId nodeId = mastershipService.getMasterFor(outPort.deviceId());
+ /*NodeId nodeId = mastershipService.getMasterFor(outPort.deviceId());
if (nodeId.equals(localNodeId)) {
if (delegate != null) {
delegate.emitResponse(outPort, packet);
@@ -122,7 +122,10 @@
log.info("Forwarding ARP response from {} to {}", subject.id(), outPort);
commService.unicast(new ArpResponseMessage(outPort, subject, packet.array()),
ARP_RESPONSE_MESSAGE, serializer::encode, nodeId);
- }
+ }*/
+ //FIXME: Code above may be unnecessary and therefore cluster messaging
+ // and pendingMessages could be pruned as well.
+ delegate.emitResponse(outPort, packet);
}
@Override
diff --git a/core/store/dist/src/test/java/org/onosproject/store/host/impl/ECHostStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/host/impl/DistributedHostStoreTest.java
similarity index 83%
rename from core/store/dist/src/test/java/org/onosproject/store/host/impl/ECHostStoreTest.java
rename to core/store/dist/src/test/java/org/onosproject/store/host/impl/DistributedHostStoreTest.java
index a7077a8..0732126 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/host/impl/ECHostStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/host/impl/DistributedHostStoreTest.java
@@ -27,8 +27,6 @@
import org.onosproject.net.host.DefaultHostDescription;
import org.onosproject.net.host.HostDescription;
import org.onosproject.net.provider.ProviderId;
-import org.onosproject.store.Timestamp;
-import org.onosproject.store.service.LogicalClockService;
import org.onosproject.store.service.TestStorageService;
import java.util.HashSet;
@@ -37,9 +35,9 @@
/**
* Tests for the ECHostStore.
*/
-public class ECHostStoreTest extends TestCase {
+public class DistributedHostStoreTest extends TestCase {
- private ECHostStore ecXHostStore;
+ private DistributedHostStore ecXHostStore;
private static final HostId HOSTID = HostId.hostId(MacAddress.valueOf("1a:1a:1a:1a:1a:1a"));
@@ -50,10 +48,9 @@
@Before
public void setUp() {
- ecXHostStore = new ECHostStore();
+ ecXHostStore = new DistributedHostStore();
ecXHostStore.storageService = new TestStorageService();
- ecXHostStore.clockService = new TestLogicalClockService();
ecXHostStore.activate();
}
@@ -83,13 +80,4 @@
assertTrue(host.ipAddresses().contains(IP2));
}
- /**
- * Mocks the LogicalClockService class.
- */
- class TestLogicalClockService implements LogicalClockService {
- @Override
- public Timestamp getTimestamp() {
- return null;
- }
- }
}
\ No newline at end of file
diff --git a/tools/test/topos/attmplsfast.py b/tools/test/topos/attmplsfast.py
index 7f90446..cc7479b 100644
--- a/tools/test/topos/attmplsfast.py
+++ b/tools/test/topos/attmplsfast.py
@@ -15,19 +15,23 @@
class AttMplsTopo( Topo ):
"Internet Topology Zoo Specimen."
- def build( self ):
+ def __init__( self ):
"Create a topology."
+ # Initialize Topology
+ Topo.__init__( self )
+
# add nodes, switches first...
+ NY54 = self.addSwitch( 's25' ) # 40.728270, -73.994483
CMBR = self.addSwitch( 's1' ) # 42.373730, -71.109734
- CHCG = self.addSwitch( 's2' ) # 41.877461, -87.642892
+ CHCG = self.addSwitch( 's2', protocols='OpenFlow13' ) # 41.877461, -87.642892
CLEV = self.addSwitch( 's3' ) # 41.498928, -81.695217
RLGH = self.addSwitch( 's4' ) # 35.780150, -78.644026
ATLN = self.addSwitch( 's5' ) # 33.749017, -84.394168
PHLA = self.addSwitch( 's6' ) # 39.952906, -75.172278
WASH = self.addSwitch( 's7' ) # 38.906696, -77.035509
NSVL = self.addSwitch( 's8' ) # 36.166410, -86.787305
- STLS = self.addSwitch( 's9' ) # 38.626418, -90.198143
+ STLS = self.addSwitch( 's9', protocols='OpenFlow13' ) # 38.626418, -90.198143
NWOR = self.addSwitch( 's10' ) # 29.951475, -90.078434
HSTN = self.addSwitch( 's11' ) # 29.763249, -95.368332
SNAN = self.addSwitch( 's12' ) # 29.424331, -98.491745
@@ -40,12 +44,12 @@
PTLD = self.addSwitch( 's19' ) # 45.523317, -122.677768
STTL = self.addSwitch( 's20' ) # 47.607326, -122.331786
SLKC = self.addSwitch( 's21' ) # 40.759577, -111.895079
- LA03 = self.addSwitch( 's22' ) # 34.056346, -118.235951
+ LA03 = self.addSwitch( 's22', protocols='OpenFlow13' ) # 34.056346, -118.235951
SNDG = self.addSwitch( 's23' ) # 32.714564, -117.153528
PHNX = self.addSwitch( 's24' ) # 33.448289, -112.076299
- NY54 = self.addSwitch( 's25' ) # 40.728270, -73.994483
# ... and now hosts
+ NY54_host = self.addHost( 'h25' )
CMBR_host = self.addHost( 'h1' )
CHCG_host = self.addHost( 'h2' )
CLEV_host = self.addHost( 'h3' )
@@ -70,7 +74,6 @@
LA03_host = self.addHost( 'h22' )
SNDG_host = self.addHost( 'h23' )
PHNX_host = self.addHost( 'h24' )
- NY54_host = self.addHost( 'h25' )
# add edges between switch and corresponding host
self.addLink( NY54 , NY54_host )