Work toward ONOS-1451: Separate Event Key space per instance
Misc changes and cleanup (Step 3):
* Enabled the usage of ONOS Instance ID inside class TopologyEvent
- Added the ONOS Instance ID as part of the TopologyEvent key
- Use the ONOS Instance ID inside method toString() and equals()
- Also, added new method TopologyEvent.getIDasByteBuffer()
* Added support for Mastership Events inside class TopologyEvents,
including the JSON output.
NOTE: For now the JSON output for the Topology object itself doesn't
contain the corresponding Mastership info. The need for
adding such info is TBD.
* Added new class TopologyEventFilter that will perform the filtering
of incoming events. For now the internal logic of this filter is empty.
* Reordered some of the MastershipEvent-related code right before the
SwitchEvent-related code so it is more consistent.
Change-Id: I940c4686b776f5136a10c25bc49278b69c548fc5
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyManager.java b/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
index 10fa6be..05abc46 100644
--- a/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
+++ b/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
@@ -72,6 +72,7 @@
private final IControllerRegistryService registryService;
private CopyOnWriteArrayList<ITopologyListener> topologyListeners;
private Kryo kryo = KryoFactory.newKryoObject();
+ private final TopologyEventFilter eventFilter = new TopologyEventFilter();
//
// Metrics
@@ -153,6 +154,10 @@
// - Queue of events, which will be dispatched to local listeners
// on next notification.
+ private List<MastershipEvent> apiAddedMastershipEvents =
+ new LinkedList<>();
+ private List<MastershipEvent> apiRemovedMastershipEvents =
+ new LinkedList<>();
private List<SwitchEvent> apiAddedSwitchEvents = new LinkedList<>();
private List<SwitchEvent> apiRemovedSwitchEvents = new LinkedList<>();
private List<PortEvent> apiAddedPortEvents = new LinkedList<>();
@@ -255,6 +260,10 @@
*/
private void processEvents(Collection<EventEntry<TopologyEvent>> events) {
// Local state for computing the final set of events
+ Map<ByteBuffer, MastershipEvent> addedMastershipEvents =
+ new HashMap<>();
+ Map<ByteBuffer, MastershipEvent> removedMastershipEvents =
+ new HashMap<>();
Map<ByteBuffer, SwitchEvent> addedSwitchEvents = new HashMap<>();
Map<ByteBuffer, SwitchEvent> removedSwitchEvents = new HashMap<>();
Map<ByteBuffer, PortEvent> addedPortEvents = new HashMap<>();
@@ -263,21 +272,22 @@
Map<ByteBuffer, LinkEvent> removedLinkEvents = new HashMap<>();
Map<ByteBuffer, HostEvent> addedHostEvents = new HashMap<>();
Map<ByteBuffer, HostEvent> removedHostEvents = new HashMap<>();
- Map<ByteBuffer, MastershipEvent> addedMastershipEvents =
- new HashMap<>();
- Map<ByteBuffer, MastershipEvent> removedMastershipEvents =
- new HashMap<>();
+
+ //
+ // Filter the events
+ //
+ events = eventFilter.filterEvents(events);
//
// Classify and suppress matching events
//
for (EventEntry<TopologyEvent> event : events) {
TopologyEvent topologyEvent = event.eventData();
+ MastershipEvent mastershipEvent = topologyEvent.getMastershipEvent();
SwitchEvent switchEvent = topologyEvent.getSwitchEvent();
PortEvent portEvent = topologyEvent.getPortEvent();
LinkEvent linkEvent = topologyEvent.getLinkEvent();
HostEvent hostEvent = topologyEvent.getHostEvent();
- MastershipEvent mastershipEvent = topologyEvent.getMastershipEvent();
//
// Extract the events
@@ -287,6 +297,11 @@
switch (event.eventType()) {
case ENTRY_ADD:
log.debug("Topology event ENTRY_ADD: {}", topologyEvent);
+ if (mastershipEvent != null) {
+ ByteBuffer id = mastershipEvent.getIDasByteBuffer();
+ addedMastershipEvents.put(id, mastershipEvent);
+ removedMastershipEvents.remove(id);
+ }
if (switchEvent != null) {
ByteBuffer id = switchEvent.getIDasByteBuffer();
addedSwitchEvents.put(id, switchEvent);
@@ -311,14 +326,14 @@
removedHostEvents.remove(id);
reorderedAddedHostEvents.remove(id);
}
- if (mastershipEvent != null) {
- ByteBuffer id = mastershipEvent.getIDasByteBuffer();
- addedMastershipEvents.put(id, mastershipEvent);
- removedMastershipEvents.remove(id);
- }
break;
case ENTRY_REMOVE:
log.debug("Topology event ENTRY_REMOVE: {}", topologyEvent);
+ if (mastershipEvent != null) {
+ ByteBuffer id = mastershipEvent.getIDasByteBuffer();
+ addedMastershipEvents.remove(id);
+ removedMastershipEvents.put(id, mastershipEvent);
+ }
if (switchEvent != null) {
ByteBuffer id = switchEvent.getIDasByteBuffer();
addedSwitchEvents.remove(id);
@@ -343,11 +358,6 @@
removedHostEvents.put(id, hostEvent);
reorderedAddedHostEvents.remove(id);
}
- if (mastershipEvent != null) {
- ByteBuffer id = mastershipEvent.getIDasByteBuffer();
- addedMastershipEvents.remove(id);
- removedMastershipEvents.put(id, mastershipEvent);
- }
break;
default:
log.error("Unknown topology event {}",
@@ -481,7 +491,9 @@
* Dispatch Topology Events to the listeners.
*/
private void dispatchTopologyEvents() {
- if (apiAddedSwitchEvents.isEmpty() &&
+ if (apiAddedMastershipEvents.isEmpty() &&
+ apiRemovedMastershipEvents.isEmpty() &&
+ apiAddedSwitchEvents.isEmpty() &&
apiRemovedSwitchEvents.isEmpty() &&
apiAddedPortEvents.isEmpty() &&
apiRemovedPortEvents.isEmpty() &&
@@ -497,6 +509,14 @@
// Debug statements
// TODO: Those statements should be removed in the future
//
+ for (MastershipEvent mastershipEvent : apiAddedMastershipEvents) {
+ log.debug("Dispatch Topology Event: ADDED {}",
+ mastershipEvent);
+ }
+ for (MastershipEvent mastershipEvent : apiRemovedMastershipEvents) {
+ log.debug("Dispatch Topology Event: REMOVED {}",
+ mastershipEvent);
+ }
for (SwitchEvent switchEvent : apiAddedSwitchEvents) {
log.debug("Dispatch Topology Event: ADDED {}", switchEvent);
}
@@ -527,6 +547,7 @@
// Update the metrics
//
long totalEvents =
+ apiAddedMastershipEvents.size() + apiRemovedMastershipEvents.size() +
apiAddedSwitchEvents.size() + apiRemovedSwitchEvents.size() +
apiAddedPortEvents.size() + apiRemovedPortEvents.size() +
apiAddedLinkEvents.size() + apiRemovedLinkEvents.size() +
@@ -539,7 +560,9 @@
//
for (ITopologyListener listener : this.topologyListeners) {
TopologyEvents events =
- new TopologyEvents(kryo.copy(apiAddedSwitchEvents),
+ new TopologyEvents(kryo.copy(apiAddedMastershipEvents),
+ kryo.copy(apiRemovedMastershipEvents),
+ kryo.copy(apiAddedSwitchEvents),
kryo.copy(apiRemovedSwitchEvents),
kryo.copy(apiAddedPortEvents),
kryo.copy(apiRemovedPortEvents),
@@ -553,6 +576,8 @@
//
// Cleanup
//
+ apiAddedMastershipEvents.clear();
+ apiRemovedMastershipEvents.clear();
apiAddedSwitchEvents.clear();
apiRemovedSwitchEvents.clear();
apiAddedPortEvents.clear();
@@ -614,6 +639,31 @@
}
/**
+ * Mastership updated event.
+ *
+ * @param mastershipEvent the mastership event.
+ */
+ @Override
+ public void putSwitchMastershipEvent(MastershipEvent mastershipEvent) {
+ // Send out notification
+ TopologyEvent topologyEvent =
+ new TopologyEvent(mastershipEvent,
+ registryService.getOnosInstanceId());
+ eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+ }
+
+ /**
+ * Mastership removed event.
+ *
+ * @param mastershipEvent the mastership event.
+ */
+ @Override
+ public void removeSwitchMastershipEvent(MastershipEvent mastershipEvent) {
+ // Send out notification
+ eventChannel.removeEntry(mastershipEvent.getID());
+ }
+
+ /**
* Switch discovered event.
*
* @param switchEvent the switch event.
@@ -924,28 +974,29 @@
//
/**
- * Mastership updated event.
+ * Processes added Switch Mastership event.
*
- * @param mastershipEvent the mastership event.
+ * @param mastershipEvent the MastershipEvent to process.
*/
- @Override
- public void putSwitchMastershipEvent(MastershipEvent mastershipEvent) {
- // Send out notification
- TopologyEvent topologyEvent =
- new TopologyEvent(mastershipEvent,
- registryService.getOnosInstanceId());
- eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+ @GuardedBy("topology.writeLock")
+ private void processAddedMastershipEvent(MastershipEvent mastershipEvent) {
+ log.debug("Processing added Mastership event {}",
+ mastershipEvent);
+ // TODO: Not implemented/used yet.
+ apiAddedMastershipEvents.add(mastershipEvent);
}
/**
- * Mastership removed event.
+ * Processes removed Switch Mastership event.
*
- * @param mastershipEvent the mastership event.
+ * @param mastershipEvent the MastershipEvent to process.
*/
- @Override
- public void removeSwitchMastershipEvent(MastershipEvent mastershipEvent) {
- // Send out notification
- eventChannel.removeEntry(mastershipEvent.getID());
+ @GuardedBy("topology.writeLock")
+ private void processRemovedMastershipEvent(MastershipEvent mastershipEvent) {
+ log.debug("Processing removed Mastership event {}",
+ mastershipEvent);
+ // TODO: Not implemented/used yet.
+ apiRemovedMastershipEvents.add(mastershipEvent);
}
/**
@@ -1333,30 +1384,6 @@
}
/**
- * Processes added Switch Mastership event.
- *
- * @param mastershipEvent the MastershipEvent to process.
- */
- @GuardedBy("topology.writeLock")
- private void processAddedMastershipEvent(MastershipEvent mastershipEvent) {
- log.debug("Processing added Mastership event {}",
- mastershipEvent);
- // TODO: Not implemented/used yet.
- }
-
- /**
- * Processes removed Switch Mastership event.
- *
- * @param mastershipEvent the MastershipEvent to process.
- */
- @GuardedBy("topology.writeLock")
- private void processRemovedMastershipEvent(MastershipEvent mastershipEvent) {
- log.debug("Processing removed Mastership event {}",
- mastershipEvent);
- // TODO: Not implemented/used yet.
- }
-
- /**
* Read the whole topology from the database.
*
* @return a collection of EventEntry-encapsulated Topology Events for