Added cleanup thread to RCNetworkGraphPublisher
Change-Id: I5c663b146d68f8bb8634ecd4e74953026da79c13
diff --git a/src/main/java/net/onrc/onos/ofcontroller/floodlightlistener/RCNetworkGraphPublisher.java b/src/main/java/net/onrc/onos/ofcontroller/floodlightlistener/RCNetworkGraphPublisher.java
index 156ee1a..d2c10fc 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/floodlightlistener/RCNetworkGraphPublisher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/floodlightlistener/RCNetworkGraphPublisher.java
@@ -2,7 +2,9 @@
import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import net.floodlightcontroller.core.IFloodlightProviderService;
import net.floodlightcontroller.core.IOFSwitch;
@@ -10,17 +12,25 @@
import net.floodlightcontroller.core.module.FloodlightModuleException;
import net.floodlightcontroller.core.module.IFloodlightModule;
import net.floodlightcontroller.core.module.IFloodlightService;
+import net.floodlightcontroller.core.util.SingletonTask;
+import net.floodlightcontroller.threadpool.IThreadPoolService;
import net.onrc.onos.datagrid.IDatagridService;
import net.onrc.onos.ofcontroller.core.IOFSwitchPortListener;
import net.onrc.onos.ofcontroller.linkdiscovery.ILinkDiscoveryListener;
import net.onrc.onos.ofcontroller.linkdiscovery.ILinkDiscoveryService;
import net.onrc.onos.ofcontroller.networkgraph.INetworkGraphService;
import net.onrc.onos.ofcontroller.networkgraph.LinkEvent;
+import net.onrc.onos.ofcontroller.networkgraph.NetworkGraph;
import net.onrc.onos.ofcontroller.networkgraph.NetworkGraphDiscoveryInterface;
+import net.onrc.onos.ofcontroller.networkgraph.PortEvent;
+import net.onrc.onos.ofcontroller.networkgraph.Switch;
import net.onrc.onos.ofcontroller.networkgraph.SwitchEvent;
import net.onrc.onos.registry.controller.IControllerRegistryService;
+import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
+import net.onrc.onos.registry.controller.RegistryException;
import org.openflow.protocol.OFPhysicalPort;
+import org.openflow.util.HexString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,8 +56,103 @@
private IDatagridService datagridService;
private INetworkGraphService networkGraphService;
+ private NetworkGraph networkGraph;
private NetworkGraphDiscoveryInterface networkGraphDiscoveryInterface;
+
+ private static final String ENABLE_CLEANUP_PROPERTY = "EnableCleanup";
+ private boolean cleanupEnabled = true;
+ private static final int CLEANUP_TASK_INTERVAL = 60; // in seconds
+ private SingletonTask cleanupTask;
+ /**
+ * Cleanup and synch switch state from registry
+ */
+ private class SwitchCleanup implements ControlChangeCallback, Runnable {
+ @Override
+ public void run() {
+ String old = Thread.currentThread().getName();
+ Thread.currentThread().setName("SwitchCleanup@" + old);
+
+ try {
+ log.debug("Running cleanup thread");
+ switchCleanup();
+ }
+ catch (Exception e) {
+ log.error("Error in cleanup thread", e);
+ } finally {
+ cleanupTask.reschedule(CLEANUP_TASK_INTERVAL,
+ TimeUnit.SECONDS);
+ Thread.currentThread().setName(old);
+ }
+ }
+
+ private void switchCleanup() {
+ Iterable<Switch> switches = networkGraph.getSwitches();
+
+ log.debug("Checking for inactive switches");
+ // For each switch check if a controller exists in controller registry
+ for (Switch sw: switches) {
+ try {
+ String controller =
+ registryService.getControllerForSwitch(sw.getDpid());
+ if (controller == null) {
+ log.debug("Requesting control to set switch {} INACTIVE",
+ HexString.toHexString(sw.getDpid()));
+ registryService.requestControl(sw.getDpid(), this);
+ }
+ } catch (RegistryException e) {
+ log.error("Caught RegistryException in cleanup thread", e);
+ }
+ }
+ }
+
+ @Override
+ public void controlChanged(long dpid, boolean hasControl) {
+ if (hasControl) {
+ log.debug("Got control to set switch {} INACTIVE", HexString.toHexString(dpid));
+ /*
+ // Get the affected ports
+ List<Short> ports = swStore.getPorts(HexString.toHexString(dpid));
+ // Get the affected links
+ List<Link> links = linkStore.getLinks(HexString.toHexString(dpid));
+ // Get the affected reverse links
+ List<Link> reverseLinks = linkStore.getReverseLinks(HexString.toHexString(dpid));
+ links.addAll(reverseLinks);
+ */
+ SwitchEvent switchEvent = new SwitchEvent(dpid);
+ networkGraphDiscoveryInterface.removeSwitchEvent(switchEvent);
+ registryService.releaseControl(dpid);
+
+ // TODO publish UPDATE_SWITCH event here
+ //
+ // NOTE: Here we explicitly send
+ // notification to remove the
+ // switch, because it is inactive
+ //
+ /*
+ TopologyElement topologyElement =
+ new TopologyElement(dpid);
+ datagridService.notificationSendTopologyElementRemoved(topologyElement);
+
+ // Publish: remove the affected ports
+ for (Short port : ports) {
+ TopologyElement topologyElementPort =
+ new TopologyElement(dpid, port);
+ datagridService.notificationSendTopologyElementRemoved(topologyElementPort);
+ }
+ // Publish: remove the affected links
+ for (Link link : links) {
+ TopologyElement topologyElementLink =
+ new TopologyElement(link.getSrc(),
+ link.getSrcPort(),
+ link.getDst(),
+ link.getDstPort());
+ datagridService.notificationSendTopologyElementRemoved(topologyElementLink);
+ }
+ */
+ }
+ }
+ }
@Override
public void linkDiscoveryUpdate(LDUpdate update) {
@@ -107,6 +212,13 @@
}
SwitchEvent switchEvent = new SwitchEvent(sw.getId());
+
+ List<PortEvent> portEvents = new ArrayList<PortEvent>();
+ for (OFPhysicalPort port : sw.getPorts()) {
+ portEvents.add(new PortEvent(sw.getId(), (long)port.getPortNumber()));
+ }
+ switchEvent.setPorts(portEvents);
+
networkGraphDiscoveryInterface.putSwitchEvent(switchEvent);
/*
@@ -114,19 +226,21 @@
TopologyElement topologyElement =
new TopologyElement(sw.getId());
datagridService.notificationSendTopologyElementAdded(topologyElement);
-
+ */
+
// Publish: add the ports
// TODO: Add only ports that are UP?
for (OFPhysicalPort port : sw.getPorts()) {
- TopologyElement topologyElementPort =
- new TopologyElement(sw.getId(), port.getPortNumber());
- datagridService.notificationSendTopologyElementAdded(topologyElementPort);
+ //TopologyElement topologyElementPort =
+ //new TopologyElement(sw.getId(), port.getPortNumber());
+ //datagridService.notificationSendTopologyElementAdded(topologyElementPort);
// Allow links to be discovered on this port now that it's
// in the database
linkDiscovery.RemoveFromSuppressLLDPs(sw.getId(), port.getPortNumber());
}
+ /*
// Add all links that might be connected already
List<Link> links = linkStore.getLinks(HexString.toHexString(sw.getId()));
// Add all reverse links as well
@@ -147,8 +261,8 @@
@Override
public void removedSwitch(IOFSwitch sw) {
// TODO move to cleanup thread
- SwitchEvent switchEvent = new SwitchEvent(sw.getId());
- networkGraphDiscoveryInterface.removeSwitchEvent(switchEvent);
+ //SwitchEvent switchEvent = new SwitchEvent(sw.getId());
+ //networkGraphDiscoveryInterface.removeSwitchEvent(switchEvent);
}
@Override
@@ -184,6 +298,7 @@
new ArrayList<Class<? extends IFloodlightService>>();
l.add(IFloodlightProviderService.class);
l.add(ILinkDiscoveryService.class);
+ l.add(IThreadPoolService.class);
l.add(IControllerRegistryService.class);
l.add(IDatagridService.class);
l.add(INetworkGraphService.class);
@@ -207,6 +322,26 @@
floodlightProvider.addOFSwitchListener(this);
linkDiscovery.addListener(this);
- networkGraphDiscoveryInterface = networkGraphService.getNetworkGraphDiscoveryInterface();
+ networkGraph = networkGraphService.getNetworkGraph();
+ networkGraphDiscoveryInterface =
+ networkGraphService.getNetworkGraphDiscoveryInterface();
+
+ // Run the cleanup thread
+ String enableCleanup =
+ context.getConfigParams(this).get(ENABLE_CLEANUP_PROPERTY);
+ if (enableCleanup != null && enableCleanup.toLowerCase().equals("false")) {
+ cleanupEnabled = false;
+ }
+
+ log.debug("Cleanup thread is {}enabled", (cleanupEnabled)? "" : "not ");
+
+ if (cleanupEnabled) {
+ IThreadPoolService threadPool =
+ context.getServiceImpl(IThreadPoolService.class);
+ cleanupTask = new SingletonTask(threadPool.getScheduledExecutor(),
+ new SwitchCleanup());
+ // Run the cleanup task immediately on startup
+ cleanupTask.reschedule(0, TimeUnit.SECONDS);
+ }
}
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraph.java b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraph.java
index 9040fe7..eff5952 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraph.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraph.java
@@ -12,13 +12,13 @@
*/
public interface NetworkGraph {
public Switch getSwitch(Long dpid);
- public Iterable<? extends Switch> getSwitches();
+ public Iterable<Switch> getSwitches();
// TODO Not sure about the use-case of this method? Remove if not used at the end.
- public Iterable<? extends Link> getLinks();
+ public Iterable<Link> getLinks();
// XXX next 2 method can be removed. getSwitch(dpid).getOutgoingLinks() is equivalent
- public Iterable<? extends Link> getOutgoingLinksFromSwitch(Long dpid); // Toshi: unnecessary
- public Iterable<? extends Link> getIncomingLinksFromSwitch(Long dpid); // Toshi: unnecessary
+ public Iterable<Link> getOutgoingLinksFromSwitch(Long dpid); // Toshi: unnecessary
+ public Iterable<Link> getIncomingLinksFromSwitch(Long dpid); // Toshi: unnecessary
public Iterable<Device> getDeviceByIp(InetAddress ipAddress);
public Device getDeviceByMac(MACAddress address);
diff --git a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphDatastore.java b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphDatastore.java
index 52a6ab7..a3069c3 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphDatastore.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphDatastore.java
@@ -5,6 +5,7 @@
import net.onrc.onos.datastore.RCObject;
import net.onrc.onos.datastore.RCObject.WriteOp;
+import net.onrc.onos.datastore.topology.RCLink;
import net.onrc.onos.datastore.topology.RCSwitch;
import org.slf4j.Logger;
@@ -61,21 +62,17 @@
// groupOp.add(WriteOp.ForceCreate(rcPort));
// }
- boolean failed = RCObject.multiWrite( groupOp );
+ boolean failed = RCObject.multiWrite(groupOp);
- if ( failed ) {
+ if (failed) {
log.error("Adding Switch {} and its ports failed.", sw.getDpid());
- for ( WriteOp op : groupOp ) {
+ for (WriteOp op : groupOp) {
log.debug("Operation:{} for {} - Result:{}", op.getOp(), op.getObject(), op.getStatus() );
// If we changed the operation from ForceCreate to
// Conditional operation (Create/Update) then we should retry here.
}
}
- else {
- // Publish event to the in-memory cache
-// graph.addSwitch(sw);
- }
}
@@ -163,80 +160,81 @@
// }
}
- public void addLink(LinkEvent link) {
- log.debug("Adding link {}", link);
-// RCLink rcLink = new RCLink(link.getSourceSwitchDpid(), (long)link.getSourcePortNumber(),
-// link.getDestinationSwitchDpid(), (long)link.getDestinationPortNumber());
-//
-// RCPort rcSrcPort = new RCPort(link.getSourceSwitchDpid(), (long)link.getSourcePortNumber());
-// RCPort rcDstPort = new RCPort(link.getDestinationSwitchDpid(), (long)link.getDestinationPortNumber());
-//
-// for (int i = 0; i < NUM_RETRIES; i++) {
-// try {
-// rcSrcPort.read();
-// rcDstPort.read();
-// rcLink.create();
-// } catch (ObjectDoesntExistException e) {
-// // port doesn't exist
-// log.error("Add link failed {}", link, e);
-// return;
-// } catch (ObjectExistsException e) {
-// log.debug("Link already exists {}", link);
-// return;
-// }
-//
-// rcSrcPort.addLinkId(rcLink.getId());
-// rcDstPort.addLinkId(rcLink.getId());
-//
-// rcLink.setStatus(RCLink.STATUS.ACTIVE);
-//
-// try {
-// rcLink.update();
-// rcSrcPort.update();
-// rcDstPort.update();
-// break;
-// } catch (ObjectDoesntExistException | WrongVersionException e) {
-// log.debug(" ", e);
-// // retry
-// }
-// }
-//
-// // Publish event to in-memory cache
-// graph.putLink(link);
+ public void addLink(LinkEvent linkEvent) {
+ log.debug("Adding link {}", linkEvent);
+
+ RCLink rcLink = new RCLink(linkEvent.getSrc().getDpid(), linkEvent.getSrc().getNumber(),
+ linkEvent.getDst().getDpid(), linkEvent.getDst().getNumber());
+
+ //RCPort rcSrcPort = new RCPort(link.getSrc().getDpid(), link.getSrc().getNumber());
+ //RCPort rcDstPort = new RCPort(link.getDst().getDpid(), link.getDst().getNumber());
+
+ for (int i = 0; i < NUM_RETRIES; i++) {
+ try {
+ //rcSrcPort.read();
+ //rcDstPort.read();
+ rcLink.create();
+ //} catch (ObjectDoesntExistException e) {
+ // port doesn't exist
+ //log.error("Add link failed {}", link, e);
+ //return;
+ } catch (ObjectExistsException e) {
+ log.debug("Link already exists {}", linkEvent);
+ return;
+ }
+
+ //rcSrcPort.addLinkId(rcLink.getId());
+ //rcDstPort.addLinkId(rcLink.getId());
+
+ rcLink.setStatus(RCLink.STATUS.ACTIVE);
+
+ try {
+ rcLink.update();
+ //rcSrcPort.update();
+ //rcDstPort.update();
+ break;
+ } catch (ObjectDoesntExistException | WrongVersionException e) {
+ log.debug(" ", e);
+ // retry
+ }
+ }
+
+ // Publish event to in-memory cache
+ graph.putLink(linkEvent);
}
- public void removeLink(LinkEvent link) {
- log.debug("Removing link {}", link);
-// RCLink rcLink = new RCLink(link.getSourceSwitchDpid(), (long)link.getSourcePortNumber(),
-// link.getDestinationSwitchDpid(), (long)link.getDestinationPortNumber());
-//
-// RCPort rcSrcPort = new RCPort(link.getSourceSwitchDpid(), (long)link.getSourcePortNumber());
-// RCPort rcDstPort = new RCPort(link.getDestinationSwitchDpid(), (long)link.getDestinationPortNumber());
-//
-// for (int i = 0; i < NUM_RETRIES; i++) {
-// try {
-// rcSrcPort.read();
-// rcDstPort.read();
-// rcLink.read();
-// } catch (ObjectDoesntExistException e) {
-// log.error("Remove link failed {}", link, e);
-// return;
-// }
-//
-// rcSrcPort.removeLinkId(rcLink.getId());
-// rcDstPort.removeLinkId(rcLink.getId());
-//
-// try {
-// rcSrcPort.update();
-// rcDstPort.update();
-// rcLink.delete();
-// } catch (ObjectDoesntExistException e) {
-// log.error("Remove link failed {}", link, e);
-// return;
-// } catch (WrongVersionException e) {
-// // retry
-// }
-// }
+ public void removeLink(LinkEvent linkEvent) {
+ log.debug("Removing link {}", linkEvent);
+ RCLink rcLink = new RCLink(linkEvent.getSrc().getDpid(), linkEvent.getSrc().getNumber(),
+ linkEvent.getDst().getDpid(), linkEvent.getDst().getNumber());
+
+ //RCPort rcSrcPort = new RCPort(link.getSourceSwitchDpid(), (long)link.getSourcePortNumber());
+ //RCPort rcDstPort = new RCPort(link.getDestinationSwitchDpid(), (long)link.getDestinationPortNumber());
+
+ for (int i = 0; i < NUM_RETRIES; i++) {
+ try {
+ //rcSrcPort.read();
+ //rcDstPort.read();
+ rcLink.read();
+ } catch (ObjectDoesntExistException e) {
+ log.error("Remove link failed {}", linkEvent, e);
+ return;
+ }
+
+ //rcSrcPort.removeLinkId(rcLink.getId());
+ //rcDstPort.removeLinkId(rcLink.getId());
+
+ try {
+ //rcSrcPort.update();
+ //rcDstPort.update();
+ rcLink.delete();
+ } catch (ObjectDoesntExistException e) {
+ log.error("Remove link failed {}", linkEvent, e);
+ return;
+ } catch (WrongVersionException e) {
+ // retry
+ }
+ }
}
public void updateDevice(DeviceEvent device) {
diff --git a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/SwitchImpl.java b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/SwitchImpl.java
index a8c824d..874bf37 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/SwitchImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/SwitchImpl.java
@@ -1,10 +1,12 @@
package net.onrc.onos.ofcontroller.networkgraph;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -71,8 +73,17 @@
@Override
public Collection<Device> getDevices() {
- // TODO Auto-generated method stub
- return null;
+ // TODO Should switch also store a list of attached devices to avoid
+ // calculating this every time?
+ List<Device> devices = new ArrayList<Device>();
+
+ for (Port port : ports.values()) {
+ for (Device device : port.getDevices()) {
+ devices.add(device);
+ }
+ }
+
+ return devices;
}
public void addPort(Port port) {
diff --git a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/serializers/LinkSerializer.java b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/serializers/LinkSerializer.java
index 7708bf1..bb13c75 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/serializers/LinkSerializer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/serializers/LinkSerializer.java
@@ -27,7 +27,7 @@
link.getSourcePort().getNumber());
jsonGenerator.writeStringField("dst-switch",
HexString.toHexString(link.getDestinationSwitch().getDpid()));
- jsonGenerator.writeNumberField("src-port",
+ jsonGenerator.writeNumberField("dst-port",
link.getDestinationPort().getNumber());
jsonGenerator.writeEndObject();
}