Changed the mechanism for disabling Zookeeper. There is now a class (StandaloneRegistry) that implements IControllerRegistryService without needing Zookeeper; it is intended for single-node setups, e.g. during development.
diff --git a/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
new file mode 100644
index 0000000..58b8bc7
--- /dev/null
+++ b/src/main/java/net/onrc/onos/registry/controller/ZookeeperRegistry.java
@@ -0,0 +1,487 @@
+package net.onrc.onos.registry.controller;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import net.floodlightcontroller.core.module.FloodlightModuleContext;
+import net.floodlightcontroller.core.module.FloodlightModuleException;
+import net.floodlightcontroller.core.module.IFloodlightModule;
+import net.floodlightcontroller.core.module.IFloodlightService;
+import net.floodlightcontroller.restserver.IRestApiService;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.openflow.util.HexString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.netflix.curator.RetryPolicy;
+import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.CuratorFrameworkFactory;
+import com.netflix.curator.framework.api.CuratorWatcher;
+import com.netflix.curator.framework.recipes.cache.ChildData;
+import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
+import com.netflix.curator.framework.recipes.cache.PathChildrenCache.StartMode;
+import com.netflix.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import com.netflix.curator.framework.recipes.cache.PathChildrenCacheListener;
+import com.netflix.curator.framework.recipes.leader.LeaderLatch;
+import com.netflix.curator.framework.recipes.leader.Participant;
+import com.netflix.curator.retry.ExponentialBackoffRetry;
+
+public class ZookeeperRegistry implements IFloodlightModule, IControllerRegistryService {
+
+ protected static Logger log = LoggerFactory.getLogger(ZookeeperRegistry.class);
+ //Id this instance registered under; stays null until registerController() is called
+ protected String controllerId = null;
+ //REST API service, looked up from the module context in init()
+ protected IRestApiService restApi;
+
+ //TODO read this from configuration
+ protected String connectionString = "localhost:2181";
+ //Namespace applied to the Curator client in init(); the two paths below are used through that client
+ private final String namespace = "onos";
+ private final String switchLatchesPath = "/switches";
+ private final String controllerPath = "/controllers";
+
+ protected CuratorFramework client;
+ //Caches of the children of controllerPath and switchLatchesPath respectively
+ protected PathChildrenCache controllerCache;
+ protected PathChildrenCache switchCache;
+ //Keyed by dpid hex string (switchLatches, switchCallbacks) or by switch znode name (switchPathCaches)
+ protected Map<String, LeaderLatch> switchLatches;
+ protected Map<String, ControlChangeCallback> switchCallbacks;
+ protected Map<String, PathChildrenCache> switchPathCaches;
+
+ //protected boolean zookeeperEnabled = false;
+
+ protected class ParamaterizedCuratorWatcher implements CuratorWatcher {
+ private String dpid;
+ private boolean isLeader = false;
+ private String latchPath;
+
+ public ParamaterizedCuratorWatcher(String dpid, String latchPath){
+ this.dpid = dpid;
+ this.latchPath = latchPath;
+ }
+
+ @Override
+ public synchronized void process(WatchedEvent event) throws Exception {
+ log.debug("Watch Event: {}", event);
+
+ LeaderLatch latch = switchLatches.get(dpid);
+
+ if (event.getState() == KeeperState.Disconnected){
+ if (isLeader) {
+ log.debug("Disconnected while leader - lost leadership for {}", dpid);
+
+ isLeader = false;
+ ControlChangeCallback cb = switchCallbacks.get(dpid);
+ if (cb != null) {
+ //Allow callback to be null if the requester doesn't want a callback
+ cb.controlChanged(HexString.toLong(dpid), false);
+ }
+ }
+ return;
+ }
+
+ try {
+
+ Participant leader = latch.getLeader();
+
+ if (leader.getId().equals(controllerId) && !isLeader){
+ log.debug("Became leader for {}", dpid);
+
+ isLeader = true;
+ switchCallbacks.get(dpid).controlChanged(HexString.toLong(dpid), true);
+ }
+ else if (!leader.getId().equals(controllerId) && isLeader){
+ log.debug("Lost leadership for {}", dpid);
+
+ isLeader = false;
+ switchCallbacks.get(dpid).controlChanged(HexString.toLong(dpid), false);
+ }
+ } catch (Exception e){
+ if (isLeader){
+ log.debug("Exception checking leadership status. Assume leadship lost for {}",
+ dpid);
+
+ isLeader = false;
+ switchCallbacks.get(dpid).controlChanged(HexString.toLong(dpid), false);
+ }
+ }
+
+ client.getChildren().usingWatcher(this).inBackground().forPath(latchPath);
+ //client.getChildren().usingWatcher(this).forPath(latchPath);
+ }
+ }
+
+
+ /*
+  * Listens for changes to the switch znodes in Zookeeper. This maintains the second level of
+  * PathChildrenCaches that hold the controllers contending for each switch - there's one for
+  * each switch, stored in switchPathCaches under the last segment of the switch znode's path.
+  */
+ PathChildrenCacheListener switchPathCacheListener = new PathChildrenCacheListener() {
+     @Override
+     public void childEvent(CuratorFramework client,
+             PathChildrenCacheEvent event) throws Exception {
+         log.debug("Root switch path cache got {} event", event.getType());
+
+         String strSwitch = null;
+         if (event.getData() != null){
+             log.debug("Event path {}", event.getData().getPath());
+             String[] splitted = event.getData().getPath().split("/");
+             strSwitch = splitted[splitted.length - 1];
+             log.debug("Switch name is {}", strSwitch);
+         }
+
+         switch (event.getType()){
+         case CHILD_ADDED:
+         case CHILD_UPDATED:
+             //Check we have a PathChildrenCache for this child, add one if not
+             if (strSwitch != null && switchPathCaches.get(strSwitch) == null){
+                 PathChildrenCache pc = new PathChildrenCache(client,
+                         event.getData().getPath(), true);
+                 pc.start(StartMode.NORMAL);
+                 switchPathCaches.put(strSwitch, pc);
+             }
+             break;
+         case CHILD_REMOVED:
+             //Remove our PathChildrenCache for this child, if we ever created one
+             PathChildrenCache removed = switchPathCaches.remove(strSwitch);
+             if (removed != null){
+                 removed.close();
+             }
+             break;
+         default:
+             //All other events are connection status events. We don't need to do anything
+             //as the path cache handles these on its own.
+             break;
+         }
+     }
+ };
+
+
+ /**
+  * Starts contending for control of a switch. A leader latch is created for the switch and a
+  * watcher is registered on the latch path; that watcher reports leadership changes to cb.
+  * @param dpid the dpid of the switch to request control of
+  * @param cb callback told when control is gained or lost; may be null if none is wanted
+  * @throws RegistryException if the watcher or the leader latch could not be started
+  * @throws RuntimeException if no controller is registered yet or an election is already running
+  */
+ @Override
+ public void requestControl(long dpid, ControlChangeCallback cb) throws RegistryException {
+     if (controllerId == null){
+         throw new RuntimeException("Must register a controller before calling requestControl");
+     }
+
+     String dpidStr = HexString.toHexString(dpid);
+     String latchPath = switchLatchesPath + "/" + dpidStr;
+
+     if (switchLatches.get(dpidStr) != null){
+         throw new RuntimeException("Leader election for switch " + dpidStr +
+                 " is already running");
+     }
+
+     LeaderLatch latch = new LeaderLatch(client, latchPath, controllerId);
+     switchLatches.put(dpidStr, latch);
+     switchCallbacks.put(dpidStr, cb);
+
+     try {
+         client.getChildren().usingWatcher(
+                 new ParamaterizedCuratorWatcher(dpidStr, latchPath))
+                 .inBackground().forPath(latchPath);
+         latch.start();
+     } catch (Exception e) {
+         //Forget the failed election so that a later request for this switch can try again
+         switchLatches.remove(dpidStr);
+         switchCallbacks.remove(dpidStr);
+         log.warn("Error starting leader latch: {}", e.getMessage());
+         throw new RegistryException("Error starting leader latch for " + dpidStr, e);
+     }
+ }
+
+ /** Stops contending for the given switch: closes its leader latch and forgets the latch and callback. */
+ @Override
+ public void releaseControl(long dpid) {
+     String dpidStr = HexString.toHexString(dpid);
+
+     LeaderLatch latch = switchLatches.get(dpidStr);
+     if (latch == null) {
+         log.debug("Trying to release mastership for switch we are not contesting");
+         return;
+     }
+
+     try {
+         latch.close();
+     } catch (IOException e) {
+         //I think it's OK not to do anything here. Either the node got deleted correctly,
+         //or the connection went down and the node got deleted. Log it so it isn't invisible.
+         log.debug("Error closing leader latch for {}: {}", dpidStr, e.getMessage());
+     } finally {
+         switchLatches.remove(dpidStr);
+         switchCallbacks.remove(dpidStr);
+     }
+ }
+
+ /** Returns true only if the leader latch for the switch currently names this controller as leader. */
+ @Override
+ public boolean hasControl(long dpid) {
+     String dpidStr = HexString.toHexString(dpid);
+     LeaderLatch latch = switchLatches.get(dpidStr);
+     if (latch == null) {
+         log.warn("No leader latch for dpid {}", dpidStr);
+         return false;
+     }
+
+     try {
+         return latch.getLeader().getId().equals(controllerId);
+     } catch (Exception e) {
+         //The leader can't be determined, so we can't claim control - but say why
+         log.warn("Error checking leader for dpid {}: {}", dpidStr, e.getMessage());
+         return false;
+     }
+ }
+
+ @Override
+ public void setMastershipId(String id) {
+ //Intentionally a no-op: the id is ignored here, controllerId is only set by registerController().
+ //TODO remove this method if not needed
+ }
+
+ @Override
+ public String getMastershipId() {
+ return controllerId; //null until registerController() has been called
+ }
+
+ /**
+  * Lists the registered controllers by decoding, as UTF-8, the data of every node currently
+  * held in the controller cache.
+  *
+  * @throws RegistryException if the UTF-8 encoding is not supported
+  */
+ @Override
+ public Collection<String> getAllControllers() throws RegistryException {
+     log.debug("Getting all controllers");
+
+     List<String> names = new ArrayList<String>();
+     try {
+         for (ChildData node : controllerCache.getCurrentData()){
+             names.add(new String(node.getData(), "UTF-8"));
+         }
+     } catch (UnsupportedEncodingException e) {
+         throw new RegistryException("Error encoding string", e);
+     }
+     return names;
+ }
+
+ /**
+  * Registers this controller by creating an ephemeral node for it under the controller path.
+  * The id becomes our controllerId only once that node has been created.
+  * @throws RegistryException if the id can't be encoded or Zookeeper can't be contacted
+  */
+ @Override
+ public void registerController(String id) throws RegistryException {
+     byte bytes[] = null;
+     try {
+         bytes = id.getBytes("UTF-8");
+     } catch (UnsupportedEncodingException e1) {
+         throw new RegistryException("Error encoding string", e1);
+     }
+     String path = controllerPath + "/" + id;
+     log.info("Registering controller with id {}", id);
+
+     try {
+         client.create().withProtection().withMode(CreateMode.EPHEMERAL)
+                 .forPath(path, bytes);
+     } catch (Exception e) {
+         throw new RegistryException("Error contacting the Zookeeper service", e);
+     }
+     //Only now are we registered; a failed attempt must not leave a controllerId behind
+     controllerId = id;
+ }
+
+ /**
+  * Looks up which controller the leader latch for a switch currently reports as leader.
+  * @param dpid the dpid of the switch
+  * @return the leader's id, or null if we hold no leader latch for the switch
+  * @throws RegistryException if the leader can't be fetched from the latch
+  */
+ @Override
+ public String getControllerForSwitch(long dpid) throws RegistryException {
+     // TODO Work out how we should store this controller/switch data.
+     // The leader latch might be a index to the /controllers collections
+     // which holds more info on the controller (how to talk to it for example).
+     LeaderLatch switchLatch = switchLatches.get(HexString.toHexString(dpid));
+     if (switchLatch == null){
+         log.warn("Tried to get controller for non-existent switch");
+         return null;
+     }
+
+     Participant currentLeader;
+     try {
+         currentLeader = switchLatch.getLeader();
+     } catch (Exception e) {
+         throw new RegistryException("Error contacting the Zookeeper service", e);
+     }
+     return currentLeader.getId();
+ }
+
+ @Override
+ public Collection<Long> getSwitchesControlledByController(String controllerId) {
+ //Not implemented: always throws. TODO remove this if not needed
+ throw new RuntimeException("Not yet implemented");
+ }
+
+
+ /**
+  * Builds a snapshot of every switch we have a path cache for, together with the controllers
+  * contending for it. Each contender is the data (controller id) of a child znode plus the
+  * sequence number taken from the end of that child's path; contenders are returned sorted.
+  * @return map from switch name to its sorted contenders; switches without children are left out
+  */
+ @Override
+ public Map<String, List<ControllerRegistryEntry>> getAllSwitches() {
+     Map<String, List<ControllerRegistryEntry>> data =
+             new HashMap<String, List<ControllerRegistryEntry>>();
+
+     for (Map.Entry<String, PathChildrenCache> entry : switchPathCaches.entrySet()){
+         //Take one snapshot so the emptiness check and the loop see the same children
+         List<ChildData> children = entry.getValue().getCurrentData();
+         if (children.size() < 1){
+             log.info("Switch entry with no leader elections: {}", entry.getKey());
+             continue;
+         }
+
+         List<ControllerRegistryEntry> contendingControllers =
+                 new ArrayList<ControllerRegistryEntry>();
+         for (ChildData d : children) {
+             try {
+                 String contender = new String(d.getData(), "UTF-8");
+                 String[] splitted = d.getPath().split("-");
+                 int sequenceNumber = Integer.parseInt(splitted[splitted.length - 1]);
+                 contendingControllers.add(new ControllerRegistryEntry(contender, sequenceNumber));
+             } catch (UnsupportedEncodingException e) {
+                 log.warn("Encoding exception: {}", e.getMessage());
+             } catch (NumberFormatException e) {
+                 //Not a latch participant node (no sequence number suffix): leave it out
+                 log.warn("Ignoring node without sequence number: {}", d.getPath());
+             }
+         }
+
+         Collections.sort(contendingControllers);
+         data.put(entry.getKey(), contendingControllers);
+     }
+     return data;
+ }
+
+ /*
+ * IFloodlightModule
+ */
+
+ /** Lists the services this module provides: only the controller registry service. */
+ @Override
+ public Collection<Class<? extends IFloodlightService>> getModuleServices() {
+     List<Class<? extends IFloodlightService>> provided = new ArrayList<Class<? extends IFloodlightService>>();
+     provided.add(IControllerRegistryService.class);
+     return provided;
+ }
+
+ /** Maps the controller registry service to this object, which is its implementation. */
+ @Override
+ public Map<Class<? extends IFloodlightService>, IFloodlightService> getServiceImpls() {
+     Map<Class<? extends IFloodlightService>, IFloodlightService> impls = new HashMap<Class<? extends IFloodlightService>, IFloodlightService>();
+     impls.put(IControllerRegistryService.class, this);
+     return impls;
+ }
+
+ /** Lists the services this module needs from others: only the REST API service. */
+ @Override
+ public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
+     List<Class<? extends IFloodlightService>> required = new ArrayList<Class<? extends IFloodlightService>>();
+     required.add(IRestApiService.class);
+     return required;
+ }
+
+ @Override
+ public void init (FloodlightModuleContext context) throws FloodlightModuleException {
+
+ log.info("Initialising the Zookeeper Registry - Zookeeper connection required");
+
+ restApi = context.getServiceImpl(IRestApiService.class);
+
+ //We have a config option that determines whether we try and connect to
+ //zookeeper or not. By default zookeeper connection is disabled. When we don't
+ //have a zookeeper connection we act as though we are the only server in the
+ //cluster, i.e. all control requests will succeed.
+ /*Map<String, String> configOptions = context.getConfigParams(this);
+ String enableZookeeper = configOptions.get("enableZookeeper");
+ if (enableZookeeper != null) {
+ log.info("Enabling Zookeeper connection");
+ zookeeperEnabled = true;
+ }
+ else {
+ log.info("Zookeeper connectivity is disabled - running in standalone mode");
+ return;
+ }*/
+
+ /*
+ try {
+ String localHostname = java.net.InetAddress.getLocalHost().getHostName();
+ controllerId = localHostname;
+ log.debug("Setting controller id to {}", controllerId);
+ } catch (UnknownHostException e) {
+ // TODO Handle this exception
+ e.printStackTrace();
+ }*/
+
+ switchLatches = new HashMap<String, LeaderLatch>();
+ switchCallbacks = new HashMap<String, ControlChangeCallback>();
+ switchPathCaches = new HashMap<String, PathChildrenCache>();
+
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ client = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
+
+ client.start();
+
+ client = client.usingNamespace(namespace);
+
+ //Put some data in for testing
+ /*
+ try {
+ registerController("zookeeperController");
+ requestControl(2L, null);
+ } catch (RegistryException e1) {
+ // TODO Auto-generated catch block
+ e1.printStackTrace();
+ }*/
+
+ controllerCache = new PathChildrenCache(client, controllerPath, true);
+ switchCache = new PathChildrenCache(client, switchLatchesPath, true);
+ switchCache.getListenable().addListener(switchPathCacheListener);
+
+ try {
+ controllerCache.start(StartMode.BUILD_INITIAL_CACHE);
+
+ //Don't prime the cache, we want a notification for each child node in the path
+ switchCache.start(StartMode.NORMAL);
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void startUp (FloodlightModuleContext context) {
+ restApi.addRestletRoutable(new RegistryWebRoutable()); //restApi was looked up in init()
+ }
+
+}