blob: ed34928f42e6c6d5a0fcdc648521c3566125bb39 [file] [log] [blame]
Madan Jampanid14166a2015-02-24 17:37:51 -08001package org.onosproject.store.consistent.impl;
2
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07003import com.google.common.collect.ImmutableList;
Jonathan Harte649c752015-03-03 18:04:25 -08004import com.google.common.collect.ImmutableMap;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07005import com.google.common.collect.Lists;
Jonathan Harte649c752015-03-03 18:04:25 -08006import com.google.common.collect.Maps;
7import com.google.common.collect.Sets;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07008
Madan Jampanid14166a2015-02-24 17:37:51 -08009import org.apache.felix.scr.annotations.Activate;
10import org.apache.felix.scr.annotations.Component;
11import org.apache.felix.scr.annotations.Deactivate;
12import org.apache.felix.scr.annotations.Reference;
13import org.apache.felix.scr.annotations.ReferenceCardinality;
14import org.apache.felix.scr.annotations.Service;
Ayaka Koshibefd26a302015-04-13 13:59:54 -070015import org.apache.commons.lang3.mutable.MutableBoolean;
Madan Jampanid14166a2015-02-24 17:37:51 -080016import org.onlab.util.KryoNamespace;
17import org.onosproject.cluster.ClusterService;
Madan Jampanid14166a2015-02-24 17:37:51 -080018import org.onosproject.cluster.Leadership;
19import org.onosproject.cluster.LeadershipEvent;
20import org.onosproject.cluster.LeadershipEventListener;
21import org.onosproject.cluster.LeadershipService;
22import org.onosproject.cluster.NodeId;
23import org.onosproject.event.AbstractListenerRegistry;
24import org.onosproject.event.EventDeliveryService;
25import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
26import org.onosproject.store.cluster.messaging.ClusterMessage;
27import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
28import org.onosproject.store.cluster.messaging.MessageSubject;
29import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampanid14166a2015-02-24 17:37:51 -080030import org.onosproject.store.service.ConsistentMap;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070031import org.onosproject.store.service.ConsistentMapException;
Madan Jampanid14166a2015-02-24 17:37:51 -080032import org.onosproject.store.service.Serializer;
33import org.onosproject.store.service.StorageService;
34import org.onosproject.store.service.Versioned;
35import org.slf4j.Logger;
36
Madan Jampani1af8e132015-04-30 16:41:18 -070037import java.util.ArrayList;
Jonathan Harte649c752015-03-03 18:04:25 -080038import java.util.Map;
39import java.util.Map.Entry;
40import java.util.Objects;
41import java.util.Set;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070042import java.util.List;
Jonathan Harte649c752015-03-03 18:04:25 -080043import java.util.concurrent.ExecutorService;
44import java.util.concurrent.Executors;
45import java.util.concurrent.ScheduledExecutorService;
46import java.util.concurrent.TimeUnit;
47import java.util.stream.Collectors;
48
49import static com.google.common.base.Preconditions.checkArgument;
50import static org.onlab.util.Tools.groupedThreads;
51import static org.slf4j.LoggerFactory.getLogger;
Ayaka Koshibe4a3c2392015-04-22 18:03:13 -070052import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
53import static org.onosproject.cluster.ControllerNode.State.INACTIVE;
54
/**
 * Distributed leadership manager implemented on top of ConsistentMap.
 * <p>
 * This implementation makes use of the cluster manager's failure
 * detection capabilities to detect and purge stale leader locks.
 * TODO: Ensure lock safety and liveness.
 */
Madan Jampani7f72c3f2015-03-01 17:34:59 -080062@Component(immediate = true, enabled = true)
Madan Jampanid14166a2015-02-24 17:37:51 -080063@Service
64public class DistributedLeadershipManager implements LeadershipService {
65
66 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
67 protected StorageService storageService;
68
69 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
70 protected ClusterService clusterService;
71
72 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
73 protected ClusterCommunicationService clusterCommunicator;
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76 protected EventDeliveryService eventDispatcher;
77
78 private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
79 new MessageSubject("distributed-leadership-manager-events");
80
81 private final Logger log = getLogger(getClass());
82 private ExecutorService messageHandlingExecutor;
83 private ScheduledExecutorService retryLeaderLockExecutor;
84 private ScheduledExecutorService deadLockDetectionExecutor;
85 private ScheduledExecutorService leadershipStatusBroadcaster;
86
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070087 private ConsistentMap<String, NodeId> leaderMap;
88 private ConsistentMap<String, List<NodeId>> candidateMap;
89
Madan Jampanid14166a2015-02-24 17:37:51 -080090 private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
91 listenerRegistry;
92 private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
Ayaka Koshibefd26a302015-04-13 13:59:54 -070093 private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
Madan Jampanid14166a2015-02-24 17:37:51 -080094 private NodeId localNodeId;
95
96 private Set<String> activeTopics = Sets.newConcurrentHashSet();
97
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070098 private static final int ELECTION_JOIN_ATTEMPT_INTERVAL_SEC = 2;
Madan Jampanid14166a2015-02-24 17:37:51 -080099 private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
100 private static final int DEADLOCK_DETECTION_INTERVAL_SEC = 2;
Madan Jampania14047d2015-02-25 12:23:02 -0800101 private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC = 2;
Madan Jampanid14166a2015-02-24 17:37:51 -0800102
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700103 private static final int LEADER_CANDIDATE_POS = 0;
104
105 private static final Serializer SERIALIZER = Serializer.using(
106 new KryoNamespace.Builder().register(KryoNamespaces.API).build());
Madan Jampanid14166a2015-02-24 17:37:51 -0800107
108 @Activate
109 public void activate() {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700110 leaderMap = storageService.<String, NodeId>consistentMapBuilder()
111 .withName("onos-topic-leaders")
112 .withSerializer(SERIALIZER)
113 .withPartitionsDisabled().build();
114 candidateMap = storageService.<String, List<NodeId>>consistentMapBuilder()
115 .withName("onos-topic-candidates")
116 .withSerializer(SERIALIZER)
117 .withPartitionsDisabled().build();
Madan Jampanid14166a2015-02-24 17:37:51 -0800118
119 localNodeId = clusterService.getLocalNode().id();
120
121 messageHandlingExecutor = Executors.newSingleThreadExecutor(
122 groupedThreads("onos/store/leadership", "message-handler"));
123 retryLeaderLockExecutor = Executors.newScheduledThreadPool(
124 4, groupedThreads("onos/store/leadership", "election-thread-%d"));
125 deadLockDetectionExecutor = Executors.newSingleThreadScheduledExecutor(
126 groupedThreads("onos/store/leadership", "dead-lock-detector"));
127 leadershipStatusBroadcaster = Executors.newSingleThreadScheduledExecutor(
128 groupedThreads("onos/store/leadership", "peer-updater"));
129 clusterCommunicator.addSubscriber(
130 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
131 new InternalLeadershipEventListener(),
132 messageHandlingExecutor);
133
134 deadLockDetectionExecutor.scheduleWithFixedDelay(
135 this::purgeStaleLocks, 0, DEADLOCK_DETECTION_INTERVAL_SEC, TimeUnit.SECONDS);
136 leadershipStatusBroadcaster.scheduleWithFixedDelay(
Madan Jampania14047d2015-02-25 12:23:02 -0800137 this::sendLeadershipStatus, 0, LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC, TimeUnit.SECONDS);
Madan Jampanid14166a2015-02-24 17:37:51 -0800138
139 listenerRegistry = new AbstractListenerRegistry<>();
140 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
141
142 log.info("Started.");
143 }
144
145 @Deactivate
146 public void deactivate() {
147 leaderBoard.forEach((topic, leadership) -> {
148 if (localNodeId.equals(leadership.leader())) {
149 withdraw(topic);
150 }
151 });
152
153 eventDispatcher.removeSink(LeadershipEvent.class);
154 clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
155
156 messageHandlingExecutor.shutdown();
157 retryLeaderLockExecutor.shutdown();
158 deadLockDetectionExecutor.shutdown();
159 leadershipStatusBroadcaster.shutdown();
160
161 log.info("Stopped.");
162 }
163
164 @Override
165 public Map<String, Leadership> getLeaderBoard() {
166 return ImmutableMap.copyOf(leaderBoard);
167 }
168
169 @Override
Madan Jampanifd45d5e2015-04-20 13:33:21 -0700170 public Map<String, List<NodeId>> getCandidates() {
171 return Maps.toMap(candidateBoard.keySet(), this::getCandidates);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700172 }
173
174 @Override
175 public List<NodeId> getCandidates(String path) {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700176 Leadership current = candidateBoard.get(path);
177 return current == null ? ImmutableList.of() : ImmutableList.copyOf(current.candidates());
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700178 }
179
180 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800181 public NodeId getLeader(String path) {
182 Leadership leadership = leaderBoard.get(path);
183 return leadership != null ? leadership.leader() : null;
184 }
185
186 @Override
Madan Jampani59610512015-02-25 15:25:43 -0800187 public Leadership getLeadership(String path) {
188 checkArgument(path != null);
189 return leaderBoard.get(path);
190 }
191
192 @Override
193 public Set<String> ownedTopics(NodeId nodeId) {
194 checkArgument(nodeId != null);
195 return leaderBoard.entrySet()
196 .stream()
197 .filter(entry -> nodeId.equals(entry.getValue().leader()))
198 .map(Entry::getKey)
199 .collect(Collectors.toSet());
200 }
201
202 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800203 public void runForLeadership(String path) {
Madan Jampani52860be2015-02-27 12:52:37 -0800204 log.debug("Running for leadership for topic: {}", path);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700205 try {
206 Versioned<List<NodeId>> candidates = candidateMap.get(path);
207 if (candidates != null) {
208 List<NodeId> candidateList = Lists.newArrayList(candidates.value());
209 if (!candidateList.contains(localNodeId)) {
210 candidateList.add(localNodeId);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700211 if (candidateMap.replace(path, candidates.version(), candidateList)) {
212 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
Madan Jampani1af8e132015-04-30 16:41:18 -0700213 notifyCandidateUpdated(
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700214 path, candidateList, newCandidates.version(), newCandidates.creationTime());
215 } else {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700216 rerunForLeadership(path);
217 return;
218 }
219 }
220 } else {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700221 List<NodeId> candidateList = ImmutableList.of(localNodeId);
222 if ((candidateMap.putIfAbsent(path, candidateList) == null)) {
223 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
Madan Jampani1af8e132015-04-30 16:41:18 -0700224 notifyCandidateUpdated(path, candidateList, newCandidates.version(), newCandidates.creationTime());
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700225 } else {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700226 rerunForLeadership(path);
227 return;
228 }
229 }
230 log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
231 activeTopics.add(path);
232 tryLeaderLock(path);
233 } catch (ConsistentMapException e) {
234 log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
235 rerunForLeadership(path);
236 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800237 }
238
239 @Override
240 public void withdraw(String path) {
241 activeTopics.remove(path);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700242
Madan Jampanid14166a2015-02-24 17:37:51 -0800243 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700244 Versioned<NodeId> leader = leaderMap.get(path);
245 if (leader != null && Objects.equals(leader.value(), localNodeId)) {
246 if (leaderMap.remove(path, leader.version())) {
Jonathan Harte649c752015-03-03 18:04:25 -0800247 log.info("Gave up leadership for {}", path);
248 notifyRemovedLeader(path, localNodeId, leader.version(), leader.creationTime());
249 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800250 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700251 // else we are not the current leader, can still be a candidate.
252 Versioned<List<NodeId>> candidates = candidateMap.get(path);
253 List<NodeId> candidateList = candidates != null
254 ? Lists.newArrayList(candidates.value())
255 : Lists.newArrayList();
256 if (!candidateList.remove(localNodeId)) {
257 return;
258 }
Ayaka Koshibe93451b42015-04-21 15:12:03 -0700259 if (candidateMap.replace(path, candidates.version(), candidateList)) {
Ayaka Koshibe67cf7de2015-04-17 11:47:27 -0700260 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
261 notifyCandidateRemoved(path, candidates.version(), candidates.creationTime(), newCandidates);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700262 } else {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700263 log.warn("Failed to withdraw from candidates list. Will retry");
264 retryWithdraw(path);
265 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800266 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800267 log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700268 retryWithdraw(path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800269 }
270 }
271
272 @Override
Madan Jampani1af8e132015-04-30 16:41:18 -0700273 public boolean stepdown(String path) {
Madan Jampani9bd1f152015-04-30 23:33:35 -0700274 if (!activeTopics.contains(path) || !Objects.equals(localNodeId, getLeader(path))) {
Madan Jampani1af8e132015-04-30 16:41:18 -0700275 return false;
276 }
277
278 try {
279 Versioned<NodeId> leader = leaderMap.get(path);
280 if (leader != null && Objects.equals(leader.value(), localNodeId)) {
281 if (leaderMap.remove(path, leader.version())) {
282 log.info("Gave up leadership for {}", path);
283 notifyRemovedLeader(path, localNodeId, leader.version(), leader.creationTime());
284 return true;
285 }
286 }
287 } catch (Exception e) {
288 log.warn("Error executing stepdown for {}", path, e);
289 }
290 return false;
291 }
292
293 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800294 public void addListener(LeadershipEventListener listener) {
295 listenerRegistry.addListener(listener);
296 }
297
298 @Override
299 public void removeListener(LeadershipEventListener listener) {
300 listenerRegistry.removeListener(listener);
301 }
302
Madan Jampani1af8e132015-04-30 16:41:18 -0700303 @Override
304 public boolean makeTopCandidate(String path, NodeId nodeId) {
305 Versioned<List<NodeId>> candidates = candidateMap.get(path);
306 if (candidates == null || !candidates.value().contains(nodeId)) {
307 return false;
308 }
309 if (nodeId.equals(candidates.value().get(0))) {
310 return true;
311 }
312 List<NodeId> currentRoster = candidates.value();
313 List<NodeId> newRoster = new ArrayList<>(currentRoster.size());
314 newRoster.add(nodeId);
315 currentRoster.stream().filter(id -> !nodeId.equals(id)).forEach(newRoster::add);
316 boolean updated = candidateMap.replace(path, candidates.version(), newRoster);
317 if (updated) {
318 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
319 notifyCandidateUpdated(
320 path, newCandidates.value(), newCandidates.version(), newCandidates.creationTime());
321 }
322 return updated;
323 }
324
Madan Jampanid14166a2015-02-24 17:37:51 -0800325 private void tryLeaderLock(String path) {
326 if (!activeTopics.contains(path)) {
327 return;
328 }
Ayaka Koshibe0d886fc2015-04-23 11:53:41 -0700329 try {
330 Versioned<List<NodeId>> candidates = candidateMap.get(path);
331 if (candidates != null) {
332 List<NodeId> activeNodes = candidates.value().stream()
333 .filter(n -> clusterService.getState(n) == ACTIVE)
334 .collect(Collectors.toList());
335 if (localNodeId.equals(activeNodes.get(LEADER_CANDIDATE_POS))) {
336 leaderLockAttempt(path, candidates.value());
337 } else {
338 retryLock(path);
339 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700340 } else {
Ayaka Koshibe0d886fc2015-04-23 11:53:41 -0700341 throw new IllegalStateException("should not be here");
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700342 }
Ayaka Koshibe0d886fc2015-04-23 11:53:41 -0700343 } catch (Exception e) {
344 log.debug("Failed to fetch candidate information for {}", path, e);
345 retryLock(path);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700346 }
347 }
348
349 private void leaderLockAttempt(String path, List<NodeId> candidates) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800350 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700351 Versioned<NodeId> currentLeader = leaderMap.get(path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800352 if (currentLeader != null) {
353 if (localNodeId.equals(currentLeader.value())) {
354 log.info("Already has leadership for {}", path);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700355 // FIXME: candidates can get out of sync.
356 notifyNewLeader(
357 path, localNodeId, candidates, currentLeader.version(), currentLeader.creationTime());
Madan Jampanid14166a2015-02-24 17:37:51 -0800358 } else {
359 // someone else has leadership. will retry after sometime.
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700360 retryLock(path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800361 }
362 } else {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700363 if (leaderMap.putIfAbsent(path, localNodeId) == null) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800364 log.info("Assumed leadership for {}", path);
365 // do a get again to get the version (epoch)
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700366 Versioned<NodeId> newLeader = leaderMap.get(path);
367 // FIXME: candidates can get out of sync
368 notifyNewLeader(path, localNodeId, candidates, newLeader.version(), newLeader.creationTime());
Madan Jampanid14166a2015-02-24 17:37:51 -0800369 } else {
370 // someone beat us to it.
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700371 retryLock(path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800372 }
373 }
374 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800375 log.debug("Attempt to acquire leadership lock for topic {} failed", path, e);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700376 retryLock(path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800377 }
378 }
379
Madan Jampani1af8e132015-04-30 16:41:18 -0700380 private void notifyCandidateUpdated(
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700381 String path, List<NodeId> candidates, long epoch, long electedTime) {
382 Leadership newInfo = new Leadership(path, candidates, epoch, electedTime);
383 final MutableBoolean updated = new MutableBoolean(false);
384 candidateBoard.compute(path, (k, current) -> {
385 if (current == null || current.epoch() < newInfo.epoch()) {
Ayaka Koshibe0d886fc2015-04-23 11:53:41 -0700386 log.debug("updating candidateboard with {}", newInfo);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700387 updated.setTrue();
388 return newInfo;
389 }
390 return current;
391 });
392 // maybe rethink types of candidates events
393 if (updated.booleanValue()) {
394 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, newInfo);
395 notifyPeers(event);
396 }
397 }
398
399 private void notifyCandidateRemoved(
Ayaka Koshibe67cf7de2015-04-17 11:47:27 -0700400 String path, long oldEpoch, long oldTime, Versioned<List<NodeId>> candidates) {
401 Leadership newInfo = (candidates == null)
402 ? new Leadership(path, ImmutableList.of(), oldEpoch, oldTime)
403 : new Leadership(path, candidates.value(), candidates.version(), candidates.creationTime());
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700404 final MutableBoolean updated = new MutableBoolean(false);
Ayaka Koshibe67cf7de2015-04-17 11:47:27 -0700405
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700406 candidateBoard.compute(path, (k, current) -> {
Ayaka Koshibe93451b42015-04-21 15:12:03 -0700407 if (current != null && current.epoch() < newInfo.epoch()) {
408 updated.setTrue();
409 return newInfo;
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700410 }
411 return current;
412 });
413 // maybe rethink types of candidates events
414 if (updated.booleanValue()) {
Ayaka Koshibe67cf7de2015-04-17 11:47:27 -0700415 log.debug("updated candidateboard with removal: {}", newInfo);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700416 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, newInfo);
417 notifyPeers(event);
418 }
419 }
420
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700421 private void notifyNewLeader(String path, NodeId leader,
422 List<NodeId> candidates, long epoch, long electedTime) {
423 Leadership newLeadership = new Leadership(path, leader, candidates, epoch, electedTime);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700424 final MutableBoolean updatedLeader = new MutableBoolean(false);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700425 log.debug("candidates for new Leadership {}", candidates);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700426 leaderBoard.compute(path, (k, currentLeader) -> {
Madan Jampanid14166a2015-02-24 17:37:51 -0800427 if (currentLeader == null || currentLeader.epoch() < epoch) {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700428 log.debug("updating leaderboard with new {}", newLeadership);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700429 updatedLeader.setTrue();
430 return newLeadership;
Madan Jampanid14166a2015-02-24 17:37:51 -0800431 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700432 return currentLeader;
433 });
Madan Jampanid14166a2015-02-24 17:37:51 -0800434
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700435 if (updatedLeader.booleanValue()) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800436 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700437 notifyPeers(event);
Madan Jampanid14166a2015-02-24 17:37:51 -0800438 }
439 }
440
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700441 private void notifyPeers(LeadershipEvent event) {
442 eventDispatcher.post(event);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700443 clusterCommunicator.broadcast(event,
444 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
445 SERIALIZER::encode);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700446 }
447
Madan Jampani30a57f82015-03-02 12:19:41 -0800448 private void notifyRemovedLeader(String path, NodeId leader, long epoch, long electedTime) {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700449 Versioned<List<NodeId>> candidates = candidateMap.get(path);
450 Leadership oldLeadership = new Leadership(
451 path, leader, candidates.value(), epoch, electedTime);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700452 final MutableBoolean updatedLeader = new MutableBoolean(false);
453 leaderBoard.compute(path, (k, currentLeader) -> {
Madan Jampanid14166a2015-02-24 17:37:51 -0800454 if (currentLeader != null && currentLeader.epoch() == oldLeadership.epoch()) {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700455 updatedLeader.setTrue();
456 return null;
Madan Jampanid14166a2015-02-24 17:37:51 -0800457 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700458 return currentLeader;
459 });
Madan Jampanid14166a2015-02-24 17:37:51 -0800460
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700461 if (updatedLeader.booleanValue()) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800462 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, oldLeadership);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700463 notifyPeers(event);
Madan Jampanid14166a2015-02-24 17:37:51 -0800464 }
465 }
466
467 private class InternalLeadershipEventListener implements ClusterMessageHandler {
468
469 @Override
470 public void handle(ClusterMessage message) {
471 LeadershipEvent leadershipEvent =
472 SERIALIZER.decode(message.payload());
473
Jonathan Hartb59ac4e2015-03-24 18:00:34 -0700474 log.trace("Leadership Event: time = {} type = {} event = {}",
Madan Jampanid14166a2015-02-24 17:37:51 -0800475 leadershipEvent.time(), leadershipEvent.type(),
476 leadershipEvent);
477
478 Leadership leadershipUpdate = leadershipEvent.subject();
479 LeadershipEvent.Type eventType = leadershipEvent.type();
480 String topic = leadershipUpdate.topic();
481
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700482 MutableBoolean updateAccepted = new MutableBoolean(false);
483 if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
484 leaderBoard.compute(topic, (k, currentLeadership) -> {
Madan Jampanid14166a2015-02-24 17:37:51 -0800485 if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700486 updateAccepted.setTrue();
487 return leadershipUpdate;
Madan Jampanid14166a2015-02-24 17:37:51 -0800488 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700489 return currentLeadership;
490 });
491 } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
492 leaderBoard.compute(topic, (k, currentLeadership) -> {
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700493 if (currentLeadership == null || currentLeadership.epoch() == leadershipUpdate.epoch()) {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700494 updateAccepted.setTrue();
495 return null;
Madan Jampanid14166a2015-02-24 17:37:51 -0800496 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700497 return currentLeadership;
498 });
499 } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
500 candidateBoard.compute(topic, (k, currentInfo) -> {
Ayaka Koshibe93451b42015-04-21 15:12:03 -0700501 if (currentInfo == null || currentInfo.epoch() < leadershipUpdate.epoch()) {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700502 updateAccepted.setTrue();
503 return leadershipUpdate;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700504 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700505 return currentInfo;
506 });
507 } else {
508 throw new IllegalStateException("Unknown event type.");
509 }
510
511 if (updateAccepted.booleanValue()) {
512 eventDispatcher.post(leadershipEvent);
Madan Jampanid14166a2015-02-24 17:37:51 -0800513 }
514 }
515 }
516
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700517 private void rerunForLeadership(String path) {
518 retryLeaderLockExecutor.schedule(
519 () -> runForLeadership(path),
520 ELECTION_JOIN_ATTEMPT_INTERVAL_SEC,
521 TimeUnit.SECONDS);
522 }
523
524 private void retryLock(String path) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800525 retryLeaderLockExecutor.schedule(
526 () -> tryLeaderLock(path),
527 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
528 TimeUnit.SECONDS);
529 }
530
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700531 private void retryWithdraw(String path) {
532 retryLeaderLockExecutor.schedule(
533 () -> withdraw(path),
534 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
535 TimeUnit.SECONDS);
536 }
537
Madan Jampanid14166a2015-02-24 17:37:51 -0800538 private void purgeStaleLocks() {
539 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700540 leaderMap.entrySet()
541 .stream()
Ayaka Koshibe4a3c2392015-04-22 18:03:13 -0700542 .filter(e -> clusterService.getState(e.getValue().value()) == INACTIVE)
543 .filter(e -> activeTopics.contains(e.getKey()))
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700544 .forEach(entry -> {
545 String path = entry.getKey();
546 NodeId nodeId = entry.getValue().value();
547 long epoch = entry.getValue().version();
548 long creationTime = entry.getValue().creationTime();
Madan Jampanid14166a2015-02-24 17:37:51 -0800549 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700550 if (leaderMap.remove(path, epoch)) {
Madan Jampania14047d2015-02-25 12:23:02 -0800551 log.info("Purged stale lock held by {} for {}", nodeId, path);
Madan Jampani30a57f82015-03-02 12:19:41 -0800552 notifyRemovedLeader(path, nodeId, epoch, creationTime);
Madan Jampanid14166a2015-02-24 17:37:51 -0800553 }
554 } catch (Exception e) {
555 log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
556 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700557 });
Madan Jampanid14166a2015-02-24 17:37:51 -0800558 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800559 log.debug("Failed cleaning up stale locks", e);
Madan Jampanid14166a2015-02-24 17:37:51 -0800560 }
561 }
562
563 private void sendLeadershipStatus() {
Madan Jampania14047d2015-02-25 12:23:02 -0800564 try {
565 leaderBoard.forEach((path, leadership) -> {
566 if (leadership.leader().equals(localNodeId)) {
567 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700568 clusterCommunicator.broadcast(event,
569 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
570 SERIALIZER::encode);
Madan Jampania14047d2015-02-25 12:23:02 -0800571 }
572 });
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700573 candidateBoard.forEach((path, leadership) -> {
574 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, leadership);
575 clusterCommunicator.broadcast(event,
576 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
577 SERIALIZER::encode);
578 });
Madan Jampania14047d2015-02-25 12:23:02 -0800579 } catch (Exception e) {
580 log.debug("Failed to send leadership updates", e);
581 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800582 }
Madan Jampania14047d2015-02-25 12:23:02 -0800583}