Merging in the mastership implementation
diff --git a/build.xml b/build.xml
index 2bd1343..4aa99bf 100644
--- a/build.xml
+++ b/build.xml
@@ -41,6 +41,7 @@
     <property name="main-class" value="net.floodlightcontroller.core.Main"/>
     <property name="floodlight-jar" location="${target}/floodlight.jar"/>
     <property name="floodlight-test-jar" location="${target}/floodlight-test.jar"/>
+    <property name="floodlight-only-jar" location="${target}/floodlight-only.jar"/>
     <property name="thrift.dir" value="${basedir}/src/main/thrift"/>
     <property name="thrift.out.dir" value="lib/gen-java"/>
     <property name="thrift.package" value="net/floodlightcontroller/packetstreamer/thrift"/>
@@ -65,6 +66,9 @@
         <include name="concurrentlinkedhashmap-lru-1.3.jar"/>
         <include name="jython-2.5.2.jar"/>
         <include name="libthrift-0.7.0.jar"/>
+	<include name="curator-client-1.3.1.jar"/>
+	<include name="curator-framework-1.3.1.jar"/>
+	<include name="curator-recipes-1.3.1.jar"/>
     </patternset>
 
     <patternset id="titanlib">
@@ -224,6 +228,16 @@
     </target>
     <target name="coverage" depends="instrument,test,coverage-report"/>
 
+    <target name="jar" depends="compile">
+        <jar destfile="${floodlight-only-jar}">
+	    <fileset dir="${build}"/>
+	    <fileset dir="${resources}"/>
+	    <fileset dir="${python-src}">
+	        <include name="**/*.py"/>
+	    </fileset>
+        </jar>
+    </target>
+
     <target name="dist" depends="compile,compile-test">
         <jar destfile="${floodlight-jar}" filesetmanifest="mergewithoutmain">
             <manifest>
diff --git a/mastership-test.sh b/mastership-test.sh
new file mode 100755
index 0000000..06189df
--- /dev/null
+++ b/mastership-test.sh
@@ -0,0 +1 @@
+java -Dlogback.configurationFile=logback.xml -cp target/floodlight-only.jar:lib/*:lib/titan/* net.floodlightcontroller.mastership.MastershipManager $1
diff --git a/src/main/java/net/floodlightcontroller/mastership/IMastershipService.java b/src/main/java/net/floodlightcontroller/mastership/IMastershipService.java
index 322210e..83800c3 100644
--- a/src/main/java/net/floodlightcontroller/mastership/IMastershipService.java
+++ b/src/main/java/net/floodlightcontroller/mastership/IMastershipService.java
@@ -4,8 +4,14 @@
 
 public interface IMastershipService extends IFloodlightService {
 	
+	// Callback for all mastership changes. 
+	// Change callback is called when mastership is acquired or released
+	public interface MastershipCallback {
+		public void changeCallback(long dpid, boolean isMaster);
+	}
+	
 	// Acquire mastership for a switch. 
-	public void acquireMastership(long dpid, boolean blockOk);
+	public void acquireMastership(long dpid, MastershipCallback cb) throws Exception;
 	
 	// Release mastership for a switch
 	public void releaseMastership(long dpid);
diff --git a/src/main/java/net/floodlightcontroller/mastership/MastershipManager.java b/src/main/java/net/floodlightcontroller/mastership/MastershipManager.java
index 2470b12..46b533e 100644
--- a/src/main/java/net/floodlightcontroller/mastership/MastershipManager.java
+++ b/src/main/java/net/floodlightcontroller/mastership/MastershipManager.java
@@ -1,23 +1,185 @@
 package net.floodlightcontroller.mastership;
 
+import java.io.IOException;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import net.floodlightcontroller.core.module.FloodlightModuleContext;
 import net.floodlightcontroller.core.module.FloodlightModuleException;
 import net.floodlightcontroller.core.module.IFloodlightModule;
 import net.floodlightcontroller.core.module.IFloodlightService;
-import net.floodlightcontroller.mastership.IMastershipService;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.openflow.util.HexString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.netflix.curator.RetryPolicy;
+import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.CuratorFrameworkFactory;
+import com.netflix.curator.framework.api.CuratorWatcher;
+import com.netflix.curator.framework.recipes.leader.LeaderLatch;
+import com.netflix.curator.framework.recipes.leader.Participant;
+import com.netflix.curator.retry.ExponentialBackoffRetry;
 
 public class MastershipManager implements IFloodlightModule, IMastershipService {
 
 	protected static Logger log = LoggerFactory.getLogger(MastershipManager.class);
-	protected String mastershipId;
+	protected String mastershipId = null;
+	
+	//TODO read this from configuration
+	protected String connectionString = "localhost:2181";
+	private final String namespace = "onos";
+	private final String switchLatchesPath = "/switchmastership";
+	
+	protected CuratorFramework client;
+
+	protected Map<String, LeaderLatch> switchLatches;
+	protected Map<String, MastershipCallback> switchCallbacks;
+	
+	protected class ParamaterizedCuratorWatcher implements CuratorWatcher {
+		private String dpid;
+		private boolean isLeader = false;
+		private String latchPath;
+		
+		public ParamaterizedCuratorWatcher(String dpid, String latchPath){
+			this.dpid = dpid;
+			this.latchPath = latchPath;
+		}
+		
+		@Override
+		public void process(WatchedEvent event) throws Exception {
+			log.debug("Watch Event: {}", event);
+
+			LeaderLatch latch = switchLatches.get(dpid);
+			
+			if (event.getState() == KeeperState.Disconnected){
+				if (isLeader) {
+					log.debug("Disconnected while leader - lost leadership for {}", dpid);
+					
+					isLeader = false;
+					switchCallbacks.get(dpid).changeCallback(HexString.toLong(dpid), false);
+				}
+				return;
+			}
+			
+			try {
+				Participant leader = latch.getLeader();
+
+				if (leader.getId().equals(mastershipId) && !isLeader){
+					log.debug("Became leader for {}", dpid);
+					
+					isLeader = true;
+					switchCallbacks.get(dpid).changeCallback(HexString.toLong(dpid), true);
+				}
+				else if (!leader.getId().equals(mastershipId) && isLeader){
+					log.debug("Lost leadership for {}", dpid);
+					
+					isLeader = false;
+					switchCallbacks.get(dpid).changeCallback(HexString.toLong(dpid), false);
+				}
+			} catch (Exception e){
+				if (isLeader){
+					log.debug("Exception checking leadership status. Assume leadship lost for {}",
+							dpid);
+					
+					isLeader = false;
+					switchCallbacks.get(dpid).changeCallback(HexString.toLong(dpid), false);
+				}
+			}
+			
+			client.getChildren().usingWatcher(this).inBackground().forPath(latchPath);
+			//client.getChildren().usingWatcher(this).forPath(latchPath);
+		}
+	}
+	
+	@Override
+	public void acquireMastership(long dpid, MastershipCallback cb) throws Exception {
+		
+		if (mastershipId == null){
+			throw new RuntimeException("Must set mastershipId before calling aquireMastership");
+		}
+		
+		String dpidStr = HexString.toHexString(dpid);
+		String latchPath = switchLatchesPath + "/" + dpidStr;
+		
+		if (switchLatches.get(dpidStr) != null){
+			throw new RuntimeException("Leader election for switch " + dpidStr +
+					"is already running");
+		}
+		
+		LeaderLatch latch = new LeaderLatch(client, latchPath, mastershipId);
+		switchLatches.put(dpidStr, latch);
+		switchCallbacks.put(dpidStr, cb);
+		
+		try {
+			//client.getChildren().usingWatcher(watcher).inBackground().forPath(singleLatchPath);
+			client.getChildren().usingWatcher(
+					new ParamaterizedCuratorWatcher(dpidStr, latchPath))
+					.inBackground().forPath(latchPath);
+			latch.start();
+		} catch (Exception e) {
+			log.warn("Error starting leader latch: {}", e.getMessage());
+			throw e;
+		}
+		
+	}
+
+	@Override
+	public void releaseMastership(long dpid) {
+		String dpidStr = HexString.toHexString(dpid);
+		
+		LeaderLatch latch = switchLatches.get(dpidStr);
+		if (latch == null) {
+			log.debug("Trying to release mastership for switch we are not contesting");
+			return;
+		}
+		
+		try {
+			latch.close();
+		} catch (IOException e) {
+			
+		}
+		
+		switchLatches.remove(dpidStr);
+		switchCallbacks.remove(dpidStr);
+	}
+
+	@Override
+	public boolean amMaster(long dpid) {
+		LeaderLatch latch = switchLatches.get(HexString.toHexString(dpid));
+		
+		if (latch == null) {
+			log.warn("No leader latch for dpid {}", HexString.toHexString(dpid));
+			return false;
+		}
+		
+		try {
+			return latch.getLeader().getId().equals(mastershipId);
+		} catch (Exception e) {
+			//TODO swallow exception?
+			return false;
+		}
+	}
+
+	@Override
+	public void setMastershipId(String id) {
+		mastershipId = id;
+	}
+
+	@Override
+	public String getMastershipId() {
+		return mastershipId;
+	}
+	
+	
+	/*
+	 * IFloodlightModule
+	 */
 	
 	@Override
 	public Collection<Class<? extends IFloodlightService>> getModuleServices() {
@@ -42,43 +204,78 @@
 	
 	@Override
 	public void init (FloodlightModuleContext context) throws FloodlightModuleException {
-		//TODO
+
+		try {
+			String localHostname = java.net.InetAddress.getLocalHost().getHostName();
+			mastershipId = localHostname;
+			log.debug("Setting mastership id to {}", mastershipId);
+		} catch (UnknownHostException e) {
+			// TODO Handle this exception
+			e.printStackTrace();
+		}
+
+		switchLatches = new HashMap<String, LeaderLatch>();
+		switchCallbacks = new HashMap<String, MastershipCallback>();
+		
+		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+		client = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
+		
+		client.start();
+		
+		client = client.usingNamespace(namespace);
+		
 		return;
 	}
 	
 	@Override
 	public void startUp (FloodlightModuleContext context) {
-		//TODO
-		return;
+		// Nothing to be done on startup
 	}
-
-	@Override
-	public void acquireMastership(long dpid, boolean blockOk) {
-		// TODO Auto-generated method stub
+	
+	public static void main(String args[]){
+		FloodlightModuleContext fmc = new FloodlightModuleContext();
+		MastershipManager mm = new MastershipManager();
 		
-	}
-
-	@Override
-	public void releaseMastership(long dpid) {
-		// TODO Auto-generated method stub
+		String id = null;
+		if (args.length > 0){
+			id = args[0];
+			log.info("Using unique id: {}", id);
+		}
 		
-	}
-
-	@Override
-	public boolean amMaster(long dpid) {
-		// TODO Auto-generated method stub
-		return false;
-	}
-
-	@Override
-	public void setMastershipId(String id) {
-		// TODO Auto-generated method stub
+		try {
+			mm.init(fmc);
+			mm.startUp(fmc);
+			
+			if (id != null){
+				mm.setMastershipId(id);
+			}
+				
+			mm.acquireMastership(1L, 
+				new MastershipCallback(){
+					@Override
+					public void changeCallback(long dpid, boolean isMaster) {
+						if (isMaster){
+							log.debug("Callback for becoming master for {}", HexString.toHexString(dpid));
+						}
+						else {
+							log.debug("Callback for losing mastership for {}", HexString.toHexString(dpid));
+						}
+					}
+				});
+			
+			//"Server" loop
+			while (true) {
+				Thread.sleep(60000);
+			}
+			
+		} catch (FloodlightModuleException e) {
+			e.printStackTrace();
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
 		
-	}
-
-	@Override
-	public String getMastershipId() {
-		// TODO Auto-generated method stub
-		return null;
+		log.debug("is master: {}", mm.amMaster(1L));
 	}
 }