Work toward ONOS-1451: Separate Event Key space per instance

Final step in the implementation:
Implement the Topology Event filtering and processing logic
inside class TopologyEventPreprocessor:
  - The topology events from each ONOS instance are kept separately
    within class OnosInstanceLastAddEvents
  - We keep only the last copy of the ADD events (not removed yet
    by REMOVE events).
  - If ADD MastershipEvent is received, all previously
    stored ADD Topology events for the corresponding Switch DPID are
    applied to the Topology
  - If there were previously reordered events, we attempt to
    add them again along with the other events.
  - Before adding the events to the Topology, the events are
    aggregated (squashed), and reordered in their natural order to
    build a Topology. E.g., the ADD events order is:
    MastershipEvent, SwitchEvent, PortEvent, LinkEvent, HostEvent.
    The REMOVE events are in the reverse order.
    Also, the REMOVE events are processed before the ADD events.
  - If an event cannot be added to the topology, it is added
    back to the collection of reordered events.

Also, replaced some of the Collection<> with List<> in some of the
event-related API to indicate that there is a semantic ordering
of the events.

NOTE: Some of the internals of TopologyEventPreprocessor should be optimized
for speed. E.g., we should create an additional lookup map:
    Dpid -> Collection<TopologyEvent>
and then method getPostponedEvents() doesn't need to traverse all
topologyEvents entries.

Change-Id: Ic0d9ebf242f015adb60817921b239bb65dd560e2
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyEventPreprocessor.java b/src/main/java/net/onrc/onos/core/topology/TopologyEventPreprocessor.java
new file mode 100644
index 0000000..69d9b07
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/topology/TopologyEventPreprocessor.java
@@ -0,0 +1,438 @@
+package net.onrc.onos.core.topology;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import net.floodlightcontroller.core.IFloodlightProviderService.Role;
+import net.onrc.onos.core.registry.IControllerRegistryService;
+import net.onrc.onos.core.registry.RegistryException;
+import net.onrc.onos.core.util.Dpid;
+import net.onrc.onos.core.util.EventEntry;
+import net.onrc.onos.core.util.OnosInstanceId;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Topology Event pre-processor. It is used by the Topology Manager for
+ * pre-processing Topology events before applying them to the Topology.
+ * <p/>
+ * The pre-processor itself keeps internal state about the most recent
+ * ADD events. It also might keep state about reordered events that cannot
+ * be applied.
+ * <p/>
+ * As part of the pre-processing logic, a previously suppressed event might
+ * be genenerated later because of some other event.
+ */
+public class TopologyEventPreprocessor {
+    private static final Logger log = LoggerFactory
+        .getLogger(TopologyEventPreprocessor.class);
+    private final IControllerRegistryService registryService;
+
+    //
+    // Reordered ADD events that need to be reapplied
+    //
+    // TODO: For now, this field is accessed by the TopologyManager as well
+    // This should be refactored, and change them to private.
+    //
+    Map<ByteBuffer, TopologyEvent> reorderedEvents = new HashMap<>();
+
+    //
+    // Topology ADD event state per ONOS instance
+    //
+    private Map<OnosInstanceId, OnosInstanceLastAddEvents> instanceState =
+        new HashMap<>();
+
+    //
+    // Switch mastership state (updated by the topology events)
+    //
+    Map<Dpid, OnosInstanceId> switchMastership = new HashMap<>();
+
+    /**
+     * Constructor for a given Registry Service.
+     *
+     * @param registryService the Registry Service to use.
+     */
+    TopologyEventPreprocessor(IControllerRegistryService registryService) {
+        this.registryService = registryService;
+    }
+
+    /**
+     * Class to store the last ADD Topology Events per ONOS Instance.
+     */
+    private final class OnosInstanceLastAddEvents {
+        private final OnosInstanceId onosInstanceId;
+
+        // The last ADD events received from this ONOS instance
+        Map<ByteBuffer, TopologyEvent> topologyEvents = new HashMap<>();
+
+        /**
+         * Constructor for a given ONOS Instance ID.
+         *
+         * @param onosInstanceId the ONOS Instance ID.
+         */
+        OnosInstanceLastAddEvents(OnosInstanceId onosInstanceId) {
+            this.onosInstanceId = checkNotNull(onosInstanceId);
+        }
+
+        /**
+         * Processes an event originated by this ONOS instance.
+         *
+         * @param event the event to process.
+         * @return true if the event should be applied to the final Topology
+         * as well, otherwise false.
+         */
+        boolean processEvent(EventEntry<TopologyEvent> event) {
+            TopologyEvent topologyEvent = event.eventData();
+            ByteBuffer id = topologyEvent.getIDasByteBuffer();
+            OnosInstanceId masterId = null;
+
+            // Get the Master of the Origin DPID
+            Dpid dpid = topologyEvent.getOriginDpid();
+            if (dpid != null) {
+                masterId = switchMastership.get(dpid);
+            }
+
+            //
+            // Apply the event based on its type
+            //
+            switch (event.eventType()) {
+            case ENTRY_ADD:
+                topologyEvents.put(id, topologyEvent);
+                reorderedEvents.remove(id);
+                // Allow the ADD only if the event was originated by the Master
+                return onosInstanceId.equals(masterId);
+
+            case ENTRY_REMOVE:
+                reorderedEvents.remove(id);
+                // Don't allow the REMOVE event if there was no ADD before
+                if (topologyEvents.remove(id) == null) {
+                    return false;
+                }
+                //
+                // Allow the REMOVE if the event was originated by the Master,
+                // or there is no Master at all.
+                //
+                if (masterId == null) {
+                    return true;
+                }
+                return onosInstanceId.equals(masterId);
+
+            default:
+                log.error("Unknown topology event {}", event.eventType());
+            }
+
+            return false;
+        }
+
+        /**
+         * Gets the postponed events for a given DPID.
+         * Those are the events that couldn't be applied earlier to the
+         * Topology, because the ONOS Instance originating the events
+         * was not the Master for the Switch.
+         *
+         * @param dpid the DPID to use.
+         * @return a list of postponed events for the given DPID.
+         */
+        List<EventEntry<TopologyEvent>> getPostponedEvents(Dpid dpid) {
+            List<EventEntry<TopologyEvent>> result = new LinkedList<>();
+
+            //
+            // Search all events, and keep only those that match the DPID
+            //
+            // TODO: This could be slow, and the code should be optimized
+            // for speed. The processing complexity is O(N*N) where N is
+            // the number of Switches: for each Switch Mastership we call
+            // getPostponedEvents(), and then for each call we
+            // search all previously added events.
+            // The code can be optimized by adding additional lookup map:
+            //  Dpid -> List<TopologyEvent>
+            //
+            for (TopologyEvent te : topologyEvents.values()) {
+                if (dpid.equals(te.getOriginDpid())) {
+                    result.add(new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD, te));
+                }
+            }
+
+            return result;
+        }
+    }
+
+    /**
+     * Extracts previously reordered events that should be applied again
+     * to the Topology.
+     *
+     * @return a list of previously reordered events.
+     */
+    List<EventEntry<TopologyEvent>> extractReorderedEvents() {
+        List<EventEntry<TopologyEvent>> result = new LinkedList<>();
+
+        //
+        // Search all previously reordered events, and extract only if
+        // the originator is the Master.
+        //
+        List<TopologyEvent> leftoverEvents = new LinkedList<>();
+        for (TopologyEvent te : reorderedEvents.values()) {
+            Dpid dpid = te.getOriginDpid();
+            OnosInstanceId masterId = null;
+            if (dpid != null) {
+                masterId = switchMastership.get(dpid);
+            }
+            if (te.getOnosInstanceId().equals(masterId)) {
+                result.add(new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD, te));
+            } else {
+                leftoverEvents.add(te);
+            }
+        }
+
+        //
+        // Add back the leftover events
+        //
+        reorderedEvents.clear();
+        for (TopologyEvent te : leftoverEvents) {
+            reorderedEvents.put(te.getIDasByteBuffer(), te);
+        }
+
+        return result;
+    }
+
+    /**
+     * Pre-processes a list of events.
+     *
+     * @param events the events to pre-process.
+     * @return a list of pre-processed events.
+     */
+    List<EventEntry<TopologyEvent>> processEvents(
+                List<EventEntry<TopologyEvent>> events) {
+        List<EventEntry<TopologyEvent>> result = new LinkedList<>();
+
+        //
+        // Process the events
+        //
+        for (EventEntry<TopologyEvent> event : events) {
+            List<EventEntry<TopologyEvent>> postponedEvents = null;
+
+            TopologyEvent topologyEvent = event.eventData();
+            OnosInstanceId onosInstanceId = topologyEvent.getOnosInstanceId();
+
+            log.debug("Topology event {}: {}", event.eventType(),
+                      topologyEvent);
+
+            // Get the ONOS instance state
+            OnosInstanceLastAddEvents instance =
+                instanceState.get(onosInstanceId);
+            if (instance == null) {
+                instance = new OnosInstanceLastAddEvents(onosInstanceId);
+                instanceState.put(onosInstanceId, instance);
+            }
+
+            //
+            // Update the Switch Mastership state:
+            //  - If ADD a MASTER and the Mastership is confirmed by the
+            //    Registry Service, then add to the Mastership map and fetch
+            //    the postponed events from the originating ONOS Instance.
+            //  - Otherwise, remove from the Mastership map, but only if it is
+            //    the current MASTER.
+            //
+            MastershipEvent mastershipEvent =
+                topologyEvent.getMastershipEvent();
+            if (mastershipEvent != null) {
+                Dpid dpid = mastershipEvent.getDpid();
+                boolean newMaster = false;
+
+                if ((event.eventType() == EventEntry.Type.ENTRY_ADD) &&
+                    (mastershipEvent.getRole() == Role.MASTER)) {
+                    //
+                    // Check with the Registry Service as well
+                    //
+                    try {
+                        String rc =
+                            registryService.getControllerForSwitch(dpid.value());
+                        if ((rc != null) &&
+                            onosInstanceId.equals(new OnosInstanceId(rc))) {
+                            newMaster = true;
+                        }
+                    } catch (RegistryException e) {
+                        log.error("Caught RegistryException while pre-processing Mastership Event", e);
+                    }
+                }
+
+                if (newMaster) {
+                    // Add to the map
+                    switchMastership.put(dpid, onosInstanceId);
+                    postponedEvents = instance.getPostponedEvents(dpid);
+                } else {
+                    // Eventually remove from the map
+                    OnosInstanceId oldId = switchMastership.get(dpid);
+                    if (onosInstanceId.equals(oldId)) {
+                        switchMastership.remove(dpid);
+                    }
+                }
+            }
+
+            //
+            // Process the event and eventually store it in the
+            // per-Instance state.
+            //
+            if (instance.processEvent(event)) {
+                result.add(event);
+            }
+
+            // Add the postponed events (if any)
+            if (postponedEvents != null) {
+                result.addAll(postponedEvents);
+            }
+        }
+
+        // Extract and add the previously reordered events
+        result.addAll(extractReorderedEvents());
+
+        return reorderEventsForTopology(result);
+    }
+
+    /**
+     * Classifies and reorders a list of events, and suppresses matching
+     * events.
+     * <p/>
+     * The result events can be applied to the Topology in the following
+     * order: REMOVE events followed by ADD events. The ADD events are in the
+     * natural order to build a Topology: MastershipEvent, SwitchEvent,
+     * PortEvent, LinkEvent, HostEvent. The REMOVE events are in the reverse
+     * order.
+     *
+     * @param events the events to classify and reorder.
+     * @return the classified and reordered events.
+     */
+    private List<EventEntry<TopologyEvent>> reorderEventsForTopology(
+                List<EventEntry<TopologyEvent>> events) {
+        // Local state for computing the final set of events
+        Map<ByteBuffer, EventEntry<TopologyEvent>> addedMastershipEvents =
+            new HashMap<>();
+        Map<ByteBuffer, EventEntry<TopologyEvent>> removedMastershipEvents =
+            new HashMap<>();
+        Map<ByteBuffer, EventEntry<TopologyEvent>> addedSwitchEvents =
+            new HashMap<>();
+        Map<ByteBuffer, EventEntry<TopologyEvent>> removedSwitchEvents =
+            new HashMap<>();
+        Map<ByteBuffer, EventEntry<TopologyEvent>> addedPortEvents =
+            new HashMap<>();
+        Map<ByteBuffer, EventEntry<TopologyEvent>> removedPortEvents =
+            new HashMap<>();
+        Map<ByteBuffer, EventEntry<TopologyEvent>> addedLinkEvents =
+            new HashMap<>();
+        Map<ByteBuffer, EventEntry<TopologyEvent>> removedLinkEvents =
+            new HashMap<>();
+        Map<ByteBuffer, EventEntry<TopologyEvent>> addedHostEvents =
+            new HashMap<>();
+        Map<ByteBuffer, EventEntry<TopologyEvent>> removedHostEvents =
+            new HashMap<>();
+
+        //
+        // Classify and suppress matching events
+        //
+        // NOTE: We intentionally use the event payload as the key ID
+        // (i.e., we exclude the ONOS Instance ID from the key),
+        // so we can suppress transient events across multiple ONOS instances.
+        //
+        for (EventEntry<TopologyEvent> event : events) {
+            TopologyEvent topologyEvent = event.eventData();
+
+            // Get the event itself
+            MastershipEvent mastershipEvent =
+                topologyEvent.getMastershipEvent();
+            SwitchEvent switchEvent = topologyEvent.getSwitchEvent();
+            PortEvent portEvent = topologyEvent.getPortEvent();
+            LinkEvent linkEvent = topologyEvent.getLinkEvent();
+            HostEvent hostEvent = topologyEvent.getHostEvent();
+
+            //
+            // Extract the events
+            //
+            switch (event.eventType()) {
+            case ENTRY_ADD:
+                if (mastershipEvent != null) {
+                    ByteBuffer id = mastershipEvent.getIDasByteBuffer();
+                    addedMastershipEvents.put(id, event);
+                    removedMastershipEvents.remove(id);
+                }
+                if (switchEvent != null) {
+                    ByteBuffer id = switchEvent.getIDasByteBuffer();
+                    addedSwitchEvents.put(id, event);
+                    removedSwitchEvents.remove(id);
+                }
+                if (portEvent != null) {
+                    ByteBuffer id = portEvent.getIDasByteBuffer();
+                    addedPortEvents.put(id, event);
+                    removedPortEvents.remove(id);
+                }
+                if (linkEvent != null) {
+                    ByteBuffer id = linkEvent.getIDasByteBuffer();
+                    addedLinkEvents.put(id, event);
+                    removedLinkEvents.remove(id);
+                }
+                if (hostEvent != null) {
+                    ByteBuffer id = hostEvent.getIDasByteBuffer();
+                    addedHostEvents.put(id, event);
+                    removedHostEvents.remove(id);
+                }
+                break;
+            case ENTRY_REMOVE:
+                if (mastershipEvent != null) {
+                    ByteBuffer id = mastershipEvent.getIDasByteBuffer();
+                    addedMastershipEvents.remove(id);
+                    removedMastershipEvents.put(id, event);
+                }
+                if (switchEvent != null) {
+                    ByteBuffer id = switchEvent.getIDasByteBuffer();
+                    addedSwitchEvents.remove(id);
+                    removedSwitchEvents.put(id, event);
+                }
+                if (portEvent != null) {
+                    ByteBuffer id = portEvent.getIDasByteBuffer();
+                    addedPortEvents.remove(id);
+                    removedPortEvents.put(id, event);
+                }
+                if (linkEvent != null) {
+                    ByteBuffer id = linkEvent.getIDasByteBuffer();
+                    addedLinkEvents.remove(id);
+                    removedLinkEvents.put(id, event);
+                }
+                if (hostEvent != null) {
+                    ByteBuffer id = hostEvent.getIDasByteBuffer();
+                    addedHostEvents.remove(id);
+                    removedHostEvents.put(id, event);
+                }
+                break;
+            default:
+                log.error("Unknown topology event {}", event.eventType());
+            }
+        }
+
+        //
+        // Prepare the result by adding the events in the appropriate order:
+        //  - First REMOVE, then ADD
+        //  - The REMOVE order is: Host, Link, Port, Switch, Mastership
+        //  - The ADD order is the reverse: Mastership, Switch, Port, Link,
+        //    Host
+        //
+        List<EventEntry<TopologyEvent>> result = new LinkedList<>();
+        result.addAll(removedHostEvents.values());
+        result.addAll(removedLinkEvents.values());
+        result.addAll(removedPortEvents.values());
+        result.addAll(removedSwitchEvents.values());
+        result.addAll(removedMastershipEvents.values());
+        //
+        result.addAll(addedMastershipEvents.values());
+        result.addAll(addedSwitchEvents.values());
+        result.addAll(addedPortEvents.values());
+        result.addAll(addedLinkEvents.values());
+        result.addAll(addedHostEvents.values());
+
+        return result;
+    }
+}