blob: 69a1a8affbe17356022d5f666a1809698127b398 [file] [log] [blame]
Madan Jampanid14166a2015-02-24 17:37:51 -08001package org.onosproject.store.consistent.impl;
2
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07003import com.google.common.collect.ImmutableList;
Jonathan Harte649c752015-03-03 18:04:25 -08004import com.google.common.collect.ImmutableMap;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07005import com.google.common.collect.Lists;
Jonathan Harte649c752015-03-03 18:04:25 -08006import com.google.common.collect.Maps;
7import com.google.common.collect.Sets;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07008
Madan Jampanid14166a2015-02-24 17:37:51 -08009import org.apache.felix.scr.annotations.Activate;
10import org.apache.felix.scr.annotations.Component;
11import org.apache.felix.scr.annotations.Deactivate;
12import org.apache.felix.scr.annotations.Reference;
13import org.apache.felix.scr.annotations.ReferenceCardinality;
14import org.apache.felix.scr.annotations.Service;
15import org.onlab.util.KryoNamespace;
16import org.onosproject.cluster.ClusterService;
Madan Jampanid14166a2015-02-24 17:37:51 -080017import org.onosproject.cluster.Leadership;
18import org.onosproject.cluster.LeadershipEvent;
19import org.onosproject.cluster.LeadershipEventListener;
20import org.onosproject.cluster.LeadershipService;
21import org.onosproject.cluster.NodeId;
22import org.onosproject.event.AbstractListenerRegistry;
23import org.onosproject.event.EventDeliveryService;
24import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Madan Jampanid14166a2015-02-24 17:37:51 -080025import org.onosproject.store.cluster.messaging.MessageSubject;
26import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampanid14166a2015-02-24 17:37:51 -080027import org.onosproject.store.service.ConsistentMap;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070028import org.onosproject.store.service.ConsistentMapException;
Madan Jampanid14166a2015-02-24 17:37:51 -080029import org.onosproject.store.service.Serializer;
30import org.onosproject.store.service.StorageService;
31import org.onosproject.store.service.Versioned;
32import org.slf4j.Logger;
33
Madan Jampani1af8e132015-04-30 16:41:18 -070034import java.util.ArrayList;
Jonathan Harte649c752015-03-03 18:04:25 -080035import java.util.Map;
36import java.util.Map.Entry;
37import java.util.Objects;
38import java.util.Set;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070039import java.util.List;
Jonathan Harte649c752015-03-03 18:04:25 -080040import java.util.concurrent.ExecutorService;
41import java.util.concurrent.Executors;
42import java.util.concurrent.ScheduledExecutorService;
43import java.util.concurrent.TimeUnit;
Madan Jampanid46e18f2015-05-04 23:19:33 -070044import java.util.concurrent.atomic.AtomicBoolean;
Jonathan Harte649c752015-03-03 18:04:25 -080045import java.util.stream.Collectors;
46
47import static com.google.common.base.Preconditions.checkArgument;
48import static org.onlab.util.Tools.groupedThreads;
49import static org.slf4j.LoggerFactory.getLogger;
Ayaka Koshibe4a3c2392015-04-22 18:03:13 -070050import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
51import static org.onosproject.cluster.ControllerNode.State.INACTIVE;
52
/**
 * Distributed Lock Manager implemented on top of ConsistentMap.
 * <p>
 * This implementation makes use of the cluster manager's failure
 * detection capabilities to detect and purge stale locks.
 * TODO: Ensure lock safety and liveness.
 */
Madan Jampani7f72c3f2015-03-01 17:34:59 -080060@Component(immediate = true, enabled = true)
Madan Jampanid14166a2015-02-24 17:37:51 -080061@Service
62public class DistributedLeadershipManager implements LeadershipService {
63
64 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
65 protected StorageService storageService;
66
67 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
68 protected ClusterService clusterService;
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
71 protected ClusterCommunicationService clusterCommunicator;
72
73 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
74 protected EventDeliveryService eventDispatcher;
75
76 private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
77 new MessageSubject("distributed-leadership-manager-events");
78
79 private final Logger log = getLogger(getClass());
80 private ExecutorService messageHandlingExecutor;
81 private ScheduledExecutorService retryLeaderLockExecutor;
82 private ScheduledExecutorService deadLockDetectionExecutor;
83 private ScheduledExecutorService leadershipStatusBroadcaster;
84
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070085 private ConsistentMap<String, NodeId> leaderMap;
86 private ConsistentMap<String, List<NodeId>> candidateMap;
87
Madan Jampanid14166a2015-02-24 17:37:51 -080088 private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
89 listenerRegistry;
90 private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
Ayaka Koshibefd26a302015-04-13 13:59:54 -070091 private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
Madan Jampanid14166a2015-02-24 17:37:51 -080092 private NodeId localNodeId;
93
94 private Set<String> activeTopics = Sets.newConcurrentHashSet();
95
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070096 private static final int ELECTION_JOIN_ATTEMPT_INTERVAL_SEC = 2;
Madan Jampanid14166a2015-02-24 17:37:51 -080097 private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
98 private static final int DEADLOCK_DETECTION_INTERVAL_SEC = 2;
Madan Jampania14047d2015-02-25 12:23:02 -080099 private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC = 2;
Madan Jampanid14166a2015-02-24 17:37:51 -0800100
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700101 private static final int LEADER_CANDIDATE_POS = 0;
102
103 private static final Serializer SERIALIZER = Serializer.using(
104 new KryoNamespace.Builder().register(KryoNamespaces.API).build());
Madan Jampanid14166a2015-02-24 17:37:51 -0800105
106 @Activate
107 public void activate() {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700108 leaderMap = storageService.<String, NodeId>consistentMapBuilder()
109 .withName("onos-topic-leaders")
110 .withSerializer(SERIALIZER)
111 .withPartitionsDisabled().build();
112 candidateMap = storageService.<String, List<NodeId>>consistentMapBuilder()
113 .withName("onos-topic-candidates")
114 .withSerializer(SERIALIZER)
115 .withPartitionsDisabled().build();
Madan Jampanid14166a2015-02-24 17:37:51 -0800116
117 localNodeId = clusterService.getLocalNode().id();
118
119 messageHandlingExecutor = Executors.newSingleThreadExecutor(
120 groupedThreads("onos/store/leadership", "message-handler"));
121 retryLeaderLockExecutor = Executors.newScheduledThreadPool(
122 4, groupedThreads("onos/store/leadership", "election-thread-%d"));
123 deadLockDetectionExecutor = Executors.newSingleThreadScheduledExecutor(
124 groupedThreads("onos/store/leadership", "dead-lock-detector"));
125 leadershipStatusBroadcaster = Executors.newSingleThreadScheduledExecutor(
126 groupedThreads("onos/store/leadership", "peer-updater"));
127 clusterCommunicator.addSubscriber(
128 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
Madan Jampanid46e18f2015-05-04 23:19:33 -0700129 SERIALIZER::decode,
130 this::onLeadershipEvent,
Madan Jampanid14166a2015-02-24 17:37:51 -0800131 messageHandlingExecutor);
132
133 deadLockDetectionExecutor.scheduleWithFixedDelay(
134 this::purgeStaleLocks, 0, DEADLOCK_DETECTION_INTERVAL_SEC, TimeUnit.SECONDS);
135 leadershipStatusBroadcaster.scheduleWithFixedDelay(
Madan Jampania14047d2015-02-25 12:23:02 -0800136 this::sendLeadershipStatus, 0, LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC, TimeUnit.SECONDS);
Madan Jampanid14166a2015-02-24 17:37:51 -0800137
138 listenerRegistry = new AbstractListenerRegistry<>();
139 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
140
Madan Jampanid46e18f2015-05-04 23:19:33 -0700141 log.info("Started");
Madan Jampanid14166a2015-02-24 17:37:51 -0800142 }
143
144 @Deactivate
145 public void deactivate() {
146 leaderBoard.forEach((topic, leadership) -> {
147 if (localNodeId.equals(leadership.leader())) {
148 withdraw(topic);
149 }
150 });
151
152 eventDispatcher.removeSink(LeadershipEvent.class);
153 clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
154
155 messageHandlingExecutor.shutdown();
156 retryLeaderLockExecutor.shutdown();
157 deadLockDetectionExecutor.shutdown();
158 leadershipStatusBroadcaster.shutdown();
159
Madan Jampanid46e18f2015-05-04 23:19:33 -0700160 log.info("Stopped");
Madan Jampanid14166a2015-02-24 17:37:51 -0800161 }
162
163 @Override
164 public Map<String, Leadership> getLeaderBoard() {
165 return ImmutableMap.copyOf(leaderBoard);
166 }
167
168 @Override
Madan Jampanifd45d5e2015-04-20 13:33:21 -0700169 public Map<String, List<NodeId>> getCandidates() {
170 return Maps.toMap(candidateBoard.keySet(), this::getCandidates);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700171 }
172
173 @Override
174 public List<NodeId> getCandidates(String path) {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700175 Leadership current = candidateBoard.get(path);
176 return current == null ? ImmutableList.of() : ImmutableList.copyOf(current.candidates());
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700177 }
178
179 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800180 public NodeId getLeader(String path) {
181 Leadership leadership = leaderBoard.get(path);
182 return leadership != null ? leadership.leader() : null;
183 }
184
185 @Override
Madan Jampani59610512015-02-25 15:25:43 -0800186 public Leadership getLeadership(String path) {
187 checkArgument(path != null);
188 return leaderBoard.get(path);
189 }
190
191 @Override
192 public Set<String> ownedTopics(NodeId nodeId) {
193 checkArgument(nodeId != null);
194 return leaderBoard.entrySet()
195 .stream()
196 .filter(entry -> nodeId.equals(entry.getValue().leader()))
197 .map(Entry::getKey)
198 .collect(Collectors.toSet());
199 }
200
201 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800202 public void runForLeadership(String path) {
Madan Jampani52860be2015-02-27 12:52:37 -0800203 log.debug("Running for leadership for topic: {}", path);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700204 try {
205 Versioned<List<NodeId>> candidates = candidateMap.get(path);
206 if (candidates != null) {
207 List<NodeId> candidateList = Lists.newArrayList(candidates.value());
208 if (!candidateList.contains(localNodeId)) {
209 candidateList.add(localNodeId);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700210 if (candidateMap.replace(path, candidates.version(), candidateList)) {
211 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700212 publish(new LeadershipEvent(
213 LeadershipEvent.Type.CANDIDATES_CHANGED,
214 new Leadership(path,
215 newCandidates.value(),
216 newCandidates.version(),
217 newCandidates.creationTime())));
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700218 } else {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700219 rerunForLeadership(path);
220 return;
221 }
222 }
223 } else {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700224 List<NodeId> candidateList = ImmutableList.of(localNodeId);
225 if ((candidateMap.putIfAbsent(path, candidateList) == null)) {
226 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700227 publish(new LeadershipEvent(
228 LeadershipEvent.Type.CANDIDATES_CHANGED,
229 new Leadership(path,
230 newCandidates.value(),
231 newCandidates.version(),
232 newCandidates.creationTime())));
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700233 } else {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700234 rerunForLeadership(path);
235 return;
236 }
237 }
238 log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
239 activeTopics.add(path);
240 tryLeaderLock(path);
241 } catch (ConsistentMapException e) {
242 log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
243 rerunForLeadership(path);
244 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800245 }
246
247 @Override
248 public void withdraw(String path) {
249 activeTopics.remove(path);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700250
Madan Jampanid14166a2015-02-24 17:37:51 -0800251 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700252 Versioned<NodeId> leader = leaderMap.get(path);
253 if (leader != null && Objects.equals(leader.value(), localNodeId)) {
254 if (leaderMap.remove(path, leader.version())) {
Jonathan Harte649c752015-03-03 18:04:25 -0800255 log.info("Gave up leadership for {}", path);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700256 publish(new LeadershipEvent(
257 LeadershipEvent.Type.LEADER_BOOTED,
258 new Leadership(path,
259 localNodeId,
260 leader.version(),
261 leader.creationTime())));
Jonathan Harte649c752015-03-03 18:04:25 -0800262 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800263 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700264 // else we are not the current leader, can still be a candidate.
265 Versioned<List<NodeId>> candidates = candidateMap.get(path);
266 List<NodeId> candidateList = candidates != null
267 ? Lists.newArrayList(candidates.value())
268 : Lists.newArrayList();
269 if (!candidateList.remove(localNodeId)) {
270 return;
271 }
Ayaka Koshibe93451b42015-04-21 15:12:03 -0700272 if (candidateMap.replace(path, candidates.version(), candidateList)) {
Ayaka Koshibe67cf7de2015-04-17 11:47:27 -0700273 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700274 publish(new LeadershipEvent(
275 LeadershipEvent.Type.CANDIDATES_CHANGED,
276 new Leadership(path,
277 newCandidates.value(),
278 newCandidates.version(),
279 newCandidates.creationTime())));
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700280 } else {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700281 log.warn("Failed to withdraw from candidates list. Will retry");
282 retryWithdraw(path);
283 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800284 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800285 log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700286 retryWithdraw(path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800287 }
288 }
289
290 @Override
Madan Jampani1af8e132015-04-30 16:41:18 -0700291 public boolean stepdown(String path) {
Madan Jampani9bd1f152015-04-30 23:33:35 -0700292 if (!activeTopics.contains(path) || !Objects.equals(localNodeId, getLeader(path))) {
Madan Jampani1af8e132015-04-30 16:41:18 -0700293 return false;
294 }
295
296 try {
297 Versioned<NodeId> leader = leaderMap.get(path);
298 if (leader != null && Objects.equals(leader.value(), localNodeId)) {
299 if (leaderMap.remove(path, leader.version())) {
Madan Jampanid46e18f2015-05-04 23:19:33 -0700300 log.info("Stepped down from leadership for {}", path);
301 publish(new LeadershipEvent(
302 LeadershipEvent.Type.LEADER_BOOTED,
303 new Leadership(path,
304 localNodeId,
305 leader.version(),
306 leader.creationTime())));
307 retryLock(path);
Madan Jampani1af8e132015-04-30 16:41:18 -0700308 return true;
309 }
310 }
311 } catch (Exception e) {
312 log.warn("Error executing stepdown for {}", path, e);
313 }
314 return false;
315 }
316
317 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800318 public void addListener(LeadershipEventListener listener) {
319 listenerRegistry.addListener(listener);
320 }
321
322 @Override
323 public void removeListener(LeadershipEventListener listener) {
324 listenerRegistry.removeListener(listener);
325 }
326
Madan Jampani1af8e132015-04-30 16:41:18 -0700327 @Override
328 public boolean makeTopCandidate(String path, NodeId nodeId) {
329 Versioned<List<NodeId>> candidates = candidateMap.get(path);
330 if (candidates == null || !candidates.value().contains(nodeId)) {
331 return false;
332 }
Madan Jampanid46e18f2015-05-04 23:19:33 -0700333 List<NodeId> currentRoster = candidates.value();
334 if (nodeId.equals(currentRoster.get(LEADER_CANDIDATE_POS))) {
Madan Jampani1af8e132015-04-30 16:41:18 -0700335 return true;
336 }
Madan Jampani1af8e132015-04-30 16:41:18 -0700337 List<NodeId> newRoster = new ArrayList<>(currentRoster.size());
338 newRoster.add(nodeId);
339 currentRoster.stream().filter(id -> !nodeId.equals(id)).forEach(newRoster::add);
340 boolean updated = candidateMap.replace(path, candidates.version(), newRoster);
341 if (updated) {
342 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700343 publish(new LeadershipEvent(
344 LeadershipEvent.Type.CANDIDATES_CHANGED,
345 new Leadership(path,
346 newCandidates.value(),
347 newCandidates.version(),
348 newCandidates.creationTime())));
Madan Jampani1af8e132015-04-30 16:41:18 -0700349 }
350 return updated;
351 }
352
Madan Jampanid14166a2015-02-24 17:37:51 -0800353 private void tryLeaderLock(String path) {
Madan Jampanid46e18f2015-05-04 23:19:33 -0700354 if (!activeTopics.contains(path) || Objects.equals(localNodeId, getLeader(path))) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800355 return;
356 }
Ayaka Koshibe0d886fc2015-04-23 11:53:41 -0700357 try {
358 Versioned<List<NodeId>> candidates = candidateMap.get(path);
359 if (candidates != null) {
Madan Jampanid46e18f2015-05-04 23:19:33 -0700360 List<NodeId> activeNodes = candidates.value()
361 .stream()
Ayaka Koshibe0d886fc2015-04-23 11:53:41 -0700362 .filter(n -> clusterService.getState(n) == ACTIVE)
363 .collect(Collectors.toList());
364 if (localNodeId.equals(activeNodes.get(LEADER_CANDIDATE_POS))) {
365 leaderLockAttempt(path, candidates.value());
366 } else {
367 retryLock(path);
368 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700369 } else {
Ayaka Koshibe0d886fc2015-04-23 11:53:41 -0700370 throw new IllegalStateException("should not be here");
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700371 }
Ayaka Koshibe0d886fc2015-04-23 11:53:41 -0700372 } catch (Exception e) {
373 log.debug("Failed to fetch candidate information for {}", path, e);
374 retryLock(path);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700375 }
376 }
377
378 private void leaderLockAttempt(String path, List<NodeId> candidates) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800379 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700380 Versioned<NodeId> currentLeader = leaderMap.get(path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800381 if (currentLeader != null) {
382 if (localNodeId.equals(currentLeader.value())) {
383 log.info("Already has leadership for {}", path);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700384 // FIXME: candidates can get out of sync.
Madan Jampanid46e18f2015-05-04 23:19:33 -0700385 publish(new LeadershipEvent(
386 LeadershipEvent.Type.LEADER_ELECTED,
387 new Leadership(path,
388 localNodeId,
389 currentLeader.version(),
390 currentLeader.creationTime())));
Madan Jampanid14166a2015-02-24 17:37:51 -0800391 } else {
392 // someone else has leadership. will retry after sometime.
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700393 retryLock(path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800394 }
395 } else {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700396 if (leaderMap.putIfAbsent(path, localNodeId) == null) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800397 log.info("Assumed leadership for {}", path);
398 // do a get again to get the version (epoch)
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700399 Versioned<NodeId> newLeader = leaderMap.get(path);
400 // FIXME: candidates can get out of sync
Madan Jampanid46e18f2015-05-04 23:19:33 -0700401 publish(new LeadershipEvent(
402 LeadershipEvent.Type.LEADER_ELECTED,
403 new Leadership(path,
404 newLeader.value(),
405 newLeader.version(),
406 newLeader.creationTime())));
Madan Jampanid14166a2015-02-24 17:37:51 -0800407 } else {
408 // someone beat us to it.
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700409 retryLock(path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800410 }
411 }
412 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800413 log.debug("Attempt to acquire leadership lock for topic {} failed", path, e);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700414 retryLock(path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800415 }
416 }
417
Madan Jampanid46e18f2015-05-04 23:19:33 -0700418 private void publish(LeadershipEvent event) {
419 onLeadershipEvent(event);
420 clusterCommunicator.broadcast(event, LEADERSHIP_EVENT_MESSAGE_SUBJECT, SERIALIZER::encode);
421 }
422
423 private void onLeadershipEvent(LeadershipEvent leadershipEvent) {
424 log.trace("Leadership Event: time = {} type = {} event = {}",
425 leadershipEvent.time(), leadershipEvent.type(),
426 leadershipEvent);
427
428 Leadership leadershipUpdate = leadershipEvent.subject();
429 LeadershipEvent.Type eventType = leadershipEvent.type();
430 String topic = leadershipUpdate.topic();
431
432 AtomicBoolean updateAccepted = new AtomicBoolean(false);
433 if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
434 leaderBoard.compute(topic, (k, currentLeadership) -> {
435 if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
436 updateAccepted.set(true);
437 return leadershipUpdate;
438 }
439 return currentLeadership;
440 });
441 } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
442 leaderBoard.compute(topic, (k, currentLeadership) -> {
443 if (currentLeadership == null || currentLeadership.epoch() <= leadershipUpdate.epoch()) {
444 updateAccepted.set(true);
445 return null;
446 }
447 return currentLeadership;
448 });
449 } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
450 candidateBoard.compute(topic, (k, currentInfo) -> {
451 if (currentInfo == null || currentInfo.epoch() < leadershipUpdate.epoch()) {
452 updateAccepted.set(true);
453 return leadershipUpdate;
454 }
455 return currentInfo;
456 });
457 } else {
458 throw new IllegalStateException("Unknown event type.");
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700459 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700460
Madan Jampanid46e18f2015-05-04 23:19:33 -0700461 if (updateAccepted.get()) {
462 eventDispatcher.post(leadershipEvent);
Madan Jampanid14166a2015-02-24 17:37:51 -0800463 }
464 }
465
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700466 private void rerunForLeadership(String path) {
467 retryLeaderLockExecutor.schedule(
468 () -> runForLeadership(path),
469 ELECTION_JOIN_ATTEMPT_INTERVAL_SEC,
470 TimeUnit.SECONDS);
471 }
472
473 private void retryLock(String path) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800474 retryLeaderLockExecutor.schedule(
475 () -> tryLeaderLock(path),
476 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
477 TimeUnit.SECONDS);
478 }
479
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700480 private void retryWithdraw(String path) {
481 retryLeaderLockExecutor.schedule(
482 () -> withdraw(path),
483 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
484 TimeUnit.SECONDS);
485 }
486
Madan Jampanid14166a2015-02-24 17:37:51 -0800487 private void purgeStaleLocks() {
488 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700489 leaderMap.entrySet()
490 .stream()
Ayaka Koshibe4a3c2392015-04-22 18:03:13 -0700491 .filter(e -> clusterService.getState(e.getValue().value()) == INACTIVE)
492 .filter(e -> activeTopics.contains(e.getKey()))
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700493 .forEach(entry -> {
494 String path = entry.getKey();
495 NodeId nodeId = entry.getValue().value();
496 long epoch = entry.getValue().version();
497 long creationTime = entry.getValue().creationTime();
Madan Jampanid14166a2015-02-24 17:37:51 -0800498 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700499 if (leaderMap.remove(path, epoch)) {
Madan Jampania14047d2015-02-25 12:23:02 -0800500 log.info("Purged stale lock held by {} for {}", nodeId, path);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700501 publish(new LeadershipEvent(
502 LeadershipEvent.Type.LEADER_BOOTED,
503 new Leadership(path, nodeId, epoch, creationTime)));
Madan Jampanid14166a2015-02-24 17:37:51 -0800504 }
505 } catch (Exception e) {
506 log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
507 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700508 });
Madan Jampanid14166a2015-02-24 17:37:51 -0800509 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800510 log.debug("Failed cleaning up stale locks", e);
Madan Jampanid14166a2015-02-24 17:37:51 -0800511 }
512 }
513
514 private void sendLeadershipStatus() {
Madan Jampania14047d2015-02-25 12:23:02 -0800515 try {
516 leaderBoard.forEach((path, leadership) -> {
517 if (leadership.leader().equals(localNodeId)) {
518 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700519 clusterCommunicator.broadcast(event,
520 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
521 SERIALIZER::encode);
Madan Jampania14047d2015-02-25 12:23:02 -0800522 }
523 });
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700524 candidateBoard.forEach((path, leadership) -> {
525 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, leadership);
526 clusterCommunicator.broadcast(event,
527 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
528 SERIALIZER::encode);
529 });
Madan Jampania14047d2015-02-25 12:23:02 -0800530 } catch (Exception e) {
531 log.debug("Failed to send leadership updates", e);
532 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800533 }
Madan Jampania14047d2015-02-25 12:23:02 -0800534}