ONOS-1871: Send Topology MastershipEvent info to Topology listeners.

The MastershipEvent info is needed by applications such as the GUI
(via the Websocket client).

NOTE: Previously, Mastership Events were sent only when there
was a change; no Mastership Events were sent to a listener that
had just subscribed.

The solution is to add a mechanism for creating and sending a Topology
Snapshot event to new listeners; that event also includes
the Mastership Events.

The modifications are:

 * Renamed and updated the ITopologyService API for adding/removing
   Topology listeners:
   OLD: registerTopologyListener() and deregisterTopologyListener()
   NEW: addListener() and removeListener()

   Also, addListener() has a second argument:
       "boolean startFromSnapshot"
   If that argument is true, and if the topology is not empty, the first
   (expected) event to that listener should be a snapshot of the current
   topology.

 * Added a TopologyEvents() constructor for ADDED events only. Such an
   event can be used to represent a snapshot of the topology.

 * Added new APIs to TopologyInternal:
   getAllSwitchEvents(), getAllPortEvents(), getAllLinkEvents(),
   getAllHostEvents()
   Those APIs are needed for creating a snapshot of the topology.

 * Added a mechanism for creating an empty (NO-OP) TopologyEvent instance,
   and used that mechanism to "wake up" the EventHandler processing thread
   when it needs to send a Topology Snapshot to a new listener.
   This solution is (kind of) a hack.

Change-Id: Ie1eb52242f58682aac61f54af29c3b5d291ac0bd
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyManager.java b/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
index 1c0f531..57e8b6e 100644
--- a/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
+++ b/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
@@ -74,6 +74,8 @@
     private TopologyEventPreprocessor eventPreprocessor;
     private CopyOnWriteArrayList<ITopologyListener> topologyListeners =
         new CopyOnWriteArrayList<>();
+    private CopyOnWriteArrayList<ITopologyListener> newTopologyListeners =
+        new CopyOnWriteArrayList<>();
 
     //
     // Metrics
@@ -257,6 +259,11 @@
          */
         private void processEvents(List<EventEntry<TopologyEvent>> events) {
             //
+            // Process pending (new) listeners
+            //
+            processPendingListeners();
+
+            //
             // Pre-process the events
             //
             events = eventPreprocessor.processEvents(events);
@@ -273,6 +280,11 @@
                 // to naturally build and update the topology.
                 //
                 for (EventEntry<TopologyEvent> event : events) {
+                    // Ignore NO-OP events
+                    if (event.isNoop()) {
+                        continue;
+                    }
+
                     TopologyEvent topologyEvent = event.eventData();
 
                     // Get the event itself
@@ -381,6 +393,22 @@
             // NOTE: The ADD and UPDATE events are processed in same way
             entryAdded(value);
         }
+
+        /**
+         * Informs the event handler that a new listener has been added,
+         * and that listener expects the first event to be a snapshot of the
+         * current topology.
+         */
+        void listenerAdded() {
+            //
+            // Generate a NO-OP event so the Event Handler processing can be
+            // triggered to generate in-order a snapshot of the current
+            // topology.
+            // TODO: This is a hack.
+            //
+            EventEntry<TopologyEvent> eventEntry = EventEntry.makeNoop();
+            topologyEvents.add(eventEntry);
+        }
     }
 
     /**
@@ -397,22 +425,89 @@
     }
 
     /**
-     * Registers a listener for topology events.
+     * Adds a listener for topology events.
      *
-     * @param listener the listener to register
+     * @param listener the listener to add.
+     * @param startFromSnapshot if true, and if the topology is not
+     * empty, the first event should be a snapshot of the current topology.
      */
-    void registerTopologyListener(ITopologyListener listener) {
-        topologyListeners.addIfAbsent(listener);
+    void addListener(ITopologyListener listener, boolean startFromSnapshot) {
+        if (startFromSnapshot) {
+            newTopologyListeners.addIfAbsent(listener);
+            eventHandler.listenerAdded();
+        } else {
+            topologyListeners.addIfAbsent(listener);
+        }
     }
 
     /**
-     * Deregisters a listener for topology events. The listener will no longer
+     * Removes a listener for topology events. The listener will no longer
      * receive topology events after this call.
      *
-     * @param listener the listener to deregister
+     * @param listener the listener to remove.
      */
-    void deregisterTopologyListener(ITopologyListener listener) {
+    void removeListener(ITopologyListener listener) {
         topologyListeners.remove(listener);
+        newTopologyListeners.remove(listener);
+    }
+
+    /**
+     * Processes pending (new) listeners.
+     * <p>
+     * During the processing, we dispatch Topology Snapshot Events to new
+     * listeners.
+     */
+    private void processPendingListeners() {
+        if (newTopologyListeners.isEmpty()) {
+            return;
+        }
+
+        //
+        // Create the Topology Snapshot Event
+        //
+        TopologyEvents events = null;
+        List<MastershipEvent> mastershipEvents =
+            new ArrayList<>(lastAddMastershipEvents.values());
+        List<SwitchEvent> switchEvents =
+            new ArrayList<>(topology.getAllSwitchEvents());
+        List<PortEvent> portEvents =
+            new ArrayList<>(topology.getAllPortEvents());
+        List<LinkEvent> linkEvents =
+            new ArrayList<>(topology.getAllLinkEvents());
+        List<HostEvent> hostEvents =
+            new ArrayList<>(topology.getAllHostEvents());
+        if (!(mastershipEvents.isEmpty() &&
+              switchEvents.isEmpty() &&
+              portEvents.isEmpty() &&
+              linkEvents.isEmpty() &&
+              hostEvents.isEmpty())) {
+            events = new TopologyEvents(mastershipEvents,
+                                        switchEvents,
+                                        portEvents,
+                                        linkEvents,
+                                        hostEvents);
+        }
+
+        //
+        // Dispatch Snapshot Event to each new listener, and keep track
+        // of each processed listener.
+        //
+        // NOTE: We deliver the event only if it is not empty.
+        // NOTE: We need to execute the loop so we can properly
+        // move the new listeners together with the older listeners.
+        //
+        List<ITopologyListener> processedListeners = new LinkedList<>();
+        for (ITopologyListener listener : newTopologyListeners) {
+            processedListeners.add(listener);
+            // Move the new listener together with the rest of the listeners
+            topologyListeners.addIfAbsent(listener);
+
+            // Dispatch the event
+            if (events != null) {
+                listener.topologyEvents(events);
+            }
+        }
+        newTopologyListeners.removeAll(processedListeners);
     }
 
     /**
@@ -484,20 +579,29 @@
         this.lastEventTimestampEpochMs = System.currentTimeMillis();
 
         //
+        // Allocate the events to deliver.
+        //
+        // TODO: We could avoid the extra list allocation and copy
+        // by using directly the original list. However, during
+        // the cleanup below, we should create new LinkedList objects
+        // instead of using clear()
+        //
+        TopologyEvents events = new TopologyEvents(
+                new ArrayList<>(apiAddedMastershipEvents),
+                new ArrayList<>(apiRemovedMastershipEvents),
+                new ArrayList<>(apiAddedSwitchEvents),
+                new ArrayList<>(apiRemovedSwitchEvents),
+                new ArrayList<>(apiAddedPortEvents),
+                new ArrayList<>(apiRemovedPortEvents),
+                new ArrayList<>(apiAddedLinkEvents),
+                new ArrayList<>(apiRemovedLinkEvents),
+                new ArrayList<>(apiAddedHostEvents),
+                new ArrayList<>(apiRemovedHostEvents));
+
+        //
         // Deliver the events
         //
         for (ITopologyListener listener : this.topologyListeners) {
-            TopologyEvents events =
-                new TopologyEvents(kryo.copy(apiAddedMastershipEvents),
-                                   kryo.copy(apiRemovedMastershipEvents),
-                                   kryo.copy(apiAddedSwitchEvents),
-                                   kryo.copy(apiRemovedSwitchEvents),
-                                   kryo.copy(apiAddedPortEvents),
-                                   kryo.copy(apiRemovedPortEvents),
-                                   kryo.copy(apiAddedLinkEvents),
-                                   kryo.copy(apiRemovedLinkEvents),
-                                   kryo.copy(apiAddedHostEvents),
-                                   kryo.copy(apiRemovedHostEvents));
             listener.topologyEvents(events);
         }