Cleaned up ZookeeperRegistry code
diff --git a/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
index 3120d20..e727238 100644
--- a/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
+++ b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
@@ -62,97 +62,13 @@
protected PathChildrenCache switchCache;
protected ConcurrentHashMap<String, SwitchLeadershipData> switches;
- //protected Map<String, LeaderLatch> switchLatches;
- //protected Map<String, ControlChangeCallback> switchCallbacks;
protected Map<String, PathChildrenCache> switchPathCaches;
//Zookeeper performance-related configuration
protected static final int sessionTimeout = 2000;
protected static final int connectionTimeout = 4000;
- /**
- * Watches for changes in switch leadership election. The Curator
- * LeaderLatch doesn't notify us when leadership changes so we set a watch
- * on the election znodes to get leadership change events. The process
- * method will be called whenever the switches children change in
- * Zookeeper. We then have to work out whether to send a control-changed
- * event to our clients and reset the watch.
- *
- * TODO I think it's possible to miss events that happen while we're
- * processing the watch and before we've set a new watch. Need to think
- * of a safer way to implement leader change notifications.
- *
- */
- /*
- protected class ParamaterizedCuratorWatcher implements CuratorWatcher {
- private String dpid;
- private boolean isLeader = false;
- private String latchPath;
-
- public ParamaterizedCuratorWatcher(String dpid, String latchPath){
- this.dpid = dpid;
- this.latchPath = latchPath;
- }
-
- @Override
- public synchronized void process(WatchedEvent event) throws Exception {
- log.debug("Watch Event: {}", event);
-
- if (event.getState() == KeeperState.Disconnected){
- if (isLeader) {
- log.debug("Disconnected while leader - lost leadership for {}", dpid);
-
- isLeader = false;
- ControlChangeCallback cb = switchCallbacks.get(dpid);
- if (cb != null) {
- //Allow callback to be null if the requester doesn't want a callback
- cb.controlChanged(HexString.toLong(dpid), false);
- }
- }
- return;
- //TODO Watcher is never reset once we reconnect to Zookeeper
- }
-
- LeaderLatch latch = switchLatches.get(dpid);
- if (latch == null){
- log.debug("In watcher process, looks like control was released for {}",
- dpid);
- return;
- }
-
- try {
-
- Participant leader = latch.getLeader();
-
- if (leader.getId().equals(controllerId) && !isLeader){
- log.debug("Became leader for {}", dpid);
-
- isLeader = true;
- switchCallbacks.get(dpid).controlChanged(HexString.toLong(dpid), true);
- }
- else if (!leader.getId().equals(controllerId) && isLeader){
- log.debug("Lost leadership for {}", dpid);
-
- isLeader = false;
- switchCallbacks.get(dpid).controlChanged(HexString.toLong(dpid), false);
- }
- } catch (Exception e){
- if (isLeader){
- log.debug("Exception checking leadership status. Assume leadership lost for {}",
- dpid);
-
- isLeader = false;
- switchCallbacks.get(dpid).controlChanged(HexString.toLong(dpid), false);
- }
- } finally {
- client.getChildren().usingWatcher(this).inBackground().forPath(latchPath);
- }
- //client.getChildren().usingWatcher(this).forPath(latchPath);
- }
- }
- */
-
protected class SwitchLeaderListener implements LeaderLatchListener{
String dpid;
LeaderLatch latch;
@@ -239,10 +155,7 @@
String dpidStr = HexString.toHexString(dpid);
String latchPath = switchLatchesPath + "/" + dpidStr;
- //if (switchLatches.get(dpidStr) != null){
if (switches.get(dpidStr) != null){
- //throw new RuntimeException("Leader election for switch " + dpidStr +
- // "is already running");
log.debug("Already contesting {}, returning", HexString.toHexString(dpid));
return;
}
@@ -250,10 +163,7 @@
LeaderLatch latch = new LeaderLatch(client, latchPath, controllerId);
latch.addListener(new SwitchLeaderListener(dpidStr, latch));
- //switchLatches.put(dpidStr, latch);
- //switchCallbacks.put(dpidStr, cb);
-
- //Don't put the entry in the map if we can't start the latch
+
SwitchLeadershipData swData = new SwitchLeadershipData(latch, cb);
SwitchLeadershipData oldData = switches.putIfAbsent(dpidStr, swData);
@@ -265,12 +175,13 @@
}
//Now that we know we were able to add our latch to the collection,
- //we can start the leader election in Zookeeper
+ //we can start the leader election in Zookeeper. However I don't know
+ //how to handle if the start fails - the latch is already in our
+ //switches list.
+ //TODO seems like there's a Curator bug when latch.start is called when
+ //there's no Zookeeper connection which causes two znodes to be put in
+ //Zookeeper at the latch path when we reconnect to Zookeeper.
try {
- //client.getChildren().usingWatcher(watcher).inBackground().forPath(singleLatchPath);
- //client.getChildren().usingWatcher(
- // new ParamaterizedCuratorWatcher(dpidStr, latchPath))
- // .inBackground().forPath(latchPath);
latch.start();
} catch (Exception e) {
log.warn("Error starting leader latch: {}", e.getMessage());
@@ -292,8 +203,6 @@
return;
}
- //LeaderLatch latch = switches.get(dpidStr).getLatch();
- //LeaderLatch latch = switches.remove(dpidStr).getLatch();
LeaderLatch latch = swData.getLatch();
try {
@@ -301,16 +210,11 @@
} catch (IOException e) {
//I think it's OK not to do anything here. Either the node got
//deleted correctly, or the connection went down and the node got deleted.
- } finally {
- //switches.remove(dpidStr);
- //switchLatches.remove(dpidStr);
- //switchCallbacks.remove(dpidStr);
}
}
@Override
public boolean hasControl(long dpid) {
- //LeaderLatch latch = switchLatches.get(HexString.toHexString(dpid));
String dpidStr = HexString.toHexString(dpid);
SwitchLeadershipData swData = switches.get(dpidStr);
@@ -321,14 +225,6 @@
}
return swData.getLatch().hasLeadership();
-
- //try {
- //return latch.getLeader().getId().equals(controllerId);
-
- //} catch (Exception e) {
- //TODO swallow exception?
- //return false;
- //}
}
@Override
@@ -384,7 +280,7 @@
// TODO work out synchronization
String dpidStr = HexString.toHexString(dpid);
- //LeaderLatch latch = switchLatches.get(strDpid);
+
LeaderLatch latch = switches.get(dpidStr).getLatch();
if (latch == null){
@@ -487,9 +383,6 @@
restApi = context.getServiceImpl(IRestApiService.class);
switches = new ConcurrentHashMap<String, SwitchLeadershipData>();
-
- //switchLatches = new HashMap<String, LeaderLatch>();
- //switchCallbacks = new HashMap<String, ControlChangeCallback>();
switchPathCaches = new HashMap<String, PathChildrenCache>();
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
@@ -497,7 +390,6 @@
sessionTimeout, connectionTimeout, retryPolicy);
client.start();
-
client = client.usingNamespace(namespace);