EdgeManager fixes
- Edge point update should be triggered based on TopologyEvent.
{Device, Link}Event can be triggered before TopologyEvent,
which will result in use of the outdated Topology to determine
if the port is an edge port leading to incorrect edge port set.
(ONOS-4896)
- Ports on Edge Link should not be considered part of infrastructure,
should be candidate for Edge point.
Change-Id: I7d69cc242ba7849996c1105ccd1956975db63480
diff --git a/core/common/src/main/java/org/onosproject/common/DefaultTopology.java b/core/common/src/main/java/org/onosproject/common/DefaultTopology.java
index a2bc4a5..8c4fbc8 100644
--- a/core/common/src/main/java/org/onosproject/common/DefaultTopology.java
+++ b/core/common/src/main/java/org/onosproject/common/DefaultTopology.java
@@ -37,6 +37,7 @@
import org.onosproject.net.DeviceId;
import org.onosproject.net.DisjointPath;
import org.onosproject.net.Link;
+import org.onosproject.net.Link.Type;
import org.onosproject.net.Path;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.net.topology.ClusterId;
@@ -442,14 +443,17 @@
riskProfile2.put(new TopologyEdge() {
Link cur = l;
+ @Override
public Link link() {
return cur;
}
+ @Override
public TopologyVertex src() {
return () -> src;
}
+ @Override
public TopologyVertex dst() {
return () -> dst;
}
@@ -568,6 +572,12 @@
private ImmutableSet<ConnectPoint> findInfrastructurePoints() {
ImmutableSet.Builder<ConnectPoint> builder = ImmutableSet.builder();
for (TopologyEdge edge : graph.getEdges()) {
+ if (edge.link().type() == Type.EDGE) {
+ // exclude EDGE link from infrastructure link
+ // - Device <-> Host
+ // - Device <-> remote domain Device
+ continue;
+ }
builder.add(edge.link().src());
builder.add(edge.link().dst());
}
diff --git a/core/net/src/main/java/org/onosproject/net/edgeservice/impl/EdgeManager.java b/core/net/src/main/java/org/onosproject/net/edgeservice/impl/EdgeManager.java
index d841c52..ed5fa2c 100644
--- a/core/net/src/main/java/org/onosproject/net/edgeservice/impl/EdgeManager.java
+++ b/core/net/src/main/java/org/onosproject/net/edgeservice/impl/EdgeManager.java
@@ -29,8 +29,8 @@
import org.onosproject.event.AbstractListenerManager;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.Link.Type;
import org.onosproject.net.device.DeviceEvent;
-import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.edge.EdgePortEvent;
import org.onosproject.net.edge.EdgePortListener;
@@ -38,12 +38,12 @@
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.link.LinkEvent;
-import org.onosproject.net.link.LinkListener;
-import org.onosproject.net.link.LinkService;
import org.onosproject.net.packet.DefaultOutboundPacket;
import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketService;
import org.onosproject.net.topology.Topology;
+import org.onosproject.net.topology.TopologyEvent;
+import org.onosproject.net.topology.TopologyListener;
import org.onosproject.net.topology.TopologyService;
import org.slf4j.Logger;
@@ -72,11 +72,12 @@
private Topology topology;
+ /**
+ * Set of edge ConnectPoints per Device.
+ */
private final Map<DeviceId, Set<ConnectPoint>> connectionPoints = Maps.newConcurrentMap();
- private final LinkListener linkListener = new InnerLinkListener();
-
- private final DeviceListener deviceListener = new InnerDeviceListener();
+ private final TopologyListener topologyListener = new InnerTopologyListener();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PacketService packetService;
@@ -87,23 +88,18 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected TopologyService topologyService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected LinkService linkService;
-
@Activate
public void activate() {
eventDispatcher.addSink(EdgePortEvent.class, listenerRegistry);
- deviceService.addListener(deviceListener);
- linkService.addListener(linkListener);
+ topologyService.addListener(topologyListener);
loadAllEdgePorts();
log.info("Started");
}
@Deactivate
public void deactivate() {
+ topologyService.removeListener(topologyListener);
eventDispatcher.removeSink(EdgePortEvent.class);
- deviceService.removeListener(deviceListener);
- linkService.removeListener(linkListener);
log.info("Stopped");
}
@@ -153,21 +149,18 @@
return new DefaultOutboundPacket(point.deviceId(), builder.build(), data);
}
- private class InnerLinkListener implements LinkListener {
+ private class InnerTopologyListener implements TopologyListener {
@Override
- public void event(LinkEvent event) {
- topology = topologyService.currentTopology();
- processLinkEvent(event);
- }
- }
-
- private class InnerDeviceListener implements DeviceListener {
-
- @Override
- public void event(DeviceEvent event) {
- topology = topologyService.currentTopology();
- processDeviceEvent(event);
+ public void event(TopologyEvent event) {
+ topology = event.subject();
+ event.reasons().forEach(reason -> {
+ if (reason instanceof DeviceEvent) {
+ processDeviceEvent((DeviceEvent) reason);
+ } else if (reason instanceof LinkEvent) {
+ processLinkEvent((LinkEvent) reason);
+ }
+ });
}
}
@@ -180,12 +173,21 @@
// Processes a link event by adding or removing its end-points in our cache.
private void processLinkEvent(LinkEvent event) {
- if (event.type() == LinkEvent.Type.LINK_ADDED) {
- removeEdgePort(event.subject().src());
- removeEdgePort(event.subject().dst());
- } else if (event.type() == LinkEvent.Type.LINK_REMOVED) {
+ // negative Link event can result in increase of edge ports
+ boolean addEdgePort = event.type() == LinkEvent.Type.LINK_REMOVED;
+
+ // but if the Link is an Edge type,
+ // it will be the opposite
+ if (event.subject().type() == Type.EDGE) {
+ addEdgePort = !addEdgePort;
+ }
+
+ if (addEdgePort) {
addEdgePort(event.subject().src());
addEdgePort(event.subject().dst());
+ } else {
+ removeEdgePort(event.subject().src());
+ removeEdgePort(event.subject().dst());
}
}
@@ -195,6 +197,8 @@
DeviceEvent.Type type = event.type();
DeviceId id = event.subject().id();
+ // FIXME there's still chance that Topology and Device Service
+ // view is out-of-sync
if (type == DEVICE_ADDED ||
type == DEVICE_AVAILABILITY_CHANGED && deviceService.isAvailable(id)) {
// When device is added or becomes available, add all its ports
@@ -202,10 +206,11 @@
.forEach(p -> addEdgePort(new ConnectPoint(id, p.number())));
} else if (type == DEVICE_REMOVED ||
type == DEVICE_AVAILABILITY_CHANGED && !deviceService.isAvailable(id)) {
- // When device is removed or becomes unavailable, remove all its ports
- deviceService.getPorts(event.subject().id())
- .forEach(p -> removeEdgePort(new ConnectPoint(id, p.number())));
- connectionPoints.remove(id);
+ // When device is removed or becomes unavailable, remove all its ports.
+ // Note: cannot rely on Device subsystem, ports may be gone.
+ Optional.ofNullable(connectionPoints.remove(id))
+ .orElse(ImmutableSet.of())
+ .forEach(point -> post(new EdgePortEvent(EDGE_PORT_REMOVED, point)));
} else if (type == DeviceEvent.Type.PORT_ADDED ||
type == PORT_UPDATED && event.port().isEnabled()) {
@@ -216,14 +221,16 @@
}
}
+ private boolean isEdgePort(ConnectPoint point) {
+ return !topologyService.isInfrastructure(topology, point) &&
+ !point.port().isLogical();
+ }
+
// Adds the specified connection point to the edge points if needed.
private void addEdgePort(ConnectPoint point) {
- if (!topologyService.isInfrastructure(topology, point) && !point.port().isLogical()) {
- Set<ConnectPoint> set = connectionPoints.get(point.deviceId());
- if (set == null) {
- set = Sets.newConcurrentHashSet();
- connectionPoints.put(point.deviceId(), set);
- }
+ if (isEdgePort(point)) {
+ Set<ConnectPoint> set = connectionPoints.computeIfAbsent(point.deviceId(),
+ (k) -> Sets.newConcurrentHashSet());
if (set.add(point)) {
post(new EdgePortEvent(EDGE_PORT_ADDED, point));
}
@@ -232,7 +239,7 @@
// Removes the specified connection point from the edge points.
private void removeEdgePort(ConnectPoint point) {
- if (!point.port().isLogical()) {
+ if (isEdgePort(point)) {
Set<ConnectPoint> set = connectionPoints.get(point.deviceId());
if (set == null) {
return;
@@ -241,7 +248,13 @@
post(new EdgePortEvent(EDGE_PORT_REMOVED, point));
}
if (set.isEmpty()) {
- connectionPoints.remove(point.deviceId());
+ connectionPoints.computeIfPresent(point.deviceId(), (k, v) -> {
+ if (v.isEmpty()) {
+ return v;
+ } else {
+ return v;
+ }
+ });
}
}
}
diff --git a/core/net/src/test/java/org/onosproject/net/edgeservice/impl/EdgeManagerTest.java b/core/net/src/test/java/org/onosproject/net/edgeservice/impl/EdgeManagerTest.java
index 2f55de9..b01ec39 100644
--- a/core/net/src/test/java/org/onosproject/net/edgeservice/impl/EdgeManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/edgeservice/impl/EdgeManagerTest.java
@@ -15,13 +15,16 @@
*/
package org.onosproject.net.edgeservice.impl;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onosproject.common.event.impl.TestEventDispatcher;
+import org.onosproject.event.Event;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DefaultPort;
import org.onosproject.net.Device;
@@ -36,11 +39,11 @@
import org.onosproject.net.edge.EdgePortListener;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.link.LinkEvent;
-import org.onosproject.net.link.LinkListener;
-import org.onosproject.net.link.LinkServiceAdapter;
import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketServiceAdapter;
import org.onosproject.net.topology.Topology;
+import org.onosproject.net.topology.TopologyEvent;
+import org.onosproject.net.topology.TopologyEvent.Type;
import org.onosproject.net.topology.TopologyListener;
import org.onosproject.net.topology.TopologyServiceAdapter;
@@ -52,6 +55,7 @@
import java.util.Optional;
import java.util.Set;
+import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import static org.onosproject.net.NetTestTools.injectEventDispatcher;
import static org.onosproject.net.device.DeviceEvent.Type.*;
@@ -74,7 +78,6 @@
private final Map<DeviceId, Device> devices = Maps.newConcurrentMap();
private Set<OutboundPacket> packets = Sets.newConcurrentHashSet();
private final EdgePortListener testListener = new TestListener(events);
- private TestLinkManager testLinkManager;
private TestDeviceManager testDeviceManager;
private TestTopologyManager testTopologyManager;
@@ -87,8 +90,6 @@
testDeviceManager = new TestDeviceManager(devices);
mgr.deviceService = testDeviceManager;
mgr.packetService = new TestPacketManager();
- testLinkManager = new TestLinkManager();
- mgr.linkService = testLinkManager;
mgr.activate();
mgr.addListener(testListener);
@@ -126,7 +127,7 @@
ConnectPoint testPoint, referencePoint;
//Testing link removal
- testLinkManager.listener.event(new LinkEvent(LINK_REMOVED, NetTestTools.link("a", 1, "b", 2)));
+ postTopologyEvent(new LinkEvent(LINK_REMOVED, NetTestTools.link("a", 1, "b", 2)));
assertTrue("The list contained an unexpected number of events", events.size() == 2);
assertTrue("The first element is of the wrong type.",
@@ -147,12 +148,12 @@
testPoint.deviceId().equals(referencePoint.deviceId()));
//Rebroadcast event to ensure it results in no additional events
- testLinkManager.listener.event(new LinkEvent(LINK_REMOVED, NetTestTools.link("a", 1, "b", 2)));
+ postTopologyEvent(new LinkEvent(LINK_REMOVED, NetTestTools.link("a", 1, "b", 2)));
assertTrue("The list contained an unexpected number of events", events.size() == 2);
//Testing link adding when links to remove exist
events.clear();
- testLinkManager.listener.event(new LinkEvent(LINK_ADDED, NetTestTools.link("a", 1, "b", 2)));
+ postTopologyEvent(new LinkEvent(LINK_ADDED, NetTestTools.link("a", 1, "b", 2)));
assertTrue("The list contained an unexpected number of events", events.size() == 2);
assertTrue("The first element is of the wrong type.",
@@ -176,10 +177,11 @@
//Apparent duplicate of previous method tests removal when the elements have already been removed
events.clear();
- testLinkManager.listener.event(new LinkEvent(LINK_ADDED, NetTestTools.link("a", 1, "b", 2)));
+ postTopologyEvent(new LinkEvent(LINK_ADDED, NetTestTools.link("a", 1, "b", 2)));
assertTrue("The list should contain no events, the removed elements don't exist.", events.size() == 0);
}
+
@Test
public void testDeviceUpdates() {
//Setup
@@ -195,7 +197,7 @@
referenceDevice = NetTestTools.device("1");
event = new DeviceEvent(DEVICE_ADDED, referenceDevice,
new DefaultPort(referenceDevice, PortNumber.portNumber(1), true));
- testDeviceManager.listener.event(event);
+ postTopologyEvent(event);
//Check that ports were populated correctly
assertTrue("Unexpected number of new ports added",
@@ -216,7 +218,7 @@
events.clear();
//Repost the event to test repeated posts
- testDeviceManager.listener.event(event);
+ postTopologyEvent(event);
assertEquals("The redundant notification should not have created additional notifications.",
0, events.size());
//Calculate the size of the returned iterable of edge points.
@@ -231,21 +233,21 @@
events.clear();
event = (new DeviceEvent(DEVICE_REMOVED, referenceDevice,
new DefaultPort(referenceDevice, PortNumber.portNumber(1), true)));
- testDeviceManager.listener.event(event);
+ postTopologyEvent(event);
assertEquals("There should be five new events from removal of edge points",
totalPorts - numInfraPorts, events.size());
for (int index = 0; index < events.size(); index++) {
//Assert that the correct port numbers were removed in the correct order
- assertEquals("Port removed had unexpected port number.",
- events.get(index).subject().port(),
- (NetTestTools.connectPoint("a", index + numInfraPorts + 1).port()));
+ assertThat("Port removed had unexpected port number.",
+ events.get(index).subject().port().toLong(),
+ is(greaterThanOrEqualTo((long) numInfraPorts)));
//Assert that the events are of the correct type
assertEquals("Unexpected type of event", events.get(index).type(), EDGE_PORT_REMOVED);
}
events.clear();
//Rebroadcast event to check that it triggers no new behavior
- testDeviceManager.listener.event(event);
+ postTopologyEvent(event);
assertEquals("Rebroadcast of removal event should not produce additional events",
0, events.size());
@@ -256,14 +258,14 @@
devices.put(referenceDevice.id(), referenceDevice);
event = new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, referenceDevice);
- testDeviceManager.listener.event(event);
+ postTopologyEvent(event);
//An earlier setup set half of the reference device ports to infrastructure
assertEquals("An unexpected number of events were generated.", totalPorts - numInfraPorts, events.size());
for (int i = 0; i < 5; i++) {
assertEquals("The event was not of the right type", events.get(i).type(), EDGE_PORT_ADDED);
}
events.clear();
- testDeviceManager.listener.event(event);
+ postTopologyEvent(event);
assertEquals("No events should have been generated for a set of existing ports.", 0, events.size());
//Test removal when state changes when the device becomes unavailable
@@ -275,18 +277,20 @@
no events will be generated since no ports will be provided in getPorts() to EdgeManager.
*/
alwaysReturnPorts = true;
- testDeviceManager.listener.event(event);
+ postTopologyEvent(event);
alwaysReturnPorts = false;
assertEquals("An unexpected number of events were created.", totalPorts - numInfraPorts, events.size());
for (int i = 0; i < 5; i++) {
EdgePortEvent edgeEvent = events.get(i);
assertEquals("The event is of an unexpected type.",
EdgePortEvent.Type.EDGE_PORT_REMOVED, edgeEvent.type());
- assertEquals("The event pertains to an unexpected port", PortNumber.portNumber(i + numInfraPorts + 1),
- edgeEvent.subject().port());
+ assertThat("The event pertains to an unexpected port",
+ edgeEvent.subject().port().toLong(),
+ is(greaterThanOrEqualTo((long) numInfraPorts)));
}
}
+
@Test
public void testInternalCache() {
int numDevices = 10;
@@ -298,7 +302,7 @@
for (int i = 0; i < numDevices; i++) {
Device newDevice = NetTestTools.device(Integer.toString(i));
devices.put(newDevice.id(), newDevice);
- testDeviceManager.listener.event(new DeviceEvent(DEVICE_ADDED, newDevice));
+ postTopologyEvent(new DeviceEvent(DEVICE_ADDED, newDevice));
}
//Check all ports have correct designations
ConnectPoint testPoint;
@@ -342,7 +346,7 @@
}
for (int i = 0; i < numDevices; i++) {
referenceDevice = NetTestTools.device(Integer.toString(i));
- testDeviceManager.listener.event(new DeviceEvent(DEVICE_ADDED, referenceDevice,
+ postTopologyEvent(new DeviceEvent(DEVICE_ADDED, referenceDevice,
new DefaultPort(referenceDevice,
PortNumber.portNumber(1),
true)));
@@ -373,6 +377,27 @@
/**
+ * Creates TopologyEvent triggered by {@code event}.
+ *
+ * @param event reason of the TopologyEvent
+ * @return TopologyEvent
+ */
+ private TopologyEvent topologyEventOf(Event event) {
+ return new TopologyEvent(Type.TOPOLOGY_CHANGED, null, ImmutableList.of(event));
+ }
+
+
+ /**
+ * Post Event dispatched from TopologyManager.
+ *
+ * @param event Event
+ */
+ private void postTopologyEvent(Event event) {
+ testTopologyManager.listener.event(topologyEventOf(event));
+ }
+
+
+ /**
* @param numDevices the number of devices to populate.
* @param numInfraPorts the number of ports to be set as infrastructure on each device, numbered base 0, ports 0
* through numInfraPorts - 1
@@ -493,15 +518,6 @@
}
}
- private class TestLinkManager extends LinkServiceAdapter {
- private LinkListener listener;
-
- @Override
- public void addListener(LinkListener listener) {
- this.listener = listener;
- }
- }
-
private class TestListener implements EdgePortListener {
private List<EdgePortEvent> events;