blob: 103438e524578933fc640b5f7ad0fa0c1513316d [file] [log] [blame]
Jonathan Hartdeda0ba2014-04-03 11:14:12 -07001package net.onrc.onos.core.registry;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -08002
Jonathan Hartbd181b62013-02-17 16:05:38 -08003import java.io.IOException;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -08004import java.util.ArrayList;
5import java.util.Collection;
Jonathan Hart3d7730a2013-02-22 11:51:17 -08006import java.util.Collections;
Jonathan Hart599c6b32013-03-24 22:42:02 -07007import java.util.Comparator;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -08008import java.util.HashMap;
Jonathan Hartedd6a442013-02-20 15:22:06 -08009import java.util.List;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080010import java.util.Map;
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -070011import java.util.Random;
Jonathan Hart116b1fe2014-03-14 18:53:47 -070012import java.util.concurrent.BlockingQueue;
Jonathan Hart89187372013-03-14 16:41:09 -070013import java.util.concurrent.ConcurrentHashMap;
Jonathan Hart116b1fe2014-03-14 18:53:47 -070014import java.util.concurrent.ExecutorService;
15import java.util.concurrent.Executors;
16import java.util.concurrent.LinkedBlockingQueue;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080017
Pavlin Radoslavovc35229e2014-02-06 16:19:37 -080018import net.floodlightcontroller.core.IFloodlightProviderService;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080019import net.floodlightcontroller.core.module.FloodlightModuleContext;
20import net.floodlightcontroller.core.module.FloodlightModuleException;
21import net.floodlightcontroller.core.module.IFloodlightModule;
22import net.floodlightcontroller.core.module.IFloodlightService;
Jonathan Hart3d7730a2013-02-22 11:51:17 -080023import net.floodlightcontroller.restserver.IRestApiService;
Jonathan Hartdeda0ba2014-04-03 11:14:12 -070024import net.onrc.onos.core.registry.web.RegistryWebRoutable;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080025
Jonathan Hart12a26aa2014-06-04 14:33:09 -070026import org.apache.commons.lang.NotImplementedException;
Jonathan Hart116b1fe2014-03-14 18:53:47 -070027import org.apache.curator.RetryPolicy;
28import org.apache.curator.framework.CuratorFramework;
29import org.apache.curator.framework.CuratorFrameworkFactory;
30import org.apache.curator.framework.recipes.atomic.AtomicValue;
31import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
32import org.apache.curator.framework.recipes.cache.ChildData;
33import org.apache.curator.framework.recipes.cache.PathChildrenCache;
34import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
35import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
36import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
37import org.apache.curator.framework.recipes.leader.LeaderLatch;
38import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
39import org.apache.curator.framework.recipes.leader.Participant;
40import org.apache.curator.retry.ExponentialBackoffRetry;
41import org.apache.curator.retry.RetryOneTime;
42import org.apache.curator.x.discovery.ServiceCache;
43import org.apache.curator.x.discovery.ServiceDiscovery;
44import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
45import org.apache.curator.x.discovery.ServiceInstance;
Jonathan Hartbd181b62013-02-17 16:05:38 -080046import org.openflow.util.HexString;
47import org.slf4j.Logger;
48import org.slf4j.LoggerFactory;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080049
Jonathan Hartd10008d2013-02-23 17:04:08 -080050import com.google.common.base.Charsets;
Jonathan Hartbd181b62013-02-17 16:05:38 -080051
Jonathan Hart7bf62172013-02-28 13:17:18 -080052/**
53 * A registry service that uses Zookeeper. All data is stored in Zookeeper,
54 * so this can be used as a global registry in a multi-node ONOS cluster.
Jonathan Hart7bf62172013-02-28 13:17:18 -080055 */
Jonathan Hart1dbcce62014-06-04 15:21:45 -070056public class ZookeeperRegistry implements IFloodlightModule,
57 IControllerRegistryService {
Jonathan Hartc6eee9e2013-02-18 14:58:27 -080058
Ray Milkeyec838942014-04-09 11:28:43 -070059 private static final Logger log = LoggerFactory.getLogger(ZookeeperRegistry.class);
Jonathan Hart71c0ffc2013-03-24 15:58:42 -070060
Jonathan Hart12a26aa2014-06-04 14:33:09 -070061 private String controllerId;
62
63 private IRestApiService restApi;
Jonathan Hartbd181b62013-02-17 16:05:38 -080064
Jonathan Hart1dbcce62014-06-04 15:21:45 -070065 // This is the default. It is overwritten by the connectionString
66 // configuration parameter
Jonathan Hart12a26aa2014-06-04 14:33:09 -070067 private String connectionString = "localhost:2181";
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -080068
Yuta HIGUCHI85de40d2014-06-12 14:06:41 -070069 /**
70 * JVM Option to specify ZooKeeper namespace.
71 */
72 public static final String ZK_NAMESPACE_KEY = "zookeeper.namespace";
73 private static final String DEFAULT_NAMESPACE = "onos";
74 private String namespace = DEFAULT_NAMESPACE;
Jonathan Hart12a26aa2014-06-04 14:33:09 -070075 private static final String SWITCH_LATCHES_PATH = "/switches";
Ray Milkey2476cac2014-04-08 11:03:21 -070076 private static final String CLUSTER_LEADER_PATH = "/cluster/leader";
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -080077
Jonathan Hart1dbcce62014-06-04 15:21:45 -070078 private static final String SERVICES_PATH = "/"; // i.e. the root of our namespace
Ray Milkey2476cac2014-04-08 11:03:21 -070079 private static final String CONTROLLER_SERVICE_NAME = "controllers";
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -070080
Jonathan Hart12a26aa2014-06-04 14:33:09 -070081 private CuratorFramework curatorFrameworkClient;
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -070082
Jonathan Hart12a26aa2014-06-04 14:33:09 -070083 private PathChildrenCache rootSwitchCache;
Ray Milkey269ffb92014-04-03 14:43:30 -070084
Jonathan Hart12a26aa2014-06-04 14:33:09 -070085 private ConcurrentHashMap<String, SwitchLeadershipData> switches;
86 private Map<String, PathChildrenCache> switchPathCaches;
Ray Milkey269ffb92014-04-03 14:43:30 -070087
Jonathan Hart12a26aa2014-06-04 14:33:09 -070088 private LeaderLatch clusterLeaderLatch;
89 private ClusterLeaderListener clusterLeaderListener;
Ray Milkey269ffb92014-04-03 14:43:30 -070090 private static final long CLUSTER_LEADER_ELECTION_RETRY_MS = 100;
91
Ray Milkey2476cac2014-04-08 11:03:21 -070092 private static final String ID_COUNTER_PATH = "/flowidcounter";
93 private static final Long ID_BLOCK_SIZE = 0x100000000L;
Jonathan Hart12a26aa2014-06-04 14:33:09 -070094 private DistributedAtomicLong distributedIdCounter;
Ray Milkey269ffb92014-04-03 14:43:30 -070095
96 //Zookeeper performance-related configuration
Jonathan Hart12a26aa2014-06-04 14:33:09 -070097 private static final int SESSION_TIMEOUT = 7000; // ms
98 private static final int CONNECTION_TIMEOUT = 5000; // ms
Ray Milkey269ffb92014-04-03 14:43:30 -070099
100 //
101 // Unique ID generation state
102 // TODO: The implementation must be updated to use the Zookeeper
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700103 // instead of a random generator.
Ray Milkey269ffb92014-04-03 14:43:30 -0700104 //
105 private static Random randomGenerator = new Random();
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700106 private static long nextUniqueIdPrefix;
Pavlin Radoslavov952a9762014-04-10 13:47:03 -0700107 // NOTE: The 0xffffffffL value is used by the Unique ID generator for
108 // initialization purpose.
109 private static long nextUniqueIdSuffix = 0xffffffffL;
Ray Milkey269ffb92014-04-03 14:43:30 -0700110
111 private final BlockingQueue<SwitchLeaderEvent> switchLeadershipEvents =
112 new LinkedBlockingQueue<SwitchLeaderEvent>();
113
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700114 /**
115 * Listens for changes to the switch znodes in Zookeeper. This maintains
116 * the second level of PathChildrenCaches that hold the controllers
117 * contending for each switch - there's one for each switch.
118 */
119 private PathChildrenCacheListener switchPathCacheListener =
120 new SwitchPathCacheListener();
121 private ServiceDiscovery<ControllerService> serviceDiscovery;
122 private ServiceCache<ControllerService> serviceCache;
123
Ray Milkey269ffb92014-04-03 14:43:30 -0700124
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700125 private static class SwitchLeaderEvent {
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700126 private final long dpid;
127 private final boolean isLeader;
Ray Milkey269ffb92014-04-03 14:43:30 -0700128
129 public SwitchLeaderEvent(long dpid, boolean isLeader) {
130 this.dpid = dpid;
131 this.isLeader = isLeader;
132 }
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700133
134 public long getDpid() {
135 return dpid;
136 }
137
138 public boolean isLeader() {
139 return isLeader;
140 }
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700141 }
Ray Milkey269ffb92014-04-03 14:43:30 -0700142
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700143 // Dispatcher thread for leadership change events coming from Curator
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700144 private void dispatchEvents() {
Ray Milkey269ffb92014-04-03 14:43:30 -0700145 while (!Thread.currentThread().isInterrupted()) {
146 try {
147 SwitchLeaderEvent event = switchLeadershipEvents.take();
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700148 SwitchLeadershipData swData =
149 switches.get(HexString.toHexString(event.getDpid()));
Ray Milkey269ffb92014-04-03 14:43:30 -0700150 if (swData == null) {
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700151 log.debug("Leadership data {} not found", event.getDpid());
Ray Milkey269ffb92014-04-03 14:43:30 -0700152 continue;
153 }
154
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700155 swData.getCallback().controlChanged(event.getDpid(), event.isLeader());
Ray Milkey269ffb92014-04-03 14:43:30 -0700156 } catch (InterruptedException e) {
157 Thread.currentThread().interrupt();
158 break;
159 } catch (Exception e) {
160 log.error("Exception in registry event thread", e);
161 }
162 }
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700163 }
Jonathan Hartbd181b62013-02-17 16:05:38 -0800164
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700165 class SwitchLeaderListener implements LeaderLatchListener {
Yuta HIGUCHI5bbbaca2014-06-09 16:39:08 -0700166 private final String dpid;
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700167
Pavlin Radoslavov0294e052014-04-10 13:36:45 -0700168 public SwitchLeaderListener(String dpid) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700169 this.dpid = dpid;
Ray Milkey269ffb92014-04-03 14:43:30 -0700170 }
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700171
Ray Milkey269ffb92014-04-03 14:43:30 -0700172 @Override
173 public void isLeader() {
Yuta HIGUCHI5bbbaca2014-06-09 16:39:08 -0700174 log.info("Became leader for {}", dpid);
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800175
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700176 switchLeadershipEvents.add(
177 new SwitchLeaderEvent(HexString.toLong(dpid), true));
Ray Milkey269ffb92014-04-03 14:43:30 -0700178 }
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800179
Ray Milkey269ffb92014-04-03 14:43:30 -0700180 @Override
181 public void notLeader() {
Yuta HIGUCHI5bbbaca2014-06-09 16:39:08 -0700182 log.info("Lost leadership for {}", dpid);
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800183
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700184 switchLeadershipEvents.add(
185 new SwitchLeaderEvent(HexString.toLong(dpid), false));
Ray Milkey269ffb92014-04-03 14:43:30 -0700186 }
187 }
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700188
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700189 class SwitchPathCacheListener implements PathChildrenCacheListener {
Ray Milkey269ffb92014-04-03 14:43:30 -0700190 @Override
191 public void childEvent(CuratorFramework client,
192 PathChildrenCacheEvent event) throws Exception {
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800193
Ray Milkey269ffb92014-04-03 14:43:30 -0700194 String strSwitch = null;
195 if (event.getData() != null) {
196 String[] splitted = event.getData().getPath().split("/");
197 strSwitch = splitted[splitted.length - 1];
Nick Karanatsios8abe7172014-02-19 20:31:48 -0800198 }
Ray Milkey269ffb92014-04-03 14:43:30 -0700199
200 switch (event.getType()) {
201 case CHILD_ADDED:
202 case CHILD_UPDATED:
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700203 // Check we have a PathChildrenCache for this child
204 // and add one if not
Ray Milkey269ffb92014-04-03 14:43:30 -0700205 synchronized (switchPathCaches) {
206 if (switchPathCaches.get(strSwitch) == null) {
207 PathChildrenCache pc = new PathChildrenCache(client,
208 event.getData().getPath(), true);
209 pc.start(StartMode.NORMAL);
210 switchPathCaches.put(strSwitch, pc);
211 }
212 }
213 break;
214 case CHILD_REMOVED:
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700215 // Remove our PathChildrenCache for this child
Ray Milkey269ffb92014-04-03 14:43:30 -0700216 PathChildrenCache pc = null;
217 synchronized (switchPathCaches) {
218 pc = switchPathCaches.remove(strSwitch);
219 }
220 if (pc != null) {
221 pc.close();
222 }
223 break;
224 default:
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700225 // All other switchLeadershipEvents are connection status
226 // switchLeadershipEvents. We don't need to do anything as
227 // the path cache handles these on its own.
Ray Milkey269ffb92014-04-03 14:43:30 -0700228 break;
229 }
230
231 }
232 }
233
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700234 private static class ClusterLeaderListener implements LeaderLatchListener {
Ray Milkey269ffb92014-04-03 14:43:30 -0700235 //
236 // NOTE: If we need to support callbacks when the
237 // leadership changes, those should be called here.
238 //
239
240 @Override
241 public void isLeader() {
Yuta HIGUCHI5bbbaca2014-06-09 16:39:08 -0700242 log.info("Cluster leadership aquired");
Ray Milkey269ffb92014-04-03 14:43:30 -0700243 }
244
245 @Override
246 public void notLeader() {
Yuta HIGUCHI5bbbaca2014-06-09 16:39:08 -0700247 log.info("Cluster leadership lost");
Ray Milkey269ffb92014-04-03 14:43:30 -0700248 }
249 }
250
Ray Milkey269ffb92014-04-03 14:43:30 -0700251 @Override
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700252 public void requestControl(long dpid, ControlChangeCallback cb)
253 throws RegistryException {
Ray Milkey269ffb92014-04-03 14:43:30 -0700254 log.info("Requesting control for {}", HexString.toHexString(dpid));
255
256 if (controllerId == null) {
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700257 throw new IllegalStateException("Must register a controller before"
258 + " calling requestControl");
Ray Milkey269ffb92014-04-03 14:43:30 -0700259 }
260
261 String dpidStr = HexString.toHexString(dpid);
Ray Milkey269ffb92014-04-03 14:43:30 -0700262
263 if (switches.get(dpidStr) != null) {
264 log.debug("Already contesting {}, returning", HexString.toHexString(dpid));
265 throw new RegistryException("Already contesting control for " + dpidStr);
266 }
267
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700268 String latchPath = SWITCH_LATCHES_PATH + "/" + dpidStr;
269
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700270 LeaderLatch latch =
271 new LeaderLatch(curatorFrameworkClient, latchPath, controllerId);
Pavlin Radoslavov0294e052014-04-10 13:36:45 -0700272 SwitchLeaderListener listener = new SwitchLeaderListener(dpidStr);
Ray Milkey269ffb92014-04-03 14:43:30 -0700273 latch.addListener(listener);
274
275
276 SwitchLeadershipData swData = new SwitchLeadershipData(latch, cb, listener);
277 SwitchLeadershipData oldData = switches.putIfAbsent(dpidStr, swData);
278
279 if (oldData != null) {
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700280 // There was already data for that key in the map
281 // i.e. someone else got here first so we can't succeed
Ray Milkey269ffb92014-04-03 14:43:30 -0700282 log.debug("Already requested control for {}", dpidStr);
283 throw new RegistryException("Already requested control for " + dpidStr);
284 }
285
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700286 // Now that we know we were able to add our latch to the collection,
287 // we can start the leader election in Zookeeper. However I don't know
288 // how to handle if the start fails - the latch is already in our
289 // switches list.
290 // TODO seems like there's a Curator bug when latch.start is called when
291 // there's no Zookeeper connection which causes two znodes to be put in
292 // Zookeeper at the latch path when we reconnect to Zookeeper.
Ray Milkey269ffb92014-04-03 14:43:30 -0700293 try {
294 latch.start();
295 } catch (Exception e) {
296 log.warn("Error starting leader latch: {}", e.getMessage());
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700297 throw new RegistryException("Error starting leader latch for "
298 + dpidStr, e);
Ray Milkey269ffb92014-04-03 14:43:30 -0700299 }
300
301 }
302
303 @Override
304 public void releaseControl(long dpid) {
305 log.info("Releasing control for {}", HexString.toHexString(dpid));
306
307 String dpidStr = HexString.toHexString(dpid);
308
309 SwitchLeadershipData swData = switches.remove(dpidStr);
310
311 if (swData == null) {
312 log.debug("Trying to release control of a switch we are not contesting");
313 return;
314 }
315
316 LeaderLatch latch = swData.getLatch();
317
318 latch.removeListener(swData.getListener());
319
320 try {
321 latch.close();
322 } catch (IOException e) {
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700323 // I think it's OK not to do anything here. Either the node got
324 // deleted correctly, or the connection went down and the node got deleted.
Ray Milkey269ffb92014-04-03 14:43:30 -0700325 log.debug("releaseControl: caught IOException {}", dpidStr);
326 }
327 }
328
329 @Override
330 public boolean hasControl(long dpid) {
331 String dpidStr = HexString.toHexString(dpid);
332
333 SwitchLeadershipData swData = switches.get(dpidStr);
334
335 if (swData == null) {
336 log.warn("No leader latch for dpid {}", dpidStr);
337 return false;
338 }
339
340 return swData.getLatch().hasLeadership();
341 }
342
343 @Override
344 public boolean isClusterLeader() {
345 return clusterLeaderLatch.hasLeadership();
346 }
347
348 @Override
349 public String getControllerId() {
350 return controllerId;
351 }
352
353 @Override
354 public Collection<String> getAllControllers() throws RegistryException {
355 log.debug("Getting all controllers");
356
357 List<String> controllers = new ArrayList<String>();
358 for (ServiceInstance<ControllerService> instance : serviceCache.getInstances()) {
359 String id = instance.getPayload().getControllerId();
360 if (!controllers.contains(id)) {
361 controllers.add(id);
362 }
363 }
364
365 return controllers;
366 }
367
368 @Override
369 public void registerController(String id) throws RegistryException {
370 if (controllerId != null) {
371 throw new RegistryException(
372 "Controller already registered with id " + controllerId);
373 }
374
375 controllerId = id;
376
377 try {
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700378 ServiceInstance<ControllerService> thisInstance =
379 ServiceInstance.<ControllerService>builder()
Ray Milkey269ffb92014-04-03 14:43:30 -0700380 .name(CONTROLLER_SERVICE_NAME)
381 .payload(new ControllerService(controllerId))
Ray Milkey269ffb92014-04-03 14:43:30 -0700382 .build();
383
384 serviceDiscovery.registerService(thisInstance);
385 } catch (Exception e) {
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700386 log.error("Exception starting service instance:", e);
Ray Milkey269ffb92014-04-03 14:43:30 -0700387 }
388
389 }
390
391 @Override
392 public String getControllerForSwitch(long dpid) throws RegistryException {
393 String dpidStr = HexString.toHexString(dpid);
394
395 PathChildrenCache switchCache = switchPathCaches.get(dpidStr);
396
397 if (switchCache == null) {
398 log.warn("Tried to get controller for non-existent switch");
Nick Karanatsios8abe7172014-02-19 20:31:48 -0800399 return null;
400 }
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -0700401
Ray Milkey269ffb92014-04-03 14:43:30 -0700402 try {
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700403 // We've seen issues with these caches get stuck out of date, so
404 // we'll have to force them to refresh before each read. This slows
405 // down the method as it blocks on a Zookeeper query, however at
406 // the moment only the cleanup thread uses this and that isn't
407 // particularly time-sensitive.
408 // TODO verify if it is still the case that caches can be out of date
Ray Milkey269ffb92014-04-03 14:43:30 -0700409 switchCache.rebuild();
410 } catch (Exception e) {
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700411 log.error("Exception rebuilding the switch cache:", e);
Ray Milkey269ffb92014-04-03 14:43:30 -0700412 }
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -0700413
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700414 List<ChildData> sortedData =
415 new ArrayList<ChildData>(switchCache.getCurrentData());
Ray Milkey269ffb92014-04-03 14:43:30 -0700416
417 Collections.sort(
418 sortedData,
419 new Comparator<ChildData>() {
420 private String getSequenceNumber(String path) {
421 return path.substring(path.lastIndexOf('-') + 1);
422 }
423
424 @Override
425 public int compare(ChildData lhs, ChildData rhs) {
426 return getSequenceNumber(lhs.getPath()).
427 compareTo(getSequenceNumber(rhs.getPath()));
428 }
429 }
430 );
431
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700432 if (sortedData.isEmpty()) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700433 return null;
434 }
435
436 return new String(sortedData.get(0).getData(), Charsets.UTF_8);
437 }
438
439 @Override
Ray Milkey5df613b2014-04-15 10:50:56 -0700440 public Collection<Long> getSwitchesControlledByController(String controller) {
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700441 // TODO remove this if not needed
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700442 throw new NotImplementedException("Not yet implemented");
Ray Milkey269ffb92014-04-03 14:43:30 -0700443 }
444
445
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700446 // TODO what should happen when there's no ZK connection? Currently we just
447 // return the cache but this may lead to false impressions - i.e. we don't
448 // actually know what's in ZK so we shouldn't say we do
Ray Milkey269ffb92014-04-03 14:43:30 -0700449 @Override
450 public Map<String, List<ControllerRegistryEntry>> getAllSwitches() {
451 Map<String, List<ControllerRegistryEntry>> data =
452 new HashMap<String, List<ControllerRegistryEntry>>();
453
454 for (Map.Entry<String, PathChildrenCache> entry : switchPathCaches.entrySet()) {
455 List<ControllerRegistryEntry> contendingControllers =
456 new ArrayList<ControllerRegistryEntry>();
457
458 if (entry.getValue().getCurrentData().size() < 1) {
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700459 // TODO prevent even having the PathChildrenCache in this case
Ray Milkey269ffb92014-04-03 14:43:30 -0700460 continue;
461 }
462
463 for (ChildData d : entry.getValue().getCurrentData()) {
464
Ray Milkey5df613b2014-04-15 10:50:56 -0700465 String childsControllerId = new String(d.getData(), Charsets.UTF_8);
Ray Milkey269ffb92014-04-03 14:43:30 -0700466
467 String[] splitted = d.getPath().split("-");
468 int sequenceNumber = Integer.parseInt(splitted[splitted.length - 1]);
469
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700470 contendingControllers.add(new ControllerRegistryEntry(
471 childsControllerId, sequenceNumber));
Ray Milkey269ffb92014-04-03 14:43:30 -0700472 }
473
474 Collections.sort(contendingControllers);
475 data.put(entry.getKey(), contendingControllers);
476 }
477 return data;
478 }
479
Yuta HIGUCHI5bbbaca2014-06-09 16:39:08 -0700480 @Override
Ray Milkey269ffb92014-04-03 14:43:30 -0700481 public IdBlock allocateUniqueIdBlock(long range) {
482 try {
483 AtomicValue<Long> result = null;
484 do {
485 result = distributedIdCounter.add(range);
486 } while (result == null || !result.succeeded());
487
488 return new IdBlock(result.preValue(), result.postValue() - 1, range);
489 } catch (Exception e) {
490 log.error("Error allocating ID block");
491 }
492 return null;
493 }
494
495 /**
496 * Returns a block of IDs which are unique and unused.
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700497 * The range of IDs is a fixed size and is allocated incrementally as this
498 * method is called. Since the range of IDs is managed by Zookeeper in
499 * distributed way, this method may block during Zookeeper access.
500 *
501 * @return an IdBlock containing a set of unique IDs
Ray Milkey269ffb92014-04-03 14:43:30 -0700502 */
503 @Override
504 public IdBlock allocateUniqueIdBlock() {
505 return allocateUniqueIdBlock(ID_BLOCK_SIZE);
506 }
507
508 /**
509 * Get a globally unique ID.
510 *
511 * @return a globally unique ID.
512 */
513 @Override
514 public synchronized long getNextUniqueId() {
515 //
516 // Generate the next Unique ID.
517 //
518 // TODO: For now, the higher 32 bits are random, and
519 // the lower 32 bits are sequential.
520 // The implementation must be updated to use the Zookeeper
521 // to allocate the higher 32 bits (globally unique).
522 //
523 if ((nextUniqueIdSuffix & 0xffffffffL) == 0xffffffffL) {
524 nextUniqueIdPrefix = randomGenerator.nextInt();
525 nextUniqueIdSuffix = 0;
526 } else {
527 nextUniqueIdSuffix++;
528 }
Pavlin Radoslavov952a9762014-04-10 13:47:03 -0700529 long result = nextUniqueIdPrefix << 32;
Ray Milkey269ffb92014-04-03 14:43:30 -0700530 result = result | (0xffffffffL & nextUniqueIdSuffix);
531 return result;
532 }
533
534 /*
535 * IFloodlightModule
536 */
537
538 @Override
539 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
540 Collection<Class<? extends IFloodlightService>> l =
Jonathan Hart3d7730a2013-02-22 11:51:17 -0800541 new ArrayList<Class<? extends IFloodlightService>>();
Ray Milkey269ffb92014-04-03 14:43:30 -0700542 l.add(IControllerRegistryService.class);
543 return l;
544 }
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -0700545
Ray Milkey269ffb92014-04-03 14:43:30 -0700546 @Override
547 public Map<Class<? extends IFloodlightService>, IFloodlightService> getServiceImpls() {
548 Map<Class<? extends IFloodlightService>, IFloodlightService> m =
549 new HashMap<Class<? extends IFloodlightService>, IFloodlightService>();
550 m.put(IControllerRegistryService.class, this);
551 return m;
552 }
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -0700553
Ray Milkey269ffb92014-04-03 14:43:30 -0700554 @Override
555 public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
556 Collection<Class<? extends IFloodlightService>> l =
557 new ArrayList<Class<? extends IFloodlightService>>();
558 l.add(IFloodlightProviderService.class);
559 l.add(IRestApiService.class);
560 return l;
561 }
Jonathan Hartbd181b62013-02-17 16:05:38 -0800562
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700563 // TODO currently blocks startup when it can't get a Zookeeper connection.
564 // Do we support starting up with no Zookeeper connection?
Ray Milkey269ffb92014-04-03 14:43:30 -0700565 @Override
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700566 public void init(FloodlightModuleContext context)
567 throws FloodlightModuleException {
568 // Read the Zookeeper connection string from the config
Ray Milkey269ffb92014-04-03 14:43:30 -0700569 Map<String, String> configParams = context.getConfigParams(this);
Ray Milkey5df613b2014-04-15 10:50:56 -0700570 String connectionStringParam = configParams.get("connectionString");
571 if (connectionStringParam != null) {
572 connectionString = connectionStringParam;
Ray Milkey269ffb92014-04-03 14:43:30 -0700573 }
574 log.info("Setting Zookeeper connection string to {}", this.connectionString);
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700575
Yuta HIGUCHI85de40d2014-06-12 14:06:41 -0700576 namespace = System.getProperty(ZK_NAMESPACE_KEY, DEFAULT_NAMESPACE).trim();
577 if (namespace.isEmpty()) {
578 namespace = DEFAULT_NAMESPACE;
579 }
580 log.info("Setting Zookeeper namespace to {}", namespace);
581
Ray Milkey269ffb92014-04-03 14:43:30 -0700582 restApi = context.getServiceImpl(IRestApiService.class);
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800583
Ray Milkey269ffb92014-04-03 14:43:30 -0700584 switches = new ConcurrentHashMap<String, SwitchLeadershipData>();
Ray Milkey269ffb92014-04-03 14:43:30 -0700585 switchPathCaches = new ConcurrentHashMap<String, PathChildrenCache>();
586
587 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700588 curatorFrameworkClient =
589 CuratorFrameworkFactory.newClient(this.connectionString,
Ray Milkey5c9f2db2014-04-09 10:31:21 -0700590 SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy);
Ray Milkey269ffb92014-04-03 14:43:30 -0700591
Ray Milkey5df613b2014-04-15 10:50:56 -0700592 curatorFrameworkClient.start();
Yuta HIGUCHI85de40d2014-06-12 14:06:41 -0700593 curatorFrameworkClient = curatorFrameworkClient.usingNamespace(namespace);
Ray Milkey269ffb92014-04-03 14:43:30 -0700594
595 distributedIdCounter = new DistributedAtomicLong(
Ray Milkey5df613b2014-04-15 10:50:56 -0700596 curatorFrameworkClient,
Ray Milkey269ffb92014-04-03 14:43:30 -0700597 ID_COUNTER_PATH,
598 new RetryOneTime(100));
599
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700600 rootSwitchCache = new PathChildrenCache(
601 curatorFrameworkClient, SWITCH_LATCHES_PATH, true);
Ray Milkey5df613b2014-04-15 10:50:56 -0700602 rootSwitchCache.getListenable().addListener(switchPathCacheListener);
Ray Milkey269ffb92014-04-03 14:43:30 -0700603
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700604 // Build the service discovery object
Ray Milkey269ffb92014-04-03 14:43:30 -0700605 serviceDiscovery = ServiceDiscoveryBuilder.builder(ControllerService.class)
Ray Milkey5df613b2014-04-15 10:50:56 -0700606 .client(curatorFrameworkClient).basePath(SERVICES_PATH).build();
Ray Milkey269ffb92014-04-03 14:43:30 -0700607
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700608 // We read the list of services very frequently (GUI periodically
609 // queries them) so we'll cache them to cut down on Zookeeper queries.
Ray Milkey269ffb92014-04-03 14:43:30 -0700610 serviceCache = serviceDiscovery.serviceCacheBuilder()
611 .name(CONTROLLER_SERVICE_NAME).build();
612
Ray Milkey269ffb92014-04-03 14:43:30 -0700613 try {
614 serviceDiscovery.start();
615 serviceCache.start();
616
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700617 // Don't prime the cache, we want a notification for each child
618 // node in the path
Ray Milkey5df613b2014-04-15 10:50:56 -0700619 rootSwitchCache.start(StartMode.NORMAL);
Ray Milkey269ffb92014-04-03 14:43:30 -0700620 } catch (Exception e) {
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700621 throw new FloodlightModuleException(
622 "Error initialising ZookeeperRegistry", e);
Ray Milkey269ffb92014-04-03 14:43:30 -0700623 }
624
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700625 ExecutorService eventThreadExecutorService =
626 Executors.newSingleThreadExecutor();
Ray Milkey269ffb92014-04-03 14:43:30 -0700627 eventThreadExecutorService.execute(
628 new Runnable() {
629 @Override
630 public void run() {
631 dispatchEvents();
632 }
633 });
634 }
635
636 @Override
637 public void startUp(FloodlightModuleContext context) {
638 //
639 // Cluster Leader election setup.
640 // NOTE: We have to do it here, because during the init stage
641 // we don't know the Controller ID.
642 //
643 if (controllerId == null) {
644 log.error("Error on startup: unknown ControllerId");
645 }
Ray Milkey5df613b2014-04-15 10:50:56 -0700646 clusterLeaderLatch = new LeaderLatch(curatorFrameworkClient,
Ray Milkey269ffb92014-04-03 14:43:30 -0700647 CLUSTER_LEADER_PATH,
648 controllerId);
Pavlin Radoslavov0294e052014-04-10 13:36:45 -0700649 clusterLeaderListener = new ClusterLeaderListener();
Ray Milkey269ffb92014-04-03 14:43:30 -0700650 clusterLeaderLatch.addListener(clusterLeaderListener);
651 try {
652 clusterLeaderLatch.start();
653 } catch (Exception e) {
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700654 log.error("Error starting the cluster leader election: ", e);
Ray Milkey269ffb92014-04-03 14:43:30 -0700655 }
656
657 // Keep trying until there is a cluster leader
658 do {
659 try {
660 Participant leader = clusterLeaderLatch.getLeader();
Ray Milkeyb29e6262014-04-09 16:02:14 -0700661 if (!leader.getId().isEmpty()) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700662 break;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700663 }
Ray Milkey269ffb92014-04-03 14:43:30 -0700664 Thread.sleep(CLUSTER_LEADER_ELECTION_RETRY_MS);
665 } catch (Exception e) {
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700666 log.error("Error waiting for cluster leader election:", e);
Ray Milkey269ffb92014-04-03 14:43:30 -0700667 }
668 } while (true);
669
670 restApi.addRestletRoutable(new RegistryWebRoutable());
671 }
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800672}