blob: ce68ec27c23034c7f25cbd21593d513e65be46f4 [file] [log] [blame]
Jonathan Hartdeda0ba2014-04-03 11:14:12 -07001package net.onrc.onos.core.registry;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -08002
Jonathan Hartbd181b62013-02-17 16:05:38 -08003import java.io.IOException;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -08004import java.util.ArrayList;
5import java.util.Collection;
Jonathan Hart3d7730a2013-02-22 11:51:17 -08006import java.util.Collections;
Jonathan Hart599c6b32013-03-24 22:42:02 -07007import java.util.Comparator;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -08008import java.util.HashMap;
Jonathan Hartedd6a442013-02-20 15:22:06 -08009import java.util.List;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080010import java.util.Map;
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -070011import java.util.Random;
Jonathan Hart116b1fe2014-03-14 18:53:47 -070012import java.util.concurrent.BlockingQueue;
Jonathan Hart89187372013-03-14 16:41:09 -070013import java.util.concurrent.ConcurrentHashMap;
Jonathan Hart116b1fe2014-03-14 18:53:47 -070014import java.util.concurrent.ExecutorService;
15import java.util.concurrent.Executors;
16import java.util.concurrent.LinkedBlockingQueue;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080017
Pavlin Radoslavovc35229e2014-02-06 16:19:37 -080018import net.floodlightcontroller.core.IFloodlightProviderService;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080019import net.floodlightcontroller.core.module.FloodlightModuleContext;
20import net.floodlightcontroller.core.module.FloodlightModuleException;
21import net.floodlightcontroller.core.module.IFloodlightModule;
22import net.floodlightcontroller.core.module.IFloodlightService;
Jonathan Hart3d7730a2013-02-22 11:51:17 -080023import net.floodlightcontroller.restserver.IRestApiService;
Jonathan Hartdeda0ba2014-04-03 11:14:12 -070024import net.onrc.onos.core.registry.web.RegistryWebRoutable;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080025
Jonathan Hart12a26aa2014-06-04 14:33:09 -070026import org.apache.commons.lang.NotImplementedException;
Jonathan Hart116b1fe2014-03-14 18:53:47 -070027import org.apache.curator.RetryPolicy;
28import org.apache.curator.framework.CuratorFramework;
29import org.apache.curator.framework.CuratorFrameworkFactory;
30import org.apache.curator.framework.recipes.atomic.AtomicValue;
31import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
32import org.apache.curator.framework.recipes.cache.ChildData;
33import org.apache.curator.framework.recipes.cache.PathChildrenCache;
34import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
35import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
36import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
37import org.apache.curator.framework.recipes.leader.LeaderLatch;
38import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
39import org.apache.curator.framework.recipes.leader.Participant;
40import org.apache.curator.retry.ExponentialBackoffRetry;
41import org.apache.curator.retry.RetryOneTime;
42import org.apache.curator.x.discovery.ServiceCache;
43import org.apache.curator.x.discovery.ServiceDiscovery;
44import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
45import org.apache.curator.x.discovery.ServiceInstance;
Jonathan Hartbd181b62013-02-17 16:05:38 -080046import org.openflow.util.HexString;
47import org.slf4j.Logger;
48import org.slf4j.LoggerFactory;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080049
Jonathan Hartd10008d2013-02-23 17:04:08 -080050import com.google.common.base.Charsets;
Jonathan Hartbd181b62013-02-17 16:05:38 -080051
Jonathan Hart7bf62172013-02-28 13:17:18 -080052/**
53 * A registry service that uses Zookeeper. All data is stored in Zookeeper,
54 * so this can be used as a global registry in a multi-node ONOS cluster.
Jonathan Hart7bf62172013-02-28 13:17:18 -080055 */
Jonathan Hart1dbcce62014-06-04 15:21:45 -070056public class ZookeeperRegistry implements IFloodlightModule,
57 IControllerRegistryService {
Jonathan Hartc6eee9e2013-02-18 14:58:27 -080058
Yuta HIGUCHI5f1ce1c2014-07-20 22:43:54 -070059 private static final String DEFAULT_CONNECTION_STRING = "localhost:2181";
60
Ray Milkeyec838942014-04-09 11:28:43 -070061 private static final Logger log = LoggerFactory.getLogger(ZookeeperRegistry.class);
Jonathan Hart71c0ffc2013-03-24 15:58:42 -070062
Jonathan Hart12a26aa2014-06-04 14:33:09 -070063 private String controllerId;
64
65 private IRestApiService restApi;
Jonathan Hartbd181b62013-02-17 16:05:38 -080066
Jonathan Hart1dbcce62014-06-04 15:21:45 -070067 // This is the default. It is overwritten by the connectionString
68 // configuration parameter
Yuta HIGUCHI5f1ce1c2014-07-20 22:43:54 -070069 private String connectionString = DEFAULT_CONNECTION_STRING;
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -080070
Yuta HIGUCHI85de40d2014-06-12 14:06:41 -070071 /**
72 * JVM Option to specify ZooKeeper namespace.
73 */
74 public static final String ZK_NAMESPACE_KEY = "zookeeper.namespace";
75 private static final String DEFAULT_NAMESPACE = "onos";
76 private String namespace = DEFAULT_NAMESPACE;
Jonathan Hart12a26aa2014-06-04 14:33:09 -070077 private static final String SWITCH_LATCHES_PATH = "/switches";
Ray Milkey2476cac2014-04-08 11:03:21 -070078 private static final String CLUSTER_LEADER_PATH = "/cluster/leader";
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -080079
Jonathan Hart1dbcce62014-06-04 15:21:45 -070080 private static final String SERVICES_PATH = "/"; // i.e. the root of our namespace
Ray Milkey2476cac2014-04-08 11:03:21 -070081 private static final String CONTROLLER_SERVICE_NAME = "controllers";
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -070082
Jonathan Hart12a26aa2014-06-04 14:33:09 -070083 private CuratorFramework curatorFrameworkClient;
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -070084
Jonathan Hart12a26aa2014-06-04 14:33:09 -070085 private PathChildrenCache rootSwitchCache;
Ray Milkey269ffb92014-04-03 14:43:30 -070086
Jonathan Hart12a26aa2014-06-04 14:33:09 -070087 private ConcurrentHashMap<String, SwitchLeadershipData> switches;
88 private Map<String, PathChildrenCache> switchPathCaches;
Ray Milkey269ffb92014-04-03 14:43:30 -070089
Jonathan Hart12a26aa2014-06-04 14:33:09 -070090 private LeaderLatch clusterLeaderLatch;
91 private ClusterLeaderListener clusterLeaderListener;
Ray Milkey269ffb92014-04-03 14:43:30 -070092 private static final long CLUSTER_LEADER_ELECTION_RETRY_MS = 100;
93
Ray Milkey2476cac2014-04-08 11:03:21 -070094 private static final String ID_COUNTER_PATH = "/flowidcounter";
95 private static final Long ID_BLOCK_SIZE = 0x100000000L;
Jonathan Hart12a26aa2014-06-04 14:33:09 -070096 private DistributedAtomicLong distributedIdCounter;
Ray Milkey269ffb92014-04-03 14:43:30 -070097
98 //Zookeeper performance-related configuration
Jonathan Hart12a26aa2014-06-04 14:33:09 -070099 private static final int SESSION_TIMEOUT = 7000; // ms
100 private static final int CONNECTION_TIMEOUT = 5000; // ms
Ray Milkey269ffb92014-04-03 14:43:30 -0700101
102 //
103 // Unique ID generation state
104 // TODO: The implementation must be updated to use the Zookeeper
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700105 // instead of a random generator.
Ray Milkey269ffb92014-04-03 14:43:30 -0700106 //
107 private static Random randomGenerator = new Random();
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700108 private static long nextUniqueIdPrefix;
Pavlin Radoslavov952a9762014-04-10 13:47:03 -0700109 // NOTE: The 0xffffffffL value is used by the Unique ID generator for
110 // initialization purpose.
111 private static long nextUniqueIdSuffix = 0xffffffffL;
Ray Milkey269ffb92014-04-03 14:43:30 -0700112
113 private final BlockingQueue<SwitchLeaderEvent> switchLeadershipEvents =
114 new LinkedBlockingQueue<SwitchLeaderEvent>();
115
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700116 /**
117 * Listens for changes to the switch znodes in Zookeeper. This maintains
118 * the second level of PathChildrenCaches that hold the controllers
119 * contending for each switch - there's one for each switch.
120 */
121 private PathChildrenCacheListener switchPathCacheListener =
122 new SwitchPathCacheListener();
123 private ServiceDiscovery<ControllerService> serviceDiscovery;
124 private ServiceCache<ControllerService> serviceCache;
125
Ray Milkey269ffb92014-04-03 14:43:30 -0700126
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700127 private static class SwitchLeaderEvent {
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700128 private final long dpid;
129 private final boolean isLeader;
Ray Milkey269ffb92014-04-03 14:43:30 -0700130
131 public SwitchLeaderEvent(long dpid, boolean isLeader) {
132 this.dpid = dpid;
133 this.isLeader = isLeader;
134 }
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700135
136 public long getDpid() {
137 return dpid;
138 }
139
140 public boolean isLeader() {
141 return isLeader;
142 }
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700143 }
Ray Milkey269ffb92014-04-03 14:43:30 -0700144
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700145 // Dispatcher thread for leadership change events coming from Curator
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700146 private void dispatchEvents() {
Ray Milkey269ffb92014-04-03 14:43:30 -0700147 while (!Thread.currentThread().isInterrupted()) {
148 try {
149 SwitchLeaderEvent event = switchLeadershipEvents.take();
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700150 SwitchLeadershipData swData =
151 switches.get(HexString.toHexString(event.getDpid()));
Ray Milkey269ffb92014-04-03 14:43:30 -0700152 if (swData == null) {
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700153 log.debug("Leadership data {} not found", event.getDpid());
Ray Milkey269ffb92014-04-03 14:43:30 -0700154 continue;
155 }
156
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700157 swData.getCallback().controlChanged(event.getDpid(), event.isLeader());
Ray Milkey269ffb92014-04-03 14:43:30 -0700158 } catch (InterruptedException e) {
159 Thread.currentThread().interrupt();
160 break;
161 } catch (Exception e) {
162 log.error("Exception in registry event thread", e);
163 }
164 }
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700165 }
Jonathan Hartbd181b62013-02-17 16:05:38 -0800166
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700167 class SwitchLeaderListener implements LeaderLatchListener {
Yuta HIGUCHI5bbbaca2014-06-09 16:39:08 -0700168 private final String dpid;
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700169
Pavlin Radoslavov0294e052014-04-10 13:36:45 -0700170 public SwitchLeaderListener(String dpid) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700171 this.dpid = dpid;
Ray Milkey269ffb92014-04-03 14:43:30 -0700172 }
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700173
Ray Milkey269ffb92014-04-03 14:43:30 -0700174 @Override
175 public void isLeader() {
Yuta HIGUCHI5bbbaca2014-06-09 16:39:08 -0700176 log.info("Became leader for {}", dpid);
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800177
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700178 switchLeadershipEvents.add(
179 new SwitchLeaderEvent(HexString.toLong(dpid), true));
Ray Milkey269ffb92014-04-03 14:43:30 -0700180 }
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800181
Ray Milkey269ffb92014-04-03 14:43:30 -0700182 @Override
183 public void notLeader() {
Yuta HIGUCHI5bbbaca2014-06-09 16:39:08 -0700184 log.info("Lost leadership for {}", dpid);
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800185
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700186 switchLeadershipEvents.add(
187 new SwitchLeaderEvent(HexString.toLong(dpid), false));
Ray Milkey269ffb92014-04-03 14:43:30 -0700188 }
189 }
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700190
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700191 class SwitchPathCacheListener implements PathChildrenCacheListener {
Ray Milkey269ffb92014-04-03 14:43:30 -0700192 @Override
193 public void childEvent(CuratorFramework client,
194 PathChildrenCacheEvent event) throws Exception {
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800195
Ray Milkey269ffb92014-04-03 14:43:30 -0700196 String strSwitch = null;
197 if (event.getData() != null) {
198 String[] splitted = event.getData().getPath().split("/");
199 strSwitch = splitted[splitted.length - 1];
Nick Karanatsios8abe7172014-02-19 20:31:48 -0800200 }
Ray Milkey269ffb92014-04-03 14:43:30 -0700201
202 switch (event.getType()) {
203 case CHILD_ADDED:
204 case CHILD_UPDATED:
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700205 // Check we have a PathChildrenCache for this child
206 // and add one if not
Ray Milkey269ffb92014-04-03 14:43:30 -0700207 synchronized (switchPathCaches) {
208 if (switchPathCaches.get(strSwitch) == null) {
209 PathChildrenCache pc = new PathChildrenCache(client,
210 event.getData().getPath(), true);
211 pc.start(StartMode.NORMAL);
212 switchPathCaches.put(strSwitch, pc);
213 }
214 }
215 break;
216 case CHILD_REMOVED:
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700217 // Remove our PathChildrenCache for this child
Ray Milkey269ffb92014-04-03 14:43:30 -0700218 PathChildrenCache pc = null;
219 synchronized (switchPathCaches) {
220 pc = switchPathCaches.remove(strSwitch);
221 }
222 if (pc != null) {
223 pc.close();
224 }
225 break;
226 default:
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700227 // All other switchLeadershipEvents are connection status
228 // switchLeadershipEvents. We don't need to do anything as
229 // the path cache handles these on its own.
Ray Milkey269ffb92014-04-03 14:43:30 -0700230 break;
231 }
232
233 }
234 }
235
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700236 private static class ClusterLeaderListener implements LeaderLatchListener {
Ray Milkey269ffb92014-04-03 14:43:30 -0700237 //
238 // NOTE: If we need to support callbacks when the
239 // leadership changes, those should be called here.
240 //
241
242 @Override
243 public void isLeader() {
Yuta HIGUCHI5bbbaca2014-06-09 16:39:08 -0700244 log.info("Cluster leadership aquired");
Ray Milkey269ffb92014-04-03 14:43:30 -0700245 }
246
247 @Override
248 public void notLeader() {
Yuta HIGUCHI5bbbaca2014-06-09 16:39:08 -0700249 log.info("Cluster leadership lost");
Ray Milkey269ffb92014-04-03 14:43:30 -0700250 }
251 }
252
Ray Milkey269ffb92014-04-03 14:43:30 -0700253 @Override
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700254 public void requestControl(long dpid, ControlChangeCallback cb)
255 throws RegistryException {
Ray Milkey269ffb92014-04-03 14:43:30 -0700256 log.info("Requesting control for {}", HexString.toHexString(dpid));
257
258 if (controllerId == null) {
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700259 throw new IllegalStateException("Must register a controller before"
260 + " calling requestControl");
Ray Milkey269ffb92014-04-03 14:43:30 -0700261 }
262
263 String dpidStr = HexString.toHexString(dpid);
Ray Milkey269ffb92014-04-03 14:43:30 -0700264
265 if (switches.get(dpidStr) != null) {
266 log.debug("Already contesting {}, returning", HexString.toHexString(dpid));
267 throw new RegistryException("Already contesting control for " + dpidStr);
268 }
269
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700270 String latchPath = SWITCH_LATCHES_PATH + "/" + dpidStr;
271
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700272 LeaderLatch latch =
273 new LeaderLatch(curatorFrameworkClient, latchPath, controllerId);
Pavlin Radoslavov0294e052014-04-10 13:36:45 -0700274 SwitchLeaderListener listener = new SwitchLeaderListener(dpidStr);
Ray Milkey269ffb92014-04-03 14:43:30 -0700275 latch.addListener(listener);
276
277
278 SwitchLeadershipData swData = new SwitchLeadershipData(latch, cb, listener);
279 SwitchLeadershipData oldData = switches.putIfAbsent(dpidStr, swData);
280
281 if (oldData != null) {
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700282 // There was already data for that key in the map
283 // i.e. someone else got here first so we can't succeed
Ray Milkey269ffb92014-04-03 14:43:30 -0700284 log.debug("Already requested control for {}", dpidStr);
285 throw new RegistryException("Already requested control for " + dpidStr);
286 }
287
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700288 // Now that we know we were able to add our latch to the collection,
289 // we can start the leader election in Zookeeper. However I don't know
290 // how to handle if the start fails - the latch is already in our
291 // switches list.
292 // TODO seems like there's a Curator bug when latch.start is called when
293 // there's no Zookeeper connection which causes two znodes to be put in
294 // Zookeeper at the latch path when we reconnect to Zookeeper.
Ray Milkey269ffb92014-04-03 14:43:30 -0700295 try {
296 latch.start();
297 } catch (Exception e) {
298 log.warn("Error starting leader latch: {}", e.getMessage());
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700299 throw new RegistryException("Error starting leader latch for "
300 + dpidStr, e);
Ray Milkey269ffb92014-04-03 14:43:30 -0700301 }
302
303 }
304
305 @Override
306 public void releaseControl(long dpid) {
307 log.info("Releasing control for {}", HexString.toHexString(dpid));
308
309 String dpidStr = HexString.toHexString(dpid);
310
311 SwitchLeadershipData swData = switches.remove(dpidStr);
312
313 if (swData == null) {
314 log.debug("Trying to release control of a switch we are not contesting");
315 return;
316 }
317
318 LeaderLatch latch = swData.getLatch();
319
320 latch.removeListener(swData.getListener());
321
322 try {
323 latch.close();
324 } catch (IOException e) {
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700325 // I think it's OK not to do anything here. Either the node got
326 // deleted correctly, or the connection went down and the node got deleted.
Ray Milkey269ffb92014-04-03 14:43:30 -0700327 log.debug("releaseControl: caught IOException {}", dpidStr);
328 }
329 }
330
331 @Override
332 public boolean hasControl(long dpid) {
333 String dpidStr = HexString.toHexString(dpid);
334
335 SwitchLeadershipData swData = switches.get(dpidStr);
336
337 if (swData == null) {
338 log.warn("No leader latch for dpid {}", dpidStr);
339 return false;
340 }
341
342 return swData.getLatch().hasLeadership();
343 }
344
345 @Override
346 public boolean isClusterLeader() {
347 return clusterLeaderLatch.hasLeadership();
348 }
349
350 @Override
351 public String getControllerId() {
352 return controllerId;
353 }
354
355 @Override
356 public Collection<String> getAllControllers() throws RegistryException {
357 log.debug("Getting all controllers");
358
359 List<String> controllers = new ArrayList<String>();
360 for (ServiceInstance<ControllerService> instance : serviceCache.getInstances()) {
361 String id = instance.getPayload().getControllerId();
362 if (!controllers.contains(id)) {
363 controllers.add(id);
364 }
365 }
366
367 return controllers;
368 }
369
370 @Override
371 public void registerController(String id) throws RegistryException {
372 if (controllerId != null) {
373 throw new RegistryException(
374 "Controller already registered with id " + controllerId);
375 }
376
377 controllerId = id;
378
379 try {
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700380 ServiceInstance<ControllerService> thisInstance =
381 ServiceInstance.<ControllerService>builder()
Ray Milkey269ffb92014-04-03 14:43:30 -0700382 .name(CONTROLLER_SERVICE_NAME)
383 .payload(new ControllerService(controllerId))
Ray Milkey269ffb92014-04-03 14:43:30 -0700384 .build();
385
386 serviceDiscovery.registerService(thisInstance);
387 } catch (Exception e) {
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700388 log.error("Exception starting service instance:", e);
Ray Milkey269ffb92014-04-03 14:43:30 -0700389 }
390
391 }
392
393 @Override
394 public String getControllerForSwitch(long dpid) throws RegistryException {
395 String dpidStr = HexString.toHexString(dpid);
396
397 PathChildrenCache switchCache = switchPathCaches.get(dpidStr);
398
399 if (switchCache == null) {
400 log.warn("Tried to get controller for non-existent switch");
Nick Karanatsios8abe7172014-02-19 20:31:48 -0800401 return null;
402 }
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -0700403
Ray Milkey269ffb92014-04-03 14:43:30 -0700404 try {
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700405 // We've seen issues with these caches get stuck out of date, so
406 // we'll have to force them to refresh before each read. This slows
407 // down the method as it blocks on a Zookeeper query, however at
408 // the moment only the cleanup thread uses this and that isn't
409 // particularly time-sensitive.
410 // TODO verify if it is still the case that caches can be out of date
Ray Milkey269ffb92014-04-03 14:43:30 -0700411 switchCache.rebuild();
412 } catch (Exception e) {
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700413 log.error("Exception rebuilding the switch cache:", e);
Ray Milkey269ffb92014-04-03 14:43:30 -0700414 }
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -0700415
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700416 List<ChildData> sortedData =
417 new ArrayList<ChildData>(switchCache.getCurrentData());
Ray Milkey269ffb92014-04-03 14:43:30 -0700418
419 Collections.sort(
420 sortedData,
421 new Comparator<ChildData>() {
422 private String getSequenceNumber(String path) {
423 return path.substring(path.lastIndexOf('-') + 1);
424 }
425
426 @Override
427 public int compare(ChildData lhs, ChildData rhs) {
428 return getSequenceNumber(lhs.getPath()).
429 compareTo(getSequenceNumber(rhs.getPath()));
430 }
431 }
432 );
433
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700434 if (sortedData.isEmpty()) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700435 return null;
436 }
437
438 return new String(sortedData.get(0).getData(), Charsets.UTF_8);
439 }
440
441 @Override
Ray Milkey5df613b2014-04-15 10:50:56 -0700442 public Collection<Long> getSwitchesControlledByController(String controller) {
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700443 // TODO remove this if not needed
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700444 throw new NotImplementedException("Not yet implemented");
Ray Milkey269ffb92014-04-03 14:43:30 -0700445 }
446
447
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700448 // TODO what should happen when there's no ZK connection? Currently we just
449 // return the cache but this may lead to false impressions - i.e. we don't
450 // actually know what's in ZK so we shouldn't say we do
Ray Milkey269ffb92014-04-03 14:43:30 -0700451 @Override
452 public Map<String, List<ControllerRegistryEntry>> getAllSwitches() {
453 Map<String, List<ControllerRegistryEntry>> data =
454 new HashMap<String, List<ControllerRegistryEntry>>();
455
456 for (Map.Entry<String, PathChildrenCache> entry : switchPathCaches.entrySet()) {
457 List<ControllerRegistryEntry> contendingControllers =
458 new ArrayList<ControllerRegistryEntry>();
459
460 if (entry.getValue().getCurrentData().size() < 1) {
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700461 // TODO prevent even having the PathChildrenCache in this case
Ray Milkey269ffb92014-04-03 14:43:30 -0700462 continue;
463 }
464
465 for (ChildData d : entry.getValue().getCurrentData()) {
466
Ray Milkey5df613b2014-04-15 10:50:56 -0700467 String childsControllerId = new String(d.getData(), Charsets.UTF_8);
Ray Milkey269ffb92014-04-03 14:43:30 -0700468
469 String[] splitted = d.getPath().split("-");
470 int sequenceNumber = Integer.parseInt(splitted[splitted.length - 1]);
471
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700472 contendingControllers.add(new ControllerRegistryEntry(
473 childsControllerId, sequenceNumber));
Ray Milkey269ffb92014-04-03 14:43:30 -0700474 }
475
476 Collections.sort(contendingControllers);
477 data.put(entry.getKey(), contendingControllers);
478 }
479 return data;
480 }
481
Yuta HIGUCHI5bbbaca2014-06-09 16:39:08 -0700482 @Override
Ray Milkey269ffb92014-04-03 14:43:30 -0700483 public IdBlock allocateUniqueIdBlock(long range) {
484 try {
485 AtomicValue<Long> result = null;
486 do {
487 result = distributedIdCounter.add(range);
488 } while (result == null || !result.succeeded());
489
490 return new IdBlock(result.preValue(), result.postValue() - 1, range);
491 } catch (Exception e) {
492 log.error("Error allocating ID block");
493 }
494 return null;
495 }
496
497 /**
498 * Returns a block of IDs which are unique and unused.
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700499 * The range of IDs is a fixed size and is allocated incrementally as this
500 * method is called. Since the range of IDs is managed by Zookeeper in
501 * distributed way, this method may block during Zookeeper access.
502 *
503 * @return an IdBlock containing a set of unique IDs
Ray Milkey269ffb92014-04-03 14:43:30 -0700504 */
505 @Override
506 public IdBlock allocateUniqueIdBlock() {
507 return allocateUniqueIdBlock(ID_BLOCK_SIZE);
508 }
509
510 /**
511 * Get a globally unique ID.
512 *
513 * @return a globally unique ID.
514 */
515 @Override
516 public synchronized long getNextUniqueId() {
517 //
518 // Generate the next Unique ID.
519 //
520 // TODO: For now, the higher 32 bits are random, and
521 // the lower 32 bits are sequential.
522 // The implementation must be updated to use the Zookeeper
523 // to allocate the higher 32 bits (globally unique).
524 //
525 if ((nextUniqueIdSuffix & 0xffffffffL) == 0xffffffffL) {
526 nextUniqueIdPrefix = randomGenerator.nextInt();
527 nextUniqueIdSuffix = 0;
528 } else {
529 nextUniqueIdSuffix++;
530 }
Pavlin Radoslavov952a9762014-04-10 13:47:03 -0700531 long result = nextUniqueIdPrefix << 32;
Ray Milkey269ffb92014-04-03 14:43:30 -0700532 result = result | (0xffffffffL & nextUniqueIdSuffix);
533 return result;
534 }
535
536 /*
537 * IFloodlightModule
538 */
539
540 @Override
541 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
542 Collection<Class<? extends IFloodlightService>> l =
Jonathan Hart3d7730a2013-02-22 11:51:17 -0800543 new ArrayList<Class<? extends IFloodlightService>>();
Ray Milkey269ffb92014-04-03 14:43:30 -0700544 l.add(IControllerRegistryService.class);
545 return l;
546 }
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -0700547
Ray Milkey269ffb92014-04-03 14:43:30 -0700548 @Override
549 public Map<Class<? extends IFloodlightService>, IFloodlightService> getServiceImpls() {
550 Map<Class<? extends IFloodlightService>, IFloodlightService> m =
551 new HashMap<Class<? extends IFloodlightService>, IFloodlightService>();
552 m.put(IControllerRegistryService.class, this);
553 return m;
554 }
Pavlin Radoslavov52163ed2014-03-19 11:39:34 -0700555
Ray Milkey269ffb92014-04-03 14:43:30 -0700556 @Override
557 public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
558 Collection<Class<? extends IFloodlightService>> l =
559 new ArrayList<Class<? extends IFloodlightService>>();
560 l.add(IFloodlightProviderService.class);
561 l.add(IRestApiService.class);
562 return l;
563 }
Jonathan Hartbd181b62013-02-17 16:05:38 -0800564
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700565 // TODO currently blocks startup when it can't get a Zookeeper connection.
566 // Do we support starting up with no Zookeeper connection?
Ray Milkey269ffb92014-04-03 14:43:30 -0700567 @Override
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700568 public void init(FloodlightModuleContext context)
569 throws FloodlightModuleException {
570 // Read the Zookeeper connection string from the config
Ray Milkey269ffb92014-04-03 14:43:30 -0700571 Map<String, String> configParams = context.getConfigParams(this);
Ray Milkey5df613b2014-04-15 10:50:56 -0700572 String connectionStringParam = configParams.get("connectionString");
573 if (connectionStringParam != null) {
574 connectionString = connectionStringParam;
Yuta HIGUCHI5f1ce1c2014-07-20 22:43:54 -0700575 } else {
576 connectionString = System.getProperty(
577 "net.onrc.onos.core.registry.ZookeeperRegistry.connectionString",
578 DEFAULT_CONNECTION_STRING);
Ray Milkey269ffb92014-04-03 14:43:30 -0700579 }
580 log.info("Setting Zookeeper connection string to {}", this.connectionString);
Jonathan Hart116b1fe2014-03-14 18:53:47 -0700581
Yuta HIGUCHI85de40d2014-06-12 14:06:41 -0700582 namespace = System.getProperty(ZK_NAMESPACE_KEY, DEFAULT_NAMESPACE).trim();
583 if (namespace.isEmpty()) {
584 namespace = DEFAULT_NAMESPACE;
585 }
586 log.info("Setting Zookeeper namespace to {}", namespace);
587
Ray Milkey269ffb92014-04-03 14:43:30 -0700588 restApi = context.getServiceImpl(IRestApiService.class);
Pavlin Radoslavovf1377ce2014-02-05 17:37:24 -0800589
Ray Milkey269ffb92014-04-03 14:43:30 -0700590 switches = new ConcurrentHashMap<String, SwitchLeadershipData>();
Ray Milkey269ffb92014-04-03 14:43:30 -0700591 switchPathCaches = new ConcurrentHashMap<String, PathChildrenCache>();
592
593 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700594 curatorFrameworkClient =
595 CuratorFrameworkFactory.newClient(this.connectionString,
Ray Milkey5c9f2db2014-04-09 10:31:21 -0700596 SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy);
Ray Milkey269ffb92014-04-03 14:43:30 -0700597
Ray Milkey5df613b2014-04-15 10:50:56 -0700598 curatorFrameworkClient.start();
Yuta HIGUCHI85de40d2014-06-12 14:06:41 -0700599 curatorFrameworkClient = curatorFrameworkClient.usingNamespace(namespace);
Ray Milkey269ffb92014-04-03 14:43:30 -0700600
601 distributedIdCounter = new DistributedAtomicLong(
Ray Milkey5df613b2014-04-15 10:50:56 -0700602 curatorFrameworkClient,
Ray Milkey269ffb92014-04-03 14:43:30 -0700603 ID_COUNTER_PATH,
604 new RetryOneTime(100));
605
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700606 rootSwitchCache = new PathChildrenCache(
607 curatorFrameworkClient, SWITCH_LATCHES_PATH, true);
Ray Milkey5df613b2014-04-15 10:50:56 -0700608 rootSwitchCache.getListenable().addListener(switchPathCacheListener);
Ray Milkey269ffb92014-04-03 14:43:30 -0700609
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700610 // Build the service discovery object
Ray Milkey269ffb92014-04-03 14:43:30 -0700611 serviceDiscovery = ServiceDiscoveryBuilder.builder(ControllerService.class)
Ray Milkey5df613b2014-04-15 10:50:56 -0700612 .client(curatorFrameworkClient).basePath(SERVICES_PATH).build();
Ray Milkey269ffb92014-04-03 14:43:30 -0700613
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700614 // We read the list of services very frequently (GUI periodically
615 // queries them) so we'll cache them to cut down on Zookeeper queries.
Ray Milkey269ffb92014-04-03 14:43:30 -0700616 serviceCache = serviceDiscovery.serviceCacheBuilder()
617 .name(CONTROLLER_SERVICE_NAME).build();
618
Ray Milkey269ffb92014-04-03 14:43:30 -0700619 try {
620 serviceDiscovery.start();
621 serviceCache.start();
622
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700623 // Don't prime the cache, we want a notification for each child
624 // node in the path
Ray Milkey5df613b2014-04-15 10:50:56 -0700625 rootSwitchCache.start(StartMode.NORMAL);
Ray Milkey269ffb92014-04-03 14:43:30 -0700626 } catch (Exception e) {
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700627 throw new FloodlightModuleException(
628 "Error initialising ZookeeperRegistry", e);
Ray Milkey269ffb92014-04-03 14:43:30 -0700629 }
630
Jonathan Hart12a26aa2014-06-04 14:33:09 -0700631 ExecutorService eventThreadExecutorService =
632 Executors.newSingleThreadExecutor();
Ray Milkey269ffb92014-04-03 14:43:30 -0700633 eventThreadExecutorService.execute(
634 new Runnable() {
635 @Override
636 public void run() {
637 dispatchEvents();
638 }
639 });
640 }
641
642 @Override
643 public void startUp(FloodlightModuleContext context) {
644 //
645 // Cluster Leader election setup.
646 // NOTE: We have to do it here, because during the init stage
647 // we don't know the Controller ID.
648 //
649 if (controllerId == null) {
650 log.error("Error on startup: unknown ControllerId");
651 }
Ray Milkey5df613b2014-04-15 10:50:56 -0700652 clusterLeaderLatch = new LeaderLatch(curatorFrameworkClient,
Ray Milkey269ffb92014-04-03 14:43:30 -0700653 CLUSTER_LEADER_PATH,
654 controllerId);
Pavlin Radoslavov0294e052014-04-10 13:36:45 -0700655 clusterLeaderListener = new ClusterLeaderListener();
Ray Milkey269ffb92014-04-03 14:43:30 -0700656 clusterLeaderLatch.addListener(clusterLeaderListener);
657 try {
658 clusterLeaderLatch.start();
659 } catch (Exception e) {
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700660 log.error("Error starting the cluster leader election: ", e);
Ray Milkey269ffb92014-04-03 14:43:30 -0700661 }
662
663 // Keep trying until there is a cluster leader
664 do {
665 try {
666 Participant leader = clusterLeaderLatch.getLeader();
Ray Milkeyb29e6262014-04-09 16:02:14 -0700667 if (!leader.getId().isEmpty()) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700668 break;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700669 }
Ray Milkey269ffb92014-04-03 14:43:30 -0700670 Thread.sleep(CLUSTER_LEADER_ELECTION_RETRY_MS);
671 } catch (Exception e) {
Jonathan Hart1dbcce62014-06-04 15:21:45 -0700672 log.error("Error waiting for cluster leader election:", e);
Ray Milkey269ffb92014-04-03 14:43:30 -0700673 }
674 } while (true);
675
676 restApi.addRestletRoutable(new RegistryWebRoutable());
677 }
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800678}