Finished implementation of edge port manager using topology event async notifications.
Change-Id: Ide0eb947ba6400dafe11dac73af1466aaf0ce451
diff --git a/core/net/src/main/java/org/onosproject/net/edgeservice/impl/EdgeManager.java b/core/net/src/main/java/org/onosproject/net/edgeservice/impl/EdgeManager.java
index fdb7988..8536185 100644
--- a/core/net/src/main/java/org/onosproject/net/edgeservice/impl/EdgeManager.java
+++ b/core/net/src/main/java/org/onosproject/net/edgeservice/impl/EdgeManager.java
@@ -54,6 +54,7 @@
import java.util.Optional;
import java.util.Set;
+import static org.onosproject.net.device.DeviceEvent.Type.*;
import static org.onosproject.net.edge.EdgePortEvent.Type.EDGE_PORT_ADDED;
import static org.onosproject.net.edge.EdgePortEvent.Type.EDGE_PORT_REMOVED;
import static org.slf4j.LoggerFactory.getLogger;
@@ -65,17 +66,17 @@
@Service
public class EdgeManager implements EdgePortService {
- private final ListenerRegistry<EdgePortEvent, EdgePortListener>
- listenerRegistry = new ListenerRegistry<>();
-
private final Logger log = getLogger(getClass());
private Topology topology;
- private final TopologyListener topologyListener = new InnerTopologyListener();
-
private final Map<DeviceId, Set<ConnectPoint>> connectionPoints = Maps.newConcurrentMap();
+ private final ListenerRegistry<EdgePortEvent, EdgePortListener>
+ listenerRegistry = new ListenerRegistry<>();
+
+ private final TopologyListener topologyListener = new InnerTopologyListener();
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected EventDeliveryService eventDispatcher;
@@ -102,19 +103,20 @@
log.info("Stopped");
}
+ @Override
public boolean isEdgePoint(ConnectPoint point) {
return !topologyService.isInfrastructure(topologyService.currentTopology(), point);
}
+ @Override
public Iterable<ConnectPoint> getEdgePoints() {
- //TODO if this is called before any notifications need to populate structure
ImmutableSet.Builder<ConnectPoint> builder = ImmutableSet.builder();
connectionPoints.forEach((k, v) -> v.forEach(builder::add));
return builder.build();
}
+ @Override
public Iterable<ConnectPoint> getEdgePoints(DeviceId deviceId) {
- //TODO if this is called before any notifications need to populate structure
ImmutableSet.Builder<ConnectPoint> builder = ImmutableSet.builder();
Set<ConnectPoint> set = connectionPoints.get(deviceId);
if (set != null) {
@@ -123,6 +125,7 @@
return builder.build();
}
+ @Override
public void emitPacket(ByteBuffer data, Optional<TrafficTreatment> treatment) {
TrafficTreatment.Builder builder = treatment.isPresent() ?
DefaultTrafficTreatment.builder(treatment.get()) :
@@ -130,13 +133,13 @@
getEdgePoints().forEach(p -> packetService.emit(packet(builder, p, data)));
}
+ @Override
public void emitPacket(DeviceId deviceId, ByteBuffer data,
Optional<TrafficTreatment> treatment) {
TrafficTreatment.Builder builder = treatment.isPresent() ?
DefaultTrafficTreatment.builder(treatment.get()) :
DefaultTrafficTreatment.builder();
getEdgePoints(deviceId).forEach(p -> packetService.emit(packet(builder, p, data)));
-
}
private OutboundPacket packet(TrafficTreatment.Builder builder, ConnectPoint point, ByteBuffer data) {
@@ -144,15 +147,19 @@
return new DefaultOutboundPacket(point.deviceId(), builder.build(), data);
}
+ @Override
public void addListener(EdgePortListener listener) {
listenerRegistry.addListener(listener);
}
+ @Override
public void removeListener(EdgePortListener listener) {
listenerRegistry.removeListener(listener);
}
+ // Internal listener for topo events used to keep our edge-port cache
+ // up to date.
private class InnerTopologyListener implements TopologyListener {
@Override
public void event(TopologyEvent event) {
@@ -161,12 +168,9 @@
if (triggers != null) {
triggers.forEach(reason -> {
if (reason instanceof DeviceEvent) {
- //TODO spuriously catches events not handled in the handler method
processDeviceEvent((DeviceEvent) reason);
} else if (reason instanceof LinkEvent) {
processLinkEvent((LinkEvent) reason);
- } else {
- System.out.println(reason.toString());
}
});
} else {
@@ -175,12 +179,13 @@
}
}
-
+ // Initial loading of the edge port cache.
private void loadAllEdgePorts() {
deviceService.getDevices().forEach(d -> deviceService.getPorts(d.id())
.forEach(p -> addEdgePort(new ConnectPoint(d.id(), p.number()))));
}
+ // Processes a link event by adding or removing its end-points in our cache.
private void processLinkEvent(LinkEvent event) {
if (event.type() == LinkEvent.Type.LINK_ADDED) {
removeEdgePort(event.subject().src());
@@ -189,22 +194,37 @@
addEdgePort(event.subject().src());
addEdgePort(event.subject().dst());
}
-
}
+ // Processes a device event by adding or removing its end-points in our cache.
private void processDeviceEvent(DeviceEvent event) {
+ DeviceEvent.Type type = event.type();
+ DeviceId id = event.subject().id();
- if (event.type() == DeviceEvent.Type.PORT_ADDED) {
- addEdgePort(new ConnectPoint(event.subject().id(), event.port().number()));
- } else if (event.type() == DeviceEvent.Type.PORT_REMOVED) {
- removeEdgePort(new ConnectPoint(event.subject().id(), event.port().number()));
+ if (type == DEVICE_ADDED ||
+ type == DEVICE_AVAILABILITY_CHANGED && deviceService.isAvailable(id)) {
+ // When device is added or becomes available, add all its ports
+ deviceService.getPorts(event.subject().id())
+ .forEach(p -> addEdgePort(new ConnectPoint(id, p.number())));
+ } else if (type == DEVICE_REMOVED ||
+ type == DEVICE_AVAILABILITY_CHANGED && !deviceService.isAvailable(id)) {
+ // When device is removed or becomes unavailable, remove all its ports
+ deviceService.getPorts(event.subject().id())
+ .forEach(p -> removeEdgePort(new ConnectPoint(id, p.number())));
+ connectionPoints.remove(id);
+
+ } else if (type == DeviceEvent.Type.PORT_ADDED ||
+ type == PORT_UPDATED && event.port().isEnabled()) {
+ addEdgePort(new ConnectPoint(id, event.port().number()));
+ } else if (type == DeviceEvent.Type.PORT_REMOVED ||
+ type == PORT_UPDATED && !event.port().isEnabled()) {
+ removeEdgePort(new ConnectPoint(id, event.port().number()));
}
}
+ // Adds the specified connection point to the edge points if needed.
private void addEdgePort(ConnectPoint point) {
- //TODO case of link removed and one of the end ports removed in same topo cycle
- //TODO pt2. resulting behavior will be adding a non-existent edge to the set
- if (!topologyService.isInfrastructure(topology, point)) {
+ if (!topologyService.isInfrastructure(topology, point) && !point.port().isLogical()) {
Set<ConnectPoint> set = connectionPoints.get(point.deviceId());
if (set == null) {
set = Sets.newConcurrentHashSet();
@@ -214,13 +234,11 @@
eventDispatcher.post(new EdgePortEvent(EDGE_PORT_ADDED, point));
}
}
-
}
+ // Removes the specified connection point from the edge points.
private void removeEdgePort(ConnectPoint point) {
- //TODO need to check that points still exist IE when a link and port are removed
- //TODO pt2 and both events are captures in the same topo update
- if (!topologyService.isInfrastructure(topology, point)) {
+ if (!point.port().isLogical()) {
Set<ConnectPoint> set = connectionPoints.get(point.deviceId());
if (set == null) {
return;
@@ -232,6 +250,5 @@
connectionPoints.remove(point.deviceId());
}
}
-
}
}