Began implementation of per-switch leader election using Curator
diff --git a/build.xml b/build.xml
index 2bd1343..a0fd756 100644
--- a/build.xml
+++ b/build.xml
@@ -65,6 +65,9 @@
<include name="concurrentlinkedhashmap-lru-1.3.jar"/>
<include name="jython-2.5.2.jar"/>
<include name="libthrift-0.7.0.jar"/>
+ <include name="curator-client-1.3.1.jar"/>
+ <include name="curator-framework-1.3.1.jar"/>
+ <include name="curator-recipes-1.3.1.jar"/>
</patternset>
<patternset id="titanlib">
diff --git a/src/main/java/net/floodlightcontroller/mastership/IMastershipService.java b/src/main/java/net/floodlightcontroller/mastership/IMastershipService.java
index 322210e..b3ddda5 100644
--- a/src/main/java/net/floodlightcontroller/mastership/IMastershipService.java
+++ b/src/main/java/net/floodlightcontroller/mastership/IMastershipService.java
@@ -4,8 +4,14 @@
public interface IMastershipService extends IFloodlightService {
+ // Callback for all mastership changes.
+ // Change callback is called when mastership is acquired or released
+ public interface MastershipCallback {
+ public void changeCallback(long dpid, boolean isMaster);
+ }
+
// Acquire mastership for a switch.
- public void acquireMastership(long dpid, boolean blockOk);
+ public void acquireMastership(long dpid, MastershipCallback cb);
// Release mastership for a switch
public void releaseMastership(long dpid);
diff --git a/src/main/java/net/floodlightcontroller/mastership/MastershipManager.java b/src/main/java/net/floodlightcontroller/mastership/MastershipManager.java
index 2470b12..6c3c178 100644
--- a/src/main/java/net/floodlightcontroller/mastership/MastershipManager.java
+++ b/src/main/java/net/floodlightcontroller/mastership/MastershipManager.java
@@ -1,24 +1,185 @@
package net.floodlightcontroller.mastership;
+import java.io.IOException;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.core.module.FloodlightModuleException;
import net.floodlightcontroller.core.module.IFloodlightModule;
import net.floodlightcontroller.core.module.IFloodlightService;
-import net.floodlightcontroller.mastership.IMastershipService;
-public class MastershipManager implements IFloodlightModule, IMastershipService {
+import org.apache.zookeeper.WatchedEvent;
+import org.openflow.util.HexString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.netflix.curator.RetryPolicy;
+import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.CuratorFrameworkFactory;
+import com.netflix.curator.framework.api.CuratorWatcher;
+import com.netflix.curator.framework.recipes.leader.LeaderLatch;
+import com.netflix.curator.framework.recipes.leader.Participant;
+import com.netflix.curator.framework.state.ConnectionState;
+import com.netflix.curator.framework.state.ConnectionStateListener;
+import com.netflix.curator.retry.ExponentialBackoffRetry;
+
+public class MastershipManager implements IFloodlightModule, IMastershipService,
+ ConnectionStateListener {
protected static Logger log = LoggerFactory.getLogger(MastershipManager.class);
protected String mastershipId;
+ //TODO read this from configuration
+ protected String connectionString = "localhost:2181";
+ private final String namespace = "onos";
+ private final String switchLatchesPath = "/switchmastership";
+
+ protected CuratorFramework client;
+
+ protected Map<String, LeaderLatch> switchLatches;
+ protected Map<String, MastershipCallback> switchCallbacks;
+
+ //protected final String singleLatchPath = switchLatchesPath + "/00:00:00:00:00:00:00:01";
+ //protected LeaderLatch singleLatch;
+
+
+ protected class ParamaterizedCuratorWatcher implements CuratorWatcher {
+ private String dpid;
+ private String latchPath;
+
+ public ParamaterizedCuratorWatcher(String dpid, String latchPath){
+ this.dpid = dpid;
+ this.latchPath = latchPath;
+ }
+
+ @Override
+ public void process(WatchedEvent event) throws Exception {
+ log.debug("Watch Event: {}", event);
+
+ LeaderLatch latch = switchLatches.get(dpid);
+
+ Participant leader = latch.getLeader();
+
+ log.debug("Leader for {} changed. Now {}", dpid, leader.getId());
+
+ if (leader.getId().equals(mastershipId)){
+ switchCallbacks.get(dpid).changeCallback(HexString.toLong(dpid), true);
+ }
+
+ client.getChildren().usingWatcher(this).inBackground().forPath(latchPath);
+ //client.getChildren().usingWatcher(this).forPath(latchPath);
+ }
+ };
+
+ @Override
+ public void acquireMastership(long dpid, MastershipCallback cb) {
+ //TODO check if there's already a latch in the list
+ //TODO throw exception if unique ID not set
+ //TODO use unique ID
+ if (cb == null) {
+ //TODO throw exception?
+ }
+
+ String dpidStr = HexString.toHexString(dpid);
+ String latchPath = switchLatchesPath + "/" + dpidStr;
+
+ LeaderLatch latch = new LeaderLatch(client, latchPath, mastershipId);
+ switchLatches.put(dpidStr, latch);
+ //if (switchCallbacks.put(dpidStr, cb) != cb){
+ //NOTE instance equality intended
+ // log.debug("Throwing out old callback for switch {}: {}", dpidStr, cb);
+ //}
+ switchCallbacks.put(dpidStr, cb);
+
+ try {
+ //client.getChildren().usingWatcher(watcher).inBackground().forPath(singleLatchPath);
+ client.getChildren().usingWatcher(
+ new ParamaterizedCuratorWatcher(dpidStr, latchPath))
+ .inBackground().forPath(latchPath);
+ latch.start();
+ } catch (Exception e) {
+ log.debug("WATCHER ERROROROROR");
+ e.printStackTrace();
+ }
+
+ }
+
+ @Override
+ public void releaseMastership(long dpid) {
+ // TODO Auto-generated method stub
+ LeaderLatch latch = switchLatches.get(HexString.toHexString(dpid));
+ if (latch == null) {
+ return;
+ }
+
+ try {
+ latch.close();
+ } catch (IOException e) {
+
+ }
+ }
+
+ @Override
+ public boolean amMaster(long dpid) {
+ LeaderLatch latch = switchLatches.get(HexString.toHexString(dpid));
+
+ if (latch == null) {
+ log.warn("No leader latch for dpid {}", HexString.toHexString(dpid));
+ return false;
+ }
+
+ //return latch.hasLeadership();
+ try {
+ return latch.getLeader().getId().equals(mastershipId);
+ } catch (Exception e) {
+ //TODO swallow exception?
+ return false;
+ }
+ }
+
+ @Override
+ public void setMastershipId(String id) {
+ // TODO Synchronisation?
+ mastershipId = id;
+ }
+
+ @Override
+ public String getMastershipId() {
+ // TODO Synchronisation?
+ return mastershipId;
+ }
+
+ /*
+ * ConnectionStateListener
+ */
+
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState state) {
+ // TODO Auto-generated method stub
+ log.debug("Connection state change: {}", state);
+
+ switch (state){
+ case SUSPENDED:
+ case LOST:
+ //If we're suspended or lost, we can no longer assume mastership
+ for (Map.Entry<String, MastershipCallback> entry : switchCallbacks.entrySet()){
+ entry.getValue().changeCallback(HexString.toLong(entry.getKey()), false);
+ }
+ break;
+ default:
+ //CONNECTED or RECONNECTED
+ //LeaderLatches should automatically try and re-gain the leadership
+ break;
+ }
+ }
+
+ /*
+ * IFloodlightModule
+ */
+
@Override
public Collection<Class<? extends IFloodlightService>> getModuleServices() {
Collection<Class<? extends IFloodlightService>> l = new ArrayList<Class<? extends IFloodlightService>>();
@@ -40,9 +201,29 @@
return null;
}
+ //be ready to serve mastership requests by startup?
@Override
public void init (FloodlightModuleContext context) throws FloodlightModuleException {
//TODO
+ try {
+ String localHostname = java.net.InetAddress.getLocalHost().getHostName();
+ mastershipId = localHostname;
+ log.debug("Setting mastership id to {}", mastershipId);
+ } catch (UnknownHostException e) {
+ // TODO Handle this exception
+ e.printStackTrace();
+ }
+
+ switchLatches = new HashMap<String, LeaderLatch>();
+ switchCallbacks = new HashMap<String, MastershipCallback>();
+
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ client = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
+
+ client.start();
+
+ client = client.usingNamespace(namespace);
+
return;
}
@@ -51,34 +232,49 @@
//TODO
return;
}
-
- @Override
- public void acquireMastership(long dpid, boolean blockOk) {
- // TODO Auto-generated method stub
+
+ public static void main(String args[]){
+ FloodlightModuleContext fmc = new FloodlightModuleContext();
+ MastershipManager mm = new MastershipManager();
- }
-
- @Override
- public void releaseMastership(long dpid) {
- // TODO Auto-generated method stub
+ if (args.length < 1){
+ log.error("Need to supply ID");
+ System.exit(1);
+ }
+ String id = args[0];
- }
-
- @Override
- public boolean amMaster(long dpid) {
- // TODO Auto-generated method stub
- return false;
- }
-
- @Override
- public void setMastershipId(String id) {
- // TODO Auto-generated method stub
+ try {
+ mm.init(fmc);
+ mm.startUp(fmc);
+
+ mm.setMastershipId(id);
+
+ mm.acquireMastership(1L,
+ new MastershipCallback(){
+ @Override
+ public void changeCallback(long dpid, boolean isMaster) {
+ if (isMaster){
+ log.debug("I'm master for {}", HexString.toHexString(dpid));
+ }
+ else {
+ log.debug("NOT master for {}", HexString.toHexString(dpid));
+ }
+ }
+ });
+
+ //"Server" loop
+ while (true) {
+ Thread.sleep(60000);
+ }
+
+ } catch (FloodlightModuleException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
- }
-
- @Override
- public String getMastershipId() {
- // TODO Auto-generated method stub
- return null;
+ log.debug("is master: {}", mm.amMaster(1L));
}
}
diff --git a/src/test/java/net/floodlightcontroller/mastership/MastershipManagerTest.java b/src/test/java/net/floodlightcontroller/mastership/MastershipManagerTest.java
new file mode 100644
index 0000000..f29f6e7
--- /dev/null
+++ b/src/test/java/net/floodlightcontroller/mastership/MastershipManagerTest.java
@@ -0,0 +1,49 @@
+package net.floodlightcontroller.mastership;
+
+import net.floodlightcontroller.core.module.FloodlightModuleContext;
+import net.floodlightcontroller.mastership.IMastershipService.MastershipCallback;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MastershipManagerTest {
+ protected static Logger log = LoggerFactory.getLogger(MastershipManagerTest.class);
+ private MastershipManager mm;
+
+ @Before
+ public void setUp() throws Exception{
+ //MockFloodlightProvider fp = new MockFloodlightProvider();
+ FloodlightModuleContext fmc = new FloodlightModuleContext();
+
+ mm = new MastershipManager();
+
+ mm.init(fmc);
+ mm.startUp(fmc);
+
+ }
+
+ @Test
+ public void testAcquireMastership(){
+ MastershipCallback cb = new MastershipCallback(){
+ @Override
+ public void changeCallback(long dpid, boolean isMaster) {
+ log.info("Callback called!");
+ }
+ };
+
+ long dpid = 1L;
+
+ mm.acquireMastership(dpid, cb);
+
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ log.debug("Is master? {}", mm.amMaster(dpid));
+ }
+}