Merge branch 'experimental' - Improved stability and thread safety of ZookeeperRegistry
diff --git a/build.xml b/build.xml
index e8055e7..1b36535 100644
--- a/build.xml
+++ b/build.xml
@@ -66,9 +66,9 @@
         <include name="concurrentlinkedhashmap-lru-1.3.jar"/>
         <include name="jython-2.5.2.jar"/>
         <include name="libthrift-0.7.0.jar"/>
-	<include name="curator-client-1.3.3.jar"/>
-	<include name="curator-framework-1.3.3.jar"/>
-	<include name="curator-recipes-1.3.3.jar"/>
+	<include name="curator-client-1.3.4-SNAPSHOT.jar"/>
+	<include name="curator-framework-1.3.4-SNAPSHOT.jar"/>
+	<include name="curator-recipes-1.3.4-SNAPSHOT.jar"/>
 	<include name="zookeeper-3.4.5.jar"/>
     </patternset>
 
diff --git a/src/main/java/net/onrc/onos/registry/controller/SwitchLeadershipData.java b/src/main/java/net/onrc/onos/registry/controller/SwitchLeadershipData.java
new file mode 100644
index 0000000..eb513fd
--- /dev/null
+++ b/src/main/java/net/onrc/onos/registry/controller/SwitchLeadershipData.java
@@ -0,0 +1,40 @@
+package net.onrc.onos.registry.controller;
+
+import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
+
+import com.netflix.curator.framework.recipes.leader.LeaderLatch;
+
+/**
+ * Immutable pairing of the leader latch used to contest a switch with the
+ * callback that was supplied alongside it.
+ */
+public class SwitchLeadershipData {
+
+	//Both fields are assigned once in the constructor and never change
+	private final LeaderLatch latch;
+	private final ControlChangeCallback cb;
+
+	/**
+	 * @param latch the leader latch for the switch
+	 * @param cb the callback associated with the latch
+	 */
+	public SwitchLeadershipData(LeaderLatch latch, ControlChangeCallback cb) {
+		this.latch = latch;
+		this.cb = cb;
+	}
+
+	/**
+	 * @return the latch passed to the constructor
+	 */
+	public LeaderLatch getLatch(){
+		return latch;
+	}
+
+	/**
+	 * @return the callback passed to the constructor
+	 */
+	public ControlChangeCallback getCallback(){
+		return cb;
+	}
+
+}
diff --git a/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
index 025bbfe..e727238 100644
--- a/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
+++ b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
@@ -8,6 +8,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import net.floodlightcontroller.core.module.FloodlightModuleContext;
 import net.floodlightcontroller.core.module.FloodlightModuleException;
@@ -16,8 +17,6 @@
 import net.floodlightcontroller.restserver.IRestApiService;
 
 import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.openflow.util.HexString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -26,13 +25,14 @@
 import com.netflix.curator.RetryPolicy;
 import com.netflix.curator.framework.CuratorFramework;
 import com.netflix.curator.framework.CuratorFrameworkFactory;
-import com.netflix.curator.framework.api.CuratorWatcher;
 import com.netflix.curator.framework.recipes.cache.ChildData;
 import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
 import com.netflix.curator.framework.recipes.cache.PathChildrenCache.StartMode;
 import com.netflix.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import com.netflix.curator.framework.recipes.cache.PathChildrenCacheListener;
 import com.netflix.curator.framework.recipes.leader.LeaderLatch;
+import com.netflix.curator.framework.recipes.leader.LeaderLatchEvent;
+import com.netflix.curator.framework.recipes.leader.LeaderLatchListener;
 import com.netflix.curator.framework.recipes.leader.Participant;
 import com.netflix.curator.retry.ExponentialBackoffRetry;
 
@@ -61,92 +61,40 @@
 	protected PathChildrenCache controllerCache;
 	protected PathChildrenCache switchCache;
 
-	protected Map<String, LeaderLatch> switchLatches;
-	protected Map<String, ControlChangeCallback> switchCallbacks;
+	protected ConcurrentHashMap<String, SwitchLeadershipData> switches;
 	protected Map<String, PathChildrenCache> switchPathCaches;
 	
 	//Zookeeper performance-related configuration
 	protected static final int sessionTimeout = 2000;
 	protected static final int connectionTimeout = 4000;
 	
-	/**
-	 * Watches for changes in switch leadership election. The Curator
-	 * LeaderLatch doesn't notify us when leadership changes so we set a watch
-	 * on the election znodes to get leadership change events. The process
-	 * method will be called whenever the switches children change in 
-	 * Zookeeper. We then have to work out whether to send a control-changed
-	 * event to our clients and reset the watch.
-	 * 
-	 * TODO I think it's possible to miss events that happen while we're 
-	 * processing the watch and before we've set a new watch. Need to think
-	 * of a safer way to implement leader change notifications.
-	 *
-	 */
-	protected class ParamaterizedCuratorWatcher implements CuratorWatcher {
-		private String dpid;
-		private boolean isLeader = false;
-		private String latchPath;
+
+	protected class SwitchLeaderListener implements LeaderLatchListener{
+		String dpid;
+		LeaderLatch latch;
 		
-		public ParamaterizedCuratorWatcher(String dpid, String latchPath){
+		public SwitchLeaderListener(String dpid, LeaderLatch latch){
 			this.dpid = dpid;
-			this.latchPath = latchPath;
+			this.latch = latch;
 		}
 		
 		@Override
-		public synchronized void process(WatchedEvent event) throws Exception {
-			log.debug("Watch Event: {}", event);
-
+		public void leaderLatchEvent(CuratorFramework arg0,
+				LeaderLatchEvent arg1) {
+			log.debug("Leadership changed for {}, now {}",
+					dpid, latch.hasLeadership());
 			
-			if (event.getState() == KeeperState.Disconnected){
-				if (isLeader) {
-					log.debug("Disconnected while leader - lost leadership for {}", dpid);
-					
-					isLeader = false;
-					ControlChangeCallback cb = switchCallbacks.get(dpid);
-					if (cb != null) {
-						//Allow callback to be null if the requester doesn't want a callback
-						cb.controlChanged(HexString.toLong(dpid), false);
-					}
-				}
-				return;
-				//TODO Watcher is never reset once we reconnect to Zookeeper
+			//Notify only if this exact latch instance (hence ==) is still the
+			//registered request - the client may have released it or begun a new
+			//one. The removed watcher allowed a null callback, so guard for it.
+			SwitchLeadershipData swData = switches.get(dpid);
+			if (swData != null && swData.getLatch() == latch){
+				ControlChangeCallback cb = swData.getCallback();
+				if (cb != null) cb.controlChanged(HexString.toLong(dpid), latch.hasLeadership());
 			}
-			
-			LeaderLatch latch = switchLatches.get(dpid);
-			if (latch == null){
-				log.debug("In watcher process, looks like control was released for {}",
-						dpid);
-				return;
+			else {
+				log.debug("Latch for {} has changed", dpid);
 			}
-			
-			try {
-				
-				Participant leader = latch.getLeader();
-
-				if (leader.getId().equals(controllerId) && !isLeader){
-					log.debug("Became leader for {}", dpid);
-					
-					isLeader = true;
-					switchCallbacks.get(dpid).controlChanged(HexString.toLong(dpid), true);
-				}
-				else if (!leader.getId().equals(controllerId) && isLeader){
-					log.debug("Lost leadership for {}", dpid);
-					
-					isLeader = false;
-					switchCallbacks.get(dpid).controlChanged(HexString.toLong(dpid), false);
-				}
-			} catch (Exception e){
-				if (isLeader){
-					log.debug("Exception checking leadership status. Assume leadership lost for {}",
-							dpid);
-					
-					isLeader = false;
-					switchCallbacks.get(dpid).controlChanged(HexString.toLong(dpid), false);
-				}
-			} finally {
-				client.getChildren().usingWatcher(this).inBackground().forPath(latchPath);
-			}
-			//client.getChildren().usingWatcher(this).forPath(latchPath);
 		}
 	}
 	
@@ -207,22 +155,33 @@
 		String dpidStr = HexString.toHexString(dpid);
 		String latchPath = switchLatchesPath + "/" + dpidStr;
 		
-		if (switchLatches.get(dpidStr) != null){
-			//throw new RuntimeException("Leader election for switch " + dpidStr +
-			//		"is already running");
+		if (switches.get(dpidStr) != null){
 			log.debug("Already contesting {}, returning", HexString.toHexString(dpid));
 			return;
 		}
 		
 		LeaderLatch latch = new LeaderLatch(client, latchPath, controllerId);
-		switchLatches.put(dpidStr, latch);
-		switchCallbacks.put(dpidStr, cb);
+		latch.addListener(new SwitchLeaderListener(dpidStr, latch));
 		
+
+		SwitchLeadershipData swData = new SwitchLeadershipData(latch, cb);
+		SwitchLeadershipData oldData = switches.putIfAbsent(dpidStr, swData);
+		
+		if (oldData != null){
+			//There was already data for that key in the map
+			//i.e. someone else got here first so we can't succeed
+			log.debug("Already requested control for {}", dpidStr);
+			throw new RegistryException("Already requested control for " + dpidStr);
+		}
+		
+		//Now that we know we were able to add our latch to the collection,
+		//we can start the leader election in Zookeeper. However it is unclear
+		//how to handle a failed start - the latch is already in our
+		//switches map.
+		//TODO seems like there's a Curator bug when latch.start is called when
+		//there's no Zookeeper connection which causes two znodes to be put in 
+		//Zookeeper at the latch path when we reconnect to Zookeeper.
 		try {
-			//client.getChildren().usingWatcher(watcher).inBackground().forPath(singleLatchPath);
-			client.getChildren().usingWatcher(
-					new ParamaterizedCuratorWatcher(dpidStr, latchPath))
-					.inBackground().forPath(latchPath);
 			latch.start();
 		} catch (Exception e) {
 			log.warn("Error starting leader latch: {}", e.getMessage());
@@ -237,39 +196,35 @@
 		
 		String dpidStr = HexString.toHexString(dpid);
 		
-		LeaderLatch latch = switchLatches.get(dpidStr);
-		if (latch == null) {
+		SwitchLeadershipData swData = switches.remove(dpidStr);
+		
+		if (swData == null) {
 			log.debug("Trying to release control of a switch we are not contesting");
 			return;
 		}
+
+		LeaderLatch latch = swData.getLatch();
 		
 		try {
 			latch.close();
 		} catch (IOException e) {
 			//I think it's OK not to do anything here. Either the node got 
 			//deleted correctly, or the connection went down and the node got deleted.
-		} finally {
-			switchLatches.remove(dpidStr);
-			switchCallbacks.remove(dpidStr);
 		}
 	}
 
 	@Override
 	public boolean hasControl(long dpid) {
+		String dpidStr = HexString.toHexString(dpid);
 		
-		LeaderLatch latch = switchLatches.get(HexString.toHexString(dpid));
+		SwitchLeadershipData swData = switches.get(dpidStr);
 		
-		if (latch == null) {
-			log.warn("No leader latch for dpid {}", HexString.toHexString(dpid));
+		if (swData == null) {
+			log.warn("No leader latch for dpid {}", dpidStr);
 			return false;
 		}
 		
-		try {
-			return latch.getLeader().getId().equals(controllerId);
-		} catch (Exception e) {
-			//TODO swallow exception?
-			return false;
-		}
+		return swData.getLatch().hasLeadership();
 	}
 
 	@Override
@@ -322,12 +277,11 @@
 	
 	@Override
 	public String getControllerForSwitch(long dpid) throws RegistryException {
-		// TODO Work out how we should store this controller/switch data.
-		// The leader latch might be a index to the /controllers collections
-		// which holds more info on the controller (how to talk to it for example).
+		// TODO work out synchronization
 		
-		String strDpid = HexString.toHexString(dpid);
-		LeaderLatch latch = switchLatches.get(strDpid);
+		String dpidStr = HexString.toHexString(dpid);
+		SwitchLeadershipData swData = switches.get(dpidStr); //null when the switch is not in the map
+		LeaderLatch latch = (swData == null) ? null : swData.getLatch();
 		
 		if (latch == null){
 			log.warn("Tried to get controller for non-existent switch");
@@ -351,6 +305,9 @@
 	}
 	
 
+	//TODO what should happen when there's no ZK connection? Currently we just return
+	//the cache but this may lead to false impressions - i.e. we don't actually know
+	//what's in ZK so we shouldn't say we do
 	@Override
 	public Map<String, List<ControllerRegistryEntry>> getAllSwitches() {
 		Map<String, List<ControllerRegistryEntry>> data = 
@@ -409,6 +366,8 @@
 		return l;
 	}
 	
+	//TODO currently blocks startup when it can't get a Zookeeper connection.
+	//Do we support starting up with no Zookeeper connection?
 	@Override
 	public void init (FloodlightModuleContext context) throws FloodlightModuleException {
 		log.info("Initialising the Zookeeper Registry - Zookeeper connection required");
@@ -423,8 +382,7 @@
 		
 		restApi = context.getServiceImpl(IRestApiService.class);
 
-		switchLatches = new HashMap<String, LeaderLatch>();
-		switchCallbacks = new HashMap<String, ControlChangeCallback>();
+		switches = new ConcurrentHashMap<String, SwitchLeadershipData>();
 		switchPathCaches = new HashMap<String, PathChildrenCache>();
 		
 		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
@@ -432,7 +390,6 @@
 				sessionTimeout, connectionTimeout, retryPolicy);
 		
 		client.start();
-		
 		client = client.usingNamespace(namespace);