blob: 2f1c4c7a8ac8d743b065954a1b1b2f0749b26357 [file] [log] [blame]
Jonathan Hartdeda0ba2014-04-03 11:14:12 -07001package net.onrc.onos.core.registry;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -08002
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import net.floodlightcontroller.core.IFloodlightProviderService;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.core.module.FloodlightModuleException;
import net.floodlightcontroller.core.module.IFloodlightModule;
import net.floodlightcontroller.core.module.IFloodlightService;
import net.floodlightcontroller.restserver.IRestApiService;
import net.onrc.onos.core.registry.web.RegistryWebRoutable;

import org.apache.commons.lang.NotImplementedException;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.framework.recipes.leader.Participant;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.openflow.util.HexString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Charsets;
Jonathan Hartbd181b62013-02-17 16:05:38 -080051
/**
 * A registry service that uses Zookeeper. All data is stored in Zookeeper,
 * so this can be used as a global registry in a multi-node ONOS cluster.
 *
 * @author jono
 */
Jonathan Hartbd766972013-02-22 15:13:03 -080058public class ZookeeperRegistry implements IFloodlightModule, IControllerRegistryService {
Jonathan Hartc6eee9e2013-02-18 14:58:27 -080059
Ray Milkeyec838942014-04-09 11:28:43 -070060 private static final Logger log = LoggerFactory.getLogger(ZookeeperRegistry.class);
Jonathan Hart71c0ffc2013-03-24 15:58:42 -070061
Jonathan Hart12a26aa2014-06-04 14:33:09 -070062 private String controllerId;
63
64 private IRestApiService restApi;
Jonathan Hartbd181b62013-02-17 16:05:38 -080065
Ray Milkey269ffb92014-04-03 14:43:30 -070066 //This is the default, it's overwritten by the connectionString configuration parameter
Jonathan Hart12a26aa2014-06-04 14:33:09 -070067 private String connectionString = "localhost:2181";
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -080068
Jonathan Hart12a26aa2014-06-04 14:33:09 -070069 private static final String NAMESPACE = "onos";
70 private static final String SWITCH_LATCHES_PATH = "/switches";
Ray Milkey2476cac2014-04-08 11:03:21 -070071 private static final String CLUSTER_LEADER_PATH = "/cluster/leader";
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -080072
Ray Milkey2476cac2014-04-08 11:03:21 -070073 private static final String SERVICES_PATH = "/"; //i.e. the root of our namespace
74 private static final String CONTROLLER_SERVICE_NAME = "controllers";
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -070075
Jonathan Hart12a26aa2014-06-04 14:33:09 -070076 private CuratorFramework curatorFrameworkClient;
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -070077
Jonathan Hart12a26aa2014-06-04 14:33:09 -070078 private PathChildrenCache rootSwitchCache;
Ray Milkey269ffb92014-04-03 14:43:30 -070079
Jonathan Hart12a26aa2014-06-04 14:33:09 -070080 private ConcurrentHashMap<String, SwitchLeadershipData> switches;
81 private Map<String, PathChildrenCache> switchPathCaches;
Ray Milkey269ffb92014-04-03 14:43:30 -070082
Jonathan Hart12a26aa2014-06-04 14:33:09 -070083 private LeaderLatch clusterLeaderLatch;
84 private ClusterLeaderListener clusterLeaderListener;
Ray Milkey269ffb92014-04-03 14:43:30 -070085 private static final long CLUSTER_LEADER_ELECTION_RETRY_MS = 100;
86
Ray Milkey2476cac2014-04-08 11:03:21 -070087 private static final String ID_COUNTER_PATH = "/flowidcounter";
88 private static final Long ID_BLOCK_SIZE = 0x100000000L;
Jonathan Hart12a26aa2014-06-04 14:33:09 -070089 private DistributedAtomicLong distributedIdCounter;
Ray Milkey269ffb92014-04-03 14:43:30 -070090
91 //Zookeeper performance-related configuration
Jonathan Hart12a26aa2014-06-04 14:33:09 -070092 private static final int SESSION_TIMEOUT = 7000; // ms
93 private static final int CONNECTION_TIMEOUT = 5000; // ms
Ray Milkey269ffb92014-04-03 14:43:30 -070094
95 //
96 // Unique ID generation state
97 // TODO: The implementation must be updated to use the Zookeeper
Jonathan Hart12a26aa2014-06-04 14:33:09 -070098 // instead of a random generator.
Ray Milkey269ffb92014-04-03 14:43:30 -070099 //
100 private static Random randomGenerator = new Random();
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700101 private static long nextUniqueIdPrefix;
Pavlin Radoslavov952a9762014-04-10 13:47:03 -0700102 // NOTE: The 0xffffffffL value is used by the Unique ID generator for
103 // initialization purpose.
104 private static long nextUniqueIdSuffix = 0xffffffffL;
Ray Milkey269ffb92014-04-03 14:43:30 -0700105
106 private final BlockingQueue<SwitchLeaderEvent> switchLeadershipEvents =
107 new LinkedBlockingQueue<SwitchLeaderEvent>();
108
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700109 /**
110 * Listens for changes to the switch znodes in Zookeeper. This maintains
111 * the second level of PathChildrenCaches that hold the controllers
112 * contending for each switch - there's one for each switch.
113 */
114 private PathChildrenCacheListener switchPathCacheListener =
115 new SwitchPathCacheListener();
116 private ServiceDiscovery<ControllerService> serviceDiscovery;
117 private ServiceCache<ControllerService> serviceCache;
118
Ray Milkey269ffb92014-04-03 14:43:30 -0700119
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700120 private static class SwitchLeaderEvent {
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700121 private final long dpid;
122 private final boolean isLeader;
Ray Milkey269ffb92014-04-03 14:43:30 -0700123
124 public SwitchLeaderEvent(long dpid, boolean isLeader) {
125 this.dpid = dpid;
126 this.isLeader = isLeader;
127 }
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700128
129 public long getDpid() {
130 return dpid;
131 }
132
133 public boolean isLeader() {
134 return isLeader;
135 }
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700136 }
Ray Milkey269ffb92014-04-03 14:43:30 -0700137
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700138 /*
139 * Dispatcher thread for leadership change events coming from Curator.
140 */
141 private void dispatchEvents() {
Ray Milkey269ffb92014-04-03 14:43:30 -0700142 while (!Thread.currentThread().isInterrupted()) {
143 try {
144 SwitchLeaderEvent event = switchLeadershipEvents.take();
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700145 SwitchLeadershipData swData = switches.get(HexString.toHexString(event.getDpid()));
Ray Milkey269ffb92014-04-03 14:43:30 -0700146 if (swData == null) {
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700147 log.debug("Leadership data {} not found", event.getDpid());
Ray Milkey269ffb92014-04-03 14:43:30 -0700148 continue;
149 }
150
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700151 swData.getCallback().controlChanged(event.getDpid(), event.isLeader());
Ray Milkey269ffb92014-04-03 14:43:30 -0700152 } catch (InterruptedException e) {
153 Thread.currentThread().interrupt();
154 break;
155 } catch (Exception e) {
156 log.error("Exception in registry event thread", e);
157 }
158 }
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700159 }
Jonathan Hartbd181b62013-02-17 16:05:38 -0800160
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700161 class SwitchLeaderListener implements LeaderLatchListener {
162 private String dpid;
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700163
Pavlin Radoslavov0294e052014-04-10 13:36:45 -0700164 public SwitchLeaderListener(String dpid) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700165 this.dpid = dpid;
Ray Milkey269ffb92014-04-03 14:43:30 -0700166 }
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700167
Ray Milkey269ffb92014-04-03 14:43:30 -0700168 @Override
169 public void isLeader() {
170 log.debug("Became leader for {}", dpid);
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800171
Pavlin Radoslavov8374e4f2014-04-10 11:56:15 -0700172 switchLeadershipEvents.add(new SwitchLeaderEvent(HexString.toLong(dpid), true));
Ray Milkey269ffb92014-04-03 14:43:30 -0700173 }
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800174
Ray Milkey269ffb92014-04-03 14:43:30 -0700175 @Override
176 public void notLeader() {
177 log.debug("Lost leadership for {}", dpid);
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800178
Pavlin Radoslavov8374e4f2014-04-10 11:56:15 -0700179 switchLeadershipEvents.add(new SwitchLeaderEvent(HexString.toLong(dpid), false));
Ray Milkey269ffb92014-04-03 14:43:30 -0700180 }
181 }
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700182
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700183 class SwitchPathCacheListener implements PathChildrenCacheListener {
Ray Milkey269ffb92014-04-03 14:43:30 -0700184 @Override
185 public void childEvent(CuratorFramework client,
186 PathChildrenCacheEvent event) throws Exception {
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800187
Ray Milkey269ffb92014-04-03 14:43:30 -0700188 String strSwitch = null;
189 if (event.getData() != null) {
190 String[] splitted = event.getData().getPath().split("/");
191 strSwitch = splitted[splitted.length - 1];
Nick Karanatsios8abe7172014-02-19 20:31:48 -0800192 }
Ray Milkey269ffb92014-04-03 14:43:30 -0700193
194 switch (event.getType()) {
195 case CHILD_ADDED:
196 case CHILD_UPDATED:
197 //Check we have a PathChildrenCache for this child, add one if not
198 synchronized (switchPathCaches) {
199 if (switchPathCaches.get(strSwitch) == null) {
200 PathChildrenCache pc = new PathChildrenCache(client,
201 event.getData().getPath(), true);
202 pc.start(StartMode.NORMAL);
203 switchPathCaches.put(strSwitch, pc);
204 }
205 }
206 break;
207 case CHILD_REMOVED:
208 //Remove our PathChildrenCache for this child
209 PathChildrenCache pc = null;
210 synchronized (switchPathCaches) {
211 pc = switchPathCaches.remove(strSwitch);
212 }
213 if (pc != null) {
214 pc.close();
215 }
216 break;
217 default:
218 //All other switchLeadershipEvents are connection status switchLeadershipEvents. We don't need to
219 //do anything as the path cache handles these on its own.
220 break;
221 }
222
223 }
224 }
225
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700226 private static class ClusterLeaderListener implements LeaderLatchListener {
Ray Milkey269ffb92014-04-03 14:43:30 -0700227 //
228 // NOTE: If we need to support callbacks when the
229 // leadership changes, those should be called here.
230 //
231
232 @Override
233 public void isLeader() {
234 log.debug("Cluster leadership aquired");
235 }
236
237 @Override
238 public void notLeader() {
239 log.debug("Cluster leadership lost");
240 }
241 }
242
Ray Milkey269ffb92014-04-03 14:43:30 -0700243 @Override
244 public void requestControl(long dpid, ControlChangeCallback cb) throws RegistryException {
245 log.info("Requesting control for {}", HexString.toHexString(dpid));
246
247 if (controllerId == null) {
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700248 throw new IllegalStateException("Must register a controller before calling requestControl");
Ray Milkey269ffb92014-04-03 14:43:30 -0700249 }
250
251 String dpidStr = HexString.toHexString(dpid);
Ray Milkey269ffb92014-04-03 14:43:30 -0700252
253 if (switches.get(dpidStr) != null) {
254 log.debug("Already contesting {}, returning", HexString.toHexString(dpid));
255 throw new RegistryException("Already contesting control for " + dpidStr);
256 }
257
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700258 String latchPath = SWITCH_LATCHES_PATH + "/" + dpidStr;
259
Ray Milkey5df613b2014-04-15 10:50:56 -0700260 LeaderLatch latch = new LeaderLatch(curatorFrameworkClient, latchPath, controllerId);
Pavlin Radoslavov0294e052014-04-10 13:36:45 -0700261 SwitchLeaderListener listener = new SwitchLeaderListener(dpidStr);
Ray Milkey269ffb92014-04-03 14:43:30 -0700262 latch.addListener(listener);
263
264
265 SwitchLeadershipData swData = new SwitchLeadershipData(latch, cb, listener);
266 SwitchLeadershipData oldData = switches.putIfAbsent(dpidStr, swData);
267
268 if (oldData != null) {
269 //There was already data for that key in the map
270 //i.e. someone else got here first so we can't succeed
271 log.debug("Already requested control for {}", dpidStr);
272 throw new RegistryException("Already requested control for " + dpidStr);
273 }
274
275 //Now that we know we were able to add our latch to the collection,
276 //we can start the leader election in Zookeeper. However I don't know
277 //how to handle if the start fails - the latch is already in our
278 //switches list.
279 //TODO seems like there's a Curator bug when latch.start is called when
280 //there's no Zookeeper connection which causes two znodes to be put in
281 //Zookeeper at the latch path when we reconnect to Zookeeper.
282 try {
283 latch.start();
284 } catch (Exception e) {
285 log.warn("Error starting leader latch: {}", e.getMessage());
286 throw new RegistryException("Error starting leader latch for " + dpidStr, e);
287 }
288
289 }
290
291 @Override
292 public void releaseControl(long dpid) {
293 log.info("Releasing control for {}", HexString.toHexString(dpid));
294
295 String dpidStr = HexString.toHexString(dpid);
296
297 SwitchLeadershipData swData = switches.remove(dpidStr);
298
299 if (swData == null) {
300 log.debug("Trying to release control of a switch we are not contesting");
301 return;
302 }
303
304 LeaderLatch latch = swData.getLatch();
305
306 latch.removeListener(swData.getListener());
307
308 try {
309 latch.close();
310 } catch (IOException e) {
311 //I think it's OK not to do anything here. Either the node got
312 //deleted correctly, or the connection went down and the node got deleted.
313 log.debug("releaseControl: caught IOException {}", dpidStr);
314 }
315 }
316
317 @Override
318 public boolean hasControl(long dpid) {
319 String dpidStr = HexString.toHexString(dpid);
320
321 SwitchLeadershipData swData = switches.get(dpidStr);
322
323 if (swData == null) {
324 log.warn("No leader latch for dpid {}", dpidStr);
325 return false;
326 }
327
328 return swData.getLatch().hasLeadership();
329 }
330
331 @Override
332 public boolean isClusterLeader() {
333 return clusterLeaderLatch.hasLeadership();
334 }
335
336 @Override
337 public String getControllerId() {
338 return controllerId;
339 }
340
341 @Override
342 public Collection<String> getAllControllers() throws RegistryException {
343 log.debug("Getting all controllers");
344
345 List<String> controllers = new ArrayList<String>();
346 for (ServiceInstance<ControllerService> instance : serviceCache.getInstances()) {
347 String id = instance.getPayload().getControllerId();
348 if (!controllers.contains(id)) {
349 controllers.add(id);
350 }
351 }
352
353 return controllers;
354 }
355
356 @Override
357 public void registerController(String id) throws RegistryException {
358 if (controllerId != null) {
359 throw new RegistryException(
360 "Controller already registered with id " + controllerId);
361 }
362
363 controllerId = id;
364
365 try {
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700366 ServiceInstance<ControllerService> thisInstance =
367 ServiceInstance.<ControllerService>builder()
Ray Milkey269ffb92014-04-03 14:43:30 -0700368 .name(CONTROLLER_SERVICE_NAME)
369 .payload(new ControllerService(controllerId))
Ray Milkey269ffb92014-04-03 14:43:30 -0700370 .build();
371
372 serviceDiscovery.registerService(thisInstance);
373 } catch (Exception e) {
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700374 log.error("Exception starting service instance:", e);
Ray Milkey269ffb92014-04-03 14:43:30 -0700375 }
376
377 }
378
379 @Override
380 public String getControllerForSwitch(long dpid) throws RegistryException {
381 String dpidStr = HexString.toHexString(dpid);
382
383 PathChildrenCache switchCache = switchPathCaches.get(dpidStr);
384
385 if (switchCache == null) {
386 log.warn("Tried to get controller for non-existent switch");
Nick Karanatsios8abe7172014-02-19 20:31:48 -0800387 return null;
388 }
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -0700389
Ray Milkey269ffb92014-04-03 14:43:30 -0700390 try {
391 //We've seen issues with these caches get stuck out of date, so we'll have to
392 //force them to refresh before each read. This slows down the method as it
393 //blocks on a Zookeeper query, however at the moment only the cleanup thread
394 //uses this and that isn't particularly time-sensitive.
395 switchCache.rebuild();
396 } catch (Exception e) {
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700397 log.error("Exception rebuilding the switch cache:", e);
Ray Milkey269ffb92014-04-03 14:43:30 -0700398 }
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -0700399
Ray Milkey269ffb92014-04-03 14:43:30 -0700400 List<ChildData> sortedData = new ArrayList<ChildData>(switchCache.getCurrentData());
401
402 Collections.sort(
403 sortedData,
404 new Comparator<ChildData>() {
405 private String getSequenceNumber(String path) {
406 return path.substring(path.lastIndexOf('-') + 1);
407 }
408
409 @Override
410 public int compare(ChildData lhs, ChildData rhs) {
411 return getSequenceNumber(lhs.getPath()).
412 compareTo(getSequenceNumber(rhs.getPath()));
413 }
414 }
415 );
416
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700417 if (sortedData.isEmpty()) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700418 return null;
419 }
420
421 return new String(sortedData.get(0).getData(), Charsets.UTF_8);
422 }
423
424 @Override
Ray Milkey5df613b2014-04-15 10:50:56 -0700425 public Collection<Long> getSwitchesControlledByController(String controller) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700426 //TODO remove this if not needed
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700427 throw new NotImplementedException("Not yet implemented");
Ray Milkey269ffb92014-04-03 14:43:30 -0700428 }
429
430
431 //TODO what should happen when there's no ZK connection? Currently we just return
432 //the cache but this may lead to false impressions - i.e. we don't actually know
433 //what's in ZK so we shouldn't say we do
434 @Override
435 public Map<String, List<ControllerRegistryEntry>> getAllSwitches() {
436 Map<String, List<ControllerRegistryEntry>> data =
437 new HashMap<String, List<ControllerRegistryEntry>>();
438
439 for (Map.Entry<String, PathChildrenCache> entry : switchPathCaches.entrySet()) {
440 List<ControllerRegistryEntry> contendingControllers =
441 new ArrayList<ControllerRegistryEntry>();
442
443 if (entry.getValue().getCurrentData().size() < 1) {
444 //TODO prevent even having the PathChildrenCache in this case
445 //log.info("Switch entry with no leader elections: {}", entry.getKey());
446 continue;
447 }
448
449 for (ChildData d : entry.getValue().getCurrentData()) {
450
Ray Milkey5df613b2014-04-15 10:50:56 -0700451 String childsControllerId = new String(d.getData(), Charsets.UTF_8);
Ray Milkey269ffb92014-04-03 14:43:30 -0700452
453 String[] splitted = d.getPath().split("-");
454 int sequenceNumber = Integer.parseInt(splitted[splitted.length - 1]);
455
Ray Milkey5df613b2014-04-15 10:50:56 -0700456 contendingControllers.add(new ControllerRegistryEntry(childsControllerId, sequenceNumber));
Ray Milkey269ffb92014-04-03 14:43:30 -0700457 }
458
459 Collections.sort(contendingControllers);
460 data.put(entry.getKey(), contendingControllers);
461 }
462 return data;
463 }
464
465 public IdBlock allocateUniqueIdBlock(long range) {
466 try {
467 AtomicValue<Long> result = null;
468 do {
469 result = distributedIdCounter.add(range);
470 } while (result == null || !result.succeeded());
471
472 return new IdBlock(result.preValue(), result.postValue() - 1, range);
473 } catch (Exception e) {
474 log.error("Error allocating ID block");
475 }
476 return null;
477 }
478
479 /**
480 * Returns a block of IDs which are unique and unused.
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700481 * The range of IDs is a fixed size and is allocated incrementally as this
482 * method is called. Since the range of IDs is managed by Zookeeper in
483 * distributed way, this method may block during Zookeeper access.
484 *
485 * @return an IdBlock containing a set of unique IDs
Ray Milkey269ffb92014-04-03 14:43:30 -0700486 */
487 @Override
488 public IdBlock allocateUniqueIdBlock() {
489 return allocateUniqueIdBlock(ID_BLOCK_SIZE);
490 }
491
492 /**
493 * Get a globally unique ID.
494 *
495 * @return a globally unique ID.
496 */
497 @Override
498 public synchronized long getNextUniqueId() {
499 //
500 // Generate the next Unique ID.
501 //
502 // TODO: For now, the higher 32 bits are random, and
503 // the lower 32 bits are sequential.
504 // The implementation must be updated to use the Zookeeper
505 // to allocate the higher 32 bits (globally unique).
506 //
507 if ((nextUniqueIdSuffix & 0xffffffffL) == 0xffffffffL) {
508 nextUniqueIdPrefix = randomGenerator.nextInt();
509 nextUniqueIdSuffix = 0;
510 } else {
511 nextUniqueIdSuffix++;
512 }
Pavlin Radoslavov952a9762014-04-10 13:47:03 -0700513 long result = nextUniqueIdPrefix << 32;
Ray Milkey269ffb92014-04-03 14:43:30 -0700514 result = result | (0xffffffffL & nextUniqueIdSuffix);
515 return result;
516 }
517
518 /*
519 * IFloodlightModule
520 */
521
522 @Override
523 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
524 Collection<Class<? extends IFloodlightService>> l =
Jonathan Hart3d7730a2013-02-22 11:51:17 -0800525 new ArrayList<Class<? extends IFloodlightService>>();
Ray Milkey269ffb92014-04-03 14:43:30 -0700526 l.add(IControllerRegistryService.class);
527 return l;
528 }
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -0700529
Ray Milkey269ffb92014-04-03 14:43:30 -0700530 @Override
531 public Map<Class<? extends IFloodlightService>, IFloodlightService> getServiceImpls() {
532 Map<Class<? extends IFloodlightService>, IFloodlightService> m =
533 new HashMap<Class<? extends IFloodlightService>, IFloodlightService>();
534 m.put(IControllerRegistryService.class, this);
535 return m;
536 }
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -0700537
Ray Milkey269ffb92014-04-03 14:43:30 -0700538 @Override
539 public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
540 Collection<Class<? extends IFloodlightService>> l =
541 new ArrayList<Class<? extends IFloodlightService>>();
542 l.add(IFloodlightProviderService.class);
543 l.add(IRestApiService.class);
544 return l;
545 }
Jonathan Hartbd181b62013-02-17 16:05:38 -0800546
Ray Milkey269ffb92014-04-03 14:43:30 -0700547 //TODO currently blocks startup when it can't get a Zookeeper connection.
548 //Do we support starting up with no Zookeeper connection?
549 @Override
550 public void init(FloodlightModuleContext context) throws FloodlightModuleException {
551 log.info("Initialising the Zookeeper Registry - Zookeeper connection required");
Jonathan Hart97801ac2013-02-26 14:29:16 -0800552
Ray Milkey269ffb92014-04-03 14:43:30 -0700553 //Read the Zookeeper connection string from the config
554 Map<String, String> configParams = context.getConfigParams(this);
Ray Milkey5df613b2014-04-15 10:50:56 -0700555 String connectionStringParam = configParams.get("connectionString");
556 if (connectionStringParam != null) {
557 connectionString = connectionStringParam;
Ray Milkey269ffb92014-04-03 14:43:30 -0700558 }
559 log.info("Setting Zookeeper connection string to {}", this.connectionString);
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700560
Ray Milkey269ffb92014-04-03 14:43:30 -0700561 restApi = context.getServiceImpl(IRestApiService.class);
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800562
Ray Milkey269ffb92014-04-03 14:43:30 -0700563 switches = new ConcurrentHashMap<String, SwitchLeadershipData>();
564 //switchPathCaches = new HashMap<String, PathChildrenCache>();
565 switchPathCaches = new ConcurrentHashMap<String, PathChildrenCache>();
566
567 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
Ray Milkey5df613b2014-04-15 10:50:56 -0700568 curatorFrameworkClient = CuratorFrameworkFactory.newClient(this.connectionString,
Ray Milkey5c9f2db2014-04-09 10:31:21 -0700569 SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy);
Ray Milkey269ffb92014-04-03 14:43:30 -0700570
Ray Milkey5df613b2014-04-15 10:50:56 -0700571 curatorFrameworkClient.start();
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700572 curatorFrameworkClient = curatorFrameworkClient.usingNamespace(NAMESPACE);
Ray Milkey269ffb92014-04-03 14:43:30 -0700573
574 distributedIdCounter = new DistributedAtomicLong(
Ray Milkey5df613b2014-04-15 10:50:56 -0700575 curatorFrameworkClient,
Ray Milkey269ffb92014-04-03 14:43:30 -0700576 ID_COUNTER_PATH,
577 new RetryOneTime(100));
578
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700579 rootSwitchCache = new PathChildrenCache(curatorFrameworkClient, SWITCH_LATCHES_PATH, true);
Ray Milkey5df613b2014-04-15 10:50:56 -0700580 rootSwitchCache.getListenable().addListener(switchPathCacheListener);
Ray Milkey269ffb92014-04-03 14:43:30 -0700581
582 //Build the service discovery object
583 serviceDiscovery = ServiceDiscoveryBuilder.builder(ControllerService.class)
Ray Milkey5df613b2014-04-15 10:50:56 -0700584 .client(curatorFrameworkClient).basePath(SERVICES_PATH).build();
Ray Milkey269ffb92014-04-03 14:43:30 -0700585
586 //We read the list of services very frequently (GUI periodically queries them)
587 //so we'll cache them to cut down on Zookeeper queries.
588 serviceCache = serviceDiscovery.serviceCacheBuilder()
589 .name(CONTROLLER_SERVICE_NAME).build();
590
591
592 try {
593 serviceDiscovery.start();
594 serviceCache.start();
595
596 //Don't prime the cache, we want a notification for each child node in the path
Ray Milkey5df613b2014-04-15 10:50:56 -0700597 rootSwitchCache.start(StartMode.NORMAL);
Ray Milkey269ffb92014-04-03 14:43:30 -0700598 } catch (Exception e) {
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700599 throw new FloodlightModuleException(
600 "Error initialising ZookeeperRegistry", e);
Ray Milkey269ffb92014-04-03 14:43:30 -0700601 }
602
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700603 ExecutorService eventThreadExecutorService =
604 Executors.newSingleThreadExecutor();
Ray Milkey269ffb92014-04-03 14:43:30 -0700605 eventThreadExecutorService.execute(
606 new Runnable() {
607 @Override
608 public void run() {
609 dispatchEvents();
610 }
611 });
612 }
613
614 @Override
615 public void startUp(FloodlightModuleContext context) {
616 //
617 // Cluster Leader election setup.
618 // NOTE: We have to do it here, because during the init stage
619 // we don't know the Controller ID.
620 //
621 if (controllerId == null) {
622 log.error("Error on startup: unknown ControllerId");
623 }
Ray Milkey5df613b2014-04-15 10:50:56 -0700624 clusterLeaderLatch = new LeaderLatch(curatorFrameworkClient,
Ray Milkey269ffb92014-04-03 14:43:30 -0700625 CLUSTER_LEADER_PATH,
626 controllerId);
Pavlin Radoslavov0294e052014-04-10 13:36:45 -0700627 clusterLeaderListener = new ClusterLeaderListener();
Ray Milkey269ffb92014-04-03 14:43:30 -0700628 clusterLeaderLatch.addListener(clusterLeaderListener);
629 try {
630 clusterLeaderLatch.start();
631 } catch (Exception e) {
632 log.error("Error on startup starting the cluster leader election: {}", e.getMessage());
633 }
634
635 // Keep trying until there is a cluster leader
636 do {
637 try {
638 Participant leader = clusterLeaderLatch.getLeader();
Ray Milkeyb29e6262014-04-09 16:02:14 -0700639 if (!leader.getId().isEmpty()) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700640 break;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700641 }
Ray Milkey269ffb92014-04-03 14:43:30 -0700642 Thread.sleep(CLUSTER_LEADER_ELECTION_RETRY_MS);
643 } catch (Exception e) {
644 log.error("Error on startup waiting for cluster leader election: {}", e.getMessage());
645 }
646 } while (true);
647
648 restApi.addRestletRoutable(new RegistryWebRoutable());
649 }
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800650}