blob: a276367516e37903695cdbfa7dec1d83238a544e [file] [log] [blame]
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.cluster.impl;
Jonathan Hart949c2842014-11-28 23:44:09 -080017
Jonathan Hart82b9fec2015-02-03 16:18:54 -080018import com.google.common.collect.Maps;
19import com.hazelcast.config.TopicConfig;
20import com.hazelcast.core.IAtomicLong;
Madan Jampani2af244a2015-02-22 13:12:01 -080021
Jonathan Hart949c2842014-11-28 23:44:09 -080022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
25import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
27import org.apache.felix.scr.annotations.Service;
Jonathan Hart82b9fec2015-02-03 16:18:54 -080028import org.onlab.util.KryoNamespace;
Brian O'Connorabafb502014-12-02 22:26:20 -080029import org.onosproject.cluster.ClusterService;
30import org.onosproject.cluster.Leadership;
31import org.onosproject.cluster.LeadershipEvent;
32import org.onosproject.cluster.LeadershipEventListener;
33import org.onosproject.cluster.LeadershipService;
34import org.onosproject.cluster.NodeId;
35import org.onosproject.event.AbstractListenerRegistry;
36import org.onosproject.event.EventDeliveryService;
Madan Jampani97cf7c42015-02-19 16:33:37 -080037import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
38import org.onosproject.store.cluster.messaging.ClusterMessage;
39import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
40import org.onosproject.store.cluster.messaging.MessageSubject;
Brian O'Connorabafb502014-12-02 22:26:20 -080041import org.onosproject.store.hz.StoreService;
42import org.onosproject.store.serializers.KryoNamespaces;
43import org.onosproject.store.serializers.KryoSerializer;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080044import org.slf4j.Logger;
45import org.slf4j.LoggerFactory;
Jonathan Hart949c2842014-11-28 23:44:09 -080046
Jonathan Hart82b9fec2015-02-03 16:18:54 -080047import java.util.HashMap;
48import java.util.Map;
49import java.util.concurrent.ExecutorService;
50import java.util.concurrent.Executors;
51import java.util.concurrent.Future;
52import java.util.concurrent.locks.Lock;
53
54import static com.google.common.base.Preconditions.checkArgument;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080055import static org.onlab.util.Tools.groupedThreads;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080056
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080057/**
58 * Distributed implementation of LeadershipService that is based on Hazelcast.
59 * <p>
60 * The election is eventually-consistent: if there is Hazelcast partitioning,
61 * and the partitioning is healed, there could be a short window of time
62 * until the leaders in each partition discover each other. If this happens,
63 * the leaders release the leadership and run again for election.
64 * </p>
65 * <p>
66 * The leader election is based on Hazelcast's Global Lock, which is stongly
67 * consistent. In addition, each leader periodically advertises events
68 * (using a Hazelcast Topic) that it is the elected leader. Those events are
69 * used for two purposes: (1) Discover multi-leader collisions (in case of
70 * healed Hazelcast partitions), and (2) Inform all listeners who is
71 * the current leader (e.g., for informational purpose).
72 * </p>
73 */
Jonathan Hart949c2842014-11-28 23:44:09 -080074@Component(immediate = true)
75@Service
Madan Jampani97cf7c42015-02-19 16:33:37 -080076public class HazelcastLeadershipService implements LeadershipService {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080077 private static final Logger log =
Jonathan Hart949c2842014-11-28 23:44:09 -080078 LoggerFactory.getLogger(HazelcastLeadershipService.class);
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080079
80 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
81 @Override
82 protected void setupKryoPool() {
83 serializerPool = KryoNamespace.newBuilder()
84 .register(KryoNamespaces.API)
85 .build()
86 .populate(1);
87 }
88 };
89
90 private static final long LEADERSHIP_PERIODIC_INTERVAL_MS = 5 * 1000; // 5s
91 private static final long LEADERSHIP_REMOTE_TIMEOUT_MS = 15 * 1000; // 15s
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -080092 private static final String TOPIC_HZ_ID = "LeadershipService/AllTopics";
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080093
Jonathan Hart82b9fec2015-02-03 16:18:54 -080094 // indicates there is no term value yet
95 private static final long NO_TERM = 0;
96
Jonathan Hart949c2842014-11-28 23:44:09 -080097 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani97cf7c42015-02-19 16:33:37 -080098 protected ClusterCommunicationService clusterCommunicator;
99
100 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart949c2842014-11-28 23:44:09 -0800101 protected ClusterService clusterService;
102
103 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
104 protected StoreService storeService;
105
106 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
107 protected EventDeliveryService eventDispatcher;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800108
109 private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
110 listenerRegistry;
111 private final Map<String, Topic> topics = Maps.newConcurrentMap();
Madan Jampani8d21c792014-12-01 16:31:07 -0800112 private NodeId localNodeId;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800113
Madan Jampani97cf7c42015-02-19 16:33:37 -0800114 private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
115 new MessageSubject("hz-leadership-events");
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800116
Madan Jampani2af244a2015-02-22 13:12:01 -0800117 private ExecutorService messageHandlingExecutor;
118
Jonathan Hart949c2842014-11-28 23:44:09 -0800119 @Activate
120 protected void activate() {
Madan Jampani8d21c792014-12-01 16:31:07 -0800121 localNodeId = clusterService.getLocalNode().id();
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800122 listenerRegistry = new AbstractListenerRegistry<>();
123 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
124
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800125 TopicConfig topicConfig = new TopicConfig();
126 topicConfig.setGlobalOrderingEnabled(true);
127 topicConfig.setName(TOPIC_HZ_ID);
128 storeService.getHazelcastInstance().getConfig().addTopicConfig(topicConfig);
Madan Jampani97cf7c42015-02-19 16:33:37 -0800129
Madan Jampani2af244a2015-02-22 13:12:01 -0800130 messageHandlingExecutor = Executors.newSingleThreadExecutor(
131 groupedThreads("onos/store/leadership", "message-handler"));
132
133 clusterCommunicator.addSubscriber(
134 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
135 new InternalLeadershipEventListener(),
136 messageHandlingExecutor);
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800137
Jonathan Hart949c2842014-11-28 23:44:09 -0800138 log.info("Hazelcast Leadership Service started");
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800139 }
140
Jonathan Hart949c2842014-11-28 23:44:09 -0800141 @Deactivate
142 protected void deactivate() {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800143 eventDispatcher.removeSink(LeadershipEvent.class);
Madan Jampani2af244a2015-02-22 13:12:01 -0800144 messageHandlingExecutor.shutdown();
Madan Jampani97cf7c42015-02-19 16:33:37 -0800145 clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800146
147 for (Topic topic : topics.values()) {
148 topic.stop();
149 }
150 topics.clear();
151
Jonathan Hart949c2842014-11-28 23:44:09 -0800152 log.info("Hazelcast Leadership Service stopped");
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800153 }
154
155 @Override
Madan Jampani8d21c792014-12-01 16:31:07 -0800156 public NodeId getLeader(String path) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800157 Topic topic = topics.get(path);
158 if (topic == null) {
159 return null;
160 }
161 return topic.leader();
162 }
163
164 @Override
165 public void runForLeadership(String path) {
166 checkArgument(path != null);
167 Topic topic = new Topic(path);
168 Topic oldTopic = topics.putIfAbsent(path, topic);
169 if (oldTopic == null) {
170 topic.start();
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800171 topic.runForLeadership();
172 } else {
173 oldTopic.runForLeadership();
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800174 }
175 }
176
177 @Override
178 public void withdraw(String path) {
179 checkArgument(path != null);
180 Topic topic = topics.get(path);
181 if (topic != null) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800182 topics.remove(path, topic);
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900183 topic.stop();
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800184 }
185 }
186
187 @Override
Yuta HIGUCHIc2bf3d82014-11-28 18:50:41 -0800188 public Map<String, Leadership> getLeaderBoard() {
Pavlin Radoslavov187dff62014-12-02 07:21:35 -0800189 Map<String, Leadership> result = new HashMap<>();
190
Pavlin Radoslavov187dff62014-12-02 07:21:35 -0800191 for (Topic topic : topics.values()) {
192 Leadership leadership = new Leadership(topic.topicName(),
193 topic.leader(),
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800194 topic.term());
Pavlin Radoslavov187dff62014-12-02 07:21:35 -0800195 result.put(topic.topicName(), leadership);
196 }
197 return result;
Yuta HIGUCHIc2bf3d82014-11-28 18:50:41 -0800198 }
199
200 @Override
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800201 public void addListener(LeadershipEventListener listener) {
202 listenerRegistry.addListener(listener);
203 }
204
205 @Override
206 public void removeListener(LeadershipEventListener listener) {
207 listenerRegistry.removeListener(listener);
208 }
209
210 /**
211 * Class for keeping per-topic information.
212 */
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800213 private final class Topic {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800214 private final String topicName;
215 private volatile boolean isShutdown = true;
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800216 private volatile boolean isRunningForLeadership = false;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800217 private volatile long lastLeadershipUpdateMs = 0;
218 private ExecutorService leaderElectionExecutor;
219
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800220 private volatile IAtomicLong term;
221 // This is local state, recording the term number for the last time
222 // this instance was leader for this topic. The current term could be
223 // higher if the mastership has changed any times.
224 private long myLastLeaderTerm = NO_TERM;
225
Madan Jampani8d21c792014-12-01 16:31:07 -0800226 private NodeId leader;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800227 private Lock leaderLock;
228 private Future<?> getLockFuture;
229 private Future<?> periodicProcessingFuture;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800230
231 /**
232 * Constructor.
233 *
234 * @param topicName the topic name
235 */
236 private Topic(String topicName) {
237 this.topicName = topicName;
238 }
239
240 /**
Pavlin Radoslavov187dff62014-12-02 07:21:35 -0800241 * Gets the topic name.
242 *
243 * @return the topic name
244 */
245 private String topicName() {
246 return topicName;
247 }
248
249 /**
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800250 * Gets the leader for the topic.
251 *
252 * @return the leader for the topic
253 */
Madan Jampani8d21c792014-12-01 16:31:07 -0800254 private NodeId leader() {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800255 return leader;
256 }
257
258 /**
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800259 * Gets the current term for the topic.
260 *
261 * @return the term for the topic
262 */
263 private long term() {
264 if (term == null) {
265 return NO_TERM;
266 }
267 return term.get();
268 }
269
270 /**
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800271 * Starts operation.
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800272 */
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900273 private synchronized void start() {
274 if (!isShutdown) {
275 // already running
276 return;
277 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800278 isShutdown = false;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -0800279 String threadPoolName = "election-" + topicName + "-%d";
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800280 leaderElectionExecutor = Executors.newScheduledThreadPool(2,
Thomas Vachuska6f94ded2015-02-21 14:02:38 -0800281 groupedThreads("onos/leadership", threadPoolName));
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800282
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800283 periodicProcessingFuture =
284 leaderElectionExecutor.submit(new Runnable() {
285 @Override
286 public void run() {
287 doPeriodicProcessing();
288 }
289 });
290 }
291
292 /**
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800293 * Runs for leadership.
294 */
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900295 private synchronized void runForLeadership() {
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800296 if (isRunningForLeadership) {
297 return; // Nothing to do: already running
298 }
299 if (isShutdown) {
300 start();
301 }
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900302 isRunningForLeadership = true;
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800303 String lockHzId = "LeadershipService/" + topicName + "/lock";
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800304 String termHzId = "LeadershipService/" + topicName + "/term";
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800305 leaderLock = storeService.getHazelcastInstance().getLock(lockHzId);
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800306 term = storeService.getHazelcastInstance().getAtomicLong(termHzId);
307
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800308 getLockFuture = leaderElectionExecutor.submit(new Runnable() {
309 @Override
310 public void run() {
311 doLeaderElectionThread();
312 }
313 });
314 }
315
316 /**
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800317 * Stops leadership election for the topic.
318 */
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900319 private synchronized void stop() {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800320 isShutdown = true;
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800321 isRunningForLeadership = false;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800322 // getLockFuture.cancel(true);
323 // periodicProcessingFuture.cancel(true);
324 leaderElectionExecutor.shutdownNow();
325 }
326
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800327 /**
328 * Received a Leadership Event.
329 *
330 * @param leadershipEvent the received Leadership Event
331 */
332 private void receivedLeadershipEvent(LeadershipEvent leadershipEvent) {
Madan Jampani8d21c792014-12-01 16:31:07 -0800333 NodeId eventLeaderId = leadershipEvent.subject().leader();
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800334 if (!leadershipEvent.subject().topic().equals(topicName)) {
335 return; // Not our topic: ignore
336 }
Madan Jampani8d21c792014-12-01 16:31:07 -0800337 if (eventLeaderId.equals(localNodeId)) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800338 return; // My own message: ignore
339 }
340
341 synchronized (this) {
342 switch (leadershipEvent.type()) {
343 case LEADER_ELECTED:
344 // FALLTHROUGH
345 case LEADER_REELECTED:
346 //
347 // Another leader: if we are also a leader, then give up
348 // leadership and run for re-election.
349 //
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800350 if ((leader != null) && leader.equals(localNodeId)) {
351 if (getLockFuture != null) {
352 getLockFuture.cancel(true);
353 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800354 } else {
355 // Just update the current leader
356 leader = leadershipEvent.subject().leader();
357 lastLeadershipUpdateMs = System.currentTimeMillis();
358 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800359 break;
360 case LEADER_BOOTED:
361 // Remove the state for the current leader
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800362 if ((leader != null) && eventLeaderId.equals(leader)) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800363 leader = null;
364 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800365 break;
366 default:
367 break;
368 }
369 }
370 }
371
372 private void doPeriodicProcessing() {
373
374 while (!isShutdown) {
375
376 //
377 // Periodic tasks:
378 // (a) Advertise ourselves as the leader
379 // OR
380 // (b) Expire a stale (remote) leader
381 //
382 synchronized (this) {
383 LeadershipEvent leadershipEvent;
384 if (leader != null) {
Madan Jampani8d21c792014-12-01 16:31:07 -0800385 if (leader.equals(localNodeId)) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800386 //
387 // Advertise ourselves as the leader
388 //
389 leadershipEvent = new LeadershipEvent(
390 LeadershipEvent.Type.LEADER_REELECTED,
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800391 new Leadership(topicName, localNodeId, myLastLeaderTerm));
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800392 // Dispatch to all instances
Madan Jampani97cf7c42015-02-19 16:33:37 -0800393
394 clusterCommunicator.broadcastIncludeSelf(
395 new ClusterMessage(
396 clusterService.getLocalNode().id(),
397 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
398 SERIALIZER.encode(leadershipEvent)));
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800399 } else {
400 //
401 // Test if time to expire a stale leader
402 //
403 long delta = System.currentTimeMillis() -
404 lastLeadershipUpdateMs;
405 if (delta > LEADERSHIP_REMOTE_TIMEOUT_MS) {
406 leadershipEvent = new LeadershipEvent(
407 LeadershipEvent.Type.LEADER_BOOTED,
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800408 new Leadership(topicName, leader, myLastLeaderTerm));
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800409 // Dispatch only to the local listener(s)
410 eventDispatcher.post(leadershipEvent);
411 leader = null;
412 }
413 }
414 }
415 }
416
417 // Sleep before re-advertising
418 try {
419 Thread.sleep(LEADERSHIP_PERIODIC_INTERVAL_MS);
420 } catch (InterruptedException e) {
Pavlin Radoslavov2e57b3b2014-11-30 18:36:05 -0800421 log.debug("Leader Election periodic thread interrupted");
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800422 }
423 }
424 }
425
426 /**
427 * Performs the leader election by using Hazelcast.
428 */
429 private void doLeaderElectionThread() {
430
431 while (!isShutdown) {
432 LeadershipEvent leadershipEvent;
433 //
434 // Try to acquire the lock and keep it until the instance is
435 // shutdown.
436 //
Pavlin Radoslavov2e57b3b2014-11-30 18:36:05 -0800437 log.debug("Leader Election begin for topic {}",
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800438 topicName);
439 try {
440 // Block until it becomes the leader
441 leaderLock.lockInterruptibly();
442 } catch (InterruptedException e) {
443 //
444 // Thread interrupted. Either shutdown or run for
445 // re-election.
446 //
Pavlin Radoslavov2e57b3b2014-11-30 18:36:05 -0800447 log.debug("Election interrupted for topic {}",
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800448 topicName);
449 continue;
450 }
451
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800452 try {
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900453 synchronized (this) {
454 //
455 // This instance is now the leader
456 //
457 log.info("Leader Elected for topic {}", topicName);
458
459 updateTerm();
460
461 leader = localNodeId;
462 leadershipEvent = new LeadershipEvent(
463 LeadershipEvent.Type.LEADER_ELECTED,
464 new Leadership(topicName, localNodeId, myLastLeaderTerm));
Madan Jampani97cf7c42015-02-19 16:33:37 -0800465 clusterCommunicator.broadcastIncludeSelf(
466 new ClusterMessage(
467 clusterService.getLocalNode().id(),
468 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
469 SERIALIZER.encode(leadershipEvent)));
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900470 }
471
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800472 // Sleep forever until interrupted
473 Thread.sleep(Long.MAX_VALUE);
474 } catch (InterruptedException e) {
475 //
476 // Thread interrupted. Either shutdown or run for
477 // re-election.
478 //
Pavlin Radoslavov2e57b3b2014-11-30 18:36:05 -0800479 log.debug("Leader Interrupted for topic {}",
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800480 topicName);
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800481
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900482 } finally {
483 synchronized (this) {
484 // If we reach here, we should release the leadership
485 log.debug("Leader Lock Released for topic {}", topicName);
486 if ((leader != null) &&
487 leader.equals(localNodeId)) {
488 leader = null;
489 }
490 leadershipEvent = new LeadershipEvent(
491 LeadershipEvent.Type.LEADER_BOOTED,
492 new Leadership(topicName, localNodeId, myLastLeaderTerm));
Madan Jampani97cf7c42015-02-19 16:33:37 -0800493 clusterCommunicator.broadcastIncludeSelf(
494 new ClusterMessage(
495 clusterService.getLocalNode().id(),
496 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
497 SERIALIZER.encode(leadershipEvent)));
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900498 leaderLock.unlock();
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800499 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800500 }
501 }
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900502 isRunningForLeadership = false;
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800503 }
504
505 // Globally guarded by the leadership lock for this term
506 // Locally guarded by synchronized (this)
507 private void updateTerm() {
508 long oldTerm = term.get();
509 long newTerm = term.incrementAndGet();
510 myLastLeaderTerm = newTerm;
511 log.debug("Topic {} updated term from {} to {}", topicName,
512 oldTerm, newTerm);
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800513 }
514 }
Madan Jampani97cf7c42015-02-19 16:33:37 -0800515
516 private class InternalLeadershipEventListener implements ClusterMessageHandler {
517
518 @Override
519 public void handle(ClusterMessage message) {
520 LeadershipEvent leadershipEvent =
521 SERIALIZER.decode(message.payload());
522
523 log.trace("Leadership Event: time = {} type = {} event = {}",
524 leadershipEvent.time(), leadershipEvent.type(),
525 leadershipEvent);
526 //
527 // If there is no entry for the topic, then create a new one to
528 // keep track of the leadership, but don't run for leadership itself.
529 //
530 String topicName = leadershipEvent.subject().topic();
531 Topic topic = topics.get(topicName);
532 if (topic == null) {
533 topic = new Topic(topicName);
534 Topic oldTopic = topics.putIfAbsent(topicName, topic);
535 if (oldTopic == null) {
536 // encountered new topic, start periodic processing
537 topic.start();
538 } else {
539 topic = oldTopic;
540 }
541 }
542 topic.receivedLeadershipEvent(leadershipEvent);
543 eventDispatcher.post(leadershipEvent);
544 }
545 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800546}