blob: df746f71451ad64af648a0ca554947cfd88dd881 [file] [log] [blame]
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.cluster.impl;
Jonathan Hart949c2842014-11-28 23:44:09 -080017
Jonathan Hart82b9fec2015-02-03 16:18:54 -080018import com.google.common.collect.Maps;
19import com.hazelcast.config.TopicConfig;
20import com.hazelcast.core.IAtomicLong;
Jonathan Hart949c2842014-11-28 23:44:09 -080021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
Jonathan Hart82b9fec2015-02-03 16:18:54 -080027import org.onlab.util.KryoNamespace;
Brian O'Connorabafb502014-12-02 22:26:20 -080028import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.Leadership;
30import org.onosproject.cluster.LeadershipEvent;
31import org.onosproject.cluster.LeadershipEventListener;
32import org.onosproject.cluster.LeadershipService;
33import org.onosproject.cluster.NodeId;
34import org.onosproject.event.AbstractListenerRegistry;
35import org.onosproject.event.EventDeliveryService;
Madan Jampani97cf7c42015-02-19 16:33:37 -080036import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
37import org.onosproject.store.cluster.messaging.ClusterMessage;
38import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
39import org.onosproject.store.cluster.messaging.MessageSubject;
Brian O'Connorabafb502014-12-02 22:26:20 -080040import org.onosproject.store.hz.StoreService;
41import org.onosproject.store.serializers.KryoNamespaces;
42import org.onosproject.store.serializers.KryoSerializer;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080043import org.slf4j.Logger;
44import org.slf4j.LoggerFactory;
Jonathan Hart949c2842014-11-28 23:44:09 -080045
Jonathan Hart82b9fec2015-02-03 16:18:54 -080046import java.util.HashMap;
47import java.util.Map;
48import java.util.concurrent.ExecutorService;
49import java.util.concurrent.Executors;
50import java.util.concurrent.Future;
51import java.util.concurrent.locks.Lock;
52
53import static com.google.common.base.Preconditions.checkArgument;
54import static org.onlab.util.Tools.namedThreads;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080055
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080056/**
57 * Distributed implementation of LeadershipService that is based on Hazelcast.
58 * <p>
59 * The election is eventually-consistent: if there is Hazelcast partitioning,
60 * and the partitioning is healed, there could be a short window of time
61 * until the leaders in each partition discover each other. If this happens,
62 * the leaders release the leadership and run again for election.
63 * </p>
64 * <p>
65 * The leader election is based on Hazelcast's Global Lock, which is stongly
66 * consistent. In addition, each leader periodically advertises events
67 * (using a Hazelcast Topic) that it is the elected leader. Those events are
68 * used for two purposes: (1) Discover multi-leader collisions (in case of
69 * healed Hazelcast partitions), and (2) Inform all listeners who is
70 * the current leader (e.g., for informational purpose).
71 * </p>
72 */
Jonathan Hart949c2842014-11-28 23:44:09 -080073@Component(immediate = true)
74@Service
Madan Jampani97cf7c42015-02-19 16:33:37 -080075public class HazelcastLeadershipService implements LeadershipService {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080076 private static final Logger log =
Jonathan Hart949c2842014-11-28 23:44:09 -080077 LoggerFactory.getLogger(HazelcastLeadershipService.class);
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080078
79 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
80 @Override
81 protected void setupKryoPool() {
82 serializerPool = KryoNamespace.newBuilder()
83 .register(KryoNamespaces.API)
84 .build()
85 .populate(1);
86 }
87 };
88
89 private static final long LEADERSHIP_PERIODIC_INTERVAL_MS = 5 * 1000; // 5s
90 private static final long LEADERSHIP_REMOTE_TIMEOUT_MS = 15 * 1000; // 15s
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -080091 private static final String TOPIC_HZ_ID = "LeadershipService/AllTopics";
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080092
Jonathan Hart82b9fec2015-02-03 16:18:54 -080093 // indicates there is no term value yet
94 private static final long NO_TERM = 0;
95
Jonathan Hart949c2842014-11-28 23:44:09 -080096 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani97cf7c42015-02-19 16:33:37 -080097 protected ClusterCommunicationService clusterCommunicator;
98
99 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart949c2842014-11-28 23:44:09 -0800100 protected ClusterService clusterService;
101
102 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
103 protected StoreService storeService;
104
105 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
106 protected EventDeliveryService eventDispatcher;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800107
108 private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
109 listenerRegistry;
110 private final Map<String, Topic> topics = Maps.newConcurrentMap();
Madan Jampani8d21c792014-12-01 16:31:07 -0800111 private NodeId localNodeId;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800112
Madan Jampani97cf7c42015-02-19 16:33:37 -0800113 private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
114 new MessageSubject("hz-leadership-events");
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800115
Jonathan Hart949c2842014-11-28 23:44:09 -0800116 @Activate
117 protected void activate() {
Madan Jampani8d21c792014-12-01 16:31:07 -0800118 localNodeId = clusterService.getLocalNode().id();
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800119 listenerRegistry = new AbstractListenerRegistry<>();
120 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
121
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800122 TopicConfig topicConfig = new TopicConfig();
123 topicConfig.setGlobalOrderingEnabled(true);
124 topicConfig.setName(TOPIC_HZ_ID);
125 storeService.getHazelcastInstance().getConfig().addTopicConfig(topicConfig);
Madan Jampani97cf7c42015-02-19 16:33:37 -0800126
127 clusterCommunicator.addSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT, new InternalLeadershipEventListener());
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800128
Jonathan Hart949c2842014-11-28 23:44:09 -0800129 log.info("Hazelcast Leadership Service started");
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800130 }
131
Jonathan Hart949c2842014-11-28 23:44:09 -0800132 @Deactivate
133 protected void deactivate() {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800134 eventDispatcher.removeSink(LeadershipEvent.class);
Madan Jampani97cf7c42015-02-19 16:33:37 -0800135 clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800136
137 for (Topic topic : topics.values()) {
138 topic.stop();
139 }
140 topics.clear();
141
Jonathan Hart949c2842014-11-28 23:44:09 -0800142 log.info("Hazelcast Leadership Service stopped");
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800143 }
144
145 @Override
Madan Jampani8d21c792014-12-01 16:31:07 -0800146 public NodeId getLeader(String path) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800147 Topic topic = topics.get(path);
148 if (topic == null) {
149 return null;
150 }
151 return topic.leader();
152 }
153
154 @Override
155 public void runForLeadership(String path) {
156 checkArgument(path != null);
157 Topic topic = new Topic(path);
158 Topic oldTopic = topics.putIfAbsent(path, topic);
159 if (oldTopic == null) {
160 topic.start();
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800161 topic.runForLeadership();
162 } else {
163 oldTopic.runForLeadership();
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800164 }
165 }
166
167 @Override
168 public void withdraw(String path) {
169 checkArgument(path != null);
170 Topic topic = topics.get(path);
171 if (topic != null) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800172 topics.remove(path, topic);
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900173 topic.stop();
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800174 }
175 }
176
177 @Override
Yuta HIGUCHIc2bf3d82014-11-28 18:50:41 -0800178 public Map<String, Leadership> getLeaderBoard() {
Pavlin Radoslavov187dff62014-12-02 07:21:35 -0800179 Map<String, Leadership> result = new HashMap<>();
180
Pavlin Radoslavov187dff62014-12-02 07:21:35 -0800181 for (Topic topic : topics.values()) {
182 Leadership leadership = new Leadership(topic.topicName(),
183 topic.leader(),
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800184 topic.term());
Pavlin Radoslavov187dff62014-12-02 07:21:35 -0800185 result.put(topic.topicName(), leadership);
186 }
187 return result;
Yuta HIGUCHIc2bf3d82014-11-28 18:50:41 -0800188 }
189
190 @Override
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800191 public void addListener(LeadershipEventListener listener) {
192 listenerRegistry.addListener(listener);
193 }
194
195 @Override
196 public void removeListener(LeadershipEventListener listener) {
197 listenerRegistry.removeListener(listener);
198 }
199
200 /**
201 * Class for keeping per-topic information.
202 */
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800203 private final class Topic {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800204 private final String topicName;
205 private volatile boolean isShutdown = true;
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800206 private volatile boolean isRunningForLeadership = false;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800207 private volatile long lastLeadershipUpdateMs = 0;
208 private ExecutorService leaderElectionExecutor;
209
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800210 private volatile IAtomicLong term;
211 // This is local state, recording the term number for the last time
212 // this instance was leader for this topic. The current term could be
213 // higher if the mastership has changed any times.
214 private long myLastLeaderTerm = NO_TERM;
215
Madan Jampani8d21c792014-12-01 16:31:07 -0800216 private NodeId leader;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800217 private Lock leaderLock;
218 private Future<?> getLockFuture;
219 private Future<?> periodicProcessingFuture;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800220
221 /**
222 * Constructor.
223 *
224 * @param topicName the topic name
225 */
226 private Topic(String topicName) {
227 this.topicName = topicName;
228 }
229
230 /**
Pavlin Radoslavov187dff62014-12-02 07:21:35 -0800231 * Gets the topic name.
232 *
233 * @return the topic name
234 */
235 private String topicName() {
236 return topicName;
237 }
238
239 /**
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800240 * Gets the leader for the topic.
241 *
242 * @return the leader for the topic
243 */
Madan Jampani8d21c792014-12-01 16:31:07 -0800244 private NodeId leader() {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800245 return leader;
246 }
247
248 /**
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800249 * Gets the current term for the topic.
250 *
251 * @return the term for the topic
252 */
253 private long term() {
254 if (term == null) {
255 return NO_TERM;
256 }
257 return term.get();
258 }
259
260 /**
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800261 * Starts operation.
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800262 */
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900263 private synchronized void start() {
264 if (!isShutdown) {
265 // already running
266 return;
267 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800268 isShutdown = false;
Thomas Vachuska9ea3e6f2015-01-23 16:34:22 -0800269 String threadPoolName = "onos-leader-election-" + topicName + "-%d";
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800270 leaderElectionExecutor = Executors.newScheduledThreadPool(2,
271 namedThreads(threadPoolName));
272
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800273 periodicProcessingFuture =
274 leaderElectionExecutor.submit(new Runnable() {
275 @Override
276 public void run() {
277 doPeriodicProcessing();
278 }
279 });
280 }
281
282 /**
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800283 * Runs for leadership.
284 */
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900285 private synchronized void runForLeadership() {
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800286 if (isRunningForLeadership) {
287 return; // Nothing to do: already running
288 }
289 if (isShutdown) {
290 start();
291 }
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900292 isRunningForLeadership = true;
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800293 String lockHzId = "LeadershipService/" + topicName + "/lock";
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800294 String termHzId = "LeadershipService/" + topicName + "/term";
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800295 leaderLock = storeService.getHazelcastInstance().getLock(lockHzId);
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800296 term = storeService.getHazelcastInstance().getAtomicLong(termHzId);
297
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800298 getLockFuture = leaderElectionExecutor.submit(new Runnable() {
299 @Override
300 public void run() {
301 doLeaderElectionThread();
302 }
303 });
304 }
305
306 /**
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800307 * Stops leadership election for the topic.
308 */
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900309 private synchronized void stop() {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800310 isShutdown = true;
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800311 isRunningForLeadership = false;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800312 // getLockFuture.cancel(true);
313 // periodicProcessingFuture.cancel(true);
314 leaderElectionExecutor.shutdownNow();
315 }
316
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800317 /**
318 * Received a Leadership Event.
319 *
320 * @param leadershipEvent the received Leadership Event
321 */
322 private void receivedLeadershipEvent(LeadershipEvent leadershipEvent) {
Madan Jampani8d21c792014-12-01 16:31:07 -0800323 NodeId eventLeaderId = leadershipEvent.subject().leader();
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800324 if (!leadershipEvent.subject().topic().equals(topicName)) {
325 return; // Not our topic: ignore
326 }
Madan Jampani8d21c792014-12-01 16:31:07 -0800327 if (eventLeaderId.equals(localNodeId)) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800328 return; // My own message: ignore
329 }
330
331 synchronized (this) {
332 switch (leadershipEvent.type()) {
333 case LEADER_ELECTED:
334 // FALLTHROUGH
335 case LEADER_REELECTED:
336 //
337 // Another leader: if we are also a leader, then give up
338 // leadership and run for re-election.
339 //
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800340 if ((leader != null) && leader.equals(localNodeId)) {
341 if (getLockFuture != null) {
342 getLockFuture.cancel(true);
343 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800344 } else {
345 // Just update the current leader
346 leader = leadershipEvent.subject().leader();
347 lastLeadershipUpdateMs = System.currentTimeMillis();
348 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800349 break;
350 case LEADER_BOOTED:
351 // Remove the state for the current leader
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800352 if ((leader != null) && eventLeaderId.equals(leader)) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800353 leader = null;
354 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800355 break;
356 default:
357 break;
358 }
359 }
360 }
361
362 private void doPeriodicProcessing() {
363
364 while (!isShutdown) {
365
366 //
367 // Periodic tasks:
368 // (a) Advertise ourselves as the leader
369 // OR
370 // (b) Expire a stale (remote) leader
371 //
372 synchronized (this) {
373 LeadershipEvent leadershipEvent;
374 if (leader != null) {
Madan Jampani8d21c792014-12-01 16:31:07 -0800375 if (leader.equals(localNodeId)) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800376 //
377 // Advertise ourselves as the leader
378 //
379 leadershipEvent = new LeadershipEvent(
380 LeadershipEvent.Type.LEADER_REELECTED,
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800381 new Leadership(topicName, localNodeId, myLastLeaderTerm));
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800382 // Dispatch to all instances
Madan Jampani97cf7c42015-02-19 16:33:37 -0800383
384 clusterCommunicator.broadcastIncludeSelf(
385 new ClusterMessage(
386 clusterService.getLocalNode().id(),
387 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
388 SERIALIZER.encode(leadershipEvent)));
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800389 } else {
390 //
391 // Test if time to expire a stale leader
392 //
393 long delta = System.currentTimeMillis() -
394 lastLeadershipUpdateMs;
395 if (delta > LEADERSHIP_REMOTE_TIMEOUT_MS) {
396 leadershipEvent = new LeadershipEvent(
397 LeadershipEvent.Type.LEADER_BOOTED,
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800398 new Leadership(topicName, leader, myLastLeaderTerm));
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800399 // Dispatch only to the local listener(s)
400 eventDispatcher.post(leadershipEvent);
401 leader = null;
402 }
403 }
404 }
405 }
406
407 // Sleep before re-advertising
408 try {
409 Thread.sleep(LEADERSHIP_PERIODIC_INTERVAL_MS);
410 } catch (InterruptedException e) {
Pavlin Radoslavov2e57b3b2014-11-30 18:36:05 -0800411 log.debug("Leader Election periodic thread interrupted");
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800412 }
413 }
414 }
415
416 /**
417 * Performs the leader election by using Hazelcast.
418 */
419 private void doLeaderElectionThread() {
420
421 while (!isShutdown) {
422 LeadershipEvent leadershipEvent;
423 //
424 // Try to acquire the lock and keep it until the instance is
425 // shutdown.
426 //
Pavlin Radoslavov2e57b3b2014-11-30 18:36:05 -0800427 log.debug("Leader Election begin for topic {}",
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800428 topicName);
429 try {
430 // Block until it becomes the leader
431 leaderLock.lockInterruptibly();
432 } catch (InterruptedException e) {
433 //
434 // Thread interrupted. Either shutdown or run for
435 // re-election.
436 //
Pavlin Radoslavov2e57b3b2014-11-30 18:36:05 -0800437 log.debug("Election interrupted for topic {}",
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800438 topicName);
439 continue;
440 }
441
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800442 try {
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900443 synchronized (this) {
444 //
445 // This instance is now the leader
446 //
447 log.info("Leader Elected for topic {}", topicName);
448
449 updateTerm();
450
451 leader = localNodeId;
452 leadershipEvent = new LeadershipEvent(
453 LeadershipEvent.Type.LEADER_ELECTED,
454 new Leadership(topicName, localNodeId, myLastLeaderTerm));
Madan Jampani97cf7c42015-02-19 16:33:37 -0800455 clusterCommunicator.broadcastIncludeSelf(
456 new ClusterMessage(
457 clusterService.getLocalNode().id(),
458 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
459 SERIALIZER.encode(leadershipEvent)));
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900460 }
461
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800462 // Sleep forever until interrupted
463 Thread.sleep(Long.MAX_VALUE);
464 } catch (InterruptedException e) {
465 //
466 // Thread interrupted. Either shutdown or run for
467 // re-election.
468 //
Pavlin Radoslavov2e57b3b2014-11-30 18:36:05 -0800469 log.debug("Leader Interrupted for topic {}",
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800470 topicName);
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800471
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900472 } finally {
473 synchronized (this) {
474 // If we reach here, we should release the leadership
475 log.debug("Leader Lock Released for topic {}", topicName);
476 if ((leader != null) &&
477 leader.equals(localNodeId)) {
478 leader = null;
479 }
480 leadershipEvent = new LeadershipEvent(
481 LeadershipEvent.Type.LEADER_BOOTED,
482 new Leadership(topicName, localNodeId, myLastLeaderTerm));
Madan Jampani97cf7c42015-02-19 16:33:37 -0800483 clusterCommunicator.broadcastIncludeSelf(
484 new ClusterMessage(
485 clusterService.getLocalNode().id(),
486 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
487 SERIALIZER.encode(leadershipEvent)));
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900488 leaderLock.unlock();
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800489 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800490 }
491 }
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900492 isRunningForLeadership = false;
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800493 }
494
495 // Globally guarded by the leadership lock for this term
496 // Locally guarded by synchronized (this)
497 private void updateTerm() {
498 long oldTerm = term.get();
499 long newTerm = term.incrementAndGet();
500 myLastLeaderTerm = newTerm;
501 log.debug("Topic {} updated term from {} to {}", topicName,
502 oldTerm, newTerm);
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800503 }
504 }
Madan Jampani97cf7c42015-02-19 16:33:37 -0800505
506 private class InternalLeadershipEventListener implements ClusterMessageHandler {
507
508 @Override
509 public void handle(ClusterMessage message) {
510 LeadershipEvent leadershipEvent =
511 SERIALIZER.decode(message.payload());
512
513 log.trace("Leadership Event: time = {} type = {} event = {}",
514 leadershipEvent.time(), leadershipEvent.type(),
515 leadershipEvent);
516 //
517 // If there is no entry for the topic, then create a new one to
518 // keep track of the leadership, but don't run for leadership itself.
519 //
520 String topicName = leadershipEvent.subject().topic();
521 Topic topic = topics.get(topicName);
522 if (topic == null) {
523 topic = new Topic(topicName);
524 Topic oldTopic = topics.putIfAbsent(topicName, topic);
525 if (oldTopic == null) {
526 // encountered new topic, start periodic processing
527 topic.start();
528 } else {
529 topic = oldTopic;
530 }
531 }
532 topic.receivedLeadershipEvent(leadershipEvent);
533 eventDispatcher.post(leadershipEvent);
534 }
535 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800536}