Fix checkstyle whitespace issues - WHITESPACE ONLY
Change-Id: Ic205c1afd639c6008d61d9de95cb764eeb6238ca
diff --git a/src/main/java/net/onrc/onos/core/registry/ZookeeperRegistry.java b/src/main/java/net/onrc/onos/core/registry/ZookeeperRegistry.java
index aa3e98b..1897631 100755
--- a/src/main/java/net/onrc/onos/core/registry/ZookeeperRegistry.java
+++ b/src/main/java/net/onrc/onos/core/registry/ZookeeperRegistry.java
@@ -51,600 +51,603 @@
/**
* A registry service that uses Zookeeper. All data is stored in Zookeeper,
* so this can be used as a global registry in a multi-node ONOS cluster.
- * @author jono
*
+ * @author jono
*/
public class ZookeeperRegistry implements IFloodlightModule, IControllerRegistryService {
- protected final static Logger log = LoggerFactory.getLogger(ZookeeperRegistry.class);
- protected String controllerId = null;
-
- protected IRestApiService restApi;
-
- //This is the default, it's overwritten by the connectionString configuration parameter
- protected String connectionString = "localhost:2181";
-
- private final String namespace = "onos";
- private final String switchLatchesPath = "/switches";
- private final String CLUSTER_LEADER_PATH = "/cluster/leader";
+ protected final static Logger log = LoggerFactory.getLogger(ZookeeperRegistry.class);
+ protected String controllerId = null;
- private final String SERVICES_PATH = "/"; //i.e. the root of our namespace
- private final String CONTROLLER_SERVICE_NAME = "controllers";
-
- protected CuratorFramework client;
-
- protected PathChildrenCache switchCache;
+ protected IRestApiService restApi;
- protected ConcurrentHashMap<String, SwitchLeadershipData> switches;
- protected Map<String, PathChildrenCache> switchPathCaches;
+ //This is the default, it's overwritten by the connectionString configuration parameter
+ protected String connectionString = "localhost:2181";
- protected LeaderLatch clusterLeaderLatch;
- protected ClusterLeaderListener clusterLeaderListener;
- private static final long CLUSTER_LEADER_ELECTION_RETRY_MS = 100;
+ private final String namespace = "onos";
+ private final String switchLatchesPath = "/switches";
+ private final String CLUSTER_LEADER_PATH = "/cluster/leader";
- private final String ID_COUNTER_PATH = "/flowidcounter";
- private final Long ID_BLOCK_SIZE = 0x100000000L;
- protected DistributedAtomicLong distributedIdCounter;
-
- //Zookeeper performance-related configuration
- protected static final int sessionTimeout = 5000;
- protected static final int connectionTimeout = 7000;
+ private final String SERVICES_PATH = "/"; //i.e. the root of our namespace
+ private final String CONTROLLER_SERVICE_NAME = "controllers";
- //
- // Unique ID generation state
- // TODO: The implementation must be updated to use the Zookeeper
- // instead of a ramdon generator.
- //
- private static Random randomGenerator = new Random();
- private static int nextUniqueIdPrefix = 0;
- private static int nextUniqueIdSuffix = 0;
+ protected CuratorFramework client;
- private final BlockingQueue<SwitchLeaderEvent> switchLeadershipEvents =
- new LinkedBlockingQueue<SwitchLeaderEvent>();
-
+ protected PathChildrenCache switchCache;
+
+ protected ConcurrentHashMap<String, SwitchLeadershipData> switches;
+ protected Map<String, PathChildrenCache> switchPathCaches;
+
+ protected LeaderLatch clusterLeaderLatch;
+ protected ClusterLeaderListener clusterLeaderListener;
+ private static final long CLUSTER_LEADER_ELECTION_RETRY_MS = 100;
+
+ private final String ID_COUNTER_PATH = "/flowidcounter";
+ private final Long ID_BLOCK_SIZE = 0x100000000L;
+ protected DistributedAtomicLong distributedIdCounter;
+
+ //Zookeeper performance-related configuration
+ protected static final int sessionTimeout = 5000;
+ protected static final int connectionTimeout = 7000;
+
+ //
+ // Unique ID generation state
+ // TODO: The implementation must be updated to use the Zookeeper
+ // instead of a ramdon generator.
+ //
+ private static Random randomGenerator = new Random();
+ private static int nextUniqueIdPrefix = 0;
+ private static int nextUniqueIdSuffix = 0;
+
+ private final BlockingQueue<SwitchLeaderEvent> switchLeadershipEvents =
+ new LinkedBlockingQueue<SwitchLeaderEvent>();
+
private ExecutorService eventThreadExecutorService;
-
+
private static class SwitchLeaderEvent {
- public final long dpid;
- public final boolean isLeader;
-
- public SwitchLeaderEvent(long dpid, boolean isLeader) {
- this.dpid = dpid;
- this.isLeader = isLeader;
- }
+ public final long dpid;
+ public final boolean isLeader;
+
+ public SwitchLeaderEvent(long dpid, boolean isLeader) {
+ this.dpid = dpid;
+ this.isLeader = isLeader;
+ }
}
-
+
/*
* Dispatcher thread for leadership change events coming from Curator.
*/
private void dispatchEvents() {
- while (!Thread.currentThread().isInterrupted()) {
- try {
- SwitchLeaderEvent event = switchLeadershipEvents.take();
- SwitchLeadershipData swData = switches.get(HexString.toHexString(event.dpid));
- if (swData == null) {
- log.debug("Leadership data {} not found", event.dpid);
- continue;
- }
-
- swData.getCallback().controlChanged(event.dpid, event.isLeader);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- break;
- } catch (Exception e) {
- log.error("Exception in registry event thread", e);
- }
- }
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ SwitchLeaderEvent event = switchLeadershipEvents.take();
+ SwitchLeadershipData swData = switches.get(HexString.toHexString(event.dpid));
+ if (swData == null) {
+ log.debug("Leadership data {} not found", event.dpid);
+ continue;
+ }
+
+ swData.getCallback().controlChanged(event.dpid, event.isLeader);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ } catch (Exception e) {
+ log.error("Exception in registry event thread", e);
+ }
+ }
}
- protected class SwitchLeaderListener implements LeaderLatchListener{
- String dpid;
- LeaderLatch latch;
-
- public SwitchLeaderListener(String dpid, LeaderLatch latch){
- this.dpid = dpid;
- this.latch = latch;
- }
+ protected class SwitchLeaderListener implements LeaderLatchListener {
+ String dpid;
+ LeaderLatch latch;
- @Override
- public void isLeader() {
- log.debug("Became leader for {}", dpid);
-
- switchLeadershipEvents.offer(new SwitchLeaderEvent(HexString.toLong(dpid), true));
- }
+ public SwitchLeaderListener(String dpid, LeaderLatch latch) {
+ this.dpid = dpid;
+ this.latch = latch;
+ }
- @Override
- public void notLeader() {
- log.debug("Lost leadership for {}", dpid);
-
- switchLeadershipEvents.offer(new SwitchLeaderEvent(HexString.toLong(dpid), false));
- }
- }
-
- protected class SwitchPathCacheListener implements PathChildrenCacheListener {
- @Override
- public void childEvent(CuratorFramework client,
- PathChildrenCacheEvent event) throws Exception {
-
- String strSwitch = null;
- if (event.getData() != null){
- String[] splitted = event.getData().getPath().split("/");
- strSwitch = splitted[splitted.length - 1];
- }
-
- switch (event.getType()){
- case CHILD_ADDED:
- case CHILD_UPDATED:
- //Check we have a PathChildrenCache for this child, add one if not
- synchronized (switchPathCaches){
- if (switchPathCaches.get(strSwitch) == null){
- PathChildrenCache pc = new PathChildrenCache(client,
- event.getData().getPath(), true);
- pc.start(StartMode.NORMAL);
- switchPathCaches.put(strSwitch, pc);
- }
- }
- break;
- case CHILD_REMOVED:
- //Remove our PathChildrenCache for this child
- PathChildrenCache pc = null;
- synchronized(switchPathCaches){
- pc = switchPathCaches.remove(strSwitch);
- }
- if (pc != null){
- pc.close();
- }
- break;
- default:
- //All other switchLeadershipEvents are connection status switchLeadershipEvents. We don't need to
- //do anything as the path cache handles these on its own.
- break;
- }
-
- }
- };
+ @Override
+ public void isLeader() {
+ log.debug("Became leader for {}", dpid);
- protected class ClusterLeaderListener implements LeaderLatchListener {
- LeaderLatch latch;
+ switchLeadershipEvents.offer(new SwitchLeaderEvent(HexString.toLong(dpid), true));
+ }
- public ClusterLeaderListener(LeaderLatch latch) {
- this.latch = latch;
- }
+ @Override
+ public void notLeader() {
+ log.debug("Lost leadership for {}", dpid);
- //
- // NOTE: If we need to support callbacks when the
- // leadership changes, those should be called here.
- //
-
- @Override
- public void isLeader() {
- log.debug("Cluster leadership aquired");
- }
+ switchLeadershipEvents.offer(new SwitchLeaderEvent(HexString.toLong(dpid), false));
+ }
+ }
- @Override
- public void notLeader() {
- log.debug("Cluster leadership lost");
- }
- }
+ protected class SwitchPathCacheListener implements PathChildrenCacheListener {
+ @Override
+ public void childEvent(CuratorFramework client,
+ PathChildrenCacheEvent event) throws Exception {
- /**
- * Listens for changes to the switch znodes in Zookeeper. This maintains
- * the second level of PathChildrenCaches that hold the controllers
- * contending for each switch - there's one for each switch.
- */
- PathChildrenCacheListener switchPathCacheListener = new SwitchPathCacheListener();
- protected ServiceDiscovery<ControllerService> serviceDiscovery;
- protected ServiceCache<ControllerService> serviceCache;
-
-
- @Override
- public void requestControl(long dpid, ControlChangeCallback cb) throws RegistryException {
- log.info("Requesting control for {}", HexString.toHexString(dpid));
-
- if (controllerId == null){
- throw new RuntimeException("Must register a controller before calling requestControl");
- }
-
- String dpidStr = HexString.toHexString(dpid);
- String latchPath = switchLatchesPath + "/" + dpidStr;
-
- if (switches.get(dpidStr) != null){
- log.debug("Already contesting {}, returning", HexString.toHexString(dpid));
- throw new RegistryException("Already contesting control for " + dpidStr);
- }
-
- LeaderLatch latch = new LeaderLatch(client, latchPath, controllerId);
- SwitchLeaderListener listener = new SwitchLeaderListener(dpidStr, latch);
- latch.addListener(listener);
-
-
- SwitchLeadershipData swData = new SwitchLeadershipData(latch, cb, listener);
- SwitchLeadershipData oldData = switches.putIfAbsent(dpidStr, swData);
-
- if (oldData != null){
- //There was already data for that key in the map
- //i.e. someone else got here first so we can't succeed
- log.debug("Already requested control for {}", dpidStr);
- throw new RegistryException("Already requested control for " + dpidStr);
- }
-
- //Now that we know we were able to add our latch to the collection,
- //we can start the leader election in Zookeeper. However I don't know
- //how to handle if the start fails - the latch is already in our
- //switches list.
- //TODO seems like there's a Curator bug when latch.start is called when
- //there's no Zookeeper connection which causes two znodes to be put in
- //Zookeeper at the latch path when we reconnect to Zookeeper.
- try {
- latch.start();
- } catch (Exception e) {
- log.warn("Error starting leader latch: {}", e.getMessage());
- throw new RegistryException("Error starting leader latch for " + dpidStr, e);
- }
-
- }
-
- @Override
- public void releaseControl(long dpid) {
- log.info("Releasing control for {}", HexString.toHexString(dpid));
-
- String dpidStr = HexString.toHexString(dpid);
-
- SwitchLeadershipData swData = switches.remove(dpidStr);
-
- if (swData == null) {
- log.debug("Trying to release control of a switch we are not contesting");
- return;
- }
-
- LeaderLatch latch = swData.getLatch();
-
- latch.removeListener(swData.getListener());
-
- try {
- latch.close();
- } catch (IOException e) {
- //I think it's OK not to do anything here. Either the node got
- //deleted correctly, or the connection went down and the node got deleted.
- log.debug("releaseControl: caught IOException {}", dpidStr);
- }
- }
-
- @Override
- public boolean hasControl(long dpid) {
- String dpidStr = HexString.toHexString(dpid);
-
- SwitchLeadershipData swData = switches.get(dpidStr);
-
- if (swData == null) {
- log.warn("No leader latch for dpid {}", dpidStr);
- return false;
- }
-
- return swData.getLatch().hasLeadership();
- }
-
- @Override
- public boolean isClusterLeader() {
- return clusterLeaderLatch.hasLeadership();
- }
-
- @Override
- public String getControllerId() {
- return controllerId;
- }
-
- @Override
- public Collection<String> getAllControllers() throws RegistryException {
- log.debug("Getting all controllers");
-
- List<String> controllers = new ArrayList<String>();
- for (ServiceInstance<ControllerService> instance : serviceCache.getInstances()){
- String id = instance.getPayload().getControllerId();
- if (!controllers.contains(id)){
- controllers.add(id);
- }
- }
-
- return controllers;
- }
-
- @Override
- public void registerController(String id) throws RegistryException {
- if (controllerId != null) {
- throw new RegistryException(
- "Controller already registered with id " + controllerId);
- }
-
- controllerId = id;
-
- try {
- ServiceInstance<ControllerService> thisInstance = ServiceInstance.<ControllerService>builder()
- .name(CONTROLLER_SERVICE_NAME)
- .payload(new ControllerService(controllerId))
- //.port((int)(65535 * Math.random())) // in a real application, you'd use a common port
- //.uriSpec(uriSpec)
- .build();
-
- serviceDiscovery.registerService(thisInstance);
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- }
-
- @Override
- public String getControllerForSwitch(long dpid) throws RegistryException {
- String dpidStr = HexString.toHexString(dpid);
-
- PathChildrenCache switchCache = switchPathCaches.get(dpidStr);
-
- if (switchCache == null){
- log.warn("Tried to get controller for non-existent switch");
- return null;
- }
-
- try {
- //We've seen issues with these caches get stuck out of date, so we'll have to
- //force them to refresh before each read. This slows down the method as it
- //blocks on a Zookeeper query, however at the moment only the cleanup thread
- //uses this and that isn't particularly time-sensitive.
- switchCache.rebuild();
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- List<ChildData> sortedData = new ArrayList<ChildData>(switchCache.getCurrentData());
-
- Collections.sort(
- sortedData,
- new Comparator<ChildData>(){
- private String getSequenceNumber(String path){
- return path.substring(path.lastIndexOf('-') + 1);
- }
- @Override
- public int compare(ChildData lhs, ChildData rhs) {
- return getSequenceNumber(lhs.getPath()).
- compareTo(getSequenceNumber(rhs.getPath()));
- }
- }
- );
-
- if (sortedData.size() == 0){
- return null;
- }
-
- return new String(sortedData.get(0).getData(), Charsets.UTF_8);
- }
-
- @Override
- public Collection<Long> getSwitchesControlledByController(String controllerId) {
- //TODO remove this if not needed
- throw new RuntimeException("Not yet implemented");
- }
-
-
- //TODO what should happen when there's no ZK connection? Currently we just return
- //the cache but this may lead to false impressions - i.e. we don't actually know
- //what's in ZK so we shouldn't say we do
- @Override
- public Map<String, List<ControllerRegistryEntry>> getAllSwitches() {
- Map<String, List<ControllerRegistryEntry>> data =
- new HashMap<String, List<ControllerRegistryEntry>>();
-
- for (Map.Entry<String, PathChildrenCache> entry : switchPathCaches.entrySet()){
- List<ControllerRegistryEntry> contendingControllers =
- new ArrayList<ControllerRegistryEntry>();
-
- if (entry.getValue().getCurrentData().size() < 1){
- //TODO prevent even having the PathChildrenCache in this case
- //log.info("Switch entry with no leader elections: {}", entry.getKey());
- continue;
- }
-
- for (ChildData d : entry.getValue().getCurrentData()) {
-
- String controllerId = new String(d.getData(), Charsets.UTF_8);
-
- String[] splitted = d.getPath().split("-");
- int sequenceNumber = Integer.parseInt(splitted[splitted.length - 1]);
-
- contendingControllers.add(new ControllerRegistryEntry(controllerId, sequenceNumber));
- }
-
- Collections.sort(contendingControllers);
- data.put(entry.getKey(), contendingControllers);
- }
- return data;
- }
-
- public IdBlock allocateUniqueIdBlock(long range) {
- try {
- AtomicValue<Long> result = null;
- do {
- result = distributedIdCounter.add(range);
- } while (result == null || !result.succeeded());
-
- return new IdBlock(result.preValue(), result.postValue() - 1, range);
- } catch (Exception e) {
- log.error("Error allocating ID block");
+ String strSwitch = null;
+ if (event.getData() != null) {
+ String[] splitted = event.getData().getPath().split("/");
+ strSwitch = splitted[splitted.length - 1];
}
+
+ switch (event.getType()) {
+ case CHILD_ADDED:
+ case CHILD_UPDATED:
+ //Check we have a PathChildrenCache for this child, add one if not
+ synchronized (switchPathCaches) {
+ if (switchPathCaches.get(strSwitch) == null) {
+ PathChildrenCache pc = new PathChildrenCache(client,
+ event.getData().getPath(), true);
+ pc.start(StartMode.NORMAL);
+ switchPathCaches.put(strSwitch, pc);
+ }
+ }
+ break;
+ case CHILD_REMOVED:
+ //Remove our PathChildrenCache for this child
+ PathChildrenCache pc = null;
+ synchronized (switchPathCaches) {
+ pc = switchPathCaches.remove(strSwitch);
+ }
+ if (pc != null) {
+ pc.close();
+ }
+ break;
+ default:
+ //All other switchLeadershipEvents are connection status switchLeadershipEvents. We don't need to
+ //do anything as the path cache handles these on its own.
+ break;
+ }
+
+ }
+ }
+
+ ;
+
+ protected class ClusterLeaderListener implements LeaderLatchListener {
+ LeaderLatch latch;
+
+ public ClusterLeaderListener(LeaderLatch latch) {
+ this.latch = latch;
+ }
+
+ //
+ // NOTE: If we need to support callbacks when the
+ // leadership changes, those should be called here.
+ //
+
+ @Override
+ public void isLeader() {
+ log.debug("Cluster leadership aquired");
+ }
+
+ @Override
+ public void notLeader() {
+ log.debug("Cluster leadership lost");
+ }
+ }
+
+ /**
+ * Listens for changes to the switch znodes in Zookeeper. This maintains
+ * the second level of PathChildrenCaches that hold the controllers
+ * contending for each switch - there's one for each switch.
+ */
+ PathChildrenCacheListener switchPathCacheListener = new SwitchPathCacheListener();
+ protected ServiceDiscovery<ControllerService> serviceDiscovery;
+ protected ServiceCache<ControllerService> serviceCache;
+
+
+ @Override
+ public void requestControl(long dpid, ControlChangeCallback cb) throws RegistryException {
+ log.info("Requesting control for {}", HexString.toHexString(dpid));
+
+ if (controllerId == null) {
+ throw new RuntimeException("Must register a controller before calling requestControl");
+ }
+
+ String dpidStr = HexString.toHexString(dpid);
+ String latchPath = switchLatchesPath + "/" + dpidStr;
+
+ if (switches.get(dpidStr) != null) {
+ log.debug("Already contesting {}, returning", HexString.toHexString(dpid));
+ throw new RegistryException("Already contesting control for " + dpidStr);
+ }
+
+ LeaderLatch latch = new LeaderLatch(client, latchPath, controllerId);
+ SwitchLeaderListener listener = new SwitchLeaderListener(dpidStr, latch);
+ latch.addListener(listener);
+
+
+ SwitchLeadershipData swData = new SwitchLeadershipData(latch, cb, listener);
+ SwitchLeadershipData oldData = switches.putIfAbsent(dpidStr, swData);
+
+ if (oldData != null) {
+ //There was already data for that key in the map
+ //i.e. someone else got here first so we can't succeed
+ log.debug("Already requested control for {}", dpidStr);
+ throw new RegistryException("Already requested control for " + dpidStr);
+ }
+
+ //Now that we know we were able to add our latch to the collection,
+ //we can start the leader election in Zookeeper. However I don't know
+ //how to handle if the start fails - the latch is already in our
+ //switches list.
+ //TODO seems like there's a Curator bug when latch.start is called when
+ //there's no Zookeeper connection which causes two znodes to be put in
+ //Zookeeper at the latch path when we reconnect to Zookeeper.
+ try {
+ latch.start();
+ } catch (Exception e) {
+ log.warn("Error starting leader latch: {}", e.getMessage());
+ throw new RegistryException("Error starting leader latch for " + dpidStr, e);
+ }
+
+ }
+
+ @Override
+ public void releaseControl(long dpid) {
+ log.info("Releasing control for {}", HexString.toHexString(dpid));
+
+ String dpidStr = HexString.toHexString(dpid);
+
+ SwitchLeadershipData swData = switches.remove(dpidStr);
+
+ if (swData == null) {
+ log.debug("Trying to release control of a switch we are not contesting");
+ return;
+ }
+
+ LeaderLatch latch = swData.getLatch();
+
+ latch.removeListener(swData.getListener());
+
+ try {
+ latch.close();
+ } catch (IOException e) {
+ //I think it's OK not to do anything here. Either the node got
+ //deleted correctly, or the connection went down and the node got deleted.
+ log.debug("releaseControl: caught IOException {}", dpidStr);
+ }
+ }
+
+ @Override
+ public boolean hasControl(long dpid) {
+ String dpidStr = HexString.toHexString(dpid);
+
+ SwitchLeadershipData swData = switches.get(dpidStr);
+
+ if (swData == null) {
+ log.warn("No leader latch for dpid {}", dpidStr);
+ return false;
+ }
+
+ return swData.getLatch().hasLeadership();
+ }
+
+ @Override
+ public boolean isClusterLeader() {
+ return clusterLeaderLatch.hasLeadership();
+ }
+
+ @Override
+ public String getControllerId() {
+ return controllerId;
+ }
+
+ @Override
+ public Collection<String> getAllControllers() throws RegistryException {
+ log.debug("Getting all controllers");
+
+ List<String> controllers = new ArrayList<String>();
+ for (ServiceInstance<ControllerService> instance : serviceCache.getInstances()) {
+ String id = instance.getPayload().getControllerId();
+ if (!controllers.contains(id)) {
+ controllers.add(id);
+ }
+ }
+
+ return controllers;
+ }
+
+ @Override
+ public void registerController(String id) throws RegistryException {
+ if (controllerId != null) {
+ throw new RegistryException(
+ "Controller already registered with id " + controllerId);
+ }
+
+ controllerId = id;
+
+ try {
+ ServiceInstance<ControllerService> thisInstance = ServiceInstance.<ControllerService>builder()
+ .name(CONTROLLER_SERVICE_NAME)
+ .payload(new ControllerService(controllerId))
+ //.port((int)(65535 * Math.random())) // in a real application, you'd use a common port
+ //.uriSpec(uriSpec)
+ .build();
+
+ serviceDiscovery.registerService(thisInstance);
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ }
+
+ @Override
+ public String getControllerForSwitch(long dpid) throws RegistryException {
+ String dpidStr = HexString.toHexString(dpid);
+
+ PathChildrenCache switchCache = switchPathCaches.get(dpidStr);
+
+ if (switchCache == null) {
+ log.warn("Tried to get controller for non-existent switch");
return null;
}
-
- /**
- * Returns a block of IDs which are unique and unused.
- * Range of IDs is fixed size and is assigned incrementally as this method called.
- * Since the range of IDs is managed by Zookeeper in distributed way, this method may block when
- * requests come up simultaneously.
- */
- @Override
- public IdBlock allocateUniqueIdBlock(){
- return allocateUniqueIdBlock(ID_BLOCK_SIZE);
- }
- /**
- * Get a globally unique ID.
- *
- * @return a globally unique ID.
- */
- @Override
- public synchronized long getNextUniqueId() {
- //
- // Generate the next Unique ID.
- //
- // TODO: For now, the higher 32 bits are random, and
- // the lower 32 bits are sequential.
- // The implementation must be updated to use the Zookeeper
- // to allocate the higher 32 bits (globally unique).
- //
- if ((nextUniqueIdSuffix & 0xffffffffL) == 0xffffffffL) {
- nextUniqueIdPrefix = randomGenerator.nextInt();
- nextUniqueIdSuffix = 0;
- } else {
- nextUniqueIdSuffix++;
- }
- long result = (long)nextUniqueIdPrefix << 32;
- result = result | (0xffffffffL & nextUniqueIdSuffix);
- return result;
- }
+ try {
+ //We've seen issues with these caches get stuck out of date, so we'll have to
+ //force them to refresh before each read. This slows down the method as it
+ //blocks on a Zookeeper query, however at the moment only the cleanup thread
+ //uses this and that isn't particularly time-sensitive.
+ switchCache.rebuild();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
- /*
- * IFloodlightModule
- */
-
- @Override
- public Collection<Class<? extends IFloodlightService>> getModuleServices() {
- Collection<Class<? extends IFloodlightService>> l =
- new ArrayList<Class<? extends IFloodlightService>>();
- l.add(IControllerRegistryService.class);
- return l;
- }
-
- @Override
- public Map<Class<? extends IFloodlightService>, IFloodlightService> getServiceImpls() {
- Map<Class<? extends IFloodlightService>, IFloodlightService> m =
- new HashMap<Class<? extends IFloodlightService>, IFloodlightService>();
- m.put(IControllerRegistryService.class, this);
- return m;
- }
-
- @Override
- public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
- Collection<Class<? extends IFloodlightService>> l =
+ List<ChildData> sortedData = new ArrayList<ChildData>(switchCache.getCurrentData());
+
+ Collections.sort(
+ sortedData,
+ new Comparator<ChildData>() {
+ private String getSequenceNumber(String path) {
+ return path.substring(path.lastIndexOf('-') + 1);
+ }
+
+ @Override
+ public int compare(ChildData lhs, ChildData rhs) {
+ return getSequenceNumber(lhs.getPath()).
+ compareTo(getSequenceNumber(rhs.getPath()));
+ }
+ }
+ );
+
+ if (sortedData.size() == 0) {
+ return null;
+ }
+
+ return new String(sortedData.get(0).getData(), Charsets.UTF_8);
+ }
+
+ @Override
+ public Collection<Long> getSwitchesControlledByController(String controllerId) {
+ //TODO remove this if not needed
+ throw new RuntimeException("Not yet implemented");
+ }
+
+
+ //TODO what should happen when there's no ZK connection? Currently we just return
+ //the cache but this may lead to false impressions - i.e. we don't actually know
+ //what's in ZK so we shouldn't say we do
+ @Override
+ public Map<String, List<ControllerRegistryEntry>> getAllSwitches() {
+ Map<String, List<ControllerRegistryEntry>> data =
+ new HashMap<String, List<ControllerRegistryEntry>>();
+
+ for (Map.Entry<String, PathChildrenCache> entry : switchPathCaches.entrySet()) {
+ List<ControllerRegistryEntry> contendingControllers =
+ new ArrayList<ControllerRegistryEntry>();
+
+ if (entry.getValue().getCurrentData().size() < 1) {
+ //TODO prevent even having the PathChildrenCache in this case
+ //log.info("Switch entry with no leader elections: {}", entry.getKey());
+ continue;
+ }
+
+ for (ChildData d : entry.getValue().getCurrentData()) {
+
+ String controllerId = new String(d.getData(), Charsets.UTF_8);
+
+ String[] splitted = d.getPath().split("-");
+ int sequenceNumber = Integer.parseInt(splitted[splitted.length - 1]);
+
+ contendingControllers.add(new ControllerRegistryEntry(controllerId, sequenceNumber));
+ }
+
+ Collections.sort(contendingControllers);
+ data.put(entry.getKey(), contendingControllers);
+ }
+ return data;
+ }
+
+ public IdBlock allocateUniqueIdBlock(long range) {
+ try {
+ AtomicValue<Long> result = null;
+ do {
+ result = distributedIdCounter.add(range);
+ } while (result == null || !result.succeeded());
+
+ return new IdBlock(result.preValue(), result.postValue() - 1, range);
+ } catch (Exception e) {
+ log.error("Error allocating ID block");
+ }
+ return null;
+ }
+
+ /**
+ * Returns a block of IDs which are unique and unused.
+ * Range of IDs is fixed size and is assigned incrementally as this method called.
+ * Since the range of IDs is managed by Zookeeper in distributed way, this method may block when
+ * requests come up simultaneously.
+ */
+ @Override
+ public IdBlock allocateUniqueIdBlock() {
+ return allocateUniqueIdBlock(ID_BLOCK_SIZE);
+ }
+
+ /**
+ * Get a globally unique ID.
+ *
+ * @return a globally unique ID.
+ */
+ @Override
+ public synchronized long getNextUniqueId() {
+ //
+ // Generate the next Unique ID.
+ //
+ // TODO: For now, the higher 32 bits are random, and
+ // the lower 32 bits are sequential.
+ // The implementation must be updated to use the Zookeeper
+ // to allocate the higher 32 bits (globally unique).
+ //
+ if ((nextUniqueIdSuffix & 0xffffffffL) == 0xffffffffL) {
+ nextUniqueIdPrefix = randomGenerator.nextInt();
+ nextUniqueIdSuffix = 0;
+ } else {
+ nextUniqueIdSuffix++;
+ }
+ long result = (long) nextUniqueIdPrefix << 32;
+ result = result | (0xffffffffL & nextUniqueIdSuffix);
+ return result;
+ }
+
+ /*
+ * IFloodlightModule
+ */
+
+ @Override
+ public Collection<Class<? extends IFloodlightService>> getModuleServices() {
+ Collection<Class<? extends IFloodlightService>> l =
new ArrayList<Class<? extends IFloodlightService>>();
- l.add(IFloodlightProviderService.class);
- l.add(IRestApiService.class);
- return l;
- }
-
- //TODO currently blocks startup when it can't get a Zookeeper connection.
- //Do we support starting up with no Zookeeper connection?
- @Override
- public void init (FloodlightModuleContext context) throws FloodlightModuleException {
- log.info("Initialising the Zookeeper Registry - Zookeeper connection required");
-
- //Read the Zookeeper connection string from the config
- Map<String, String> configParams = context.getConfigParams(this);
- String connectionString = configParams.get("connectionString");
- if (connectionString != null){
- this.connectionString = connectionString;
- }
- log.info("Setting Zookeeper connection string to {}", this.connectionString);
+ l.add(IControllerRegistryService.class);
+ return l;
+ }
- //
- // Initialize the Unique ID generator
- // TODO: This must be replaced by Zookeeper-based allocation
- //
- nextUniqueIdPrefix = randomGenerator.nextInt();
+ @Override
+ public Map<Class<? extends IFloodlightService>, IFloodlightService> getServiceImpls() {
+ Map<Class<? extends IFloodlightService>, IFloodlightService> m =
+ new HashMap<Class<? extends IFloodlightService>, IFloodlightService>();
+ m.put(IControllerRegistryService.class, this);
+ return m;
+ }
- restApi = context.getServiceImpl(IRestApiService.class);
+ @Override
+ public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
+ Collection<Class<? extends IFloodlightService>> l =
+ new ArrayList<Class<? extends IFloodlightService>>();
+ l.add(IFloodlightProviderService.class);
+ l.add(IRestApiService.class);
+ return l;
+ }
- switches = new ConcurrentHashMap<String, SwitchLeadershipData>();
- //switchPathCaches = new HashMap<String, PathChildrenCache>();
- switchPathCaches = new ConcurrentHashMap<String, PathChildrenCache>();
-
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
- client = CuratorFrameworkFactory.newClient(this.connectionString,
- sessionTimeout, connectionTimeout, retryPolicy);
-
- client.start();
- client = client.usingNamespace(namespace);
+ //TODO currently blocks startup when it can't get a Zookeeper connection.
+ //Do we support starting up with no Zookeeper connection?
+ @Override
+ public void init(FloodlightModuleContext context) throws FloodlightModuleException {
+ log.info("Initialising the Zookeeper Registry - Zookeeper connection required");
- distributedIdCounter = new DistributedAtomicLong(
- client,
- ID_COUNTER_PATH,
- new RetryOneTime(100));
-
- switchCache = new PathChildrenCache(client, switchLatchesPath, true);
- switchCache.getListenable().addListener(switchPathCacheListener);
-
- //Build the service discovery object
- serviceDiscovery = ServiceDiscoveryBuilder.builder(ControllerService.class)
- .client(client).basePath(SERVICES_PATH).build();
-
- //We read the list of services very frequently (GUI periodically queries them)
- //so we'll cache them to cut down on Zookeeper queries.
- serviceCache = serviceDiscovery.serviceCacheBuilder()
- .name(CONTROLLER_SERVICE_NAME).build();
-
-
- try {
- serviceDiscovery.start();
- serviceCache.start();
-
- //Don't prime the cache, we want a notification for each child node in the path
- switchCache.start(StartMode.NORMAL);
- } catch (Exception e) {
- throw new FloodlightModuleException("Error initialising ZookeeperRegistry: "
- + e.getMessage());
- }
-
- eventThreadExecutorService = Executors.newSingleThreadExecutor();
- eventThreadExecutorService.execute(
- new Runnable() {
- @Override
- public void run(){
- dispatchEvents();
- }
- });
- }
+ //Read the Zookeeper connection string from the config
+ Map<String, String> configParams = context.getConfigParams(this);
+ String connectionString = configParams.get("connectionString");
+ if (connectionString != null) {
+ this.connectionString = connectionString;
+ }
+ log.info("Setting Zookeeper connection string to {}", this.connectionString);
- @Override
- public void startUp (FloodlightModuleContext context) {
- //
- // Cluster Leader election setup.
- // NOTE: We have to do it here, because during the init stage
- // we don't know the Controller ID.
- //
- if (controllerId == null) {
- log.error("Error on startup: unknown ControllerId");
- }
- clusterLeaderLatch = new LeaderLatch(client,
- CLUSTER_LEADER_PATH,
- controllerId);
- clusterLeaderListener = new ClusterLeaderListener(clusterLeaderLatch);
- clusterLeaderLatch.addListener(clusterLeaderListener);
- try {
- clusterLeaderLatch.start();
- } catch (Exception e) {
- log.error("Error on startup starting the cluster leader election: {}", e.getMessage());
- }
+ //
+ // Initialize the Unique ID generator
+ // TODO: This must be replaced by Zookeeper-based allocation
+ //
+ nextUniqueIdPrefix = randomGenerator.nextInt();
- // Keep trying until there is a cluster leader
- do {
- try {
- Participant leader = clusterLeaderLatch.getLeader();
- if (! leader.getId().isEmpty())
- break;
- Thread.sleep(CLUSTER_LEADER_ELECTION_RETRY_MS);
- } catch (Exception e) {
- log.error("Error on startup waiting for cluster leader election: {}", e.getMessage());
- }
- } while (true);
+ restApi = context.getServiceImpl(IRestApiService.class);
- restApi.addRestletRoutable(new RegistryWebRoutable());
- }
+ switches = new ConcurrentHashMap<String, SwitchLeadershipData>();
+ //switchPathCaches = new HashMap<String, PathChildrenCache>();
+ switchPathCaches = new ConcurrentHashMap<String, PathChildrenCache>();
+
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ client = CuratorFrameworkFactory.newClient(this.connectionString,
+ sessionTimeout, connectionTimeout, retryPolicy);
+
+ client.start();
+ client = client.usingNamespace(namespace);
+
+ distributedIdCounter = new DistributedAtomicLong(
+ client,
+ ID_COUNTER_PATH,
+ new RetryOneTime(100));
+
+ switchCache = new PathChildrenCache(client, switchLatchesPath, true);
+ switchCache.getListenable().addListener(switchPathCacheListener);
+
+ //Build the service discovery object
+ serviceDiscovery = ServiceDiscoveryBuilder.builder(ControllerService.class)
+ .client(client).basePath(SERVICES_PATH).build();
+
+ //We read the list of services very frequently (GUI periodically queries them)
+ //so we'll cache them to cut down on Zookeeper queries.
+ serviceCache = serviceDiscovery.serviceCacheBuilder()
+ .name(CONTROLLER_SERVICE_NAME).build();
+
+
+ try {
+ serviceDiscovery.start();
+ serviceCache.start();
+
+ //Don't prime the cache, we want a notification for each child node in the path
+ switchCache.start(StartMode.NORMAL);
+ } catch (Exception e) {
+ throw new FloodlightModuleException("Error initialising ZookeeperRegistry: "
+ + e.getMessage());
+ }
+
+ eventThreadExecutorService = Executors.newSingleThreadExecutor();
+ eventThreadExecutorService.execute(
+ new Runnable() {
+ @Override
+ public void run() {
+ dispatchEvents();
+ }
+ });
+ }
+
+ @Override
+ public void startUp(FloodlightModuleContext context) {
+ //
+ // Cluster Leader election setup.
+ // NOTE: We have to do it here, because during the init stage
+ // we don't know the Controller ID.
+ //
+ if (controllerId == null) {
+ log.error("Error on startup: unknown ControllerId");
+ }
+ clusterLeaderLatch = new LeaderLatch(client,
+ CLUSTER_LEADER_PATH,
+ controllerId);
+ clusterLeaderListener = new ClusterLeaderListener(clusterLeaderLatch);
+ clusterLeaderLatch.addListener(clusterLeaderListener);
+ try {
+ clusterLeaderLatch.start();
+ } catch (Exception e) {
+ log.error("Error on startup starting the cluster leader election: {}", e.getMessage());
+ }
+
+ // Keep trying until there is a cluster leader
+ do {
+ try {
+ Participant leader = clusterLeaderLatch.getLeader();
+ if (!leader.getId().isEmpty())
+ break;
+ Thread.sleep(CLUSTER_LEADER_ELECTION_RETRY_MS);
+ } catch (Exception e) {
+ log.error("Error on startup waiting for cluster leader election: {}", e.getMessage());
+ }
+ } while (true);
+
+ restApi.addRestletRoutable(new RegistryWebRoutable());
+ }
}