Began implementation of per-switch leader election using Curator
diff --git a/build.xml b/build.xml
index 2bd1343..a0fd756 100644
--- a/build.xml
+++ b/build.xml
@@ -65,6 +65,9 @@
         <include name="concurrentlinkedhashmap-lru-1.3.jar"/>
         <include name="jython-2.5.2.jar"/>
         <include name="libthrift-0.7.0.jar"/>
+	<include name="curator-client-1.3.1.jar"/>
+	<include name="curator-framework-1.3.1.jar"/>
+	<include name="curator-recipes-1.3.1.jar"/>
     </patternset>
 
     <patternset id="titanlib">
diff --git a/src/main/java/net/floodlightcontroller/mastership/IMastershipService.java b/src/main/java/net/floodlightcontroller/mastership/IMastershipService.java
index 322210e..b3ddda5 100644
--- a/src/main/java/net/floodlightcontroller/mastership/IMastershipService.java
+++ b/src/main/java/net/floodlightcontroller/mastership/IMastershipService.java
@@ -4,8 +4,14 @@
 
 public interface IMastershipService extends IFloodlightService {
 	
+	// Callback for all mastership changes. 
+	// Change callback is called when mastership is acquired or released
+	public interface MastershipCallback {
+		public void changeCallback(long dpid, boolean isMaster);
+	}
+	
 	// Acquire mastership for a switch. 
-	public void acquireMastership(long dpid, boolean blockOk);
+	public void acquireMastership(long dpid, MastershipCallback cb);
 	
 	// Release mastership for a switch
 	public void releaseMastership(long dpid);
diff --git a/src/main/java/net/floodlightcontroller/mastership/MastershipManager.java b/src/main/java/net/floodlightcontroller/mastership/MastershipManager.java
index 2470b12..6c3c178 100644
--- a/src/main/java/net/floodlightcontroller/mastership/MastershipManager.java
+++ b/src/main/java/net/floodlightcontroller/mastership/MastershipManager.java
@@ -1,24 +1,185 @@
 package net.floodlightcontroller.mastership;
 
+import java.io.IOException;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import net.floodlightcontroller.core.module.FloodlightModuleContext;
 import net.floodlightcontroller.core.module.FloodlightModuleException;
 import net.floodlightcontroller.core.module.IFloodlightModule;
 import net.floodlightcontroller.core.module.IFloodlightService;
-import net.floodlightcontroller.mastership.IMastershipService;
 
-public class MastershipManager implements IFloodlightModule, IMastershipService {
+import org.apache.zookeeper.WatchedEvent;
+import org.openflow.util.HexString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.netflix.curator.RetryPolicy;
+import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.CuratorFrameworkFactory;
+import com.netflix.curator.framework.api.CuratorWatcher;
+import com.netflix.curator.framework.recipes.leader.LeaderLatch;
+import com.netflix.curator.framework.recipes.leader.Participant;
+import com.netflix.curator.framework.state.ConnectionState;
+import com.netflix.curator.framework.state.ConnectionStateListener;
+import com.netflix.curator.retry.ExponentialBackoffRetry;
+
+public class MastershipManager implements IFloodlightModule, IMastershipService,
+											ConnectionStateListener {
 	protected static Logger log = LoggerFactory.getLogger(MastershipManager.class);
 	protected String mastershipId;
 	
+	//TODO read this from configuration
+	protected String connectionString = "localhost:2181";
+	private final String namespace = "onos";
+	private final String switchLatchesPath = "/switchmastership";
+	
+	protected CuratorFramework client;
+
+	protected Map<String, LeaderLatch> switchLatches;
+	protected Map<String, MastershipCallback> switchCallbacks;
+	
+	//protected final String singleLatchPath = switchLatchesPath + "/00:00:00:00:00:00:00:01";
+	//protected LeaderLatch singleLatch;
+
+	
+	protected class ParamaterizedCuratorWatcher implements CuratorWatcher {
+		private String dpid;
+		private String latchPath;
+		
+		public ParamaterizedCuratorWatcher(String dpid, String latchPath){
+			this.dpid = dpid;
+			this.latchPath = latchPath;
+		}
+		
+		@Override
+		public void process(WatchedEvent event) throws Exception {
+			log.debug("Watch Event: {}", event);
+
+			LeaderLatch latch = switchLatches.get(dpid);
+			
+			Participant leader = latch.getLeader();
+			
+			log.debug("Leader for {} changed. Now {}", dpid, leader.getId());
+			
+			if (leader.getId().equals(mastershipId)){
+				switchCallbacks.get(dpid).changeCallback(HexString.toLong(dpid), true);
+			}
+			
+			client.getChildren().usingWatcher(this).inBackground().forPath(latchPath);
+			//client.getChildren().usingWatcher(this).forPath(latchPath);
+		}
+	};
+	
+	@Override
+	public void acquireMastership(long dpid, MastershipCallback cb) {
+		//TODO check if there's already a latch in the list
+		//TODO throw exception if unique ID not set
+		//TODO use unique ID
+		if (cb == null) {
+			//TODO throw exception?
+		}
+		
+		String dpidStr = HexString.toHexString(dpid);
+		String latchPath = switchLatchesPath + "/" + dpidStr;
+		
+		LeaderLatch latch = new LeaderLatch(client, latchPath, mastershipId);
+		switchLatches.put(dpidStr, latch);
+		//if (switchCallbacks.put(dpidStr, cb) != cb){
+			//NOTE instance equality intended
+		//	log.debug("Throwing out old callback for switch {}: {}", dpidStr, cb);
+		//}
+		switchCallbacks.put(dpidStr, cb);
+		
+		try {
+			//client.getChildren().usingWatcher(watcher).inBackground().forPath(singleLatchPath);
+			client.getChildren().usingWatcher(
+					new ParamaterizedCuratorWatcher(dpidStr, latchPath))
+					.inBackground().forPath(latchPath);
+			latch.start();
+		} catch (Exception e) {
+			log.debug("WATCHER ERROROROROR");
+			e.printStackTrace();
+		}
+		
+	}
+
+	@Override
+	public void releaseMastership(long dpid) {
+		// TODO Auto-generated method stub
+		LeaderLatch latch = switchLatches.get(HexString.toHexString(dpid));
+		if (latch == null) {
+			return;
+		}
+		
+		try {
+			latch.close();
+		} catch (IOException e) {
+			
+		}
+	}
+
+	@Override
+	public boolean amMaster(long dpid) {
+		LeaderLatch latch = switchLatches.get(HexString.toHexString(dpid));
+		
+		if (latch == null) {
+			log.warn("No leader latch for dpid {}", HexString.toHexString(dpid));
+			return false;
+		}
+		
+		//return latch.hasLeadership();
+		try {
+			return latch.getLeader().getId().equals(mastershipId);
+		} catch (Exception e) {
+			//TODO swallow exception?
+			return false;
+		}
+	}
+
+	@Override
+	public void setMastershipId(String id) {
+		// TODO Synchronisation?
+		mastershipId = id;
+	}
+
+	@Override
+	public String getMastershipId() {
+		// TODO Synchronisation?
+		return mastershipId;
+	}
+	
+	/*
+	 * ConnectionStateListener
+	 */
+	
+	@Override
+	public void stateChanged(CuratorFramework client, ConnectionState state) {
+		// TODO Auto-generated method stub
+		log.debug("Connection state change: {}", state);
+		
+		switch (state){
+		case SUSPENDED:
+		case LOST:
+			//If we're suspended or lost, we can no longer assume mastership
+			for (Map.Entry<String, MastershipCallback> entry : switchCallbacks.entrySet()){
+				entry.getValue().changeCallback(HexString.toLong(entry.getKey()), false);
+			}
+			break;
+		default:
+			//CONNECTED or RECONNECTED
+			//LeaderLatches should automatically try and re-gain the leadership
+			break;
+		}
+	}
+	
+	/*
+	 * IFloodlightModule
+	 */
+	
 	@Override
 	public Collection<Class<? extends IFloodlightService>> getModuleServices() {
 		Collection<Class<? extends IFloodlightService>> l = new ArrayList<Class<? extends IFloodlightService>>();
@@ -40,9 +201,29 @@
 		return null;
 	}
 	
+	//be ready to serve mastership requests by startup?
 	@Override
 	public void init (FloodlightModuleContext context) throws FloodlightModuleException {
 		//TODO
+		try {
+			String localHostname = java.net.InetAddress.getLocalHost().getHostName();
+			mastershipId = localHostname;
+			log.debug("Setting mastership id to {}", mastershipId);
+		} catch (UnknownHostException e) {
+			// TODO Handle this exception
+			e.printStackTrace();
+		}
+
+		switchLatches = new HashMap<String, LeaderLatch>();
+		switchCallbacks = new HashMap<String, MastershipCallback>();
+		
+		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+		client = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
+		
+		client.start();
+		
+		client = client.usingNamespace(namespace);
+		
 		return;
 	}
 	
@@ -51,34 +232,49 @@
 		//TODO
 		return;
 	}
-
-	@Override
-	public void acquireMastership(long dpid, boolean blockOk) {
-		// TODO Auto-generated method stub
+	
+	public static void main(String args[]){
+		FloodlightModuleContext fmc = new FloodlightModuleContext();
+		MastershipManager mm = new MastershipManager();
 		
-	}
-
-	@Override
-	public void releaseMastership(long dpid) {
-		// TODO Auto-generated method stub
+		if (args.length < 1){
+			log.error("Need to supply ID");
+			System.exit(1);
+		}
+		String id = args[0];
 		
-	}
-
-	@Override
-	public boolean amMaster(long dpid) {
-		// TODO Auto-generated method stub
-		return false;
-	}
-
-	@Override
-	public void setMastershipId(String id) {
-		// TODO Auto-generated method stub
+		try {
+			mm.init(fmc);
+			mm.startUp(fmc);
+			
+			mm.setMastershipId(id);
+			
+			mm.acquireMastership(1L, 
+				new MastershipCallback(){
+					@Override
+					public void changeCallback(long dpid, boolean isMaster) {
+						if (isMaster){
+							log.debug("I'm master for {}", HexString.toHexString(dpid));
+						}
+						else {
+							log.debug("NOT master for {}", HexString.toHexString(dpid));
+						}
+					}
+				});
+			
+			//"Server" loop
+			while (true) {
+				Thread.sleep(60000);
+			}
+			
+		} catch (FloodlightModuleException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		} catch (InterruptedException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
 		
-	}
-
-	@Override
-	public String getMastershipId() {
-		// TODO Auto-generated method stub
-		return null;
+		log.debug("is master: {}", mm.amMaster(1L));
 	}
 }
diff --git a/src/test/java/net/floodlightcontroller/mastership/MastershipManagerTest.java b/src/test/java/net/floodlightcontroller/mastership/MastershipManagerTest.java
new file mode 100644
index 0000000..f29f6e7
--- /dev/null
+++ b/src/test/java/net/floodlightcontroller/mastership/MastershipManagerTest.java
@@ -0,0 +1,49 @@
+package net.floodlightcontroller.mastership;
+
+import net.floodlightcontroller.core.module.FloodlightModuleContext;
+import net.floodlightcontroller.mastership.IMastershipService.MastershipCallback;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MastershipManagerTest {
+	protected static Logger log = LoggerFactory.getLogger(MastershipManagerTest.class);
+	private MastershipManager mm;
+	
+	@Before
+	public void setUp() throws Exception{
+		//MockFloodlightProvider fp = new MockFloodlightProvider();
+		FloodlightModuleContext fmc = new FloodlightModuleContext();
+		
+		mm = new MastershipManager();
+		
+		mm.init(fmc);
+		mm.startUp(fmc);
+		
+	}
+	
+	@Test
+	public void testAcquireMastership(){
+		MastershipCallback cb = new MastershipCallback(){
+			@Override
+			public void changeCallback(long dpid, boolean isMaster) {
+				log.info("Callback called!");
+			}
+		};
+		
+		long dpid = 1L;
+		
+		mm.acquireMastership(dpid, cb);
+		
+		try {
+			Thread.sleep(2000);
+		} catch (InterruptedException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+		
+		log.debug("Is master? {}", mm.amMaster(dpid));
+	}
+}