Merging in the mastership implementation
diff --git a/build.xml b/build.xml
index 2bd1343..4aa99bf 100644
--- a/build.xml
+++ b/build.xml
@@ -41,6 +41,7 @@
<property name="main-class" value="net.floodlightcontroller.core.Main"/>
<property name="floodlight-jar" location="${target}/floodlight.jar"/>
<property name="floodlight-test-jar" location="${target}/floodlight-test.jar"/>
+ <property name="floodlight-only-jar" location="${target}/floodlight-only.jar"/>
<property name="thrift.dir" value="${basedir}/src/main/thrift"/>
<property name="thrift.out.dir" value="lib/gen-java"/>
<property name="thrift.package" value="net/floodlightcontroller/packetstreamer/thrift"/>
@@ -65,6 +66,9 @@
<include name="concurrentlinkedhashmap-lru-1.3.jar"/>
<include name="jython-2.5.2.jar"/>
<include name="libthrift-0.7.0.jar"/>
+ <include name="curator-client-1.3.1.jar"/>
+ <include name="curator-framework-1.3.1.jar"/>
+ <include name="curator-recipes-1.3.1.jar"/>
</patternset>
<patternset id="titanlib">
@@ -224,6 +228,16 @@
</target>
<target name="coverage" depends="instrument,test,coverage-report"/>
+ <target name="jar" depends="compile">
+ <jar destfile="${floodlight-only-jar}">
+ <fileset dir="${build}"/>
+ <fileset dir="${resources}"/>
+ <fileset dir="${python-src}">
+ <include name="**/*.py"/>
+ </fileset>
+ </jar>
+ </target>
+
<target name="dist" depends="compile,compile-test">
<jar destfile="${floodlight-jar}" filesetmanifest="mergewithoutmain">
<manifest>
diff --git a/mastership-test.sh b/mastership-test.sh
new file mode 100755
index 0000000..06189df
--- /dev/null
+++ b/mastership-test.sh
@@ -0,0 +1 @@
+java -Dlogback.configurationFile=logback.xml -cp target/floodlight-only.jar:lib/*:lib/titan/* net.floodlightcontroller.mastership.MastershipManager $1
diff --git a/src/main/java/net/floodlightcontroller/mastership/IMastershipService.java b/src/main/java/net/floodlightcontroller/mastership/IMastershipService.java
index 322210e..83800c3 100644
--- a/src/main/java/net/floodlightcontroller/mastership/IMastershipService.java
+++ b/src/main/java/net/floodlightcontroller/mastership/IMastershipService.java
@@ -4,8 +4,14 @@
public interface IMastershipService extends IFloodlightService {
+ // Callback for all mastership changes.
+ // Change callback is called when mastership is acquired or released
+ public interface MastershipCallback {
+ public void changeCallback(long dpid, boolean isMaster);
+ }
+
// Acquire mastership for a switch.
- public void acquireMastership(long dpid, boolean blockOk);
+ public void acquireMastership(long dpid, MastershipCallback cb) throws Exception;
// Release mastership for a switch
public void releaseMastership(long dpid);
diff --git a/src/main/java/net/floodlightcontroller/mastership/MastershipManager.java b/src/main/java/net/floodlightcontroller/mastership/MastershipManager.java
index 2470b12..46b533e 100644
--- a/src/main/java/net/floodlightcontroller/mastership/MastershipManager.java
+++ b/src/main/java/net/floodlightcontroller/mastership/MastershipManager.java
@@ -1,23 +1,185 @@
package net.floodlightcontroller.mastership;
+import java.io.IOException;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.core.module.FloodlightModuleException;
import net.floodlightcontroller.core.module.IFloodlightModule;
import net.floodlightcontroller.core.module.IFloodlightService;
-import net.floodlightcontroller.mastership.IMastershipService;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.openflow.util.HexString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.netflix.curator.RetryPolicy;
+import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.CuratorFrameworkFactory;
+import com.netflix.curator.framework.api.CuratorWatcher;
+import com.netflix.curator.framework.recipes.leader.LeaderLatch;
+import com.netflix.curator.framework.recipes.leader.Participant;
+import com.netflix.curator.retry.ExponentialBackoffRetry;
public class MastershipManager implements IFloodlightModule, IMastershipService {
protected static Logger log = LoggerFactory.getLogger(MastershipManager.class);
- protected String mastershipId;
+ protected String mastershipId = null;
+
+ //TODO read this from configuration
+ protected String connectionString = "localhost:2181";
+ private final String namespace = "onos";
+ private final String switchLatchesPath = "/switchmastership";
+
+ protected CuratorFramework client;
+
+ protected Map<String, LeaderLatch> switchLatches;
+ protected Map<String, MastershipCallback> switchCallbacks;
+
+ protected class ParamaterizedCuratorWatcher implements CuratorWatcher {
+ private String dpid;
+ private boolean isLeader = false;
+ private String latchPath;
+
+ public ParamaterizedCuratorWatcher(String dpid, String latchPath){
+ this.dpid = dpid;
+ this.latchPath = latchPath;
+ }
+
+ @Override
+ public void process(WatchedEvent event) throws Exception {
+ log.debug("Watch Event: {}", event);
+
+ LeaderLatch latch = switchLatches.get(dpid);
+
+ if (event.getState() == KeeperState.Disconnected){
+ if (isLeader) {
+ log.debug("Disconnected while leader - lost leadership for {}", dpid);
+
+ isLeader = false;
+ switchCallbacks.get(dpid).changeCallback(HexString.toLong(dpid), false);
+ }
+ return;
+ }
+
+ try {
+ Participant leader = latch.getLeader();
+
+ if (leader.getId().equals(mastershipId) && !isLeader){
+ log.debug("Became leader for {}", dpid);
+
+ isLeader = true;
+ switchCallbacks.get(dpid).changeCallback(HexString.toLong(dpid), true);
+ }
+ else if (!leader.getId().equals(mastershipId) && isLeader){
+ log.debug("Lost leadership for {}", dpid);
+
+ isLeader = false;
+ switchCallbacks.get(dpid).changeCallback(HexString.toLong(dpid), false);
+ }
+ } catch (Exception e){
+ if (isLeader){
+ log.debug("Exception checking leadership status. Assume leadship lost for {}",
+ dpid);
+
+ isLeader = false;
+ switchCallbacks.get(dpid).changeCallback(HexString.toLong(dpid), false);
+ }
+ }
+
+ client.getChildren().usingWatcher(this).inBackground().forPath(latchPath);
+ //client.getChildren().usingWatcher(this).forPath(latchPath);
+ }
+ }
+
+ @Override
+ public void acquireMastership(long dpid, MastershipCallback cb) throws Exception {
+
+ if (mastershipId == null){
+ throw new RuntimeException("Must set mastershipId before calling aquireMastership");
+ }
+
+ String dpidStr = HexString.toHexString(dpid);
+ String latchPath = switchLatchesPath + "/" + dpidStr;
+
+ if (switchLatches.get(dpidStr) != null){
+ throw new RuntimeException("Leader election for switch " + dpidStr +
+ "is already running");
+ }
+
+ LeaderLatch latch = new LeaderLatch(client, latchPath, mastershipId);
+ switchLatches.put(dpidStr, latch);
+ switchCallbacks.put(dpidStr, cb);
+
+ try {
+ //client.getChildren().usingWatcher(watcher).inBackground().forPath(singleLatchPath);
+ client.getChildren().usingWatcher(
+ new ParamaterizedCuratorWatcher(dpidStr, latchPath))
+ .inBackground().forPath(latchPath);
+ latch.start();
+ } catch (Exception e) {
+ log.warn("Error starting leader latch: {}", e.getMessage());
+ throw e;
+ }
+
+ }
+
+ @Override
+ public void releaseMastership(long dpid) {
+ String dpidStr = HexString.toHexString(dpid);
+
+ LeaderLatch latch = switchLatches.get(dpidStr);
+ if (latch == null) {
+ log.debug("Trying to release mastership for switch we are not contesting");
+ return;
+ }
+
+ try {
+ latch.close();
+ } catch (IOException e) {
+
+ }
+
+ switchLatches.remove(dpidStr);
+ switchCallbacks.remove(dpidStr);
+ }
+
+ @Override
+ public boolean amMaster(long dpid) {
+ LeaderLatch latch = switchLatches.get(HexString.toHexString(dpid));
+
+ if (latch == null) {
+ log.warn("No leader latch for dpid {}", HexString.toHexString(dpid));
+ return false;
+ }
+
+ try {
+ return latch.getLeader().getId().equals(mastershipId);
+ } catch (Exception e) {
+ //TODO swallow exception?
+ return false;
+ }
+ }
+
+ @Override
+ public void setMastershipId(String id) {
+ mastershipId = id;
+ }
+
+ @Override
+ public String getMastershipId() {
+ return mastershipId;
+ }
+
+
+ /*
+ * IFloodlightModule
+ */
@Override
public Collection<Class<? extends IFloodlightService>> getModuleServices() {
@@ -42,43 +204,78 @@
@Override
public void init (FloodlightModuleContext context) throws FloodlightModuleException {
- //TODO
+
+ try {
+ String localHostname = java.net.InetAddress.getLocalHost().getHostName();
+ mastershipId = localHostname;
+ log.debug("Setting mastership id to {}", mastershipId);
+ } catch (UnknownHostException e) {
+ // TODO Handle this exception
+ e.printStackTrace();
+ }
+
+ switchLatches = new HashMap<String, LeaderLatch>();
+ switchCallbacks = new HashMap<String, MastershipCallback>();
+
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ client = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
+
+ client.start();
+
+ client = client.usingNamespace(namespace);
+
return;
}
@Override
public void startUp (FloodlightModuleContext context) {
- //TODO
- return;
+ // Nothing to be done on startup
}
-
- @Override
- public void acquireMastership(long dpid, boolean blockOk) {
- // TODO Auto-generated method stub
+
+ public static void main(String args[]){
+ FloodlightModuleContext fmc = new FloodlightModuleContext();
+ MastershipManager mm = new MastershipManager();
- }
-
- @Override
- public void releaseMastership(long dpid) {
- // TODO Auto-generated method stub
+ String id = null;
+ if (args.length > 0){
+ id = args[0];
+ log.info("Using unique id: {}", id);
+ }
- }
-
- @Override
- public boolean amMaster(long dpid) {
- // TODO Auto-generated method stub
- return false;
- }
-
- @Override
- public void setMastershipId(String id) {
- // TODO Auto-generated method stub
+ try {
+ mm.init(fmc);
+ mm.startUp(fmc);
+
+ if (id != null){
+ mm.setMastershipId(id);
+ }
+
+ mm.acquireMastership(1L,
+ new MastershipCallback(){
+ @Override
+ public void changeCallback(long dpid, boolean isMaster) {
+ if (isMaster){
+ log.debug("Callback for becoming master for {}", HexString.toHexString(dpid));
+ }
+ else {
+ log.debug("Callback for losing mastership for {}", HexString.toHexString(dpid));
+ }
+ }
+ });
+
+ //"Server" loop
+ while (true) {
+ Thread.sleep(60000);
+ }
+
+ } catch (FloodlightModuleException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
- }
-
- @Override
- public String getMastershipId() {
- // TODO Auto-generated method stub
- return null;
+ log.debug("is master: {}", mm.amMaster(1L));
}
}