Re-architected ZookeeperRegistry to use LeaderLatch change notifications and to improve thread safety
diff --git a/src/main/java/net/onrc/onos/registry/controller/SwitchLeadershipData.java b/src/main/java/net/onrc/onos/registry/controller/SwitchLeadershipData.java
new file mode 100644
index 0000000..eb513fd
--- /dev/null
+++ b/src/main/java/net/onrc/onos/registry/controller/SwitchLeadershipData.java
@@ -0,0 +1,25 @@
+package net.onrc.onos.registry.controller;
+
+import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
+
+import com.netflix.curator.framework.recipes.leader.LeaderLatch;
+
+/** Immutable holder pairing a switch's leader latch with the callback given for it. */
+public class SwitchLeadershipData {
+    private final LeaderLatch latch;
+    private final ControlChangeCallback cb;
+    /** Stores the given latch and callback as-is; neither is started or invoked here. */
+    public SwitchLeadershipData(LeaderLatch latch, ControlChangeCallback cb) {
+        this.latch = latch;
+        this.cb = cb;
+    }
+    /** Returns the latch passed to the constructor. */
+    public LeaderLatch getLatch(){
+        return latch;
+    }
+    /** Returns the callback passed to the constructor. */
+    public ControlChangeCallback getCallback(){
+        return cb;
+    }
+
+}
diff --git a/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
index c9ed160..3120d20 100644
--- a/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
+++ b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
@@ -8,6 +8,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.core.module.FloodlightModuleException;
@@ -16,8 +17,6 @@
import net.floodlightcontroller.restserver.IRestApiService;
import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.openflow.util.HexString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,7 +25,6 @@
import com.netflix.curator.RetryPolicy;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
-import com.netflix.curator.framework.api.CuratorWatcher;
import com.netflix.curator.framework.recipes.cache.ChildData;
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
import com.netflix.curator.framework.recipes.cache.PathChildrenCache.StartMode;
@@ -63,8 +61,9 @@
protected PathChildrenCache controllerCache;
protected PathChildrenCache switchCache;
- protected Map<String, LeaderLatch> switchLatches;
- protected Map<String, ControlChangeCallback> switchCallbacks;
+ protected ConcurrentHashMap<String, SwitchLeadershipData> switches;
+ //protected Map<String, LeaderLatch> switchLatches;
+ //protected Map<String, ControlChangeCallback> switchCallbacks;
protected Map<String, PathChildrenCache> switchPathCaches;
//Zookeeper performance-related configuration
@@ -84,6 +83,7 @@
* of a safer way to implement leader change notifications.
*
*/
+ /*
protected class ParamaterizedCuratorWatcher implements CuratorWatcher {
private String dpid;
private boolean isLeader = false;
@@ -151,12 +151,13 @@
//client.getChildren().usingWatcher(this).forPath(latchPath);
}
}
+ */
- protected class ParamaterizedLeaderListener implements LeaderLatchListener{
+ protected class SwitchLeaderListener implements LeaderLatchListener{
String dpid;
LeaderLatch latch;
- public ParamaterizedLeaderListener(String dpid, LeaderLatch latch){
+ public SwitchLeaderListener(String dpid, LeaderLatch latch){
this.dpid = dpid;
this.latch = latch;
}
@@ -164,9 +165,20 @@
@Override
public void leaderLatchEvent(CuratorFramework arg0,
LeaderLatchEvent arg1) {
- log.debug("Got leader latch event for {} hasLeadership {}",
+ log.debug("Leadership changed for {}, now {}",
dpid, latch.hasLeadership());
+ //Check that the leadership request is still active - the client
+ //may have since released the request or even begun another request
+ //(this is why we use == to check the object instance is the same)
+ SwitchLeadershipData swData = switches.get(dpid);
+ if (swData != null && swData.getLatch() == latch){
+ swData.getCallback().controlChanged(
+ HexString.toLong(dpid), latch.hasLeadership());
+ }
+ else {
+ log.debug("Latch for {} has changed", dpid);
+ }
}
}
@@ -227,7 +239,8 @@
String dpidStr = HexString.toHexString(dpid);
String latchPath = switchLatchesPath + "/" + dpidStr;
- if (switchLatches.get(dpidStr) != null){
+ //if (switchLatches.get(dpidStr) != null){
+ if (switches.get(dpidStr) != null){
//throw new RuntimeException("Leader election for switch " + dpidStr +
// "is already running");
log.debug("Already contesting {}, returning", HexString.toHexString(dpid));
@@ -235,11 +248,24 @@
}
LeaderLatch latch = new LeaderLatch(client, latchPath, controllerId);
- switchLatches.put(dpidStr, latch);
- switchCallbacks.put(dpidStr, cb);
+ latch.addListener(new SwitchLeaderListener(dpidStr, latch));
- latch.addListener(new ParamaterizedLeaderListener(dpidStr, latch));
+ //switchLatches.put(dpidStr, latch);
+ //switchCallbacks.put(dpidStr, cb);
+ //Atomically claim the map slot for this switch before the latch is started
+ SwitchLeadershipData swData = new SwitchLeadershipData(latch, cb);
+ SwitchLeadershipData oldData = switches.putIfAbsent(dpidStr, swData);
+
+ if (oldData != null){
+ //There was already data for that key in the map
+ //i.e. someone else got here first so we can't succeed
+ log.debug("Already requested control for {}", dpidStr);
+ throw new RegistryException("Already requested control for " + dpidStr);
+ }
+
+ //Now that we know we were able to add our latch to the collection,
+ //we can start the leader election in Zookeeper
try {
//client.getChildren().usingWatcher(watcher).inBackground().forPath(singleLatchPath);
//client.getChildren().usingWatcher(
@@ -259,11 +285,16 @@
String dpidStr = HexString.toHexString(dpid);
- LeaderLatch latch = switchLatches.get(dpidStr);
- if (latch == null) {
+ SwitchLeadershipData swData = switches.remove(dpidStr);
+
+ if (swData == null) {
log.debug("Trying to release control of a switch we are not contesting");
return;
}
+
+ //LeaderLatch latch = switches.get(dpidStr).getLatch();
+ //LeaderLatch latch = switches.remove(dpidStr).getLatch();
+ LeaderLatch latch = swData.getLatch();
try {
latch.close();
@@ -271,27 +302,33 @@
//I think it's OK not to do anything here. Either the node got
//deleted correctly, or the connection went down and the node got deleted.
} finally {
- switchLatches.remove(dpidStr);
- switchCallbacks.remove(dpidStr);
+ //switches.remove(dpidStr);
+ //switchLatches.remove(dpidStr);
+ //switchCallbacks.remove(dpidStr);
}
}
@Override
public boolean hasControl(long dpid) {
+ //LeaderLatch latch = switchLatches.get(HexString.toHexString(dpid));
+ String dpidStr = HexString.toHexString(dpid);
- LeaderLatch latch = switchLatches.get(HexString.toHexString(dpid));
+ SwitchLeadershipData swData = switches.get(dpidStr);
- if (latch == null) {
- log.warn("No leader latch for dpid {}", HexString.toHexString(dpid));
+ if (swData == null) {
+ log.warn("No leader latch for dpid {}", dpidStr);
return false;
}
- try {
- return latch.getLeader().getId().equals(controllerId);
- } catch (Exception e) {
+ return swData.getLatch().hasLeadership();
+
+ //try {
+ //return latch.getLeader().getId().equals(controllerId);
+
+ //} catch (Exception e) {
//TODO swallow exception?
- return false;
- }
+ //return false;
+ //}
}
@Override
@@ -344,12 +381,11 @@
@Override
public String getControllerForSwitch(long dpid) throws RegistryException {
- // TODO Work out how we should store this controller/switch data.
- // The leader latch might be a index to the /controllers collections
- // which holds more info on the controller (how to talk to it for example).
+ // TODO work out synchronization
- String strDpid = HexString.toHexString(dpid);
- LeaderLatch latch = switchLatches.get(strDpid);
+ String dpidStr = HexString.toHexString(dpid);
+ SwitchLeadershipData swData = switches.get(dpidStr); //null when there is no entry for this dpid
+ LeaderLatch latch = (swData == null) ? null : swData.getLatch(); //keeps the null check below reachable instead of throwing NPE
if (latch == null){
log.warn("Tried to get controller for non-existent switch");
@@ -373,6 +409,9 @@
}
+ //TODO what should happen when there's no ZK connection? Currently we just return
+ //the cache but this may lead to false impressions - i.e. we don't actually know
+ //what's in ZK so we shouldn't say we do
@Override
public Map<String, List<ControllerRegistryEntry>> getAllSwitches() {
Map<String, List<ControllerRegistryEntry>> data =
@@ -431,6 +470,8 @@
return l;
}
+ //TODO currently blocks startup when it can't get a Zookeeper connection.
+ //Do we support starting up with no Zookeeper connection?
@Override
public void init (FloodlightModuleContext context) throws FloodlightModuleException {
log.info("Initialising the Zookeeper Registry - Zookeeper connection required");
@@ -445,8 +486,10 @@
restApi = context.getServiceImpl(IRestApiService.class);
- switchLatches = new HashMap<String, LeaderLatch>();
- switchCallbacks = new HashMap<String, ControlChangeCallback>();
+ switches = new ConcurrentHashMap<String, SwitchLeadershipData>();
+
+ //switchLatches = new HashMap<String, LeaderLatch>();
+ //switchCallbacks = new HashMap<String, ControlChangeCallback>();
switchPathCaches = new HashMap<String, PathChildrenCache>();
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);