blob: b3cd370afb71996139b96fdf60bde3bee648f799 [file] [log] [blame]
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jonathan Hart949c2842014-11-28 23:44:09 -080016package org.onlab.onos.store.cluster.impl;
17
18import static com.google.common.base.Preconditions.checkArgument;
19import static org.onlab.util.Tools.namedThreads;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080020
Pavlin Radoslavov187dff62014-12-02 07:21:35 -080021import java.util.HashMap;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080022import java.util.Map;
23import java.util.concurrent.ExecutorService;
24import java.util.concurrent.Executors;
25import java.util.concurrent.Future;
26import java.util.concurrent.locks.Lock;
27
Jonathan Hart949c2842014-11-28 23:44:09 -080028import org.apache.felix.scr.annotations.Activate;
29import org.apache.felix.scr.annotations.Component;
30import org.apache.felix.scr.annotations.Deactivate;
31import org.apache.felix.scr.annotations.Reference;
32import org.apache.felix.scr.annotations.ReferenceCardinality;
33import org.apache.felix.scr.annotations.Service;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080034import org.onlab.onos.cluster.ClusterService;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080035import org.onlab.onos.cluster.Leadership;
36import org.onlab.onos.cluster.LeadershipEvent;
37import org.onlab.onos.cluster.LeadershipEventListener;
38import org.onlab.onos.cluster.LeadershipService;
39import org.onlab.onos.cluster.NodeId;
40import org.onlab.onos.event.AbstractListenerRegistry;
41import org.onlab.onos.event.EventDeliveryService;
42import org.onlab.onos.store.hz.StoreService;
43import org.onlab.onos.store.serializers.KryoNamespaces;
44import org.onlab.onos.store.serializers.KryoSerializer;
45import org.onlab.util.KryoNamespace;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080046import org.slf4j.Logger;
47import org.slf4j.LoggerFactory;
Jonathan Hart949c2842014-11-28 23:44:09 -080048
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080049import com.google.common.collect.Maps;
50import com.hazelcast.config.TopicConfig;
51import com.hazelcast.core.ITopic;
52import com.hazelcast.core.Message;
53import com.hazelcast.core.MessageListener;
54
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080055/**
56 * Distributed implementation of LeadershipService that is based on Hazelcast.
57 * <p>
58 * The election is eventually-consistent: if there is Hazelcast partitioning,
59 * and the partitioning is healed, there could be a short window of time
60 * until the leaders in each partition discover each other. If this happens,
61 * the leaders release the leadership and run again for election.
62 * </p>
63 * <p>
64 * The leader election is based on Hazelcast's Global Lock, which is stongly
65 * consistent. In addition, each leader periodically advertises events
66 * (using a Hazelcast Topic) that it is the elected leader. Those events are
67 * used for two purposes: (1) Discover multi-leader collisions (in case of
68 * healed Hazelcast partitions), and (2) Inform all listeners who is
69 * the current leader (e.g., for informational purpose).
70 * </p>
71 */
Jonathan Hart949c2842014-11-28 23:44:09 -080072@Component(immediate = true)
73@Service
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -080074public class HazelcastLeadershipService implements LeadershipService,
75 MessageListener<byte[]> {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080076 private static final Logger log =
Jonathan Hart949c2842014-11-28 23:44:09 -080077 LoggerFactory.getLogger(HazelcastLeadershipService.class);
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080078
79 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
80 @Override
81 protected void setupKryoPool() {
82 serializerPool = KryoNamespace.newBuilder()
83 .register(KryoNamespaces.API)
84 .build()
85 .populate(1);
86 }
87 };
88
89 private static final long LEADERSHIP_PERIODIC_INTERVAL_MS = 5 * 1000; // 5s
90 private static final long LEADERSHIP_REMOTE_TIMEOUT_MS = 15 * 1000; // 15s
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -080091 private static final String TOPIC_HZ_ID = "LeadershipService/AllTopics";
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080092
Jonathan Hart949c2842014-11-28 23:44:09 -080093 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
94 protected ClusterService clusterService;
95
96 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
97 protected StoreService storeService;
98
99 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
100 protected EventDeliveryService eventDispatcher;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800101
102 private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
103 listenerRegistry;
104 private final Map<String, Topic> topics = Maps.newConcurrentMap();
Madan Jampani8d21c792014-12-01 16:31:07 -0800105 private NodeId localNodeId;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800106
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800107 private ITopic<byte[]> leaderTopic;
108 private String leaderTopicRegistrationId;
109
Jonathan Hart949c2842014-11-28 23:44:09 -0800110 @Activate
111 protected void activate() {
Madan Jampani8d21c792014-12-01 16:31:07 -0800112 localNodeId = clusterService.getLocalNode().id();
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800113 listenerRegistry = new AbstractListenerRegistry<>();
114 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
115
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800116 TopicConfig topicConfig = new TopicConfig();
117 topicConfig.setGlobalOrderingEnabled(true);
118 topicConfig.setName(TOPIC_HZ_ID);
119 storeService.getHazelcastInstance().getConfig().addTopicConfig(topicConfig);
120 leaderTopic = storeService.getHazelcastInstance().getTopic(TOPIC_HZ_ID);
121 leaderTopicRegistrationId = leaderTopic.addMessageListener(this);
122
Jonathan Hart949c2842014-11-28 23:44:09 -0800123 log.info("Hazelcast Leadership Service started");
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800124 }
125
Jonathan Hart949c2842014-11-28 23:44:09 -0800126 @Deactivate
127 protected void deactivate() {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800128 eventDispatcher.removeSink(LeadershipEvent.class);
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800129 leaderTopic.removeMessageListener(leaderTopicRegistrationId);
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800130
131 for (Topic topic : topics.values()) {
132 topic.stop();
133 }
134 topics.clear();
135
Jonathan Hart949c2842014-11-28 23:44:09 -0800136 log.info("Hazelcast Leadership Service stopped");
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800137 }
138
139 @Override
Madan Jampani8d21c792014-12-01 16:31:07 -0800140 public NodeId getLeader(String path) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800141 Topic topic = topics.get(path);
142 if (topic == null) {
143 return null;
144 }
145 return topic.leader();
146 }
147
148 @Override
149 public void runForLeadership(String path) {
150 checkArgument(path != null);
151 Topic topic = new Topic(path);
152 Topic oldTopic = topics.putIfAbsent(path, topic);
153 if (oldTopic == null) {
154 topic.start();
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800155 topic.runForLeadership();
156 } else {
157 oldTopic.runForLeadership();
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800158 }
159 }
160
161 @Override
162 public void withdraw(String path) {
163 checkArgument(path != null);
164 Topic topic = topics.get(path);
165 if (topic != null) {
166 topic.stop();
167 topics.remove(path, topic);
168 }
169 }
170
171 @Override
Yuta HIGUCHIc2bf3d82014-11-28 18:50:41 -0800172 public Map<String, Leadership> getLeaderBoard() {
Pavlin Radoslavov187dff62014-12-02 07:21:35 -0800173 Map<String, Leadership> result = new HashMap<>();
174
Pavlin Radoslavov187dff62014-12-02 07:21:35 -0800175 for (Topic topic : topics.values()) {
176 Leadership leadership = new Leadership(topic.topicName(),
177 topic.leader(),
178 0L); // TODO: epoch not used
179 result.put(topic.topicName(), leadership);
180 }
181 return result;
Yuta HIGUCHIc2bf3d82014-11-28 18:50:41 -0800182 }
183
184 @Override
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800185 public void addListener(LeadershipEventListener listener) {
186 listenerRegistry.addListener(listener);
187 }
188
189 @Override
190 public void removeListener(LeadershipEventListener listener) {
191 listenerRegistry.removeListener(listener);
192 }
193
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800194 @Override
195 public void onMessage(Message<byte[]> message) {
196 LeadershipEvent leadershipEvent =
197 SERIALIZER.decode(message.getMessageObject());
198
199 log.debug("Leadership Event: time = {} type = {} event = {}",
200 leadershipEvent.time(), leadershipEvent.type(),
201 leadershipEvent);
202
203 //
204 // If there is no entry for the topic, then create a new one to
205 // keep track of the leadership, but don't run for leadership itself.
206 //
207 String topicName = leadershipEvent.subject().topic();
208 Topic topic = topics.get(topicName);
209 if (topic == null) {
210 topic = new Topic(topicName);
211 Topic oldTopic = topics.putIfAbsent(topicName, topic);
212 if (oldTopic == null) {
213 topic.start();
214 } else {
215 topic = oldTopic;
216 }
217 }
218 topic.receivedLeadershipEvent(leadershipEvent);
219 eventDispatcher.post(leadershipEvent);
220 }
221
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800222 /**
223 * Class for keeping per-topic information.
224 */
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800225 private final class Topic {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800226 private final String topicName;
227 private volatile boolean isShutdown = true;
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800228 private volatile boolean isRunningForLeadership = false;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800229 private volatile long lastLeadershipUpdateMs = 0;
230 private ExecutorService leaderElectionExecutor;
231
Madan Jampani8d21c792014-12-01 16:31:07 -0800232 private NodeId leader;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800233 private Lock leaderLock;
234 private Future<?> getLockFuture;
235 private Future<?> periodicProcessingFuture;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800236
237 /**
238 * Constructor.
239 *
240 * @param topicName the topic name
241 */
242 private Topic(String topicName) {
243 this.topicName = topicName;
244 }
245
246 /**
Pavlin Radoslavov187dff62014-12-02 07:21:35 -0800247 * Gets the topic name.
248 *
249 * @return the topic name
250 */
251 private String topicName() {
252 return topicName;
253 }
254
255 /**
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800256 * Gets the leader for the topic.
257 *
258 * @return the leader for the topic
259 */
Madan Jampani8d21c792014-12-01 16:31:07 -0800260 private NodeId leader() {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800261 return leader;
262 }
263
264 /**
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800265 * Starts operation.
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800266 */
267 private void start() {
268 isShutdown = false;
Pavlin Radoslavov2e57b3b2014-11-30 18:36:05 -0800269 String threadPoolName = "leader-election-" + topicName + "-%d";
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800270 leaderElectionExecutor = Executors.newScheduledThreadPool(2,
271 namedThreads(threadPoolName));
272
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800273 periodicProcessingFuture =
274 leaderElectionExecutor.submit(new Runnable() {
275 @Override
276 public void run() {
277 doPeriodicProcessing();
278 }
279 });
280 }
281
282 /**
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800283 * Runs for leadership.
284 */
285 private void runForLeadership() {
286 if (isRunningForLeadership) {
287 return; // Nothing to do: already running
288 }
289 if (isShutdown) {
290 start();
291 }
292 String lockHzId = "LeadershipService/" + topicName + "/lock";
293 leaderLock = storeService.getHazelcastInstance().getLock(lockHzId);
294 getLockFuture = leaderElectionExecutor.submit(new Runnable() {
295 @Override
296 public void run() {
297 doLeaderElectionThread();
298 }
299 });
300 }
301
302 /**
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800303 * Stops leadership election for the topic.
304 */
305 private void stop() {
306 isShutdown = true;
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800307 isRunningForLeadership = false;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800308 // getLockFuture.cancel(true);
309 // periodicProcessingFuture.cancel(true);
310 leaderElectionExecutor.shutdownNow();
311 }
312
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800313 /**
314 * Received a Leadership Event.
315 *
316 * @param leadershipEvent the received Leadership Event
317 */
318 private void receivedLeadershipEvent(LeadershipEvent leadershipEvent) {
Madan Jampani8d21c792014-12-01 16:31:07 -0800319 NodeId eventLeaderId = leadershipEvent.subject().leader();
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800320 if (!leadershipEvent.subject().topic().equals(topicName)) {
321 return; // Not our topic: ignore
322 }
Madan Jampani8d21c792014-12-01 16:31:07 -0800323 if (eventLeaderId.equals(localNodeId)) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800324 return; // My own message: ignore
325 }
326
327 synchronized (this) {
328 switch (leadershipEvent.type()) {
329 case LEADER_ELECTED:
330 // FALLTHROUGH
331 case LEADER_REELECTED:
332 //
333 // Another leader: if we are also a leader, then give up
334 // leadership and run for re-election.
335 //
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800336 if ((leader != null) && leader.equals(localNodeId)) {
337 if (getLockFuture != null) {
338 getLockFuture.cancel(true);
339 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800340 } else {
341 // Just update the current leader
342 leader = leadershipEvent.subject().leader();
343 lastLeadershipUpdateMs = System.currentTimeMillis();
344 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800345 break;
346 case LEADER_BOOTED:
347 // Remove the state for the current leader
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800348 if ((leader != null) && eventLeaderId.equals(leader)) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800349 leader = null;
350 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800351 break;
352 default:
353 break;
354 }
355 }
356 }
357
358 private void doPeriodicProcessing() {
359
360 while (!isShutdown) {
361
362 //
363 // Periodic tasks:
364 // (a) Advertise ourselves as the leader
365 // OR
366 // (b) Expire a stale (remote) leader
367 //
368 synchronized (this) {
369 LeadershipEvent leadershipEvent;
370 if (leader != null) {
Madan Jampani8d21c792014-12-01 16:31:07 -0800371 if (leader.equals(localNodeId)) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800372 //
373 // Advertise ourselves as the leader
374 //
375 leadershipEvent = new LeadershipEvent(
376 LeadershipEvent.Type.LEADER_REELECTED,
Madan Jampani8d21c792014-12-01 16:31:07 -0800377 new Leadership(topicName, localNodeId, 0));
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800378 // Dispatch to all instances
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800379 leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
380 } else {
381 //
382 // Test if time to expire a stale leader
383 //
384 long delta = System.currentTimeMillis() -
385 lastLeadershipUpdateMs;
386 if (delta > LEADERSHIP_REMOTE_TIMEOUT_MS) {
387 leadershipEvent = new LeadershipEvent(
388 LeadershipEvent.Type.LEADER_BOOTED,
389 new Leadership(topicName, leader, 0));
390 // Dispatch only to the local listener(s)
391 eventDispatcher.post(leadershipEvent);
392 leader = null;
393 }
394 }
395 }
396 }
397
398 // Sleep before re-advertising
399 try {
400 Thread.sleep(LEADERSHIP_PERIODIC_INTERVAL_MS);
401 } catch (InterruptedException e) {
Pavlin Radoslavov2e57b3b2014-11-30 18:36:05 -0800402 log.debug("Leader Election periodic thread interrupted");
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800403 }
404 }
405 }
406
407 /**
408 * Performs the leader election by using Hazelcast.
409 */
410 private void doLeaderElectionThread() {
411
412 while (!isShutdown) {
413 LeadershipEvent leadershipEvent;
414 //
415 // Try to acquire the lock and keep it until the instance is
416 // shutdown.
417 //
Pavlin Radoslavov2e57b3b2014-11-30 18:36:05 -0800418 log.debug("Leader Election begin for topic {}",
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800419 topicName);
420 try {
421 // Block until it becomes the leader
422 leaderLock.lockInterruptibly();
423 } catch (InterruptedException e) {
424 //
425 // Thread interrupted. Either shutdown or run for
426 // re-election.
427 //
Pavlin Radoslavov2e57b3b2014-11-30 18:36:05 -0800428 log.debug("Election interrupted for topic {}",
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800429 topicName);
430 continue;
431 }
432
433 synchronized (this) {
434 //
435 // This instance is now the leader
436 //
Pavlin Radoslavov2e57b3b2014-11-30 18:36:05 -0800437 log.info("Leader Elected for topic {}", topicName);
Madan Jampani8d21c792014-12-01 16:31:07 -0800438 leader = localNodeId;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800439 leadershipEvent = new LeadershipEvent(
440 LeadershipEvent.Type.LEADER_ELECTED,
Madan Jampani8d21c792014-12-01 16:31:07 -0800441 new Leadership(topicName, localNodeId, 0));
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800442 leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
443 }
444
445 try {
446 // Sleep forever until interrupted
447 Thread.sleep(Long.MAX_VALUE);
448 } catch (InterruptedException e) {
449 //
450 // Thread interrupted. Either shutdown or run for
451 // re-election.
452 //
Pavlin Radoslavov2e57b3b2014-11-30 18:36:05 -0800453 log.debug("Leader Interrupted for topic {}",
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800454 topicName);
455 }
456
457 synchronized (this) {
458 // If we reach here, we should release the leadership
Pavlin Radoslavov2e57b3b2014-11-30 18:36:05 -0800459 log.debug("Leader Lock Released for topic {}", topicName);
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800460 if ((leader != null) &&
Madan Jampani8d21c792014-12-01 16:31:07 -0800461 leader.equals(localNodeId)) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800462 leader = null;
463 }
464 leadershipEvent = new LeadershipEvent(
465 LeadershipEvent.Type.LEADER_BOOTED,
Madan Jampani8d21c792014-12-01 16:31:07 -0800466 new Leadership(topicName, localNodeId, 0));
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800467 leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
468 leaderLock.unlock();
469 }
470 }
471 }
472 }
473}