Work toward cleaning up the Topology Manager and the Topology Publisher
ONOS-1890
Moved some of the methods in class TopologyManager to TopologyPublisher,
because naturally they belong to the latter.
This is also needed for the log-based mechanism.
In the process, removed the event writing to the datastore, until
it becomes clear when, where and what to write.
Change-Id: Ic1fe1db533aec66a91bf643b0989119f33c3d37e
diff --git a/src/main/java/net/onrc/onos/core/topology/ITopologyService.java b/src/main/java/net/onrc/onos/core/topology/ITopologyService.java
index 7ab5c47..ecb1212 100644
--- a/src/main/java/net/onrc/onos/core/topology/ITopologyService.java
+++ b/src/main/java/net/onrc/onos/core/topology/ITopologyService.java
@@ -30,14 +30,4 @@
* @param listener the listener to remove.
*/
public void removeListener(ITopologyListener listener);
-
- /**
- * Allows a module to get a reference to the southbound interface to
- * the topology.
- * TODO Figure out how to hide the southbound interface from
- * applications/modules that shouldn't touch it
- *
- * @return the TopologyDiscoveryInterface object
- */
- public TopologyDiscoveryInterface getTopologyDiscoveryInterface();
}
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyDiscoveryInterface.java b/src/main/java/net/onrc/onos/core/topology/TopologyDiscoveryInterface.java
deleted file mode 100644
index c373bad..0000000
--- a/src/main/java/net/onrc/onos/core/topology/TopologyDiscoveryInterface.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package net.onrc.onos.core.topology;
-
-import java.util.Collection;
-
-/**
- * Interface used by the Topology Discovery module to write topology-related
- * events.
- */
-public interface TopologyDiscoveryInterface {
- /**
- * Switch Mastership updated event.
- *
- * @param mastershipEvent the mastership event.
- */
- public void putSwitchMastershipEvent(MastershipEvent mastershipEvent);
-
- /**
- * Switch Mastership removed event.
- *
- * @param mastershipEvent the mastership event.
- */
- public void removeSwitchMastershipEvent(MastershipEvent mastershipEvent);
-
- /**
- * Switch discovered event.
- *
- * @param switchEvent the switch event.
- * @param portEvents the corresponding port events for the switch.
- */
- public void putSwitchDiscoveryEvent(SwitchEvent switchEvent,
- Collection<PortEvent> portEvents);
-
- /**
- * Switch removed event.
- *
- * @param switchEvent the switch event.
- */
- public void removeSwitchDiscoveryEvent(SwitchEvent switchEvent);
-
- /**
- * Port discovered event.
- *
- * @param portEvent the port event.
- */
- public void putPortDiscoveryEvent(PortEvent portEvent);
-
- /**
- * Port removed event.
- *
- * @param portEvent the port event.
- */
- public void removePortDiscoveryEvent(PortEvent portEvent);
-
- /**
- * Link discovered event.
- *
- * @param linkEvent the link event.
- */
- public void putLinkDiscoveryEvent(LinkEvent linkEvent);
-
- /**
- * Link removed event.
- *
- * @param linkEvent the link event.
- */
- public void removeLinkDiscoveryEvent(LinkEvent linkEvent);
-
- /**
- * Host discovered event.
- *
- * @param hostEvent the host event.
- */
- public void putHostDiscoveryEvent(HostEvent hostEvent);
-
- /**
- * Host removed event.
- *
- * @param hostEvent the host event.
- */
- public void removeHostDiscoveryEvent(HostEvent hostEvent);
-}
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyManager.java b/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
index 1280cf4..daa334b 100644
--- a/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
+++ b/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
@@ -22,17 +22,12 @@
import net.onrc.onos.core.datagrid.IDatagridService;
import net.onrc.onos.core.datagrid.IEventChannel;
import net.onrc.onos.core.datagrid.IEventChannelListener;
-import net.onrc.onos.core.datastore.topology.KVDevice;
-import net.onrc.onos.core.datastore.topology.KVLink;
-import net.onrc.onos.core.datastore.topology.KVPort;
-import net.onrc.onos.core.datastore.topology.KVSwitch;
import net.onrc.onos.core.metrics.OnosMetrics;
import net.onrc.onos.core.metrics.OnosMetrics.MetricsComponent;
import net.onrc.onos.core.metrics.OnosMetrics.MetricsFeature;
import net.onrc.onos.core.registry.IControllerRegistryService;
import net.onrc.onos.core.util.Dpid;
import net.onrc.onos.core.util.EventEntry;
-import net.onrc.onos.core.util.PortNumber;
import net.onrc.onos.core.util.SwitchPort;
import net.onrc.onos.core.util.serializers.KryoFactory;
@@ -58,7 +53,7 @@
* TODO TBD: This class may delay the requested change to handle event
* re-ordering. e.g.) Link Add came in, but Switch was not there.
*/
-public class TopologyManager implements TopologyDiscoveryInterface {
+public class TopologyManager {
private static final Logger log = LoggerFactory
.getLogger(TopologyManager.class);
@@ -67,9 +62,7 @@
public static final String EVENT_CHANNEL_NAME = "onos.topology";
private EventHandler eventHandler = new EventHandler();
- private TopologyDatastore datastore;
private final TopologyImpl topology = new TopologyImpl();
- private final IControllerRegistryService registryService;
private Kryo kryo = KryoFactory.newKryoObject();
private TopologyEventPreprocessor eventPreprocessor;
private CopyOnWriteArrayList<ITopologyListener> topologyListeners =
@@ -104,44 +97,6 @@
"ListenerEventRate");
//
- // Local state for keeping track of locally discovered events so we can
- // cleanup properly when a Switch or Port is removed.
- //
- // We keep all Port, (incoming) Link and Host events per Switch DPID:
- // - If a switch goes down, we remove all corresponding Port, Link and
- // Host events.
- // - If a port on a switch goes down, we remove all corresponding Link
- // and Host events discovered by this instance.
- //
- // How to handle side-effect of remote events.
- // - Remote Port Down event -> Link Down
- // Not handled. (XXX Shouldn't it be removed from discovered.. Map)
- // - Remote Host Added -> lose ownership of Host)
- // Not handled. (XXX Shouldn't it be removed from discovered.. Map)
- //
- // XXX Domain knowledge based invariant maintenance should be moved to
- // driver module, since the invariant may be different on optical, etc.
- //
- // What happens on leadership change?
- // - Probably should: remove from discovered.. Maps, but not send DELETE
- // events
- // XXX Switch/Port can be rediscovered by new leader, but Link, Host?
- // - Current: There is no way to recognize leadership change?
- // ZookeeperRegistry.requestControl(long, ControlChangeCallback)
- // is the only way to register listener, and it allows only one
- // listener, which is already used by Controller class.
- //
- // FIXME Replace with concurrent variant.
- // #removeSwitchDiscoveryEvent(SwitchEvent) runs in different thread.
- //
- private Map<Dpid, Map<ByteBuffer, PortEvent>> discoveredAddedPortEvents =
- new HashMap<>();
- private Map<Dpid, Map<ByteBuffer, LinkEvent>> discoveredAddedLinkEvents =
- new HashMap<>();
- private Map<Dpid, Map<ByteBuffer, HostEvent>> discoveredAddedHostEvents =
- new HashMap<>();
-
- //
// Local state for keeping the last ADD Mastership Event entries.
// TODO: In the future, we might have to keep this state somewhere else.
//
@@ -173,8 +128,6 @@
* @param registryService the Registry Service to use.
*/
public TopologyManager(IControllerRegistryService registryService) {
- datastore = new TopologyDatastore();
- this.registryService = registryService;
this.eventPreprocessor =
new TopologyEventPreprocessor(registryService);
}
@@ -201,12 +154,7 @@
*/
private void startup() {
//
- // TODO: Read all state from the database:
- //
- // Collection<EventEntry<TopologyEvent>> collection =
- // readWholeTopologyFromDB();
- //
- // For now, as a shortcut we read it from the datagrid
+ // Read all topology state
//
Collection<TopologyEvent> allTopologyEvents =
eventChannel.getAllEntries();
@@ -611,359 +559,6 @@
apiRemovedHostEvents.clear();
}
- /**
- * Mastership updated event.
- *
- * @param mastershipEvent the mastership event.
- */
- @Override
- public void putSwitchMastershipEvent(MastershipEvent mastershipEvent) {
- // Send out notification
- TopologyEvent topologyEvent =
- new TopologyEvent(mastershipEvent,
- registryService.getOnosInstanceId());
- eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
- }
-
- /**
- * Mastership removed event.
- *
- * @param mastershipEvent the mastership event.
- */
- @Override
- public void removeSwitchMastershipEvent(MastershipEvent mastershipEvent) {
- // Send out notification
- TopologyEvent topologyEvent =
- new TopologyEvent(mastershipEvent,
- registryService.getOnosInstanceId());
- eventChannel.removeEntry(topologyEvent.getID());
- }
-
- /**
- * Switch discovered event.
- *
- * @param switchEvent the switch event.
- * @param portEvents the corresponding port events for the switch.
- */
- @Override
- public void putSwitchDiscoveryEvent(SwitchEvent switchEvent,
- Collection<PortEvent> portEvents) {
- if (datastore.addSwitch(switchEvent, portEvents)) {
- log.debug("Sending add switch: {}", switchEvent);
- // Send out notification
- TopologyEvent topologyEvent =
- new TopologyEvent(switchEvent,
- registryService.getOnosInstanceId());
- eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
-
- // Send out notification for each port
- for (PortEvent portEvent : portEvents) {
- log.debug("Sending add port: {}", portEvent);
- topologyEvent =
- new TopologyEvent(portEvent,
- registryService.getOnosInstanceId());
- eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
- }
-
- //
- // Keep track of the added ports
- //
- // Get the old Port Events
- Map<ByteBuffer, PortEvent> oldPortEvents =
- discoveredAddedPortEvents.get(switchEvent.getDpid());
- if (oldPortEvents == null) {
- oldPortEvents = new HashMap<>();
- }
-
- // Store the new Port Events in the local cache
- Map<ByteBuffer, PortEvent> newPortEvents = new HashMap<>();
- for (PortEvent portEvent : portEvents) {
- ByteBuffer id = portEvent.getIDasByteBuffer();
- newPortEvents.put(id, portEvent);
- }
- discoveredAddedPortEvents.put(switchEvent.getDpid(),
- newPortEvents);
-
- //
- // Extract the removed ports
- //
- List<PortEvent> removedPortEvents = new LinkedList<>();
- for (Map.Entry<ByteBuffer, PortEvent> entry : oldPortEvents.entrySet()) {
- ByteBuffer key = entry.getKey();
- PortEvent portEvent = entry.getValue();
- if (!newPortEvents.containsKey(key)) {
- removedPortEvents.add(portEvent);
- }
- }
-
- // Cleanup old removed ports
- for (PortEvent portEvent : removedPortEvents) {
- removePortDiscoveryEvent(portEvent);
- }
- }
- }
-
- /**
- * {@inheritDoc}
- * <p/>
- * Called by {@link TopologyPublisher.SwitchCleanup} thread.
- */
- @Override
- public void removeSwitchDiscoveryEvent(SwitchEvent switchEvent) {
- TopologyEvent topologyEvent;
-
- // Get the old Port Events
- Map<ByteBuffer, PortEvent> oldPortEvents =
- discoveredAddedPortEvents.get(switchEvent.getDpid());
- if (oldPortEvents == null) {
- oldPortEvents = new HashMap<>();
- }
-
- if (datastore.deactivateSwitch(switchEvent, oldPortEvents.values())) {
- log.debug("Sending remove switch: {}", switchEvent);
- // Send out notification
- topologyEvent =
- new TopologyEvent(switchEvent,
- registryService.getOnosInstanceId());
- eventChannel.removeEntry(topologyEvent.getID());
-
- //
- // Send out notification for each port.
- //
- // NOTE: We don't use removePortDiscoveryEvent() for the cleanup,
- // because it will attempt to remove the port from the database,
- // and the deactiveSwitch() call above already removed all ports.
- //
- for (PortEvent portEvent : oldPortEvents.values()) {
- log.debug("Sending remove port:", portEvent);
- topologyEvent =
- new TopologyEvent(portEvent,
- registryService.getOnosInstanceId());
- eventChannel.removeEntry(topologyEvent.getID());
- }
- discoveredAddedPortEvents.remove(switchEvent.getDpid());
-
- // Cleanup for each link
- Map<ByteBuffer, LinkEvent> oldLinkEvents =
- discoveredAddedLinkEvents.get(switchEvent.getDpid());
- if (oldLinkEvents != null) {
- for (LinkEvent linkEvent : new ArrayList<>(oldLinkEvents.values())) {
- removeLinkDiscoveryEvent(linkEvent);
- }
- discoveredAddedLinkEvents.remove(switchEvent.getDpid());
- }
-
- // Cleanup for each host
- Map<ByteBuffer, HostEvent> oldHostEvents =
- discoveredAddedHostEvents.get(switchEvent.getDpid());
- if (oldHostEvents != null) {
- for (HostEvent hostEvent : new ArrayList<>(oldHostEvents.values())) {
- removeHostDiscoveryEvent(hostEvent);
- }
- discoveredAddedHostEvents.remove(switchEvent.getDpid());
- }
- }
- }
-
- /**
- * Port discovered event.
- *
- * @param portEvent the port event.
- */
- @Override
- public void putPortDiscoveryEvent(PortEvent portEvent) {
- if (datastore.addPort(portEvent)) {
- log.debug("Sending add port: {}", portEvent);
- // Send out notification
- TopologyEvent topologyEvent =
- new TopologyEvent(portEvent,
- registryService.getOnosInstanceId());
- eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
-
- // Store the new Port Event in the local cache
- Map<ByteBuffer, PortEvent> oldPortEvents =
- discoveredAddedPortEvents.get(portEvent.getDpid());
- if (oldPortEvents == null) {
- oldPortEvents = new HashMap<>();
- discoveredAddedPortEvents.put(portEvent.getDpid(),
- oldPortEvents);
- }
- ByteBuffer id = portEvent.getIDasByteBuffer();
- oldPortEvents.put(id, portEvent);
- }
- }
-
- /**
- * Port removed event.
- *
- * @param portEvent the port event.
- */
- @Override
- public void removePortDiscoveryEvent(PortEvent portEvent) {
- if (datastore.deactivatePort(portEvent)) {
- log.debug("Sending remove port: {}", portEvent);
- // Send out notification
- TopologyEvent topologyEvent =
- new TopologyEvent(portEvent,
- registryService.getOnosInstanceId());
- eventChannel.removeEntry(topologyEvent.getID());
-
- // Cleanup the Port Event from the local cache
- Map<ByteBuffer, PortEvent> oldPortEvents =
- discoveredAddedPortEvents.get(portEvent.getDpid());
- if (oldPortEvents != null) {
- ByteBuffer id = portEvent.getIDasByteBuffer();
- oldPortEvents.remove(id);
- }
-
- // Cleanup for the incoming link
- Map<ByteBuffer, LinkEvent> oldLinkEvents =
- discoveredAddedLinkEvents.get(portEvent.getDpid());
- if (oldLinkEvents != null) {
- for (LinkEvent linkEvent : new ArrayList<>(oldLinkEvents.values())) {
- if (linkEvent.getDst().equals(portEvent.getSwitchPort())) {
- removeLinkDiscoveryEvent(linkEvent);
- // XXX If we change our model to allow multiple Link on
- // a Port, this loop must be fixed to allow continuing.
- break;
- }
- }
- }
-
- // Cleanup for the connected hosts
- // TODO: The implementation below is probably wrong
- List<HostEvent> removedHostEvents = new LinkedList<>();
- Map<ByteBuffer, HostEvent> oldHostEvents =
- discoveredAddedHostEvents.get(portEvent.getDpid());
- if (oldHostEvents != null) {
- for (HostEvent hostEvent : new ArrayList<>(oldHostEvents.values())) {
- for (SwitchPort swp : hostEvent.getAttachmentPoints()) {
- if (swp.equals(portEvent.getSwitchPort())) {
- removedHostEvents.add(hostEvent);
- }
- }
- }
- for (HostEvent hostEvent : removedHostEvents) {
- removeHostDiscoveryEvent(hostEvent);
- }
- }
- }
- }
-
- /**
- * Link discovered event.
- *
- * @param linkEvent the link event.
- */
- @Override
- public void putLinkDiscoveryEvent(LinkEvent linkEvent) {
- if (datastore.addLink(linkEvent)) {
- log.debug("Sending add link: {}", linkEvent);
- // Send out notification
- TopologyEvent topologyEvent =
- new TopologyEvent(linkEvent,
- registryService.getOnosInstanceId());
- eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
-
- // Store the new Link Event in the local cache
- Map<ByteBuffer, LinkEvent> oldLinkEvents =
- discoveredAddedLinkEvents.get(linkEvent.getDst().getDpid());
- if (oldLinkEvents == null) {
- oldLinkEvents = new HashMap<>();
- discoveredAddedLinkEvents.put(linkEvent.getDst().getDpid(),
- oldLinkEvents);
- }
- ByteBuffer id = linkEvent.getIDasByteBuffer();
- oldLinkEvents.put(id, linkEvent);
- }
- }
-
- /**
- * Link removed event.
- *
- * @param linkEvent the link event.
- */
- @Override
- public void removeLinkDiscoveryEvent(LinkEvent linkEvent) {
- if (datastore.removeLink(linkEvent)) {
- log.debug("Sending remove link: {}", linkEvent);
- // Send out notification
- TopologyEvent topologyEvent =
- new TopologyEvent(linkEvent,
- registryService.getOnosInstanceId());
- eventChannel.removeEntry(topologyEvent.getID());
-
- // Cleanup the Link Event from the local cache
- Map<ByteBuffer, LinkEvent> oldLinkEvents =
- discoveredAddedLinkEvents.get(linkEvent.getDst().getDpid());
- if (oldLinkEvents != null) {
- ByteBuffer id = linkEvent.getIDasByteBuffer();
- oldLinkEvents.remove(id);
- }
- }
- }
-
- /**
- * Host discovered event.
- *
- * @param hostEvent the host event.
- */
- @Override
- public void putHostDiscoveryEvent(HostEvent hostEvent) {
- if (datastore.addHost(hostEvent)) {
- // Send out notification
- TopologyEvent topologyEvent =
- new TopologyEvent(hostEvent,
- registryService.getOnosInstanceId());
- eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
- log.debug("Put the host info into the cache of the topology. mac {}",
- hostEvent.getMac());
-
- // Store the new Host Event in the local cache
- // TODO: The implementation below is probably wrong
- for (SwitchPort swp : hostEvent.getAttachmentPoints()) {
- Map<ByteBuffer, HostEvent> oldHostEvents =
- discoveredAddedHostEvents.get(swp.getDpid());
- if (oldHostEvents == null) {
- oldHostEvents = new HashMap<>();
- discoveredAddedHostEvents.put(swp.getDpid(),
- oldHostEvents);
- }
- ByteBuffer id = hostEvent.getIDasByteBuffer();
- oldHostEvents.put(id, hostEvent);
- }
- }
- }
-
- /**
- * Host removed event.
- *
- * @param hostEvent the host event.
- */
- @Override
- public void removeHostDiscoveryEvent(HostEvent hostEvent) {
- if (datastore.removeHost(hostEvent)) {
- // Send out notification
- TopologyEvent topologyEvent =
- new TopologyEvent(hostEvent,
- registryService.getOnosInstanceId());
- eventChannel.removeEntry(topologyEvent.getID());
- log.debug("Remove the host info into the cache of the topology. mac {}",
- hostEvent.getMac());
-
- // Cleanup the Host Event from the local cache
- // TODO: The implementation below is probably wrong
- ByteBuffer id = hostEvent.getIDasByteBuffer();
- for (SwitchPort swp : hostEvent.getAttachmentPoints()) {
- Map<ByteBuffer, HostEvent> oldHostEvents =
- discoveredAddedHostEvents.get(swp.getDpid());
- if (oldHostEvents != null) {
- oldHostEvents.remove(id);
- }
- }
- }
- }
-
//
// Methods to update topology replica
//
@@ -1390,96 +985,4 @@
topology.removeHost(mac);
apiRemovedHostEvents.add(hostInTopo);
}
-
- /**
- * Read the whole topology from the database.
- *
- * @return a list of EventEntry-encapsulated Topology Events for
- * the whole topology.
- */
- private List<EventEntry<TopologyEvent>> readWholeTopologyFromDB() {
- List<EventEntry<TopologyEvent>> events =
- new LinkedList<EventEntry<TopologyEvent>>();
-
- // XXX May need to clear whole topology first, depending on
- // how we initially subscribe to replication events
-
- // Add all active switches
- for (KVSwitch sw : KVSwitch.getAllSwitches()) {
- if (sw.getStatus() != KVSwitch.STATUS.ACTIVE) {
- continue;
- }
-
- //
- // TODO: Using the local ONOS Instance ID below is incorrect.
- // Currently, this code is not used, and it might go away in the
- // future.
- //
- SwitchEvent switchEvent = new SwitchEvent(new Dpid(sw.getDpid()));
- TopologyEvent topologyEvent =
- new TopologyEvent(switchEvent,
- registryService.getOnosInstanceId());
- EventEntry<TopologyEvent> eventEntry =
- new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
- topologyEvent);
- events.add(eventEntry);
- }
-
- // Add all active ports
- for (KVPort p : KVPort.getAllPorts()) {
- if (p.getStatus() != KVPort.STATUS.ACTIVE) {
- continue;
- }
-
- //
- // TODO: Using the local ONOS Instance ID below is incorrect.
- // Currently, this code is not used, and it might go away in the
- // future.
- //
- PortEvent portEvent =
- new PortEvent(new Dpid(p.getDpid()),
- new PortNumber(p.getNumber().shortValue()));
- TopologyEvent topologyEvent =
- new TopologyEvent(portEvent,
- registryService.getOnosInstanceId());
- EventEntry<TopologyEvent> eventEntry =
- new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
- topologyEvent);
- events.add(eventEntry);
- }
-
- for (KVDevice d : KVDevice.getAllDevices()) {
- //
- // TODO: Using the local ONOS Instance ID below is incorrect.
- // Currently, this code is not used, and it might go away in the
- // future.
- //
- HostEvent devEvent = new HostEvent(MACAddress.valueOf(d.getMac()));
- for (byte[] portId : d.getAllPortIds()) {
- devEvent.addAttachmentPoint(
- new SwitchPort(KVPort.getDpidFromKey(portId),
- KVPort.getNumberFromKey(portId)));
- }
- }
-
- for (KVLink l : KVLink.getAllLinks()) {
- //
- // TODO: Using the local ONOS Instance ID below is incorrect.
- // Currently, this code is not used, and it might go away in the
- // future.
- //
- LinkEvent linkEvent = new LinkEvent(
- new SwitchPort(l.getSrc().dpid, l.getSrc().number),
- new SwitchPort(l.getDst().dpid, l.getDst().number));
- TopologyEvent topologyEvent =
- new TopologyEvent(linkEvent,
- registryService.getOnosInstanceId());
- EventEntry<TopologyEvent> eventEntry =
- new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
- topologyEvent);
- events.add(eventEntry);
- }
-
- return events;
- }
}
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyModule.java b/src/main/java/net/onrc/onos/core/topology/TopologyModule.java
index fee98b4..07ba2cf 100644
--- a/src/main/java/net/onrc/onos/core/topology/TopologyModule.java
+++ b/src/main/java/net/onrc/onos/core/topology/TopologyModule.java
@@ -72,11 +72,6 @@
}
@Override
- public TopologyDiscoveryInterface getTopologyDiscoveryInterface() {
- return topologyManager;
- }
-
- @Override
public void addListener(ITopologyListener listener,
boolean startFromSnapshot) {
topologyManager.addListener(listener, startFromSnapshot);
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyPublisher.java b/src/main/java/net/onrc/onos/core/topology/TopologyPublisher.java
index b1ea740..56ca8fc 100644
--- a/src/main/java/net/onrc/onos/core/topology/TopologyPublisher.java
+++ b/src/main/java/net/onrc/onos/core/topology/TopologyPublisher.java
@@ -1,7 +1,10 @@
package net.onrc.onos.core.topology;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -17,6 +20,9 @@
import net.floodlightcontroller.core.module.IFloodlightService;
import net.floodlightcontroller.core.util.SingletonTask;
import net.floodlightcontroller.threadpool.IThreadPoolService;
+
+import net.onrc.onos.core.datagrid.IDatagridService;
+import net.onrc.onos.core.datagrid.IEventChannel;
import net.onrc.onos.core.hostmanager.Host;
import net.onrc.onos.core.hostmanager.IHostListener;
import net.onrc.onos.core.hostmanager.IHostService;
@@ -51,17 +57,58 @@
private ILinkDiscoveryService linkDiscovery;
private IControllerRegistryService registryService;
private ITopologyService topologyService;
+ private IDatagridService datagridService;
private IHostService hostService;
private Topology topology;
- private TopologyDiscoveryInterface topologyDiscoveryInterface;
private static final String ENABLE_CLEANUP_PROPERTY = "EnableCleanup";
private boolean cleanupEnabled = true;
private static final int CLEANUP_TASK_INTERVAL = 60; // in seconds
private SingletonTask cleanupTask;
+ private IEventChannel<byte[], TopologyEvent> eventChannel;
+
+ //
+ // Local state for keeping track of locally discovered events so we can
+ // cleanup properly when a Switch or Port is removed.
+ //
+ // We keep all Port, (incoming) Link and Host events per Switch DPID:
+ // - If a switch goes down, we remove all corresponding Port, Link and
+ // Host events.
+ // - If a port on a switch goes down, we remove all corresponding Link
+ // and Host events discovered by this instance.
+ //
+ // How to handle side-effect of remote events.
+ // - Remote Port Down event -> Link Down
+ // Not handled. (XXX Shouldn't it be removed from discovered.. Map)
+ // - Remote Host Added -> lose ownership of Host)
+ // Not handled. (XXX Shouldn't it be removed from discovered.. Map)
+ //
+ // XXX Domain knowledge based invariant maintenance should be moved to
+ // driver module, since the invariant may be different on optical, etc.
+ //
+ // What happens on leadership change?
+ // - Probably should: remove from discovered.. Maps, but not send DELETE
+ // events
+ // XXX Switch/Port can be rediscovered by new leader, but Link, Host?
+ // - Current: There is no way to recognize leadership change?
+ // ZookeeperRegistry.requestControl(long, ControlChangeCallback)
+ // is the only way to register listener, and it allows only one
+ // listener, which is already used by Controller class.
+ //
+ // FIXME Replace with concurrent variant.
+ // #removeSwitchDiscoveryEvent(SwitchEvent) runs in different thread.
+ //
+ private Map<Dpid, Map<ByteBuffer, PortEvent>> discoveredAddedPortEvents =
+ new HashMap<>();
+ private Map<Dpid, Map<ByteBuffer, LinkEvent>> discoveredAddedLinkEvents =
+ new HashMap<>();
+ private Map<Dpid, Map<ByteBuffer, HostEvent>> discoveredAddedHostEvents =
+ new HashMap<>();
+
+
/**
* Cleanup old switches from the topology. Old switches are those which have
* no controller in the registry.
@@ -132,8 +179,7 @@
HexString.toHexString(dpid));
SwitchEvent switchEvent = new SwitchEvent(new Dpid(dpid));
- topologyDiscoveryInterface.
- removeSwitchDiscoveryEvent(switchEvent);
+ removeSwitchDiscoveryEvent(switchEvent);
registryService.releaseControl(dpid);
}
}
@@ -163,7 +209,7 @@
link.getDst(), linkEvent);
return;
}
- topologyDiscoveryInterface.putLinkDiscoveryEvent(linkEvent);
+ putLinkDiscoveryEvent(linkEvent);
}
@Override
@@ -187,7 +233,7 @@
link.getDst(), linkEvent);
return;
}
- topologyDiscoveryInterface.removeLinkDiscoveryEvent(linkEvent);
+ removeLinkDiscoveryEvent(linkEvent);
}
/* *****************
@@ -242,7 +288,7 @@
portEvent.freeze();
portEvents.add(portEvent);
}
- topologyDiscoveryInterface.putSwitchDiscoveryEvent(switchEvent, portEvents);
+ putSwitchDiscoveryEvent(switchEvent, portEvents);
for (OFPortDesc port : sw.getPorts()) {
// Allow links to be discovered on this port now that it's
@@ -287,7 +333,7 @@
mastershipEvent.createStringAttribute(TopologyElement.TYPE,
TopologyElement.TYPE_ALL_LAYERS);
mastershipEvent.freeze();
- topologyDiscoveryInterface.removeSwitchMastershipEvent(mastershipEvent);
+ removeSwitchMastershipEvent(mastershipEvent);
}
@Override
@@ -330,7 +376,7 @@
portEvent.freeze();
if (registryService.hasControl(switchId)) {
- topologyDiscoveryInterface.putPortDiscoveryEvent(portEvent);
+ putPortDiscoveryEvent(portEvent);
linkDiscovery.enableDiscoveryOnPort(switchId,
port.getPortNo().getShortPortNumber());
} else {
@@ -354,7 +400,7 @@
portEvent.freeze();
if (registryService.hasControl(switchId)) {
- topologyDiscoveryInterface.removePortDiscoveryEvent(portEvent);
+ removePortDiscoveryEvent(portEvent);
} else {
log.debug("Not the master for switch {}. Suppressed port del event {}.",
dpid, portEvent);
@@ -390,6 +436,7 @@
l.add(ILinkDiscoveryService.class);
l.add(IThreadPoolService.class);
l.add(IControllerRegistryService.class);
+ l.add(IDatagridService.class);
l.add(ITopologyService.class);
l.add(IHostService.class);
return l;
@@ -401,6 +448,7 @@
floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
linkDiscovery = context.getServiceImpl(ILinkDiscoveryService.class);
registryService = context.getServiceImpl(IControllerRegistryService.class);
+ datagridService = context.getServiceImpl(IDatagridService.class);
hostService = context.getServiceImpl(IHostService.class);
topologyService = context.getServiceImpl(ITopologyService.class);
@@ -412,9 +460,12 @@
linkDiscovery.addListener(this);
hostService.addHostListener(this);
+ eventChannel = datagridService.createChannel(
+ TopologyManager.EVENT_CHANNEL_NAME,
+ byte[].class,
+ TopologyEvent.class);
+
topology = topologyService.getTopology();
- topologyDiscoveryInterface =
- topologyService.getTopologyDiscoveryInterface();
// Run the cleanup thread
String enableCleanup =
@@ -449,7 +500,7 @@
// Does not use vlan info now.
event.freeze();
- topologyDiscoveryInterface.putHostDiscoveryEvent(event);
+ putHostDiscoveryEvent(event);
}
@Override
@@ -458,7 +509,7 @@
HostEvent event = new HostEvent(host.getMacAddress());
// XXX shouldn't we be setting attachment points?
event.freeze();
- topologyDiscoveryInterface.removeHostDiscoveryEvent(event);
+ removeHostDiscoveryEvent(event);
}
private void controllerRoleChanged(Dpid dpid, Role role) {
@@ -473,6 +524,330 @@
mastershipEvent.createStringAttribute(TopologyElement.TYPE,
TopologyElement.TYPE_ALL_LAYERS);
mastershipEvent.freeze();
- topologyDiscoveryInterface.putSwitchMastershipEvent(mastershipEvent);
+ putSwitchMastershipEvent(mastershipEvent);
+ }
+
+ /**
+ * Mastership updated event.
+ *
+ * @param mastershipEvent the mastership event.
+ */
+ private void putSwitchMastershipEvent(MastershipEvent mastershipEvent) {
+ // Send out notification
+ TopologyEvent topologyEvent =
+ new TopologyEvent(mastershipEvent,
+ registryService.getOnosInstanceId());
+ eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+ }
+
+ /**
+ * Mastership removed event.
+ *
+ * @param mastershipEvent the mastership event.
+ */
+ private void removeSwitchMastershipEvent(MastershipEvent mastershipEvent) {
+ // Send out notification
+ TopologyEvent topologyEvent =
+ new TopologyEvent(mastershipEvent,
+ registryService.getOnosInstanceId());
+ eventChannel.removeEntry(topologyEvent.getID());
+ }
+
+ /**
+ * Switch discovered event.
+ *
+ * @param switchEvent the switch event.
+ * @param portEvents the corresponding port events for the switch.
+ */
+ private void putSwitchDiscoveryEvent(SwitchEvent switchEvent,
+ Collection<PortEvent> portEvents) {
+ log.debug("Sending add switch: {}", switchEvent);
+ // Send out notification
+ TopologyEvent topologyEvent =
+ new TopologyEvent(switchEvent,
+ registryService.getOnosInstanceId());
+ eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+
+ // Send out notification for each port
+ for (PortEvent portEvent : portEvents) {
+ log.debug("Sending add port: {}", portEvent);
+ topologyEvent =
+ new TopologyEvent(portEvent,
+ registryService.getOnosInstanceId());
+ eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+ }
+
+ //
+ // Keep track of the added ports
+ //
+ // Get the old Port Events
+ Map<ByteBuffer, PortEvent> oldPortEvents =
+ discoveredAddedPortEvents.get(switchEvent.getDpid());
+ if (oldPortEvents == null) {
+ oldPortEvents = new HashMap<>();
+ }
+
+ // Store the new Port Events in the local cache
+ Map<ByteBuffer, PortEvent> newPortEvents = new HashMap<>();
+ for (PortEvent portEvent : portEvents) {
+ ByteBuffer id = portEvent.getIDasByteBuffer();
+ newPortEvents.put(id, portEvent);
+ }
+ discoveredAddedPortEvents.put(switchEvent.getDpid(), newPortEvents);
+
+ //
+ // Extract the removed ports
+ //
+ List<PortEvent> removedPortEvents = new LinkedList<>();
+ for (Map.Entry<ByteBuffer, PortEvent> entry : oldPortEvents.entrySet()) {
+ ByteBuffer key = entry.getKey();
+ PortEvent portEvent = entry.getValue();
+ if (!newPortEvents.containsKey(key)) {
+ removedPortEvents.add(portEvent);
+ }
+ }
+
+ // Cleanup old removed ports
+ for (PortEvent portEvent : removedPortEvents) {
+ removePortDiscoveryEvent(portEvent);
+ }
+ }
+
+ /**
+ * Switch removed event.
+ *
+ * @param switchEvent the switch event.
+ */
+ private void removeSwitchDiscoveryEvent(SwitchEvent switchEvent) {
+ TopologyEvent topologyEvent;
+
+ // Get the old Port Events
+ Map<ByteBuffer, PortEvent> oldPortEvents =
+ discoveredAddedPortEvents.get(switchEvent.getDpid());
+ if (oldPortEvents == null) {
+ oldPortEvents = new HashMap<>();
+ }
+
+ log.debug("Sending remove switch: {}", switchEvent);
+ // Send out notification
+ topologyEvent =
+ new TopologyEvent(switchEvent,
+ registryService.getOnosInstanceId());
+ eventChannel.removeEntry(topologyEvent.getID());
+
+ //
+ // Send out notification for each port.
+ //
+ // NOTE: We don't use removePortDiscoveryEvent() for the cleanup,
+ // because it will attempt to remove the port from the database,
+ // and the deactiveSwitch() call above already removed all ports.
+ //
+ for (PortEvent portEvent : oldPortEvents.values()) {
+ log.debug("Sending remove port:", portEvent);
+ topologyEvent =
+ new TopologyEvent(portEvent,
+ registryService.getOnosInstanceId());
+ eventChannel.removeEntry(topologyEvent.getID());
+ }
+ discoveredAddedPortEvents.remove(switchEvent.getDpid());
+
+ // Cleanup for each link
+ Map<ByteBuffer, LinkEvent> oldLinkEvents =
+ discoveredAddedLinkEvents.get(switchEvent.getDpid());
+ if (oldLinkEvents != null) {
+ for (LinkEvent linkEvent : new ArrayList<>(oldLinkEvents.values())) {
+ removeLinkDiscoveryEvent(linkEvent);
+ }
+ discoveredAddedLinkEvents.remove(switchEvent.getDpid());
+ }
+
+ // Cleanup for each host
+ Map<ByteBuffer, HostEvent> oldHostEvents =
+ discoveredAddedHostEvents.get(switchEvent.getDpid());
+ if (oldHostEvents != null) {
+ for (HostEvent hostEvent : new ArrayList<>(oldHostEvents.values())) {
+ removeHostDiscoveryEvent(hostEvent);
+ }
+ discoveredAddedHostEvents.remove(switchEvent.getDpid());
+ }
+ }
+
+ /**
+ * Port discovered event.
+ *
+ * @param portEvent the port event.
+ */
+ private void putPortDiscoveryEvent(PortEvent portEvent) {
+ log.debug("Sending add port: {}", portEvent);
+ // Send out notification
+ TopologyEvent topologyEvent =
+ new TopologyEvent(portEvent,
+ registryService.getOnosInstanceId());
+ eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+
+ // Store the new Port Event in the local cache
+ Map<ByteBuffer, PortEvent> oldPortEvents =
+ discoveredAddedPortEvents.get(portEvent.getDpid());
+ if (oldPortEvents == null) {
+ oldPortEvents = new HashMap<>();
+ discoveredAddedPortEvents.put(portEvent.getDpid(), oldPortEvents);
+ }
+ ByteBuffer id = portEvent.getIDasByteBuffer();
+ oldPortEvents.put(id, portEvent);
+ }
+
+ /**
+ * Port removed event.
+ *
+ * @param portEvent the port event.
+ */
+ private void removePortDiscoveryEvent(PortEvent portEvent) {
+ log.debug("Sending remove port: {}", portEvent);
+ // Send out notification
+ TopologyEvent topologyEvent =
+ new TopologyEvent(portEvent,
+ registryService.getOnosInstanceId());
+ eventChannel.removeEntry(topologyEvent.getID());
+
+ // Cleanup the Port Event from the local cache
+ Map<ByteBuffer, PortEvent> oldPortEvents =
+ discoveredAddedPortEvents.get(portEvent.getDpid());
+ if (oldPortEvents != null) {
+ ByteBuffer id = portEvent.getIDasByteBuffer();
+ oldPortEvents.remove(id);
+ }
+
+ // Cleanup for the incoming link
+ Map<ByteBuffer, LinkEvent> oldLinkEvents =
+ discoveredAddedLinkEvents.get(portEvent.getDpid());
+ if (oldLinkEvents != null) {
+ for (LinkEvent linkEvent : new ArrayList<>(oldLinkEvents.values())) {
+ if (linkEvent.getDst().equals(portEvent.getSwitchPort())) {
+ removeLinkDiscoveryEvent(linkEvent);
+ // XXX If we change our model to allow multiple Link on
+ // a Port, this loop must be fixed to allow continuing.
+ break;
+ }
+ }
+ }
+
+ // Cleanup for the connected hosts
+ // TODO: The implementation below is probably wrong
+ List<HostEvent> removedHostEvents = new LinkedList<>();
+ Map<ByteBuffer, HostEvent> oldHostEvents =
+ discoveredAddedHostEvents.get(portEvent.getDpid());
+ if (oldHostEvents != null) {
+ for (HostEvent hostEvent : new ArrayList<>(oldHostEvents.values())) {
+ for (SwitchPort swp : hostEvent.getAttachmentPoints()) {
+ if (swp.equals(portEvent.getSwitchPort())) {
+ removedHostEvents.add(hostEvent);
+ }
+ }
+ }
+ for (HostEvent hostEvent : removedHostEvents) {
+ removeHostDiscoveryEvent(hostEvent);
+ }
+ }
+ }
+
+ /**
+ * Link discovered event.
+ *
+ * @param linkEvent the link event.
+ */
+ private void putLinkDiscoveryEvent(LinkEvent linkEvent) {
+ log.debug("Sending add link: {}", linkEvent);
+ // Send out notification
+ TopologyEvent topologyEvent =
+ new TopologyEvent(linkEvent,
+ registryService.getOnosInstanceId());
+ eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+
+ // Store the new Link Event in the local cache
+ Map<ByteBuffer, LinkEvent> oldLinkEvents =
+ discoveredAddedLinkEvents.get(linkEvent.getDst().getDpid());
+ if (oldLinkEvents == null) {
+ oldLinkEvents = new HashMap<>();
+ discoveredAddedLinkEvents.put(linkEvent.getDst().getDpid(),
+ oldLinkEvents);
+ }
+ ByteBuffer id = linkEvent.getIDasByteBuffer();
+ oldLinkEvents.put(id, linkEvent);
+ }
+
+ /**
+ * Link removed event.
+ *
+ * @param linkEvent the link event.
+ */
+ private void removeLinkDiscoveryEvent(LinkEvent linkEvent) {
+ log.debug("Sending remove link: {}", linkEvent);
+ // Send out notification
+ TopologyEvent topologyEvent =
+ new TopologyEvent(linkEvent,
+ registryService.getOnosInstanceId());
+ eventChannel.removeEntry(topologyEvent.getID());
+
+ // Cleanup the Link Event from the local cache
+ Map<ByteBuffer, LinkEvent> oldLinkEvents =
+ discoveredAddedLinkEvents.get(linkEvent.getDst().getDpid());
+ if (oldLinkEvents != null) {
+ ByteBuffer id = linkEvent.getIDasByteBuffer();
+ oldLinkEvents.remove(id);
+ }
+ }
+
+ /**
+ * Host discovered event.
+ *
+ * @param hostEvent the host event.
+ */
+ private void putHostDiscoveryEvent(HostEvent hostEvent) {
+ // Send out notification
+ TopologyEvent topologyEvent =
+ new TopologyEvent(hostEvent,
+ registryService.getOnosInstanceId());
+ eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+ log.debug("Put the host info into the cache of the topology. mac {}",
+ hostEvent.getMac());
+
+ // Store the new Host Event in the local cache
+ // TODO: The implementation below is probably wrong
+ for (SwitchPort swp : hostEvent.getAttachmentPoints()) {
+ Map<ByteBuffer, HostEvent> oldHostEvents =
+ discoveredAddedHostEvents.get(swp.getDpid());
+ if (oldHostEvents == null) {
+ oldHostEvents = new HashMap<>();
+ discoveredAddedHostEvents.put(swp.getDpid(), oldHostEvents);
+ }
+ ByteBuffer id = hostEvent.getIDasByteBuffer();
+ oldHostEvents.put(id, hostEvent);
+ }
+ }
+
+ /**
+ * Host removed event.
+ *
+ * @param hostEvent the host event.
+ */
+ private void removeHostDiscoveryEvent(HostEvent hostEvent) {
+ // Send out notification
+ TopologyEvent topologyEvent =
+ new TopologyEvent(hostEvent,
+ registryService.getOnosInstanceId());
+ eventChannel.removeEntry(topologyEvent.getID());
+ log.debug("Remove the host info into the cache of the topology. mac {}",
+ hostEvent.getMac());
+
+ // Cleanup the Host Event from the local cache
+ // TODO: The implementation below is probably wrong
+ ByteBuffer id = hostEvent.getIDasByteBuffer();
+ for (SwitchPort swp : hostEvent.getAttachmentPoints()) {
+ Map<ByteBuffer, HostEvent> oldHostEvents =
+ discoveredAddedHostEvents.get(swp.getDpid());
+ if (oldHostEvents != null) {
+ oldHostEvents.remove(id);
+ }
+ }
}
}
diff --git a/src/test/java/net/onrc/onos/core/topology/TopologyManagerTest.java b/src/test/java/net/onrc/onos/core/topology/TopologyManagerTest.java
index 2f8f377..f6e6c09 100644
--- a/src/test/java/net/onrc/onos/core/topology/TopologyManagerTest.java
+++ b/src/test/java/net/onrc/onos/core/topology/TopologyManagerTest.java
@@ -49,10 +49,10 @@
* These test cases only check the sanity of functions in the TopologyManager.
* Note that we do not test the eventHandler functions in the TopologyManager
* class.
- * DatagridService, DataStoreService, eventChannel, and
- * controllerRegistryService are mocked out.
+ * DatagridService, eventChannel, and controllerRegistryService are mocked out.
*/
public class TopologyManagerTest extends UnitTest {
+ private TopologyPublisher theTopologyPublisher;
private TopologyManager theTopologyManager;
private TopologyManager.EventHandler theEventHandler;
private TopologyListenerTest theTopologyListener =
@@ -60,7 +60,6 @@
private final String eventChannelName = "onos.topology";
private IEventChannel<byte[], TopologyEvent> eventChannel;
private IDatagridService datagridService;
- private TopologyDatastore dataStoreService;
private IControllerRegistryService registryService;
private Collection<TopologyEvent> allTopologyEvents;
private static final OnosInstanceId ONOS_INSTANCE_ID_1 =
@@ -94,7 +93,6 @@
public void setUp() throws Exception {
// Mock objects for testing
datagridService = createNiceMock(IDatagridService.class);
- dataStoreService = createNiceMock(TopologyDatastore.class);
registryService = createMock(IControllerRegistryService.class);
eventChannel = createNiceMock(IEventChannel.class);
@@ -111,40 +109,6 @@
eq(TopologyEvent.class)))
.andReturn(eventChannel).once();
- expect(dataStoreService.addSwitch(
- anyObject(SwitchEvent.class),
- anyObject(Collection.class)))
- .andReturn(true).anyTimes();
-
- expect(dataStoreService.deactivateSwitch(
- anyObject(SwitchEvent.class),
- anyObject(Collection.class)))
- .andReturn(true).anyTimes();
-
- expect(dataStoreService.addPort(
- anyObject(PortEvent.class)))
- .andReturn(true).anyTimes();
-
- expect(dataStoreService.deactivatePort(
- anyObject(PortEvent.class)))
- .andReturn(true).anyTimes();
-
- expect(dataStoreService.addHost(
- anyObject(HostEvent.class)))
- .andReturn(true).anyTimes();
-
- expect(dataStoreService.removeHost(
- anyObject(HostEvent.class)))
- .andReturn(true).anyTimes();
-
- expect(dataStoreService.addLink(
- anyObject(LinkEvent.class)))
- .andReturn(true).anyTimes();
-
- expect(dataStoreService.removeLink(
- anyObject(LinkEvent.class)))
- .andReturn(true).anyTimes();
-
// Setup the Registry Service
expect(registryService.getOnosInstanceId()).andReturn(ONOS_INSTANCE_ID_1).anyTimes();
expect(registryService.getControllerForSwitch(DPID_1.value()))
@@ -158,11 +122,25 @@
replay(datagridService);
replay(registryService);
- replay(dataStoreService);
// replay(eventChannel);
}
/**
+ * Setup the Topology Publisher.
+ */
+ private void setupTopologyPublisher() {
+ // Create a TopologyPublisher object for testing
+ theTopologyPublisher = new TopologyPublisher();
+
+ // Setup the registry service
+ TestUtils.setField(theTopologyPublisher, "registryService",
+ registryService);
+
+ // Setup the event channel
+ TestUtils.setField(theTopologyPublisher, "eventChannel", eventChannel);
+ }
+
+ /**
* Setup the Topology Manager.
*/
private void setupTopologyManager() {
@@ -173,9 +151,6 @@
TestUtils.setField(theTopologyManager, "eventHandler",
createNiceMock(TopologyManager.EventHandler.class));
theTopologyManager.startup(datagridService);
-
- // Replace the data store with a mocked object
- TestUtils.setField(theTopologyManager, "datastore", dataStoreService);
}
/**
@@ -191,9 +166,6 @@
TestUtils.setField(theTopologyManager, "eventHandler",
theEventHandler);
- // Replace the data store with a mocked object
- TestUtils.setField(theTopologyManager, "datastore", dataStoreService);
-
replay(eventChannel);
//
// NOTE: Uncomment-out the line below if the startup() method needs
@@ -222,15 +194,17 @@
expectLastCall().times(1, 1); // 1 event
replay(eventChannel);
- setupTopologyManager();
+ setupTopologyPublisher();
// Generate a new Switch Mastership event
Role role = Role.MASTER;
MastershipEvent mastershipEvent =
new MastershipEvent(DPID_1, ONOS_INSTANCE_ID_1, role);
- // Call the topologyManager function for adding the event
- theTopologyManager.putSwitchMastershipEvent(mastershipEvent);
+ // Call the TopologyPublisher function for adding the event
+ TestUtils.callMethod(theTopologyPublisher,
+ "putSwitchMastershipEvent",
+ MastershipEvent.class, mastershipEvent);
// Verify the function calls
verify(eventChannel);
@@ -246,15 +220,17 @@
expectLastCall().times(1, 1); // 1 event
replay(eventChannel);
- setupTopologyManager();
+ setupTopologyPublisher();
// Generate a new Switch Mastership Event
Role role = Role.MASTER;
MastershipEvent mastershipEvent =
new MastershipEvent(DPID_1, ONOS_INSTANCE_ID_1, role);
- // Call the topologyManager function for removing the event
- theTopologyManager.removeSwitchMastershipEvent(mastershipEvent);
+ // Call the TopologyPublisher function for removing the event
+ TestUtils.callMethod(theTopologyPublisher,
+ "removeSwitchMastershipEvent",
+ MastershipEvent.class, mastershipEvent);
// Verify the function calls
verify(eventChannel);
@@ -271,7 +247,7 @@
expectLastCall().times(3, 3); // (1 Switch + 1 Port), 1 Port
replay(eventChannel);
- setupTopologyManager();
+ setupTopologyPublisher();
// Mock Switch has one Port
PortNumber portNumber = PortNumber.uint32(1);
@@ -282,12 +258,18 @@
Collection<PortEvent> portEvents = new ArrayList<PortEvent>();
portEvents.add(new PortEvent(DPID_1, portNumber));
- // Call the topologyManager function for adding a Switch
- theTopologyManager.putSwitchDiscoveryEvent(switchEvent, portEvents);
+ // Call the TopologyPublisher function for adding a Switch
+ TestUtils.callMethod(theTopologyPublisher,
+ "putSwitchDiscoveryEvent",
+ new Class<?>[] {SwitchEvent.class,
+ Collection.class},
+ switchEvent, portEvents);
for (PortEvent portEvent : portEvents) {
- // Call the topologyManager function for adding a Port
- theTopologyManager.putPortDiscoveryEvent(portEvent);
+ // Call the TopologyPublisher function for adding a Port
+ TestUtils.callMethod(theTopologyPublisher,
+ "putPortDiscoveryEvent",
+ PortEvent.class, portEvent);
}
// Verify the function calls
@@ -305,7 +287,7 @@
expectLastCall().times(2, 2); // 1 Switch, 1 Port
replay(eventChannel);
- setupTopologyManager();
+ setupTopologyPublisher();
PortNumber portNumber = PortNumber.uint32(1);
@@ -313,14 +295,18 @@
Collection<PortEvent> portEvents = new ArrayList<PortEvent>();
portEvents.add(new PortEvent(DPID_1, portNumber));
- // Call the topologyManager function for removing a Port
+ // Call the TopologyPublisher function for removing a Port
for (PortEvent portEvent : portEvents) {
- theTopologyManager.removePortDiscoveryEvent(portEvent);
+ TestUtils.callMethod(theTopologyPublisher,
+ "removePortDiscoveryEvent",
+ PortEvent.class, portEvent);
}
- // Call the topologyManager function for removing a Switch
+ // Call the TopologyPublisher function for removing a Switch
SwitchEvent switchEvent = new SwitchEvent(DPID_1);
- theTopologyManager.removeSwitchDiscoveryEvent(switchEvent);
+ TestUtils.callMethod(theTopologyPublisher,
+ "removeSwitchDiscoveryEvent",
+ SwitchEvent.class, switchEvent);
// Verify the function calls
verify(eventChannel);
@@ -338,7 +324,7 @@
expectLastCall().times(5, 5); // (2 Switch + 2 Port + 1 Link)
replay(eventChannel);
- setupTopologyManager();
+ setupTopologyPublisher();
// Generate the Switch and Port Events
PortNumber portNumber1 = PortNumber.uint32(1);
@@ -346,8 +332,12 @@
Collection<PortEvent> portEvents1 = new ArrayList<PortEvent>();
portEvents1.add(new PortEvent(DPID_1, portNumber1));
- // Call the topologyManager function for adding a Switch
- theTopologyManager.putSwitchDiscoveryEvent(switchEvent1, portEvents1);
+ // Call the TopologyPublisher function for adding a Switch
+ TestUtils.callMethod(theTopologyPublisher,
+ "putSwitchDiscoveryEvent",
+ new Class<?>[] {SwitchEvent.class,
+ Collection.class},
+ switchEvent1, portEvents1);
// Generate the Switch and Port Events
PortNumber portNumber2 = PortNumber.uint32(2);
@@ -355,14 +345,20 @@
Collection<PortEvent> portEvents2 = new ArrayList<PortEvent>();
portEvents2.add(new PortEvent(DPID_2, portNumber2));
- // Call the topologyManager function for adding a Switch
- theTopologyManager.putSwitchDiscoveryEvent(switchEvent2, portEvents2);
+ // Call the TopologyPublisher function for adding a Switch
+ TestUtils.callMethod(theTopologyPublisher,
+ "putSwitchDiscoveryEvent",
+ new Class<?>[] {SwitchEvent.class,
+ Collection.class},
+ switchEvent2, portEvents2);
// Create the Link Event
LinkEvent linkEvent =
new LinkEvent(new SwitchPort(DPID_1, portNumber1),
new SwitchPort(DPID_2, portNumber2));
- theTopologyManager.putLinkDiscoveryEvent(linkEvent);
+ TestUtils.callMethod(theTopologyPublisher,
+ "putLinkDiscoveryEvent",
+ LinkEvent.class, linkEvent);
// Verify the function calls
verify(eventChannel);
@@ -378,7 +374,7 @@
expectLastCall().times(1, 1); // (1 Link)
replay(eventChannel);
- setupTopologyManager();
+ setupTopologyPublisher();
// Generate the Switch and Port Events
PortNumber portNumber1 = PortNumber.uint32(1);
@@ -386,8 +382,12 @@
Collection<PortEvent> portEvents1 = new ArrayList<PortEvent>();
portEvents1.add(new PortEvent(DPID_1, portNumber1));
- // Call the topologyManager function for adding a Switch
- theTopologyManager.putSwitchDiscoveryEvent(switchEvent1, portEvents1);
+ // Call the TopologyPublisher function for adding a Switch
+ TestUtils.callMethod(theTopologyPublisher,
+ "putSwitchDiscoveryEvent",
+ new Class<?>[] {SwitchEvent.class,
+ Collection.class},
+ switchEvent1, portEvents1);
// Generate the Switch and Port Events
PortNumber portNumber2 = PortNumber.uint32(2);
@@ -395,14 +395,20 @@
Collection<PortEvent> portEvents2 = new ArrayList<PortEvent>();
portEvents2.add(new PortEvent(DPID_2, portNumber2));
- // Call the topologyManager function for adding a Switch
- theTopologyManager.putSwitchDiscoveryEvent(switchEvent2, portEvents2);
+ // Call the TopologyPublisher function for adding a Switch
+ TestUtils.callMethod(theTopologyPublisher,
+ "putSwitchDiscoveryEvent",
+ new Class<?>[] {SwitchEvent.class,
+ Collection.class},
+ switchEvent2, portEvents2);
// Remove the Link
LinkEvent linkEventRemove =
new LinkEvent(new SwitchPort(DPID_1, portNumber1),
new SwitchPort(DPID_2, portNumber2));
- theTopologyManager.removeLinkDiscoveryEvent(linkEventRemove);
+ TestUtils.callMethod(theTopologyPublisher,
+ "removeLinkDiscoveryEvent",
+ LinkEvent.class, linkEventRemove);
// Verify the function calls
verify(eventChannel);
@@ -419,7 +425,7 @@
expectLastCall().times(1, 1); // 1 Host
replay(eventChannel);
- setupTopologyManager();
+ setupTopologyPublisher();
// Generate a new Host Event
PortNumber portNumber = PortNumber.uint32(1);
@@ -430,8 +436,10 @@
HostEvent hostEvent = new HostEvent(hostMac);
hostEvent.setAttachmentPoints(spLists);
- // Call the topologyManager function for adding a Host
- theTopologyManager.putHostDiscoveryEvent(hostEvent);
+ // Call the TopologyPublisher function for adding a Host
+ TestUtils.callMethod(theTopologyPublisher,
+ "putHostDiscoveryEvent",
+ HostEvent.class, hostEvent);
// Verify the function calls
verify(eventChannel);
@@ -447,7 +455,7 @@
expectLastCall().times(1, 1); // 1 Host
replay(eventChannel);
- setupTopologyManager();
+ setupTopologyPublisher();
// Generate a new Host Event
PortNumber portNumber = PortNumber.uint32(1);
@@ -458,8 +466,10 @@
HostEvent hostEvent = new HostEvent(hostMac);
hostEvent.setAttachmentPoints(spLists);
- // Call the topologyManager function for removing a Host
- theTopologyManager.removeHostDiscoveryEvent(hostEvent);
+ // Call the TopologyPublisher function for removing a Host
+ TestUtils.callMethod(theTopologyPublisher,
+ "removeHostDiscoveryEvent",
+ HostEvent.class, hostEvent);
// Verify the function calls
verify(eventChannel);