blob: 070850d32a601ce634dba901fa65e66451ca11eb [file] [log] [blame]
Jonathan Hartdeda0ba2014-04-03 11:14:12 -07001package net.onrc.onos.core.registry;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -08002
Jonathan Hartbd181b62013-02-17 16:05:38 -08003import java.io.IOException;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -08004import java.util.ArrayList;
5import java.util.Collection;
Jonathan Hart3d7730a2013-02-22 11:51:17 -08006import java.util.Collections;
Jonathan Hart599c6b32013-03-24 22:42:02 -07007import java.util.Comparator;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -08008import java.util.HashMap;
Jonathan Hartedd6a442013-02-20 15:22:06 -08009import java.util.List;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080010import java.util.Map;
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -070011import java.util.Random;
Jonathan Hart116b1fe2014-03-14 18:53:47 -070012import java.util.concurrent.BlockingQueue;
Jonathan Hart89187372013-03-14 16:41:09 -070013import java.util.concurrent.ConcurrentHashMap;
Jonathan Hart116b1fe2014-03-14 18:53:47 -070014import java.util.concurrent.ExecutorService;
15import java.util.concurrent.Executors;
16import java.util.concurrent.LinkedBlockingQueue;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080017
Pavlin Radoslavovc35229e2014-02-06 16:19:37 -080018import net.floodlightcontroller.core.IFloodlightProviderService;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080019import net.floodlightcontroller.core.module.FloodlightModuleContext;
20import net.floodlightcontroller.core.module.FloodlightModuleException;
21import net.floodlightcontroller.core.module.IFloodlightModule;
22import net.floodlightcontroller.core.module.IFloodlightService;
Jonathan Hart3d7730a2013-02-22 11:51:17 -080023import net.floodlightcontroller.restserver.IRestApiService;
Jonathan Hartdeda0ba2014-04-03 11:14:12 -070024import net.onrc.onos.core.registry.web.RegistryWebRoutable;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080025
Jonathan Hart116b1fe2014-03-14 18:53:47 -070026import org.apache.curator.RetryPolicy;
27import org.apache.curator.framework.CuratorFramework;
28import org.apache.curator.framework.CuratorFrameworkFactory;
29import org.apache.curator.framework.recipes.atomic.AtomicValue;
30import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
31import org.apache.curator.framework.recipes.cache.ChildData;
32import org.apache.curator.framework.recipes.cache.PathChildrenCache;
33import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
34import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
35import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
36import org.apache.curator.framework.recipes.leader.LeaderLatch;
37import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
38import org.apache.curator.framework.recipes.leader.Participant;
39import org.apache.curator.retry.ExponentialBackoffRetry;
40import org.apache.curator.retry.RetryOneTime;
41import org.apache.curator.x.discovery.ServiceCache;
42import org.apache.curator.x.discovery.ServiceDiscovery;
43import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
44import org.apache.curator.x.discovery.ServiceInstance;
Jonathan Hartbd181b62013-02-17 16:05:38 -080045import org.openflow.util.HexString;
46import org.slf4j.Logger;
47import org.slf4j.LoggerFactory;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080048
Jonathan Hartd10008d2013-02-23 17:04:08 -080049import com.google.common.base.Charsets;
Jonathan Hartbd181b62013-02-17 16:05:38 -080050
Jonathan Hart7bf62172013-02-28 13:17:18 -080051/**
52 * A registry service that uses Zookeeper. All data is stored in Zookeeper,
53 * so this can be used as a global registry in a multi-node ONOS cluster.
Jonathan Hart7bf62172013-02-28 13:17:18 -080054 *
Ray Milkey269ffb92014-04-03 14:43:30 -070055 * @author jono
Jonathan Hart7bf62172013-02-28 13:17:18 -080056 */
Jonathan Hartbd766972013-02-22 15:13:03 -080057public class ZookeeperRegistry implements IFloodlightModule, IControllerRegistryService {
Jonathan Hartc6eee9e2013-02-18 14:58:27 -080058
Ray Milkeyec838942014-04-09 11:28:43 -070059 private static final Logger log = LoggerFactory.getLogger(ZookeeperRegistry.class);
Ray Milkey269ffb92014-04-03 14:43:30 -070060 protected String controllerId = null;
Jonathan Hart71c0ffc2013-03-24 15:58:42 -070061
Ray Milkey269ffb92014-04-03 14:43:30 -070062 protected IRestApiService restApi;
Jonathan Hartbd181b62013-02-17 16:05:38 -080063
Ray Milkey269ffb92014-04-03 14:43:30 -070064 //This is the default, it's overwritten by the connectionString configuration parameter
65 protected String connectionString = "localhost:2181";
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -080066
Ray Milkey269ffb92014-04-03 14:43:30 -070067 private final String namespace = "onos";
68 private final String switchLatchesPath = "/switches";
Ray Milkey2476cac2014-04-08 11:03:21 -070069 private static final String CLUSTER_LEADER_PATH = "/cluster/leader";
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -080070
Ray Milkey2476cac2014-04-08 11:03:21 -070071 private static final String SERVICES_PATH = "/"; //i.e. the root of our namespace
72 private static final String CONTROLLER_SERVICE_NAME = "controllers";
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -070073
Ray Milkey269ffb92014-04-03 14:43:30 -070074 protected CuratorFramework client;
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -070075
Ray Milkey269ffb92014-04-03 14:43:30 -070076 protected PathChildrenCache switchCache;
77
78 protected ConcurrentHashMap<String, SwitchLeadershipData> switches;
79 protected Map<String, PathChildrenCache> switchPathCaches;
80
81 protected LeaderLatch clusterLeaderLatch;
82 protected ClusterLeaderListener clusterLeaderListener;
83 private static final long CLUSTER_LEADER_ELECTION_RETRY_MS = 100;
84
Ray Milkey2476cac2014-04-08 11:03:21 -070085 private static final String ID_COUNTER_PATH = "/flowidcounter";
86 private static final Long ID_BLOCK_SIZE = 0x100000000L;
Ray Milkey269ffb92014-04-03 14:43:30 -070087 protected DistributedAtomicLong distributedIdCounter;
88
89 //Zookeeper performance-related configuration
Ray Milkey5c9f2db2014-04-09 10:31:21 -070090 protected static final int SESSION_TIMEOUT = 5000;
91 protected static final int CONNECTION_TIMEOUT = 7000;
Ray Milkey269ffb92014-04-03 14:43:30 -070092
93 //
94 // Unique ID generation state
95 // TODO: The implementation must be updated to use the Zookeeper
96 // instead of a ramdon generator.
97 //
98 private static Random randomGenerator = new Random();
Pavlin Radoslavov952a9762014-04-10 13:47:03 -070099 private static long nextUniqueIdPrefix = 0;
100 // NOTE: The 0xffffffffL value is used by the Unique ID generator for
101 // initialization purpose.
102 private static long nextUniqueIdSuffix = 0xffffffffL;
Ray Milkey269ffb92014-04-03 14:43:30 -0700103
104 private final BlockingQueue<SwitchLeaderEvent> switchLeadershipEvents =
105 new LinkedBlockingQueue<SwitchLeaderEvent>();
106
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700107 private ExecutorService eventThreadExecutorService;
Ray Milkey269ffb92014-04-03 14:43:30 -0700108
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700109 private static class SwitchLeaderEvent {
Ray Milkey269ffb92014-04-03 14:43:30 -0700110 public final long dpid;
111 public final boolean isLeader;
112
113 public SwitchLeaderEvent(long dpid, boolean isLeader) {
114 this.dpid = dpid;
115 this.isLeader = isLeader;
116 }
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700117 }
Ray Milkey269ffb92014-04-03 14:43:30 -0700118
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700119 /*
120 * Dispatcher thread for leadership change events coming from Curator.
121 */
122 private void dispatchEvents() {
Ray Milkey269ffb92014-04-03 14:43:30 -0700123 while (!Thread.currentThread().isInterrupted()) {
124 try {
125 SwitchLeaderEvent event = switchLeadershipEvents.take();
126 SwitchLeadershipData swData = switches.get(HexString.toHexString(event.dpid));
127 if (swData == null) {
128 log.debug("Leadership data {} not found", event.dpid);
129 continue;
130 }
131
132 swData.getCallback().controlChanged(event.dpid, event.isLeader);
133 } catch (InterruptedException e) {
134 Thread.currentThread().interrupt();
135 break;
136 } catch (Exception e) {
137 log.error("Exception in registry event thread", e);
138 }
139 }
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700140 }
Jonathan Hartbd181b62013-02-17 16:05:38 -0800141
Ray Milkey269ffb92014-04-03 14:43:30 -0700142 protected class SwitchLeaderListener implements LeaderLatchListener {
143 String dpid;
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700144
Pavlin Radoslavov0294e052014-04-10 13:36:45 -0700145 public SwitchLeaderListener(String dpid) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700146 this.dpid = dpid;
Ray Milkey269ffb92014-04-03 14:43:30 -0700147 }
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700148
Ray Milkey269ffb92014-04-03 14:43:30 -0700149 @Override
150 public void isLeader() {
151 log.debug("Became leader for {}", dpid);
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800152
Pavlin Radoslavov8374e4f2014-04-10 11:56:15 -0700153 switchLeadershipEvents.add(new SwitchLeaderEvent(HexString.toLong(dpid), true));
Ray Milkey269ffb92014-04-03 14:43:30 -0700154 }
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800155
Ray Milkey269ffb92014-04-03 14:43:30 -0700156 @Override
157 public void notLeader() {
158 log.debug("Lost leadership for {}", dpid);
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800159
Pavlin Radoslavov8374e4f2014-04-10 11:56:15 -0700160 switchLeadershipEvents.add(new SwitchLeaderEvent(HexString.toLong(dpid), false));
Ray Milkey269ffb92014-04-03 14:43:30 -0700161 }
162 }
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700163
Ray Milkey269ffb92014-04-03 14:43:30 -0700164 protected class SwitchPathCacheListener implements PathChildrenCacheListener {
165 @Override
166 public void childEvent(CuratorFramework client,
167 PathChildrenCacheEvent event) throws Exception {
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800168
Ray Milkey269ffb92014-04-03 14:43:30 -0700169 String strSwitch = null;
170 if (event.getData() != null) {
171 String[] splitted = event.getData().getPath().split("/");
172 strSwitch = splitted[splitted.length - 1];
Nick Karanatsios8abe7172014-02-19 20:31:48 -0800173 }
Ray Milkey269ffb92014-04-03 14:43:30 -0700174
175 switch (event.getType()) {
176 case CHILD_ADDED:
177 case CHILD_UPDATED:
178 //Check we have a PathChildrenCache for this child, add one if not
179 synchronized (switchPathCaches) {
180 if (switchPathCaches.get(strSwitch) == null) {
181 PathChildrenCache pc = new PathChildrenCache(client,
182 event.getData().getPath(), true);
183 pc.start(StartMode.NORMAL);
184 switchPathCaches.put(strSwitch, pc);
185 }
186 }
187 break;
188 case CHILD_REMOVED:
189 //Remove our PathChildrenCache for this child
190 PathChildrenCache pc = null;
191 synchronized (switchPathCaches) {
192 pc = switchPathCaches.remove(strSwitch);
193 }
194 if (pc != null) {
195 pc.close();
196 }
197 break;
198 default:
199 //All other switchLeadershipEvents are connection status switchLeadershipEvents. We don't need to
200 //do anything as the path cache handles these on its own.
201 break;
202 }
203
204 }
205 }
206
Pavlin Radoslavovfee80982014-04-10 12:12:04 -0700207 protected static class ClusterLeaderListener implements LeaderLatchListener {
Pavlin Radoslavov0294e052014-04-10 13:36:45 -0700208 public ClusterLeaderListener() {
Ray Milkey269ffb92014-04-03 14:43:30 -0700209 }
210
211 //
212 // NOTE: If we need to support callbacks when the
213 // leadership changes, those should be called here.
214 //
215
216 @Override
217 public void isLeader() {
218 log.debug("Cluster leadership aquired");
219 }
220
221 @Override
222 public void notLeader() {
223 log.debug("Cluster leadership lost");
224 }
225 }
226
227 /**
228 * Listens for changes to the switch znodes in Zookeeper. This maintains
229 * the second level of PathChildrenCaches that hold the controllers
230 * contending for each switch - there's one for each switch.
231 */
232 PathChildrenCacheListener switchPathCacheListener = new SwitchPathCacheListener();
233 protected ServiceDiscovery<ControllerService> serviceDiscovery;
234 protected ServiceCache<ControllerService> serviceCache;
235
236
237 @Override
238 public void requestControl(long dpid, ControlChangeCallback cb) throws RegistryException {
239 log.info("Requesting control for {}", HexString.toHexString(dpid));
240
241 if (controllerId == null) {
242 throw new RuntimeException("Must register a controller before calling requestControl");
243 }
244
245 String dpidStr = HexString.toHexString(dpid);
246 String latchPath = switchLatchesPath + "/" + dpidStr;
247
248 if (switches.get(dpidStr) != null) {
249 log.debug("Already contesting {}, returning", HexString.toHexString(dpid));
250 throw new RegistryException("Already contesting control for " + dpidStr);
251 }
252
253 LeaderLatch latch = new LeaderLatch(client, latchPath, controllerId);
Pavlin Radoslavov0294e052014-04-10 13:36:45 -0700254 SwitchLeaderListener listener = new SwitchLeaderListener(dpidStr);
Ray Milkey269ffb92014-04-03 14:43:30 -0700255 latch.addListener(listener);
256
257
258 SwitchLeadershipData swData = new SwitchLeadershipData(latch, cb, listener);
259 SwitchLeadershipData oldData = switches.putIfAbsent(dpidStr, swData);
260
261 if (oldData != null) {
262 //There was already data for that key in the map
263 //i.e. someone else got here first so we can't succeed
264 log.debug("Already requested control for {}", dpidStr);
265 throw new RegistryException("Already requested control for " + dpidStr);
266 }
267
268 //Now that we know we were able to add our latch to the collection,
269 //we can start the leader election in Zookeeper. However I don't know
270 //how to handle if the start fails - the latch is already in our
271 //switches list.
272 //TODO seems like there's a Curator bug when latch.start is called when
273 //there's no Zookeeper connection which causes two znodes to be put in
274 //Zookeeper at the latch path when we reconnect to Zookeeper.
275 try {
276 latch.start();
277 } catch (Exception e) {
278 log.warn("Error starting leader latch: {}", e.getMessage());
279 throw new RegistryException("Error starting leader latch for " + dpidStr, e);
280 }
281
282 }
283
284 @Override
285 public void releaseControl(long dpid) {
286 log.info("Releasing control for {}", HexString.toHexString(dpid));
287
288 String dpidStr = HexString.toHexString(dpid);
289
290 SwitchLeadershipData swData = switches.remove(dpidStr);
291
292 if (swData == null) {
293 log.debug("Trying to release control of a switch we are not contesting");
294 return;
295 }
296
297 LeaderLatch latch = swData.getLatch();
298
299 latch.removeListener(swData.getListener());
300
301 try {
302 latch.close();
303 } catch (IOException e) {
304 //I think it's OK not to do anything here. Either the node got
305 //deleted correctly, or the connection went down and the node got deleted.
306 log.debug("releaseControl: caught IOException {}", dpidStr);
307 }
308 }
309
310 @Override
311 public boolean hasControl(long dpid) {
312 String dpidStr = HexString.toHexString(dpid);
313
314 SwitchLeadershipData swData = switches.get(dpidStr);
315
316 if (swData == null) {
317 log.warn("No leader latch for dpid {}", dpidStr);
318 return false;
319 }
320
321 return swData.getLatch().hasLeadership();
322 }
323
324 @Override
325 public boolean isClusterLeader() {
326 return clusterLeaderLatch.hasLeadership();
327 }
328
329 @Override
330 public String getControllerId() {
331 return controllerId;
332 }
333
334 @Override
335 public Collection<String> getAllControllers() throws RegistryException {
336 log.debug("Getting all controllers");
337
338 List<String> controllers = new ArrayList<String>();
339 for (ServiceInstance<ControllerService> instance : serviceCache.getInstances()) {
340 String id = instance.getPayload().getControllerId();
341 if (!controllers.contains(id)) {
342 controllers.add(id);
343 }
344 }
345
346 return controllers;
347 }
348
349 @Override
350 public void registerController(String id) throws RegistryException {
351 if (controllerId != null) {
352 throw new RegistryException(
353 "Controller already registered with id " + controllerId);
354 }
355
356 controllerId = id;
357
358 try {
359 ServiceInstance<ControllerService> thisInstance = ServiceInstance.<ControllerService>builder()
360 .name(CONTROLLER_SERVICE_NAME)
361 .payload(new ControllerService(controllerId))
362 //.port((int)(65535 * Math.random())) // in a real application, you'd use a common port
363 //.uriSpec(uriSpec)
364 .build();
365
366 serviceDiscovery.registerService(thisInstance);
367 } catch (Exception e) {
368 // TODO Auto-generated catch block
369 e.printStackTrace();
370 }
371
372 }
373
374 @Override
375 public String getControllerForSwitch(long dpid) throws RegistryException {
376 String dpidStr = HexString.toHexString(dpid);
377
378 PathChildrenCache switchCache = switchPathCaches.get(dpidStr);
379
380 if (switchCache == null) {
381 log.warn("Tried to get controller for non-existent switch");
Nick Karanatsios8abe7172014-02-19 20:31:48 -0800382 return null;
383 }
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -0700384
Ray Milkey269ffb92014-04-03 14:43:30 -0700385 try {
386 //We've seen issues with these caches get stuck out of date, so we'll have to
387 //force them to refresh before each read. This slows down the method as it
388 //blocks on a Zookeeper query, however at the moment only the cleanup thread
389 //uses this and that isn't particularly time-sensitive.
390 switchCache.rebuild();
391 } catch (Exception e) {
392 // TODO Auto-generated catch block
393 e.printStackTrace();
394 }
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -0700395
Ray Milkey269ffb92014-04-03 14:43:30 -0700396 List<ChildData> sortedData = new ArrayList<ChildData>(switchCache.getCurrentData());
397
398 Collections.sort(
399 sortedData,
400 new Comparator<ChildData>() {
401 private String getSequenceNumber(String path) {
402 return path.substring(path.lastIndexOf('-') + 1);
403 }
404
405 @Override
406 public int compare(ChildData lhs, ChildData rhs) {
407 return getSequenceNumber(lhs.getPath()).
408 compareTo(getSequenceNumber(rhs.getPath()));
409 }
410 }
411 );
412
413 if (sortedData.size() == 0) {
414 return null;
415 }
416
417 return new String(sortedData.get(0).getData(), Charsets.UTF_8);
418 }
419
420 @Override
421 public Collection<Long> getSwitchesControlledByController(String controllerId) {
422 //TODO remove this if not needed
423 throw new RuntimeException("Not yet implemented");
424 }
425
426
427 //TODO what should happen when there's no ZK connection? Currently we just return
428 //the cache but this may lead to false impressions - i.e. we don't actually know
429 //what's in ZK so we shouldn't say we do
430 @Override
431 public Map<String, List<ControllerRegistryEntry>> getAllSwitches() {
432 Map<String, List<ControllerRegistryEntry>> data =
433 new HashMap<String, List<ControllerRegistryEntry>>();
434
435 for (Map.Entry<String, PathChildrenCache> entry : switchPathCaches.entrySet()) {
436 List<ControllerRegistryEntry> contendingControllers =
437 new ArrayList<ControllerRegistryEntry>();
438
439 if (entry.getValue().getCurrentData().size() < 1) {
440 //TODO prevent even having the PathChildrenCache in this case
441 //log.info("Switch entry with no leader elections: {}", entry.getKey());
442 continue;
443 }
444
445 for (ChildData d : entry.getValue().getCurrentData()) {
446
447 String controllerId = new String(d.getData(), Charsets.UTF_8);
448
449 String[] splitted = d.getPath().split("-");
450 int sequenceNumber = Integer.parseInt(splitted[splitted.length - 1]);
451
452 contendingControllers.add(new ControllerRegistryEntry(controllerId, sequenceNumber));
453 }
454
455 Collections.sort(contendingControllers);
456 data.put(entry.getKey(), contendingControllers);
457 }
458 return data;
459 }
460
461 public IdBlock allocateUniqueIdBlock(long range) {
462 try {
463 AtomicValue<Long> result = null;
464 do {
465 result = distributedIdCounter.add(range);
466 } while (result == null || !result.succeeded());
467
468 return new IdBlock(result.preValue(), result.postValue() - 1, range);
469 } catch (Exception e) {
470 log.error("Error allocating ID block");
471 }
472 return null;
473 }
474
475 /**
476 * Returns a block of IDs which are unique and unused.
477 * Range of IDs is fixed size and is assigned incrementally as this method called.
478 * Since the range of IDs is managed by Zookeeper in distributed way, this method may block when
479 * requests come up simultaneously.
480 */
481 @Override
482 public IdBlock allocateUniqueIdBlock() {
483 return allocateUniqueIdBlock(ID_BLOCK_SIZE);
484 }
485
486 /**
487 * Get a globally unique ID.
488 *
489 * @return a globally unique ID.
490 */
491 @Override
492 public synchronized long getNextUniqueId() {
493 //
494 // Generate the next Unique ID.
495 //
496 // TODO: For now, the higher 32 bits are random, and
497 // the lower 32 bits are sequential.
498 // The implementation must be updated to use the Zookeeper
499 // to allocate the higher 32 bits (globally unique).
500 //
501 if ((nextUniqueIdSuffix & 0xffffffffL) == 0xffffffffL) {
502 nextUniqueIdPrefix = randomGenerator.nextInt();
503 nextUniqueIdSuffix = 0;
504 } else {
505 nextUniqueIdSuffix++;
506 }
Pavlin Radoslavov952a9762014-04-10 13:47:03 -0700507 long result = nextUniqueIdPrefix << 32;
Ray Milkey269ffb92014-04-03 14:43:30 -0700508 result = result | (0xffffffffL & nextUniqueIdSuffix);
509 return result;
510 }
511
512 /*
513 * IFloodlightModule
514 */
515
516 @Override
517 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
518 Collection<Class<? extends IFloodlightService>> l =
Jonathan Hart3d7730a2013-02-22 11:51:17 -0800519 new ArrayList<Class<? extends IFloodlightService>>();
Ray Milkey269ffb92014-04-03 14:43:30 -0700520 l.add(IControllerRegistryService.class);
521 return l;
522 }
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -0700523
Ray Milkey269ffb92014-04-03 14:43:30 -0700524 @Override
525 public Map<Class<? extends IFloodlightService>, IFloodlightService> getServiceImpls() {
526 Map<Class<? extends IFloodlightService>, IFloodlightService> m =
527 new HashMap<Class<? extends IFloodlightService>, IFloodlightService>();
528 m.put(IControllerRegistryService.class, this);
529 return m;
530 }
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -0700531
Ray Milkey269ffb92014-04-03 14:43:30 -0700532 @Override
533 public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
534 Collection<Class<? extends IFloodlightService>> l =
535 new ArrayList<Class<? extends IFloodlightService>>();
536 l.add(IFloodlightProviderService.class);
537 l.add(IRestApiService.class);
538 return l;
539 }
Jonathan Hartbd181b62013-02-17 16:05:38 -0800540
Ray Milkey269ffb92014-04-03 14:43:30 -0700541 //TODO currently blocks startup when it can't get a Zookeeper connection.
542 //Do we support starting up with no Zookeeper connection?
543 @Override
544 public void init(FloodlightModuleContext context) throws FloodlightModuleException {
545 log.info("Initialising the Zookeeper Registry - Zookeeper connection required");
Jonathan Hart97801ac2013-02-26 14:29:16 -0800546
Ray Milkey269ffb92014-04-03 14:43:30 -0700547 //Read the Zookeeper connection string from the config
548 Map<String, String> configParams = context.getConfigParams(this);
549 String connectionString = configParams.get("connectionString");
550 if (connectionString != null) {
551 this.connectionString = connectionString;
552 }
553 log.info("Setting Zookeeper connection string to {}", this.connectionString);
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700554
Ray Milkey269ffb92014-04-03 14:43:30 -0700555 restApi = context.getServiceImpl(IRestApiService.class);
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800556
Ray Milkey269ffb92014-04-03 14:43:30 -0700557 switches = new ConcurrentHashMap<String, SwitchLeadershipData>();
558 //switchPathCaches = new HashMap<String, PathChildrenCache>();
559 switchPathCaches = new ConcurrentHashMap<String, PathChildrenCache>();
560
561 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
562 client = CuratorFrameworkFactory.newClient(this.connectionString,
Ray Milkey5c9f2db2014-04-09 10:31:21 -0700563 SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy);
Ray Milkey269ffb92014-04-03 14:43:30 -0700564
565 client.start();
566 client = client.usingNamespace(namespace);
567
568 distributedIdCounter = new DistributedAtomicLong(
569 client,
570 ID_COUNTER_PATH,
571 new RetryOneTime(100));
572
573 switchCache = new PathChildrenCache(client, switchLatchesPath, true);
574 switchCache.getListenable().addListener(switchPathCacheListener);
575
576 //Build the service discovery object
577 serviceDiscovery = ServiceDiscoveryBuilder.builder(ControllerService.class)
578 .client(client).basePath(SERVICES_PATH).build();
579
580 //We read the list of services very frequently (GUI periodically queries them)
581 //so we'll cache them to cut down on Zookeeper queries.
582 serviceCache = serviceDiscovery.serviceCacheBuilder()
583 .name(CONTROLLER_SERVICE_NAME).build();
584
585
586 try {
587 serviceDiscovery.start();
588 serviceCache.start();
589
590 //Don't prime the cache, we want a notification for each child node in the path
591 switchCache.start(StartMode.NORMAL);
592 } catch (Exception e) {
593 throw new FloodlightModuleException("Error initialising ZookeeperRegistry: "
594 + e.getMessage());
595 }
596
597 eventThreadExecutorService = Executors.newSingleThreadExecutor();
598 eventThreadExecutorService.execute(
599 new Runnable() {
600 @Override
601 public void run() {
602 dispatchEvents();
603 }
604 });
605 }
606
607 @Override
608 public void startUp(FloodlightModuleContext context) {
609 //
610 // Cluster Leader election setup.
611 // NOTE: We have to do it here, because during the init stage
612 // we don't know the Controller ID.
613 //
614 if (controllerId == null) {
615 log.error("Error on startup: unknown ControllerId");
616 }
617 clusterLeaderLatch = new LeaderLatch(client,
618 CLUSTER_LEADER_PATH,
619 controllerId);
Pavlin Radoslavov0294e052014-04-10 13:36:45 -0700620 clusterLeaderListener = new ClusterLeaderListener();
Ray Milkey269ffb92014-04-03 14:43:30 -0700621 clusterLeaderLatch.addListener(clusterLeaderListener);
622 try {
623 clusterLeaderLatch.start();
624 } catch (Exception e) {
625 log.error("Error on startup starting the cluster leader election: {}", e.getMessage());
626 }
627
628 // Keep trying until there is a cluster leader
629 do {
630 try {
631 Participant leader = clusterLeaderLatch.getLeader();
Ray Milkeyb29e6262014-04-09 16:02:14 -0700632 if (!leader.getId().isEmpty()) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700633 break;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700634 }
Ray Milkey269ffb92014-04-03 14:43:30 -0700635 Thread.sleep(CLUSTER_LEADER_ELECTION_RETRY_MS);
636 } catch (Exception e) {
637 log.error("Error on startup waiting for cluster leader election: {}", e.getMessage());
638 }
639 } while (true);
640
641 restApi.addRestletRoutable(new RegistryWebRoutable());
642 }
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800643}