Merge branch 'master' of github.com:OPENNETWORKINGLAB/ONOS
diff --git a/build.xml b/build.xml
index e8055e7..1c473ce 100644
--- a/build.xml
+++ b/build.xml
@@ -66,9 +66,9 @@
<include name="concurrentlinkedhashmap-lru-1.3.jar"/>
<include name="jython-2.5.2.jar"/>
<include name="libthrift-0.7.0.jar"/>
- <include name="curator-client-1.3.3.jar"/>
- <include name="curator-framework-1.3.3.jar"/>
- <include name="curator-recipes-1.3.3.jar"/>
+ <include name="curator-client-1.3.4-SNAPSHOT.jar"/>
+ <include name="curator-framework-1.3.4-SNAPSHOT.jar"/>
+ <include name="curator-recipes-1.3.4-SNAPSHOT.jar"/>
<include name="zookeeper-3.4.5.jar"/>
</patternset>
@@ -127,6 +127,7 @@
debug="true"
srcdir="${source}:${thrift.out.dir}"
destdir="${build}">
+ <compilerarg value="-Xlint:unchecked"/>
</javac>
</target>
diff --git a/src/main/java/net/floodlightcontroller/core/internal/SwitchStorageImpl.java b/src/main/java/net/floodlightcontroller/core/internal/SwitchStorageImpl.java
index 6be26cb..7e049b6 100644
--- a/src/main/java/net/floodlightcontroller/core/internal/SwitchStorageImpl.java
+++ b/src/main/java/net/floodlightcontroller/core/internal/SwitchStorageImpl.java
@@ -16,6 +16,7 @@
import com.thinkaurelius.titan.core.TitanFactory;
import com.thinkaurelius.titan.core.TitanGraph;
import com.tinkerpop.blueprints.Direction;
+import com.tinkerpop.blueprints.TransactionalGraph;
import com.tinkerpop.blueprints.TransactionalGraph.Conclusion;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Vertex;
@@ -244,9 +245,6 @@
FramedGraph<TitanGraph> fg = new FramedGraph<TitanGraph>(graph);
Iterable<ISwitchObject> switches = fg.getVertices("type","switch",ISwitchObject.class);
- for (ISwitchObject sw: switches) {
- log.debug("switch: {}", sw.getDPID());
- }
return switches;
}
diff --git a/src/main/java/net/floodlightcontroller/flowcache/FlowManager.java b/src/main/java/net/floodlightcontroller/flowcache/FlowManager.java
index a604969..3f74ff1 100644
--- a/src/main/java/net/floodlightcontroller/flowcache/FlowManager.java
+++ b/src/main/java/net/floodlightcontroller/flowcache/FlowManager.java
@@ -7,6 +7,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -73,6 +74,8 @@
public static final short FLOWMOD_DEFAULT_HARD_TIMEOUT = 0; // infinite
public static final short PRIORITY_DEFAULT = 100;
+ private static long nextFlowEntryId = 1;
+
/** The logger. */
private static Logger log = LoggerFactory.getLogger(FlowManager.class);
@@ -89,9 +92,13 @@
Map<Long, IOFSwitch> mySwitches = floodlightProvider.getSwitches();
- // Fetch all Flow Entries
- Iterable<IFlowEntry> flowEntries = conn.utils().getAllFlowEntries(conn);
- for (IFlowEntry flowEntryObj : flowEntries) {
+ Map<Long, IFlowEntry> myFlowEntries = new TreeMap<Long, IFlowEntry>();
+
+ //
+ // Fetch all Flow Entries and select only my Flow Entries
+ //
+ Iterable<IFlowEntry> allFlowEntries = conn.utils().getAllFlowEntries(conn);
+ for (IFlowEntry flowEntryObj : allFlowEntries) {
FlowEntryId flowEntryId =
new FlowEntryId(flowEntryObj.getFlowEntryId());
String userState = flowEntryObj.getUserState();
@@ -112,6 +119,29 @@
log.debug("Flow Entry ignored: not my switch");
continue;
}
+ myFlowEntries.put(flowEntryId.value(), flowEntryObj);
+ }
+
+ //
+ // Process my Flow Entries
+ //
+ for (Map.Entry<Long, IFlowEntry> entry : myFlowEntries.entrySet()) {
+ IFlowEntry flowEntryObj = entry.getValue();
+
+ //
+ // TODO: Eliminate the re-fetching of flowEntryId,
+ // userState, switchState, and dpid from the flowEntryObj.
+ //
+ FlowEntryId flowEntryId =
+ new FlowEntryId(flowEntryObj.getFlowEntryId());
+ Dpid dpid = new Dpid(flowEntryObj.getSwitchDpid());
+ String userState = flowEntryObj.getUserState();
+ String switchState = flowEntryObj.getSwitchState();
+ IOFSwitch mySwitch = mySwitches.get(dpid.value());
+ if (mySwitch == null) {
+ log.debug("Flow Entry ignored: not my switch");
+ continue;
+ }
//
// Create the Open Flow Flow Modification Entry to push
@@ -316,13 +346,11 @@
//
// Assign the FlowEntry IDs
- // TODO: This is an ugly hack!
- // The Flow Entry IDs are set to 1000*FlowId + Index
+ // Right now every new flow entry gets a new flow entry ID
+ // TODO: This needs to be redesigned!
//
- int i = 1;
for (FlowEntry flowEntry : flowPath.dataPath().flowEntries()) {
- long id = flowPath.flowId().value() * 1000 + i;
- ++i;
+ long id = nextFlowEntryId++;
flowEntry.setFlowEntryId(new FlowEntryId(id));
}
@@ -517,6 +545,47 @@
}
/**
+ * Clear the state for a previously added flow.
+ *
+ * @param flowId the Flow ID of the flow to clear.
+ * @return true on success, otherwise false.
+ */
+ @Override
+ public boolean clearFlow(FlowId flowId) {
+ IFlowPath flowObj = null;
+ try {
+ if ((flowObj = conn.utils().searchFlowPath(conn, flowId))
+ != null) {
+ log.debug("Clearing FlowPath with FlowId {}: found existing FlowPath",
+ flowId.toString());
+ } else {
+ log.debug("Clearing FlowPath with FlowId {}: FlowPath not found",
+ flowId.toString());
+ }
+ } catch (Exception e) {
+ // TODO: handle exceptions
+ conn.endTx(Transaction.ROLLBACK);
+ log.error(":clearFlow FlowId:{} failed", flowId.toString());
+ }
+ if (flowObj == null)
+ return true; // OK: No such flow
+
+ //
+ // Remove all Flow Entries
+ //
+ Iterable<IFlowEntry> flowEntries = flowObj.getFlowEntries();
+ for (IFlowEntry flowEntryObj : flowEntries) {
+ flowObj.removeFlowEntry(flowEntryObj);
+ conn.utils().removeFlowEntry(conn, flowEntryObj);
+ }
+ // Remove the Flow itself
+ conn.utils().removeFlowPath(conn, flowObj);
+ conn.endTx(Transaction.COMMIT);
+
+ return true;
+ }
+
+ /**
* Get a previously added flow.
*
* @param flowId the Flow ID of the flow to get.
diff --git a/src/main/java/net/floodlightcontroller/flowcache/IFlowService.java b/src/main/java/net/floodlightcontroller/flowcache/IFlowService.java
index b159661..48477f1 100644
--- a/src/main/java/net/floodlightcontroller/flowcache/IFlowService.java
+++ b/src/main/java/net/floodlightcontroller/flowcache/IFlowService.java
@@ -33,6 +33,14 @@
boolean deleteFlow(FlowId flowId);
/**
+ * Clear the state for a previously added flow.
+ *
+ * @param flowId the Flow ID of the flow to clear.
+ * @return true on success, otherwise false.
+ */
+ boolean clearFlow(FlowId flowId);
+
+ /**
* Get a previously added flow.
*
* @param flowId the Flow ID of the flow to get.
diff --git a/src/main/java/net/floodlightcontroller/flowcache/web/ClearFlowResource.java b/src/main/java/net/floodlightcontroller/flowcache/web/ClearFlowResource.java
new file mode 100644
index 0000000..8fff358
--- /dev/null
+++ b/src/main/java/net/floodlightcontroller/flowcache/web/ClearFlowResource.java
@@ -0,0 +1,37 @@
+package net.floodlightcontroller.flowcache.web;
+
+import net.floodlightcontroller.flowcache.IFlowService;
+import net.floodlightcontroller.util.FlowId;
+
+import org.openflow.util.HexString;
+import org.restlet.resource.Get;
+import org.restlet.resource.ServerResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClearFlowResource extends ServerResource {
+ protected static Logger log = LoggerFactory.getLogger(ClearFlowResource.class);
+
+ @Get("json")
+ public Boolean retrieve() {
+ Boolean result = false;
+
+ IFlowService flowService =
+ (IFlowService)getContext().getAttributes().
+ get(IFlowService.class.getCanonicalName());
+
+ if (flowService == null) {
+ log.debug("ONOS Flow Service not found");
+ return result;
+ }
+
+ // Extract the arguments
+ String flowIdStr = (String) getRequestAttributes().get("flow-id");
+ FlowId flowId = new FlowId(flowIdStr);
+ log.debug("Clear Flow Id: " + flowIdStr);
+
+ // Process the request
+ result = flowService.clearFlow(flowId);
+ return result;
+ }
+}
diff --git a/src/main/java/net/floodlightcontroller/flowcache/web/FlowWebRoutable.java b/src/main/java/net/floodlightcontroller/flowcache/web/FlowWebRoutable.java
index cfd3505..a40a508 100644
--- a/src/main/java/net/floodlightcontroller/flowcache/web/FlowWebRoutable.java
+++ b/src/main/java/net/floodlightcontroller/flowcache/web/FlowWebRoutable.java
@@ -14,6 +14,7 @@
public Restlet getRestlet(Context context) {
Router router = new Router(context);
router.attach("/add/json", AddFlowResource.class);
+ router.attach("/clear/{flow-id}/json", ClearFlowResource.class);
router.attach("/delete/{flow-id}/json", DeleteFlowResource.class);
router.attach("/get/{flow-id}/json", GetFlowByIdResource.class);
router.attach("/getall-by-installer-id/{installer-id}/{src-dpid}/{src-port}/{dst-dpid}/{dst-port}/json", GetAllFlowsByInstallerIdResource.class);
diff --git a/src/main/java/net/floodlightcontroller/onoslistener/OnosPublisher.java b/src/main/java/net/floodlightcontroller/onoslistener/OnosPublisher.java
index 2a66527..a6931e6 100644
--- a/src/main/java/net/floodlightcontroller/onoslistener/OnosPublisher.java
+++ b/src/main/java/net/floodlightcontroller/onoslistener/OnosPublisher.java
@@ -3,34 +3,110 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.openflow.util.HexString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import net.floodlightcontroller.core.IFloodlightProviderService;
+import net.floodlightcontroller.core.INetMapStorage.DM_OPERATION;
+import net.floodlightcontroller.core.INetMapTopologyObjects.ISwitchObject;
+import net.floodlightcontroller.core.ISwitchStorage.SwitchState;
import net.floodlightcontroller.core.IOFSwitch;
import net.floodlightcontroller.core.IOFSwitchListener;
import net.floodlightcontroller.core.ISwitchStorage;
import net.floodlightcontroller.core.internal.SwitchStorageImpl;
+import net.floodlightcontroller.core.internal.TopoSwitchServiceImpl;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.core.module.FloodlightModuleException;
import net.floodlightcontroller.core.module.IFloodlightModule;
import net.floodlightcontroller.core.module.IFloodlightService;
+import net.floodlightcontroller.core.util.SingletonTask;
import net.floodlightcontroller.devicemanager.IDevice;
import net.floodlightcontroller.devicemanager.IDeviceListener;
import net.floodlightcontroller.devicemanager.IDeviceService;
import net.floodlightcontroller.devicemanager.IDeviceStorage;
import net.floodlightcontroller.devicemanager.internal.DeviceStorageImpl;
import net.floodlightcontroller.linkdiscovery.ILinkDiscoveryListener;
+import net.floodlightcontroller.threadpool.IThreadPoolService;
+import net.onrc.onos.registry.controller.IControllerRegistryService;
+import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
+import net.onrc.onos.registry.controller.RegistryException;
public class OnosPublisher implements IDeviceListener, IOFSwitchListener,
ILinkDiscoveryListener, IFloodlightModule {
protected IDeviceStorage devStore;
+ protected ISwitchStorage swStore;
protected static Logger log;
protected IDeviceService deviceService;
+ protected IControllerRegistryService registryService;
protected static final String DBConfigFile = "dbconf";
+ protected IThreadPoolService threadPool;
+
+ protected final int CLEANUP_TASK_INTERVAL = 999; // 999 ms
+ protected SingletonTask cleanupTask;
+
+ /**
+ * Cleanup and synch switch state from registry
+ */
+ protected class SwitchCleanup implements ControlChangeCallback, Runnable {
+ @Override
+ public void run() {
+ try {
+ log.debug("Running cleanup thread");
+ switchCleanup();
+ }
+ catch (Exception e) {
+ log.error("Error in cleanup thread", e);
+ } finally {
+ cleanupTask.reschedule(CLEANUP_TASK_INTERVAL,
+ TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @Override
+ public void controlChanged(long dpid, boolean hasControl) {
+ // TODO Auto-generated method stub
+
+ if (hasControl) {
+ log.debug("got control to set inactive sw {}", dpid);
+ swStore.update(HexString.toHexString(dpid),SwitchState.INACTIVE, DM_OPERATION.UPDATE);
+ registryService.releaseControl(dpid);
+ }
+ }
+ }
+
+
+
+ protected void switchCleanup() {
+
+ TopoSwitchServiceImpl impl = new TopoSwitchServiceImpl();
+ Iterable<ISwitchObject> switches = impl.getActiveSwitches();
+ // For each switch check if a controller exists in controller registry
+ for (ISwitchObject sw: switches) {
+ log.debug("checking if switch is inactive: {}", sw.getDPID());
+ try {
+ long dpid = HexString.toLong(sw.getDPID());
+ String controller = registryService.getControllerForSwitch(dpid);
+ if (controller == null) {
+ log.debug("request Control to set inactive sw {}", dpid);
+ registryService.requestControl(dpid, new SwitchCleanup());
+ } else {
+ log.debug("sw {} is controlled by controller: {}",dpid,controller);
+ }
+ } catch (NumberFormatException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (RegistryException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ }
@Override
public void linkDiscoveryUpdate(LDUpdate update) {
@@ -112,6 +188,7 @@
new ArrayList<Class<? extends IFloodlightService>>();
l.add(IFloodlightProviderService.class);
l.add(IDeviceService.class);
+ l.add(IThreadPoolService.class);
return l;
}
@@ -124,10 +201,15 @@
log = LoggerFactory.getLogger(OnosPublisher.class);
deviceService = context.getServiceImpl(IDeviceService.class);
+ threadPool = context.getServiceImpl(IThreadPoolService.class);
+ registryService = context.getServiceImpl(IControllerRegistryService.class);
devStore = new DeviceStorageImpl();
devStore.init(conf);
+ swStore = new SwitchStorageImpl();
+ swStore.init(conf);
+
log.debug("Initializing OnosPublisher module with {}", conf);
}
@@ -135,7 +217,11 @@
@Override
public void startUp(FloodlightModuleContext context) {
// TODO Auto-generated method stub
- deviceService.addListener(this);
+ ScheduledExecutorService ses = threadPool.getScheduledExecutor();
+ deviceService.addListener(this);
+ // Setup the Cleanup task.
+ cleanupTask = new SingletonTask(ses, new SwitchCleanup());
+ cleanupTask.reschedule(CLEANUP_TASK_INTERVAL, TimeUnit.MILLISECONDS);
}
}
diff --git a/src/main/java/net/onrc/onos/registry/controller/StandaloneRegistry.java b/src/main/java/net/onrc/onos/registry/controller/StandaloneRegistry.java
index 7345084..2c220fd 100644
--- a/src/main/java/net/onrc/onos/registry/controller/StandaloneRegistry.java
+++ b/src/main/java/net/onrc/onos/registry/controller/StandaloneRegistry.java
@@ -91,7 +91,7 @@
@Override
public String getControllerForSwitch(long dpid) throws RegistryException {
- return controllerId;
+ return (switchCallbacks.get(HexString.toHexString(dpid)) != null)? controllerId: null;
}
@Override
diff --git a/src/main/java/net/onrc/onos/registry/controller/SwitchLeadershipData.java b/src/main/java/net/onrc/onos/registry/controller/SwitchLeadershipData.java
new file mode 100644
index 0000000..eb513fd
--- /dev/null
+++ b/src/main/java/net/onrc/onos/registry/controller/SwitchLeadershipData.java
@@ -0,0 +1,25 @@
+package net.onrc.onos.registry.controller;
+
+import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
+
+import com.netflix.curator.framework.recipes.leader.LeaderLatch;
+
+public class SwitchLeadershipData {
+
+ private LeaderLatch latch;
+ private ControlChangeCallback cb;
+
+ public SwitchLeadershipData(LeaderLatch latch, ControlChangeCallback cb) {
+ this.latch = latch;
+ this.cb = cb;
+ }
+
+ public LeaderLatch getLatch(){
+ return latch;
+ }
+
+ public ControlChangeCallback getCallback(){
+ return cb;
+ }
+
+}
diff --git a/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
index 025bbfe..e727238 100644
--- a/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
+++ b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
@@ -8,6 +8,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.core.module.FloodlightModuleException;
@@ -16,8 +17,6 @@
import net.floodlightcontroller.restserver.IRestApiService;
import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.openflow.util.HexString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,13 +25,14 @@
import com.netflix.curator.RetryPolicy;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
-import com.netflix.curator.framework.api.CuratorWatcher;
import com.netflix.curator.framework.recipes.cache.ChildData;
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
import com.netflix.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import com.netflix.curator.framework.recipes.cache.PathChildrenCacheEvent;
import com.netflix.curator.framework.recipes.cache.PathChildrenCacheListener;
import com.netflix.curator.framework.recipes.leader.LeaderLatch;
+import com.netflix.curator.framework.recipes.leader.LeaderLatchEvent;
+import com.netflix.curator.framework.recipes.leader.LeaderLatchListener;
import com.netflix.curator.framework.recipes.leader.Participant;
import com.netflix.curator.retry.ExponentialBackoffRetry;
@@ -61,92 +61,40 @@
protected PathChildrenCache controllerCache;
protected PathChildrenCache switchCache;
- protected Map<String, LeaderLatch> switchLatches;
- protected Map<String, ControlChangeCallback> switchCallbacks;
+ protected ConcurrentHashMap<String, SwitchLeadershipData> switches;
protected Map<String, PathChildrenCache> switchPathCaches;
//Zookeeper performance-related configuration
protected static final int sessionTimeout = 2000;
protected static final int connectionTimeout = 4000;
- /**
- * Watches for changes in switch leadership election. The Curator
- * LeaderLatch doesn't notify us when leadership changes so we set a watch
- * on the election znodes to get leadership change events. The process
- * method will be called whenever the switches children change in
- * Zookeeper. We then have to work out whether to send a control-changed
- * event to our clients and reset the watch.
- *
- * TODO I think it's possible to miss events that happen while we're
- * processing the watch and before we've set a new watch. Need to think
- * of a safer way to implement leader change notifications.
- *
- */
- protected class ParamaterizedCuratorWatcher implements CuratorWatcher {
- private String dpid;
- private boolean isLeader = false;
- private String latchPath;
+
+ protected class SwitchLeaderListener implements LeaderLatchListener{
+ String dpid;
+ LeaderLatch latch;
- public ParamaterizedCuratorWatcher(String dpid, String latchPath){
+ public SwitchLeaderListener(String dpid, LeaderLatch latch){
this.dpid = dpid;
- this.latchPath = latchPath;
+ this.latch = latch;
}
@Override
- public synchronized void process(WatchedEvent event) throws Exception {
- log.debug("Watch Event: {}", event);
-
+ public void leaderLatchEvent(CuratorFramework arg0,
+ LeaderLatchEvent arg1) {
+ log.debug("Leadership changed for {}, now {}",
+ dpid, latch.hasLeadership());
- if (event.getState() == KeeperState.Disconnected){
- if (isLeader) {
- log.debug("Disconnected while leader - lost leadership for {}", dpid);
-
- isLeader = false;
- ControlChangeCallback cb = switchCallbacks.get(dpid);
- if (cb != null) {
- //Allow callback to be null if the requester doesn't want a callback
- cb.controlChanged(HexString.toLong(dpid), false);
- }
- }
- return;
- //TODO Watcher is never reset once we reconnect to Zookeeper
+ //Check that the leadership request is still active - the client
+ //may have since released the request or even begun another request
+ //(this is why we use == to check the object instance is the same)
+ SwitchLeadershipData swData = switches.get(dpid);
+ if (swData != null && swData.getLatch() == latch){
+ swData.getCallback().controlChanged(
+ HexString.toLong(dpid), latch.hasLeadership());
}
-
- LeaderLatch latch = switchLatches.get(dpid);
- if (latch == null){
- log.debug("In watcher process, looks like control was released for {}",
- dpid);
- return;
+ else {
+ log.debug("Latch for {} has changed", dpid);
}
-
- try {
-
- Participant leader = latch.getLeader();
-
- if (leader.getId().equals(controllerId) && !isLeader){
- log.debug("Became leader for {}", dpid);
-
- isLeader = true;
- switchCallbacks.get(dpid).controlChanged(HexString.toLong(dpid), true);
- }
- else if (!leader.getId().equals(controllerId) && isLeader){
- log.debug("Lost leadership for {}", dpid);
-
- isLeader = false;
- switchCallbacks.get(dpid).controlChanged(HexString.toLong(dpid), false);
- }
- } catch (Exception e){
- if (isLeader){
- log.debug("Exception checking leadership status. Assume leadership lost for {}",
- dpid);
-
- isLeader = false;
- switchCallbacks.get(dpid).controlChanged(HexString.toLong(dpid), false);
- }
- } finally {
- client.getChildren().usingWatcher(this).inBackground().forPath(latchPath);
- }
- //client.getChildren().usingWatcher(this).forPath(latchPath);
}
}
@@ -207,22 +155,33 @@
String dpidStr = HexString.toHexString(dpid);
String latchPath = switchLatchesPath + "/" + dpidStr;
- if (switchLatches.get(dpidStr) != null){
- //throw new RuntimeException("Leader election for switch " + dpidStr +
- // "is already running");
+ if (switches.get(dpidStr) != null){
log.debug("Already contesting {}, returning", HexString.toHexString(dpid));
return;
}
LeaderLatch latch = new LeaderLatch(client, latchPath, controllerId);
- switchLatches.put(dpidStr, latch);
- switchCallbacks.put(dpidStr, cb);
+ latch.addListener(new SwitchLeaderListener(dpidStr, latch));
+
+ SwitchLeadershipData swData = new SwitchLeadershipData(latch, cb);
+ SwitchLeadershipData oldData = switches.putIfAbsent(dpidStr, swData);
+
+ if (oldData != null){
+ //There was already data for that key in the map
+ //i.e. someone else got here first so we can't succeed
+ log.debug("Already requested control for {}", dpidStr);
+ throw new RegistryException("Already requested control for " + dpidStr);
+ }
+
+ //Now that we know we were able to add our latch to the collection,
+ //we can start the leader election in Zookeeper. However I don't know
+ //how to handle if the start fails - the latch is already in our
+ //switches list.
+ //TODO seems like there's a Curator bug when latch.start is called when
+ //there's no Zookeeper connection which causes two znodes to be put in
+ //Zookeeper at the latch path when we reconnect to Zookeeper.
try {
- //client.getChildren().usingWatcher(watcher).inBackground().forPath(singleLatchPath);
- client.getChildren().usingWatcher(
- new ParamaterizedCuratorWatcher(dpidStr, latchPath))
- .inBackground().forPath(latchPath);
latch.start();
} catch (Exception e) {
log.warn("Error starting leader latch: {}", e.getMessage());
@@ -237,39 +196,35 @@
String dpidStr = HexString.toHexString(dpid);
- LeaderLatch latch = switchLatches.get(dpidStr);
- if (latch == null) {
+ SwitchLeadershipData swData = switches.remove(dpidStr);
+
+ if (swData == null) {
log.debug("Trying to release control of a switch we are not contesting");
return;
}
+
+ LeaderLatch latch = swData.getLatch();
try {
latch.close();
} catch (IOException e) {
//I think it's OK not to do anything here. Either the node got
//deleted correctly, or the connection went down and the node got deleted.
- } finally {
- switchLatches.remove(dpidStr);
- switchCallbacks.remove(dpidStr);
}
}
@Override
public boolean hasControl(long dpid) {
+ String dpidStr = HexString.toHexString(dpid);
- LeaderLatch latch = switchLatches.get(HexString.toHexString(dpid));
+ SwitchLeadershipData swData = switches.get(dpidStr);
- if (latch == null) {
- log.warn("No leader latch for dpid {}", HexString.toHexString(dpid));
+ if (swData == null) {
+ log.warn("No leader latch for dpid {}", dpidStr);
return false;
}
- try {
- return latch.getLeader().getId().equals(controllerId);
- } catch (Exception e) {
- //TODO swallow exception?
- return false;
- }
+ return swData.getLatch().hasLeadership();
}
@Override
@@ -322,12 +277,11 @@
@Override
public String getControllerForSwitch(long dpid) throws RegistryException {
- // TODO Work out how we should store this controller/switch data.
- // The leader latch might be a index to the /controllers collections
- // which holds more info on the controller (how to talk to it for example).
+ // TODO work out synchronization
- String strDpid = HexString.toHexString(dpid);
- LeaderLatch latch = switchLatches.get(strDpid);
+ String dpidStr = HexString.toHexString(dpid);
+
+ LeaderLatch latch = switches.get(dpidStr).getLatch();
if (latch == null){
log.warn("Tried to get controller for non-existent switch");
@@ -351,6 +305,9 @@
}
+ //TODO what should happen when there's no ZK connection? Currently we just return
+ //the cache but this may lead to false impressions - i.e. we don't actually know
+ //what's in ZK so we shouldn't say we do
@Override
public Map<String, List<ControllerRegistryEntry>> getAllSwitches() {
Map<String, List<ControllerRegistryEntry>> data =
@@ -409,6 +366,8 @@
return l;
}
+ //TODO currently blocks startup when it can't get a Zookeeper connection.
+ //Do we support starting up with no Zookeeper connection?
@Override
public void init (FloodlightModuleContext context) throws FloodlightModuleException {
log.info("Initialising the Zookeeper Registry - Zookeeper connection required");
@@ -423,8 +382,7 @@
restApi = context.getServiceImpl(IRestApiService.class);
- switchLatches = new HashMap<String, LeaderLatch>();
- switchCallbacks = new HashMap<String, ControlChangeCallback>();
+ switches = new ConcurrentHashMap<String, SwitchLeadershipData>();
switchPathCaches = new HashMap<String, PathChildrenCache>();
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
@@ -432,7 +390,6 @@
sessionTimeout, connectionTimeout, retryPolicy);
client.start();
-
client = client.usingNamespace(namespace);
diff --git a/src/main/resources/floodlightdefault.properties b/src/main/resources/floodlightdefault.properties
index 9ea7a92..498fce5 100644
--- a/src/main/resources/floodlightdefault.properties
+++ b/src/main/resources/floodlightdefault.properties
@@ -10,7 +10,7 @@
net.floodlightcontroller.perfmon.PktInProcessingTime,\
net.floodlightcontroller.ui.web.StaticWebRoutable,\
net.floodlightcontroller.onoslistener.OnosPublisher, \
-net.onrc.onos.registry.controller.StandaloneRegistry
+net.onrc.onos.registry.controller.ZookeeperRegistry
net.floodlightcontroller.restserver.RestApiServer.port = 8080
net.floodlightcontroller.core.FloodlightProvider.openflowport = 6633
net.floodlightcontroller.jython.JythonDebugInterface.port = 6655
diff --git a/testbed-ctrl-add-ext.sh b/testbed-ctrl-add-ext.sh
new file mode 100755
index 0000000..110eeff
--- /dev/null
+++ b/testbed-ctrl-add-ext.sh
@@ -0,0 +1,18 @@
+#! /bin/bash
+controller="10.128.4.12 10.128.4.13 10.128.4.14 10.128.4.15 10.128.4.16"
+switches=`ifconfig -a | grep "^s" |grep -v eth | awk '{print $1}'`
+
+function host2ip (){
+ ip=`grep $1 /etc/hosts |grep -v "ip6"| awk '{print $1}'`
+ echo $ip
+}
+
+url=""
+for c in $controller; do
+ url="$url tcp:$c:6633"
+done
+echo $url
+for s in $switches; do
+ echo "set switch $s controller $url"
+ sudo ovs-vsctl set-controller $s $url
+done
diff --git a/testbed-ctrl-none.sh b/testbed-ctrl-none.sh
new file mode 100755
index 0000000..e09835b
--- /dev/null
+++ b/testbed-ctrl-none.sh
@@ -0,0 +1,18 @@
+#! /bin/bash
+controller=""
+switches=`ifconfig -a | grep "^s" |grep -v eth | awk '{print $1}'`
+
+function host2ip (){
+ ip=`grep $1 /etc/hosts |grep -v "ip6"| awk '{print $1}'`
+ echo $ip
+}
+
+url=""
+for c in $controller; do
+ url="$url tcp:`host2ip $c`:6633"
+done
+echo $url
+for s in $switches; do
+ echo "set switch $s controller $url"
+ sudo ovs-vsctl set-controller $s $url
+done
diff --git a/web/clear_flow.py b/web/clear_flow.py
new file mode 100755
index 0000000..2646498
--- /dev/null
+++ b/web/clear_flow.py
@@ -0,0 +1,64 @@
+#! /usr/bin/env python
+# -*- Mode: python; py-indent-offset: 4; tab-width: 8; indent-tabs-mode: t; -*-
+
+import pprint
+import os
+import sys
+import subprocess
+import json
+import argparse
+import io
+import time
+
+from flask import Flask, json, Response, render_template, make_response, request
+
+#
+# TODO: remove this! We don't use JSON argument here!
+# curl http://127.0.0.1:8080/wm/flow/clear/{"value":"0xf"}/json'
+#
+
+## Global Var ##
+ControllerIP="127.0.0.1"
+ControllerPort=8080
+
+DEBUG=0
+pp = pprint.PrettyPrinter(indent=4)
+
+app = Flask(__name__)
+
+## Worker Functions ##
+def log_error(txt):
+ print '%s' % (txt)
+
+def debug(txt):
+ if DEBUG:
+ print '%s' % (txt)
+
+# @app.route("/wm/flow/clear/<flow-id>/json")
+def clear_flow_path(flow_id):
+ command = "curl -s \"http://%s:%s/wm/flow/clear/%s/json\"" % (ControllerIP, ControllerPort, flow_id)
+ debug("clear_flow_path %s" % command)
+ result = os.popen(command).read()
+ debug("result %s" % result)
+ # parsedResult = json.loads(result)
+ # debug("parsed %s" % parsedResult)
+
+if __name__ == "__main__":
+ usage_msg = "Clear flow state from the ONOS Network Map\n"
+ usage_msg = usage_msg + "Usage: %s <flow_id>\n" % (sys.argv[0])
+
+ # app.debug = False;
+
+ # Usage info
+ if len(sys.argv) > 1 and (sys.argv[1] == "-h" or sys.argv[1] == "--help"):
+ print(usage_msg)
+ exit(0)
+
+ # Check arguments
+ if len(sys.argv) < 2:
+ log_error(usage_msg)
+ exit(1)
+
+ # Do the work
+ flow_id_arg = int(sys.argv[1], 0)
+ clear_flow_path(flow_id_arg);
diff --git a/web/delete_flow.py b/web/delete_flow.py
index 412d02f..f8e037f 100755
--- a/web/delete_flow.py
+++ b/web/delete_flow.py
@@ -44,7 +44,8 @@
# debug("parsed %s" % parsedResult)
if __name__ == "__main__":
- usage_msg = "Usage: %s <flow_id>" % (sys.argv[0])
+ usage_msg = "Delete flow state from the ONOS Network Map and the switches\n"
+ usage_msg = usage_msg + "Usage: %s <flow_id>\n" % (sys.argv[0])
# app.debug = False;
diff --git a/web/shortest_path.py b/web/shortest_path.py
index 8a77d61..751489f 100755
--- a/web/shortest_path.py
+++ b/web/shortest_path.py
@@ -67,7 +67,7 @@
inPort = f['inPort']['value'];
outPort = f['outPort']['value'];
dpid = f['dpid']['value']
- print "FlowEntry: (%s, %s, %s)" % (inPort, dpid, outPort)
+ print " FlowEntry: (%s, %s, %s)" % (inPort, dpid, outPort)
if __name__ == "__main__":