Implemented notifications for mastership changes
diff --git a/mastership-test.sh b/mastership-test.sh
new file mode 100755
index 0000000..06189df
--- /dev/null
+++ b/mastership-test.sh
@@ -0,0 +1 @@
+java -Dlogback.configurationFile=logback.xml -cp target/floodlight-only.jar:lib/*:lib/titan/* net.floodlightcontroller.mastership.MastershipManager $1
diff --git a/src/main/java/net/floodlightcontroller/mastership/IMastershipService.java b/src/main/java/net/floodlightcontroller/mastership/IMastershipService.java
index b3ddda5..83800c3 100644
--- a/src/main/java/net/floodlightcontroller/mastership/IMastershipService.java
+++ b/src/main/java/net/floodlightcontroller/mastership/IMastershipService.java
@@ -11,7 +11,7 @@
}
// Acquire mastership for a switch.
- public void acquireMastership(long dpid, MastershipCallback cb);
+ public void acquireMastership(long dpid, MastershipCallback cb) throws Exception;
// Release mastership for a switch
public void releaseMastership(long dpid);
diff --git a/src/main/java/net/floodlightcontroller/mastership/MastershipManager.java b/src/main/java/net/floodlightcontroller/mastership/MastershipManager.java
index 6c3c178..46b533e 100644
--- a/src/main/java/net/floodlightcontroller/mastership/MastershipManager.java
+++ b/src/main/java/net/floodlightcontroller/mastership/MastershipManager.java
@@ -13,6 +13,7 @@
import net.floodlightcontroller.core.module.IFloodlightService;
import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.openflow.util.HexString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -23,14 +24,12 @@
import com.netflix.curator.framework.api.CuratorWatcher;
import com.netflix.curator.framework.recipes.leader.LeaderLatch;
import com.netflix.curator.framework.recipes.leader.Participant;
-import com.netflix.curator.framework.state.ConnectionState;
-import com.netflix.curator.framework.state.ConnectionStateListener;
import com.netflix.curator.retry.ExponentialBackoffRetry;
-public class MastershipManager implements IFloodlightModule, IMastershipService,
- ConnectionStateListener {
+public class MastershipManager implements IFloodlightModule, IMastershipService {
+
protected static Logger log = LoggerFactory.getLogger(MastershipManager.class);
- protected String mastershipId;
+ protected String mastershipId = null;
//TODO read this from configuration
protected String connectionString = "localhost:2181";
@@ -42,12 +41,9 @@
protected Map<String, LeaderLatch> switchLatches;
protected Map<String, MastershipCallback> switchCallbacks;
- //protected final String singleLatchPath = switchLatchesPath + "/00:00:00:00:00:00:00:01";
- //protected LeaderLatch singleLatch;
-
-
protected class ParamaterizedCuratorWatcher implements CuratorWatcher {
private String dpid;
+ private boolean isLeader = false;
private String latchPath;
public ParamaterizedCuratorWatcher(String dpid, String latchPath){
@@ -61,37 +57,63 @@
LeaderLatch latch = switchLatches.get(dpid);
- Participant leader = latch.getLeader();
+ if (event.getState() == KeeperState.Disconnected){
+ if (isLeader) {
+ log.debug("Disconnected while leader - lost leadership for {}", dpid);
+
+ isLeader = false;
+ switchCallbacks.get(dpid).changeCallback(HexString.toLong(dpid), false);
+ }
+ return;
+ }
- log.debug("Leader for {} changed. Now {}", dpid, leader.getId());
-
- if (leader.getId().equals(mastershipId)){
- switchCallbacks.get(dpid).changeCallback(HexString.toLong(dpid), true);
+ try {
+ Participant leader = latch.getLeader();
+
+ if (leader.getId().equals(mastershipId) && !isLeader){
+ log.debug("Became leader for {}", dpid);
+
+ isLeader = true;
+ switchCallbacks.get(dpid).changeCallback(HexString.toLong(dpid), true);
+ }
+ else if (!leader.getId().equals(mastershipId) && isLeader){
+ log.debug("Lost leadership for {}", dpid);
+
+ isLeader = false;
+ switchCallbacks.get(dpid).changeCallback(HexString.toLong(dpid), false);
+ }
+ } catch (Exception e){
+ if (isLeader){
+ log.debug("Exception checking leadership status. Assume leadship lost for {}",
+ dpid);
+
+ isLeader = false;
+ switchCallbacks.get(dpid).changeCallback(HexString.toLong(dpid), false);
+ }
}
client.getChildren().usingWatcher(this).inBackground().forPath(latchPath);
//client.getChildren().usingWatcher(this).forPath(latchPath);
}
- };
+ }
@Override
- public void acquireMastership(long dpid, MastershipCallback cb) {
- //TODO check if there's already a latch in the list
- //TODO throw exception if unique ID not set
- //TODO use unique ID
- if (cb == null) {
- //TODO throw exception?
+ public void acquireMastership(long dpid, MastershipCallback cb) throws Exception {
+
+ if (mastershipId == null){
+ throw new RuntimeException("Must set mastershipId before calling aquireMastership");
}
String dpidStr = HexString.toHexString(dpid);
String latchPath = switchLatchesPath + "/" + dpidStr;
+ if (switchLatches.get(dpidStr) != null){
+ throw new RuntimeException("Leader election for switch " + dpidStr +
+ "is already running");
+ }
+
LeaderLatch latch = new LeaderLatch(client, latchPath, mastershipId);
switchLatches.put(dpidStr, latch);
- //if (switchCallbacks.put(dpidStr, cb) != cb){
- //NOTE instance equality intended
- // log.debug("Throwing out old callback for switch {}: {}", dpidStr, cb);
- //}
switchCallbacks.put(dpidStr, cb);
try {
@@ -101,17 +123,19 @@
.inBackground().forPath(latchPath);
latch.start();
} catch (Exception e) {
- log.debug("WATCHER ERROROROROR");
- e.printStackTrace();
+ log.warn("Error starting leader latch: {}", e.getMessage());
+ throw e;
}
}
@Override
public void releaseMastership(long dpid) {
- // TODO Auto-generated method stub
- LeaderLatch latch = switchLatches.get(HexString.toHexString(dpid));
+ String dpidStr = HexString.toHexString(dpid);
+
+ LeaderLatch latch = switchLatches.get(dpidStr);
if (latch == null) {
+ log.debug("Trying to release mastership for switch we are not contesting");
return;
}
@@ -120,6 +144,9 @@
} catch (IOException e) {
}
+
+ switchLatches.remove(dpidStr);
+ switchCallbacks.remove(dpidStr);
}
@Override
@@ -131,7 +158,6 @@
return false;
}
- //return latch.hasLeadership();
try {
return latch.getLeader().getId().equals(mastershipId);
} catch (Exception e) {
@@ -142,39 +168,14 @@
@Override
public void setMastershipId(String id) {
- // TODO Synchronisation?
mastershipId = id;
}
@Override
public String getMastershipId() {
- // TODO Synchronisation?
return mastershipId;
}
- /*
- * ConnectionStateListener
- */
-
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState state) {
- // TODO Auto-generated method stub
- log.debug("Connection state change: {}", state);
-
- switch (state){
- case SUSPENDED:
- case LOST:
- //If we're suspended or lost, we can no longer assume mastership
- for (Map.Entry<String, MastershipCallback> entry : switchCallbacks.entrySet()){
- entry.getValue().changeCallback(HexString.toLong(entry.getKey()), false);
- }
- break;
- default:
- //CONNECTED or RECONNECTED
- //LeaderLatches should automatically try and re-gain the leadership
- break;
- }
- }
/*
* IFloodlightModule
@@ -201,10 +202,9 @@
return null;
}
- //be ready to serve mastership requests by startup?
@Override
public void init (FloodlightModuleContext context) throws FloodlightModuleException {
- //TODO
+
try {
String localHostname = java.net.InetAddress.getLocalHost().getHostName();
mastershipId = localHostname;
@@ -229,35 +229,36 @@
@Override
public void startUp (FloodlightModuleContext context) {
- //TODO
- return;
+ // Nothing to be done on startup
}
public static void main(String args[]){
FloodlightModuleContext fmc = new FloodlightModuleContext();
MastershipManager mm = new MastershipManager();
- if (args.length < 1){
- log.error("Need to supply ID");
- System.exit(1);
+ String id = null;
+ if (args.length > 0){
+ id = args[0];
+ log.info("Using unique id: {}", id);
}
- String id = args[0];
try {
mm.init(fmc);
mm.startUp(fmc);
- mm.setMastershipId(id);
-
+ if (id != null){
+ mm.setMastershipId(id);
+ }
+
mm.acquireMastership(1L,
new MastershipCallback(){
@Override
public void changeCallback(long dpid, boolean isMaster) {
if (isMaster){
- log.debug("I'm master for {}", HexString.toHexString(dpid));
+ log.debug("Callback for becoming master for {}", HexString.toHexString(dpid));
}
else {
- log.debug("NOT master for {}", HexString.toHexString(dpid));
+ log.debug("Callback for losing mastership for {}", HexString.toHexString(dpid));
}
}
});
@@ -268,10 +269,10 @@
}
} catch (FloodlightModuleException e) {
- // TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
- // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (Exception e) {
e.printStackTrace();
}
diff --git a/src/test/java/net/floodlightcontroller/mastership/MastershipManagerTest.java b/src/test/java/net/floodlightcontroller/mastership/MastershipManagerTest.java
deleted file mode 100644
index f29f6e7..0000000
--- a/src/test/java/net/floodlightcontroller/mastership/MastershipManagerTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package net.floodlightcontroller.mastership;
-
-import net.floodlightcontroller.core.module.FloodlightModuleContext;
-import net.floodlightcontroller.mastership.IMastershipService.MastershipCallback;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MastershipManagerTest {
- protected static Logger log = LoggerFactory.getLogger(MastershipManagerTest.class);
- private MastershipManager mm;
-
- @Before
- public void setUp() throws Exception{
- //MockFloodlightProvider fp = new MockFloodlightProvider();
- FloodlightModuleContext fmc = new FloodlightModuleContext();
-
- mm = new MastershipManager();
-
- mm.init(fmc);
- mm.startUp(fmc);
-
- }
-
- @Test
- public void testAcquireMastership(){
- MastershipCallback cb = new MastershipCallback(){
- @Override
- public void changeCallback(long dpid, boolean isMaster) {
- log.info("Callback called!");
- }
- };
-
- long dpid = 1L;
-
- mm.acquireMastership(dpid, cb);
-
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- log.debug("Is master? {}", mm.amMaster(dpid));
- }
-}