Work toward ONOS-1451: Separate Event Key space per instance
Final step in the implementation:
Implement the Topology Event filtering and processing logic
inside class TopologyEventPreprocessor:
- The topology events from each ONOS instance are kept separately
within class OnosInstanceLastAddEvents
- We keep only the last copy of each ADD event that has not yet been
removed by a REMOVE event.
- If an ADD MastershipEvent is received, all previously
stored ADD Topology events for the corresponding Switch DPID are
applied to the Topology
- If there were previously reordered events, we attempt to
add them again along with the other events.
- Before adding the events to the Topology, the events are
aggregated (squashed), and reordered in their natural order to
build a Topology. E.g., the ADD events order is:
MastershipEvent, SwitchEvent, PortEvent, LinkEvent, HostEvent.
The REMOVE events are in the reverse order.
Also, the REMOVE events are processed before the ADD events.
- If an event cannot be added to the topology, it is added
back to the collection of reordered events.
Also, replaced some of the Collection<> usages with List<> in the
event-related API to indicate that there is a semantic ordering
of the events.
NOTE: Some of the internals of TopologyEventPreprocessor should be optimized
for speed. E.g., we should create an additional lookup map:
Dpid -> Collection<TopologyEvent>
so that method getPostponedEvents() doesn't need to traverse all
topologyEvents entries.
Change-Id: Ic0d9ebf242f015adb60817921b239bb65dd560e2
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyManager.java b/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
index 05abc46..28c23da 100644
--- a/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
+++ b/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
@@ -72,7 +72,7 @@
private final IControllerRegistryService registryService;
private CopyOnWriteArrayList<ITopologyListener> topologyListeners;
private Kryo kryo = KryoFactory.newKryoObject();
- private final TopologyEventFilter eventFilter = new TopologyEventFilter();
+ private TopologyEventPreprocessor eventPreprocessor;
//
// Metrics
@@ -101,17 +101,6 @@
"ListenerEventRate");
//
- // Local state for keeping track of reordered events.
- // NOTE: Switch Events are not affected by the event reordering.
- //
- private Map<ByteBuffer, PortEvent> reorderedAddedPortEvents =
- new HashMap<ByteBuffer, PortEvent>();
- private Map<ByteBuffer, LinkEvent> reorderedAddedLinkEvents =
- new HashMap<ByteBuffer, LinkEvent>();
- private Map<ByteBuffer, HostEvent> reorderedAddedHostEvents =
- new HashMap<ByteBuffer, HostEvent>();
-
- //
// Local state for keeping track of locally discovered events so we can
// cleanup properly when a Switch or Port is removed.
//
@@ -178,6 +167,8 @@
datastore = new TopologyDatastore();
this.registryService = registryService;
this.topologyListeners = topologyListeners;
+ this.eventPreprocessor =
+ new TopologyEventPreprocessor(registryService);
}
/**
@@ -211,16 +202,16 @@
//
Collection<TopologyEvent> allTopologyEvents =
eventChannel.getAllEntries();
- Collection<EventEntry<TopologyEvent>> collection =
- new LinkedList<EventEntry<TopologyEvent>>();
+ List<EventEntry<TopologyEvent>> events =
+ new LinkedList<EventEntry<TopologyEvent>>();
for (TopologyEvent topologyEvent : allTopologyEvents) {
EventEntry<TopologyEvent> eventEntry =
new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
topologyEvent);
- collection.add(eventEntry);
+ events.add(eventEntry);
}
- processEvents(collection);
+ processEvents(events);
}
/**
@@ -228,8 +219,8 @@
*/
@Override
public void run() {
- Collection<EventEntry<TopologyEvent>> collection =
- new LinkedList<EventEntry<TopologyEvent>>();
+ List<EventEntry<TopologyEvent>> events =
+ new LinkedList<EventEntry<TopologyEvent>>();
this.setName("TopologyManager.EventHandler " + this.getId());
startup();
@@ -241,11 +232,11 @@
try {
EventEntry<TopologyEvent> eventEntry =
topologyEvents.take();
- collection.add(eventEntry);
- topologyEvents.drainTo(collection);
+ events.add(eventEntry);
+ topologyEvents.drainTo(events);
- processEvents(collection);
- collection.clear();
+ processEvents(events);
+ events.clear();
} catch (Exception exception) {
log.debug("Exception processing Topology Events: ",
exception);
@@ -258,112 +249,11 @@
*
* @param events the events to process.
*/
- private void processEvents(Collection<EventEntry<TopologyEvent>> events) {
- // Local state for computing the final set of events
- Map<ByteBuffer, MastershipEvent> addedMastershipEvents =
- new HashMap<>();
- Map<ByteBuffer, MastershipEvent> removedMastershipEvents =
- new HashMap<>();
- Map<ByteBuffer, SwitchEvent> addedSwitchEvents = new HashMap<>();
- Map<ByteBuffer, SwitchEvent> removedSwitchEvents = new HashMap<>();
- Map<ByteBuffer, PortEvent> addedPortEvents = new HashMap<>();
- Map<ByteBuffer, PortEvent> removedPortEvents = new HashMap<>();
- Map<ByteBuffer, LinkEvent> addedLinkEvents = new HashMap<>();
- Map<ByteBuffer, LinkEvent> removedLinkEvents = new HashMap<>();
- Map<ByteBuffer, HostEvent> addedHostEvents = new HashMap<>();
- Map<ByteBuffer, HostEvent> removedHostEvents = new HashMap<>();
-
+ private void processEvents(List<EventEntry<TopologyEvent>> events) {
//
- // Filter the events
+ // Pre-process the events
//
- events = eventFilter.filterEvents(events);
-
- //
- // Classify and suppress matching events
- //
- for (EventEntry<TopologyEvent> event : events) {
- TopologyEvent topologyEvent = event.eventData();
- MastershipEvent mastershipEvent = topologyEvent.getMastershipEvent();
- SwitchEvent switchEvent = topologyEvent.getSwitchEvent();
- PortEvent portEvent = topologyEvent.getPortEvent();
- LinkEvent linkEvent = topologyEvent.getLinkEvent();
- HostEvent hostEvent = topologyEvent.getHostEvent();
-
- //
- // Extract the events
- //
- // FIXME Following event squashing logic based only on ID
- // potentially lose attribute change.
- switch (event.eventType()) {
- case ENTRY_ADD:
- log.debug("Topology event ENTRY_ADD: {}", topologyEvent);
- if (mastershipEvent != null) {
- ByteBuffer id = mastershipEvent.getIDasByteBuffer();
- addedMastershipEvents.put(id, mastershipEvent);
- removedMastershipEvents.remove(id);
- }
- if (switchEvent != null) {
- ByteBuffer id = switchEvent.getIDasByteBuffer();
- addedSwitchEvents.put(id, switchEvent);
- removedSwitchEvents.remove(id);
- // Switch Events are not affected by event reordering
- }
- if (portEvent != null) {
- ByteBuffer id = portEvent.getIDasByteBuffer();
- addedPortEvents.put(id, portEvent);
- removedPortEvents.remove(id);
- reorderedAddedPortEvents.remove(id);
- }
- if (linkEvent != null) {
- ByteBuffer id = linkEvent.getIDasByteBuffer();
- addedLinkEvents.put(id, linkEvent);
- removedLinkEvents.remove(id);
- reorderedAddedLinkEvents.remove(id);
- }
- if (hostEvent != null) {
- ByteBuffer id = hostEvent.getIDasByteBuffer();
- addedHostEvents.put(id, hostEvent);
- removedHostEvents.remove(id);
- reorderedAddedHostEvents.remove(id);
- }
- break;
- case ENTRY_REMOVE:
- log.debug("Topology event ENTRY_REMOVE: {}", topologyEvent);
- if (mastershipEvent != null) {
- ByteBuffer id = mastershipEvent.getIDasByteBuffer();
- addedMastershipEvents.remove(id);
- removedMastershipEvents.put(id, mastershipEvent);
- }
- if (switchEvent != null) {
- ByteBuffer id = switchEvent.getIDasByteBuffer();
- addedSwitchEvents.remove(id);
- removedSwitchEvents.put(id, switchEvent);
- // Switch Events are not affected by event reordering
- }
- if (portEvent != null) {
- ByteBuffer id = portEvent.getIDasByteBuffer();
- addedPortEvents.remove(id);
- removedPortEvents.put(id, portEvent);
- reorderedAddedPortEvents.remove(id);
- }
- if (linkEvent != null) {
- ByteBuffer id = linkEvent.getIDasByteBuffer();
- addedLinkEvents.remove(id);
- removedLinkEvents.put(id, linkEvent);
- reorderedAddedLinkEvents.remove(id);
- }
- if (hostEvent != null) {
- ByteBuffer id = hostEvent.getIDasByteBuffer();
- addedHostEvents.remove(id);
- removedHostEvents.put(id, hostEvent);
- reorderedAddedHostEvents.remove(id);
- }
- break;
- default:
- log.error("Unknown topology event {}",
- event.eventType());
- }
- }
+ events = eventPreprocessor.processEvents(events);
//
// Lock the topology while it is modified
@@ -371,58 +261,71 @@
topology.acquireWriteLock();
try {
+ // Apply the events
//
- // Apply the classified events.
+ // NOTE: The events are supposed to be in the proper order
+ // to naturally build and update the topology.
//
- // Apply the "add" events in the proper order:
- // mastership, switch, port, link, host
- //
- // TODO: Currently, the Mastership events are not used,
- // so their processing ordering is not important (undefined).
- //
- for (MastershipEvent mastershipEvent :
- addedMastershipEvents.values()) {
- processAddedMastershipEvent(mastershipEvent);
- }
- for (SwitchEvent switchEvent : addedSwitchEvents.values()) {
- addSwitch(switchEvent);
- }
- for (PortEvent portEvent : addedPortEvents.values()) {
- addPort(portEvent);
- }
- for (LinkEvent linkEvent : addedLinkEvents.values()) {
- addLink(linkEvent);
- }
- for (HostEvent hostEvent : addedHostEvents.values()) {
- addHost(hostEvent);
- }
- //
- // Apply the "remove" events in the reverse order:
- // host, link, port, switch, mastership
- //
- for (HostEvent hostEvent : removedHostEvents.values()) {
- removeHost(hostEvent);
- }
- for (LinkEvent linkEvent : removedLinkEvents.values()) {
- removeLink(linkEvent);
- }
- for (PortEvent portEvent : removedPortEvents.values()) {
- removePort(portEvent);
- }
- for (SwitchEvent switchEvent : removedSwitchEvents.values()) {
- removeSwitch(switchEvent);
- }
- for (MastershipEvent mastershipEvent :
- removedMastershipEvents.values()) {
- processRemovedMastershipEvent(mastershipEvent);
- }
+ for (EventEntry<TopologyEvent> event : events) {
+ TopologyEvent topologyEvent = event.eventData();
- //
- // Apply reordered events
- //
- applyReorderedEvents(!addedSwitchEvents.isEmpty(),
- !addedPortEvents.isEmpty());
+ // Get the event itself
+ MastershipEvent mastershipEvent =
+ topologyEvent.getMastershipEvent();
+ SwitchEvent switchEvent = topologyEvent.getSwitchEvent();
+ PortEvent portEvent = topologyEvent.getPortEvent();
+ LinkEvent linkEvent = topologyEvent.getLinkEvent();
+ HostEvent hostEvent = topologyEvent.getHostEvent();
+ boolean wasAdded = false;
+ //
+ // Extract the events
+ //
+ switch (event.eventType()) {
+ case ENTRY_ADD:
+ if (mastershipEvent != null) {
+ wasAdded = addMastershipEvent(mastershipEvent);
+ }
+ if (switchEvent != null) {
+ wasAdded = addSwitch(switchEvent);
+ }
+ if (portEvent != null) {
+ wasAdded = addPort(portEvent);
+ }
+ if (linkEvent != null) {
+ wasAdded = addLink(linkEvent);
+ }
+ if (hostEvent != null) {
+ wasAdded = addHost(hostEvent);
+ }
+ // If the item wasn't added, probably it was reordered
+ if (!wasAdded) {
+ ByteBuffer id = topologyEvent.getIDasByteBuffer();
+ eventPreprocessor.reorderedEvents.put(id, topologyEvent);
+ }
+ break;
+ case ENTRY_REMOVE:
+ if (mastershipEvent != null) {
+ removeMastershipEvent(mastershipEvent);
+ }
+ if (switchEvent != null) {
+ removeSwitch(switchEvent);
+ }
+ if (portEvent != null) {
+ removePort(portEvent);
+ }
+ if (linkEvent != null) {
+ removeLink(linkEvent);
+ }
+ if (hostEvent != null) {
+ removeHost(hostEvent);
+ }
+ break;
+ default:
+ log.error("Unknown topology event {}",
+ event.eventType());
+ }
+ }
} finally {
//
// Topology modifications completed: Release the lock
@@ -589,56 +492,6 @@
}
/**
- * Apply reordered events.
- *
- * @param hasAddedSwitchEvents true if there were Added Switch Events.
- * @param hasAddedPortEvents true if there were Added Port Events.
- */
- @GuardedBy("topology.writeLock")
- private void applyReorderedEvents(boolean hasAddedSwitchEvents,
- boolean hasAddedPortEvents) {
- if (!(hasAddedSwitchEvents || hasAddedPortEvents)) {
- return; // Nothing to do
- }
-
- //
- // Try to apply the reordered events.
- //
- // NOTE: For simplicity we try to apply all events of a particular
- // type if any "parent" type event was processed:
- // - Apply reordered Port Events if Switches were added
- // - Apply reordered Link and Host Events if Switches or Ports
- // were added
- //
-
- //
- // Apply reordered Port Events if Switches were added
- //
- if (hasAddedSwitchEvents) {
- Map<ByteBuffer, PortEvent> portEvents = reorderedAddedPortEvents;
- reorderedAddedPortEvents = new HashMap<>();
- for (PortEvent portEvent : portEvents.values()) {
- addPort(portEvent);
- }
- }
- //
- // Apply reordered Link and Host Events if Switches or Ports
- // were added.
- //
- Map<ByteBuffer, LinkEvent> linkEvents = reorderedAddedLinkEvents;
- reorderedAddedLinkEvents = new HashMap<>();
- for (LinkEvent linkEvent : linkEvents.values()) {
- addLink(linkEvent);
- }
- //
- Map<ByteBuffer, HostEvent> hostEvents = reorderedAddedHostEvents;
- reorderedAddedHostEvents = new HashMap<>();
- for (HostEvent hostEvent : hostEvents.values()) {
- addHost(hostEvent);
- }
- }
-
- /**
* Mastership updated event.
*
* @param mastershipEvent the mastership event.
@@ -974,28 +827,26 @@
//
/**
- * Processes added Switch Mastership event.
+ * Adds Switch Mastership event.
*
* @param mastershipEvent the MastershipEvent to process.
+ * @return true if the item was successfully added, otherwise false.
*/
@GuardedBy("topology.writeLock")
- private void processAddedMastershipEvent(MastershipEvent mastershipEvent) {
- log.debug("Processing added Mastership event {}",
- mastershipEvent);
- // TODO: Not implemented/used yet.
+ private boolean addMastershipEvent(MastershipEvent mastershipEvent) {
+ log.debug("Added Mastership event {}", mastershipEvent);
apiAddedMastershipEvents.add(mastershipEvent);
+ return true;
}
/**
- * Processes removed Switch Mastership event.
+ * Removes Switch Mastership event.
*
* @param mastershipEvent the MastershipEvent to process.
*/
@GuardedBy("topology.writeLock")
- private void processRemovedMastershipEvent(MastershipEvent mastershipEvent) {
- log.debug("Processing removed Mastership event {}",
- mastershipEvent);
- // TODO: Not implemented/used yet.
+ private void removeMastershipEvent(MastershipEvent mastershipEvent) {
+ log.debug("Removed Mastership event {}", mastershipEvent);
apiRemovedMastershipEvents.add(mastershipEvent);
}
@@ -1003,9 +854,10 @@
* Adds a switch to the topology replica.
*
* @param switchEvent the SwitchEvent with the switch to add.
+ * @return true if the item was successfully added, otherwise false.
*/
@GuardedBy("topology.writeLock")
- private void addSwitch(SwitchEvent switchEvent) {
+ private boolean addSwitch(SwitchEvent switchEvent) {
if (log.isDebugEnabled()) {
SwitchEvent sw = topology.getSwitchEvent(switchEvent.getDpid());
if (sw != null) {
@@ -1016,12 +868,14 @@
}
topology.putSwitch(switchEvent.freeze());
apiAddedSwitchEvents.add(switchEvent);
+ return true;
}
/**
* Removes a switch from the topology replica.
* <p/>
- * It will call {@link #removePort(PortEvent)} for each ports on this switch.
+ * It will call {@link #removePort(PortEvent)} for each ports on this
+ * switch.
*
* @param switchEvent the SwitchEvent with the switch to remove.
*/
@@ -1058,16 +912,15 @@
* Adds a port to the topology replica.
*
* @param portEvent the PortEvent with the port to add.
+ * @return true if the item was successfully added, otherwise false.
*/
@GuardedBy("topology.writeLock")
- private void addPort(PortEvent portEvent) {
+ private boolean addPort(PortEvent portEvent) {
Switch sw = topology.getSwitch(portEvent.getDpid());
if (sw == null) {
+ // Reordered event
log.debug("{} reordered because switch is null", portEvent);
- // Reordered event: delay the event in local cache
- ByteBuffer id = portEvent.getIDasByteBuffer();
- reorderedAddedPortEvents.put(id, portEvent);
- return;
+ return false;
}
if (log.isDebugEnabled()) {
@@ -1080,6 +933,7 @@
}
topology.putPort(portEvent.freeze());
apiAddedPortEvents.add(portEvent);
+ return true;
}
/**
@@ -1160,32 +1014,34 @@
* It will remove attachment points from each hosts using the same ports.
*
* @param linkEvent the LinkEvent with the link to add.
+ * @return true if the item was successfully added, otherwise false.
*/
@GuardedBy("topology.writeLock")
- private void addLink(LinkEvent linkEvent) {
+ private boolean addLink(LinkEvent linkEvent) {
PortEvent srcPort = topology.getPortEvent(linkEvent.getSrc());
PortEvent dstPort = topology.getPortEvent(linkEvent.getDst());
if ((srcPort == null) || (dstPort == null)) {
+ // Reordered event
log.debug("{} reordered because {} port is null", linkEvent,
(srcPort == null) ? "src" : "dst");
-
- // XXX domain knowledge: port must be present before link.
- // Reordered event: delay the event in local cache
- ByteBuffer id = linkEvent.getIDasByteBuffer();
- reorderedAddedLinkEvents.put(id, linkEvent);
- return;
+ return false;
}
- // XXX domain knowledge: Sanity check: Port cannot have both Link and Host
- // FIXME potentially local replica may not be up-to-date yet due to HZ delay.
- // may need to manage local truth and use them instead.
+ //
+ // XXX domain knowledge: Sanity check: Port cannot have both Link and
+ // Host.
+ //
+ // FIXME: Potentially local replica may not be up-to-date yet due to
+ // Hazelcast delay.
+ // FIXME: May need to manage local truth and use them instead.
+ //
if (topology.getLinkEvent(linkEvent.getLinkTuple()) == null) {
// Only check for existing Host when adding new Link.
-
// Remove all Hosts attached to the ports on both ends
- Set<HostEvent> hostsToUpdate = new TreeSet<>(new Comparator<HostEvent>() {
- // comparison only using ID(=MAC)
+ Set<HostEvent> hostsToUpdate =
+ new TreeSet<>(new Comparator<HostEvent>() {
+ // Comparison only using ID(=MAC)
@Override
public int compare(HostEvent o1, HostEvent o2) {
return Long.compare(o1.getMac().toLong(), o2.getMac().toLong());
@@ -1206,9 +1062,9 @@
hostsToUpdate.add(hostEvent);
}
}
- // remove attachment point from them.
+ // Remove attachment point from them
for (HostEvent hostEvent : hostsToUpdate) {
- // remove port from attachment point and update
+ // Remove port from attachment point and update
HostEvent newHostEvent = new HostEvent(hostEvent);
newHostEvent.removeAttachmentPoint(srcPort.getSwitchPort());
newHostEvent.removeAttachmentPoint(dstPort.getSwitchPort());
@@ -1235,6 +1091,7 @@
}
topology.putLink(linkEvent.freeze());
apiAddedLinkEvents.add(linkEvent);
+ return true;
}
/**
@@ -1290,12 +1147,14 @@
* <p/>
* TODO: Host-related work is incomplete.
* TODO: Eventually, we might need to consider reordering
- * or {@link #addLink(LinkEvent)} and {@link #addHost(HostEvent)} events on the same port.
+ * or {@link #addLink(LinkEvent)} and {@link #addHost(HostEvent)} events
+ * on the same port.
*
* @param hostEvent the HostEvent with the host to add.
+ * @return true if the item was successfully added, otherwise false.
*/
@GuardedBy("topology.writeLock")
- private void addHost(HostEvent hostEvent) {
+ private boolean addHost(HostEvent hostEvent) {
// TODO Decide how to handle update scenario.
// If the new HostEvent has less attachment point compared to
@@ -1316,10 +1175,8 @@
Port port = topology.getPort(swp.getDpid(), swp.getPortNumber());
if (port == null) {
log.debug("{} reordered because port {} was not there", hostEvent, swp);
- // Reordered event: delay the event in local cache
- ByteBuffer id = hostEvent.getIDasByteBuffer();
- reorderedAddedHostEvents.put(id, hostEvent);
- return; // should not continue if re-applying later
+ // Reordered event
+ return false; // should not continue if re-applying later
}
// Attached Ports must not have Link
if (port.getOutgoingLink() != null ||
@@ -1345,7 +1202,7 @@
+ "original: {}, modified: {}", hostEvent, modifiedHostEvent);
// TODO Should we call #removeHost to trigger remove event?
// only if this call is update.
- return;
+ return false;
}
if (log.isDebugEnabled()) {
@@ -1358,7 +1215,9 @@
}
topology.putHost(modifiedHostEvent.freeze());
apiAddedHostEvents.add(modifiedHostEvent);
+ return true;
}
+ return false;
}
/**
@@ -1386,11 +1245,11 @@
/**
* Read the whole topology from the database.
*
- * @return a collection of EventEntry-encapsulated Topology Events for
+ * @return a list of EventEntry-encapsulated Topology Events for
* the whole topology.
*/
- private Collection<EventEntry<TopologyEvent>> readWholeTopologyFromDB() {
- Collection<EventEntry<TopologyEvent>> collection =
+ private List<EventEntry<TopologyEvent>> readWholeTopologyFromDB() {
+ List<EventEntry<TopologyEvent>> events =
new LinkedList<EventEntry<TopologyEvent>>();
// XXX May need to clear whole topology first, depending on
@@ -1414,7 +1273,7 @@
EventEntry<TopologyEvent> eventEntry =
new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
topologyEvent);
- collection.add(eventEntry);
+ events.add(eventEntry);
}
// Add all active ports
@@ -1437,7 +1296,7 @@
EventEntry<TopologyEvent> eventEntry =
new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
topologyEvent);
- collection.add(eventEntry);
+ events.add(eventEntry);
}
for (KVDevice d : KVDevice.getAllDevices()) {
@@ -1469,9 +1328,9 @@
EventEntry<TopologyEvent> eventEntry =
new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
topologyEvent);
- collection.add(eventEntry);
+ events.add(eventEntry);
}
- return collection;
+ return events;
}
}