blob: 464dd0149a53171182a8fc443a6a6925a5d64340 [file] [log] [blame]
package net.onrc.onos.core.registry;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import net.floodlightcontroller.core.IFloodlightProviderService;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.core.module.FloodlightModuleException;
import net.floodlightcontroller.core.module.IFloodlightModule;
import net.floodlightcontroller.core.module.IFloodlightService;
import net.floodlightcontroller.restserver.IRestApiService;
import net.onrc.onos.core.registry.web.RegistryWebRoutable;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.framework.recipes.leader.Participant;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.openflow.util.HexString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Charsets;
/**
 * A registry service that uses Zookeeper. All data is stored in Zookeeper,
 * so this can be used as a global registry in a multi-node ONOS cluster.
 * <p>
 * Switch mastership is contested with one Curator {@link LeaderLatch} per
 * switch, created under the {@code /switches/<dpid>} path. Cluster leadership
 * uses a single latch at {@code /cluster/leader}. Controllers announce
 * themselves through Curator service discovery. Leadership notifications from
 * Curator are queued and handed to the registered callbacks on a dedicated
 * dispatcher thread (started in {@link #init}).
 *
 * @author jono
 */
public class ZookeeperRegistry implements IFloodlightModule, IControllerRegistryService {
    private static final Logger log = LoggerFactory.getLogger(ZookeeperRegistry.class);

    protected String controllerId = null;
    protected IRestApiService restApi;

    // This is the default, it's overwritten by the connectionString configuration parameter
    protected String connectionString = "localhost:2181";

    private final String namespace = "onos";
    private final String switchLatchesPath = "/switches";
    private static final String CLUSTER_LEADER_PATH = "/cluster/leader";
    private static final String SERVICES_PATH = "/"; // i.e. the root of our namespace
    private static final String CONTROLLER_SERVICE_NAME = "controllers";

    protected CuratorFramework client;
    protected PathChildrenCache switchCache;
    protected ConcurrentHashMap<String, SwitchLeadershipData> switches;
    protected Map<String, PathChildrenCache> switchPathCaches;
    protected LeaderLatch clusterLeaderLatch;
    protected ClusterLeaderListener clusterLeaderListener;
    private static final long CLUSTER_LEADER_ELECTION_RETRY_MS = 100;

    private static final String ID_COUNTER_PATH = "/flowidcounter";
    private static final long ID_BLOCK_SIZE = 0x100000000L;
    protected DistributedAtomicLong distributedIdCounter;

    // Zookeeper performance-related configuration
    protected static final int SESSION_TIMEOUT = 5000;
    protected static final int CONNECTION_TIMEOUT = 7000;

    //
    // Unique ID generation state
    // TODO: The implementation must be updated to use the Zookeeper
    // instead of a random generator.
    //
    private static Random randomGenerator = new Random();
    private static int nextUniqueIdPrefix = 0;
    private static int nextUniqueIdSuffix = 0;

    // Leadership changes reported by Curator, waiting to be dispatched to
    // the per-switch callbacks by dispatchEvents().
    private final BlockingQueue<SwitchLeaderEvent> switchLeadershipEvents =
            new LinkedBlockingQueue<SwitchLeaderEvent>();
    private ExecutorService eventThreadExecutorService;

    /**
     * A single leadership change (gained or lost) for one switch.
     */
    private static class SwitchLeaderEvent {
        public final long dpid;
        public final boolean isLeader;

        public SwitchLeaderEvent(long dpid, boolean isLeader) {
            this.dpid = dpid;
            this.isLeader = isLeader;
        }
    }

    /*
     * Dispatcher loop for leadership change events coming from Curator.
     * Takes events off the queue and invokes the callback registered for the
     * switch; events for switches no longer in 'switches' are dropped. Runs
     * until the thread is interrupted. Exceptions thrown by callbacks are
     * logged so that one bad callback does not kill the dispatcher.
     */
    private void dispatchEvents() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                SwitchLeaderEvent event = switchLeadershipEvents.take();
                SwitchLeadershipData swData = switches.get(HexString.toHexString(event.dpid));
                if (swData == null) {
                    log.debug("Leadership data {} not found", event.dpid);
                    continue;
                }

                swData.getCallback().controlChanged(event.dpid, event.isLeader);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                log.error("Exception in registry event thread", e);
            }
        }
    }

    /**
     * Curator latch listener for one switch. It only enqueues an event; the
     * user callback runs later on the dispatcher thread, not on Curator's
     * notification thread.
     */
    protected class SwitchLeaderListener implements LeaderLatchListener {
        String dpid;
        LeaderLatch latch;

        public SwitchLeaderListener(String dpid, LeaderLatch latch) {
            this.dpid = dpid;
            this.latch = latch;
        }

        @Override
        public void isLeader() {
            log.debug("Became leader for {}", dpid);
            switchLeadershipEvents.add(new SwitchLeaderEvent(HexString.toLong(dpid), true));
        }

        @Override
        public void notLeader() {
            log.debug("Lost leadership for {}", dpid);
            switchLeadershipEvents.add(new SwitchLeaderEvent(HexString.toLong(dpid), false));
        }
    }

    /**
     * Listens for changes to the switch znodes in Zookeeper. Maintains the
     * second level of PathChildrenCaches that hold the controllers contending
     * for each switch; there is one cache per switch, keyed by the last path
     * component (the dpid string).
     */
    protected class SwitchPathCacheListener implements PathChildrenCacheListener {
        @Override
        public void childEvent(CuratorFramework client,
                               PathChildrenCacheEvent event) throws Exception {

            String strSwitch = null;
            if (event.getData() != null) {
                String[] splitted = event.getData().getPath().split("/");
                strSwitch = splitted[splitted.length - 1];
            }

            switch (event.getType()) {
                case CHILD_ADDED:
                case CHILD_UPDATED:
                    // Check we have a PathChildrenCache for this child, add one if not
                    synchronized (switchPathCaches) {
                        if (switchPathCaches.get(strSwitch) == null) {
                            PathChildrenCache pc = new PathChildrenCache(client,
                                    event.getData().getPath(), true);
                            pc.start(StartMode.NORMAL);
                            switchPathCaches.put(strSwitch, pc);
                        }
                    }
                    break;
                case CHILD_REMOVED:
                    // Remove our PathChildrenCache for this child
                    PathChildrenCache pc = null;
                    synchronized (switchPathCaches) {
                        pc = switchPathCaches.remove(strSwitch);
                    }
                    // Close outside the lock
                    if (pc != null) {
                        pc.close();
                    }
                    break;
                default:
                    // All other events are connection status events. We don't need to
                    // do anything as the path cache handles these on its own.
                    break;
            }
        }
    }

    /**
     * Listener for the cluster-wide leader latch. Currently it only logs
     * leadership transitions.
     */
    protected static class ClusterLeaderListener implements LeaderLatchListener {
        LeaderLatch latch;

        public ClusterLeaderListener(LeaderLatch latch) {
            this.latch = latch;
        }

        //
        // NOTE: If we need to support callbacks when the
        // leadership changes, those should be called here.
        //

        @Override
        public void isLeader() {
            log.debug("Cluster leadership aquired");
        }

        @Override
        public void notLeader() {
            log.debug("Cluster leadership lost");
        }
    }

    /**
     * Listens for changes to the switch znodes in Zookeeper. This maintains
     * the second level of PathChildrenCaches that hold the controllers
     * contending for each switch - there's one for each switch.
     */
    PathChildrenCacheListener switchPathCacheListener = new SwitchPathCacheListener();
    protected ServiceDiscovery<ControllerService> serviceDiscovery;
    protected ServiceCache<ControllerService> serviceCache;

    /**
     * Starts contending for mastership of a switch by joining its leader
     * election in Zookeeper. Leadership changes are reported asynchronously
     * through {@code cb}.
     *
     * @param dpid the switch datapath ID
     * @param cb   callback notified when this controller gains or loses control
     * @throws IllegalStateException if no controller has been registered yet
     * @throws RegistryException     if control of this switch is already being
     *                               requested, or the election could not be started
     */
    @Override
    public void requestControl(long dpid, ControlChangeCallback cb) throws RegistryException {
        log.info("Requesting control for {}", HexString.toHexString(dpid));

        if (controllerId == null) {
            throw new IllegalStateException("Must register a controller before calling requestControl");
        }

        String dpidStr = HexString.toHexString(dpid);
        String latchPath = switchLatchesPath + "/" + dpidStr;

        // Cheap early exit; the putIfAbsent below is the authoritative check.
        if (switches.get(dpidStr) != null) {
            log.debug("Already contesting {}, returning", HexString.toHexString(dpid));
            throw new RegistryException("Already contesting control for " + dpidStr);
        }

        LeaderLatch latch = new LeaderLatch(client, latchPath, controllerId);
        SwitchLeaderListener listener = new SwitchLeaderListener(dpidStr, latch);
        latch.addListener(listener);

        SwitchLeadershipData swData = new SwitchLeadershipData(latch, cb, listener);
        SwitchLeadershipData oldData = switches.putIfAbsent(dpidStr, swData);

        if (oldData != null) {
            // There was already data for that key in the map,
            // i.e. someone else got here first so we can't succeed.
            log.debug("Already requested control for {}", dpidStr);
            throw new RegistryException("Already requested control for " + dpidStr);
        }

        // Now that we know we were able to add our latch to the collection,
        // we can start the leader election in Zookeeper.
        // TODO seems like there's a Curator bug when latch.start is called when
        // there's no Zookeeper connection which causes two znodes to be put in
        // Zookeeper at the latch path when we reconnect to Zookeeper.
        try {
            latch.start();
        } catch (Exception e) {
            log.warn("Error starting leader latch: {}", e.getMessage());
            // Roll back. Otherwise the stale entry makes every later
            // requestControl() for this switch fail with "Already contesting".
            // The half-started latch is also closed so it does not keep a
            // connection listener that could create a znode on reconnect.
            switches.remove(dpidStr, swData);
            latch.removeListener(listener);
            try {
                latch.close();
            } catch (Exception closeException) {
                // Best effort: the latch may never have reached the started state.
                log.debug("Could not close leader latch for {} after failed start", dpidStr);
            }
            throw new RegistryException("Error starting leader latch for " + dpidStr, e);
        }
    }

    /**
     * Stops contending for a switch and leaves its leader election. The
     * switch's callback is detached before the latch is closed. Does nothing
     * if control of the switch was never requested.
     *
     * @param dpid the switch datapath ID
     */
    @Override
    public void releaseControl(long dpid) {
        log.info("Releasing control for {}", HexString.toHexString(dpid));

        String dpidStr = HexString.toHexString(dpid);

        SwitchLeadershipData swData = switches.remove(dpidStr);

        if (swData == null) {
            log.debug("Trying to release control of a switch we are not contesting");
            return;
        }

        LeaderLatch latch = swData.getLatch();

        latch.removeListener(swData.getListener());

        try {
            latch.close();
        } catch (IOException e) {
            // I think it's OK not to do anything here. Either the node got
            // deleted correctly, or the connection went down and the node got deleted.
            log.debug("releaseControl: caught IOException {}", dpidStr);
        }
    }

    /**
     * Returns whether this controller currently holds the leader latch for a
     * switch.
     *
     * @param dpid the switch datapath ID
     * @return true if this controller is master; false if it is not, or if it
     *         is not contending for the switch
     */
    @Override
    public boolean hasControl(long dpid) {
        String dpidStr = HexString.toHexString(dpid);

        SwitchLeadershipData swData = switches.get(dpidStr);

        if (swData == null) {
            log.warn("No leader latch for dpid {}", dpidStr);
            return false;
        }

        return swData.getLatch().hasLeadership();
    }

    /**
     * Returns whether this controller is the cluster leader.
     *
     * @return true if this controller holds the cluster leader latch; false
     *         if it does not, or if the election has not been set up yet
     *         (before {@link #startUp})
     */
    @Override
    public boolean isClusterLeader() {
        LeaderLatch latch = clusterLeaderLatch;
        return latch != null && latch.hasLeadership();
    }

    @Override
    public String getControllerId() {
        return controllerId;
    }

    /**
     * Returns the IDs of all controllers known to the service-discovery
     * cache. Duplicates are removed and first-seen order is kept.
     *
     * @return the controller IDs
     */
    @Override
    public Collection<String> getAllControllers() throws RegistryException {
        log.debug("Getting all controllers");

        List<String> controllers = new ArrayList<String>();
        for (ServiceInstance<ControllerService> instance : serviceCache.getInstances()) {
            String id = instance.getPayload().getControllerId();
            if (!controllers.contains(id)) {
                controllers.add(id);
            }
        }

        return controllers;
    }

    /**
     * Records this controller's ID and publishes it through service discovery.
     *
     * @param id the controller ID
     * @throws RegistryException if a controller ID was already registered, or
     *         if publishing to service discovery fails. In the failure case the
     *         ID stays recorded locally.
     */
    @Override
    public void registerController(String id) throws RegistryException {
        if (controllerId != null) {
            throw new RegistryException(
                    "Controller already registered with id " + controllerId);
        }

        controllerId = id;

        try {
            ServiceInstance<ControllerService> thisInstance = ServiceInstance.<ControllerService>builder()
                    .name(CONTROLLER_SERVICE_NAME)
                    .payload(new ControllerService(controllerId))
                    .build();

            serviceDiscovery.registerService(thisInstance);
        } catch (Exception e) {
            // Previously swallowed via printStackTrace(), which left other
            // nodes unable to see this controller with no signal to the caller.
            log.error("Error registering controller {} with service discovery", controllerId, e);
            throw new RegistryException("Error registering controller " + controllerId, e);
        }
    }

    /**
     * Returns the controller that currently leads the election for a switch.
     * The leader is taken to be the contender whose latch znode has the lowest
     * sequence suffix.
     *
     * @param dpid the switch datapath ID
     * @return the leader's controller ID, or null if the switch is unknown or
     *         has no contenders
     */
    @Override
    public String getControllerForSwitch(long dpid) throws RegistryException {
        String dpidStr = HexString.toHexString(dpid);

        PathChildrenCache contendersCache = switchPathCaches.get(dpidStr);

        if (contendersCache == null) {
            log.warn("Tried to get controller for non-existent switch");
            return null;
        }

        try {
            // We've seen issues with these caches get stuck out of date, so we'll have to
            // force them to refresh before each read. This slows down the method as it
            // blocks on a Zookeeper query, however at the moment only the cleanup thread
            // uses this and that isn't particularly time-sensitive.
            contendersCache.rebuild();
        } catch (Exception e) {
            // Fall back to the (possibly stale) cached data.
            log.warn("Failed to rebuild switch cache for {}", dpidStr, e);
        }

        List<ChildData> sortedData = new ArrayList<ChildData>(contendersCache.getCurrentData());

        Collections.sort(
                sortedData,
                new Comparator<ChildData>() {
                    private String getSequenceNumber(String path) {
                        return path.substring(path.lastIndexOf('-') + 1);
                    }

                    // Compares suffixes as strings. This relies on the
                    // sequence suffixes being fixed-width (zero-padded);
                    // NOTE(review): confirm for the Curator version in use.
                    @Override
                    public int compare(ChildData lhs, ChildData rhs) {
                        return getSequenceNumber(lhs.getPath()).
                                compareTo(getSequenceNumber(rhs.getPath()));
                    }
                }
        );

        if (sortedData.isEmpty()) {
            return null;
        }

        return new String(sortedData.get(0).getData(), Charsets.UTF_8);
    }

    @Override
    public Collection<Long> getSwitchesControlledByController(String controllerId) {
        // TODO remove this if not needed
        throw new UnsupportedOperationException("Not yet implemented");
    }

    /**
     * Returns, for every switch with at least one contender, the contending
     * controllers sorted by {@link ControllerRegistryEntry}'s natural order.
     * The data comes from the local caches.
     *
     * @return map from dpid string to its sorted contenders
     */
    // TODO what should happen when there's no ZK connection? Currently we just return
    // the cache but this may lead to false impressions - i.e. we don't actually know
    // what's in ZK so we shouldn't say we do
    @Override
    public Map<String, List<ControllerRegistryEntry>> getAllSwitches() {
        Map<String, List<ControllerRegistryEntry>> data =
                new HashMap<String, List<ControllerRegistryEntry>>();

        for (Map.Entry<String, PathChildrenCache> entry : switchPathCaches.entrySet()) {
            // Read the cache once so the emptiness check and the loop see the same snapshot.
            List<ChildData> currentData = entry.getValue().getCurrentData();
            if (currentData.isEmpty()) {
                // TODO prevent even having the PathChildrenCache in this case
                continue;
            }

            List<ControllerRegistryEntry> contendingControllers =
                    new ArrayList<ControllerRegistryEntry>();
            for (ChildData d : currentData) {
                String contenderId = new String(d.getData(), Charsets.UTF_8);

                // The sequence number is the suffix after the last '-' in the znode path.
                String[] splitted = d.getPath().split("-");
                int sequenceNumber = Integer.parseInt(splitted[splitted.length - 1]);

                contendingControllers.add(new ControllerRegistryEntry(contenderId, sequenceNumber));
            }

            Collections.sort(contendingControllers);
            data.put(entry.getKey(), contendingControllers);
        }
        return data;
    }

    /**
     * Atomically reserves {@code range} IDs from the distributed counter.
     * Retries until the counter update succeeds.
     *
     * @param range number of IDs to reserve
     * @return the reserved block, or null if Zookeeper access failed
     */
    public IdBlock allocateUniqueIdBlock(long range) {
        try {
            AtomicValue<Long> result = null;
            do {
                result = distributedIdCounter.add(range);
            } while (result == null || !result.succeeded());

            return new IdBlock(result.preValue(), result.postValue() - 1, range);
        } catch (Exception e) {
            log.error("Error allocating ID block", e);
        }
        return null;
    }

    /**
     * Returns a block of IDs which are unique and unused.
     * Range of IDs is fixed size and is assigned incrementally as this method called.
     * Since the range of IDs is managed by Zookeeper in distributed way, this method may block when
     * requests come up simultaneously.
     */
    @Override
    public IdBlock allocateUniqueIdBlock() {
        return allocateUniqueIdBlock(ID_BLOCK_SIZE);
    }

    /**
     * Get a globally unique ID.
     *
     * @return a globally unique ID.
     */
    @Override
    public synchronized long getNextUniqueId() {
        //
        // Generate the next Unique ID.
        //
        // TODO: For now, the higher 32 bits are random, and
        // the lower 32 bits are sequential.
        // The implementation must be updated to use the Zookeeper
        // to allocate the higher 32 bits (globally unique).
        //
        // When the 32-bit suffix is exhausted (all ones), pick a new random
        // prefix and restart the suffix at zero.
        if ((nextUniqueIdSuffix & 0xffffffffL) == 0xffffffffL) {
            nextUniqueIdPrefix = randomGenerator.nextInt();
            nextUniqueIdSuffix = 0;
        } else {
            nextUniqueIdSuffix++;
        }
        long result = (long) nextUniqueIdPrefix << 32;
        result = result | (0xffffffffL & nextUniqueIdSuffix);
        return result;
    }

    /*
     * IFloodlightModule
     */

    @Override
    public Collection<Class<? extends IFloodlightService>> getModuleServices() {
        Collection<Class<? extends IFloodlightService>> l =
                new ArrayList<Class<? extends IFloodlightService>>();
        l.add(IControllerRegistryService.class);
        return l;
    }

    @Override
    public Map<Class<? extends IFloodlightService>, IFloodlightService> getServiceImpls() {
        Map<Class<? extends IFloodlightService>, IFloodlightService> m =
                new HashMap<Class<? extends IFloodlightService>, IFloodlightService>();
        m.put(IControllerRegistryService.class, this);
        return m;
    }

    @Override
    public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
        Collection<Class<? extends IFloodlightService>> l =
                new ArrayList<Class<? extends IFloodlightService>>();
        l.add(IFloodlightProviderService.class);
        l.add(IRestApiService.class);
        return l;
    }

    /**
     * Connects to Zookeeper and starts the switch cache, service discovery
     * and the leadership-event dispatcher thread.
     *
     * @throws FloodlightModuleException if the Zookeeper-backed components
     *         fail to start; the underlying exception is attached as the cause
     */
    // TODO currently blocks startup when it can't get a Zookeeper connection.
    // Do we support starting up with no Zookeeper connection?
    @Override
    public void init(FloodlightModuleContext context) throws FloodlightModuleException {
        log.info("Initialising the Zookeeper Registry - Zookeeper connection required");

        // Read the Zookeeper connection string from the config
        Map<String, String> configParams = context.getConfigParams(this);
        String connectionString = configParams.get("connectionString");
        if (connectionString != null) {
            this.connectionString = connectionString;
        }
        log.info("Setting Zookeeper connection string to {}", this.connectionString);

        //
        // Initialize the Unique ID generator
        // TODO: This must be replaced by Zookeeper-based allocation
        //
        nextUniqueIdPrefix = randomGenerator.nextInt();

        restApi = context.getServiceImpl(IRestApiService.class);

        switches = new ConcurrentHashMap<String, SwitchLeadershipData>();
        switchPathCaches = new ConcurrentHashMap<String, PathChildrenCache>();

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.newClient(this.connectionString,
                SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy);

        client.start();
        client = client.usingNamespace(namespace);

        distributedIdCounter = new DistributedAtomicLong(
                client,
                ID_COUNTER_PATH,
                new RetryOneTime(100));

        switchCache = new PathChildrenCache(client, switchLatchesPath, true);
        switchCache.getListenable().addListener(switchPathCacheListener);

        // Build the service discovery object
        serviceDiscovery = ServiceDiscoveryBuilder.builder(ControllerService.class)
                .client(client).basePath(SERVICES_PATH).build();

        // We read the list of services very frequently (GUI periodically queries them)
        // so we'll cache them to cut down on Zookeeper queries.
        serviceCache = serviceDiscovery.serviceCacheBuilder()
                .name(CONTROLLER_SERVICE_NAME).build();

        try {
            serviceDiscovery.start();
            serviceCache.start();

            // Don't prime the cache, we want a notification for each child node in the path
            switchCache.start(StartMode.NORMAL);
        } catch (Exception e) {
            FloodlightModuleException moduleException = new FloodlightModuleException(
                    "Error initialising ZookeeperRegistry: " + e.getMessage());
            // Keep the original stack trace for diagnosis.
            moduleException.initCause(e);
            throw moduleException;
        }

        eventThreadExecutorService = Executors.newSingleThreadExecutor();
        eventThreadExecutorService.execute(
                new Runnable() {
                    @Override
                    public void run() {
                        dispatchEvents();
                    }
                });
    }

    /**
     * Joins the cluster leader election, waits until some leader is known,
     * then registers the registry REST API.
     */
    @Override
    public void startUp(FloodlightModuleContext context) {
        //
        // Cluster Leader election setup.
        // NOTE: We have to do it here, because during the init stage
        // we don't know the Controller ID.
        //
        if (controllerId == null) {
            log.error("Error on startup: unknown ControllerId");
        }
        clusterLeaderLatch = new LeaderLatch(client,
                CLUSTER_LEADER_PATH,
                controllerId);
        clusterLeaderListener = new ClusterLeaderListener(clusterLeaderLatch);
        clusterLeaderLatch.addListener(clusterLeaderListener);
        try {
            clusterLeaderLatch.start();
        } catch (Exception e) {
            log.error("Error on startup starting the cluster leader election: {}", e.getMessage());
        }

        waitForClusterLeader();

        restApi.addRestletRoutable(new RegistryWebRoutable());
    }

    /**
     * Polls the cluster latch until it reports a non-empty leader ID.
     * <p>
     * Sleeps {@link #CLUSTER_LEADER_ELECTION_RETRY_MS} after every attempt,
     * including failed ones, so a lost Zookeeper connection cannot turn this
     * into a busy loop. Returns early, with the interrupt flag restored, if
     * the thread is interrupted.
     */
    private void waitForClusterLeader() {
        while (true) {
            try {
                Participant leader = clusterLeaderLatch.getLeader();
                if (!leader.getId().isEmpty()) {
                    return;
                }
            } catch (Exception e) {
                log.error("Error on startup waiting for cluster leader election: {}", e.getMessage());
            }

            try {
                Thread.sleep(CLUSTER_LEADER_ELECTION_RETRY_MS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.warn("Interrupted while waiting for cluster leader election");
                return;
            }
        }
    }
}