Work toward ONOS-1451: Separate Event Key space per instance

Final step in the implementation:
Implement the Topology Event filtering and processing logic
inside class TopologyEventPreprocessor:
  - The topology events from each ONOS instance are kept separately
    within class OnosInstanceLastAddEvents
  - We keep only the last copy of the ADD events (not removed yet
    by REMOVE events).
  - If ADD MastershipEvent is received, all previously
    stored ADD Topology events for the corresponding Switch DPID are
    applied to the Topology
  - If there were previously reordered events, we attempt to
    add them again along with the other events.
  - Before adding the events to the Topology, the events are
    aggregated (squashed), and reordered in their natural order to
    build a Topology. E.g., the ADD events order is:
    MastershipEvent, SwitchEvent, PortEvent, LinkEvent, HostEvent.
    The REMOVE events are in the reverse order.
    Also, the REMOVE events are processed before the ADD events.
  - If an event cannot be added to the topology, it is added
    back to the collection of reordered events.

Also, replaced some of the Collection<> with List<> in some of the
event-related API to indicate that there is a semantic ordering
of the events.

NOTE: Some of the internals of TopologyEventPreprocessor should be optimized
for speed. E.g., we should create an additional lookup map:
    Dpid -> Collection<TopologyEvent>
and then method getPostponedEvents() doesn't need to traverse all
topologyEvents entries.

Change-Id: Ic0d9ebf242f015adb60817921b239bb65dd560e2
diff --git a/src/main/java/net/onrc/onos/core/topology/HostEvent.java b/src/main/java/net/onrc/onos/core/topology/HostEvent.java
index 1c64939..33f8db8 100644
--- a/src/main/java/net/onrc/onos/core/topology/HostEvent.java
+++ b/src/main/java/net/onrc/onos/core/topology/HostEvent.java
@@ -9,6 +9,7 @@
 
 import net.floodlightcontroller.util.MACAddress;
 import net.onrc.onos.core.topology.web.serializers.HostEventSerializer;
+import net.onrc.onos.core.util.Dpid;
 import net.onrc.onos.core.util.SwitchPort;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -108,6 +109,19 @@
         this.lastSeenTime = lastSeenTime;
     }
 
+    /**
+     * Gets the event origin DPID.
+     *
+     * @return the event origin DPID.
+     */
+    public Dpid getOriginDpid() {
+        // TODO: Eventually, we should return a collection of Dpid values
+        for (SwitchPort sp : attachmentPoints) {
+            return sp.getDpid();
+        }
+        return null;
+    }
+
     @Override
     public String toString() {
         return "[HostEvent " + mac + " attachmentPoints:" + attachmentPoints + "]";
diff --git a/src/main/java/net/onrc/onos/core/topology/LinkEvent.java b/src/main/java/net/onrc/onos/core/topology/LinkEvent.java
index f7ae6d3..807dcaf 100644
--- a/src/main/java/net/onrc/onos/core/topology/LinkEvent.java
+++ b/src/main/java/net/onrc/onos/core/topology/LinkEvent.java
@@ -122,6 +122,15 @@
         this.capacity = capacity;
     }
 
+    /**
+     * Gets the event origin DPID.
+     *
+     * @return the event origin DPID.
+     */
+    public Dpid getOriginDpid() {
+        return this.id.getDst().getDpid();
+    }
+
     @Override
     public String toString() {
         return "[LinkEvent " + getSrc() + "->" + getDst() + "]";
diff --git a/src/main/java/net/onrc/onos/core/topology/MastershipEvent.java b/src/main/java/net/onrc/onos/core/topology/MastershipEvent.java
index 8106a39..589174d 100644
--- a/src/main/java/net/onrc/onos/core/topology/MastershipEvent.java
+++ b/src/main/java/net/onrc/onos/core/topology/MastershipEvent.java
@@ -89,6 +89,15 @@
         return role;
     }
 
+    /**
+     * Gets the event origin DPID.
+     *
+     * @return the event origin DPID.
+     */
+    public Dpid getOriginDpid() {
+        return this.dpid;
+    }
+
     @Override
     public String toString() {
         return "[MastershipEvent " + getDpid() + "@" + getOnosInstanceId() +
diff --git a/src/main/java/net/onrc/onos/core/topology/PortEvent.java b/src/main/java/net/onrc/onos/core/topology/PortEvent.java
index 0fa5a7e..f23f907 100644
--- a/src/main/java/net/onrc/onos/core/topology/PortEvent.java
+++ b/src/main/java/net/onrc/onos/core/topology/PortEvent.java
@@ -95,6 +95,15 @@
         return id.getPortNumber();
     }
 
+    /**
+     * Gets the event origin DPID.
+     *
+     * @return the event origin DPID.
+     */
+    public Dpid getOriginDpid() {
+        return this.id.getDpid();
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/src/main/java/net/onrc/onos/core/topology/SwitchEvent.java b/src/main/java/net/onrc/onos/core/topology/SwitchEvent.java
index f724fae..a3b6a93 100644
--- a/src/main/java/net/onrc/onos/core/topology/SwitchEvent.java
+++ b/src/main/java/net/onrc/onos/core/topology/SwitchEvent.java
@@ -56,6 +56,15 @@
         return dpid;
     }
 
+    /**
+     * Gets the event origin DPID.
+     *
+     * @return the event origin DPID.
+     */
+    public Dpid getOriginDpid() {
+        return this.dpid;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyEvent.java b/src/main/java/net/onrc/onos/core/topology/TopologyEvent.java
index 4ea9e31..23471ff 100644
--- a/src/main/java/net/onrc/onos/core/topology/TopologyEvent.java
+++ b/src/main/java/net/onrc/onos/core/topology/TopologyEvent.java
@@ -4,6 +4,7 @@
 import java.nio.charset.StandardCharsets;
 import java.util.Objects;
 
+import net.onrc.onos.core.util.Dpid;
 import net.onrc.onos.core.util.OnosInstanceId;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -169,6 +170,40 @@
     }
 
     /**
+     * Gets the event origin DPID.
+     *
+     * @return the event origin DPID.
+     */
+    public Dpid getOriginDpid() {
+        if (mastershipEvent != null) {
+            return mastershipEvent.getOriginDpid();
+        }
+        if (switchEvent != null) {
+            return switchEvent.getOriginDpid();
+        }
+        if (portEvent != null) {
+            return portEvent.getOriginDpid();
+        }
+        if (linkEvent != null) {
+            return linkEvent.getOriginDpid();
+        }
+        if (hostEvent != null) {
+            return hostEvent.getOriginDpid();
+        }
+        return null;
+    }
+
+    /**
+     * Tests whether the event origin DPID equals the specified DPID.
+     *
+     * @param dpid the DPID to compare against.
+     * @return true if the event origin Dpid equals the specified DPID.
+     */
+    public boolean equalsOriginDpid(Dpid dpid) {
+        return dpid.equals(getOriginDpid());
+    }
+
+    /**
      * Checks if all events contained are equal.
      *
      * @param obj TopologyEvent to compare against
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyEventFilter.java b/src/main/java/net/onrc/onos/core/topology/TopologyEventFilter.java
deleted file mode 100644
index 65eed32..0000000
--- a/src/main/java/net/onrc/onos/core/topology/TopologyEventFilter.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package net.onrc.onos.core.topology;
-
-import java.util.Collection;
-
-import net.onrc.onos.core.util.EventEntry;
-
-/**
- * Stateful filter for filtering Topology events.
- * <p/>
- * NOTE: The filter itself keeps internal state about filtered events.
- * As part of the filtering logic, a previously suppressed event might
- * be genenerated (released) later because of some other event.
- */
-public class TopologyEventFilter {
-    /**
-     * Filter a collection of events.
-     *
-     * @param events the events to filter.
-     * @return a collection of filtered events.
-     */
-    Collection<EventEntry<TopologyEvent>> filterEvents(
-                Collection<EventEntry<TopologyEvent>> events) {
-
-        // TODO: Not implemented yet
-        return events;
-    }
-}
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyEventPreprocessor.java b/src/main/java/net/onrc/onos/core/topology/TopologyEventPreprocessor.java
new file mode 100644
index 0000000..69d9b07
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/topology/TopologyEventPreprocessor.java
@@ -0,0 +1,438 @@
+package net.onrc.onos.core.topology;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import net.floodlightcontroller.core.IFloodlightProviderService.Role;
+import net.onrc.onos.core.registry.IControllerRegistryService;
+import net.onrc.onos.core.registry.RegistryException;
+import net.onrc.onos.core.util.Dpid;
+import net.onrc.onos.core.util.EventEntry;
+import net.onrc.onos.core.util.OnosInstanceId;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Topology Event pre-processor. It is used by the Topology Manager for
+ * pre-processing Topology events before applying them to the Topology.
+ * <p/>
+ * The pre-processor itself keeps internal state about the most recent
+ * ADD events. It also might keep state about reordered events that cannot
+ * be applied.
+ * <p/>
+ * As part of the pre-processing logic, a previously suppressed event might
+ * be genenerated later because of some other event.
+ */
+public class TopologyEventPreprocessor {
+    private static final Logger log = LoggerFactory
+        .getLogger(TopologyEventPreprocessor.class);
+    private final IControllerRegistryService registryService;
+
+    //
+    // Reordered ADD events that need to be reapplied
+    //
+    // TODO: For now, this field is accessed by the TopologyManager as well
+    // This should be refactored, and change them to private.
+    //
+    Map<ByteBuffer, TopologyEvent> reorderedEvents = new HashMap<>();
+
+    //
+    // Topology ADD event state per ONOS instance
+    //
+    private Map<OnosInstanceId, OnosInstanceLastAddEvents> instanceState =
+        new HashMap<>();
+
+    //
+    // Switch mastership state (updated by the topology events)
+    //
+    Map<Dpid, OnosInstanceId> switchMastership = new HashMap<>();
+
+    /**
+     * Constructor for a given Registry Service.
+     *
+     * @param registryService the Registry Service to use.
+     */
+    TopologyEventPreprocessor(IControllerRegistryService registryService) {
+        this.registryService = registryService;
+    }
+
+    /**
+     * Class to store the last ADD Topology Events per ONOS Instance.
+     */
+    private final class OnosInstanceLastAddEvents {
+        private final OnosInstanceId onosInstanceId;
+
+        // The last ADD events received from this ONOS instance
+        Map<ByteBuffer, TopologyEvent> topologyEvents = new HashMap<>();
+
+        /**
+         * Constructor for a given ONOS Instance ID.
+         *
+         * @param onosInstanceId the ONOS Instance ID.
+         */
+        OnosInstanceLastAddEvents(OnosInstanceId onosInstanceId) {
+            this.onosInstanceId = checkNotNull(onosInstanceId);
+        }
+
+        /**
+         * Processes an event originated by this ONOS instance.
+         *
+         * @param event the event to process.
+         * @return true if the event should be applied to the final Topology
+         * as well, otherwise false.
+         */
+        boolean processEvent(EventEntry<TopologyEvent> event) {
+            TopologyEvent topologyEvent = event.eventData();
+            ByteBuffer id = topologyEvent.getIDasByteBuffer();
+            OnosInstanceId masterId = null;
+
+            // Get the Master of the Origin DPID
+            Dpid dpid = topologyEvent.getOriginDpid();
+            if (dpid != null) {
+                masterId = switchMastership.get(dpid);
+            }
+
+            //
+            // Apply the event based on its type
+            //
+            switch (event.eventType()) {
+            case ENTRY_ADD:
+                topologyEvents.put(id, topologyEvent);
+                reorderedEvents.remove(id);
+                // Allow the ADD only if the event was originated by the Master
+                return onosInstanceId.equals(masterId);
+
+            case ENTRY_REMOVE:
+                reorderedEvents.remove(id);
+                // Don't allow the REMOVE event if there was no ADD before
+                if (topologyEvents.remove(id) == null) {
+                    return false;
+                }
+                //
+                // Allow the REMOVE if the event was originated by the Master,
+                // or there is no Master at all.
+                //
+                if (masterId == null) {
+                    return true;
+                }
+                return onosInstanceId.equals(masterId);
+
+            default:
+                log.error("Unknown topology event {}", event.eventType());
+            }
+
+            return false;
+        }
+
+        /**
+         * Gets the postponed events for a given DPID.
+         * Those are the events that couldn't be applied earlier to the
+         * Topology, because the ONOS Instance originating the events
+         * was not the Master for the Switch.
+         *
+         * @param dpid the DPID to use.
+         * @return a list of postponed events for the given DPID.
+         */
+        List<EventEntry<TopologyEvent>> getPostponedEvents(Dpid dpid) {
+            List<EventEntry<TopologyEvent>> result = new LinkedList<>();
+
+            //
+            // Search all events, and keep only those that match the DPID
+            //
+            // TODO: This could be slow, and the code should be optimized
+            // for speed. The processing complexity is O(N*N) where N is
+            // the number of Switches: for each Switch Mastership we call
+            // getPostponedEvents(), and then for each call we
+            // search all previously added events.
+            // The code can be optimized by adding additional lookup map:
+            //  Dpid -> List<TopologyEvent>
+            //
+            for (TopologyEvent te : topologyEvents.values()) {
+                if (dpid.equals(te.getOriginDpid())) {
+                    result.add(new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD, te));
+                }
+            }
+
+            return result;
+        }
+    }
+
+    /**
+     * Extracts previously reordered events that should be applied again
+     * to the Topology.
+     *
+     * @return a list of previously reordered events.
+     */
+    List<EventEntry<TopologyEvent>> extractReorderedEvents() {
+        List<EventEntry<TopologyEvent>> result = new LinkedList<>();
+
+        //
+        // Search all previously reordered events, and extract only if
+        // the originator is the Master.
+        //
+        List<TopologyEvent> leftoverEvents = new LinkedList<>();
+        for (TopologyEvent te : reorderedEvents.values()) {
+            Dpid dpid = te.getOriginDpid();
+            OnosInstanceId masterId = null;
+            if (dpid != null) {
+                masterId = switchMastership.get(dpid);
+            }
+            if (te.getOnosInstanceId().equals(masterId)) {
+                result.add(new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD, te));
+            } else {
+                leftoverEvents.add(te);
+            }
+        }
+
+        //
+        // Add back the leftover events
+        //
+        reorderedEvents.clear();
+        for (TopologyEvent te : leftoverEvents) {
+            reorderedEvents.put(te.getIDasByteBuffer(), te);
+        }
+
+        return result;
+    }
+
+    /**
+     * Pre-processes a list of events.
+     *
+     * @param events the events to pre-process.
+     * @return a list of pre-processed events.
+     */
+    List<EventEntry<TopologyEvent>> processEvents(
+                List<EventEntry<TopologyEvent>> events) {
+        List<EventEntry<TopologyEvent>> result = new LinkedList<>();
+
+        //
+        // Process the events
+        //
+        for (EventEntry<TopologyEvent> event : events) {
+            List<EventEntry<TopologyEvent>> postponedEvents = null;
+
+            TopologyEvent topologyEvent = event.eventData();
+            OnosInstanceId onosInstanceId = topologyEvent.getOnosInstanceId();
+
+            log.debug("Topology event {}: {}", event.eventType(),
+                      topologyEvent);
+
+            // Get the ONOS instance state
+            OnosInstanceLastAddEvents instance =
+                instanceState.get(onosInstanceId);
+            if (instance == null) {
+                instance = new OnosInstanceLastAddEvents(onosInstanceId);
+                instanceState.put(onosInstanceId, instance);
+            }
+
+            //
+            // Update the Switch Mastership state:
+            //  - If ADD a MASTER and the Mastership is confirmed by the
+            //    Registry Service, then add to the Mastership map and fetch
+            //    the postponed events from the originating ONOS Instance.
+            //  - Otherwise, remove from the Mastership map, but only if it is
+            //    the current MASTER.
+            //
+            MastershipEvent mastershipEvent =
+                topologyEvent.getMastershipEvent();
+            if (mastershipEvent != null) {
+                Dpid dpid = mastershipEvent.getDpid();
+                boolean newMaster = false;
+
+                if ((event.eventType() == EventEntry.Type.ENTRY_ADD) &&
+                    (mastershipEvent.getRole() == Role.MASTER)) {
+                    //
+                    // Check with the Registry Service as well
+                    //
+                    try {
+                        String rc =
+                            registryService.getControllerForSwitch(dpid.value());
+                        if ((rc != null) &&
+                            onosInstanceId.equals(new OnosInstanceId(rc))) {
+                            newMaster = true;
+                        }
+                    } catch (RegistryException e) {
+                        log.error("Caught RegistryException while pre-processing Mastership Event", e);
+                    }
+                }
+
+                if (newMaster) {
+                    // Add to the map
+                    switchMastership.put(dpid, onosInstanceId);
+                    postponedEvents = instance.getPostponedEvents(dpid);
+                } else {
+                    // Eventually remove from the map
+                    OnosInstanceId oldId = switchMastership.get(dpid);
+                    if (onosInstanceId.equals(oldId)) {
+                        switchMastership.remove(dpid);
+                    }
+                }
+            }
+
+            //
+            // Process the event and eventually store it in the
+            // per-Instance state.
+            //
+            if (instance.processEvent(event)) {
+                result.add(event);
+            }
+
+            // Add the postponed events (if any)
+            if (postponedEvents != null) {
+                result.addAll(postponedEvents);
+            }
+        }
+
+        // Extract and add the previously reordered events
+        result.addAll(extractReorderedEvents());
+
+        return reorderEventsForTopology(result);
+    }
+
+    /**
+     * Classifies and reorders a list of events, and suppresses matching
+     * events.
+     * <p/>
+     * The result events can be applied to the Topology in the following
+     * order: REMOVE events followed by ADD events. The ADD events are in the
+     * natural order to build a Topology: MastershipEvent, SwitchEvent,
+     * PortEvent, LinkEvent, HostEvent. The REMOVE events are in the reverse
+     * order.
+     *
+     * @param events the events to classify and reorder.
+     * @return the classified and reordered events.
+     */
+    private List<EventEntry<TopologyEvent>> reorderEventsForTopology(
+                List<EventEntry<TopologyEvent>> events) {
+        // Local state for computing the final set of events
+        Map<ByteBuffer, EventEntry<TopologyEvent>> addedMastershipEvents =
+            new HashMap<>();
+        Map<ByteBuffer, EventEntry<TopologyEvent>> removedMastershipEvents =
+            new HashMap<>();
+        Map<ByteBuffer, EventEntry<TopologyEvent>> addedSwitchEvents =
+            new HashMap<>();
+        Map<ByteBuffer, EventEntry<TopologyEvent>> removedSwitchEvents =
+            new HashMap<>();
+        Map<ByteBuffer, EventEntry<TopologyEvent>> addedPortEvents =
+            new HashMap<>();
+        Map<ByteBuffer, EventEntry<TopologyEvent>> removedPortEvents =
+            new HashMap<>();
+        Map<ByteBuffer, EventEntry<TopologyEvent>> addedLinkEvents =
+            new HashMap<>();
+        Map<ByteBuffer, EventEntry<TopologyEvent>> removedLinkEvents =
+            new HashMap<>();
+        Map<ByteBuffer, EventEntry<TopologyEvent>> addedHostEvents =
+            new HashMap<>();
+        Map<ByteBuffer, EventEntry<TopologyEvent>> removedHostEvents =
+            new HashMap<>();
+
+        //
+        // Classify and suppress matching events
+        //
+        // NOTE: We intentionally use the event payload as the key ID
+        // (i.e., we exclude the ONOS Instance ID from the key),
+        // so we can suppress transient events across multiple ONOS instances.
+        //
+        for (EventEntry<TopologyEvent> event : events) {
+            TopologyEvent topologyEvent = event.eventData();
+
+            // Get the event itself
+            MastershipEvent mastershipEvent =
+                topologyEvent.getMastershipEvent();
+            SwitchEvent switchEvent = topologyEvent.getSwitchEvent();
+            PortEvent portEvent = topologyEvent.getPortEvent();
+            LinkEvent linkEvent = topologyEvent.getLinkEvent();
+            HostEvent hostEvent = topologyEvent.getHostEvent();
+
+            //
+            // Extract the events
+            //
+            switch (event.eventType()) {
+            case ENTRY_ADD:
+                if (mastershipEvent != null) {
+                    ByteBuffer id = mastershipEvent.getIDasByteBuffer();
+                    addedMastershipEvents.put(id, event);
+                    removedMastershipEvents.remove(id);
+                }
+                if (switchEvent != null) {
+                    ByteBuffer id = switchEvent.getIDasByteBuffer();
+                    addedSwitchEvents.put(id, event);
+                    removedSwitchEvents.remove(id);
+                }
+                if (portEvent != null) {
+                    ByteBuffer id = portEvent.getIDasByteBuffer();
+                    addedPortEvents.put(id, event);
+                    removedPortEvents.remove(id);
+                }
+                if (linkEvent != null) {
+                    ByteBuffer id = linkEvent.getIDasByteBuffer();
+                    addedLinkEvents.put(id, event);
+                    removedLinkEvents.remove(id);
+                }
+                if (hostEvent != null) {
+                    ByteBuffer id = hostEvent.getIDasByteBuffer();
+                    addedHostEvents.put(id, event);
+                    removedHostEvents.remove(id);
+                }
+                break;
+            case ENTRY_REMOVE:
+                if (mastershipEvent != null) {
+                    ByteBuffer id = mastershipEvent.getIDasByteBuffer();
+                    addedMastershipEvents.remove(id);
+                    removedMastershipEvents.put(id, event);
+                }
+                if (switchEvent != null) {
+                    ByteBuffer id = switchEvent.getIDasByteBuffer();
+                    addedSwitchEvents.remove(id);
+                    removedSwitchEvents.put(id, event);
+                }
+                if (portEvent != null) {
+                    ByteBuffer id = portEvent.getIDasByteBuffer();
+                    addedPortEvents.remove(id);
+                    removedPortEvents.put(id, event);
+                }
+                if (linkEvent != null) {
+                    ByteBuffer id = linkEvent.getIDasByteBuffer();
+                    addedLinkEvents.remove(id);
+                    removedLinkEvents.put(id, event);
+                }
+                if (hostEvent != null) {
+                    ByteBuffer id = hostEvent.getIDasByteBuffer();
+                    addedHostEvents.remove(id);
+                    removedHostEvents.put(id, event);
+                }
+                break;
+            default:
+                log.error("Unknown topology event {}", event.eventType());
+            }
+        }
+
+        //
+        // Prepare the result by adding the events in the appropriate order:
+        //  - First REMOVE, then ADD
+        //  - The REMOVE order is: Host, Link, Port, Switch, Mastership
+        //  - The ADD order is the reverse: Mastership, Switch, Port, Link,
+        //    Host
+        //
+        List<EventEntry<TopologyEvent>> result = new LinkedList<>();
+        result.addAll(removedHostEvents.values());
+        result.addAll(removedLinkEvents.values());
+        result.addAll(removedPortEvents.values());
+        result.addAll(removedSwitchEvents.values());
+        result.addAll(removedMastershipEvents.values());
+        //
+        result.addAll(addedMastershipEvents.values());
+        result.addAll(addedSwitchEvents.values());
+        result.addAll(addedPortEvents.values());
+        result.addAll(addedLinkEvents.values());
+        result.addAll(addedHostEvents.values());
+
+        return result;
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyManager.java b/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
index 05abc46..28c23da 100644
--- a/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
+++ b/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
@@ -72,7 +72,7 @@
     private final IControllerRegistryService registryService;
     private CopyOnWriteArrayList<ITopologyListener> topologyListeners;
     private Kryo kryo = KryoFactory.newKryoObject();
-    private final TopologyEventFilter eventFilter = new TopologyEventFilter();
+    private TopologyEventPreprocessor eventPreprocessor;
 
     //
     // Metrics
@@ -101,17 +101,6 @@
                                 "ListenerEventRate");
 
     //
-    // Local state for keeping track of reordered events.
-    // NOTE: Switch Events are not affected by the event reordering.
-    //
-    private Map<ByteBuffer, PortEvent> reorderedAddedPortEvents =
-            new HashMap<ByteBuffer, PortEvent>();
-    private Map<ByteBuffer, LinkEvent> reorderedAddedLinkEvents =
-            new HashMap<ByteBuffer, LinkEvent>();
-    private Map<ByteBuffer, HostEvent> reorderedAddedHostEvents =
-            new HashMap<ByteBuffer, HostEvent>();
-
-    //
     // Local state for keeping track of locally discovered events so we can
     // cleanup properly when a Switch or Port is removed.
     //
@@ -178,6 +167,8 @@
         datastore = new TopologyDatastore();
         this.registryService = registryService;
         this.topologyListeners = topologyListeners;
+        this.eventPreprocessor =
+            new TopologyEventPreprocessor(registryService);
     }
 
     /**
@@ -211,16 +202,16 @@
             //
             Collection<TopologyEvent> allTopologyEvents =
                     eventChannel.getAllEntries();
-            Collection<EventEntry<TopologyEvent>> collection =
-                    new LinkedList<EventEntry<TopologyEvent>>();
+            List<EventEntry<TopologyEvent>> events =
+                new LinkedList<EventEntry<TopologyEvent>>();
 
             for (TopologyEvent topologyEvent : allTopologyEvents) {
                 EventEntry<TopologyEvent> eventEntry =
                         new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
                                 topologyEvent);
-                collection.add(eventEntry);
+                events.add(eventEntry);
             }
-            processEvents(collection);
+            processEvents(events);
         }
 
         /**
@@ -228,8 +219,8 @@
          */
         @Override
         public void run() {
-            Collection<EventEntry<TopologyEvent>> collection =
-                    new LinkedList<EventEntry<TopologyEvent>>();
+            List<EventEntry<TopologyEvent>> events =
+                new LinkedList<EventEntry<TopologyEvent>>();
 
             this.setName("TopologyManager.EventHandler " + this.getId());
             startup();
@@ -241,11 +232,11 @@
                 try {
                     EventEntry<TopologyEvent> eventEntry =
                         topologyEvents.take();
-                    collection.add(eventEntry);
-                    topologyEvents.drainTo(collection);
+                    events.add(eventEntry);
+                    topologyEvents.drainTo(events);
 
-                    processEvents(collection);
-                    collection.clear();
+                    processEvents(events);
+                    events.clear();
                 } catch (Exception exception) {
                     log.debug("Exception processing Topology Events: ",
                               exception);
@@ -258,112 +249,11 @@
          *
          * @param events the events to process.
          */
-        private void processEvents(Collection<EventEntry<TopologyEvent>> events) {
-            // Local state for computing the final set of events
-            Map<ByteBuffer, MastershipEvent> addedMastershipEvents =
-                new HashMap<>();
-            Map<ByteBuffer, MastershipEvent> removedMastershipEvents =
-                new HashMap<>();
-            Map<ByteBuffer, SwitchEvent> addedSwitchEvents = new HashMap<>();
-            Map<ByteBuffer, SwitchEvent> removedSwitchEvents = new HashMap<>();
-            Map<ByteBuffer, PortEvent> addedPortEvents = new HashMap<>();
-            Map<ByteBuffer, PortEvent> removedPortEvents = new HashMap<>();
-            Map<ByteBuffer, LinkEvent> addedLinkEvents = new HashMap<>();
-            Map<ByteBuffer, LinkEvent> removedLinkEvents = new HashMap<>();
-            Map<ByteBuffer, HostEvent> addedHostEvents = new HashMap<>();
-            Map<ByteBuffer, HostEvent> removedHostEvents = new HashMap<>();
-
+        private void processEvents(List<EventEntry<TopologyEvent>> events) {
             //
-            // Filter the events
+            // Pre-process the events
             //
-            events = eventFilter.filterEvents(events);
-
-            //
-            // Classify and suppress matching events
-            //
-            for (EventEntry<TopologyEvent> event : events) {
-                TopologyEvent topologyEvent = event.eventData();
-                MastershipEvent mastershipEvent = topologyEvent.getMastershipEvent();
-                SwitchEvent switchEvent = topologyEvent.getSwitchEvent();
-                PortEvent portEvent = topologyEvent.getPortEvent();
-                LinkEvent linkEvent = topologyEvent.getLinkEvent();
-                HostEvent hostEvent = topologyEvent.getHostEvent();
-
-                //
-                // Extract the events
-                //
-                // FIXME Following event squashing logic based only on ID
-                //       potentially lose attribute change.
-                switch (event.eventType()) {
-                    case ENTRY_ADD:
-                        log.debug("Topology event ENTRY_ADD: {}", topologyEvent);
-                        if (mastershipEvent != null) {
-                            ByteBuffer id = mastershipEvent.getIDasByteBuffer();
-                            addedMastershipEvents.put(id, mastershipEvent);
-                            removedMastershipEvents.remove(id);
-                        }
-                        if (switchEvent != null) {
-                            ByteBuffer id = switchEvent.getIDasByteBuffer();
-                            addedSwitchEvents.put(id, switchEvent);
-                            removedSwitchEvents.remove(id);
-                            // Switch Events are not affected by event reordering
-                        }
-                        if (portEvent != null) {
-                            ByteBuffer id = portEvent.getIDasByteBuffer();
-                            addedPortEvents.put(id, portEvent);
-                            removedPortEvents.remove(id);
-                            reorderedAddedPortEvents.remove(id);
-                        }
-                        if (linkEvent != null) {
-                            ByteBuffer id = linkEvent.getIDasByteBuffer();
-                            addedLinkEvents.put(id, linkEvent);
-                            removedLinkEvents.remove(id);
-                            reorderedAddedLinkEvents.remove(id);
-                        }
-                        if (hostEvent != null) {
-                            ByteBuffer id = hostEvent.getIDasByteBuffer();
-                            addedHostEvents.put(id, hostEvent);
-                            removedHostEvents.remove(id);
-                            reorderedAddedHostEvents.remove(id);
-                        }
-                        break;
-                    case ENTRY_REMOVE:
-                        log.debug("Topology event ENTRY_REMOVE: {}", topologyEvent);
-                        if (mastershipEvent != null) {
-                            ByteBuffer id = mastershipEvent.getIDasByteBuffer();
-                            addedMastershipEvents.remove(id);
-                            removedMastershipEvents.put(id, mastershipEvent);
-                        }
-                        if (switchEvent != null) {
-                            ByteBuffer id = switchEvent.getIDasByteBuffer();
-                            addedSwitchEvents.remove(id);
-                            removedSwitchEvents.put(id, switchEvent);
-                            // Switch Events are not affected by event reordering
-                        }
-                        if (portEvent != null) {
-                            ByteBuffer id = portEvent.getIDasByteBuffer();
-                            addedPortEvents.remove(id);
-                            removedPortEvents.put(id, portEvent);
-                            reorderedAddedPortEvents.remove(id);
-                        }
-                        if (linkEvent != null) {
-                            ByteBuffer id = linkEvent.getIDasByteBuffer();
-                            addedLinkEvents.remove(id);
-                            removedLinkEvents.put(id, linkEvent);
-                            reorderedAddedLinkEvents.remove(id);
-                        }
-                        if (hostEvent != null) {
-                            ByteBuffer id = hostEvent.getIDasByteBuffer();
-                            addedHostEvents.remove(id);
-                            removedHostEvents.put(id, hostEvent);
-                            reorderedAddedHostEvents.remove(id);
-                        }
-                        break;
-                    default:
-                        log.error("Unknown topology event {}",
-                                  event.eventType());
-                }
-            }
+            events = eventPreprocessor.processEvents(events);
 
             //
             // Lock the topology while it is modified
@@ -371,58 +261,71 @@
             topology.acquireWriteLock();
 
             try {
+                // Apply the events
                 //
-                // Apply the classified events.
+                // NOTE: The events are suppose to be in the proper order
+                // to naturally build and update the topology.
                 //
-                // Apply the "add" events in the proper order:
-                //   mastership, switch, port, link, host
-                //
-                // TODO: Currently, the Mastership events are not used,
-                // so their processing ordering is not important (undefined).
-                //
-                for (MastershipEvent mastershipEvent :
-                         addedMastershipEvents.values()) {
-                    processAddedMastershipEvent(mastershipEvent);
-                }
-                for (SwitchEvent switchEvent : addedSwitchEvents.values()) {
-                    addSwitch(switchEvent);
-                }
-                for (PortEvent portEvent : addedPortEvents.values()) {
-                    addPort(portEvent);
-                }
-                for (LinkEvent linkEvent : addedLinkEvents.values()) {
-                    addLink(linkEvent);
-                }
-                for (HostEvent hostEvent : addedHostEvents.values()) {
-                    addHost(hostEvent);
-                }
-                //
-                // Apply the "remove" events in the reverse order:
-                //   host, link, port, switch, mastership
-                //
-                for (HostEvent hostEvent : removedHostEvents.values()) {
-                    removeHost(hostEvent);
-                }
-                for (LinkEvent linkEvent : removedLinkEvents.values()) {
-                    removeLink(linkEvent);
-                }
-                for (PortEvent portEvent : removedPortEvents.values()) {
-                    removePort(portEvent);
-                }
-                for (SwitchEvent switchEvent : removedSwitchEvents.values()) {
-                    removeSwitch(switchEvent);
-                }
-                for (MastershipEvent mastershipEvent :
-                         removedMastershipEvents.values()) {
-                    processRemovedMastershipEvent(mastershipEvent);
-                }
+                for (EventEntry<TopologyEvent> event : events) {
+                    TopologyEvent topologyEvent = event.eventData();
 
-                //
-                // Apply reordered events
-                //
-                applyReorderedEvents(!addedSwitchEvents.isEmpty(),
-                        !addedPortEvents.isEmpty());
+                    // Get the event itself
+                    MastershipEvent mastershipEvent =
+                        topologyEvent.getMastershipEvent();
+                    SwitchEvent switchEvent = topologyEvent.getSwitchEvent();
+                    PortEvent portEvent = topologyEvent.getPortEvent();
+                    LinkEvent linkEvent = topologyEvent.getLinkEvent();
+                    HostEvent hostEvent = topologyEvent.getHostEvent();
+                    boolean wasAdded = false;
 
+                    //
+                    // Extract the events
+                    //
+                    switch (event.eventType()) {
+                    case ENTRY_ADD:
+                        if (mastershipEvent != null) {
+                            wasAdded = addMastershipEvent(mastershipEvent);
+                        }
+                        if (switchEvent != null) {
+                            wasAdded = addSwitch(switchEvent);
+                        }
+                        if (portEvent != null) {
+                            wasAdded = addPort(portEvent);
+                        }
+                        if (linkEvent != null) {
+                            wasAdded = addLink(linkEvent);
+                        }
+                        if (hostEvent != null) {
+                            wasAdded = addHost(hostEvent);
+                        }
+                        // If the item wasn't added, probably it was reordered
+                        if (!wasAdded) {
+                            ByteBuffer id = topologyEvent.getIDasByteBuffer();
+                            eventPreprocessor.reorderedEvents.put(id, topologyEvent);
+                        }
+                        break;
+                    case ENTRY_REMOVE:
+                        if (mastershipEvent != null) {
+                            removeMastershipEvent(mastershipEvent);
+                        }
+                        if (switchEvent != null) {
+                            removeSwitch(switchEvent);
+                        }
+                        if (portEvent != null) {
+                            removePort(portEvent);
+                        }
+                        if (linkEvent != null) {
+                            removeLink(linkEvent);
+                        }
+                        if (hostEvent != null) {
+                            removeHost(hostEvent);
+                        }
+                        break;
+                    default:
+                        log.error("Unknown topology event {}",
+                                  event.eventType());
+                    }
+                }
             } finally {
                 //
                 // Topology modifications completed: Release the lock
@@ -589,56 +492,6 @@
     }
 
     /**
-     * Apply reordered events.
-     *
-     * @param hasAddedSwitchEvents true if there were Added Switch Events.
-     * @param hasAddedPortEvents   true if there were Added Port Events.
-     */
-    @GuardedBy("topology.writeLock")
-    private void applyReorderedEvents(boolean hasAddedSwitchEvents,
-                                      boolean hasAddedPortEvents) {
-        if (!(hasAddedSwitchEvents || hasAddedPortEvents)) {
-            return;        // Nothing to do
-        }
-
-        //
-        // Try to apply the reordered events.
-        //
-        // NOTE: For simplicity we try to apply all events of a particular
-        // type if any "parent" type event was processed:
-        //  - Apply reordered Port Events if Switches were added
-        //  - Apply reordered Link and Host Events if Switches or Ports
-        //    were added
-        //
-
-        //
-        // Apply reordered Port Events if Switches were added
-        //
-        if (hasAddedSwitchEvents) {
-            Map<ByteBuffer, PortEvent> portEvents = reorderedAddedPortEvents;
-            reorderedAddedPortEvents = new HashMap<>();
-            for (PortEvent portEvent : portEvents.values()) {
-                addPort(portEvent);
-            }
-        }
-        //
-        // Apply reordered Link and Host Events if Switches or Ports
-        // were added.
-        //
-        Map<ByteBuffer, LinkEvent> linkEvents = reorderedAddedLinkEvents;
-        reorderedAddedLinkEvents = new HashMap<>();
-        for (LinkEvent linkEvent : linkEvents.values()) {
-            addLink(linkEvent);
-        }
-        //
-        Map<ByteBuffer, HostEvent> hostEvents = reorderedAddedHostEvents;
-        reorderedAddedHostEvents = new HashMap<>();
-        for (HostEvent hostEvent : hostEvents.values()) {
-            addHost(hostEvent);
-        }
-    }
-
-    /**
      * Mastership updated event.
      *
      * @param mastershipEvent the mastership event.
@@ -974,28 +827,26 @@
     //
 
     /**
-     * Processes added Switch Mastership event.
+     * Adds Switch Mastership event.
      *
      * @param mastershipEvent the MastershipEvent to process.
+     * @return true if the item was successfully added, otherwise false.
      */
     @GuardedBy("topology.writeLock")
-    private void processAddedMastershipEvent(MastershipEvent mastershipEvent) {
-        log.debug("Processing added Mastership event {}",
-                  mastershipEvent);
-        // TODO: Not implemented/used yet.
+    private boolean addMastershipEvent(MastershipEvent mastershipEvent) {
+        log.debug("Added Mastership event {}", mastershipEvent);
         apiAddedMastershipEvents.add(mastershipEvent);
+        return true;
     }
 
     /**
-     * Processes removed Switch Mastership event.
+     * Removes Switch Mastership event.
      *
      * @param mastershipEvent the MastershipEvent to process.
      */
     @GuardedBy("topology.writeLock")
-    private void processRemovedMastershipEvent(MastershipEvent mastershipEvent) {
-        log.debug("Processing removed Mastership event {}",
-                  mastershipEvent);
-        // TODO: Not implemented/used yet.
+    private void removeMastershipEvent(MastershipEvent mastershipEvent) {
+        log.debug("Removed Mastership event {}", mastershipEvent);
         apiRemovedMastershipEvents.add(mastershipEvent);
     }
 
@@ -1003,9 +854,10 @@
      * Adds a switch to the topology replica.
      *
      * @param switchEvent the SwitchEvent with the switch to add.
+     * @return true if the item was successfully added, otherwise false.
      */
     @GuardedBy("topology.writeLock")
-    private void addSwitch(SwitchEvent switchEvent) {
+    private boolean addSwitch(SwitchEvent switchEvent) {
         if (log.isDebugEnabled()) {
             SwitchEvent sw = topology.getSwitchEvent(switchEvent.getDpid());
             if (sw != null) {
@@ -1016,12 +868,14 @@
         }
         topology.putSwitch(switchEvent.freeze());
         apiAddedSwitchEvents.add(switchEvent);
+        return true;
     }
 
     /**
      * Removes a switch from the topology replica.
      * <p/>
-     * It will call {@link #removePort(PortEvent)} for each ports on this switch.
+     * It will call {@link #removePort(PortEvent)} for each ports on this
+     * switch.
      *
      * @param switchEvent the SwitchEvent with the switch to remove.
      */
@@ -1058,16 +912,15 @@
      * Adds a port to the topology replica.
      *
      * @param portEvent the PortEvent with the port to add.
+     * @return true if the item was successfully added, otherwise false.
      */
     @GuardedBy("topology.writeLock")
-    private void addPort(PortEvent portEvent) {
+    private boolean addPort(PortEvent portEvent) {
         Switch sw = topology.getSwitch(portEvent.getDpid());
         if (sw == null) {
+            // Reordered event
             log.debug("{} reordered because switch is null", portEvent);
-            // Reordered event: delay the event in local cache
-            ByteBuffer id = portEvent.getIDasByteBuffer();
-            reorderedAddedPortEvents.put(id, portEvent);
-            return;
+            return false;
         }
 
         if (log.isDebugEnabled()) {
@@ -1080,6 +933,7 @@
         }
         topology.putPort(portEvent.freeze());
         apiAddedPortEvents.add(portEvent);
+        return true;
     }
 
     /**
@@ -1160,32 +1014,34 @@
      * It will remove attachment points from each hosts using the same ports.
      *
      * @param linkEvent the LinkEvent with the link to add.
+     * @return true if the item was successfully added, otherwise false.
      */
     @GuardedBy("topology.writeLock")
-    private void addLink(LinkEvent linkEvent) {
+    private boolean addLink(LinkEvent linkEvent) {
         PortEvent srcPort = topology.getPortEvent(linkEvent.getSrc());
         PortEvent dstPort = topology.getPortEvent(linkEvent.getDst());
         if ((srcPort == null) || (dstPort == null)) {
+            // Reordered event
             log.debug("{} reordered because {} port is null", linkEvent,
                     (srcPort == null) ? "src" : "dst");
-
-            // XXX domain knowledge: port must be present before link.
-            // Reordered event: delay the event in local cache
-            ByteBuffer id = linkEvent.getIDasByteBuffer();
-            reorderedAddedLinkEvents.put(id, linkEvent);
-            return;
+            return false;
         }
 
-        // XXX domain knowledge: Sanity check: Port cannot have both Link and Host
-        // FIXME potentially local replica may not be up-to-date yet due to HZ delay.
-        //        may need to manage local truth and use them instead.
+        //
+        // XXX domain knowledge: Sanity check: Port cannot have both Link and
+        // Host.
+        //
+        // FIXME: Potentially local replica may not be up-to-date yet due to
+        //        Hazelcast delay.
+        // FIXME: May need to manage local truth and use them instead.
+        //
         if (topology.getLinkEvent(linkEvent.getLinkTuple()) == null) {
             // Only check for existing Host when adding new Link.
-
             // Remove all Hosts attached to the ports on both ends
 
-            Set<HostEvent> hostsToUpdate = new TreeSet<>(new Comparator<HostEvent>() {
-                // comparison only using ID(=MAC)
+            Set<HostEvent> hostsToUpdate =
+                new TreeSet<>(new Comparator<HostEvent>() {
+                // Comparison only using ID(=MAC)
                 @Override
                 public int compare(HostEvent o1, HostEvent o2) {
                     return Long.compare(o1.getMac().toLong(), o2.getMac().toLong());
@@ -1206,9 +1062,9 @@
                     hostsToUpdate.add(hostEvent);
                 }
             }
-            // remove attachment point from them.
+            // Remove attachment point from them
             for (HostEvent hostEvent : hostsToUpdate) {
-                // remove port from attachment point and update
+                // Remove port from attachment point and update
                 HostEvent newHostEvent = new HostEvent(hostEvent);
                 newHostEvent.removeAttachmentPoint(srcPort.getSwitchPort());
                 newHostEvent.removeAttachmentPoint(dstPort.getSwitchPort());
@@ -1235,6 +1091,7 @@
         }
         topology.putLink(linkEvent.freeze());
         apiAddedLinkEvents.add(linkEvent);
+        return true;
     }
 
     /**
@@ -1290,12 +1147,14 @@
      * <p/>
      * TODO: Host-related work is incomplete.
      * TODO: Eventually, we might need to consider reordering
-     * or {@link #addLink(LinkEvent)} and {@link #addHost(HostEvent)} events on the same port.
+     * or {@link #addLink(LinkEvent)} and {@link #addHost(HostEvent)} events
+     * on the same port.
      *
      * @param hostEvent the HostEvent with the host to add.
+     * @return true if the item was successfully added, otherwise false.
      */
     @GuardedBy("topology.writeLock")
-    private void addHost(HostEvent hostEvent) {
+    private boolean addHost(HostEvent hostEvent) {
 
         // TODO Decide how to handle update scenario.
         // If the new HostEvent has less attachment point compared to
@@ -1316,10 +1175,8 @@
             Port port = topology.getPort(swp.getDpid(), swp.getPortNumber());
             if (port == null) {
                 log.debug("{} reordered because port {} was not there", hostEvent, swp);
-                // Reordered event: delay the event in local cache
-                ByteBuffer id = hostEvent.getIDasByteBuffer();
-                reorderedAddedHostEvents.put(id, hostEvent);
-                return; // should not continue if re-applying later
+                // Reordered event
+                return false; // should not continue if re-applying later
             }
             // Attached Ports must not have Link
             if (port.getOutgoingLink() != null ||
@@ -1345,7 +1202,7 @@
                         + "original: {}, modified: {}", hostEvent, modifiedHostEvent);
                 // TODO Should we call #removeHost to trigger remove event?
                 //      only if this call is update.
-                return;
+                return false;
             }
 
             if (log.isDebugEnabled()) {
@@ -1358,7 +1215,9 @@
             }
             topology.putHost(modifiedHostEvent.freeze());
             apiAddedHostEvents.add(modifiedHostEvent);
+            return true;
         }
+        return false;
     }
 
     /**
@@ -1386,11 +1245,11 @@
     /**
      * Read the whole topology from the database.
      *
-     * @return a collection of EventEntry-encapsulated Topology Events for
+     * @return a list of EventEntry-encapsulated Topology Events for
      * the whole topology.
      */
-    private Collection<EventEntry<TopologyEvent>> readWholeTopologyFromDB() {
-        Collection<EventEntry<TopologyEvent>> collection =
+    private List<EventEntry<TopologyEvent>> readWholeTopologyFromDB() {
+        List<EventEntry<TopologyEvent>> events =
                 new LinkedList<EventEntry<TopologyEvent>>();
 
         // XXX May need to clear whole topology first, depending on
@@ -1414,7 +1273,7 @@
             EventEntry<TopologyEvent> eventEntry =
                     new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
                             topologyEvent);
-            collection.add(eventEntry);
+            events.add(eventEntry);
         }
 
         // Add all active ports
@@ -1437,7 +1296,7 @@
             EventEntry<TopologyEvent> eventEntry =
                     new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
                             topologyEvent);
-            collection.add(eventEntry);
+            events.add(eventEntry);
         }
 
         for (KVDevice d : KVDevice.getAllDevices()) {
@@ -1469,9 +1328,9 @@
             EventEntry<TopologyEvent> eventEntry =
                     new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
                             topologyEvent);
-            collection.add(eventEntry);
+            events.add(eventEntry);
         }
 
-        return collection;
+        return events;
     }
 }