Work toward ONOS-1451: Separate Event Key space per instance
Final step in the implementation:
Implement the Topology Event filtering and processing logic
inside class TopologyEventPreprocessor:
- The topology events from each ONOS instance are kept separately
within class OnosInstanceLastAddEvents
- We keep only the last copy of the ADD events (not removed yet
by REMOVE events).
- If ADD MastershipEvent is received, all previously
stored ADD Topology events for the corresponding Switch DPID are
applied to the Topology
- If there were previously reordered events, we attempt to
add them again along with the other events.
- Before adding the events to the Topology, the events are
aggregated (squashed), and reordered in their natural order to
build a Topology. E.g., the ADD events order is:
MastershipEvent, SwitchEvent, PortEvent, LinkEvent, HostEvent.
The REMOVE events are in the reverse order.
Also, the REMOVE events are processed before the ADD events.
- If an event cannot be added to the topology, it is added
back to the collection of reordered events.
Also, replaced some of the Collection<> usages with List<> in some of the
event-related APIs to indicate that there is a semantic ordering
of the events.
NOTE: Some of the internals of TopologyEventPreprocessor should be optimized
for speed. E.g., we should create an additional lookup map:
Dpid -> Collection<TopologyEvent>
and then method getPostponedEvents() doesn't need to traverse all
topologyEvents entries.
Change-Id: Ic0d9ebf242f015adb60817921b239bb65dd560e2
diff --git a/src/main/java/net/onrc/onos/core/topology/HostEvent.java b/src/main/java/net/onrc/onos/core/topology/HostEvent.java
index 1c64939..33f8db8 100644
--- a/src/main/java/net/onrc/onos/core/topology/HostEvent.java
+++ b/src/main/java/net/onrc/onos/core/topology/HostEvent.java
@@ -9,6 +9,7 @@
import net.floodlightcontroller.util.MACAddress;
import net.onrc.onos.core.topology.web.serializers.HostEventSerializer;
+import net.onrc.onos.core.util.Dpid;
import net.onrc.onos.core.util.SwitchPort;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -108,6 +109,19 @@
this.lastSeenTime = lastSeenTime;
}
+ /**
+ * Gets the event origin DPID.
+ *
+ * @return the event origin DPID.
+ */
+ public Dpid getOriginDpid() {
+ // TODO: Eventually, we should return a collection of Dpid values
+ for (SwitchPort sp : attachmentPoints) {
+ return sp.getDpid();
+ }
+ return null;
+ }
+
@Override
public String toString() {
return "[HostEvent " + mac + " attachmentPoints:" + attachmentPoints + "]";
diff --git a/src/main/java/net/onrc/onos/core/topology/LinkEvent.java b/src/main/java/net/onrc/onos/core/topology/LinkEvent.java
index f7ae6d3..807dcaf 100644
--- a/src/main/java/net/onrc/onos/core/topology/LinkEvent.java
+++ b/src/main/java/net/onrc/onos/core/topology/LinkEvent.java
@@ -122,6 +122,15 @@
this.capacity = capacity;
}
+ /**
+ * Gets the event origin DPID.
+ *
+ * @return the event origin DPID.
+ */
+ public Dpid getOriginDpid() {
+ return this.id.getDst().getDpid();
+ }
+
@Override
public String toString() {
return "[LinkEvent " + getSrc() + "->" + getDst() + "]";
diff --git a/src/main/java/net/onrc/onos/core/topology/MastershipEvent.java b/src/main/java/net/onrc/onos/core/topology/MastershipEvent.java
index 8106a39..589174d 100644
--- a/src/main/java/net/onrc/onos/core/topology/MastershipEvent.java
+++ b/src/main/java/net/onrc/onos/core/topology/MastershipEvent.java
@@ -89,6 +89,15 @@
return role;
}
+ /**
+ * Gets the event origin DPID.
+ *
+ * @return the event origin DPID.
+ */
+ public Dpid getOriginDpid() {
+ return this.dpid;
+ }
+
@Override
public String toString() {
return "[MastershipEvent " + getDpid() + "@" + getOnosInstanceId() +
diff --git a/src/main/java/net/onrc/onos/core/topology/PortEvent.java b/src/main/java/net/onrc/onos/core/topology/PortEvent.java
index 0fa5a7e..f23f907 100644
--- a/src/main/java/net/onrc/onos/core/topology/PortEvent.java
+++ b/src/main/java/net/onrc/onos/core/topology/PortEvent.java
@@ -95,6 +95,15 @@
return id.getPortNumber();
}
+ /**
+ * Gets the event origin DPID.
+ *
+ * @return the event origin DPID.
+ */
+ public Dpid getOriginDpid() {
+ return this.id.getDpid();
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/src/main/java/net/onrc/onos/core/topology/SwitchEvent.java b/src/main/java/net/onrc/onos/core/topology/SwitchEvent.java
index f724fae..a3b6a93 100644
--- a/src/main/java/net/onrc/onos/core/topology/SwitchEvent.java
+++ b/src/main/java/net/onrc/onos/core/topology/SwitchEvent.java
@@ -56,6 +56,15 @@
return dpid;
}
+ /**
+ * Gets the event origin DPID.
+ *
+ * @return the event origin DPID.
+ */
+ public Dpid getOriginDpid() {
+ return this.dpid;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyEvent.java b/src/main/java/net/onrc/onos/core/topology/TopologyEvent.java
index 4ea9e31..23471ff 100644
--- a/src/main/java/net/onrc/onos/core/topology/TopologyEvent.java
+++ b/src/main/java/net/onrc/onos/core/topology/TopologyEvent.java
@@ -4,6 +4,7 @@
import java.nio.charset.StandardCharsets;
import java.util.Objects;
+import net.onrc.onos.core.util.Dpid;
import net.onrc.onos.core.util.OnosInstanceId;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -169,6 +170,40 @@
}
/**
+ * Gets the event origin DPID.
+ *
+ * @return the event origin DPID.
+ */
+ public Dpid getOriginDpid() {
+ if (mastershipEvent != null) {
+ return mastershipEvent.getOriginDpid();
+ }
+ if (switchEvent != null) {
+ return switchEvent.getOriginDpid();
+ }
+ if (portEvent != null) {
+ return portEvent.getOriginDpid();
+ }
+ if (linkEvent != null) {
+ return linkEvent.getOriginDpid();
+ }
+ if (hostEvent != null) {
+ return hostEvent.getOriginDpid();
+ }
+ return null;
+ }
+
+ /**
+ * Tests whether the event origin DPID equals the specified DPID.
+ *
+ * @param dpid the DPID to compare against.
+ * @return true if the event origin Dpid equals the specified DPID.
+ */
+ public boolean equalsOriginDpid(Dpid dpid) {
+ return dpid.equals(getOriginDpid());
+ }
+
+ /**
* Checks if all events contained are equal.
*
* @param obj TopologyEvent to compare against
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyEventFilter.java b/src/main/java/net/onrc/onos/core/topology/TopologyEventFilter.java
deleted file mode 100644
index 65eed32..0000000
--- a/src/main/java/net/onrc/onos/core/topology/TopologyEventFilter.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package net.onrc.onos.core.topology;
-
-import java.util.Collection;
-
-import net.onrc.onos.core.util.EventEntry;
-
-/**
- * Stateful filter for filtering Topology events.
- * <p/>
- * NOTE: The filter itself keeps internal state about filtered events.
- * As part of the filtering logic, a previously suppressed event might
- * be genenerated (released) later because of some other event.
- */
-public class TopologyEventFilter {
- /**
- * Filter a collection of events.
- *
- * @param events the events to filter.
- * @return a collection of filtered events.
- */
- Collection<EventEntry<TopologyEvent>> filterEvents(
- Collection<EventEntry<TopologyEvent>> events) {
-
- // TODO: Not implemented yet
- return events;
- }
-}
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyEventPreprocessor.java b/src/main/java/net/onrc/onos/core/topology/TopologyEventPreprocessor.java
new file mode 100644
index 0000000..69d9b07
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/topology/TopologyEventPreprocessor.java
@@ -0,0 +1,438 @@
+package net.onrc.onos.core.topology;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import net.floodlightcontroller.core.IFloodlightProviderService.Role;
+import net.onrc.onos.core.registry.IControllerRegistryService;
+import net.onrc.onos.core.registry.RegistryException;
+import net.onrc.onos.core.util.Dpid;
+import net.onrc.onos.core.util.EventEntry;
+import net.onrc.onos.core.util.OnosInstanceId;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Topology Event pre-processor. It is used by the Topology Manager for
+ * pre-processing Topology events before applying them to the Topology.
+ * <p/>
+ * The pre-processor itself keeps internal state about the most recent
+ * ADD events. It also might keep state about reordered events that cannot
+ * be applied.
+ * <p/>
+ * As part of the pre-processing logic, a previously suppressed event might
+ * be generated later because of some other event.
+ */
+public class TopologyEventPreprocessor {
+ private static final Logger log = LoggerFactory
+ .getLogger(TopologyEventPreprocessor.class);
+ private final IControllerRegistryService registryService;
+
+ //
+ // Reordered ADD events that need to be reapplied
+ //
+ // TODO: For now, this field is accessed by the TopologyManager as well.
+ // This should be refactored, and the field should be made private.
+ //
+ Map<ByteBuffer, TopologyEvent> reorderedEvents = new HashMap<>();
+
+ //
+ // Topology ADD event state per ONOS instance
+ //
+ private Map<OnosInstanceId, OnosInstanceLastAddEvents> instanceState =
+ new HashMap<>();
+
+ //
+ // Switch mastership state (updated by the topology events)
+ //
+ Map<Dpid, OnosInstanceId> switchMastership = new HashMap<>();
+
+ /**
+ * Constructor for a given Registry Service.
+ *
+ * @param registryService the Registry Service to use.
+ */
+ TopologyEventPreprocessor(IControllerRegistryService registryService) {
+ this.registryService = registryService;
+ }
+
+ /**
+ * Class to store the last ADD Topology Events per ONOS Instance.
+ */
+ private final class OnosInstanceLastAddEvents {
+ private final OnosInstanceId onosInstanceId;
+
+ // The last ADD events received from this ONOS instance
+ Map<ByteBuffer, TopologyEvent> topologyEvents = new HashMap<>();
+
+ /**
+ * Constructor for a given ONOS Instance ID.
+ *
+ * @param onosInstanceId the ONOS Instance ID.
+ */
+ OnosInstanceLastAddEvents(OnosInstanceId onosInstanceId) {
+ this.onosInstanceId = checkNotNull(onosInstanceId);
+ }
+
+ /**
+ * Processes an event originated by this ONOS instance.
+ *
+ * @param event the event to process.
+ * @return true if the event should be applied to the final Topology
+ * as well, otherwise false.
+ */
+ boolean processEvent(EventEntry<TopologyEvent> event) {
+ TopologyEvent topologyEvent = event.eventData();
+ ByteBuffer id = topologyEvent.getIDasByteBuffer();
+ OnosInstanceId masterId = null;
+
+ // Get the Master of the Origin DPID
+ Dpid dpid = topologyEvent.getOriginDpid();
+ if (dpid != null) {
+ masterId = switchMastership.get(dpid);
+ }
+
+ //
+ // Apply the event based on its type
+ //
+ switch (event.eventType()) {
+ case ENTRY_ADD:
+ topologyEvents.put(id, topologyEvent);
+ reorderedEvents.remove(id);
+ // Allow the ADD only if the event was originated by the Master
+ return onosInstanceId.equals(masterId);
+
+ case ENTRY_REMOVE:
+ reorderedEvents.remove(id);
+ // Don't allow the REMOVE event if there was no ADD before
+ if (topologyEvents.remove(id) == null) {
+ return false;
+ }
+ //
+ // Allow the REMOVE if the event was originated by the Master,
+ // or there is no Master at all.
+ //
+ if (masterId == null) {
+ return true;
+ }
+ return onosInstanceId.equals(masterId);
+
+ default:
+ log.error("Unknown topology event {}", event.eventType());
+ }
+
+ return false;
+ }
+
+ /**
+ * Gets the postponed events for a given DPID.
+ * Those are the events that couldn't be applied earlier to the
+ * Topology, because the ONOS Instance originating the events
+ * was not the Master for the Switch.
+ *
+ * @param dpid the DPID to use.
+ * @return a list of postponed events for the given DPID.
+ */
+ List<EventEntry<TopologyEvent>> getPostponedEvents(Dpid dpid) {
+ List<EventEntry<TopologyEvent>> result = new LinkedList<>();
+
+ //
+ // Search all events, and keep only those that match the DPID
+ //
+ // TODO: This could be slow, and the code should be optimized
+ // for speed. The processing complexity is O(N*N) where N is
+ // the number of Switches: for each Switch Mastership we call
+ // getPostponedEvents(), and then for each call we
+ // search all previously added events.
+ // The code can be optimized by adding additional lookup map:
+ // Dpid -> List<TopologyEvent>
+ //
+ for (TopologyEvent te : topologyEvents.values()) {
+ if (dpid.equals(te.getOriginDpid())) {
+ result.add(new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD, te));
+ }
+ }
+
+ return result;
+ }
+ }
+
+ /**
+ * Extracts previously reordered events that should be applied again
+ * to the Topology.
+ *
+ * @return a list of previously reordered events.
+ */
+ List<EventEntry<TopologyEvent>> extractReorderedEvents() {
+ List<EventEntry<TopologyEvent>> result = new LinkedList<>();
+
+ //
+ // Search all previously reordered events, and extract only if
+ // the originator is the Master.
+ //
+ List<TopologyEvent> leftoverEvents = new LinkedList<>();
+ for (TopologyEvent te : reorderedEvents.values()) {
+ Dpid dpid = te.getOriginDpid();
+ OnosInstanceId masterId = null;
+ if (dpid != null) {
+ masterId = switchMastership.get(dpid);
+ }
+ if (te.getOnosInstanceId().equals(masterId)) {
+ result.add(new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD, te));
+ } else {
+ leftoverEvents.add(te);
+ }
+ }
+
+ //
+ // Add back the leftover events
+ //
+ reorderedEvents.clear();
+ for (TopologyEvent te : leftoverEvents) {
+ reorderedEvents.put(te.getIDasByteBuffer(), te);
+ }
+
+ return result;
+ }
+
+ /**
+ * Pre-processes a list of events.
+ *
+ * @param events the events to pre-process.
+ * @return a list of pre-processed events.
+ */
+ List<EventEntry<TopologyEvent>> processEvents(
+ List<EventEntry<TopologyEvent>> events) {
+ List<EventEntry<TopologyEvent>> result = new LinkedList<>();
+
+ //
+ // Process the events
+ //
+ for (EventEntry<TopologyEvent> event : events) {
+ List<EventEntry<TopologyEvent>> postponedEvents = null;
+
+ TopologyEvent topologyEvent = event.eventData();
+ OnosInstanceId onosInstanceId = topologyEvent.getOnosInstanceId();
+
+ log.debug("Topology event {}: {}", event.eventType(),
+ topologyEvent);
+
+ // Get the ONOS instance state
+ OnosInstanceLastAddEvents instance =
+ instanceState.get(onosInstanceId);
+ if (instance == null) {
+ instance = new OnosInstanceLastAddEvents(onosInstanceId);
+ instanceState.put(onosInstanceId, instance);
+ }
+
+ //
+ // Update the Switch Mastership state:
+ // - If ADD a MASTER and the Mastership is confirmed by the
+ // Registry Service, then add to the Mastership map and fetch
+ // the postponed events from the originating ONOS Instance.
+ // - Otherwise, remove from the Mastership map, but only if it is
+ // the current MASTER.
+ //
+ MastershipEvent mastershipEvent =
+ topologyEvent.getMastershipEvent();
+ if (mastershipEvent != null) {
+ Dpid dpid = mastershipEvent.getDpid();
+ boolean newMaster = false;
+
+ if ((event.eventType() == EventEntry.Type.ENTRY_ADD) &&
+ (mastershipEvent.getRole() == Role.MASTER)) {
+ //
+ // Check with the Registry Service as well
+ //
+ try {
+ String rc =
+ registryService.getControllerForSwitch(dpid.value());
+ if ((rc != null) &&
+ onosInstanceId.equals(new OnosInstanceId(rc))) {
+ newMaster = true;
+ }
+ } catch (RegistryException e) {
+ log.error("Caught RegistryException while pre-processing Mastership Event", e);
+ }
+ }
+
+ if (newMaster) {
+ // Add to the map
+ switchMastership.put(dpid, onosInstanceId);
+ postponedEvents = instance.getPostponedEvents(dpid);
+ } else {
+ // Remove from the map, but only if this instance is the current Master
+ OnosInstanceId oldId = switchMastership.get(dpid);
+ if (onosInstanceId.equals(oldId)) {
+ switchMastership.remove(dpid);
+ }
+ }
+ }
+
+ //
+ // Process the event and eventually store it in the
+ // per-Instance state.
+ //
+ if (instance.processEvent(event)) {
+ result.add(event);
+ }
+
+ // Add the postponed events (if any)
+ if (postponedEvents != null) {
+ result.addAll(postponedEvents);
+ }
+ }
+
+ // Extract and add the previously reordered events
+ result.addAll(extractReorderedEvents());
+
+ return reorderEventsForTopology(result);
+ }
+
+ /**
+ * Classifies and reorders a list of events, and suppresses matching
+ * events.
+ * <p/>
+ * The result events can be applied to the Topology in the following
+ * order: REMOVE events followed by ADD events. The ADD events are in the
+ * natural order to build a Topology: MastershipEvent, SwitchEvent,
+ * PortEvent, LinkEvent, HostEvent. The REMOVE events are in the reverse
+ * order.
+ *
+ * @param events the events to classify and reorder.
+ * @return the classified and reordered events.
+ */
+ private List<EventEntry<TopologyEvent>> reorderEventsForTopology(
+ List<EventEntry<TopologyEvent>> events) {
+ // Local state for computing the final set of events
+ Map<ByteBuffer, EventEntry<TopologyEvent>> addedMastershipEvents =
+ new HashMap<>();
+ Map<ByteBuffer, EventEntry<TopologyEvent>> removedMastershipEvents =
+ new HashMap<>();
+ Map<ByteBuffer, EventEntry<TopologyEvent>> addedSwitchEvents =
+ new HashMap<>();
+ Map<ByteBuffer, EventEntry<TopologyEvent>> removedSwitchEvents =
+ new HashMap<>();
+ Map<ByteBuffer, EventEntry<TopologyEvent>> addedPortEvents =
+ new HashMap<>();
+ Map<ByteBuffer, EventEntry<TopologyEvent>> removedPortEvents =
+ new HashMap<>();
+ Map<ByteBuffer, EventEntry<TopologyEvent>> addedLinkEvents =
+ new HashMap<>();
+ Map<ByteBuffer, EventEntry<TopologyEvent>> removedLinkEvents =
+ new HashMap<>();
+ Map<ByteBuffer, EventEntry<TopologyEvent>> addedHostEvents =
+ new HashMap<>();
+ Map<ByteBuffer, EventEntry<TopologyEvent>> removedHostEvents =
+ new HashMap<>();
+
+ //
+ // Classify and suppress matching events
+ //
+ // NOTE: We intentionally use the event payload as the key ID
+ // (i.e., we exclude the ONOS Instance ID from the key),
+ // so we can suppress transient events across multiple ONOS instances.
+ //
+ for (EventEntry<TopologyEvent> event : events) {
+ TopologyEvent topologyEvent = event.eventData();
+
+ // Get the event itself
+ MastershipEvent mastershipEvent =
+ topologyEvent.getMastershipEvent();
+ SwitchEvent switchEvent = topologyEvent.getSwitchEvent();
+ PortEvent portEvent = topologyEvent.getPortEvent();
+ LinkEvent linkEvent = topologyEvent.getLinkEvent();
+ HostEvent hostEvent = topologyEvent.getHostEvent();
+
+ //
+ // Extract the events
+ //
+ switch (event.eventType()) {
+ case ENTRY_ADD:
+ if (mastershipEvent != null) {
+ ByteBuffer id = mastershipEvent.getIDasByteBuffer();
+ addedMastershipEvents.put(id, event);
+ removedMastershipEvents.remove(id);
+ }
+ if (switchEvent != null) {
+ ByteBuffer id = switchEvent.getIDasByteBuffer();
+ addedSwitchEvents.put(id, event);
+ removedSwitchEvents.remove(id);
+ }
+ if (portEvent != null) {
+ ByteBuffer id = portEvent.getIDasByteBuffer();
+ addedPortEvents.put(id, event);
+ removedPortEvents.remove(id);
+ }
+ if (linkEvent != null) {
+ ByteBuffer id = linkEvent.getIDasByteBuffer();
+ addedLinkEvents.put(id, event);
+ removedLinkEvents.remove(id);
+ }
+ if (hostEvent != null) {
+ ByteBuffer id = hostEvent.getIDasByteBuffer();
+ addedHostEvents.put(id, event);
+ removedHostEvents.remove(id);
+ }
+ break;
+ case ENTRY_REMOVE:
+ if (mastershipEvent != null) {
+ ByteBuffer id = mastershipEvent.getIDasByteBuffer();
+ addedMastershipEvents.remove(id);
+ removedMastershipEvents.put(id, event);
+ }
+ if (switchEvent != null) {
+ ByteBuffer id = switchEvent.getIDasByteBuffer();
+ addedSwitchEvents.remove(id);
+ removedSwitchEvents.put(id, event);
+ }
+ if (portEvent != null) {
+ ByteBuffer id = portEvent.getIDasByteBuffer();
+ addedPortEvents.remove(id);
+ removedPortEvents.put(id, event);
+ }
+ if (linkEvent != null) {
+ ByteBuffer id = linkEvent.getIDasByteBuffer();
+ addedLinkEvents.remove(id);
+ removedLinkEvents.put(id, event);
+ }
+ if (hostEvent != null) {
+ ByteBuffer id = hostEvent.getIDasByteBuffer();
+ addedHostEvents.remove(id);
+ removedHostEvents.put(id, event);
+ }
+ break;
+ default:
+ log.error("Unknown topology event {}", event.eventType());
+ }
+ }
+
+ //
+ // Prepare the result by adding the events in the appropriate order:
+ // - First REMOVE, then ADD
+ // - The REMOVE order is: Host, Link, Port, Switch, Mastership
+ // - The ADD order is the reverse: Mastership, Switch, Port, Link,
+ // Host
+ //
+ List<EventEntry<TopologyEvent>> result = new LinkedList<>();
+ result.addAll(removedHostEvents.values());
+ result.addAll(removedLinkEvents.values());
+ result.addAll(removedPortEvents.values());
+ result.addAll(removedSwitchEvents.values());
+ result.addAll(removedMastershipEvents.values());
+ //
+ result.addAll(addedMastershipEvents.values());
+ result.addAll(addedSwitchEvents.values());
+ result.addAll(addedPortEvents.values());
+ result.addAll(addedLinkEvents.values());
+ result.addAll(addedHostEvents.values());
+
+ return result;
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyManager.java b/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
index 05abc46..28c23da 100644
--- a/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
+++ b/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
@@ -72,7 +72,7 @@
private final IControllerRegistryService registryService;
private CopyOnWriteArrayList<ITopologyListener> topologyListeners;
private Kryo kryo = KryoFactory.newKryoObject();
- private final TopologyEventFilter eventFilter = new TopologyEventFilter();
+ private TopologyEventPreprocessor eventPreprocessor;
//
// Metrics
@@ -101,17 +101,6 @@
"ListenerEventRate");
//
- // Local state for keeping track of reordered events.
- // NOTE: Switch Events are not affected by the event reordering.
- //
- private Map<ByteBuffer, PortEvent> reorderedAddedPortEvents =
- new HashMap<ByteBuffer, PortEvent>();
- private Map<ByteBuffer, LinkEvent> reorderedAddedLinkEvents =
- new HashMap<ByteBuffer, LinkEvent>();
- private Map<ByteBuffer, HostEvent> reorderedAddedHostEvents =
- new HashMap<ByteBuffer, HostEvent>();
-
- //
// Local state for keeping track of locally discovered events so we can
// cleanup properly when a Switch or Port is removed.
//
@@ -178,6 +167,8 @@
datastore = new TopologyDatastore();
this.registryService = registryService;
this.topologyListeners = topologyListeners;
+ this.eventPreprocessor =
+ new TopologyEventPreprocessor(registryService);
}
/**
@@ -211,16 +202,16 @@
//
Collection<TopologyEvent> allTopologyEvents =
eventChannel.getAllEntries();
- Collection<EventEntry<TopologyEvent>> collection =
- new LinkedList<EventEntry<TopologyEvent>>();
+ List<EventEntry<TopologyEvent>> events =
+ new LinkedList<EventEntry<TopologyEvent>>();
for (TopologyEvent topologyEvent : allTopologyEvents) {
EventEntry<TopologyEvent> eventEntry =
new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
topologyEvent);
- collection.add(eventEntry);
+ events.add(eventEntry);
}
- processEvents(collection);
+ processEvents(events);
}
/**
@@ -228,8 +219,8 @@
*/
@Override
public void run() {
- Collection<EventEntry<TopologyEvent>> collection =
- new LinkedList<EventEntry<TopologyEvent>>();
+ List<EventEntry<TopologyEvent>> events =
+ new LinkedList<EventEntry<TopologyEvent>>();
this.setName("TopologyManager.EventHandler " + this.getId());
startup();
@@ -241,11 +232,11 @@
try {
EventEntry<TopologyEvent> eventEntry =
topologyEvents.take();
- collection.add(eventEntry);
- topologyEvents.drainTo(collection);
+ events.add(eventEntry);
+ topologyEvents.drainTo(events);
- processEvents(collection);
- collection.clear();
+ processEvents(events);
+ events.clear();
} catch (Exception exception) {
log.debug("Exception processing Topology Events: ",
exception);
@@ -258,112 +249,11 @@
*
* @param events the events to process.
*/
- private void processEvents(Collection<EventEntry<TopologyEvent>> events) {
- // Local state for computing the final set of events
- Map<ByteBuffer, MastershipEvent> addedMastershipEvents =
- new HashMap<>();
- Map<ByteBuffer, MastershipEvent> removedMastershipEvents =
- new HashMap<>();
- Map<ByteBuffer, SwitchEvent> addedSwitchEvents = new HashMap<>();
- Map<ByteBuffer, SwitchEvent> removedSwitchEvents = new HashMap<>();
- Map<ByteBuffer, PortEvent> addedPortEvents = new HashMap<>();
- Map<ByteBuffer, PortEvent> removedPortEvents = new HashMap<>();
- Map<ByteBuffer, LinkEvent> addedLinkEvents = new HashMap<>();
- Map<ByteBuffer, LinkEvent> removedLinkEvents = new HashMap<>();
- Map<ByteBuffer, HostEvent> addedHostEvents = new HashMap<>();
- Map<ByteBuffer, HostEvent> removedHostEvents = new HashMap<>();
-
+ private void processEvents(List<EventEntry<TopologyEvent>> events) {
//
- // Filter the events
+ // Pre-process the events
//
- events = eventFilter.filterEvents(events);
-
- //
- // Classify and suppress matching events
- //
- for (EventEntry<TopologyEvent> event : events) {
- TopologyEvent topologyEvent = event.eventData();
- MastershipEvent mastershipEvent = topologyEvent.getMastershipEvent();
- SwitchEvent switchEvent = topologyEvent.getSwitchEvent();
- PortEvent portEvent = topologyEvent.getPortEvent();
- LinkEvent linkEvent = topologyEvent.getLinkEvent();
- HostEvent hostEvent = topologyEvent.getHostEvent();
-
- //
- // Extract the events
- //
- // FIXME Following event squashing logic based only on ID
- // potentially lose attribute change.
- switch (event.eventType()) {
- case ENTRY_ADD:
- log.debug("Topology event ENTRY_ADD: {}", topologyEvent);
- if (mastershipEvent != null) {
- ByteBuffer id = mastershipEvent.getIDasByteBuffer();
- addedMastershipEvents.put(id, mastershipEvent);
- removedMastershipEvents.remove(id);
- }
- if (switchEvent != null) {
- ByteBuffer id = switchEvent.getIDasByteBuffer();
- addedSwitchEvents.put(id, switchEvent);
- removedSwitchEvents.remove(id);
- // Switch Events are not affected by event reordering
- }
- if (portEvent != null) {
- ByteBuffer id = portEvent.getIDasByteBuffer();
- addedPortEvents.put(id, portEvent);
- removedPortEvents.remove(id);
- reorderedAddedPortEvents.remove(id);
- }
- if (linkEvent != null) {
- ByteBuffer id = linkEvent.getIDasByteBuffer();
- addedLinkEvents.put(id, linkEvent);
- removedLinkEvents.remove(id);
- reorderedAddedLinkEvents.remove(id);
- }
- if (hostEvent != null) {
- ByteBuffer id = hostEvent.getIDasByteBuffer();
- addedHostEvents.put(id, hostEvent);
- removedHostEvents.remove(id);
- reorderedAddedHostEvents.remove(id);
- }
- break;
- case ENTRY_REMOVE:
- log.debug("Topology event ENTRY_REMOVE: {}", topologyEvent);
- if (mastershipEvent != null) {
- ByteBuffer id = mastershipEvent.getIDasByteBuffer();
- addedMastershipEvents.remove(id);
- removedMastershipEvents.put(id, mastershipEvent);
- }
- if (switchEvent != null) {
- ByteBuffer id = switchEvent.getIDasByteBuffer();
- addedSwitchEvents.remove(id);
- removedSwitchEvents.put(id, switchEvent);
- // Switch Events are not affected by event reordering
- }
- if (portEvent != null) {
- ByteBuffer id = portEvent.getIDasByteBuffer();
- addedPortEvents.remove(id);
- removedPortEvents.put(id, portEvent);
- reorderedAddedPortEvents.remove(id);
- }
- if (linkEvent != null) {
- ByteBuffer id = linkEvent.getIDasByteBuffer();
- addedLinkEvents.remove(id);
- removedLinkEvents.put(id, linkEvent);
- reorderedAddedLinkEvents.remove(id);
- }
- if (hostEvent != null) {
- ByteBuffer id = hostEvent.getIDasByteBuffer();
- addedHostEvents.remove(id);
- removedHostEvents.put(id, hostEvent);
- reorderedAddedHostEvents.remove(id);
- }
- break;
- default:
- log.error("Unknown topology event {}",
- event.eventType());
- }
- }
+ events = eventPreprocessor.processEvents(events);
//
// Lock the topology while it is modified
@@ -371,58 +261,71 @@
topology.acquireWriteLock();
try {
+ // Apply the events
//
- // Apply the classified events.
+ // NOTE: The events are supposed to be in the proper order
+ // to naturally build and update the topology.
//
- // Apply the "add" events in the proper order:
- // mastership, switch, port, link, host
- //
- // TODO: Currently, the Mastership events are not used,
- // so their processing ordering is not important (undefined).
- //
- for (MastershipEvent mastershipEvent :
- addedMastershipEvents.values()) {
- processAddedMastershipEvent(mastershipEvent);
- }
- for (SwitchEvent switchEvent : addedSwitchEvents.values()) {
- addSwitch(switchEvent);
- }
- for (PortEvent portEvent : addedPortEvents.values()) {
- addPort(portEvent);
- }
- for (LinkEvent linkEvent : addedLinkEvents.values()) {
- addLink(linkEvent);
- }
- for (HostEvent hostEvent : addedHostEvents.values()) {
- addHost(hostEvent);
- }
- //
- // Apply the "remove" events in the reverse order:
- // host, link, port, switch, mastership
- //
- for (HostEvent hostEvent : removedHostEvents.values()) {
- removeHost(hostEvent);
- }
- for (LinkEvent linkEvent : removedLinkEvents.values()) {
- removeLink(linkEvent);
- }
- for (PortEvent portEvent : removedPortEvents.values()) {
- removePort(portEvent);
- }
- for (SwitchEvent switchEvent : removedSwitchEvents.values()) {
- removeSwitch(switchEvent);
- }
- for (MastershipEvent mastershipEvent :
- removedMastershipEvents.values()) {
- processRemovedMastershipEvent(mastershipEvent);
- }
+ for (EventEntry<TopologyEvent> event : events) {
+ TopologyEvent topologyEvent = event.eventData();
- //
- // Apply reordered events
- //
- applyReorderedEvents(!addedSwitchEvents.isEmpty(),
- !addedPortEvents.isEmpty());
+ // Get the event itself
+ MastershipEvent mastershipEvent =
+ topologyEvent.getMastershipEvent();
+ SwitchEvent switchEvent = topologyEvent.getSwitchEvent();
+ PortEvent portEvent = topologyEvent.getPortEvent();
+ LinkEvent linkEvent = topologyEvent.getLinkEvent();
+ HostEvent hostEvent = topologyEvent.getHostEvent();
+ boolean wasAdded = false;
+ //
+ // Extract the events
+ //
+ switch (event.eventType()) {
+ case ENTRY_ADD:
+ if (mastershipEvent != null) {
+ wasAdded = addMastershipEvent(mastershipEvent);
+ }
+ if (switchEvent != null) {
+ wasAdded = addSwitch(switchEvent);
+ }
+ if (portEvent != null) {
+ wasAdded = addPort(portEvent);
+ }
+ if (linkEvent != null) {
+ wasAdded = addLink(linkEvent);
+ }
+ if (hostEvent != null) {
+ wasAdded = addHost(hostEvent);
+ }
+ // If the item wasn't added, probably it was reordered
+ if (!wasAdded) {
+ ByteBuffer id = topologyEvent.getIDasByteBuffer();
+ eventPreprocessor.reorderedEvents.put(id, topologyEvent);
+ }
+ break;
+ case ENTRY_REMOVE:
+ if (mastershipEvent != null) {
+ removeMastershipEvent(mastershipEvent);
+ }
+ if (switchEvent != null) {
+ removeSwitch(switchEvent);
+ }
+ if (portEvent != null) {
+ removePort(portEvent);
+ }
+ if (linkEvent != null) {
+ removeLink(linkEvent);
+ }
+ if (hostEvent != null) {
+ removeHost(hostEvent);
+ }
+ break;
+ default:
+ log.error("Unknown topology event {}",
+ event.eventType());
+ }
+ }
} finally {
//
// Topology modifications completed: Release the lock
@@ -589,56 +492,6 @@
}
/**
- * Apply reordered events.
- *
- * @param hasAddedSwitchEvents true if there were Added Switch Events.
- * @param hasAddedPortEvents true if there were Added Port Events.
- */
- @GuardedBy("topology.writeLock")
- private void applyReorderedEvents(boolean hasAddedSwitchEvents,
- boolean hasAddedPortEvents) {
- if (!(hasAddedSwitchEvents || hasAddedPortEvents)) {
- return; // Nothing to do
- }
-
- //
- // Try to apply the reordered events.
- //
- // NOTE: For simplicity we try to apply all events of a particular
- // type if any "parent" type event was processed:
- // - Apply reordered Port Events if Switches were added
- // - Apply reordered Link and Host Events if Switches or Ports
- // were added
- //
-
- //
- // Apply reordered Port Events if Switches were added
- //
- if (hasAddedSwitchEvents) {
- Map<ByteBuffer, PortEvent> portEvents = reorderedAddedPortEvents;
- reorderedAddedPortEvents = new HashMap<>();
- for (PortEvent portEvent : portEvents.values()) {
- addPort(portEvent);
- }
- }
- //
- // Apply reordered Link and Host Events if Switches or Ports
- // were added.
- //
- Map<ByteBuffer, LinkEvent> linkEvents = reorderedAddedLinkEvents;
- reorderedAddedLinkEvents = new HashMap<>();
- for (LinkEvent linkEvent : linkEvents.values()) {
- addLink(linkEvent);
- }
- //
- Map<ByteBuffer, HostEvent> hostEvents = reorderedAddedHostEvents;
- reorderedAddedHostEvents = new HashMap<>();
- for (HostEvent hostEvent : hostEvents.values()) {
- addHost(hostEvent);
- }
- }
-
- /**
* Mastership updated event.
*
* @param mastershipEvent the mastership event.
@@ -974,28 +827,26 @@
//
/**
- * Processes added Switch Mastership event.
+ * Adds Switch Mastership event.
*
* @param mastershipEvent the MastershipEvent to process.
+ * @return true if the item was successfully added, otherwise false.
*/
@GuardedBy("topology.writeLock")
- private void processAddedMastershipEvent(MastershipEvent mastershipEvent) {
- log.debug("Processing added Mastership event {}",
- mastershipEvent);
- // TODO: Not implemented/used yet.
+ private boolean addMastershipEvent(MastershipEvent mastershipEvent) {
+ log.debug("Added Mastership event {}", mastershipEvent);
apiAddedMastershipEvents.add(mastershipEvent);
+ return true;
}
/**
- * Processes removed Switch Mastership event.
+ * Removes Switch Mastership event.
*
* @param mastershipEvent the MastershipEvent to process.
*/
@GuardedBy("topology.writeLock")
- private void processRemovedMastershipEvent(MastershipEvent mastershipEvent) {
- log.debug("Processing removed Mastership event {}",
- mastershipEvent);
- // TODO: Not implemented/used yet.
+ private void removeMastershipEvent(MastershipEvent mastershipEvent) {
+ log.debug("Removed Mastership event {}", mastershipEvent);
apiRemovedMastershipEvents.add(mastershipEvent);
}
@@ -1003,9 +854,10 @@
* Adds a switch to the topology replica.
*
* @param switchEvent the SwitchEvent with the switch to add.
+ * @return true if the item was successfully added, otherwise false.
*/
@GuardedBy("topology.writeLock")
- private void addSwitch(SwitchEvent switchEvent) {
+ private boolean addSwitch(SwitchEvent switchEvent) {
if (log.isDebugEnabled()) {
SwitchEvent sw = topology.getSwitchEvent(switchEvent.getDpid());
if (sw != null) {
@@ -1016,12 +868,14 @@
}
topology.putSwitch(switchEvent.freeze());
apiAddedSwitchEvents.add(switchEvent);
+ return true;
}
/**
* Removes a switch from the topology replica.
* <p/>
- * It will call {@link #removePort(PortEvent)} for each ports on this switch.
+ * It will call {@link #removePort(PortEvent)} for each port on this
+ * switch.
*
* @param switchEvent the SwitchEvent with the switch to remove.
*/
@@ -1058,16 +912,15 @@
* Adds a port to the topology replica.
*
* @param portEvent the PortEvent with the port to add.
+ * @return true if the item was successfully added, otherwise false.
*/
@GuardedBy("topology.writeLock")
- private void addPort(PortEvent portEvent) {
+ private boolean addPort(PortEvent portEvent) {
Switch sw = topology.getSwitch(portEvent.getDpid());
if (sw == null) {
+ // Reordered event
log.debug("{} reordered because switch is null", portEvent);
- // Reordered event: delay the event in local cache
- ByteBuffer id = portEvent.getIDasByteBuffer();
- reorderedAddedPortEvents.put(id, portEvent);
- return;
+ return false;
}
if (log.isDebugEnabled()) {
@@ -1080,6 +933,7 @@
}
topology.putPort(portEvent.freeze());
apiAddedPortEvents.add(portEvent);
+ return true;
}
/**
@@ -1160,32 +1014,34 @@
* It will remove attachment points from each hosts using the same ports.
*
* @param linkEvent the LinkEvent with the link to add.
+ * @return true if the item was successfully added, otherwise false.
*/
@GuardedBy("topology.writeLock")
- private void addLink(LinkEvent linkEvent) {
+ private boolean addLink(LinkEvent linkEvent) {
PortEvent srcPort = topology.getPortEvent(linkEvent.getSrc());
PortEvent dstPort = topology.getPortEvent(linkEvent.getDst());
if ((srcPort == null) || (dstPort == null)) {
+ // Reordered event
log.debug("{} reordered because {} port is null", linkEvent,
(srcPort == null) ? "src" : "dst");
-
- // XXX domain knowledge: port must be present before link.
- // Reordered event: delay the event in local cache
- ByteBuffer id = linkEvent.getIDasByteBuffer();
- reorderedAddedLinkEvents.put(id, linkEvent);
- return;
+ return false;
}
- // XXX domain knowledge: Sanity check: Port cannot have both Link and Host
- // FIXME potentially local replica may not be up-to-date yet due to HZ delay.
- // may need to manage local truth and use them instead.
+ //
+ // XXX domain knowledge: Sanity check: Port cannot have both Link and
+ // Host.
+ //
+ // FIXME: Potentially local replica may not be up-to-date yet due to
+ // Hazelcast delay.
+ // FIXME: May need to manage local truth and use them instead.
+ //
if (topology.getLinkEvent(linkEvent.getLinkTuple()) == null) {
// Only check for existing Host when adding new Link.
-
// Remove all Hosts attached to the ports on both ends
- Set<HostEvent> hostsToUpdate = new TreeSet<>(new Comparator<HostEvent>() {
- // comparison only using ID(=MAC)
+ Set<HostEvent> hostsToUpdate =
+ new TreeSet<>(new Comparator<HostEvent>() {
+ // Comparison only using ID(=MAC)
@Override
public int compare(HostEvent o1, HostEvent o2) {
return Long.compare(o1.getMac().toLong(), o2.getMac().toLong());
@@ -1206,9 +1062,9 @@
hostsToUpdate.add(hostEvent);
}
}
- // remove attachment point from them.
+ // Remove attachment point from them
for (HostEvent hostEvent : hostsToUpdate) {
- // remove port from attachment point and update
+ // Remove port from attachment point and update
HostEvent newHostEvent = new HostEvent(hostEvent);
newHostEvent.removeAttachmentPoint(srcPort.getSwitchPort());
newHostEvent.removeAttachmentPoint(dstPort.getSwitchPort());
@@ -1235,6 +1091,7 @@
}
topology.putLink(linkEvent.freeze());
apiAddedLinkEvents.add(linkEvent);
+ return true;
}
/**
@@ -1290,12 +1147,14 @@
* <p/>
* TODO: Host-related work is incomplete.
* TODO: Eventually, we might need to consider reordering
- * or {@link #addLink(LinkEvent)} and {@link #addHost(HostEvent)} events on the same port.
+ * of {@link #addLink(LinkEvent)} and {@link #addHost(HostEvent)} events
+ * on the same port.
*
* @param hostEvent the HostEvent with the host to add.
+ * @return true if the item was successfully added, otherwise false.
*/
@GuardedBy("topology.writeLock")
- private void addHost(HostEvent hostEvent) {
+ private boolean addHost(HostEvent hostEvent) {
// TODO Decide how to handle update scenario.
// If the new HostEvent has less attachment point compared to
@@ -1316,10 +1175,8 @@
Port port = topology.getPort(swp.getDpid(), swp.getPortNumber());
if (port == null) {
log.debug("{} reordered because port {} was not there", hostEvent, swp);
- // Reordered event: delay the event in local cache
- ByteBuffer id = hostEvent.getIDasByteBuffer();
- reorderedAddedHostEvents.put(id, hostEvent);
- return; // should not continue if re-applying later
+ // Reordered event
+ return false; // should not continue if re-applying later
}
// Attached Ports must not have Link
if (port.getOutgoingLink() != null ||
@@ -1345,7 +1202,7 @@
+ "original: {}, modified: {}", hostEvent, modifiedHostEvent);
// TODO Should we call #removeHost to trigger remove event?
// only if this call is update.
- return;
+ return false;
}
if (log.isDebugEnabled()) {
@@ -1358,7 +1215,9 @@
}
topology.putHost(modifiedHostEvent.freeze());
apiAddedHostEvents.add(modifiedHostEvent);
+ return true;
}
+ return false;
}
/**
@@ -1386,11 +1245,11 @@
/**
* Read the whole topology from the database.
*
- * @return a collection of EventEntry-encapsulated Topology Events for
+ * @return a list of EventEntry-encapsulated Topology Events for
* the whole topology.
*/
- private Collection<EventEntry<TopologyEvent>> readWholeTopologyFromDB() {
- Collection<EventEntry<TopologyEvent>> collection =
+ private List<EventEntry<TopologyEvent>> readWholeTopologyFromDB() {
+ List<EventEntry<TopologyEvent>> events =
new LinkedList<EventEntry<TopologyEvent>>();
// XXX May need to clear whole topology first, depending on
@@ -1414,7 +1273,7 @@
EventEntry<TopologyEvent> eventEntry =
new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
topologyEvent);
- collection.add(eventEntry);
+ events.add(eventEntry);
}
// Add all active ports
@@ -1437,7 +1296,7 @@
EventEntry<TopologyEvent> eventEntry =
new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
topologyEvent);
- collection.add(eventEntry);
+ events.add(eventEntry);
}
for (KVDevice d : KVDevice.getAllDevices()) {
@@ -1469,9 +1328,9 @@
EventEntry<TopologyEvent> eventEntry =
new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
topologyEvent);
- collection.add(eventEntry);
+ events.add(eventEntry);
}
- return collection;
+ return events;
}
}