blob: 220a28d17453712a30130971955df59fdce70a57 [file] [log] [blame]
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -08001/*
Ray Milkey34c95902015-04-15 09:47:53 -07002 * Copyright 2014-2015 Open Networking Laboratory
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.cluster.impl;
Jonathan Hart949c2842014-11-28 23:44:09 -080017
Jonathan Hart82b9fec2015-02-03 16:18:54 -080018import com.google.common.collect.Maps;
19import com.hazelcast.config.TopicConfig;
20import com.hazelcast.core.IAtomicLong;
HIGUCHI Yuta26647a62015-02-27 10:57:59 -080021import com.hazelcast.core.ILock;
Madan Jampani2af244a2015-02-22 13:12:01 -080022
Jonathan Hart949c2842014-11-28 23:44:09 -080023import org.apache.felix.scr.annotations.Activate;
24import org.apache.felix.scr.annotations.Component;
25import org.apache.felix.scr.annotations.Deactivate;
26import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
28import org.apache.felix.scr.annotations.Service;
Jonathan Hart82b9fec2015-02-03 16:18:54 -080029import org.onlab.util.KryoNamespace;
Brian O'Connorabafb502014-12-02 22:26:20 -080030import org.onosproject.cluster.ClusterService;
31import org.onosproject.cluster.Leadership;
32import org.onosproject.cluster.LeadershipEvent;
33import org.onosproject.cluster.LeadershipEventListener;
34import org.onosproject.cluster.LeadershipService;
35import org.onosproject.cluster.NodeId;
36import org.onosproject.event.AbstractListenerRegistry;
37import org.onosproject.event.EventDeliveryService;
Madan Jampani97cf7c42015-02-19 16:33:37 -080038import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
39import org.onosproject.store.cluster.messaging.ClusterMessage;
40import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
41import org.onosproject.store.cluster.messaging.MessageSubject;
Brian O'Connorabafb502014-12-02 22:26:20 -080042import org.onosproject.store.hz.StoreService;
43import org.onosproject.store.serializers.KryoNamespaces;
44import org.onosproject.store.serializers.KryoSerializer;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080045import org.slf4j.Logger;
46import org.slf4j.LoggerFactory;
Jonathan Hart949c2842014-11-28 23:44:09 -080047
Jonathan Hart82b9fec2015-02-03 16:18:54 -080048import java.util.HashMap;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070049import java.util.List;
Jonathan Hart82b9fec2015-02-03 16:18:54 -080050import java.util.Map;
Madan Jampani59610512015-02-25 15:25:43 -080051import java.util.Set;
Jonathan Hart82b9fec2015-02-03 16:18:54 -080052import java.util.concurrent.ExecutorService;
53import java.util.concurrent.Executors;
54import java.util.concurrent.Future;
Madan Jampani59610512015-02-25 15:25:43 -080055import java.util.stream.Collectors;
Jonathan Hart82b9fec2015-02-03 16:18:54 -080056
57import static com.google.common.base.Preconditions.checkArgument;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080058import static org.onlab.util.Tools.groupedThreads;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080059
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080060/**
61 * Distributed implementation of LeadershipService that is based on Hazelcast.
62 * <p>
63 * The election is eventually-consistent: if there is Hazelcast partitioning,
64 * and the partitioning is healed, there could be a short window of time
65 * until the leaders in each partition discover each other. If this happens,
66 * the leaders release the leadership and run again for election.
67 * </p>
68 * <p>
69 * The leader election is based on Hazelcast's Global Lock, which is stongly
70 * consistent. In addition, each leader periodically advertises events
71 * (using a Hazelcast Topic) that it is the elected leader. Those events are
72 * used for two purposes: (1) Discover multi-leader collisions (in case of
73 * healed Hazelcast partitions), and (2) Inform all listeners who is
74 * the current leader (e.g., for informational purpose).
75 * </p>
76 */
Madan Jampani7f72c3f2015-03-01 17:34:59 -080077@Component(immediate = true, enabled = false)
Jonathan Hart949c2842014-11-28 23:44:09 -080078@Service
Madan Jampani97cf7c42015-02-19 16:33:37 -080079public class HazelcastLeadershipService implements LeadershipService {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080080 private static final Logger log =
Jonathan Hart949c2842014-11-28 23:44:09 -080081 LoggerFactory.getLogger(HazelcastLeadershipService.class);
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080082
83 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
84 @Override
85 protected void setupKryoPool() {
86 serializerPool = KryoNamespace.newBuilder()
87 .register(KryoNamespaces.API)
88 .build()
89 .populate(1);
90 }
91 };
92
93 private static final long LEADERSHIP_PERIODIC_INTERVAL_MS = 5 * 1000; // 5s
94 private static final long LEADERSHIP_REMOTE_TIMEOUT_MS = 15 * 1000; // 15s
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -080095 private static final String TOPIC_HZ_ID = "LeadershipService/AllTopics";
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080096
Jonathan Hart82b9fec2015-02-03 16:18:54 -080097 // indicates there is no term value yet
98 private static final long NO_TERM = 0;
99
Jonathan Hart949c2842014-11-28 23:44:09 -0800100 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani97cf7c42015-02-19 16:33:37 -0800101 protected ClusterCommunicationService clusterCommunicator;
102
103 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart949c2842014-11-28 23:44:09 -0800104 protected ClusterService clusterService;
105
106 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
107 protected StoreService storeService;
108
109 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
110 protected EventDeliveryService eventDispatcher;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800111
112 private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
113 listenerRegistry;
114 private final Map<String, Topic> topics = Maps.newConcurrentMap();
Madan Jampani8d21c792014-12-01 16:31:07 -0800115 private NodeId localNodeId;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800116
Madan Jampani97cf7c42015-02-19 16:33:37 -0800117 private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
118 new MessageSubject("hz-leadership-events");
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800119
Madan Jampani2af244a2015-02-22 13:12:01 -0800120 private ExecutorService messageHandlingExecutor;
121
Jonathan Hart949c2842014-11-28 23:44:09 -0800122 @Activate
123 protected void activate() {
Madan Jampani8d21c792014-12-01 16:31:07 -0800124 localNodeId = clusterService.getLocalNode().id();
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800125 listenerRegistry = new AbstractListenerRegistry<>();
126 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
127
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800128 TopicConfig topicConfig = new TopicConfig();
129 topicConfig.setGlobalOrderingEnabled(true);
130 topicConfig.setName(TOPIC_HZ_ID);
131 storeService.getHazelcastInstance().getConfig().addTopicConfig(topicConfig);
Madan Jampani97cf7c42015-02-19 16:33:37 -0800132
Madan Jampani2af244a2015-02-22 13:12:01 -0800133 messageHandlingExecutor = Executors.newSingleThreadExecutor(
134 groupedThreads("onos/store/leadership", "message-handler"));
135
136 clusterCommunicator.addSubscriber(
137 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
138 new InternalLeadershipEventListener(),
139 messageHandlingExecutor);
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800140
Jonathan Hart949c2842014-11-28 23:44:09 -0800141 log.info("Hazelcast Leadership Service started");
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800142 }
143
Jonathan Hart949c2842014-11-28 23:44:09 -0800144 @Deactivate
145 protected void deactivate() {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800146 eventDispatcher.removeSink(LeadershipEvent.class);
Madan Jampani2af244a2015-02-22 13:12:01 -0800147 messageHandlingExecutor.shutdown();
Madan Jampani97cf7c42015-02-19 16:33:37 -0800148 clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800149
150 for (Topic topic : topics.values()) {
151 topic.stop();
152 }
153 topics.clear();
154
Jonathan Hart949c2842014-11-28 23:44:09 -0800155 log.info("Hazelcast Leadership Service stopped");
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800156 }
157
158 @Override
Madan Jampani8d21c792014-12-01 16:31:07 -0800159 public NodeId getLeader(String path) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800160 Topic topic = topics.get(path);
161 if (topic == null) {
162 return null;
163 }
164 return topic.leader();
165 }
166
167 @Override
Madan Jampani59610512015-02-25 15:25:43 -0800168 public Leadership getLeadership(String path) {
169 checkArgument(path != null);
170 Topic topic = topics.get(path);
171 if (topic != null) {
172 return new Leadership(topic.topicName(),
173 topic.leader(),
Madan Jampani30a57f82015-03-02 12:19:41 -0800174 topic.term(),
175 0);
Madan Jampani59610512015-02-25 15:25:43 -0800176 }
177 return null;
178 }
179
180 @Override
181 public Set<String> ownedTopics(NodeId nodeId) {
182 checkArgument(nodeId != null);
183 return topics.values()
184 .stream()
185 .filter(topic -> nodeId.equals(topic.leader()))
186 .map(topic -> topic.topicName)
187 .collect(Collectors.toSet());
188 }
189
190 @Override
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800191 public void runForLeadership(String path) {
192 checkArgument(path != null);
193 Topic topic = new Topic(path);
194 Topic oldTopic = topics.putIfAbsent(path, topic);
195 if (oldTopic == null) {
196 topic.start();
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800197 topic.runForLeadership();
198 } else {
199 oldTopic.runForLeadership();
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800200 }
201 }
202
203 @Override
204 public void withdraw(String path) {
205 checkArgument(path != null);
206 Topic topic = topics.get(path);
207 if (topic != null) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800208 topics.remove(path, topic);
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900209 topic.stop();
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800210 }
211 }
212
213 @Override
Yuta HIGUCHIc2bf3d82014-11-28 18:50:41 -0800214 public Map<String, Leadership> getLeaderBoard() {
Pavlin Radoslavov187dff62014-12-02 07:21:35 -0800215 Map<String, Leadership> result = new HashMap<>();
216
Pavlin Radoslavov187dff62014-12-02 07:21:35 -0800217 for (Topic topic : topics.values()) {
218 Leadership leadership = new Leadership(topic.topicName(),
219 topic.leader(),
Madan Jampani30a57f82015-03-02 12:19:41 -0800220 topic.term(),
221 0);
Pavlin Radoslavov187dff62014-12-02 07:21:35 -0800222 result.put(topic.topicName(), leadership);
223 }
224 return result;
Yuta HIGUCHIc2bf3d82014-11-28 18:50:41 -0800225 }
226
227 @Override
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800228 public void addListener(LeadershipEventListener listener) {
229 listenerRegistry.addListener(listener);
230 }
231
232 @Override
233 public void removeListener(LeadershipEventListener listener) {
234 listenerRegistry.removeListener(listener);
235 }
236
237 /**
238 * Class for keeping per-topic information.
239 */
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800240 private final class Topic {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800241 private final String topicName;
242 private volatile boolean isShutdown = true;
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800243 private volatile boolean isRunningForLeadership = false;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800244 private volatile long lastLeadershipUpdateMs = 0;
245 private ExecutorService leaderElectionExecutor;
246
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800247 private volatile IAtomicLong term;
248 // This is local state, recording the term number for the last time
249 // this instance was leader for this topic. The current term could be
250 // higher if the mastership has changed any times.
251 private long myLastLeaderTerm = NO_TERM;
252
HIGUCHI Yuta26647a62015-02-27 10:57:59 -0800253 private volatile NodeId leader;
254 private ILock leaderLock;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800255 private Future<?> getLockFuture;
256 private Future<?> periodicProcessingFuture;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800257
258 /**
259 * Constructor.
260 *
261 * @param topicName the topic name
262 */
263 private Topic(String topicName) {
264 this.topicName = topicName;
265 }
266
267 /**
Pavlin Radoslavov187dff62014-12-02 07:21:35 -0800268 * Gets the topic name.
269 *
270 * @return the topic name
271 */
272 private String topicName() {
273 return topicName;
274 }
275
276 /**
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800277 * Gets the leader for the topic.
278 *
279 * @return the leader for the topic
280 */
Madan Jampani8d21c792014-12-01 16:31:07 -0800281 private NodeId leader() {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800282 return leader;
283 }
284
285 /**
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800286 * Gets the current term for the topic.
287 *
288 * @return the term for the topic
289 */
290 private long term() {
291 if (term == null) {
292 return NO_TERM;
293 }
294 return term.get();
295 }
296
297 /**
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800298 * Starts operation.
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800299 */
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900300 private synchronized void start() {
301 if (!isShutdown) {
302 // already running
303 return;
304 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800305 isShutdown = false;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -0800306 String threadPoolName = "election-" + topicName + "-%d";
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800307 leaderElectionExecutor = Executors.newScheduledThreadPool(2,
Thomas Vachuska6f94ded2015-02-21 14:02:38 -0800308 groupedThreads("onos/leadership", threadPoolName));
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800309
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800310 periodicProcessingFuture =
311 leaderElectionExecutor.submit(new Runnable() {
312 @Override
313 public void run() {
314 doPeriodicProcessing();
315 }
316 });
317 }
318
319 /**
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800320 * Runs for leadership.
321 */
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900322 private synchronized void runForLeadership() {
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800323 if (isRunningForLeadership) {
324 return; // Nothing to do: already running
325 }
326 if (isShutdown) {
327 start();
328 }
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900329 isRunningForLeadership = true;
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800330 String lockHzId = "LeadershipService/" + topicName + "/lock";
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800331 String termHzId = "LeadershipService/" + topicName + "/term";
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800332 leaderLock = storeService.getHazelcastInstance().getLock(lockHzId);
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800333 term = storeService.getHazelcastInstance().getAtomicLong(termHzId);
334
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800335 getLockFuture = leaderElectionExecutor.submit(new Runnable() {
336 @Override
337 public void run() {
338 doLeaderElectionThread();
339 }
340 });
341 }
342
343 /**
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800344 * Stops leadership election for the topic.
345 */
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900346 private synchronized void stop() {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800347 isShutdown = true;
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800348 isRunningForLeadership = false;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800349 // getLockFuture.cancel(true);
350 // periodicProcessingFuture.cancel(true);
351 leaderElectionExecutor.shutdownNow();
352 }
353
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800354 /**
355 * Received a Leadership Event.
356 *
357 * @param leadershipEvent the received Leadership Event
358 */
359 private void receivedLeadershipEvent(LeadershipEvent leadershipEvent) {
Madan Jampani8d21c792014-12-01 16:31:07 -0800360 NodeId eventLeaderId = leadershipEvent.subject().leader();
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800361 if (!leadershipEvent.subject().topic().equals(topicName)) {
362 return; // Not our topic: ignore
363 }
Madan Jampani8d21c792014-12-01 16:31:07 -0800364 if (eventLeaderId.equals(localNodeId)) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800365 return; // My own message: ignore
366 }
367
368 synchronized (this) {
369 switch (leadershipEvent.type()) {
370 case LEADER_ELECTED:
371 // FALLTHROUGH
372 case LEADER_REELECTED:
373 //
374 // Another leader: if we are also a leader, then give up
375 // leadership and run for re-election.
376 //
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800377 if ((leader != null) && leader.equals(localNodeId)) {
378 if (getLockFuture != null) {
379 getLockFuture.cancel(true);
380 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800381 } else {
382 // Just update the current leader
383 leader = leadershipEvent.subject().leader();
384 lastLeadershipUpdateMs = System.currentTimeMillis();
385 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800386 break;
387 case LEADER_BOOTED:
388 // Remove the state for the current leader
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800389 if ((leader != null) && eventLeaderId.equals(leader)) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800390 leader = null;
391 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800392 break;
393 default:
394 break;
395 }
396 }
397 }
398
399 private void doPeriodicProcessing() {
400
401 while (!isShutdown) {
402
403 //
404 // Periodic tasks:
405 // (a) Advertise ourselves as the leader
406 // OR
407 // (b) Expire a stale (remote) leader
408 //
409 synchronized (this) {
410 LeadershipEvent leadershipEvent;
411 if (leader != null) {
Madan Jampani8d21c792014-12-01 16:31:07 -0800412 if (leader.equals(localNodeId)) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800413 //
414 // Advertise ourselves as the leader
415 //
416 leadershipEvent = new LeadershipEvent(
417 LeadershipEvent.Type.LEADER_REELECTED,
Madan Jampani30a57f82015-03-02 12:19:41 -0800418 new Leadership(topicName, localNodeId, myLastLeaderTerm, 0));
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800419 // Dispatch to all instances
Madan Jampani97cf7c42015-02-19 16:33:37 -0800420
421 clusterCommunicator.broadcastIncludeSelf(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700422 leadershipEvent,
423 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
424 SERIALIZER::encode);
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800425 } else {
426 //
427 // Test if time to expire a stale leader
428 //
429 long delta = System.currentTimeMillis() -
430 lastLeadershipUpdateMs;
431 if (delta > LEADERSHIP_REMOTE_TIMEOUT_MS) {
HIGUCHI Yuta26647a62015-02-27 10:57:59 -0800432 log.debug("Topic {} leader {} booted due to heartbeat timeout",
433 topicName, leader);
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800434 leadershipEvent = new LeadershipEvent(
435 LeadershipEvent.Type.LEADER_BOOTED,
Madan Jampani30a57f82015-03-02 12:19:41 -0800436 new Leadership(topicName, leader, myLastLeaderTerm, 0));
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800437 // Dispatch only to the local listener(s)
438 eventDispatcher.post(leadershipEvent);
439 leader = null;
440 }
441 }
442 }
443 }
444
445 // Sleep before re-advertising
446 try {
447 Thread.sleep(LEADERSHIP_PERIODIC_INTERVAL_MS);
448 } catch (InterruptedException e) {
Pavlin Radoslavov2e57b3b2014-11-30 18:36:05 -0800449 log.debug("Leader Election periodic thread interrupted");
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800450 }
451 }
452 }
453
454 /**
455 * Performs the leader election by using Hazelcast.
456 */
457 private void doLeaderElectionThread() {
458
459 while (!isShutdown) {
460 LeadershipEvent leadershipEvent;
461 //
462 // Try to acquire the lock and keep it until the instance is
463 // shutdown.
464 //
Pavlin Radoslavov2e57b3b2014-11-30 18:36:05 -0800465 log.debug("Leader Election begin for topic {}",
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800466 topicName);
467 try {
468 // Block until it becomes the leader
469 leaderLock.lockInterruptibly();
470 } catch (InterruptedException e) {
471 //
472 // Thread interrupted. Either shutdown or run for
473 // re-election.
474 //
Pavlin Radoslavov2e57b3b2014-11-30 18:36:05 -0800475 log.debug("Election interrupted for topic {}",
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800476 topicName);
477 continue;
478 }
479
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800480 try {
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900481 synchronized (this) {
482 //
483 // This instance is now the leader
484 //
485 log.info("Leader Elected for topic {}", topicName);
486
487 updateTerm();
488
489 leader = localNodeId;
490 leadershipEvent = new LeadershipEvent(
Madan Jampani30a57f82015-03-02 12:19:41 -0800491 LeadershipEvent.Type.LEADER_ELECTED,
492 new Leadership(topicName, localNodeId, myLastLeaderTerm, 0));
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700493
Madan Jampani97cf7c42015-02-19 16:33:37 -0800494 clusterCommunicator.broadcastIncludeSelf(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700495 leadershipEvent,
496 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
497 SERIALIZER::encode);
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900498 }
499
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800500 // Sleep forever until interrupted
501 Thread.sleep(Long.MAX_VALUE);
502 } catch (InterruptedException e) {
503 //
504 // Thread interrupted. Either shutdown or run for
505 // re-election.
506 //
Pavlin Radoslavov2e57b3b2014-11-30 18:36:05 -0800507 log.debug("Leader Interrupted for topic {}",
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800508 topicName);
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800509
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900510 } finally {
511 synchronized (this) {
512 // If we reach here, we should release the leadership
513 log.debug("Leader Lock Released for topic {}", topicName);
514 if ((leader != null) &&
515 leader.equals(localNodeId)) {
516 leader = null;
517 }
518 leadershipEvent = new LeadershipEvent(
Madan Jampani30a57f82015-03-02 12:19:41 -0800519 LeadershipEvent.Type.LEADER_BOOTED,
520 new Leadership(topicName, localNodeId, myLastLeaderTerm, 0));
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700521
Madan Jampani97cf7c42015-02-19 16:33:37 -0800522 clusterCommunicator.broadcastIncludeSelf(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700523 leadershipEvent,
524 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
525 SERIALIZER::encode);
526
HIGUCHI Yuta26647a62015-02-27 10:57:59 -0800527 if (leaderLock.isLockedByCurrentThread()) {
528 leaderLock.unlock();
529 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800530 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800531 }
532 }
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900533 isRunningForLeadership = false;
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800534 }
535
536 // Globally guarded by the leadership lock for this term
537 // Locally guarded by synchronized (this)
538 private void updateTerm() {
539 long oldTerm = term.get();
540 long newTerm = term.incrementAndGet();
541 myLastLeaderTerm = newTerm;
542 log.debug("Topic {} updated term from {} to {}", topicName,
543 oldTerm, newTerm);
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800544 }
545 }
Madan Jampani97cf7c42015-02-19 16:33:37 -0800546
547 private class InternalLeadershipEventListener implements ClusterMessageHandler {
548
549 @Override
550 public void handle(ClusterMessage message) {
551 LeadershipEvent leadershipEvent =
552 SERIALIZER.decode(message.payload());
553
554 log.trace("Leadership Event: time = {} type = {} event = {}",
555 leadershipEvent.time(), leadershipEvent.type(),
556 leadershipEvent);
557 //
558 // If there is no entry for the topic, then create a new one to
559 // keep track of the leadership, but don't run for leadership itself.
560 //
561 String topicName = leadershipEvent.subject().topic();
562 Topic topic = topics.get(topicName);
563 if (topic == null) {
564 topic = new Topic(topicName);
565 Topic oldTopic = topics.putIfAbsent(topicName, topic);
566 if (oldTopic == null) {
567 // encountered new topic, start periodic processing
568 topic.start();
569 } else {
570 topic = oldTopic;
571 }
572 }
573 topic.receivedLeadershipEvent(leadershipEvent);
574 eventDispatcher.post(leadershipEvent);
575 }
576 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700577
578 @Override
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700579 public Map<String, Leadership> getCandidates() {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700580 return null;
581 }
582
583 @Override
584 public List<NodeId> getCandidates(String path) {
585 return null;
586 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800587}