ONOS-1871: Send Topology MastershipEvent info to Topology listeners.
The MastershipEvent info is needed by applications such as the GUI
(via the Websocket client).
NOTE: Previously, the Mastership Events were sent only when there
was any change, but no Mastership Events were sent when a listener
has just subscribed.
The solution is to add a mechanism for creating and sending Topology
Snapshot event to new listeners, and that event also includes
the Mastership Events.
The modifications are:
* Renamed and updated the ITopologyService API for adding/removing
Topology listeners:
OLD: registerTopologyListener() and deregisterTopologyListener()
NEW: addListener() and removeListener()
Also, addListener() has a second argument:
"boolean startFromSnapshot"
If that argument is true, and if the topology is not empty, the first
(expected) event to that listener should be a snapshot of the current
topology.
* Added TopologyEvents() constructor for ADDED events only. Such event
can be used to represent a snapshot of the topology.
* Added new APIs to TopologyInternal:
getAllSwitchEvents(), getAllPortEvents(), get AllLinkEvents(),
getAllHostEvents()
Those APIs are needed for creating a snapshot of the topology.
* Added a mechanism for creating empty (NO-OP) TopologyEvent instance,
and use that mechanism to "wake-up" the EventHandler processing thread
when it needs to send Topology Snapshot to a new listener.
This solution is (kind-of) a hack.
Change-Id: Ie1eb52242f58682aac61f54af29c3b5d291ac0bd
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyManager.java b/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
index 1c0f531..57e8b6e 100644
--- a/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
+++ b/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
@@ -74,6 +74,8 @@
private TopologyEventPreprocessor eventPreprocessor;
private CopyOnWriteArrayList<ITopologyListener> topologyListeners =
new CopyOnWriteArrayList<>();
+ private CopyOnWriteArrayList<ITopologyListener> newTopologyListeners =
+ new CopyOnWriteArrayList<>();
//
// Metrics
@@ -257,6 +259,11 @@
*/
private void processEvents(List<EventEntry<TopologyEvent>> events) {
//
+ // Process pending (new) listeners
+ //
+ processPendingListeners();
+
+ //
// Pre-process the events
//
events = eventPreprocessor.processEvents(events);
@@ -273,6 +280,11 @@
// to naturally build and update the topology.
//
for (EventEntry<TopologyEvent> event : events) {
+ // Ignore NO-OP events
+ if (event.isNoop()) {
+ continue;
+ }
+
TopologyEvent topologyEvent = event.eventData();
// Get the event itself
@@ -381,6 +393,22 @@
// NOTE: The ADD and UPDATE events are processed in same way
entryAdded(value);
}
+
+ /**
+ * Informs the event handler that a new listener has been added,
+ * and that listener expects the first event to be a snapshot of the
+ * current topology.
+ */
+ void listenerAdded() {
+ //
+ // Generate a NO-OP event so the Event Handler processing can be
+ // triggered to generate in-order a snapshot of the current
+ // topology.
+ // TODO: This is a hack.
+ //
+ EventEntry<TopologyEvent> eventEntry = EventEntry.makeNoop();
+ topologyEvents.add(eventEntry);
+ }
}
/**
@@ -397,22 +425,89 @@
}
/**
- * Registers a listener for topology events.
+ * Adds a listener for topology events.
*
- * @param listener the listener to register
+ * @param listener the listener to add.
+ * @param startFromSnapshot if true, and if the topology is not
+ * empty, the first event should be a snapshot of the current topology.
*/
- void registerTopologyListener(ITopologyListener listener) {
- topologyListeners.addIfAbsent(listener);
+ void addListener(ITopologyListener listener, boolean startFromSnapshot) {
+ if (startFromSnapshot) {
+ newTopologyListeners.addIfAbsent(listener);
+ eventHandler.listenerAdded();
+ } else {
+ topologyListeners.addIfAbsent(listener);
+ }
}
/**
- * Deregisters a listener for topology events. The listener will no longer
+ * Removes a listener for topology events. The listener will no longer
* receive topology events after this call.
*
- * @param listener the listener to deregister
+ * @param listener the listener to remove.
*/
- void deregisterTopologyListener(ITopologyListener listener) {
+ void removeListener(ITopologyListener listener) {
topologyListeners.remove(listener);
+ newTopologyListeners.remove(listener);
+ }
+
+ /**
+ * Processes pending (new) listeners.
+ * <p>
+ * During the processing, we dispatch Topology Snapshot Events to new
+ * listeners.
+ */
+ private void processPendingListeners() {
+ if (newTopologyListeners.isEmpty()) {
+ return;
+ }
+
+ //
+ // Create the Topology Snapshot Event
+ //
+ TopologyEvents events = null;
+ List<MastershipEvent> mastershipEvents =
+ new ArrayList<>(lastAddMastershipEvents.values());
+ List<SwitchEvent> switchEvents =
+ new ArrayList<>(topology.getAllSwitchEvents());
+ List<PortEvent> portEvents =
+ new ArrayList<>(topology.getAllPortEvents());
+ List<LinkEvent> linkEvents =
+ new ArrayList<>(topology.getAllLinkEvents());
+ List<HostEvent> hostEvents =
+ new ArrayList<>(topology.getAllHostEvents());
+ if (!(mastershipEvents.isEmpty() &&
+ switchEvents.isEmpty() &&
+ portEvents.isEmpty() &&
+ linkEvents.isEmpty() &&
+ hostEvents.isEmpty())) {
+ events = new TopologyEvents(mastershipEvents,
+ switchEvents,
+ portEvents,
+ linkEvents,
+ hostEvents);
+ }
+
+ //
+ // Dispatch Snapshot Event to each new listener, and keep track
+ // of each processed listener.
+ //
+ // NOTE: We deliver the event only if it is not empty.
+ // NOTE: We need to execute the loop so we can properly
+ // move the new listeners together with the older listeners.
+ //
+ List<ITopologyListener> processedListeners = new LinkedList<>();
+ for (ITopologyListener listener : newTopologyListeners) {
+ processedListeners.add(listener);
+ // Move the new listener together with the rest of the listeners
+ topologyListeners.addIfAbsent(listener);
+
+ // Dispatch the event
+ if (events != null) {
+ listener.topologyEvents(events);
+ }
+ }
+ newTopologyListeners.removeAll(processedListeners);
}
/**
@@ -484,20 +579,29 @@
this.lastEventTimestampEpochMs = System.currentTimeMillis();
//
+ // Allocate the events to deliver.
+ //
+ // TODO: We could avoid the extra list allocation and copy
+ // by using directly the original list. However, during
+ // the cleanup below, we should create new LinkedList objects
+ // instead of using clear()
+ //
+ TopologyEvents events = new TopologyEvents(
+ new ArrayList<>(apiAddedMastershipEvents),
+ new ArrayList<>(apiRemovedMastershipEvents),
+ new ArrayList<>(apiAddedSwitchEvents),
+ new ArrayList<>(apiRemovedSwitchEvents),
+ new ArrayList<>(apiAddedPortEvents),
+ new ArrayList<>(apiRemovedPortEvents),
+ new ArrayList<>(apiAddedLinkEvents),
+ new ArrayList<>(apiRemovedLinkEvents),
+ new ArrayList<>(apiAddedHostEvents),
+ new ArrayList<>(apiRemovedHostEvents));
+
+ //
// Deliver the events
//
for (ITopologyListener listener : this.topologyListeners) {
- TopologyEvents events =
- new TopologyEvents(kryo.copy(apiAddedMastershipEvents),
- kryo.copy(apiRemovedMastershipEvents),
- kryo.copy(apiAddedSwitchEvents),
- kryo.copy(apiRemovedSwitchEvents),
- kryo.copy(apiAddedPortEvents),
- kryo.copy(apiRemovedPortEvents),
- kryo.copy(apiAddedLinkEvents),
- kryo.copy(apiRemovedLinkEvents),
- kryo.copy(apiAddedHostEvents),
- kryo.copy(apiRemovedHostEvents));
listener.topologyEvents(events);
}