Re-architected ZookeeperRegistry to use LeaderLatch change notifications and to improve thread safety
diff --git a/src/main/java/net/onrc/onos/registry/controller/SwitchLeadershipData.java b/src/main/java/net/onrc/onos/registry/controller/SwitchLeadershipData.java
new file mode 100644
index 0000000..eb513fd
--- /dev/null
+++ b/src/main/java/net/onrc/onos/registry/controller/SwitchLeadershipData.java
@@ -0,0 +1,25 @@
+package net.onrc.onos.registry.controller;
+
+import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
+
+import com.netflix.curator.framework.recipes.leader.LeaderLatch;
+
+public class SwitchLeadershipData {
+	
+	private LeaderLatch latch;
+	private ControlChangeCallback cb;
+
+	public SwitchLeadershipData(LeaderLatch latch, ControlChangeCallback cb) {
+		this.latch = latch;
+		this.cb = cb;
+	}
+	
+	public LeaderLatch getLatch(){
+		return latch;
+	}
+	
+	public ControlChangeCallback getCallback(){
+		return cb;
+	}
+
+}
diff --git a/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
index c9ed160..3120d20 100644
--- a/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
+++ b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
@@ -8,6 +8,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import net.floodlightcontroller.core.module.FloodlightModuleContext;
 import net.floodlightcontroller.core.module.FloodlightModuleException;
@@ -16,8 +17,6 @@
 import net.floodlightcontroller.restserver.IRestApiService;
 
 import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.openflow.util.HexString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -26,7 +25,6 @@
 import com.netflix.curator.RetryPolicy;
 import com.netflix.curator.framework.CuratorFramework;
 import com.netflix.curator.framework.CuratorFrameworkFactory;
-import com.netflix.curator.framework.api.CuratorWatcher;
 import com.netflix.curator.framework.recipes.cache.ChildData;
 import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
 import com.netflix.curator.framework.recipes.cache.PathChildrenCache.StartMode;
@@ -63,8 +61,9 @@
 	protected PathChildrenCache controllerCache;
 	protected PathChildrenCache switchCache;
 
-	protected Map<String, LeaderLatch> switchLatches;
-	protected Map<String, ControlChangeCallback> switchCallbacks;
+	protected ConcurrentHashMap<String, SwitchLeadershipData> switches;
+	//protected Map<String, LeaderLatch> switchLatches;
+	//protected Map<String, ControlChangeCallback> switchCallbacks;
 	protected Map<String, PathChildrenCache> switchPathCaches;
 	
 	//Zookeeper performance-related configuration
@@ -84,6 +83,7 @@
 	 * of a safer way to implement leader change notifications.
 	 *
 	 */
+	/*
 	protected class ParamaterizedCuratorWatcher implements CuratorWatcher {
 		private String dpid;
 		private boolean isLeader = false;
@@ -151,12 +151,13 @@
 			//client.getChildren().usingWatcher(this).forPath(latchPath);
 		}
 	}
+	*/
 	
-	protected class ParamaterizedLeaderListener implements LeaderLatchListener{
+	protected class SwitchLeaderListener implements LeaderLatchListener{
 		String dpid;
 		LeaderLatch latch;
 		
-		public ParamaterizedLeaderListener(String dpid, LeaderLatch latch){
+		public SwitchLeaderListener(String dpid, LeaderLatch latch){
 			this.dpid = dpid;
 			this.latch = latch;
 		}
@@ -164,9 +165,20 @@
 		@Override
 		public void leaderLatchEvent(CuratorFramework arg0,
 				LeaderLatchEvent arg1) {
-			log.debug("Got leader latch event for {} hasLeadership {}",
+			log.debug("Leadership changed for {}, now {}",
 					dpid, latch.hasLeadership());
 			
+			//Check that the leadership request is still active - the client
+			//may have since released the request or even begun another request
+			//(this is why we use == to check the object instance is the same)
+			SwitchLeadershipData swData = switches.get(dpid);
+			if (swData != null && swData.getLatch() == latch){
+				swData.getCallback().controlChanged(
+						HexString.toLong(dpid), latch.hasLeadership());
+			}
+			else {
+				log.debug("Latch for {} has changed", dpid);
+			}
 		}
 	}
 	
@@ -227,7 +239,8 @@
 		String dpidStr = HexString.toHexString(dpid);
 		String latchPath = switchLatchesPath + "/" + dpidStr;
 		
-		if (switchLatches.get(dpidStr) != null){
+		//if (switchLatches.get(dpidStr) != null){
+		if (switches.get(dpidStr) != null){
 			//throw new RuntimeException("Leader election for switch " + dpidStr +
 			//		"is already running");
 			log.debug("Already contesting {}, returning", HexString.toHexString(dpid));
@@ -235,11 +248,24 @@
 		}
 		
 		LeaderLatch latch = new LeaderLatch(client, latchPath, controllerId);
-		switchLatches.put(dpidStr, latch);
-		switchCallbacks.put(dpidStr, cb);
+		latch.addListener(new SwitchLeaderListener(dpidStr, latch));
 		
-		latch.addListener(new ParamaterizedLeaderListener(dpidStr, latch));
+		//switchLatches.put(dpidStr, latch);
+		//switchCallbacks.put(dpidStr, cb);
 		
+		//Don't put the entry in the map if we can't start the latch
+		SwitchLeadershipData swData = new SwitchLeadershipData(latch, cb);
+		SwitchLeadershipData oldData = switches.putIfAbsent(dpidStr, swData);
+		
+		if (oldData != null){
+			//There was already data for that key in the map
+			//i.e. someone else got here first so we can't succeed
+			log.debug("Already requested control for {}", dpidStr);
+			throw new RegistryException("Already requested control for " + dpidStr);
+		}
+		
+		//Now that we know we were able to add our latch to the collection,
+		//we can start the leader election in Zookeeper
 		try {
 			//client.getChildren().usingWatcher(watcher).inBackground().forPath(singleLatchPath);
 			//client.getChildren().usingWatcher(
@@ -259,11 +285,16 @@
 		
 		String dpidStr = HexString.toHexString(dpid);
 		
-		LeaderLatch latch = switchLatches.get(dpidStr);
-		if (latch == null) {
+		SwitchLeadershipData swData = switches.remove(dpidStr);
+		
+		if (swData == null) {
 			log.debug("Trying to release control of a switch we are not contesting");
 			return;
 		}
+
+		//LeaderLatch latch = switches.get(dpidStr).getLatch();
+		//LeaderLatch latch = switches.remove(dpidStr).getLatch();
+		LeaderLatch latch = swData.getLatch();
 		
 		try {
 			latch.close();
@@ -271,27 +302,33 @@
 			//I think it's OK not to do anything here. Either the node got 
 			//deleted correctly, or the connection went down and the node got deleted.
 		} finally {
-			switchLatches.remove(dpidStr);
-			switchCallbacks.remove(dpidStr);
+			//switches.remove(dpidStr);
+			//switchLatches.remove(dpidStr);
+			//switchCallbacks.remove(dpidStr);
 		}
 	}
 
 	@Override
 	public boolean hasControl(long dpid) {
+		//LeaderLatch latch = switchLatches.get(HexString.toHexString(dpid));
+		String dpidStr = HexString.toHexString(dpid);
 		
-		LeaderLatch latch = switchLatches.get(HexString.toHexString(dpid));
+		SwitchLeadershipData swData = switches.get(dpidStr);
 		
-		if (latch == null) {
-			log.warn("No leader latch for dpid {}", HexString.toHexString(dpid));
+		if (swData == null) {
+			log.warn("No leader latch for dpid {}", dpidStr);
 			return false;
 		}
 		
-		try {
-			return latch.getLeader().getId().equals(controllerId);
-		} catch (Exception e) {
+		return swData.getLatch().hasLeadership();
+		
+		//try {
+			//return latch.getLeader().getId().equals(controllerId);
+		
+		//} catch (Exception e) {
 			//TODO swallow exception?
-			return false;
-		}
+			//return false;
+		//}
 	}
 
 	@Override
@@ -344,12 +381,11 @@
 	
 	@Override
 	public String getControllerForSwitch(long dpid) throws RegistryException {
-		// TODO Work out how we should store this controller/switch data.
-		// The leader latch might be a index to the /controllers collections
-		// which holds more info on the controller (how to talk to it for example).
+		// TODO work out synchronization
 		
-		String strDpid = HexString.toHexString(dpid);
-		LeaderLatch latch = switchLatches.get(strDpid);
+		String dpidStr = HexString.toHexString(dpid);
+		//LeaderLatch latch = switchLatches.get(strDpid);
+		LeaderLatch latch = switches.get(dpidStr).getLatch();
 		
 		if (latch == null){
 			log.warn("Tried to get controller for non-existent switch");
@@ -373,6 +409,9 @@
 	}
 	
 
+	//TODO what should happen when there's no ZK connection? Currently we just return
+	//the cache but this may lead to false impressions - i.e. we don't actually know
+	//what's in ZK so we shouldn't say we do
 	@Override
 	public Map<String, List<ControllerRegistryEntry>> getAllSwitches() {
 		Map<String, List<ControllerRegistryEntry>> data = 
@@ -431,6 +470,8 @@
 		return l;
 	}
 	
+	//TODO currently blocks startup when it can't get a Zookeeper connection.
+	//Do we support starting up with no Zookeeper connection?
 	@Override
 	public void init (FloodlightModuleContext context) throws FloodlightModuleException {
 		log.info("Initialising the Zookeeper Registry - Zookeeper connection required");
@@ -445,8 +486,10 @@
 		
 		restApi = context.getServiceImpl(IRestApiService.class);
 
-		switchLatches = new HashMap<String, LeaderLatch>();
-		switchCallbacks = new HashMap<String, ControlChangeCallback>();
+		switches = new ConcurrentHashMap<String, SwitchLeadershipData>();
+		
+		//switchLatches = new HashMap<String, LeaderLatch>();
+		//switchCallbacks = new HashMap<String, ControlChangeCallback>();
 		switchPathCaches = new HashMap<String, PathChildrenCache>();
 		
 		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);