blob: 35ce9623dbba81f4da2c15bd869a9e925d1573c6 [file] [log] [blame]
Jonathan Hartdeda0ba2014-04-03 11:14:12 -07001package net.onrc.onos.core.registry;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -08002
Jonathan Hartbd181b62013-02-17 16:05:38 -08003import java.io.IOException;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -08004import java.util.ArrayList;
5import java.util.Collection;
Jonathan Hart3d7730a2013-02-22 11:51:17 -08006import java.util.Collections;
Jonathan Hart599c6b32013-03-24 22:42:02 -07007import java.util.Comparator;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -08008import java.util.HashMap;
Jonathan Hartedd6a442013-02-20 15:22:06 -08009import java.util.List;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080010import java.util.Map;
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -070011import java.util.Random;
Jonathan Hart116b1fe2014-03-14 18:53:47 -070012import java.util.concurrent.BlockingQueue;
Jonathan Hart89187372013-03-14 16:41:09 -070013import java.util.concurrent.ConcurrentHashMap;
Jonathan Hart116b1fe2014-03-14 18:53:47 -070014import java.util.concurrent.ExecutorService;
15import java.util.concurrent.Executors;
16import java.util.concurrent.LinkedBlockingQueue;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080017
Pavlin Radoslavovc35229e2014-02-06 16:19:37 -080018import net.floodlightcontroller.core.IFloodlightProviderService;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080019import net.floodlightcontroller.core.module.FloodlightModuleContext;
20import net.floodlightcontroller.core.module.FloodlightModuleException;
21import net.floodlightcontroller.core.module.IFloodlightModule;
22import net.floodlightcontroller.core.module.IFloodlightService;
Jonathan Hart3d7730a2013-02-22 11:51:17 -080023import net.floodlightcontroller.restserver.IRestApiService;
Jonathan Hartdeda0ba2014-04-03 11:14:12 -070024import net.onrc.onos.core.registry.web.RegistryWebRoutable;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080025
Jonathan Hart12a26aa2014-06-04 14:33:09 -070026import org.apache.commons.lang.NotImplementedException;
Jonathan Hart116b1fe2014-03-14 18:53:47 -070027import org.apache.curator.RetryPolicy;
28import org.apache.curator.framework.CuratorFramework;
29import org.apache.curator.framework.CuratorFrameworkFactory;
30import org.apache.curator.framework.recipes.atomic.AtomicValue;
31import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
32import org.apache.curator.framework.recipes.cache.ChildData;
33import org.apache.curator.framework.recipes.cache.PathChildrenCache;
34import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
35import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
36import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
37import org.apache.curator.framework.recipes.leader.LeaderLatch;
38import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
39import org.apache.curator.framework.recipes.leader.Participant;
40import org.apache.curator.retry.ExponentialBackoffRetry;
41import org.apache.curator.retry.RetryOneTime;
42import org.apache.curator.x.discovery.ServiceCache;
43import org.apache.curator.x.discovery.ServiceDiscovery;
44import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
45import org.apache.curator.x.discovery.ServiceInstance;
Jonathan Hartbd181b62013-02-17 16:05:38 -080046import org.openflow.util.HexString;
47import org.slf4j.Logger;
48import org.slf4j.LoggerFactory;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080049
Jonathan Hartd10008d2013-02-23 17:04:08 -080050import com.google.common.base.Charsets;
Jonathan Hartbd181b62013-02-17 16:05:38 -080051
Jonathan Hart7bf62172013-02-28 13:17:18 -080052/**
53 * A registry service that uses Zookeeper. All data is stored in Zookeeper,
54 * so this can be used as a global registry in a multi-node ONOS cluster.
Jonathan Hart7bf62172013-02-28 13:17:18 -080055 *
Ray Milkey269ffb92014-04-03 14:43:30 -070056 * @author jono
Jonathan Hart7bf62172013-02-28 13:17:18 -080057 */
Jonathan Hart1dbcce62014-06-04 15:21:45 -070058public class ZookeeperRegistry implements IFloodlightModule,
59 IControllerRegistryService {
Jonathan Hartc6eee9e2013-02-18 14:58:27 -080060
Ray Milkeyec838942014-04-09 11:28:43 -070061 private static final Logger log = LoggerFactory.getLogger(ZookeeperRegistry.class);
Jonathan Hart71c0ffc2013-03-24 15:58:42 -070062
Jonathan Hart12a26aa2014-06-04 14:33:09 -070063 private String controllerId;
64
65 private IRestApiService restApi;
Jonathan Hartbd181b62013-02-17 16:05:38 -080066
Jonathan Hart1dbcce62014-06-04 15:21:45 -070067 // This is the default. It is overwritten by the connectionString
68 // configuration parameter
Jonathan Hart12a26aa2014-06-04 14:33:09 -070069 private String connectionString = "localhost:2181";
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -080070
Jonathan Hart12a26aa2014-06-04 14:33:09 -070071 private static final String NAMESPACE = "onos";
72 private static final String SWITCH_LATCHES_PATH = "/switches";
Ray Milkey2476cac2014-04-08 11:03:21 -070073 private static final String CLUSTER_LEADER_PATH = "/cluster/leader";
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -080074
Jonathan Hart1dbcce62014-06-04 15:21:45 -070075 private static final String SERVICES_PATH = "/"; // i.e. the root of our namespace
Ray Milkey2476cac2014-04-08 11:03:21 -070076 private static final String CONTROLLER_SERVICE_NAME = "controllers";
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -070077
Jonathan Hart12a26aa2014-06-04 14:33:09 -070078 private CuratorFramework curatorFrameworkClient;
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -070079
Jonathan Hart12a26aa2014-06-04 14:33:09 -070080 private PathChildrenCache rootSwitchCache;
Ray Milkey269ffb92014-04-03 14:43:30 -070081
Jonathan Hart12a26aa2014-06-04 14:33:09 -070082 private ConcurrentHashMap<String, SwitchLeadershipData> switches;
83 private Map<String, PathChildrenCache> switchPathCaches;
Ray Milkey269ffb92014-04-03 14:43:30 -070084
Jonathan Hart12a26aa2014-06-04 14:33:09 -070085 private LeaderLatch clusterLeaderLatch;
86 private ClusterLeaderListener clusterLeaderListener;
Ray Milkey269ffb92014-04-03 14:43:30 -070087 private static final long CLUSTER_LEADER_ELECTION_RETRY_MS = 100;
88
Ray Milkey2476cac2014-04-08 11:03:21 -070089 private static final String ID_COUNTER_PATH = "/flowidcounter";
90 private static final Long ID_BLOCK_SIZE = 0x100000000L;
Jonathan Hart12a26aa2014-06-04 14:33:09 -070091 private DistributedAtomicLong distributedIdCounter;
Ray Milkey269ffb92014-04-03 14:43:30 -070092
93 //Zookeeper performance-related configuration
Jonathan Hart12a26aa2014-06-04 14:33:09 -070094 private static final int SESSION_TIMEOUT = 7000; // ms
95 private static final int CONNECTION_TIMEOUT = 5000; // ms
Ray Milkey269ffb92014-04-03 14:43:30 -070096
97 //
98 // Unique ID generation state
99 // TODO: The implementation must be updated to use the Zookeeper
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700100 // instead of a random generator.
Ray Milkey269ffb92014-04-03 14:43:30 -0700101 //
102 private static Random randomGenerator = new Random();
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700103 private static long nextUniqueIdPrefix;
Pavlin Radoslavov952a9762014-04-10 13:47:03 -0700104 // NOTE: The 0xffffffffL value is used by the Unique ID generator for
105 // initialization purpose.
106 private static long nextUniqueIdSuffix = 0xffffffffL;
Ray Milkey269ffb92014-04-03 14:43:30 -0700107
108 private final BlockingQueue<SwitchLeaderEvent> switchLeadershipEvents =
109 new LinkedBlockingQueue<SwitchLeaderEvent>();
110
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700111 /**
112 * Listens for changes to the switch znodes in Zookeeper. This maintains
113 * the second level of PathChildrenCaches that hold the controllers
114 * contending for each switch - there's one for each switch.
115 */
116 private PathChildrenCacheListener switchPathCacheListener =
117 new SwitchPathCacheListener();
118 private ServiceDiscovery<ControllerService> serviceDiscovery;
119 private ServiceCache<ControllerService> serviceCache;
120
Ray Milkey269ffb92014-04-03 14:43:30 -0700121
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700122 private static class SwitchLeaderEvent {
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700123 private final long dpid;
124 private final boolean isLeader;
Ray Milkey269ffb92014-04-03 14:43:30 -0700125
126 public SwitchLeaderEvent(long dpid, boolean isLeader) {
127 this.dpid = dpid;
128 this.isLeader = isLeader;
129 }
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700130
131 public long getDpid() {
132 return dpid;
133 }
134
135 public boolean isLeader() {
136 return isLeader;
137 }
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700138 }
Ray Milkey269ffb92014-04-03 14:43:30 -0700139
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700140 // Dispatcher thread for leadership change events coming from Curator
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700141 private void dispatchEvents() {
Ray Milkey269ffb92014-04-03 14:43:30 -0700142 while (!Thread.currentThread().isInterrupted()) {
143 try {
144 SwitchLeaderEvent event = switchLeadershipEvents.take();
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700145 SwitchLeadershipData swData =
146 switches.get(HexString.toHexString(event.getDpid()));
Ray Milkey269ffb92014-04-03 14:43:30 -0700147 if (swData == null) {
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700148 log.debug("Leadership data {} not found", event.getDpid());
Ray Milkey269ffb92014-04-03 14:43:30 -0700149 continue;
150 }
151
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700152 swData.getCallback().controlChanged(event.getDpid(), event.isLeader());
Ray Milkey269ffb92014-04-03 14:43:30 -0700153 } catch (InterruptedException e) {
154 Thread.currentThread().interrupt();
155 break;
156 } catch (Exception e) {
157 log.error("Exception in registry event thread", e);
158 }
159 }
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700160 }
Jonathan Hartbd181b62013-02-17 16:05:38 -0800161
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700162 class SwitchLeaderListener implements LeaderLatchListener {
163 private String dpid;
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700164
Pavlin Radoslavov0294e052014-04-10 13:36:45 -0700165 public SwitchLeaderListener(String dpid) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700166 this.dpid = dpid;
Ray Milkey269ffb92014-04-03 14:43:30 -0700167 }
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700168
Ray Milkey269ffb92014-04-03 14:43:30 -0700169 @Override
170 public void isLeader() {
171 log.debug("Became leader for {}", dpid);
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800172
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700173 switchLeadershipEvents.add(
174 new SwitchLeaderEvent(HexString.toLong(dpid), true));
Ray Milkey269ffb92014-04-03 14:43:30 -0700175 }
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800176
Ray Milkey269ffb92014-04-03 14:43:30 -0700177 @Override
178 public void notLeader() {
179 log.debug("Lost leadership for {}", dpid);
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800180
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700181 switchLeadershipEvents.add(
182 new SwitchLeaderEvent(HexString.toLong(dpid), false));
Ray Milkey269ffb92014-04-03 14:43:30 -0700183 }
184 }
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700185
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700186 class SwitchPathCacheListener implements PathChildrenCacheListener {
Ray Milkey269ffb92014-04-03 14:43:30 -0700187 @Override
188 public void childEvent(CuratorFramework client,
189 PathChildrenCacheEvent event) throws Exception {
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800190
Ray Milkey269ffb92014-04-03 14:43:30 -0700191 String strSwitch = null;
192 if (event.getData() != null) {
193 String[] splitted = event.getData().getPath().split("/");
194 strSwitch = splitted[splitted.length - 1];
Nick Karanatsios8abe7172014-02-19 20:31:48 -0800195 }
Ray Milkey269ffb92014-04-03 14:43:30 -0700196
197 switch (event.getType()) {
198 case CHILD_ADDED:
199 case CHILD_UPDATED:
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700200 // Check we have a PathChildrenCache for this child
201 // and add one if not
Ray Milkey269ffb92014-04-03 14:43:30 -0700202 synchronized (switchPathCaches) {
203 if (switchPathCaches.get(strSwitch) == null) {
204 PathChildrenCache pc = new PathChildrenCache(client,
205 event.getData().getPath(), true);
206 pc.start(StartMode.NORMAL);
207 switchPathCaches.put(strSwitch, pc);
208 }
209 }
210 break;
211 case CHILD_REMOVED:
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700212 // Remove our PathChildrenCache for this child
Ray Milkey269ffb92014-04-03 14:43:30 -0700213 PathChildrenCache pc = null;
214 synchronized (switchPathCaches) {
215 pc = switchPathCaches.remove(strSwitch);
216 }
217 if (pc != null) {
218 pc.close();
219 }
220 break;
221 default:
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700222 // All other switchLeadershipEvents are connection status
223 // switchLeadershipEvents. We don't need to do anything as
224 // the path cache handles these on its own.
Ray Milkey269ffb92014-04-03 14:43:30 -0700225 break;
226 }
227
228 }
229 }
230
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700231 private static class ClusterLeaderListener implements LeaderLatchListener {
Ray Milkey269ffb92014-04-03 14:43:30 -0700232 //
233 // NOTE: If we need to support callbacks when the
234 // leadership changes, those should be called here.
235 //
236
237 @Override
238 public void isLeader() {
239 log.debug("Cluster leadership aquired");
240 }
241
242 @Override
243 public void notLeader() {
244 log.debug("Cluster leadership lost");
245 }
246 }
247
Ray Milkey269ffb92014-04-03 14:43:30 -0700248 @Override
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700249 public void requestControl(long dpid, ControlChangeCallback cb)
250 throws RegistryException {
Ray Milkey269ffb92014-04-03 14:43:30 -0700251 log.info("Requesting control for {}", HexString.toHexString(dpid));
252
253 if (controllerId == null) {
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700254 throw new IllegalStateException("Must register a controller before"
255 + " calling requestControl");
Ray Milkey269ffb92014-04-03 14:43:30 -0700256 }
257
258 String dpidStr = HexString.toHexString(dpid);
Ray Milkey269ffb92014-04-03 14:43:30 -0700259
260 if (switches.get(dpidStr) != null) {
261 log.debug("Already contesting {}, returning", HexString.toHexString(dpid));
262 throw new RegistryException("Already contesting control for " + dpidStr);
263 }
264
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700265 String latchPath = SWITCH_LATCHES_PATH + "/" + dpidStr;
266
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700267 LeaderLatch latch =
268 new LeaderLatch(curatorFrameworkClient, latchPath, controllerId);
Pavlin Radoslavov0294e052014-04-10 13:36:45 -0700269 SwitchLeaderListener listener = new SwitchLeaderListener(dpidStr);
Ray Milkey269ffb92014-04-03 14:43:30 -0700270 latch.addListener(listener);
271
272
273 SwitchLeadershipData swData = new SwitchLeadershipData(latch, cb, listener);
274 SwitchLeadershipData oldData = switches.putIfAbsent(dpidStr, swData);
275
276 if (oldData != null) {
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700277 // There was already data for that key in the map
278 // i.e. someone else got here first so we can't succeed
Ray Milkey269ffb92014-04-03 14:43:30 -0700279 log.debug("Already requested control for {}", dpidStr);
280 throw new RegistryException("Already requested control for " + dpidStr);
281 }
282
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700283 // Now that we know we were able to add our latch to the collection,
284 // we can start the leader election in Zookeeper. However I don't know
285 // how to handle if the start fails - the latch is already in our
286 // switches list.
287 // TODO seems like there's a Curator bug when latch.start is called when
288 // there's no Zookeeper connection which causes two znodes to be put in
289 // Zookeeper at the latch path when we reconnect to Zookeeper.
Ray Milkey269ffb92014-04-03 14:43:30 -0700290 try {
291 latch.start();
292 } catch (Exception e) {
293 log.warn("Error starting leader latch: {}", e.getMessage());
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700294 throw new RegistryException("Error starting leader latch for "
295 + dpidStr, e);
Ray Milkey269ffb92014-04-03 14:43:30 -0700296 }
297
298 }
299
300 @Override
301 public void releaseControl(long dpid) {
302 log.info("Releasing control for {}", HexString.toHexString(dpid));
303
304 String dpidStr = HexString.toHexString(dpid);
305
306 SwitchLeadershipData swData = switches.remove(dpidStr);
307
308 if (swData == null) {
309 log.debug("Trying to release control of a switch we are not contesting");
310 return;
311 }
312
313 LeaderLatch latch = swData.getLatch();
314
315 latch.removeListener(swData.getListener());
316
317 try {
318 latch.close();
319 } catch (IOException e) {
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700320 // I think it's OK not to do anything here. Either the node got
321 // deleted correctly, or the connection went down and the node got deleted.
Ray Milkey269ffb92014-04-03 14:43:30 -0700322 log.debug("releaseControl: caught IOException {}", dpidStr);
323 }
324 }
325
326 @Override
327 public boolean hasControl(long dpid) {
328 String dpidStr = HexString.toHexString(dpid);
329
330 SwitchLeadershipData swData = switches.get(dpidStr);
331
332 if (swData == null) {
333 log.warn("No leader latch for dpid {}", dpidStr);
334 return false;
335 }
336
337 return swData.getLatch().hasLeadership();
338 }
339
340 @Override
341 public boolean isClusterLeader() {
342 return clusterLeaderLatch.hasLeadership();
343 }
344
345 @Override
346 public String getControllerId() {
347 return controllerId;
348 }
349
350 @Override
351 public Collection<String> getAllControllers() throws RegistryException {
352 log.debug("Getting all controllers");
353
354 List<String> controllers = new ArrayList<String>();
355 for (ServiceInstance<ControllerService> instance : serviceCache.getInstances()) {
356 String id = instance.getPayload().getControllerId();
357 if (!controllers.contains(id)) {
358 controllers.add(id);
359 }
360 }
361
362 return controllers;
363 }
364
365 @Override
366 public void registerController(String id) throws RegistryException {
367 if (controllerId != null) {
368 throw new RegistryException(
369 "Controller already registered with id " + controllerId);
370 }
371
372 controllerId = id;
373
374 try {
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700375 ServiceInstance<ControllerService> thisInstance =
376 ServiceInstance.<ControllerService>builder()
Ray Milkey269ffb92014-04-03 14:43:30 -0700377 .name(CONTROLLER_SERVICE_NAME)
378 .payload(new ControllerService(controllerId))
Ray Milkey269ffb92014-04-03 14:43:30 -0700379 .build();
380
381 serviceDiscovery.registerService(thisInstance);
382 } catch (Exception e) {
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700383 log.error("Exception starting service instance:", e);
Ray Milkey269ffb92014-04-03 14:43:30 -0700384 }
385
386 }
387
388 @Override
389 public String getControllerForSwitch(long dpid) throws RegistryException {
390 String dpidStr = HexString.toHexString(dpid);
391
392 PathChildrenCache switchCache = switchPathCaches.get(dpidStr);
393
394 if (switchCache == null) {
395 log.warn("Tried to get controller for non-existent switch");
Nick Karanatsios8abe7172014-02-19 20:31:48 -0800396 return null;
397 }
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -0700398
Ray Milkey269ffb92014-04-03 14:43:30 -0700399 try {
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700400 // We've seen issues with these caches get stuck out of date, so
401 // we'll have to force them to refresh before each read. This slows
402 // down the method as it blocks on a Zookeeper query, however at
403 // the moment only the cleanup thread uses this and that isn't
404 // particularly time-sensitive.
405 // TODO verify if it is still the case that caches can be out of date
Ray Milkey269ffb92014-04-03 14:43:30 -0700406 switchCache.rebuild();
407 } catch (Exception e) {
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700408 log.error("Exception rebuilding the switch cache:", e);
Ray Milkey269ffb92014-04-03 14:43:30 -0700409 }
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -0700410
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700411 List<ChildData> sortedData =
412 new ArrayList<ChildData>(switchCache.getCurrentData());
Ray Milkey269ffb92014-04-03 14:43:30 -0700413
414 Collections.sort(
415 sortedData,
416 new Comparator<ChildData>() {
417 private String getSequenceNumber(String path) {
418 return path.substring(path.lastIndexOf('-') + 1);
419 }
420
421 @Override
422 public int compare(ChildData lhs, ChildData rhs) {
423 return getSequenceNumber(lhs.getPath()).
424 compareTo(getSequenceNumber(rhs.getPath()));
425 }
426 }
427 );
428
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700429 if (sortedData.isEmpty()) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700430 return null;
431 }
432
433 return new String(sortedData.get(0).getData(), Charsets.UTF_8);
434 }
435
436 @Override
Ray Milkey5df613b2014-04-15 10:50:56 -0700437 public Collection<Long> getSwitchesControlledByController(String controller) {
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700438 // TODO remove this if not needed
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700439 throw new NotImplementedException("Not yet implemented");
Ray Milkey269ffb92014-04-03 14:43:30 -0700440 }
441
442
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700443 // TODO what should happen when there's no ZK connection? Currently we just
444 // return the cache but this may lead to false impressions - i.e. we don't
445 // actually know what's in ZK so we shouldn't say we do
Ray Milkey269ffb92014-04-03 14:43:30 -0700446 @Override
447 public Map<String, List<ControllerRegistryEntry>> getAllSwitches() {
448 Map<String, List<ControllerRegistryEntry>> data =
449 new HashMap<String, List<ControllerRegistryEntry>>();
450
451 for (Map.Entry<String, PathChildrenCache> entry : switchPathCaches.entrySet()) {
452 List<ControllerRegistryEntry> contendingControllers =
453 new ArrayList<ControllerRegistryEntry>();
454
455 if (entry.getValue().getCurrentData().size() < 1) {
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700456 // TODO prevent even having the PathChildrenCache in this case
Ray Milkey269ffb92014-04-03 14:43:30 -0700457 continue;
458 }
459
460 for (ChildData d : entry.getValue().getCurrentData()) {
461
Ray Milkey5df613b2014-04-15 10:50:56 -0700462 String childsControllerId = new String(d.getData(), Charsets.UTF_8);
Ray Milkey269ffb92014-04-03 14:43:30 -0700463
464 String[] splitted = d.getPath().split("-");
465 int sequenceNumber = Integer.parseInt(splitted[splitted.length - 1]);
466
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700467 contendingControllers.add(new ControllerRegistryEntry(
468 childsControllerId, sequenceNumber));
Ray Milkey269ffb92014-04-03 14:43:30 -0700469 }
470
471 Collections.sort(contendingControllers);
472 data.put(entry.getKey(), contendingControllers);
473 }
474 return data;
475 }
476
477 public IdBlock allocateUniqueIdBlock(long range) {
478 try {
479 AtomicValue<Long> result = null;
480 do {
481 result = distributedIdCounter.add(range);
482 } while (result == null || !result.succeeded());
483
484 return new IdBlock(result.preValue(), result.postValue() - 1, range);
485 } catch (Exception e) {
486 log.error("Error allocating ID block");
487 }
488 return null;
489 }
490
491 /**
492 * Returns a block of IDs which are unique and unused.
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700493 * The range of IDs is a fixed size and is allocated incrementally as this
494 * method is called. Since the range of IDs is managed by Zookeeper in
495 * distributed way, this method may block during Zookeeper access.
496 *
497 * @return an IdBlock containing a set of unique IDs
Ray Milkey269ffb92014-04-03 14:43:30 -0700498 */
499 @Override
500 public IdBlock allocateUniqueIdBlock() {
501 return allocateUniqueIdBlock(ID_BLOCK_SIZE);
502 }
503
504 /**
505 * Get a globally unique ID.
506 *
507 * @return a globally unique ID.
508 */
509 @Override
510 public synchronized long getNextUniqueId() {
511 //
512 // Generate the next Unique ID.
513 //
514 // TODO: For now, the higher 32 bits are random, and
515 // the lower 32 bits are sequential.
516 // The implementation must be updated to use the Zookeeper
517 // to allocate the higher 32 bits (globally unique).
518 //
519 if ((nextUniqueIdSuffix & 0xffffffffL) == 0xffffffffL) {
520 nextUniqueIdPrefix = randomGenerator.nextInt();
521 nextUniqueIdSuffix = 0;
522 } else {
523 nextUniqueIdSuffix++;
524 }
Pavlin Radoslavov952a9762014-04-10 13:47:03 -0700525 long result = nextUniqueIdPrefix << 32;
Ray Milkey269ffb92014-04-03 14:43:30 -0700526 result = result | (0xffffffffL & nextUniqueIdSuffix);
527 return result;
528 }
529
530 /*
531 * IFloodlightModule
532 */
533
534 @Override
535 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
536 Collection<Class<? extends IFloodlightService>> l =
Jonathan Hart3d7730a2013-02-22 11:51:17 -0800537 new ArrayList<Class<? extends IFloodlightService>>();
Ray Milkey269ffb92014-04-03 14:43:30 -0700538 l.add(IControllerRegistryService.class);
539 return l;
540 }
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -0700541
Ray Milkey269ffb92014-04-03 14:43:30 -0700542 @Override
543 public Map<Class<? extends IFloodlightService>, IFloodlightService> getServiceImpls() {
544 Map<Class<? extends IFloodlightService>, IFloodlightService> m =
545 new HashMap<Class<? extends IFloodlightService>, IFloodlightService>();
546 m.put(IControllerRegistryService.class, this);
547 return m;
548 }
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -0700549
Ray Milkey269ffb92014-04-03 14:43:30 -0700550 @Override
551 public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
552 Collection<Class<? extends IFloodlightService>> l =
553 new ArrayList<Class<? extends IFloodlightService>>();
554 l.add(IFloodlightProviderService.class);
555 l.add(IRestApiService.class);
556 return l;
557 }
Jonathan Hartbd181b62013-02-17 16:05:38 -0800558
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700559 // TODO currently blocks startup when it can't get a Zookeeper connection.
560 // Do we support starting up with no Zookeeper connection?
Ray Milkey269ffb92014-04-03 14:43:30 -0700561 @Override
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700562 public void init(FloodlightModuleContext context)
563 throws FloodlightModuleException {
564 // Read the Zookeeper connection string from the config
Ray Milkey269ffb92014-04-03 14:43:30 -0700565 Map<String, String> configParams = context.getConfigParams(this);
Ray Milkey5df613b2014-04-15 10:50:56 -0700566 String connectionStringParam = configParams.get("connectionString");
567 if (connectionStringParam != null) {
568 connectionString = connectionStringParam;
Ray Milkey269ffb92014-04-03 14:43:30 -0700569 }
570 log.info("Setting Zookeeper connection string to {}", this.connectionString);
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700571
Ray Milkey269ffb92014-04-03 14:43:30 -0700572 restApi = context.getServiceImpl(IRestApiService.class);
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800573
Ray Milkey269ffb92014-04-03 14:43:30 -0700574 switches = new ConcurrentHashMap<String, SwitchLeadershipData>();
Ray Milkey269ffb92014-04-03 14:43:30 -0700575 switchPathCaches = new ConcurrentHashMap<String, PathChildrenCache>();
576
577 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700578 curatorFrameworkClient =
579 CuratorFrameworkFactory.newClient(this.connectionString,
Ray Milkey5c9f2db2014-04-09 10:31:21 -0700580 SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy);
Ray Milkey269ffb92014-04-03 14:43:30 -0700581
Ray Milkey5df613b2014-04-15 10:50:56 -0700582 curatorFrameworkClient.start();
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700583 curatorFrameworkClient = curatorFrameworkClient.usingNamespace(NAMESPACE);
Ray Milkey269ffb92014-04-03 14:43:30 -0700584
585 distributedIdCounter = new DistributedAtomicLong(
Ray Milkey5df613b2014-04-15 10:50:56 -0700586 curatorFrameworkClient,
Ray Milkey269ffb92014-04-03 14:43:30 -0700587 ID_COUNTER_PATH,
588 new RetryOneTime(100));
589
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700590 rootSwitchCache = new PathChildrenCache(
591 curatorFrameworkClient, SWITCH_LATCHES_PATH, true);
Ray Milkey5df613b2014-04-15 10:50:56 -0700592 rootSwitchCache.getListenable().addListener(switchPathCacheListener);
Ray Milkey269ffb92014-04-03 14:43:30 -0700593
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700594 // Build the service discovery object
Ray Milkey269ffb92014-04-03 14:43:30 -0700595 serviceDiscovery = ServiceDiscoveryBuilder.builder(ControllerService.class)
Ray Milkey5df613b2014-04-15 10:50:56 -0700596 .client(curatorFrameworkClient).basePath(SERVICES_PATH).build();
Ray Milkey269ffb92014-04-03 14:43:30 -0700597
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700598 // We read the list of services very frequently (GUI periodically
599 // queries them) so we'll cache them to cut down on Zookeeper queries.
Ray Milkey269ffb92014-04-03 14:43:30 -0700600 serviceCache = serviceDiscovery.serviceCacheBuilder()
601 .name(CONTROLLER_SERVICE_NAME).build();
602
Ray Milkey269ffb92014-04-03 14:43:30 -0700603 try {
604 serviceDiscovery.start();
605 serviceCache.start();
606
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700607 // Don't prime the cache, we want a notification for each child
608 // node in the path
Ray Milkey5df613b2014-04-15 10:50:56 -0700609 rootSwitchCache.start(StartMode.NORMAL);
Ray Milkey269ffb92014-04-03 14:43:30 -0700610 } catch (Exception e) {
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700611 throw new FloodlightModuleException(
612 "Error initialising ZookeeperRegistry", e);
Ray Milkey269ffb92014-04-03 14:43:30 -0700613 }
614
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700615 ExecutorService eventThreadExecutorService =
616 Executors.newSingleThreadExecutor();
Ray Milkey269ffb92014-04-03 14:43:30 -0700617 eventThreadExecutorService.execute(
618 new Runnable() {
619 @Override
620 public void run() {
621 dispatchEvents();
622 }
623 });
624 }
625
626 @Override
627 public void startUp(FloodlightModuleContext context) {
628 //
629 // Cluster Leader election setup.
630 // NOTE: We have to do it here, because during the init stage
631 // we don't know the Controller ID.
632 //
633 if (controllerId == null) {
634 log.error("Error on startup: unknown ControllerId");
635 }
Ray Milkey5df613b2014-04-15 10:50:56 -0700636 clusterLeaderLatch = new LeaderLatch(curatorFrameworkClient,
Ray Milkey269ffb92014-04-03 14:43:30 -0700637 CLUSTER_LEADER_PATH,
638 controllerId);
Pavlin Radoslavov0294e052014-04-10 13:36:45 -0700639 clusterLeaderListener = new ClusterLeaderListener();
Ray Milkey269ffb92014-04-03 14:43:30 -0700640 clusterLeaderLatch.addListener(clusterLeaderListener);
641 try {
642 clusterLeaderLatch.start();
643 } catch (Exception e) {
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700644 log.error("Error starting the cluster leader election: ", e);
Ray Milkey269ffb92014-04-03 14:43:30 -0700645 }
646
647 // Keep trying until there is a cluster leader
648 do {
649 try {
650 Participant leader = clusterLeaderLatch.getLeader();
Ray Milkeyb29e6262014-04-09 16:02:14 -0700651 if (!leader.getId().isEmpty()) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700652 break;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700653 }
Ray Milkey269ffb92014-04-03 14:43:30 -0700654 Thread.sleep(CLUSTER_LEADER_ELECTION_RETRY_MS);
655 } catch (Exception e) {
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700656 log.error("Error waiting for cluster leader election:", e);
Ray Milkey269ffb92014-04-03 14:43:30 -0700657 }
658 } while (true);
659
660 restApi.addRestletRoutable(new RegistryWebRoutable());
661 }
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800662}