blob: b8a1021fba0692a605ba57cbd3ef2818ebacfe4f [file] [log] [blame]
Jonathan Hartd82f20d2013-02-21 18:04:24 -08001package net.onrc.onos.registry.controller;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -08002
Jonathan Hartbd181b62013-02-17 16:05:38 -08003import java.io.IOException;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -08004import java.util.ArrayList;
5import java.util.Collection;
Jonathan Hart3d7730a2013-02-22 11:51:17 -08006import java.util.Collections;
Jonathan Hart599c6b32013-03-24 22:42:02 -07007import java.util.Comparator;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -08008import java.util.HashMap;
Jonathan Hartedd6a442013-02-20 15:22:06 -08009import java.util.List;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080010import java.util.Map;
Jonathan Hart116b1fe2014-03-14 18:53:47 -070011import java.util.concurrent.BlockingQueue;
Jonathan Hart89187372013-03-14 16:41:09 -070012import java.util.concurrent.ConcurrentHashMap;
Jonathan Hart116b1fe2014-03-14 18:53:47 -070013import java.util.concurrent.ExecutorService;
14import java.util.concurrent.Executors;
15import java.util.concurrent.LinkedBlockingQueue;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080016
Pavlin Radoslavovc35229e2014-02-06 16:19:37 -080017import net.floodlightcontroller.core.IFloodlightProviderService;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080018import net.floodlightcontroller.core.module.FloodlightModuleContext;
19import net.floodlightcontroller.core.module.FloodlightModuleException;
20import net.floodlightcontroller.core.module.IFloodlightModule;
21import net.floodlightcontroller.core.module.IFloodlightService;
Jonathan Hart3d7730a2013-02-22 11:51:17 -080022import net.floodlightcontroller.restserver.IRestApiService;
Naoki Shiotab32edf52013-12-12 14:09:36 -080023import net.onrc.onos.registry.controller.web.RegistryWebRoutable;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080024
Jonathan Hart116b1fe2014-03-14 18:53:47 -070025import org.apache.curator.RetryPolicy;
26import org.apache.curator.framework.CuratorFramework;
27import org.apache.curator.framework.CuratorFrameworkFactory;
28import org.apache.curator.framework.recipes.atomic.AtomicValue;
29import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
30import org.apache.curator.framework.recipes.cache.ChildData;
31import org.apache.curator.framework.recipes.cache.PathChildrenCache;
32import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
33import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
34import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
35import org.apache.curator.framework.recipes.leader.LeaderLatch;
36import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
37import org.apache.curator.framework.recipes.leader.Participant;
38import org.apache.curator.retry.ExponentialBackoffRetry;
39import org.apache.curator.retry.RetryOneTime;
40import org.apache.curator.x.discovery.ServiceCache;
41import org.apache.curator.x.discovery.ServiceDiscovery;
42import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
43import org.apache.curator.x.discovery.ServiceInstance;
Jonathan Hartbd181b62013-02-17 16:05:38 -080044import org.openflow.util.HexString;
45import org.slf4j.Logger;
46import org.slf4j.LoggerFactory;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080047
Jonathan Hartd10008d2013-02-23 17:04:08 -080048import com.google.common.base.Charsets;
Jonathan Hartbd181b62013-02-17 16:05:38 -080049
Jonathan Hart7bf62172013-02-28 13:17:18 -080050/**
51 * A registry service that uses Zookeeper. All data is stored in Zookeeper,
52 * so this can be used as a global registry in a multi-node ONOS cluster.
53 * @author jono
54 *
55 */
Jonathan Hartbd766972013-02-22 15:13:03 -080056public class ZookeeperRegistry implements IFloodlightModule, IControllerRegistryService {
Jonathan Hartc6eee9e2013-02-18 14:58:27 -080057
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -070058 protected final static Logger log = LoggerFactory.getLogger(ZookeeperRegistry.class);
Jonathan Hartbd766972013-02-22 15:13:03 -080059 protected String controllerId = null;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080060
Jonathan Hart3d7730a2013-02-22 11:51:17 -080061 protected IRestApiService restApi;
62
Jonathan Hart7bf62172013-02-28 13:17:18 -080063 //This is the default, it's overwritten by the connectionString configuration parameter
Jonathan Hartbd181b62013-02-17 16:05:38 -080064 protected String connectionString = "localhost:2181";
Jonathan Hart3d7730a2013-02-22 11:51:17 -080065
Jonathan Hartbd181b62013-02-17 16:05:38 -080066 private final String namespace = "onos";
Jonathan Hartedd6a442013-02-20 15:22:06 -080067 private final String switchLatchesPath = "/switches";
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -080068 private final String CLUSTER_LEADER_PATH = "/cluster/leader";
Jonathan Hart71c0ffc2013-03-24 15:58:42 -070069
70 private final String SERVICES_PATH = "/"; //i.e. the root of our namespace
71 private final String CONTROLLER_SERVICE_NAME = "controllers";
Jonathan Hartbd181b62013-02-17 16:05:38 -080072
73 protected CuratorFramework client;
Jonathan Hartedd6a442013-02-20 15:22:06 -080074
Jonathan Hart3d7730a2013-02-22 11:51:17 -080075 protected PathChildrenCache switchCache;
Jonathan Hartbd181b62013-02-17 16:05:38 -080076
Jonathan Hart89187372013-03-14 16:41:09 -070077 protected ConcurrentHashMap<String, SwitchLeadershipData> switches;
Jonathan Hart3d7730a2013-02-22 11:51:17 -080078 protected Map<String, PathChildrenCache> switchPathCaches;
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -080079
80 protected LeaderLatch clusterLeaderLatch;
81 protected ClusterLeaderListener clusterLeaderListener;
82 private static final long CLUSTER_LEADER_ELECTION_RETRY_MS = 100;
83
Jonathan Hart1530ccc2013-04-03 19:36:02 -070084 private final String ID_COUNTER_PATH = "/flowidcounter";
85 private final Long ID_BLOCK_SIZE = 0x100000000L;
86 protected DistributedAtomicLong distributedIdCounter;
87
Jonathan Hart97801ac2013-02-26 14:29:16 -080088 //Zookeeper performance-related configuration
Jonathan Hart0b3eee42013-03-16 18:20:04 -070089 protected static final int sessionTimeout = 5000;
90 protected static final int connectionTimeout = 7000;
Jonathan Hart57080fb2013-02-21 10:55:46 -080091
Jonathan Hart116b1fe2014-03-14 18:53:47 -070092 private final BlockingQueue<SwitchLeaderEvent> switchLeadershipEvents =
93 new LinkedBlockingQueue<SwitchLeaderEvent>();
94
95 private ExecutorService eventThreadExecutorService;
96
97 private static class SwitchLeaderEvent {
98 public final long dpid;
99 public final boolean isLeader;
100
101 public SwitchLeaderEvent(long dpid, boolean isLeader) {
102 this.dpid = dpid;
103 this.isLeader = isLeader;
104 }
105 }
106
107 /*
108 * Dispatcher thread for leadership change events coming from Curator.
109 */
110 private void dispatchEvents() {
111 while (!Thread.currentThread().isInterrupted()) {
112 try {
113 SwitchLeaderEvent event = switchLeadershipEvents.take();
114 SwitchLeadershipData swData = switches.get(HexString.toHexString(event.dpid));
115 if (swData == null) {
116 log.debug("Leadership data {} not found", event.dpid);
117 continue;
118 }
119
120 swData.getCallback().controlChanged(event.dpid, event.isLeader);
121 } catch (InterruptedException e) {
122 Thread.currentThread().interrupt();
123 break;
124 } catch (Exception e) {
125 log.error("Exception in registry event thread", e);
126 }
127 }
128 }
Jonathan Hartbd181b62013-02-17 16:05:38 -0800129
Jonathan Hart89187372013-03-14 16:41:09 -0700130 protected class SwitchLeaderListener implements LeaderLatchListener{
Jonathan Hart0de09492013-03-13 14:37:21 -0700131 String dpid;
132 LeaderLatch latch;
133
Jonathan Hart89187372013-03-14 16:41:09 -0700134 public SwitchLeaderListener(String dpid, LeaderLatch latch){
Jonathan Hart0de09492013-03-13 14:37:21 -0700135 this.dpid = dpid;
136 this.latch = latch;
137 }
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700138
Jonathan Hart0de09492013-03-13 14:37:21 -0700139 @Override
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700140 public void isLeader() {
141 log.debug("Became leader for {}", dpid);
142
143 switchLeadershipEvents.offer(new SwitchLeaderEvent(HexString.toLong(dpid), true));
144 }
145
146 @Override
147 public void notLeader() {
148 log.debug("Lost leadership for {}", dpid);
Jonathan Hart0de09492013-03-13 14:37:21 -0700149
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700150 switchLeadershipEvents.offer(new SwitchLeaderEvent(HexString.toLong(dpid), false));
Jonathan Hart0de09492013-03-13 14:37:21 -0700151 }
152 }
153
Naoki Shiotad00accf2013-06-25 14:40:37 -0700154 protected class SwitchPathCacheListener implements PathChildrenCacheListener {
Jonathan Hart3d7730a2013-02-22 11:51:17 -0800155 @Override
156 public void childEvent(CuratorFramework client,
157 PathChildrenCacheEvent event) throws Exception {
Jonathan Hart3d7730a2013-02-22 11:51:17 -0800158
159 String strSwitch = null;
160 if (event.getData() != null){
Jonathan Hart3d7730a2013-02-22 11:51:17 -0800161 String[] splitted = event.getData().getPath().split("/");
162 strSwitch = splitted[splitted.length - 1];
Jonathan Hart3d7730a2013-02-22 11:51:17 -0800163 }
164
165 switch (event.getType()){
166 case CHILD_ADDED:
167 case CHILD_UPDATED:
168 //Check we have a PathChildrenCache for this child, add one if not
Jonathan Hart4baf3be2013-03-21 18:26:13 -0700169 synchronized (switchPathCaches){
170 if (switchPathCaches.get(strSwitch) == null){
171 PathChildrenCache pc = new PathChildrenCache(client,
172 event.getData().getPath(), true);
173 pc.start(StartMode.NORMAL);
174 switchPathCaches.put(strSwitch, pc);
175 }
Jonathan Hart3d7730a2013-02-22 11:51:17 -0800176 }
177 break;
178 case CHILD_REMOVED:
179 //Remove our PathChildrenCache for this child
Jonathan Hart4baf3be2013-03-21 18:26:13 -0700180 PathChildrenCache pc = null;
181 synchronized(switchPathCaches){
182 pc = switchPathCaches.remove(strSwitch);
183 }
184 if (pc != null){
185 pc.close();
186 }
Jonathan Hart3d7730a2013-02-22 11:51:17 -0800187 break;
188 default:
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700189 //All other switchLeadershipEvents are connection status switchLeadershipEvents. We don't need to
Jonathan Hart4baf3be2013-03-21 18:26:13 -0700190 //do anything as the path cache handles these on its own.
Jonathan Hart3d7730a2013-02-22 11:51:17 -0800191 break;
192 }
193
194 }
195 };
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800196
197 protected class ClusterLeaderListener implements LeaderLatchListener {
198 LeaderLatch latch;
199
200 public ClusterLeaderListener(LeaderLatch latch) {
201 this.latch = latch;
202 }
203
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700204 //
205 // NOTE: If we need to support callbacks when the
206 // leadership changes, those should be called here.
207 //
208
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800209 @Override
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700210 public void isLeader() {
211 log.debug("Cluster leadership aquired");
212 }
213
214 @Override
215 public void notLeader() {
216 log.debug("Cluster leadership lost");
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800217 }
218 }
219
Naoki Shiotad00accf2013-06-25 14:40:37 -0700220 /**
221 * Listens for changes to the switch znodes in Zookeeper. This maintains
222 * the second level of PathChildrenCaches that hold the controllers
223 * contending for each switch - there's one for each switch.
224 */
225 PathChildrenCacheListener switchPathCacheListener = new SwitchPathCacheListener();
Jonathan Hart71c0ffc2013-03-24 15:58:42 -0700226 protected ServiceDiscovery<ControllerService> serviceDiscovery;
227 protected ServiceCache<ControllerService> serviceCache;
Jonathan Hartedd6a442013-02-20 15:22:06 -0800228
Jonathan Hartbd181b62013-02-17 16:05:38 -0800229
230 @Override
Jonathan Hart3d7730a2013-02-22 11:51:17 -0800231 public void requestControl(long dpid, ControlChangeCallback cb) throws RegistryException {
Jonathan Hart7bf62172013-02-28 13:17:18 -0800232 log.info("Requesting control for {}", HexString.toHexString(dpid));
Jonathan Hartc6eee9e2013-02-18 14:58:27 -0800233
Jonathan Hartbd766972013-02-22 15:13:03 -0800234 if (controllerId == null){
235 throw new RuntimeException("Must register a controller before calling requestControl");
Jonathan Hartbd181b62013-02-17 16:05:38 -0800236 }
237
238 String dpidStr = HexString.toHexString(dpid);
239 String latchPath = switchLatchesPath + "/" + dpidStr;
240
Jonathan Hart89187372013-03-14 16:41:09 -0700241 if (switches.get(dpidStr) != null){
Jonathan Hart3c0eccd2013-03-12 22:32:50 -0700242 log.debug("Already contesting {}, returning", HexString.toHexString(dpid));
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700243 throw new RegistryException("Already contesting control for " + dpidStr);
Jonathan Hartc6eee9e2013-02-18 14:58:27 -0800244 }
245
Jonathan Hartbd766972013-02-22 15:13:03 -0800246 LeaderLatch latch = new LeaderLatch(client, latchPath, controllerId);
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700247 SwitchLeaderListener listener = new SwitchLeaderListener(dpidStr, latch);
248 latch.addListener(listener);
Jonathan Hartbd181b62013-02-17 16:05:38 -0800249
Jonathan Hart44e56fc2013-03-14 16:53:59 -0700250
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700251 SwitchLeadershipData swData = new SwitchLeadershipData(latch, cb, listener);
Jonathan Hart89187372013-03-14 16:41:09 -0700252 SwitchLeadershipData oldData = switches.putIfAbsent(dpidStr, swData);
253
254 if (oldData != null){
255 //There was already data for that key in the map
256 //i.e. someone else got here first so we can't succeed
257 log.debug("Already requested control for {}", dpidStr);
258 throw new RegistryException("Already requested control for " + dpidStr);
259 }
260
261 //Now that we know we were able to add our latch to the collection,
Jonathan Hart44e56fc2013-03-14 16:53:59 -0700262 //we can start the leader election in Zookeeper. However I don't know
263 //how to handle if the start fails - the latch is already in our
264 //switches list.
265 //TODO seems like there's a Curator bug when latch.start is called when
266 //there's no Zookeeper connection which causes two znodes to be put in
267 //Zookeeper at the latch path when we reconnect to Zookeeper.
Jonathan Hartbd181b62013-02-17 16:05:38 -0800268 try {
Jonathan Hartbd181b62013-02-17 16:05:38 -0800269 latch.start();
270 } catch (Exception e) {
Jonathan Hartc6eee9e2013-02-18 14:58:27 -0800271 log.warn("Error starting leader latch: {}", e.getMessage());
Jonathan Hart3d7730a2013-02-22 11:51:17 -0800272 throw new RegistryException("Error starting leader latch for " + dpidStr, e);
Jonathan Hartbd181b62013-02-17 16:05:38 -0800273 }
274
275 }
276
277 @Override
Jonathan Hartd82f20d2013-02-21 18:04:24 -0800278 public void releaseControl(long dpid) {
Jonathan Hart7bf62172013-02-28 13:17:18 -0800279 log.info("Releasing control for {}", HexString.toHexString(dpid));
Jonathan Hart57080fb2013-02-21 10:55:46 -0800280
Jonathan Hartc6eee9e2013-02-18 14:58:27 -0800281 String dpidStr = HexString.toHexString(dpid);
282
Jonathan Hart89187372013-03-14 16:41:09 -0700283 SwitchLeadershipData swData = switches.remove(dpidStr);
284
285 if (swData == null) {
Jonathan Hart7bf62172013-02-28 13:17:18 -0800286 log.debug("Trying to release control of a switch we are not contesting");
Jonathan Hartbd181b62013-02-17 16:05:38 -0800287 return;
288 }
Jonathan Hart89187372013-03-14 16:41:09 -0700289
Jonathan Hart89187372013-03-14 16:41:09 -0700290 LeaderLatch latch = swData.getLatch();
Jonathan Hartbd181b62013-02-17 16:05:38 -0800291
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700292 latch.removeListener(swData.getListener());
Jonathan Hart4baf3be2013-03-21 18:26:13 -0700293
Jonathan Hartbd181b62013-02-17 16:05:38 -0800294 try {
295 latch.close();
296 } catch (IOException e) {
Jonathan Hart7bf62172013-02-28 13:17:18 -0800297 //I think it's OK not to do anything here. Either the node got
298 //deleted correctly, or the connection went down and the node got deleted.
Umesh Krishnaswamy0ef75ee2013-03-25 17:50:27 -0700299 log.debug("releaseControl: caught IOException {}", dpidStr);
Jonathan Hartbd181b62013-02-17 16:05:38 -0800300 }
301 }
302
303 @Override
Jonathan Hartd82f20d2013-02-21 18:04:24 -0800304 public boolean hasControl(long dpid) {
Jonathan Hart89187372013-03-14 16:41:09 -0700305 String dpidStr = HexString.toHexString(dpid);
Jonathan Hart57080fb2013-02-21 10:55:46 -0800306
Jonathan Hart89187372013-03-14 16:41:09 -0700307 SwitchLeadershipData swData = switches.get(dpidStr);
Jonathan Hartbd181b62013-02-17 16:05:38 -0800308
Jonathan Hart89187372013-03-14 16:41:09 -0700309 if (swData == null) {
310 log.warn("No leader latch for dpid {}", dpidStr);
Jonathan Hartbd181b62013-02-17 16:05:38 -0800311 return false;
312 }
313
Jonathan Hart89187372013-03-14 16:41:09 -0700314 return swData.getLatch().hasLeadership();
Jonathan Hartbd181b62013-02-17 16:05:38 -0800315 }
316
317 @Override
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800318 public boolean isClusterLeader() {
319 return clusterLeaderLatch.hasLeadership();
320 }
321
322 @Override
Jonathan Hart7bf62172013-02-28 13:17:18 -0800323 public String getControllerId() {
Jonathan Hartbd766972013-02-22 15:13:03 -0800324 return controllerId;
Jonathan Hartbd181b62013-02-17 16:05:38 -0800325 }
326
Jonathan Hartedd6a442013-02-20 15:22:06 -0800327 @Override
Jonathan Hart57080fb2013-02-21 10:55:46 -0800328 public Collection<String> getAllControllers() throws RegistryException {
Jonathan Hartedd6a442013-02-20 15:22:06 -0800329 log.debug("Getting all controllers");
Jonathan Hart1be46262013-02-20 16:43:51 -0800330
Jonathan Hartedd6a442013-02-20 15:22:06 -0800331 List<String> controllers = new ArrayList<String>();
Jonathan Hart71c0ffc2013-03-24 15:58:42 -0700332 for (ServiceInstance<ControllerService> instance : serviceCache.getInstances()){
333 String id = instance.getPayload().getControllerId();
334 if (!controllers.contains(id)){
335 controllers.add(id);
Jonathan Hartedd6a442013-02-20 15:22:06 -0800336 }
Jonathan Hartedd6a442013-02-20 15:22:06 -0800337 }
Jonathan Hart71c0ffc2013-03-24 15:58:42 -0700338
Jonathan Hartedd6a442013-02-20 15:22:06 -0800339 return controllers;
340 }
341
342 @Override
Jonathan Hart57080fb2013-02-21 10:55:46 -0800343 public void registerController(String id) throws RegistryException {
Jonathan Hartd10008d2013-02-23 17:04:08 -0800344 if (controllerId != null) {
345 throw new RegistryException(
346 "Controller already registered with id " + controllerId);
347 }
Jonathan Hartbd766972013-02-22 15:13:03 -0800348
349 controllerId = id;
Jonathan Hart57080fb2013-02-21 10:55:46 -0800350
Jonathan Hartedd6a442013-02-20 15:22:06 -0800351 try {
Jonathan Hart71c0ffc2013-03-24 15:58:42 -0700352 ServiceInstance<ControllerService> thisInstance = ServiceInstance.<ControllerService>builder()
353 .name(CONTROLLER_SERVICE_NAME)
354 .payload(new ControllerService(controllerId))
355 //.port((int)(65535 * Math.random())) // in a real application, you'd use a common port
356 //.uriSpec(uriSpec)
357 .build();
Jonathan Hart0b3eee42013-03-16 18:20:04 -0700358
Jonathan Hart71c0ffc2013-03-24 15:58:42 -0700359 serviceDiscovery.registerService(thisInstance);
Jonathan Hartedd6a442013-02-20 15:22:06 -0800360 } catch (Exception e) {
Jonathan Hart71c0ffc2013-03-24 15:58:42 -0700361 // TODO Auto-generated catch block
362 e.printStackTrace();
Jonathan Hartedd6a442013-02-20 15:22:06 -0800363 }
Jonathan Hart71c0ffc2013-03-24 15:58:42 -0700364
Jonathan Hartedd6a442013-02-20 15:22:06 -0800365 }
366
367 @Override
Jonathan Hart57080fb2013-02-21 10:55:46 -0800368 public String getControllerForSwitch(long dpid) throws RegistryException {
Jonathan Hart89187372013-03-14 16:41:09 -0700369 String dpidStr = HexString.toHexString(dpid);
Pankaj Berde017960a2013-03-14 20:32:26 -0700370
Jonathan Hart599c6b32013-03-24 22:42:02 -0700371 PathChildrenCache switchCache = switchPathCaches.get(dpidStr);
372
373 if (switchCache == null){
Jonathan Hartedd6a442013-02-20 15:22:06 -0800374 log.warn("Tried to get controller for non-existent switch");
375 return null;
376 }
377
Jonathan Hartf4e80842013-03-26 23:55:02 -0700378 try {
379 //We've seen issues with these caches get stuck out of date, so we'll have to
380 //force them to refresh before each read. This slows down the method as it
381 //blocks on a Zookeeper query, however at the moment only the cleanup thread
382 //uses this and that isn't particularly time-sensitive.
383 switchCache.rebuild();
384 } catch (Exception e) {
385 // TODO Auto-generated catch block
386 e.printStackTrace();
387 }
388
Jonathan Hart599c6b32013-03-24 22:42:02 -0700389 List<ChildData> sortedData = new ArrayList<ChildData>(switchCache.getCurrentData());
Jonathan Hart0b3eee42013-03-16 18:20:04 -0700390
Jonathan Hart599c6b32013-03-24 22:42:02 -0700391 Collections.sort(
392 sortedData,
393 new Comparator<ChildData>(){
394 private String getSequenceNumber(String path){
395 return path.substring(path.lastIndexOf('-') + 1);
396 }
397 @Override
398 public int compare(ChildData lhs, ChildData rhs) {
399 return getSequenceNumber(lhs.getPath()).
400 compareTo(getSequenceNumber(rhs.getPath()));
401 }
402 }
403 );
Jonathan Hartedd6a442013-02-20 15:22:06 -0800404
Jonathan Hart56b296e2013-03-25 13:30:10 -0700405 if (sortedData.size() == 0){
406 return null;
407 }
408
Jonathan Hart599c6b32013-03-24 22:42:02 -0700409 return new String(sortedData.get(0).getData(), Charsets.UTF_8);
Jonathan Hartedd6a442013-02-20 15:22:06 -0800410 }
411
412 @Override
413 public Collection<Long> getSwitchesControlledByController(String controllerId) {
Jonathan Hart3d7730a2013-02-22 11:51:17 -0800414 //TODO remove this if not needed
Jonathan Hartbd766972013-02-22 15:13:03 -0800415 throw new RuntimeException("Not yet implemented");
Jonathan Hartedd6a442013-02-20 15:22:06 -0800416 }
Jonathan Hartbd181b62013-02-17 16:05:38 -0800417
Jonathan Hartd82f20d2013-02-21 18:04:24 -0800418
Jonathan Hart89187372013-03-14 16:41:09 -0700419 //TODO what should happen when there's no ZK connection? Currently we just return
420 //the cache but this may lead to false impressions - i.e. we don't actually know
421 //what's in ZK so we shouldn't say we do
Jonathan Hartd82f20d2013-02-21 18:04:24 -0800422 @Override
Jonathan Hart3d7730a2013-02-22 11:51:17 -0800423 public Map<String, List<ControllerRegistryEntry>> getAllSwitches() {
424 Map<String, List<ControllerRegistryEntry>> data =
425 new HashMap<String, List<ControllerRegistryEntry>>();
426
427 for (Map.Entry<String, PathChildrenCache> entry : switchPathCaches.entrySet()){
428 List<ControllerRegistryEntry> contendingControllers =
429 new ArrayList<ControllerRegistryEntry>();
430
431 if (entry.getValue().getCurrentData().size() < 1){
Jonathan Hartcbb4b952013-03-18 16:15:18 -0700432 //TODO prevent even having the PathChildrenCache in this case
433 //log.info("Switch entry with no leader elections: {}", entry.getKey());
Jonathan Hart3d7730a2013-02-22 11:51:17 -0800434 continue;
435 }
436
437 for (ChildData d : entry.getValue().getCurrentData()) {
Jonathan Hart97801ac2013-02-26 14:29:16 -0800438
Jonathan Hartd10008d2013-02-23 17:04:08 -0800439 String controllerId = new String(d.getData(), Charsets.UTF_8);
Jonathan Hart3d7730a2013-02-22 11:51:17 -0800440
441 String[] splitted = d.getPath().split("-");
442 int sequenceNumber = Integer.parseInt(splitted[splitted.length - 1]);
443
444 contendingControllers.add(new ControllerRegistryEntry(controllerId, sequenceNumber));
445 }
446
447 Collections.sort(contendingControllers);
448 data.put(entry.getKey(), contendingControllers);
449 }
450 return data;
Jonathan Hartd82f20d2013-02-21 18:04:24 -0800451 }
452
Nick Karanatsios8abe7172014-02-19 20:31:48 -0800453 public IdBlock allocateUniqueIdBlock(long range) {
454 try {
455 AtomicValue<Long> result = null;
456 do {
457 result = distributedIdCounter.add(range);
458 } while (result == null || !result.succeeded());
459
460 return new IdBlock(result.preValue(), result.postValue() - 1, range);
461 } catch (Exception e) {
462 log.error("Error allocating ID block");
463 }
464 return null;
465 }
466
Naoki Shiotaa3b2dfa2013-06-27 13:52:24 -0700467 /**
468 * Returns a block of IDs which are unique and unused.
469 * Range of IDs is fixed size and is assigned incrementally as this method called.
470 * Since the range of IDs is managed by Zookeeper in distributed way, this method may block when
471 * requests come up simultaneously.
472 */
Nick Karanatsios8abe7172014-02-19 20:31:48 -0800473 @Override
Jonathan Hart1530ccc2013-04-03 19:36:02 -0700474 public IdBlock allocateUniqueIdBlock(){
Nick Karanatsios8abe7172014-02-19 20:31:48 -0800475 return allocateUniqueIdBlock(ID_BLOCK_SIZE);
Jonathan Hart1530ccc2013-04-03 19:36:02 -0700476 }
Nick Karanatsios8abe7172014-02-19 20:31:48 -0800477
Jonathan Hartbd181b62013-02-17 16:05:38 -0800478 /*
479 * IFloodlightModule
480 */
481
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800482 @Override
483 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
Jonathan Hartedd6a442013-02-20 15:22:06 -0800484 Collection<Class<? extends IFloodlightService>> l =
485 new ArrayList<Class<? extends IFloodlightService>>();
Jonathan Hartd82f20d2013-02-21 18:04:24 -0800486 l.add(IControllerRegistryService.class);
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800487 return l;
488 }
489
490 @Override
491 public Map<Class<? extends IFloodlightService>, IFloodlightService> getServiceImpls() {
492 Map<Class<? extends IFloodlightService>, IFloodlightService> m =
493 new HashMap<Class<? extends IFloodlightService>, IFloodlightService>();
Jonathan Hartd82f20d2013-02-21 18:04:24 -0800494 m.put(IControllerRegistryService.class, this);
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800495 return m;
496 }
497
498 @Override
499 public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
Jonathan Hart3d7730a2013-02-22 11:51:17 -0800500 Collection<Class<? extends IFloodlightService>> l =
501 new ArrayList<Class<? extends IFloodlightService>>();
Pavlin Radoslavovc35229e2014-02-06 16:19:37 -0800502 l.add(IFloodlightProviderService.class);
503 l.add(IRestApiService.class);
Jonathan Hart3d7730a2013-02-22 11:51:17 -0800504 return l;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800505 }
506
Jonathan Hart89187372013-03-14 16:41:09 -0700507 //TODO currently blocks startup when it can't get a Zookeeper connection.
508 //Do we support starting up with no Zookeeper connection?
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800509 @Override
510 public void init (FloodlightModuleContext context) throws FloodlightModuleException {
Jonathan Hartbd766972013-02-22 15:13:03 -0800511 log.info("Initialising the Zookeeper Registry - Zookeeper connection required");
512
Jonathan Hart97801ac2013-02-26 14:29:16 -0800513 //Read the Zookeeper connection string from the config
514 Map<String, String> configParams = context.getConfigParams(this);
515 String connectionString = configParams.get("connectionString");
516 if (connectionString != null){
517 this.connectionString = connectionString;
Jonathan Hart57080fb2013-02-21 10:55:46 -0800518 }
Jonathan Hart97801ac2013-02-26 14:29:16 -0800519 log.info("Setting Zookeeper connection string to {}", this.connectionString);
Jonathan Hart57080fb2013-02-21 10:55:46 -0800520
Jonathan Hart97801ac2013-02-26 14:29:16 -0800521 restApi = context.getServiceImpl(IRestApiService.class);
Jonathan Hartbd181b62013-02-17 16:05:38 -0800522
Jonathan Hart89187372013-03-14 16:41:09 -0700523 switches = new ConcurrentHashMap<String, SwitchLeadershipData>();
Jonathan Hart4baf3be2013-03-21 18:26:13 -0700524 //switchPathCaches = new HashMap<String, PathChildrenCache>();
525 switchPathCaches = new ConcurrentHashMap<String, PathChildrenCache>();
Jonathan Hartbd181b62013-02-17 16:05:38 -0800526
Jonathan Hart3d7730a2013-02-22 11:51:17 -0800527 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
Jonathan Hart97801ac2013-02-26 14:29:16 -0800528 client = CuratorFrameworkFactory.newClient(this.connectionString,
Jonathan Hartcc957a02013-02-26 10:39:04 -0800529 sessionTimeout, connectionTimeout, retryPolicy);
Jonathan Hartbd181b62013-02-17 16:05:38 -0800530
531 client.start();
Jonathan Hartbd181b62013-02-17 16:05:38 -0800532 client = client.usingNamespace(namespace);
Jonathan Hart97801ac2013-02-26 14:29:16 -0800533
Jonathan Hart1530ccc2013-04-03 19:36:02 -0700534 distributedIdCounter = new DistributedAtomicLong(
535 client,
536 ID_COUNTER_PATH,
537 new RetryOneTime(100));
Jonathan Hart3d7730a2013-02-22 11:51:17 -0800538
Jonathan Hart3d7730a2013-02-22 11:51:17 -0800539 switchCache = new PathChildrenCache(client, switchLatchesPath, true);
540 switchCache.getListenable().addListener(switchPathCacheListener);
Jonathan Hartedd6a442013-02-20 15:22:06 -0800541
Jonathan Hart71c0ffc2013-03-24 15:58:42 -0700542 //Build the service discovery object
543 serviceDiscovery = ServiceDiscoveryBuilder.builder(ControllerService.class)
544 .client(client).basePath(SERVICES_PATH).build();
545
546 //We read the list of services very frequently (GUI periodically queries them)
547 //so we'll cache them to cut down on Zookeeper queries.
548 serviceCache = serviceDiscovery.serviceCacheBuilder()
549 .name(CONTROLLER_SERVICE_NAME).build();
550
551
Jonathan Hartedd6a442013-02-20 15:22:06 -0800552 try {
Jonathan Hart71c0ffc2013-03-24 15:58:42 -0700553 serviceDiscovery.start();
554 serviceCache.start();
Jonathan Hart3d7730a2013-02-22 11:51:17 -0800555
556 //Don't prime the cache, we want a notification for each child node in the path
557 switchCache.start(StartMode.NORMAL);
Jonathan Hartedd6a442013-02-20 15:22:06 -0800558 } catch (Exception e) {
Jonathan Hart7bf62172013-02-28 13:17:18 -0800559 throw new FloodlightModuleException("Error initialising ZookeeperRegistry: "
560 + e.getMessage());
Jonathan Hartedd6a442013-02-20 15:22:06 -0800561 }
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700562
563 eventThreadExecutorService = Executors.newSingleThreadExecutor();
564 eventThreadExecutorService.execute(
565 new Runnable() {
566 @Override
567 public void run(){
568 dispatchEvents();
569 }
570 });
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800571 }
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700572
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800573 @Override
574 public void startUp (FloodlightModuleContext context) {
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800575 //
576 // Cluster Leader election setup.
577 // NOTE: We have to do it here, because during the init stage
578 // we don't know the Controller ID.
579 //
580 if (controllerId == null) {
581 log.error("Error on startup: unknown ControllerId");
582 }
583 clusterLeaderLatch = new LeaderLatch(client,
584 CLUSTER_LEADER_PATH,
585 controllerId);
586 clusterLeaderListener = new ClusterLeaderListener(clusterLeaderLatch);
587 clusterLeaderLatch.addListener(clusterLeaderListener);
588 try {
589 clusterLeaderLatch.start();
590 } catch (Exception e) {
591 log.error("Error on startup starting the cluster leader election: {}", e.getMessage());
592 }
593
594 // Keep trying until there is a cluster leader
595 do {
596 try {
597 Participant leader = clusterLeaderLatch.getLeader();
598 if (! leader.getId().isEmpty())
599 break;
600 Thread.sleep(CLUSTER_LEADER_ELECTION_RETRY_MS);
601 } catch (Exception e) {
602 log.error("Error on startup waiting for cluster leader election: {}", e.getMessage());
603 }
604 } while (true);
605
Jonathan Hart3d7730a2013-02-22 11:51:17 -0800606 restApi.addRestletRoutable(new RegistryWebRoutable());
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800607 }
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800608}