blob: ef651cb18a3c44a7825f1baa989a8eff9cc1529b [file] [log] [blame]
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.cluster.impl;
Jonathan Hart949c2842014-11-28 23:44:09 -080017
Jonathan Hart82b9fec2015-02-03 16:18:54 -080018import com.google.common.collect.Maps;
19import com.hazelcast.config.TopicConfig;
20import com.hazelcast.core.IAtomicLong;
Madan Jampani2af244a2015-02-22 13:12:01 -080021
Jonathan Hart949c2842014-11-28 23:44:09 -080022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
25import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
27import org.apache.felix.scr.annotations.Service;
Jonathan Hart82b9fec2015-02-03 16:18:54 -080028import org.onlab.util.KryoNamespace;
Brian O'Connorabafb502014-12-02 22:26:20 -080029import org.onosproject.cluster.ClusterService;
30import org.onosproject.cluster.Leadership;
31import org.onosproject.cluster.LeadershipEvent;
32import org.onosproject.cluster.LeadershipEventListener;
33import org.onosproject.cluster.LeadershipService;
34import org.onosproject.cluster.NodeId;
35import org.onosproject.event.AbstractListenerRegistry;
36import org.onosproject.event.EventDeliveryService;
Madan Jampani97cf7c42015-02-19 16:33:37 -080037import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
38import org.onosproject.store.cluster.messaging.ClusterMessage;
39import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
40import org.onosproject.store.cluster.messaging.MessageSubject;
Brian O'Connorabafb502014-12-02 22:26:20 -080041import org.onosproject.store.hz.StoreService;
42import org.onosproject.store.serializers.KryoNamespaces;
43import org.onosproject.store.serializers.KryoSerializer;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080044import org.slf4j.Logger;
45import org.slf4j.LoggerFactory;
Jonathan Hart949c2842014-11-28 23:44:09 -080046
Jonathan Hart82b9fec2015-02-03 16:18:54 -080047import java.util.HashMap;
48import java.util.Map;
Madan Jampani59610512015-02-25 15:25:43 -080049import java.util.Set;
Jonathan Hart82b9fec2015-02-03 16:18:54 -080050import java.util.concurrent.ExecutorService;
51import java.util.concurrent.Executors;
52import java.util.concurrent.Future;
53import java.util.concurrent.locks.Lock;
Madan Jampani59610512015-02-25 15:25:43 -080054import java.util.stream.Collectors;
Jonathan Hart82b9fec2015-02-03 16:18:54 -080055
56import static com.google.common.base.Preconditions.checkArgument;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080057import static org.onlab.util.Tools.groupedThreads;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080058
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080059/**
60 * Distributed implementation of LeadershipService that is based on Hazelcast.
61 * <p>
62 * The election is eventually-consistent: if there is Hazelcast partitioning,
63 * and the partitioning is healed, there could be a short window of time
64 * until the leaders in each partition discover each other. If this happens,
65 * the leaders release the leadership and run again for election.
66 * </p>
67 * <p>
68 * The leader election is based on Hazelcast's Global Lock, which is strongly
69 * consistent. In addition, each leader periodically advertises events
70 * (using a Hazelcast Topic) that it is the elected leader. Those events are
71 * used for two purposes: (1) Discover multi-leader collisions (in case of
72 * healed Hazelcast partitions), and (2) Inform all listeners who is
73 * the current leader (e.g., for informational purpose).
74 * </p>
75 */
Jonathan Hart949c2842014-11-28 23:44:09 -080076@Component(immediate = true)
77@Service
Madan Jampani97cf7c42015-02-19 16:33:37 -080078public class HazelcastLeadershipService implements LeadershipService {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080079 private static final Logger log =
Jonathan Hart949c2842014-11-28 23:44:09 -080080 LoggerFactory.getLogger(HazelcastLeadershipService.class);
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080081
82 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
83 @Override
84 protected void setupKryoPool() {
85 serializerPool = KryoNamespace.newBuilder()
86 .register(KryoNamespaces.API)
87 .build()
88 .populate(1);
89 }
90 };
91
92 private static final long LEADERSHIP_PERIODIC_INTERVAL_MS = 5 * 1000; // 5s
93 private static final long LEADERSHIP_REMOTE_TIMEOUT_MS = 15 * 1000; // 15s
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -080094 private static final String TOPIC_HZ_ID = "LeadershipService/AllTopics";
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080095
Jonathan Hart82b9fec2015-02-03 16:18:54 -080096 // indicates there is no term value yet
97 private static final long NO_TERM = 0;
98
Jonathan Hart949c2842014-11-28 23:44:09 -080099 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani97cf7c42015-02-19 16:33:37 -0800100 protected ClusterCommunicationService clusterCommunicator;
101
102 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart949c2842014-11-28 23:44:09 -0800103 protected ClusterService clusterService;
104
105 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
106 protected StoreService storeService;
107
108 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
109 protected EventDeliveryService eventDispatcher;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800110
111 private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
112 listenerRegistry;
113 private final Map<String, Topic> topics = Maps.newConcurrentMap();
Madan Jampani8d21c792014-12-01 16:31:07 -0800114 private NodeId localNodeId;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800115
Madan Jampani97cf7c42015-02-19 16:33:37 -0800116 private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
117 new MessageSubject("hz-leadership-events");
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800118
Madan Jampani2af244a2015-02-22 13:12:01 -0800119 private ExecutorService messageHandlingExecutor;
120
Jonathan Hart949c2842014-11-28 23:44:09 -0800121 @Activate
122 protected void activate() {
Madan Jampani8d21c792014-12-01 16:31:07 -0800123 localNodeId = clusterService.getLocalNode().id();
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800124 listenerRegistry = new AbstractListenerRegistry<>();
125 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
126
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800127 TopicConfig topicConfig = new TopicConfig();
128 topicConfig.setGlobalOrderingEnabled(true);
129 topicConfig.setName(TOPIC_HZ_ID);
130 storeService.getHazelcastInstance().getConfig().addTopicConfig(topicConfig);
Madan Jampani97cf7c42015-02-19 16:33:37 -0800131
Madan Jampani2af244a2015-02-22 13:12:01 -0800132 messageHandlingExecutor = Executors.newSingleThreadExecutor(
133 groupedThreads("onos/store/leadership", "message-handler"));
134
135 clusterCommunicator.addSubscriber(
136 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
137 new InternalLeadershipEventListener(),
138 messageHandlingExecutor);
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800139
Jonathan Hart949c2842014-11-28 23:44:09 -0800140 log.info("Hazelcast Leadership Service started");
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800141 }
142
Jonathan Hart949c2842014-11-28 23:44:09 -0800143 @Deactivate
144 protected void deactivate() {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800145 eventDispatcher.removeSink(LeadershipEvent.class);
Madan Jampani2af244a2015-02-22 13:12:01 -0800146 messageHandlingExecutor.shutdown();
Madan Jampani97cf7c42015-02-19 16:33:37 -0800147 clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800148
149 for (Topic topic : topics.values()) {
150 topic.stop();
151 }
152 topics.clear();
153
Jonathan Hart949c2842014-11-28 23:44:09 -0800154 log.info("Hazelcast Leadership Service stopped");
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800155 }
156
157 @Override
Madan Jampani8d21c792014-12-01 16:31:07 -0800158 public NodeId getLeader(String path) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800159 Topic topic = topics.get(path);
160 if (topic == null) {
161 return null;
162 }
163 return topic.leader();
164 }
165
166 @Override
Madan Jampani59610512015-02-25 15:25:43 -0800167 public Leadership getLeadership(String path) {
168 checkArgument(path != null);
169 Topic topic = topics.get(path);
170 if (topic != null) {
171 return new Leadership(topic.topicName(),
172 topic.leader(),
173 topic.term());
174 }
175 return null;
176 }
177
178 @Override
179 public Set<String> ownedTopics(NodeId nodeId) {
180 checkArgument(nodeId != null);
181 return topics.values()
182 .stream()
183 .filter(topic -> nodeId.equals(topic.leader()))
184 .map(topic -> topic.topicName)
185 .collect(Collectors.toSet());
186 }
187
188 @Override
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800189 public void runForLeadership(String path) {
190 checkArgument(path != null);
191 Topic topic = new Topic(path);
192 Topic oldTopic = topics.putIfAbsent(path, topic);
193 if (oldTopic == null) {
194 topic.start();
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800195 topic.runForLeadership();
196 } else {
197 oldTopic.runForLeadership();
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800198 }
199 }
200
201 @Override
202 public void withdraw(String path) {
203 checkArgument(path != null);
204 Topic topic = topics.get(path);
205 if (topic != null) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800206 topics.remove(path, topic);
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900207 topic.stop();
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800208 }
209 }
210
211 @Override
Yuta HIGUCHIc2bf3d82014-11-28 18:50:41 -0800212 public Map<String, Leadership> getLeaderBoard() {
Pavlin Radoslavov187dff62014-12-02 07:21:35 -0800213 Map<String, Leadership> result = new HashMap<>();
214
Pavlin Radoslavov187dff62014-12-02 07:21:35 -0800215 for (Topic topic : topics.values()) {
216 Leadership leadership = new Leadership(topic.topicName(),
217 topic.leader(),
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800218 topic.term());
Pavlin Radoslavov187dff62014-12-02 07:21:35 -0800219 result.put(topic.topicName(), leadership);
220 }
221 return result;
Yuta HIGUCHIc2bf3d82014-11-28 18:50:41 -0800222 }
223
224 @Override
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800225 public void addListener(LeadershipEventListener listener) {
226 listenerRegistry.addListener(listener);
227 }
228
229 @Override
230 public void removeListener(LeadershipEventListener listener) {
231 listenerRegistry.removeListener(listener);
232 }
233
234 /**
235 * Class for keeping per-topic information.
236 */
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800237 private final class Topic {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800238 private final String topicName;
239 private volatile boolean isShutdown = true;
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800240 private volatile boolean isRunningForLeadership = false;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800241 private volatile long lastLeadershipUpdateMs = 0;
242 private ExecutorService leaderElectionExecutor;
243
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800244 private volatile IAtomicLong term;
245 // This is local state, recording the term number for the last time
246 // this instance was leader for this topic. The current term could be
247 // higher if the mastership has changed any times.
248 private long myLastLeaderTerm = NO_TERM;
249
Madan Jampani8d21c792014-12-01 16:31:07 -0800250 private NodeId leader;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800251 private Lock leaderLock;
252 private Future<?> getLockFuture;
253 private Future<?> periodicProcessingFuture;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800254
255 /**
256 * Constructor.
257 *
258 * @param topicName the topic name
259 */
260 private Topic(String topicName) {
261 this.topicName = topicName;
262 }
263
264 /**
Pavlin Radoslavov187dff62014-12-02 07:21:35 -0800265 * Gets the topic name.
266 *
267 * @return the topic name
268 */
269 private String topicName() {
270 return topicName;
271 }
272
273 /**
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800274 * Gets the leader for the topic.
275 *
276 * @return the leader for the topic
277 */
Madan Jampani8d21c792014-12-01 16:31:07 -0800278 private NodeId leader() {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800279 return leader;
280 }
281
282 /**
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800283 * Gets the current term for the topic.
284 *
285 * @return the term for the topic
286 */
287 private long term() {
288 if (term == null) {
289 return NO_TERM;
290 }
291 return term.get();
292 }
293
294 /**
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800295 * Starts operation.
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800296 */
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900297 private synchronized void start() {
298 if (!isShutdown) {
299 // already running
300 return;
301 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800302 isShutdown = false;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -0800303 String threadPoolName = "election-" + topicName + "-%d";
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800304 leaderElectionExecutor = Executors.newScheduledThreadPool(2,
Thomas Vachuska6f94ded2015-02-21 14:02:38 -0800305 groupedThreads("onos/leadership", threadPoolName));
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800306
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800307 periodicProcessingFuture =
308 leaderElectionExecutor.submit(new Runnable() {
309 @Override
310 public void run() {
311 doPeriodicProcessing();
312 }
313 });
314 }
315
316 /**
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800317 * Runs for leadership.
318 */
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900319 private synchronized void runForLeadership() {
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800320 if (isRunningForLeadership) {
321 return; // Nothing to do: already running
322 }
323 if (isShutdown) {
324 start();
325 }
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900326 isRunningForLeadership = true;
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800327 String lockHzId = "LeadershipService/" + topicName + "/lock";
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800328 String termHzId = "LeadershipService/" + topicName + "/term";
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800329 leaderLock = storeService.getHazelcastInstance().getLock(lockHzId);
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800330 term = storeService.getHazelcastInstance().getAtomicLong(termHzId);
331
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800332 getLockFuture = leaderElectionExecutor.submit(new Runnable() {
333 @Override
334 public void run() {
335 doLeaderElectionThread();
336 }
337 });
338 }
339
340 /**
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800341 * Stops leadership election for the topic.
342 */
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900343 private synchronized void stop() {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800344 isShutdown = true;
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800345 isRunningForLeadership = false;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800346 // getLockFuture.cancel(true);
347 // periodicProcessingFuture.cancel(true);
348 leaderElectionExecutor.shutdownNow();
349 }
350
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800351 /**
352 * Received a Leadership Event.
353 *
354 * @param leadershipEvent the received Leadership Event
355 */
356 private void receivedLeadershipEvent(LeadershipEvent leadershipEvent) {
Madan Jampani8d21c792014-12-01 16:31:07 -0800357 NodeId eventLeaderId = leadershipEvent.subject().leader();
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800358 if (!leadershipEvent.subject().topic().equals(topicName)) {
359 return; // Not our topic: ignore
360 }
Madan Jampani8d21c792014-12-01 16:31:07 -0800361 if (eventLeaderId.equals(localNodeId)) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800362 return; // My own message: ignore
363 }
364
365 synchronized (this) {
366 switch (leadershipEvent.type()) {
367 case LEADER_ELECTED:
368 // FALLTHROUGH
369 case LEADER_REELECTED:
370 //
371 // Another leader: if we are also a leader, then give up
372 // leadership and run for re-election.
373 //
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800374 if ((leader != null) && leader.equals(localNodeId)) {
375 if (getLockFuture != null) {
376 getLockFuture.cancel(true);
377 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800378 } else {
379 // Just update the current leader
380 leader = leadershipEvent.subject().leader();
381 lastLeadershipUpdateMs = System.currentTimeMillis();
382 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800383 break;
384 case LEADER_BOOTED:
385 // Remove the state for the current leader
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800386 if ((leader != null) && eventLeaderId.equals(leader)) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800387 leader = null;
388 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800389 break;
390 default:
391 break;
392 }
393 }
394 }
395
396 private void doPeriodicProcessing() {
397
398 while (!isShutdown) {
399
400 //
401 // Periodic tasks:
402 // (a) Advertise ourselves as the leader
403 // OR
404 // (b) Expire a stale (remote) leader
405 //
406 synchronized (this) {
407 LeadershipEvent leadershipEvent;
408 if (leader != null) {
Madan Jampani8d21c792014-12-01 16:31:07 -0800409 if (leader.equals(localNodeId)) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800410 //
411 // Advertise ourselves as the leader
412 //
413 leadershipEvent = new LeadershipEvent(
414 LeadershipEvent.Type.LEADER_REELECTED,
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800415 new Leadership(topicName, localNodeId, myLastLeaderTerm));
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800416 // Dispatch to all instances
Madan Jampani97cf7c42015-02-19 16:33:37 -0800417
418 clusterCommunicator.broadcastIncludeSelf(
419 new ClusterMessage(
420 clusterService.getLocalNode().id(),
421 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
422 SERIALIZER.encode(leadershipEvent)));
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800423 } else {
424 //
425 // Test if time to expire a stale leader
426 //
427 long delta = System.currentTimeMillis() -
428 lastLeadershipUpdateMs;
429 if (delta > LEADERSHIP_REMOTE_TIMEOUT_MS) {
430 leadershipEvent = new LeadershipEvent(
431 LeadershipEvent.Type.LEADER_BOOTED,
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800432 new Leadership(topicName, leader, myLastLeaderTerm));
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800433 // Dispatch only to the local listener(s)
434 eventDispatcher.post(leadershipEvent);
435 leader = null;
436 }
437 }
438 }
439 }
440
441 // Sleep before re-advertising
442 try {
443 Thread.sleep(LEADERSHIP_PERIODIC_INTERVAL_MS);
444 } catch (InterruptedException e) {
Pavlin Radoslavov2e57b3b2014-11-30 18:36:05 -0800445 log.debug("Leader Election periodic thread interrupted");
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800446 }
447 }
448 }
449
450 /**
451 * Performs the leader election by using Hazelcast.
452 */
453 private void doLeaderElectionThread() {
454
455 while (!isShutdown) {
456 LeadershipEvent leadershipEvent;
457 //
458 // Try to acquire the lock and keep it until the instance is
459 // shutdown.
460 //
Pavlin Radoslavov2e57b3b2014-11-30 18:36:05 -0800461 log.debug("Leader Election begin for topic {}",
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800462 topicName);
463 try {
464 // Block until it becomes the leader
465 leaderLock.lockInterruptibly();
466 } catch (InterruptedException e) {
467 //
468 // Thread interrupted. Either shutdown or run for
469 // re-election.
470 //
Pavlin Radoslavov2e57b3b2014-11-30 18:36:05 -0800471 log.debug("Election interrupted for topic {}",
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800472 topicName);
473 continue;
474 }
475
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800476 try {
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900477 synchronized (this) {
478 //
479 // This instance is now the leader
480 //
481 log.info("Leader Elected for topic {}", topicName);
482
483 updateTerm();
484
485 leader = localNodeId;
486 leadershipEvent = new LeadershipEvent(
487 LeadershipEvent.Type.LEADER_ELECTED,
488 new Leadership(topicName, localNodeId, myLastLeaderTerm));
Madan Jampani97cf7c42015-02-19 16:33:37 -0800489 clusterCommunicator.broadcastIncludeSelf(
490 new ClusterMessage(
491 clusterService.getLocalNode().id(),
492 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
493 SERIALIZER.encode(leadershipEvent)));
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900494 }
495
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800496 // Sleep forever until interrupted
497 Thread.sleep(Long.MAX_VALUE);
498 } catch (InterruptedException e) {
499 //
500 // Thread interrupted. Either shutdown or run for
501 // re-election.
502 //
Pavlin Radoslavov2e57b3b2014-11-30 18:36:05 -0800503 log.debug("Leader Interrupted for topic {}",
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800504 topicName);
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800505
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900506 } finally {
507 synchronized (this) {
508 // If we reach here, we should release the leadership
509 log.debug("Leader Lock Released for topic {}", topicName);
510 if ((leader != null) &&
511 leader.equals(localNodeId)) {
512 leader = null;
513 }
514 leadershipEvent = new LeadershipEvent(
515 LeadershipEvent.Type.LEADER_BOOTED,
516 new Leadership(topicName, localNodeId, myLastLeaderTerm));
Madan Jampani97cf7c42015-02-19 16:33:37 -0800517 clusterCommunicator.broadcastIncludeSelf(
518 new ClusterMessage(
519 clusterService.getLocalNode().id(),
520 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
521 SERIALIZER.encode(leadershipEvent)));
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900522 leaderLock.unlock();
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800523 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800524 }
525 }
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900526 isRunningForLeadership = false;
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800527 }
528
529 // Globally guarded by the leadership lock for this term
530 // Locally guarded by synchronized (this)
531 private void updateTerm() {
532 long oldTerm = term.get();
533 long newTerm = term.incrementAndGet();
534 myLastLeaderTerm = newTerm;
535 log.debug("Topic {} updated term from {} to {}", topicName,
536 oldTerm, newTerm);
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800537 }
538 }
Madan Jampani97cf7c42015-02-19 16:33:37 -0800539
540 private class InternalLeadershipEventListener implements ClusterMessageHandler {
541
542 @Override
543 public void handle(ClusterMessage message) {
544 LeadershipEvent leadershipEvent =
545 SERIALIZER.decode(message.payload());
546
547 log.trace("Leadership Event: time = {} type = {} event = {}",
548 leadershipEvent.time(), leadershipEvent.type(),
549 leadershipEvent);
550 //
551 // If there is no entry for the topic, then create a new one to
552 // keep track of the leadership, but don't run for leadership itself.
553 //
554 String topicName = leadershipEvent.subject().topic();
555 Topic topic = topics.get(topicName);
556 if (topic == null) {
557 topic = new Topic(topicName);
558 Topic oldTopic = topics.putIfAbsent(topicName, topic);
559 if (oldTopic == null) {
560 // encountered new topic, start periodic processing
561 topic.start();
562 } else {
563 topic = oldTopic;
564 }
565 }
566 topic.receivedLeadershipEvent(leadershipEvent);
567 eventDispatcher.post(leadershipEvent);
568 }
569 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800570}