Merge branch 'master' of github.com:OPENNETWORKINGLAB/ONOS
diff --git a/build.xml b/build.xml
index e8055e7..1c473ce 100644
--- a/build.xml
+++ b/build.xml
@@ -66,9 +66,9 @@
         <include name="concurrentlinkedhashmap-lru-1.3.jar"/>
         <include name="jython-2.5.2.jar"/>
         <include name="libthrift-0.7.0.jar"/>
-	<include name="curator-client-1.3.3.jar"/>
-	<include name="curator-framework-1.3.3.jar"/>
-	<include name="curator-recipes-1.3.3.jar"/>
+	<include name="curator-client-1.3.4-SNAPSHOT.jar"/>
+	<include name="curator-framework-1.3.4-SNAPSHOT.jar"/>
+	<include name="curator-recipes-1.3.4-SNAPSHOT.jar"/>
 	<include name="zookeeper-3.4.5.jar"/>
     </patternset>
 
@@ -127,6 +127,7 @@
            debug="true" 
            srcdir="${source}:${thrift.out.dir}"
            destdir="${build}">
+	  <compilerarg value="-Xlint:unchecked"/>
         </javac>
     </target>
 
diff --git a/src/main/java/net/floodlightcontroller/core/internal/SwitchStorageImpl.java b/src/main/java/net/floodlightcontroller/core/internal/SwitchStorageImpl.java
index 6be26cb..7e049b6 100644
--- a/src/main/java/net/floodlightcontroller/core/internal/SwitchStorageImpl.java
+++ b/src/main/java/net/floodlightcontroller/core/internal/SwitchStorageImpl.java
@@ -16,6 +16,7 @@
 import com.thinkaurelius.titan.core.TitanFactory;
 import com.thinkaurelius.titan.core.TitanGraph;
 import com.tinkerpop.blueprints.Direction;
+import com.tinkerpop.blueprints.TransactionalGraph;
 import com.tinkerpop.blueprints.TransactionalGraph.Conclusion;
 import com.tinkerpop.blueprints.Edge;
 import com.tinkerpop.blueprints.Vertex;
@@ -244,9 +245,6 @@
 		FramedGraph<TitanGraph> fg = new FramedGraph<TitanGraph>(graph);
 		Iterable<ISwitchObject> switches =  fg.getVertices("type","switch",ISwitchObject.class);
 
-		for (ISwitchObject sw: switches) {
-			log.debug("switch: {}", sw.getDPID());
-		}
 
 		return switches;
 	}
diff --git a/src/main/java/net/floodlightcontroller/flowcache/FlowManager.java b/src/main/java/net/floodlightcontroller/flowcache/FlowManager.java
index a604969..3f74ff1 100644
--- a/src/main/java/net/floodlightcontroller/flowcache/FlowManager.java
+++ b/src/main/java/net/floodlightcontroller/flowcache/FlowManager.java
@@ -7,6 +7,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -73,6 +74,8 @@
     public static final short FLOWMOD_DEFAULT_HARD_TIMEOUT = 0;	// infinite
     public static final short PRIORITY_DEFAULT = 100;
 
+    private static long nextFlowEntryId = 1;
+
     /** The logger. */
     private static Logger log = LoggerFactory.getLogger(FlowManager.class);
 
@@ -89,9 +92,13 @@
 
 		Map<Long, IOFSwitch> mySwitches = floodlightProvider.getSwitches();
 
-		// Fetch all Flow Entries
-		Iterable<IFlowEntry> flowEntries = conn.utils().getAllFlowEntries(conn);
-		for (IFlowEntry flowEntryObj : flowEntries) {
+		Map<Long, IFlowEntry> myFlowEntries = new TreeMap<Long, IFlowEntry>();
+
+		//
+		// Fetch all Flow Entries and select only my Flow Entries
+		//
+		Iterable<IFlowEntry> allFlowEntries = conn.utils().getAllFlowEntries(conn);
+		for (IFlowEntry flowEntryObj : allFlowEntries) {
 		    FlowEntryId flowEntryId =
 			new FlowEntryId(flowEntryObj.getFlowEntryId());
 		    String userState = flowEntryObj.getUserState();
@@ -112,6 +119,29 @@
 			log.debug("Flow Entry ignored: not my switch");
 			continue;
 		    }
+		    myFlowEntries.put(flowEntryId.value(), flowEntryObj);
+		}
+
+		//
+		// Process my Flow Entries
+		//
+		for (Map.Entry<Long, IFlowEntry> entry : myFlowEntries.entrySet()) {
+		    IFlowEntry flowEntryObj = entry.getValue();
+
+		    //
+		    // TODO: Eliminate the re-fetching of flowEntryId,
+		    // userState, switchState, and dpid from the flowEntryObj.
+		    //
+		    FlowEntryId flowEntryId =
+			new FlowEntryId(flowEntryObj.getFlowEntryId());
+		    Dpid dpid = new Dpid(flowEntryObj.getSwitchDpid());
+		    String userState = flowEntryObj.getUserState();
+		    String switchState = flowEntryObj.getSwitchState();
+		    IOFSwitch mySwitch = mySwitches.get(dpid.value());
+		    if (mySwitch == null) {
+			log.debug("Flow Entry ignored: not my switch");
+			continue;
+		    }
 
 		    //
 		    // Create the Open Flow Flow Modification Entry to push
@@ -316,13 +346,11 @@
 
 	//
 	// Assign the FlowEntry IDs
-	// TODO: This is an ugly hack!
-	// The Flow Entry IDs are set to 1000*FlowId + Index
+	// Right now every new flow entry gets a new flow entry ID
+	// TODO: This needs to be redesigned!
 	//
-	int i = 1;
 	for (FlowEntry flowEntry : flowPath.dataPath().flowEntries()) {
-	    long id = flowPath.flowId().value() * 1000 + i;
-	    ++i;
+	    long id = nextFlowEntryId++;
 	    flowEntry.setFlowEntryId(new FlowEntryId(id));
 	}
 
@@ -517,6 +545,47 @@
     }
 
     /**
+     * Clear the state for a previously added flow.
+     *
+     * @param flowId the Flow ID of the flow to clear.
+     * @return true on success (including when no such flow exists),
+     * false if looking up the flow failed.
+     */
+    @Override
+    public boolean clearFlow(FlowId flowId) {
+	IFlowPath flowObj = null;
+	try {
+	    flowObj = conn.utils().searchFlowPath(conn, flowId);
+	    if (flowObj != null) {
+		log.debug("Clearing FlowPath with FlowId {}: found existing FlowPath",
+			  flowId.toString());
+	    } else {
+		log.debug("Clearing FlowPath with FlowId {}:  FlowPath not found",
+			  flowId.toString());
+	    }
+	} catch (Exception e) {
+	    // The lookup failed: roll back and report the failure instead of
+	    // falling through to the "no such flow" path below.
+	    conn.endTx(Transaction.ROLLBACK);
+	    log.error(":clearFlow FlowId:" + flowId.toString() + " failed", e);
+	    return false;
+	}
+	if (flowObj == null)
+	    return true;		// OK: No such flow
+
+	// Remove all Flow Entries
+	Iterable<IFlowEntry> flowEntries = flowObj.getFlowEntries();
+	for (IFlowEntry flowEntryObj : flowEntries) {
+	    flowObj.removeFlowEntry(flowEntryObj);
+	    conn.utils().removeFlowEntry(conn, flowEntryObj);
+	}
+	// Remove the Flow itself
+	conn.utils().removeFlowPath(conn, flowObj);
+	conn.endTx(Transaction.COMMIT);
+	return true;
+    }
+
+    /**
      * Get a previously added flow.
      *
      * @param flowId the Flow ID of the flow to get.
diff --git a/src/main/java/net/floodlightcontroller/flowcache/IFlowService.java b/src/main/java/net/floodlightcontroller/flowcache/IFlowService.java
index b159661..48477f1 100644
--- a/src/main/java/net/floodlightcontroller/flowcache/IFlowService.java
+++ b/src/main/java/net/floodlightcontroller/flowcache/IFlowService.java
@@ -33,6 +33,14 @@
     boolean deleteFlow(FlowId flowId);
 
     /**
+     * Clear the state for a previously added flow.
+     *
+     * @param flowId the Flow ID of the flow to clear.
+     * @return true on success, otherwise false.
+     */
+    boolean clearFlow(FlowId flowId);
+
+    /**
      * Get a previously added flow.
      *
      * @param flowId the Flow ID of the flow to get.
diff --git a/src/main/java/net/floodlightcontroller/flowcache/web/ClearFlowResource.java b/src/main/java/net/floodlightcontroller/flowcache/web/ClearFlowResource.java
new file mode 100644
index 0000000..8fff358
--- /dev/null
+++ b/src/main/java/net/floodlightcontroller/flowcache/web/ClearFlowResource.java
@@ -0,0 +1,45 @@
+package net.floodlightcontroller.flowcache.web;
+
+import net.floodlightcontroller.flowcache.IFlowService;
+import net.floodlightcontroller.util.FlowId;
+
+import org.openflow.util.HexString;
+import org.restlet.resource.Get;
+import org.restlet.resource.ServerResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * REST resource that clears the state of a single flow.
+ *
+ * Attached by FlowWebRoutable at "/clear/{flow-id}/json".
+ */
+public class ClearFlowResource extends ServerResource {
+    protected static Logger log = LoggerFactory.getLogger(ClearFlowResource.class);
+
+    /**
+     * Clear the flow identified by the "flow-id" request attribute.
+     *
+     * @return the result of IFlowService.clearFlow(), or false if the
+     * Flow Service is not available.
+     */
+    @Get("json")
+    public Boolean retrieve() {
+        IFlowService flowService =
+                (IFlowService)getContext().getAttributes().
+                get(IFlowService.class.getCanonicalName());
+
+        if (flowService == null) {
+            log.debug("ONOS Flow Service not found");
+            return false;
+        }
+
+        // Extract the arguments
+        String flowIdStr = (String) getRequestAttributes().get("flow-id");
+        FlowId flowId = new FlowId(flowIdStr);
+        log.debug("Clear Flow Id: {}", flowIdStr);
+
+        // Process the request
+        return flowService.clearFlow(flowId);
+    }
+}
diff --git a/src/main/java/net/floodlightcontroller/flowcache/web/FlowWebRoutable.java b/src/main/java/net/floodlightcontroller/flowcache/web/FlowWebRoutable.java
index cfd3505..a40a508 100644
--- a/src/main/java/net/floodlightcontroller/flowcache/web/FlowWebRoutable.java
+++ b/src/main/java/net/floodlightcontroller/flowcache/web/FlowWebRoutable.java
@@ -14,6 +14,7 @@
     public Restlet getRestlet(Context context) {
         Router router = new Router(context);
         router.attach("/add/json", AddFlowResource.class);
+        router.attach("/clear/{flow-id}/json", ClearFlowResource.class);
         router.attach("/delete/{flow-id}/json", DeleteFlowResource.class);
         router.attach("/get/{flow-id}/json", GetFlowByIdResource.class);
         router.attach("/getall-by-installer-id/{installer-id}/{src-dpid}/{src-port}/{dst-dpid}/{dst-port}/json", GetAllFlowsByInstallerIdResource.class);
diff --git a/src/main/java/net/floodlightcontroller/onoslistener/OnosPublisher.java b/src/main/java/net/floodlightcontroller/onoslistener/OnosPublisher.java
index 2a66527..a6931e6 100644
--- a/src/main/java/net/floodlightcontroller/onoslistener/OnosPublisher.java
+++ b/src/main/java/net/floodlightcontroller/onoslistener/OnosPublisher.java
@@ -3,34 +3,110 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
+import org.openflow.util.HexString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import net.floodlightcontroller.core.IFloodlightProviderService;
+import net.floodlightcontroller.core.INetMapStorage.DM_OPERATION;
+import net.floodlightcontroller.core.INetMapTopologyObjects.ISwitchObject;
+import net.floodlightcontroller.core.ISwitchStorage.SwitchState;
 import net.floodlightcontroller.core.IOFSwitch;
 import net.floodlightcontroller.core.IOFSwitchListener;
 import net.floodlightcontroller.core.ISwitchStorage;
 import net.floodlightcontroller.core.internal.SwitchStorageImpl;
+import net.floodlightcontroller.core.internal.TopoSwitchServiceImpl;
 import net.floodlightcontroller.core.module.FloodlightModuleContext;
 import net.floodlightcontroller.core.module.FloodlightModuleException;
 import net.floodlightcontroller.core.module.IFloodlightModule;
 import net.floodlightcontroller.core.module.IFloodlightService;
+import net.floodlightcontroller.core.util.SingletonTask;
 import net.floodlightcontroller.devicemanager.IDevice;
 import net.floodlightcontroller.devicemanager.IDeviceListener;
 import net.floodlightcontroller.devicemanager.IDeviceService;
 import net.floodlightcontroller.devicemanager.IDeviceStorage;
 import net.floodlightcontroller.devicemanager.internal.DeviceStorageImpl;
 import net.floodlightcontroller.linkdiscovery.ILinkDiscoveryListener;
+import net.floodlightcontroller.threadpool.IThreadPoolService;
+import net.onrc.onos.registry.controller.IControllerRegistryService;
+import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
+import net.onrc.onos.registry.controller.RegistryException;
 
 public class OnosPublisher implements IDeviceListener, IOFSwitchListener,
 		ILinkDiscoveryListener, IFloodlightModule {
 	
 	protected IDeviceStorage devStore;
+	protected ISwitchStorage swStore;
 	protected static Logger log;
 	protected IDeviceService deviceService;
+	protected IControllerRegistryService registryService;
 	
 	protected static final String DBConfigFile = "dbconf";
+	protected IThreadPoolService threadPool;
+	
+	protected final int CLEANUP_TASK_INTERVAL = 999; // 999 ms
+	protected SingletonTask cleanupTask;
+	
+    /**
+     * Cleanup and sync switch state from the registry.
+     */
+    protected class SwitchCleanup implements ControlChangeCallback, Runnable {
+        @Override
+        public void run() {
+            try {
+                log.debug("Running cleanup thread");
+                switchCleanup();
+            } catch (Exception e) {
+                log.error("Error in cleanup thread", e);
+            } finally {
+                cleanupTask.reschedule(CLEANUP_TASK_INTERVAL, TimeUnit.MILLISECONDS);
+            }
+        }
+
+        @Override
+        public void controlChanged(long dpid, boolean hasControl) {
+            if (hasControl) {
+                log.debug("got control to set inactive sw {}", dpid);
+                try {
+                    swStore.update(HexString.toHexString(dpid), SwitchState.INACTIVE, DM_OPERATION.UPDATE);
+                } finally {
+                    // Release even if the update throws, so a later cleanup run can retry
+                    registryService.releaseControl(dpid);
+                }
+            }
+        }
+    }
+    
+
+    
+    /**
+     * Request control of each active switch that has no controller in the registry.
+     */
+    protected void switchCleanup() {
+        TopoSwitchServiceImpl impl = new TopoSwitchServiceImpl();
+        Iterable<ISwitchObject> switches = impl.getActiveSwitches();
+        // For each switch check if a controller exists in controller registry
+        for (ISwitchObject sw: switches) {
+            log.debug("checking if switch is inactive: {}", sw.getDPID());
+            try {
+                long dpid = HexString.toLong(sw.getDPID());
+                String controller = registryService.getControllerForSwitch(dpid);
+                if (controller == null) {
+                    log.debug("request Control to set inactive sw {}", dpid);
+                    registryService.requestControl(dpid, new SwitchCleanup());
+                } else {
+                    log.debug("sw {} is controlled by controller: {}",dpid,controller);
+                }
+            } catch (NumberFormatException e) {
+                log.error("Invalid DPID for switch " + sw.getDPID(), e);
+            } catch (RegistryException e) {
+                log.error("Registry error for switch " + sw.getDPID(), e);
+            }
+        }
+    }
 
 	@Override
 	public void linkDiscoveryUpdate(LDUpdate update) {
@@ -112,6 +188,7 @@
 	            new ArrayList<Class<? extends IFloodlightService>>();
 	        l.add(IFloodlightProviderService.class);
 	        l.add(IDeviceService.class);
+	        l.add(IThreadPoolService.class);
 	        return l;
 	}
 
@@ -124,10 +201,15 @@
 		
 		log = LoggerFactory.getLogger(OnosPublisher.class);
 		deviceService = context.getServiceImpl(IDeviceService.class);
+		threadPool = context.getServiceImpl(IThreadPoolService.class);
+		registryService = context.getServiceImpl(IControllerRegistryService.class);
 		
 		devStore = new DeviceStorageImpl();
 		devStore.init(conf);
 		
+		swStore = new SwitchStorageImpl();
+		swStore.init(conf);
+				
 		log.debug("Initializing OnosPublisher module with {}", conf);
 		
 	}
@@ -135,7 +217,11 @@
 	@Override
 	public void startUp(FloodlightModuleContext context) {
 		// TODO Auto-generated method stub
-		deviceService.addListener(this);		
+		ScheduledExecutorService ses = threadPool.getScheduledExecutor();
+		deviceService.addListener(this);
+	       // Setup the Cleanup task. 
+        cleanupTask = new SingletonTask(ses, new SwitchCleanup());
+        cleanupTask.reschedule(CLEANUP_TASK_INTERVAL, TimeUnit.MILLISECONDS);
 	}
 
 }
diff --git a/src/main/java/net/onrc/onos/registry/controller/StandaloneRegistry.java b/src/main/java/net/onrc/onos/registry/controller/StandaloneRegistry.java
index 7345084..2c220fd 100644
--- a/src/main/java/net/onrc/onos/registry/controller/StandaloneRegistry.java
+++ b/src/main/java/net/onrc/onos/registry/controller/StandaloneRegistry.java
@@ -91,7 +91,7 @@
 
 	@Override
 	public String getControllerForSwitch(long dpid) throws RegistryException {
-		return controllerId;
+		return (switchCallbacks.get(HexString.toHexString(dpid)) != null)? controllerId: null;
 	}
 
 	@Override
diff --git a/src/main/java/net/onrc/onos/registry/controller/SwitchLeadershipData.java b/src/main/java/net/onrc/onos/registry/controller/SwitchLeadershipData.java
new file mode 100644
index 0000000..eb513fd
--- /dev/null
+++ b/src/main/java/net/onrc/onos/registry/controller/SwitchLeadershipData.java
@@ -0,0 +1,35 @@
+package net.onrc.onos.registry.controller;
+
+import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
+
+import com.netflix.curator.framework.recipes.leader.LeaderLatch;
+
+/**
+ * Immutable pairing of the leader latch used to contest control of a
+ * switch with the callback to notify when control changes.
+ */
+public class SwitchLeadershipData {
+
+	private final LeaderLatch latch;
+	private final ControlChangeCallback cb;
+
+	/**
+	 * @param latch the leader latch for the switch.
+	 * @param cb the callback registered with the control request.
+	 */
+	public SwitchLeadershipData(LeaderLatch latch, ControlChangeCallback cb) {
+		this.latch = latch;
+		this.cb = cb;
+	}
+
+	/** @return the leader latch. */
+	public LeaderLatch getLatch(){
+		return latch;
+	}
+
+	/** @return the control change callback. */
+	public ControlChangeCallback getCallback(){
+		return cb;
+	}
+
+}
diff --git a/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
index 025bbfe..e727238 100644
--- a/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
+++ b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
@@ -8,6 +8,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import net.floodlightcontroller.core.module.FloodlightModuleContext;
 import net.floodlightcontroller.core.module.FloodlightModuleException;
@@ -16,8 +17,6 @@
 import net.floodlightcontroller.restserver.IRestApiService;
 
 import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.openflow.util.HexString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -26,13 +25,14 @@
 import com.netflix.curator.RetryPolicy;
 import com.netflix.curator.framework.CuratorFramework;
 import com.netflix.curator.framework.CuratorFrameworkFactory;
-import com.netflix.curator.framework.api.CuratorWatcher;
 import com.netflix.curator.framework.recipes.cache.ChildData;
 import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
 import com.netflix.curator.framework.recipes.cache.PathChildrenCache.StartMode;
 import com.netflix.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import com.netflix.curator.framework.recipes.cache.PathChildrenCacheListener;
 import com.netflix.curator.framework.recipes.leader.LeaderLatch;
+import com.netflix.curator.framework.recipes.leader.LeaderLatchEvent;
+import com.netflix.curator.framework.recipes.leader.LeaderLatchListener;
 import com.netflix.curator.framework.recipes.leader.Participant;
 import com.netflix.curator.retry.ExponentialBackoffRetry;
 
@@ -61,92 +61,40 @@
 	protected PathChildrenCache controllerCache;
 	protected PathChildrenCache switchCache;
 
-	protected Map<String, LeaderLatch> switchLatches;
-	protected Map<String, ControlChangeCallback> switchCallbacks;
+	protected ConcurrentHashMap<String, SwitchLeadershipData> switches;
 	protected Map<String, PathChildrenCache> switchPathCaches;
 	
 	//Zookeeper performance-related configuration
 	protected static final int sessionTimeout = 2000;
 	protected static final int connectionTimeout = 4000;
 	
-	/**
-	 * Watches for changes in switch leadership election. The Curator
-	 * LeaderLatch doesn't notify us when leadership changes so we set a watch
-	 * on the election znodes to get leadership change events. The process
-	 * method will be called whenever the switches children change in 
-	 * Zookeeper. We then have to work out whether to send a control-changed
-	 * event to our clients and reset the watch.
-	 * 
-	 * TODO I think it's possible to miss events that happen while we're 
-	 * processing the watch and before we've set a new watch. Need to think
-	 * of a safer way to implement leader change notifications.
-	 *
-	 */
-	protected class ParamaterizedCuratorWatcher implements CuratorWatcher {
-		private String dpid;
-		private boolean isLeader = false;
-		private String latchPath;
+
+	protected class SwitchLeaderListener implements LeaderLatchListener{
+		String dpid;
+		LeaderLatch latch;
 		
-		public ParamaterizedCuratorWatcher(String dpid, String latchPath){
+		public SwitchLeaderListener(String dpid, LeaderLatch latch){
 			this.dpid = dpid;
-			this.latchPath = latchPath;
+			this.latch = latch;
 		}
 		
 		@Override
-		public synchronized void process(WatchedEvent event) throws Exception {
-			log.debug("Watch Event: {}", event);
-
+		public void leaderLatchEvent(CuratorFramework arg0,
+				LeaderLatchEvent arg1) {
+			log.debug("Leadership changed for {}, now {}",
+					dpid, latch.hasLeadership());
 			
-			if (event.getState() == KeeperState.Disconnected){
-				if (isLeader) {
-					log.debug("Disconnected while leader - lost leadership for {}", dpid);
-					
-					isLeader = false;
-					ControlChangeCallback cb = switchCallbacks.get(dpid);
-					if (cb != null) {
-						//Allow callback to be null if the requester doesn't want a callback
-						cb.controlChanged(HexString.toLong(dpid), false);
-					}
-				}
-				return;
-				//TODO Watcher is never reset once we reconnect to Zookeeper
+			//Check that the leadership request is still active - the client
+			//may have since released the request or even begun another request
+			//(this is why we use == to check the object instance is the same)
+			SwitchLeadershipData swData = switches.get(dpid);
+			if (swData != null && swData.getLatch() == latch){
+				swData.getCallback().controlChanged(
+						HexString.toLong(dpid), latch.hasLeadership());
 			}
-			
-			LeaderLatch latch = switchLatches.get(dpid);
-			if (latch == null){
-				log.debug("In watcher process, looks like control was released for {}",
-						dpid);
-				return;
+			else {
+				log.debug("Latch for {} has changed", dpid);
 			}
-			
-			try {
-				
-				Participant leader = latch.getLeader();
-
-				if (leader.getId().equals(controllerId) && !isLeader){
-					log.debug("Became leader for {}", dpid);
-					
-					isLeader = true;
-					switchCallbacks.get(dpid).controlChanged(HexString.toLong(dpid), true);
-				}
-				else if (!leader.getId().equals(controllerId) && isLeader){
-					log.debug("Lost leadership for {}", dpid);
-					
-					isLeader = false;
-					switchCallbacks.get(dpid).controlChanged(HexString.toLong(dpid), false);
-				}
-			} catch (Exception e){
-				if (isLeader){
-					log.debug("Exception checking leadership status. Assume leadership lost for {}",
-							dpid);
-					
-					isLeader = false;
-					switchCallbacks.get(dpid).controlChanged(HexString.toLong(dpid), false);
-				}
-			} finally {
-				client.getChildren().usingWatcher(this).inBackground().forPath(latchPath);
-			}
-			//client.getChildren().usingWatcher(this).forPath(latchPath);
 		}
 	}
 	
@@ -207,22 +155,33 @@
 		String dpidStr = HexString.toHexString(dpid);
 		String latchPath = switchLatchesPath + "/" + dpidStr;
 		
-		if (switchLatches.get(dpidStr) != null){
-			//throw new RuntimeException("Leader election for switch " + dpidStr +
-			//		"is already running");
+		if (switches.get(dpidStr) != null){
 			log.debug("Already contesting {}, returning", HexString.toHexString(dpid));
 			return;
 		}
 		
 		LeaderLatch latch = new LeaderLatch(client, latchPath, controllerId);
-		switchLatches.put(dpidStr, latch);
-		switchCallbacks.put(dpidStr, cb);
+		latch.addListener(new SwitchLeaderListener(dpidStr, latch));
 		
+
+		SwitchLeadershipData swData = new SwitchLeadershipData(latch, cb);
+		SwitchLeadershipData oldData = switches.putIfAbsent(dpidStr, swData);
+		
+		if (oldData != null){
+			//There was already data for that key in the map
+			//i.e. someone else got here first so we can't succeed
+			log.debug("Already requested control for {}", dpidStr);
+			throw new RegistryException("Already requested control for " + dpidStr);
+		}
+		
+		//Now that we know we were able to add our latch to the collection,
+		//we can start the leader election in Zookeeper. However I don't know
+		//how to handle if the start fails - the latch is already in our
+		//switches list.
+		//TODO seems like there's a Curator bug when latch.start is called when
+		//there's no Zookeeper connection which causes two znodes to be put in 
+		//Zookeeper at the latch path when we reconnect to Zookeeper.
 		try {
-			//client.getChildren().usingWatcher(watcher).inBackground().forPath(singleLatchPath);
-			client.getChildren().usingWatcher(
-					new ParamaterizedCuratorWatcher(dpidStr, latchPath))
-					.inBackground().forPath(latchPath);
 			latch.start();
 		} catch (Exception e) {
 			log.warn("Error starting leader latch: {}", e.getMessage());
@@ -237,39 +196,35 @@
 		
 		String dpidStr = HexString.toHexString(dpid);
 		
-		LeaderLatch latch = switchLatches.get(dpidStr);
-		if (latch == null) {
+		SwitchLeadershipData swData = switches.remove(dpidStr);
+		
+		if (swData == null) {
 			log.debug("Trying to release control of a switch we are not contesting");
 			return;
 		}
+
+		LeaderLatch latch = swData.getLatch();
 		
 		try {
 			latch.close();
 		} catch (IOException e) {
 			//I think it's OK not to do anything here. Either the node got 
 			//deleted correctly, or the connection went down and the node got deleted.
-		} finally {
-			switchLatches.remove(dpidStr);
-			switchCallbacks.remove(dpidStr);
 		}
 	}
 
 	@Override
 	public boolean hasControl(long dpid) {
+		String dpidStr = HexString.toHexString(dpid);
 		
-		LeaderLatch latch = switchLatches.get(HexString.toHexString(dpid));
+		SwitchLeadershipData swData = switches.get(dpidStr);
 		
-		if (latch == null) {
-			log.warn("No leader latch for dpid {}", HexString.toHexString(dpid));
+		if (swData == null) {
+			log.warn("No leader latch for dpid {}", dpidStr);
 			return false;
 		}
 		
-		try {
-			return latch.getLeader().getId().equals(controllerId);
-		} catch (Exception e) {
-			//TODO swallow exception?
-			return false;
-		}
+		return swData.getLatch().hasLeadership();
 	}
 
 	@Override
@@ -322,12 +277,11 @@
 	
 	@Override
 	public String getControllerForSwitch(long dpid) throws RegistryException {
-		// TODO Work out how we should store this controller/switch data.
-		// The leader latch might be a index to the /controllers collections
-		// which holds more info on the controller (how to talk to it for example).
+		// TODO work out synchronization
 		
-		String strDpid = HexString.toHexString(dpid);
-		LeaderLatch latch = switchLatches.get(strDpid);
+		String dpidStr = HexString.toHexString(dpid);
+		SwitchLeadershipData swData = switches.get(dpidStr);
+		LeaderLatch latch = (swData == null) ? null : swData.getLatch();
 		
 		if (latch == null){
 			log.warn("Tried to get controller for non-existent switch");
@@ -351,6 +305,9 @@
 	}
 	
 
+	//TODO what should happen when there's no ZK connection? Currently we just return
+	//the cache but this may lead to false impressions - i.e. we don't actually know
+	//what's in ZK so we shouldn't say we do
 	@Override
 	public Map<String, List<ControllerRegistryEntry>> getAllSwitches() {
 		Map<String, List<ControllerRegistryEntry>> data = 
@@ -409,6 +366,8 @@
 		return l;
 	}
 	
+	//TODO currently blocks startup when it can't get a Zookeeper connection.
+	//Do we support starting up with no Zookeeper connection?
 	@Override
 	public void init (FloodlightModuleContext context) throws FloodlightModuleException {
 		log.info("Initialising the Zookeeper Registry - Zookeeper connection required");
@@ -423,8 +382,7 @@
 		
 		restApi = context.getServiceImpl(IRestApiService.class);
 
-		switchLatches = new HashMap<String, LeaderLatch>();
-		switchCallbacks = new HashMap<String, ControlChangeCallback>();
+		switches = new ConcurrentHashMap<String, SwitchLeadershipData>();
 		switchPathCaches = new HashMap<String, PathChildrenCache>();
 		
 		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
@@ -432,7 +390,6 @@
 				sessionTimeout, connectionTimeout, retryPolicy);
 		
 		client.start();
-		
 		client = client.usingNamespace(namespace);
 
 		
diff --git a/src/main/resources/floodlightdefault.properties b/src/main/resources/floodlightdefault.properties
index 9ea7a92..498fce5 100644
--- a/src/main/resources/floodlightdefault.properties
+++ b/src/main/resources/floodlightdefault.properties
@@ -10,7 +10,7 @@
 net.floodlightcontroller.perfmon.PktInProcessingTime,\
 net.floodlightcontroller.ui.web.StaticWebRoutable,\
 net.floodlightcontroller.onoslistener.OnosPublisher, \
-net.onrc.onos.registry.controller.StandaloneRegistry
+net.onrc.onos.registry.controller.ZookeeperRegistry
 net.floodlightcontroller.restserver.RestApiServer.port = 8080
 net.floodlightcontroller.core.FloodlightProvider.openflowport = 6633
 net.floodlightcontroller.jython.JythonDebugInterface.port = 6655
diff --git a/testbed-ctrl-add-ext.sh b/testbed-ctrl-add-ext.sh
new file mode 100755
index 0000000..110eeff
--- /dev/null
+++ b/testbed-ctrl-add-ext.sh
@@ -0,0 +1,18 @@
+#! /bin/bash
+controller="10.128.4.12 10.128.4.13 10.128.4.14 10.128.4.15 10.128.4.16"
+switches=`ifconfig -a | grep "^s" |grep -v eth | awk '{print $1}'`
+
+function host2ip (){
+   ip=`grep $1 /etc/hosts |grep -v "ip6"|  awk '{print $1}'`
+   echo $ip
+}
+
+url=""
+for c in $controller; do
+  url="$url tcp:$c:6633"
+done
+echo $url
+for s in $switches; do
+    echo "set switch $s controller $url"
+    sudo ovs-vsctl set-controller $s $url
+done
diff --git a/testbed-ctrl-none.sh b/testbed-ctrl-none.sh
new file mode 100755
index 0000000..e09835b
--- /dev/null
+++ b/testbed-ctrl-none.sh
@@ -0,0 +1,18 @@
+#! /bin/bash
+controller=""
+switches=`ifconfig -a | grep "^s" |grep -v eth | awk '{print $1}'`
+
+function host2ip (){
+   ip=`grep $1 /etc/hosts |grep -v "ip6"|  awk '{print $1}'`
+   echo $ip
+}
+
+url=""
+for c in $controller; do
+  url="$url tcp:`host2ip $c`:6633"
+done
+echo $url
+for s in $switches; do
+    echo "set switch $s controller $url"
+    sudo ovs-vsctl set-controller $s $url
+done
diff --git a/web/clear_flow.py b/web/clear_flow.py
new file mode 100755
index 0000000..2646498
--- /dev/null
+++ b/web/clear_flow.py
@@ -0,0 +1,64 @@
+#! /usr/bin/env python
+# -*- Mode: python; py-indent-offset: 4; tab-width: 8; indent-tabs-mode: t; -*-
+
+import pprint
+import os
+import sys
+import subprocess
+import json
+import argparse
+import io
+import time
+
+from flask import Flask, json, Response, render_template, make_response, request
+
+#
+# Example (the Flow ID is passed as a plain number, not as JSON):
+# curl "http://127.0.0.1:8080/wm/flow/clear/15/json"
+#
+
+## Global Var ##
+ControllerIP="127.0.0.1"
+ControllerPort=8080
+
+DEBUG=0
+pp = pprint.PrettyPrinter(indent=4)
+
+app = Flask(__name__)
+
+## Worker Functions ##
+def log_error(txt):
+  print '%s' % (txt)
+
+def debug(txt):
+  if DEBUG:
+    print '%s' % (txt)
+
+# @app.route("/wm/flow/clear/<flow-id>/json")
+def clear_flow_path(flow_id):
+  command = "curl -s \"http://%s:%s/wm/flow/clear/%s/json\"" % (ControllerIP, ControllerPort, flow_id)
+  debug("clear_flow_path %s" % command)
+  result = os.popen(command).read()
+  debug("result %s" % result)
+  # parsedResult = json.loads(result)
+  # debug("parsed %s" % parsedResult)
+
+if __name__ == "__main__":
+  usage_msg = "Clear flow state from the ONOS Network Map\n"
+  usage_msg = usage_msg + "Usage: %s <flow_id>\n" % (sys.argv[0])
+
+  # app.debug = False;
+
+  # Usage info
+  if len(sys.argv) > 1 and (sys.argv[1] == "-h" or sys.argv[1] == "--help"):
+    print(usage_msg)
+    exit(0)
+
+  # Check arguments
+  if len(sys.argv) < 2:
+    log_error(usage_msg)
+    exit(1)
+
+  # Do the work
+  flow_id_arg = int(sys.argv[1], 0)
+  clear_flow_path(flow_id_arg);
diff --git a/web/delete_flow.py b/web/delete_flow.py
index 412d02f..f8e037f 100755
--- a/web/delete_flow.py
+++ b/web/delete_flow.py
@@ -44,7 +44,8 @@
   # debug("parsed %s" % parsedResult)
 
 if __name__ == "__main__":
-  usage_msg = "Usage: %s <flow_id>" % (sys.argv[0])
+  usage_msg = "Delete flow state from the ONOS Network Map and the switches\n"
+  usage_msg = usage_msg + "Usage: %s <flow_id>\n" % (sys.argv[0])
 
   # app.debug = False;
 
diff --git a/web/shortest_path.py b/web/shortest_path.py
index 8a77d61..751489f 100755
--- a/web/shortest_path.py
+++ b/web/shortest_path.py
@@ -67,7 +67,7 @@
     inPort = f['inPort']['value'];
     outPort = f['outPort']['value'];
     dpid = f['dpid']['value']
-    print "FlowEntry: (%s, %s, %s)" % (inPort, dpid, outPort)
+    print "  FlowEntry: (%s, %s, %s)" % (inPort, dpid, outPort)
 
 
 if __name__ == "__main__":