Merge branch 'experimental' - Improved stability and thread safety of ZookeeperRegistry
diff --git a/build.xml b/build.xml
index e8055e7..1b36535 100644
--- a/build.xml
+++ b/build.xml
@@ -66,9 +66,9 @@
<include name="concurrentlinkedhashmap-lru-1.3.jar"/>
<include name="jython-2.5.2.jar"/>
<include name="libthrift-0.7.0.jar"/>
- <include name="curator-client-1.3.3.jar"/>
- <include name="curator-framework-1.3.3.jar"/>
- <include name="curator-recipes-1.3.3.jar"/>
+ <include name="curator-client-1.3.4-SNAPSHOT.jar"/>
+ <include name="curator-framework-1.3.4-SNAPSHOT.jar"/>
+ <include name="curator-recipes-1.3.4-SNAPSHOT.jar"/>
<include name="zookeeper-3.4.5.jar"/>
</patternset>
diff --git a/src/main/java/net/onrc/onos/registry/controller/SwitchLeadershipData.java b/src/main/java/net/onrc/onos/registry/controller/SwitchLeadershipData.java
new file mode 100644
index 0000000..eb513fd
--- /dev/null
+++ b/src/main/java/net/onrc/onos/registry/controller/SwitchLeadershipData.java
@@ -0,0 +1,45 @@
+package net.onrc.onos.registry.controller;
+
+import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
+
+import com.netflix.curator.framework.recipes.leader.LeaderLatch;
+
+/**
+ * Immutable pairing of the state kept for one active switch leadership
+ * request: the Curator {@link LeaderLatch} contesting leadership and the
+ * {@link ControlChangeCallback} supplied by the requester.
+ * <p>
+ * Both fields are final: the references are fixed at construction and,
+ * per Java final-field semantics, visible to any thread that obtains the
+ * fully constructed instance.
+ */
+public class SwitchLeadershipData {
+
+ private final LeaderLatch latch;
+ private final ControlChangeCallback cb;
+
+ /**
+ * @param latch the leader latch contesting leadership of the switch
+ * @param cb the callback to notify on leadership changes, stored as
+ * given. NOTE(review): earlier registry code allowed this to be
+ * null to opt out of notifications - confirm whether that still holds.
+ */
+ public SwitchLeadershipData(LeaderLatch latch, ControlChangeCallback cb) {
+ this.latch = latch;
+ this.cb = cb;
+ }
+
+ /**
+ * @return the leader latch supplied at construction
+ */
+ public LeaderLatch getLatch(){
+ return latch;
+ }
+
+ /**
+ * @return the control-change callback supplied at construction
+ */
+ public ControlChangeCallback getCallback(){
+ return cb;
+ }
+}
diff --git a/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
index 025bbfe..e727238 100644
--- a/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
+++ b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
@@ -8,6 +8,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.core.module.FloodlightModuleException;
@@ -16,8 +17,6 @@
import net.floodlightcontroller.restserver.IRestApiService;
import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.openflow.util.HexString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,13 +25,14 @@
import com.netflix.curator.RetryPolicy;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
-import com.netflix.curator.framework.api.CuratorWatcher;
import com.netflix.curator.framework.recipes.cache.ChildData;
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
import com.netflix.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import com.netflix.curator.framework.recipes.cache.PathChildrenCacheEvent;
import com.netflix.curator.framework.recipes.cache.PathChildrenCacheListener;
import com.netflix.curator.framework.recipes.leader.LeaderLatch;
+import com.netflix.curator.framework.recipes.leader.LeaderLatchEvent;
+import com.netflix.curator.framework.recipes.leader.LeaderLatchListener;
import com.netflix.curator.framework.recipes.leader.Participant;
import com.netflix.curator.retry.ExponentialBackoffRetry;
@@ -61,92 +61,40 @@
protected PathChildrenCache controllerCache;
protected PathChildrenCache switchCache;
- protected Map<String, LeaderLatch> switchLatches;
- protected Map<String, ControlChangeCallback> switchCallbacks;
+ protected ConcurrentHashMap<String, SwitchLeadershipData> switches;
protected Map<String, PathChildrenCache> switchPathCaches;
//Zookeeper performance-related configuration
protected static final int sessionTimeout = 2000;
protected static final int connectionTimeout = 4000;
- /**
- * Watches for changes in switch leadership election. The Curator
- * LeaderLatch doesn't notify us when leadership changes so we set a watch
- * on the election znodes to get leadership change events. The process
- * method will be called whenever the switches children change in
- * Zookeeper. We then have to work out whether to send a control-changed
- * event to our clients and reset the watch.
- *
- * TODO I think it's possible to miss events that happen while we're
- * processing the watch and before we've set a new watch. Need to think
- * of a safer way to implement leader change notifications.
- *
- */
- protected class ParamaterizedCuratorWatcher implements CuratorWatcher {
- private String dpid;
- private boolean isLeader = false;
- private String latchPath;
+
+ /**
+ * Curator leader-latch listener for a single switch. On each leadership
+ * event it forwards the latch's current leadership state to the
+ * requester's callback, but only while this latch is still the one
+ * registered for the switch in the switches map.
+ */
+ protected class SwitchLeaderListener implements LeaderLatchListener{
+ final String dpid;
+ final LeaderLatch latch;
- public ParamaterizedCuratorWatcher(String dpid, String latchPath){
+ public SwitchLeaderListener(String dpid, LeaderLatch latch){
this.dpid = dpid;
- this.latchPath = latchPath;
+ this.latch = latch;
}
@Override
- public synchronized void process(WatchedEvent event) throws Exception {
- log.debug("Watch Event: {}", event);
-
+ public void leaderLatchEvent(CuratorFramework framework,
+ LeaderLatchEvent event) {
+ //Read leadership once so the log line and the callback report
+ //the same value even if leadership flips while we're running
+ boolean isLeader = latch.hasLeadership();
+ log.debug("Leadership changed for {}, now {}", dpid, isLeader);
- if (event.getState() == KeeperState.Disconnected){
- if (isLeader) {
- log.debug("Disconnected while leader - lost leadership for {}", dpid);
-
- isLeader = false;
- ControlChangeCallback cb = switchCallbacks.get(dpid);
- if (cb != null) {
- //Allow callback to be null if the requester doesn't want a callback
- cb.controlChanged(HexString.toLong(dpid), false);
- }
- }
- return;
- //TODO Watcher is never reset once we reconnect to Zookeeper
+ //Check that the leadership request is still active - the client
+ //may have since released the request or even begun another request
+ //(this is why we use == to check the object instance is the same)
+ SwitchLeadershipData swData = switches.get(dpid);
+ if (swData != null && swData.getLatch() == latch){
+ //The callback may be null if the requester doesn't want one
+ if (swData.getCallback() != null) {
+ swData.getCallback().controlChanged(
+ HexString.toLong(dpid), isLeader);
+ }
}
-
- LeaderLatch latch = switchLatches.get(dpid);
- if (latch == null){
- log.debug("In watcher process, looks like control was released for {}",
- dpid);
- return;
+ else {
+ log.debug("Latch for {} has changed", dpid);
}
-
- try {
-
- Participant leader = latch.getLeader();
-
- if (leader.getId().equals(controllerId) && !isLeader){
- log.debug("Became leader for {}", dpid);
-
- isLeader = true;
- switchCallbacks.get(dpid).controlChanged(HexString.toLong(dpid), true);
- }
- else if (!leader.getId().equals(controllerId) && isLeader){
- log.debug("Lost leadership for {}", dpid);
-
- isLeader = false;
- switchCallbacks.get(dpid).controlChanged(HexString.toLong(dpid), false);
- }
- } catch (Exception e){
- if (isLeader){
- log.debug("Exception checking leadership status. Assume leadership lost for {}",
- dpid);
-
- isLeader = false;
- switchCallbacks.get(dpid).controlChanged(HexString.toLong(dpid), false);
- }
- } finally {
- client.getChildren().usingWatcher(this).inBackground().forPath(latchPath);
- }
- //client.getChildren().usingWatcher(this).forPath(latchPath);
}
}
@@ -207,22 +155,33 @@
String dpidStr = HexString.toHexString(dpid);
String latchPath = switchLatchesPath + "/" + dpidStr;
- if (switchLatches.get(dpidStr) != null){
- //throw new RuntimeException("Leader election for switch " + dpidStr +
- // "is already running");
+ if (switches.get(dpidStr) != null){
log.debug("Already contesting {}, returning", HexString.toHexString(dpid));
return;
}
LeaderLatch latch = new LeaderLatch(client, latchPath, controllerId);
- switchLatches.put(dpidStr, latch);
- switchCallbacks.put(dpidStr, cb);
+ latch.addListener(new SwitchLeaderListener(dpidStr, latch));
+
+ SwitchLeadershipData swData = new SwitchLeadershipData(latch, cb);
+ SwitchLeadershipData oldData = switches.putIfAbsent(dpidStr, swData);
+
+ if (oldData != null){
+ //There was already data for that key in the map
+ //i.e. someone else got here first so we can't succeed
+ log.debug("Already requested control for {}", dpidStr);
+ throw new RegistryException("Already requested control for " + dpidStr);
+ }
+
+ //Now that we know we were able to add our latch to the collection,
+ //we can start the leader election in Zookeeper. However I don't know
+ //how to handle if the start fails - the latch is already in our
+ //switches list.
+ //TODO seems like there's a Curator bug when latch.start is called when
+ //there's no Zookeeper connection which causes two znodes to be put in
+ //Zookeeper at the latch path when we reconnect to Zookeeper.
try {
- //client.getChildren().usingWatcher(watcher).inBackground().forPath(singleLatchPath);
- client.getChildren().usingWatcher(
- new ParamaterizedCuratorWatcher(dpidStr, latchPath))
- .inBackground().forPath(latchPath);
latch.start();
} catch (Exception e) {
log.warn("Error starting leader latch: {}", e.getMessage());
@@ -237,39 +196,35 @@
String dpidStr = HexString.toHexString(dpid);
- LeaderLatch latch = switchLatches.get(dpidStr);
- if (latch == null) {
+ SwitchLeadershipData swData = switches.remove(dpidStr);
+
+ if (swData == null) {
log.debug("Trying to release control of a switch we are not contesting");
return;
}
+
+ LeaderLatch latch = swData.getLatch();
try {
latch.close();
} catch (IOException e) {
//I think it's OK not to do anything here. Either the node got
//deleted correctly, or the connection went down and the node got deleted.
- } finally {
- switchLatches.remove(dpidStr);
- switchCallbacks.remove(dpidStr);
}
}
+ /**
+ * Reports whether this controller currently holds leadership for the
+ * given switch. Returns false, after logging a warning, when there is
+ * no active leadership request for the switch.
+ */
@Override
public boolean hasControl(long dpid) {
+ String hexDpid = HexString.toHexString(dpid);
- LeaderLatch latch = switchLatches.get(HexString.toHexString(dpid));
+ SwitchLeadershipData request = switches.get(hexDpid);
- if (latch == null) {
- log.warn("No leader latch for dpid {}", HexString.toHexString(dpid));
+ if (request == null) {
+ log.warn("No leader latch for dpid {}", hexDpid);
return false;
}
- try {
- return latch.getLeader().getId().equals(controllerId);
- } catch (Exception e) {
- //TODO swallow exception?
- return false;
- }
+ LeaderLatch latch = request.getLatch();
+ return latch.hasLeadership();
}
@Override
@@ -322,12 +277,11 @@
@Override
public String getControllerForSwitch(long dpid) throws RegistryException {
- // TODO Work out how we should store this controller/switch data.
- // The leader latch might be a index to the /controllers collections
- // which holds more info on the controller (how to talk to it for example).
+ // TODO work out synchronization
- String strDpid = HexString.toHexString(dpid);
- LeaderLatch latch = switchLatches.get(strDpid);
+ String dpidStr = HexString.toHexString(dpid);
+
+ LeaderLatch latch = switches.get(dpidStr).getLatch();
if (latch == null){
log.warn("Tried to get controller for non-existent switch");
@@ -351,6 +305,9 @@
}
+ //TODO what should happen when there's no ZK connection? Currently we just return
+ //the cache but this may lead to false impressions - i.e. we don't actually know
+ //what's in ZK so we shouldn't say we do
@Override
public Map<String, List<ControllerRegistryEntry>> getAllSwitches() {
Map<String, List<ControllerRegistryEntry>> data =
@@ -409,6 +366,8 @@
return l;
}
+ //TODO currently blocks startup when it can't get a Zookeeper connection.
+ //Do we support starting up with no Zookeeper connection?
@Override
public void init (FloodlightModuleContext context) throws FloodlightModuleException {
log.info("Initialising the Zookeeper Registry - Zookeeper connection required");
@@ -423,8 +382,7 @@
restApi = context.getServiceImpl(IRestApiService.class);
- switchLatches = new HashMap<String, LeaderLatch>();
- switchCallbacks = new HashMap<String, ControlChangeCallback>();
+ switches = new ConcurrentHashMap<String, SwitchLeadershipData>();
switchPathCaches = new HashMap<String, PathChildrenCache>();
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
@@ -432,7 +390,6 @@
sessionTimeout, connectionTimeout, retryPolicy);
client.start();
-
client = client.usingNamespace(namespace);