Work toward cleaning up the Topology Manager and the Topology Publisher
ONOS-1890
Moved some of the methods of class TopologyManager (its implementation of
TopologyDiscoveryInterface) to TopologyPublisher, where they naturally belong.
This is also needed for the log-based mechanism.
In the process, removed the writing of events to the datastore for now,
until it becomes clear when, where, and what to write.
Change-Id: Ic1fe1db533aec66a91bf643b0989119f33c3d37e
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyManager.java b/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
index 1280cf4..daa334b 100644
--- a/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
+++ b/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
@@ -22,17 +22,12 @@
import net.onrc.onos.core.datagrid.IDatagridService;
import net.onrc.onos.core.datagrid.IEventChannel;
import net.onrc.onos.core.datagrid.IEventChannelListener;
-import net.onrc.onos.core.datastore.topology.KVDevice;
-import net.onrc.onos.core.datastore.topology.KVLink;
-import net.onrc.onos.core.datastore.topology.KVPort;
-import net.onrc.onos.core.datastore.topology.KVSwitch;
import net.onrc.onos.core.metrics.OnosMetrics;
import net.onrc.onos.core.metrics.OnosMetrics.MetricsComponent;
import net.onrc.onos.core.metrics.OnosMetrics.MetricsFeature;
import net.onrc.onos.core.registry.IControllerRegistryService;
import net.onrc.onos.core.util.Dpid;
import net.onrc.onos.core.util.EventEntry;
-import net.onrc.onos.core.util.PortNumber;
import net.onrc.onos.core.util.SwitchPort;
import net.onrc.onos.core.util.serializers.KryoFactory;
@@ -58,7 +53,7 @@
* TODO TBD: This class may delay the requested change to handle event
* re-ordering. e.g.) Link Add came in, but Switch was not there.
*/
-public class TopologyManager implements TopologyDiscoveryInterface {
+public class TopologyManager {
private static final Logger log = LoggerFactory
.getLogger(TopologyManager.class);
@@ -67,9 +62,7 @@
public static final String EVENT_CHANNEL_NAME = "onos.topology";
private EventHandler eventHandler = new EventHandler();
- private TopologyDatastore datastore;
private final TopologyImpl topology = new TopologyImpl();
- private final IControllerRegistryService registryService;
private Kryo kryo = KryoFactory.newKryoObject();
private TopologyEventPreprocessor eventPreprocessor;
private CopyOnWriteArrayList<ITopologyListener> topologyListeners =
@@ -104,44 +97,6 @@
"ListenerEventRate");
//
- // Local state for keeping track of locally discovered events so we can
- // cleanup properly when a Switch or Port is removed.
- //
- // We keep all Port, (incoming) Link and Host events per Switch DPID:
- // - If a switch goes down, we remove all corresponding Port, Link and
- // Host events.
- // - If a port on a switch goes down, we remove all corresponding Link
- // and Host events discovered by this instance.
- //
- // How to handle side-effect of remote events.
- // - Remote Port Down event -> Link Down
- // Not handled. (XXX Shouldn't it be removed from discovered.. Map)
- // - Remote Host Added -> lose ownership of Host)
- // Not handled. (XXX Shouldn't it be removed from discovered.. Map)
- //
- // XXX Domain knowledge based invariant maintenance should be moved to
- // driver module, since the invariant may be different on optical, etc.
- //
- // What happens on leadership change?
- // - Probably should: remove from discovered.. Maps, but not send DELETE
- // events
- // XXX Switch/Port can be rediscovered by new leader, but Link, Host?
- // - Current: There is no way to recognize leadership change?
- // ZookeeperRegistry.requestControl(long, ControlChangeCallback)
- // is the only way to register listener, and it allows only one
- // listener, which is already used by Controller class.
- //
- // FIXME Replace with concurrent variant.
- // #removeSwitchDiscoveryEvent(SwitchEvent) runs in different thread.
- //
- private Map<Dpid, Map<ByteBuffer, PortEvent>> discoveredAddedPortEvents =
- new HashMap<>();
- private Map<Dpid, Map<ByteBuffer, LinkEvent>> discoveredAddedLinkEvents =
- new HashMap<>();
- private Map<Dpid, Map<ByteBuffer, HostEvent>> discoveredAddedHostEvents =
- new HashMap<>();
-
- //
// Local state for keeping the last ADD Mastership Event entries.
// TODO: In the future, we might have to keep this state somewhere else.
//
@@ -173,8 +128,6 @@
* @param registryService the Registry Service to use.
*/
public TopologyManager(IControllerRegistryService registryService) {
- datastore = new TopologyDatastore();
- this.registryService = registryService;
this.eventPreprocessor =
new TopologyEventPreprocessor(registryService);
}
@@ -201,12 +154,7 @@
*/
private void startup() {
//
- // TODO: Read all state from the database:
- //
- // Collection<EventEntry<TopologyEvent>> collection =
- // readWholeTopologyFromDB();
- //
- // For now, as a shortcut we read it from the datagrid
+ // Read all topology state
//
Collection<TopologyEvent> allTopologyEvents =
eventChannel.getAllEntries();
@@ -611,359 +559,6 @@
apiRemovedHostEvents.clear();
}
- /**
- * Mastership updated event.
- *
- * @param mastershipEvent the mastership event.
- */
- @Override
- public void putSwitchMastershipEvent(MastershipEvent mastershipEvent) {
- // Send out notification
- TopologyEvent topologyEvent =
- new TopologyEvent(mastershipEvent,
- registryService.getOnosInstanceId());
- eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
- }
-
- /**
- * Mastership removed event.
- *
- * @param mastershipEvent the mastership event.
- */
- @Override
- public void removeSwitchMastershipEvent(MastershipEvent mastershipEvent) {
- // Send out notification
- TopologyEvent topologyEvent =
- new TopologyEvent(mastershipEvent,
- registryService.getOnosInstanceId());
- eventChannel.removeEntry(topologyEvent.getID());
- }
-
- /**
- * Switch discovered event.
- *
- * @param switchEvent the switch event.
- * @param portEvents the corresponding port events for the switch.
- */
- @Override
- public void putSwitchDiscoveryEvent(SwitchEvent switchEvent,
- Collection<PortEvent> portEvents) {
- if (datastore.addSwitch(switchEvent, portEvents)) {
- log.debug("Sending add switch: {}", switchEvent);
- // Send out notification
- TopologyEvent topologyEvent =
- new TopologyEvent(switchEvent,
- registryService.getOnosInstanceId());
- eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
-
- // Send out notification for each port
- for (PortEvent portEvent : portEvents) {
- log.debug("Sending add port: {}", portEvent);
- topologyEvent =
- new TopologyEvent(portEvent,
- registryService.getOnosInstanceId());
- eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
- }
-
- //
- // Keep track of the added ports
- //
- // Get the old Port Events
- Map<ByteBuffer, PortEvent> oldPortEvents =
- discoveredAddedPortEvents.get(switchEvent.getDpid());
- if (oldPortEvents == null) {
- oldPortEvents = new HashMap<>();
- }
-
- // Store the new Port Events in the local cache
- Map<ByteBuffer, PortEvent> newPortEvents = new HashMap<>();
- for (PortEvent portEvent : portEvents) {
- ByteBuffer id = portEvent.getIDasByteBuffer();
- newPortEvents.put(id, portEvent);
- }
- discoveredAddedPortEvents.put(switchEvent.getDpid(),
- newPortEvents);
-
- //
- // Extract the removed ports
- //
- List<PortEvent> removedPortEvents = new LinkedList<>();
- for (Map.Entry<ByteBuffer, PortEvent> entry : oldPortEvents.entrySet()) {
- ByteBuffer key = entry.getKey();
- PortEvent portEvent = entry.getValue();
- if (!newPortEvents.containsKey(key)) {
- removedPortEvents.add(portEvent);
- }
- }
-
- // Cleanup old removed ports
- for (PortEvent portEvent : removedPortEvents) {
- removePortDiscoveryEvent(portEvent);
- }
- }
- }
-
- /**
- * {@inheritDoc}
- * <p/>
- * Called by {@link TopologyPublisher.SwitchCleanup} thread.
- */
- @Override
- public void removeSwitchDiscoveryEvent(SwitchEvent switchEvent) {
- TopologyEvent topologyEvent;
-
- // Get the old Port Events
- Map<ByteBuffer, PortEvent> oldPortEvents =
- discoveredAddedPortEvents.get(switchEvent.getDpid());
- if (oldPortEvents == null) {
- oldPortEvents = new HashMap<>();
- }
-
- if (datastore.deactivateSwitch(switchEvent, oldPortEvents.values())) {
- log.debug("Sending remove switch: {}", switchEvent);
- // Send out notification
- topologyEvent =
- new TopologyEvent(switchEvent,
- registryService.getOnosInstanceId());
- eventChannel.removeEntry(topologyEvent.getID());
-
- //
- // Send out notification for each port.
- //
- // NOTE: We don't use removePortDiscoveryEvent() for the cleanup,
- // because it will attempt to remove the port from the database,
- // and the deactiveSwitch() call above already removed all ports.
- //
- for (PortEvent portEvent : oldPortEvents.values()) {
- log.debug("Sending remove port:", portEvent);
- topologyEvent =
- new TopologyEvent(portEvent,
- registryService.getOnosInstanceId());
- eventChannel.removeEntry(topologyEvent.getID());
- }
- discoveredAddedPortEvents.remove(switchEvent.getDpid());
-
- // Cleanup for each link
- Map<ByteBuffer, LinkEvent> oldLinkEvents =
- discoveredAddedLinkEvents.get(switchEvent.getDpid());
- if (oldLinkEvents != null) {
- for (LinkEvent linkEvent : new ArrayList<>(oldLinkEvents.values())) {
- removeLinkDiscoveryEvent(linkEvent);
- }
- discoveredAddedLinkEvents.remove(switchEvent.getDpid());
- }
-
- // Cleanup for each host
- Map<ByteBuffer, HostEvent> oldHostEvents =
- discoveredAddedHostEvents.get(switchEvent.getDpid());
- if (oldHostEvents != null) {
- for (HostEvent hostEvent : new ArrayList<>(oldHostEvents.values())) {
- removeHostDiscoveryEvent(hostEvent);
- }
- discoveredAddedHostEvents.remove(switchEvent.getDpid());
- }
- }
- }
-
- /**
- * Port discovered event.
- *
- * @param portEvent the port event.
- */
- @Override
- public void putPortDiscoveryEvent(PortEvent portEvent) {
- if (datastore.addPort(portEvent)) {
- log.debug("Sending add port: {}", portEvent);
- // Send out notification
- TopologyEvent topologyEvent =
- new TopologyEvent(portEvent,
- registryService.getOnosInstanceId());
- eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
-
- // Store the new Port Event in the local cache
- Map<ByteBuffer, PortEvent> oldPortEvents =
- discoveredAddedPortEvents.get(portEvent.getDpid());
- if (oldPortEvents == null) {
- oldPortEvents = new HashMap<>();
- discoveredAddedPortEvents.put(portEvent.getDpid(),
- oldPortEvents);
- }
- ByteBuffer id = portEvent.getIDasByteBuffer();
- oldPortEvents.put(id, portEvent);
- }
- }
-
- /**
- * Port removed event.
- *
- * @param portEvent the port event.
- */
- @Override
- public void removePortDiscoveryEvent(PortEvent portEvent) {
- if (datastore.deactivatePort(portEvent)) {
- log.debug("Sending remove port: {}", portEvent);
- // Send out notification
- TopologyEvent topologyEvent =
- new TopologyEvent(portEvent,
- registryService.getOnosInstanceId());
- eventChannel.removeEntry(topologyEvent.getID());
-
- // Cleanup the Port Event from the local cache
- Map<ByteBuffer, PortEvent> oldPortEvents =
- discoveredAddedPortEvents.get(portEvent.getDpid());
- if (oldPortEvents != null) {
- ByteBuffer id = portEvent.getIDasByteBuffer();
- oldPortEvents.remove(id);
- }
-
- // Cleanup for the incoming link
- Map<ByteBuffer, LinkEvent> oldLinkEvents =
- discoveredAddedLinkEvents.get(portEvent.getDpid());
- if (oldLinkEvents != null) {
- for (LinkEvent linkEvent : new ArrayList<>(oldLinkEvents.values())) {
- if (linkEvent.getDst().equals(portEvent.getSwitchPort())) {
- removeLinkDiscoveryEvent(linkEvent);
- // XXX If we change our model to allow multiple Link on
- // a Port, this loop must be fixed to allow continuing.
- break;
- }
- }
- }
-
- // Cleanup for the connected hosts
- // TODO: The implementation below is probably wrong
- List<HostEvent> removedHostEvents = new LinkedList<>();
- Map<ByteBuffer, HostEvent> oldHostEvents =
- discoveredAddedHostEvents.get(portEvent.getDpid());
- if (oldHostEvents != null) {
- for (HostEvent hostEvent : new ArrayList<>(oldHostEvents.values())) {
- for (SwitchPort swp : hostEvent.getAttachmentPoints()) {
- if (swp.equals(portEvent.getSwitchPort())) {
- removedHostEvents.add(hostEvent);
- }
- }
- }
- for (HostEvent hostEvent : removedHostEvents) {
- removeHostDiscoveryEvent(hostEvent);
- }
- }
- }
- }
-
- /**
- * Link discovered event.
- *
- * @param linkEvent the link event.
- */
- @Override
- public void putLinkDiscoveryEvent(LinkEvent linkEvent) {
- if (datastore.addLink(linkEvent)) {
- log.debug("Sending add link: {}", linkEvent);
- // Send out notification
- TopologyEvent topologyEvent =
- new TopologyEvent(linkEvent,
- registryService.getOnosInstanceId());
- eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
-
- // Store the new Link Event in the local cache
- Map<ByteBuffer, LinkEvent> oldLinkEvents =
- discoveredAddedLinkEvents.get(linkEvent.getDst().getDpid());
- if (oldLinkEvents == null) {
- oldLinkEvents = new HashMap<>();
- discoveredAddedLinkEvents.put(linkEvent.getDst().getDpid(),
- oldLinkEvents);
- }
- ByteBuffer id = linkEvent.getIDasByteBuffer();
- oldLinkEvents.put(id, linkEvent);
- }
- }
-
- /**
- * Link removed event.
- *
- * @param linkEvent the link event.
- */
- @Override
- public void removeLinkDiscoveryEvent(LinkEvent linkEvent) {
- if (datastore.removeLink(linkEvent)) {
- log.debug("Sending remove link: {}", linkEvent);
- // Send out notification
- TopologyEvent topologyEvent =
- new TopologyEvent(linkEvent,
- registryService.getOnosInstanceId());
- eventChannel.removeEntry(topologyEvent.getID());
-
- // Cleanup the Link Event from the local cache
- Map<ByteBuffer, LinkEvent> oldLinkEvents =
- discoveredAddedLinkEvents.get(linkEvent.getDst().getDpid());
- if (oldLinkEvents != null) {
- ByteBuffer id = linkEvent.getIDasByteBuffer();
- oldLinkEvents.remove(id);
- }
- }
- }
-
- /**
- * Host discovered event.
- *
- * @param hostEvent the host event.
- */
- @Override
- public void putHostDiscoveryEvent(HostEvent hostEvent) {
- if (datastore.addHost(hostEvent)) {
- // Send out notification
- TopologyEvent topologyEvent =
- new TopologyEvent(hostEvent,
- registryService.getOnosInstanceId());
- eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
- log.debug("Put the host info into the cache of the topology. mac {}",
- hostEvent.getMac());
-
- // Store the new Host Event in the local cache
- // TODO: The implementation below is probably wrong
- for (SwitchPort swp : hostEvent.getAttachmentPoints()) {
- Map<ByteBuffer, HostEvent> oldHostEvents =
- discoveredAddedHostEvents.get(swp.getDpid());
- if (oldHostEvents == null) {
- oldHostEvents = new HashMap<>();
- discoveredAddedHostEvents.put(swp.getDpid(),
- oldHostEvents);
- }
- ByteBuffer id = hostEvent.getIDasByteBuffer();
- oldHostEvents.put(id, hostEvent);
- }
- }
- }
-
- /**
- * Host removed event.
- *
- * @param hostEvent the host event.
- */
- @Override
- public void removeHostDiscoveryEvent(HostEvent hostEvent) {
- if (datastore.removeHost(hostEvent)) {
- // Send out notification
- TopologyEvent topologyEvent =
- new TopologyEvent(hostEvent,
- registryService.getOnosInstanceId());
- eventChannel.removeEntry(topologyEvent.getID());
- log.debug("Remove the host info into the cache of the topology. mac {}",
- hostEvent.getMac());
-
- // Cleanup the Host Event from the local cache
- // TODO: The implementation below is probably wrong
- ByteBuffer id = hostEvent.getIDasByteBuffer();
- for (SwitchPort swp : hostEvent.getAttachmentPoints()) {
- Map<ByteBuffer, HostEvent> oldHostEvents =
- discoveredAddedHostEvents.get(swp.getDpid());
- if (oldHostEvents != null) {
- oldHostEvents.remove(id);
- }
- }
- }
- }
-
//
// Methods to update topology replica
//
@@ -1390,96 +985,4 @@
topology.removeHost(mac);
apiRemovedHostEvents.add(hostInTopo);
}
-
- /**
- * Read the whole topology from the database.
- *
- * @return a list of EventEntry-encapsulated Topology Events for
- * the whole topology.
- */
- private List<EventEntry<TopologyEvent>> readWholeTopologyFromDB() {
- List<EventEntry<TopologyEvent>> events =
- new LinkedList<EventEntry<TopologyEvent>>();
-
- // XXX May need to clear whole topology first, depending on
- // how we initially subscribe to replication events
-
- // Add all active switches
- for (KVSwitch sw : KVSwitch.getAllSwitches()) {
- if (sw.getStatus() != KVSwitch.STATUS.ACTIVE) {
- continue;
- }
-
- //
- // TODO: Using the local ONOS Instance ID below is incorrect.
- // Currently, this code is not used, and it might go away in the
- // future.
- //
- SwitchEvent switchEvent = new SwitchEvent(new Dpid(sw.getDpid()));
- TopologyEvent topologyEvent =
- new TopologyEvent(switchEvent,
- registryService.getOnosInstanceId());
- EventEntry<TopologyEvent> eventEntry =
- new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
- topologyEvent);
- events.add(eventEntry);
- }
-
- // Add all active ports
- for (KVPort p : KVPort.getAllPorts()) {
- if (p.getStatus() != KVPort.STATUS.ACTIVE) {
- continue;
- }
-
- //
- // TODO: Using the local ONOS Instance ID below is incorrect.
- // Currently, this code is not used, and it might go away in the
- // future.
- //
- PortEvent portEvent =
- new PortEvent(new Dpid(p.getDpid()),
- new PortNumber(p.getNumber().shortValue()));
- TopologyEvent topologyEvent =
- new TopologyEvent(portEvent,
- registryService.getOnosInstanceId());
- EventEntry<TopologyEvent> eventEntry =
- new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
- topologyEvent);
- events.add(eventEntry);
- }
-
- for (KVDevice d : KVDevice.getAllDevices()) {
- //
- // TODO: Using the local ONOS Instance ID below is incorrect.
- // Currently, this code is not used, and it might go away in the
- // future.
- //
- HostEvent devEvent = new HostEvent(MACAddress.valueOf(d.getMac()));
- for (byte[] portId : d.getAllPortIds()) {
- devEvent.addAttachmentPoint(
- new SwitchPort(KVPort.getDpidFromKey(portId),
- KVPort.getNumberFromKey(portId)));
- }
- }
-
- for (KVLink l : KVLink.getAllLinks()) {
- //
- // TODO: Using the local ONOS Instance ID below is incorrect.
- // Currently, this code is not used, and it might go away in the
- // future.
- //
- LinkEvent linkEvent = new LinkEvent(
- new SwitchPort(l.getSrc().dpid, l.getSrc().number),
- new SwitchPort(l.getDst().dpid, l.getDst().number));
- TopologyEvent topologyEvent =
- new TopologyEvent(linkEvent,
- registryService.getOnosInstanceId());
- EventEntry<TopologyEvent> eventEntry =
- new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
- topologyEvent);
- events.add(eventEntry);
- }
-
- return events;
- }
}