blob: 5b73246947fc53c60eacd4373ca2e1708084626a [file] [log] [blame]
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -08001/*
Ray Milkey34c95902015-04-15 09:47:53 -07002 * Copyright 2014-2015 Open Networking Laboratory
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.cluster.impl;
Jonathan Hart949c2842014-11-28 23:44:09 -080017
Jonathan Hart82b9fec2015-02-03 16:18:54 -080018import com.google.common.collect.Maps;
19import com.hazelcast.config.TopicConfig;
20import com.hazelcast.core.IAtomicLong;
HIGUCHI Yuta26647a62015-02-27 10:57:59 -080021import com.hazelcast.core.ILock;
Madan Jampani2af244a2015-02-22 13:12:01 -080022
Jonathan Hart949c2842014-11-28 23:44:09 -080023import org.apache.felix.scr.annotations.Activate;
24import org.apache.felix.scr.annotations.Component;
25import org.apache.felix.scr.annotations.Deactivate;
26import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
28import org.apache.felix.scr.annotations.Service;
Jonathan Hart82b9fec2015-02-03 16:18:54 -080029import org.onlab.util.KryoNamespace;
Brian O'Connorabafb502014-12-02 22:26:20 -080030import org.onosproject.cluster.ClusterService;
31import org.onosproject.cluster.Leadership;
32import org.onosproject.cluster.LeadershipEvent;
33import org.onosproject.cluster.LeadershipEventListener;
34import org.onosproject.cluster.LeadershipService;
35import org.onosproject.cluster.NodeId;
Simon Huntff663742015-05-14 13:33:05 -070036import org.onosproject.event.ListenerRegistry;
Brian O'Connorabafb502014-12-02 22:26:20 -080037import org.onosproject.event.EventDeliveryService;
Madan Jampani97cf7c42015-02-19 16:33:37 -080038import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
39import org.onosproject.store.cluster.messaging.ClusterMessage;
40import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
41import org.onosproject.store.cluster.messaging.MessageSubject;
Brian O'Connorabafb502014-12-02 22:26:20 -080042import org.onosproject.store.hz.StoreService;
43import org.onosproject.store.serializers.KryoNamespaces;
44import org.onosproject.store.serializers.KryoSerializer;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080045import org.slf4j.Logger;
46import org.slf4j.LoggerFactory;
Jonathan Hart949c2842014-11-28 23:44:09 -080047
Jonathan Hart82b9fec2015-02-03 16:18:54 -080048import java.util.HashMap;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070049import java.util.List;
Jonathan Hart82b9fec2015-02-03 16:18:54 -080050import java.util.Map;
Madan Jampani59610512015-02-25 15:25:43 -080051import java.util.Set;
Madan Jampanide003d92015-05-11 17:14:20 -070052import java.util.concurrent.CompletableFuture;
Jonathan Hart82b9fec2015-02-03 16:18:54 -080053import java.util.concurrent.ExecutorService;
54import java.util.concurrent.Executors;
55import java.util.concurrent.Future;
Madan Jampani59610512015-02-25 15:25:43 -080056import java.util.stream.Collectors;
Jonathan Hart82b9fec2015-02-03 16:18:54 -080057
58import static com.google.common.base.Preconditions.checkArgument;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080059import static org.onlab.util.Tools.groupedThreads;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080060
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080061/**
62 * Distributed implementation of LeadershipService that is based on Hazelcast.
63 * <p>
64 * The election is eventually-consistent: if there is Hazelcast partitioning,
65 * and the partitioning is healed, there could be a short window of time
66 * until the leaders in each partition discover each other. If this happens,
67 * the leaders release the leadership and run again for election.
68 * </p>
69 * <p>
70 * The leader election is based on Hazelcast's Global Lock, which is strongly
71 * consistent. In addition, each leader periodically advertises events
72 * (using a Hazelcast Topic) that it is the elected leader. Those events are
73 * used for two purposes: (1) Discover multi-leader collisions (in case of
74 * healed Hazelcast partitions), and (2) Inform all listeners who is
75 * the current leader (e.g., for informational purpose).
76 * </p>
77 */
Madan Jampani7f72c3f2015-03-01 17:34:59 -080078@Component(immediate = true, enabled = false)
Jonathan Hart949c2842014-11-28 23:44:09 -080079@Service
Madan Jampani97cf7c42015-02-19 16:33:37 -080080public class HazelcastLeadershipService implements LeadershipService {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080081 private static final Logger log =
Jonathan Hart949c2842014-11-28 23:44:09 -080082 LoggerFactory.getLogger(HazelcastLeadershipService.class);
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080083
84 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
85 @Override
86 protected void setupKryoPool() {
87 serializerPool = KryoNamespace.newBuilder()
88 .register(KryoNamespaces.API)
89 .build()
90 .populate(1);
91 }
92 };
93
94 private static final long LEADERSHIP_PERIODIC_INTERVAL_MS = 5 * 1000; // 5s
95 private static final long LEADERSHIP_REMOTE_TIMEOUT_MS = 15 * 1000; // 15s
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -080096 private static final String TOPIC_HZ_ID = "LeadershipService/AllTopics";
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080097
Jonathan Hart82b9fec2015-02-03 16:18:54 -080098 // indicates there is no term value yet
99 private static final long NO_TERM = 0;
100
Jonathan Hart949c2842014-11-28 23:44:09 -0800101 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani97cf7c42015-02-19 16:33:37 -0800102 protected ClusterCommunicationService clusterCommunicator;
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart949c2842014-11-28 23:44:09 -0800105 protected ClusterService clusterService;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
108 protected StoreService storeService;
109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
111 protected EventDeliveryService eventDispatcher;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800112
Simon Huntff663742015-05-14 13:33:05 -0700113 private ListenerRegistry<LeadershipEvent, LeadershipEventListener>
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800114 listenerRegistry;
115 private final Map<String, Topic> topics = Maps.newConcurrentMap();
Madan Jampani8d21c792014-12-01 16:31:07 -0800116 private NodeId localNodeId;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800117
Madan Jampani97cf7c42015-02-19 16:33:37 -0800118 private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
119 new MessageSubject("hz-leadership-events");
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800120
Madan Jampani2af244a2015-02-22 13:12:01 -0800121 private ExecutorService messageHandlingExecutor;
122
Jonathan Hart949c2842014-11-28 23:44:09 -0800123 @Activate
124 protected void activate() {
Madan Jampani8d21c792014-12-01 16:31:07 -0800125 localNodeId = clusterService.getLocalNode().id();
Simon Huntff663742015-05-14 13:33:05 -0700126 listenerRegistry = new ListenerRegistry<>();
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800127 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
128
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800129 TopicConfig topicConfig = new TopicConfig();
130 topicConfig.setGlobalOrderingEnabled(true);
131 topicConfig.setName(TOPIC_HZ_ID);
132 storeService.getHazelcastInstance().getConfig().addTopicConfig(topicConfig);
Madan Jampani97cf7c42015-02-19 16:33:37 -0800133
Madan Jampani2af244a2015-02-22 13:12:01 -0800134 messageHandlingExecutor = Executors.newSingleThreadExecutor(
135 groupedThreads("onos/store/leadership", "message-handler"));
136
137 clusterCommunicator.addSubscriber(
138 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
139 new InternalLeadershipEventListener(),
140 messageHandlingExecutor);
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800141
Jonathan Hart949c2842014-11-28 23:44:09 -0800142 log.info("Hazelcast Leadership Service started");
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800143 }
144
Jonathan Hart949c2842014-11-28 23:44:09 -0800145 @Deactivate
146 protected void deactivate() {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800147 eventDispatcher.removeSink(LeadershipEvent.class);
Madan Jampani2af244a2015-02-22 13:12:01 -0800148 messageHandlingExecutor.shutdown();
Madan Jampani97cf7c42015-02-19 16:33:37 -0800149 clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800150
151 for (Topic topic : topics.values()) {
152 topic.stop();
153 }
154 topics.clear();
155
Jonathan Hart949c2842014-11-28 23:44:09 -0800156 log.info("Hazelcast Leadership Service stopped");
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800157 }
158
159 @Override
Madan Jampani8d21c792014-12-01 16:31:07 -0800160 public NodeId getLeader(String path) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800161 Topic topic = topics.get(path);
162 if (topic == null) {
163 return null;
164 }
165 return topic.leader();
166 }
167
168 @Override
Madan Jampani59610512015-02-25 15:25:43 -0800169 public Leadership getLeadership(String path) {
170 checkArgument(path != null);
171 Topic topic = topics.get(path);
172 if (topic != null) {
173 return new Leadership(topic.topicName(),
174 topic.leader(),
Madan Jampani30a57f82015-03-02 12:19:41 -0800175 topic.term(),
176 0);
Madan Jampani59610512015-02-25 15:25:43 -0800177 }
178 return null;
179 }
180
181 @Override
182 public Set<String> ownedTopics(NodeId nodeId) {
183 checkArgument(nodeId != null);
184 return topics.values()
185 .stream()
186 .filter(topic -> nodeId.equals(topic.leader()))
187 .map(topic -> topic.topicName)
188 .collect(Collectors.toSet());
189 }
190
191 @Override
Madan Jampanide003d92015-05-11 17:14:20 -0700192 public CompletableFuture<Leadership> runForLeadership(String path) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800193 checkArgument(path != null);
194 Topic topic = new Topic(path);
195 Topic oldTopic = topics.putIfAbsent(path, topic);
196 if (oldTopic == null) {
197 topic.start();
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800198 topic.runForLeadership();
199 } else {
200 oldTopic.runForLeadership();
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800201 }
Madan Jampanide003d92015-05-11 17:14:20 -0700202 return CompletableFuture.completedFuture(getLeadership(path));
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800203 }
204
205 @Override
Madan Jampanide003d92015-05-11 17:14:20 -0700206 public CompletableFuture<Void> withdraw(String path) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800207 checkArgument(path != null);
208 Topic topic = topics.get(path);
209 if (topic != null) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800210 topics.remove(path, topic);
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900211 topic.stop();
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800212 }
Madan Jampanide003d92015-05-11 17:14:20 -0700213 return CompletableFuture.completedFuture(null);
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800214 }
215
216 @Override
Yuta HIGUCHIc2bf3d82014-11-28 18:50:41 -0800217 public Map<String, Leadership> getLeaderBoard() {
Pavlin Radoslavov187dff62014-12-02 07:21:35 -0800218 Map<String, Leadership> result = new HashMap<>();
219
Pavlin Radoslavov187dff62014-12-02 07:21:35 -0800220 for (Topic topic : topics.values()) {
221 Leadership leadership = new Leadership(topic.topicName(),
222 topic.leader(),
Madan Jampani30a57f82015-03-02 12:19:41 -0800223 topic.term(),
224 0);
Pavlin Radoslavov187dff62014-12-02 07:21:35 -0800225 result.put(topic.topicName(), leadership);
226 }
227 return result;
Yuta HIGUCHIc2bf3d82014-11-28 18:50:41 -0800228 }
229
230 @Override
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800231 public void addListener(LeadershipEventListener listener) {
232 listenerRegistry.addListener(listener);
233 }
234
235 @Override
236 public void removeListener(LeadershipEventListener listener) {
237 listenerRegistry.removeListener(listener);
238 }
239
240 /**
241 * Class for keeping per-topic information.
242 */
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800243 private final class Topic {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800244 private final String topicName;
245 private volatile boolean isShutdown = true;
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800246 private volatile boolean isRunningForLeadership = false;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800247 private volatile long lastLeadershipUpdateMs = 0;
248 private ExecutorService leaderElectionExecutor;
249
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800250 private volatile IAtomicLong term;
251 // This is local state, recording the term number for the last time
252 // this instance was leader for this topic. The current term could be
253 // higher if the mastership has changed any times.
254 private long myLastLeaderTerm = NO_TERM;
255
HIGUCHI Yuta26647a62015-02-27 10:57:59 -0800256 private volatile NodeId leader;
257 private ILock leaderLock;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800258 private Future<?> getLockFuture;
259 private Future<?> periodicProcessingFuture;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800260
/**
 * Creates the state for a topic. The topic is not started by this call.
 *
 * @param topicName the topic name
 */
private Topic(String topicName) {
    super();
    this.topicName = topicName;
}
269
270 /**
Pavlin Radoslavov187dff62014-12-02 07:21:35 -0800271 * Gets the topic name.
272 *
273 * @return the topic name
274 */
275 private String topicName() {
276 return topicName;
277 }
278
279 /**
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800280 * Gets the leader for the topic.
281 *
282 * @return the leader for the topic
283 */
Madan Jampani8d21c792014-12-01 16:31:07 -0800284 private NodeId leader() {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800285 return leader;
286 }
287
288 /**
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800289 * Gets the current term for the topic.
290 *
291 * @return the term for the topic
292 */
293 private long term() {
294 if (term == null) {
295 return NO_TERM;
296 }
297 return term.get();
298 }
299
300 /**
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800301 * Starts operation.
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800302 */
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900303 private synchronized void start() {
304 if (!isShutdown) {
305 // already running
306 return;
307 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800308 isShutdown = false;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -0800309 String threadPoolName = "election-" + topicName + "-%d";
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800310 leaderElectionExecutor = Executors.newScheduledThreadPool(2,
Thomas Vachuska6f94ded2015-02-21 14:02:38 -0800311 groupedThreads("onos/leadership", threadPoolName));
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800312
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800313 periodicProcessingFuture =
314 leaderElectionExecutor.submit(new Runnable() {
315 @Override
316 public void run() {
317 doPeriodicProcessing();
318 }
319 });
320 }
321
322 /**
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800323 * Runs for leadership.
324 */
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900325 private synchronized void runForLeadership() {
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800326 if (isRunningForLeadership) {
327 return; // Nothing to do: already running
328 }
329 if (isShutdown) {
330 start();
331 }
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900332 isRunningForLeadership = true;
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800333 String lockHzId = "LeadershipService/" + topicName + "/lock";
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800334 String termHzId = "LeadershipService/" + topicName + "/term";
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800335 leaderLock = storeService.getHazelcastInstance().getLock(lockHzId);
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800336 term = storeService.getHazelcastInstance().getAtomicLong(termHzId);
337
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800338 getLockFuture = leaderElectionExecutor.submit(new Runnable() {
339 @Override
340 public void run() {
341 doLeaderElectionThread();
342 }
343 });
344 }
345
346 /**
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800347 * Stops leadership election for the topic.
348 */
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900349 private synchronized void stop() {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800350 isShutdown = true;
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800351 isRunningForLeadership = false;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800352 // getLockFuture.cancel(true);
353 // periodicProcessingFuture.cancel(true);
354 leaderElectionExecutor.shutdownNow();
355 }
356
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800357 /**
358 * Received a Leadership Event.
359 *
360 * @param leadershipEvent the received Leadership Event
361 */
362 private void receivedLeadershipEvent(LeadershipEvent leadershipEvent) {
Madan Jampani8d21c792014-12-01 16:31:07 -0800363 NodeId eventLeaderId = leadershipEvent.subject().leader();
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800364 if (!leadershipEvent.subject().topic().equals(topicName)) {
365 return; // Not our topic: ignore
366 }
Madan Jampani8d21c792014-12-01 16:31:07 -0800367 if (eventLeaderId.equals(localNodeId)) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800368 return; // My own message: ignore
369 }
370
371 synchronized (this) {
372 switch (leadershipEvent.type()) {
373 case LEADER_ELECTED:
374 // FALLTHROUGH
375 case LEADER_REELECTED:
376 //
377 // Another leader: if we are also a leader, then give up
378 // leadership and run for re-election.
379 //
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800380 if ((leader != null) && leader.equals(localNodeId)) {
381 if (getLockFuture != null) {
382 getLockFuture.cancel(true);
383 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800384 } else {
385 // Just update the current leader
386 leader = leadershipEvent.subject().leader();
387 lastLeadershipUpdateMs = System.currentTimeMillis();
388 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800389 break;
390 case LEADER_BOOTED:
391 // Remove the state for the current leader
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800392 if ((leader != null) && eventLeaderId.equals(leader)) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800393 leader = null;
394 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800395 break;
396 default:
397 break;
398 }
399 }
400 }
401
402 private void doPeriodicProcessing() {
403
404 while (!isShutdown) {
405
406 //
407 // Periodic tasks:
408 // (a) Advertise ourselves as the leader
409 // OR
410 // (b) Expire a stale (remote) leader
411 //
412 synchronized (this) {
413 LeadershipEvent leadershipEvent;
414 if (leader != null) {
Madan Jampani8d21c792014-12-01 16:31:07 -0800415 if (leader.equals(localNodeId)) {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800416 //
417 // Advertise ourselves as the leader
418 //
419 leadershipEvent = new LeadershipEvent(
420 LeadershipEvent.Type.LEADER_REELECTED,
Madan Jampani30a57f82015-03-02 12:19:41 -0800421 new Leadership(topicName, localNodeId, myLastLeaderTerm, 0));
Pavlin Radoslavovbcb454c2014-12-02 11:47:47 -0800422 // Dispatch to all instances
Madan Jampani97cf7c42015-02-19 16:33:37 -0800423
424 clusterCommunicator.broadcastIncludeSelf(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700425 leadershipEvent,
426 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
427 SERIALIZER::encode);
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800428 } else {
429 //
430 // Test if time to expire a stale leader
431 //
432 long delta = System.currentTimeMillis() -
433 lastLeadershipUpdateMs;
434 if (delta > LEADERSHIP_REMOTE_TIMEOUT_MS) {
HIGUCHI Yuta26647a62015-02-27 10:57:59 -0800435 log.debug("Topic {} leader {} booted due to heartbeat timeout",
436 topicName, leader);
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800437 leadershipEvent = new LeadershipEvent(
438 LeadershipEvent.Type.LEADER_BOOTED,
Madan Jampani30a57f82015-03-02 12:19:41 -0800439 new Leadership(topicName, leader, myLastLeaderTerm, 0));
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800440 // Dispatch only to the local listener(s)
441 eventDispatcher.post(leadershipEvent);
442 leader = null;
443 }
444 }
445 }
446 }
447
448 // Sleep before re-advertising
449 try {
450 Thread.sleep(LEADERSHIP_PERIODIC_INTERVAL_MS);
451 } catch (InterruptedException e) {
Pavlin Radoslavov2e57b3b2014-11-30 18:36:05 -0800452 log.debug("Leader Election periodic thread interrupted");
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800453 }
454 }
455 }
456
457 /**
458 * Performs the leader election by using Hazelcast.
459 */
460 private void doLeaderElectionThread() {
461
462 while (!isShutdown) {
463 LeadershipEvent leadershipEvent;
464 //
465 // Try to acquire the lock and keep it until the instance is
466 // shutdown.
467 //
Pavlin Radoslavov2e57b3b2014-11-30 18:36:05 -0800468 log.debug("Leader Election begin for topic {}",
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800469 topicName);
470 try {
471 // Block until it becomes the leader
472 leaderLock.lockInterruptibly();
473 } catch (InterruptedException e) {
474 //
475 // Thread interrupted. Either shutdown or run for
476 // re-election.
477 //
Pavlin Radoslavov2e57b3b2014-11-30 18:36:05 -0800478 log.debug("Election interrupted for topic {}",
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800479 topicName);
480 continue;
481 }
482
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800483 try {
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900484 synchronized (this) {
485 //
486 // This instance is now the leader
487 //
488 log.info("Leader Elected for topic {}", topicName);
489
490 updateTerm();
491
492 leader = localNodeId;
493 leadershipEvent = new LeadershipEvent(
Madan Jampani30a57f82015-03-02 12:19:41 -0800494 LeadershipEvent.Type.LEADER_ELECTED,
495 new Leadership(topicName, localNodeId, myLastLeaderTerm, 0));
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700496
Madan Jampani97cf7c42015-02-19 16:33:37 -0800497 clusterCommunicator.broadcastIncludeSelf(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700498 leadershipEvent,
499 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
500 SERIALIZER::encode);
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900501 }
502
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800503 // Sleep forever until interrupted
504 Thread.sleep(Long.MAX_VALUE);
505 } catch (InterruptedException e) {
506 //
507 // Thread interrupted. Either shutdown or run for
508 // re-election.
509 //
Pavlin Radoslavov2e57b3b2014-11-30 18:36:05 -0800510 log.debug("Leader Interrupted for topic {}",
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800511 topicName);
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800512
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900513 } finally {
514 synchronized (this) {
515 // If we reach here, we should release the leadership
516 log.debug("Leader Lock Released for topic {}", topicName);
517 if ((leader != null) &&
518 leader.equals(localNodeId)) {
519 leader = null;
520 }
521 leadershipEvent = new LeadershipEvent(
Madan Jampani30a57f82015-03-02 12:19:41 -0800522 LeadershipEvent.Type.LEADER_BOOTED,
523 new Leadership(topicName, localNodeId, myLastLeaderTerm, 0));
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700524
Madan Jampani97cf7c42015-02-19 16:33:37 -0800525 clusterCommunicator.broadcastIncludeSelf(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700526 leadershipEvent,
527 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
528 SERIALIZER::encode);
529
HIGUCHI Yuta26647a62015-02-27 10:57:59 -0800530 if (leaderLock.isLockedByCurrentThread()) {
531 leaderLock.unlock();
532 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800533 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800534 }
535 }
Yuta HIGUCHI8f068272015-02-18 22:28:32 +0900536 isRunningForLeadership = false;
Jonathan Hart82b9fec2015-02-03 16:18:54 -0800537 }
538
539 // Globally guarded by the leadership lock for this term
540 // Locally guarded by synchronized (this)
541 private void updateTerm() {
542 long oldTerm = term.get();
543 long newTerm = term.incrementAndGet();
544 myLastLeaderTerm = newTerm;
545 log.debug("Topic {} updated term from {} to {}", topicName,
546 oldTerm, newTerm);
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800547 }
548 }
Madan Jampani97cf7c42015-02-19 16:33:37 -0800549
550 private class InternalLeadershipEventListener implements ClusterMessageHandler {
551
552 @Override
553 public void handle(ClusterMessage message) {
554 LeadershipEvent leadershipEvent =
555 SERIALIZER.decode(message.payload());
556
557 log.trace("Leadership Event: time = {} type = {} event = {}",
558 leadershipEvent.time(), leadershipEvent.type(),
559 leadershipEvent);
560 //
561 // If there is no entry for the topic, then create a new one to
562 // keep track of the leadership, but don't run for leadership itself.
563 //
564 String topicName = leadershipEvent.subject().topic();
565 Topic topic = topics.get(topicName);
566 if (topic == null) {
567 topic = new Topic(topicName);
568 Topic oldTopic = topics.putIfAbsent(topicName, topic);
569 if (oldTopic == null) {
570 // encountered new topic, start periodic processing
571 topic.start();
572 } else {
573 topic = oldTopic;
574 }
575 }
576 topic.receivedLeadershipEvent(leadershipEvent);
577 eventDispatcher.post(leadershipEvent);
578 }
579 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700580
581 @Override
Madan Jampanifd45d5e2015-04-20 13:33:21 -0700582 public Map<String, List<NodeId>> getCandidates() {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700583 return null;
584 }
585
586 @Override
587 public List<NodeId> getCandidates(String path) {
588 return null;
589 }
Madan Jampani1af8e132015-04-30 16:41:18 -0700590
591 @Override
592 public boolean stepdown(String path) {
593 throw new UnsupportedOperationException();
594 }
595
596 @Override
597 public boolean makeTopCandidate(String path, NodeId nodeId) {
598 throw new UnsupportedOperationException();
599 }
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800600}