blob: 6f498ffc6884fb1fdde12f0c8e4bf8c0724bc39c [file] [log] [blame]
Madan Jampanid14166a2015-02-24 17:37:51 -08001package org.onosproject.store.consistent.impl;
2
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07003import com.google.common.collect.ImmutableList;
Jonathan Harte649c752015-03-03 18:04:25 -08004import com.google.common.collect.ImmutableMap;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07005import com.google.common.collect.Lists;
Jonathan Harte649c752015-03-03 18:04:25 -08006import com.google.common.collect.Maps;
7import com.google.common.collect.Sets;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07008
Madan Jampanid14166a2015-02-24 17:37:51 -08009import org.apache.felix.scr.annotations.Activate;
10import org.apache.felix.scr.annotations.Component;
11import org.apache.felix.scr.annotations.Deactivate;
12import org.apache.felix.scr.annotations.Reference;
13import org.apache.felix.scr.annotations.ReferenceCardinality;
14import org.apache.felix.scr.annotations.Service;
Ayaka Koshibefd26a302015-04-13 13:59:54 -070015import org.apache.commons.lang3.mutable.MutableBoolean;
Madan Jampanid14166a2015-02-24 17:37:51 -080016import org.onlab.util.KryoNamespace;
17import org.onosproject.cluster.ClusterService;
18import org.onosproject.cluster.ControllerNode;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070019import org.onosproject.cluster.ControllerNode.State;
Madan Jampanid14166a2015-02-24 17:37:51 -080020import org.onosproject.cluster.Leadership;
21import org.onosproject.cluster.LeadershipEvent;
22import org.onosproject.cluster.LeadershipEventListener;
23import org.onosproject.cluster.LeadershipService;
24import org.onosproject.cluster.NodeId;
25import org.onosproject.event.AbstractListenerRegistry;
26import org.onosproject.event.EventDeliveryService;
27import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
28import org.onosproject.store.cluster.messaging.ClusterMessage;
29import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
30import org.onosproject.store.cluster.messaging.MessageSubject;
31import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampanid14166a2015-02-24 17:37:51 -080032import org.onosproject.store.service.ConsistentMap;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070033import org.onosproject.store.service.ConsistentMapException;
Madan Jampanid14166a2015-02-24 17:37:51 -080034import org.onosproject.store.service.Serializer;
35import org.onosproject.store.service.StorageService;
36import org.onosproject.store.service.Versioned;
37import org.slf4j.Logger;
38
Jonathan Harte649c752015-03-03 18:04:25 -080039import java.util.Map;
40import java.util.Map.Entry;
41import java.util.Objects;
42import java.util.Set;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070043import java.util.List;
Jonathan Harte649c752015-03-03 18:04:25 -080044import java.util.concurrent.ExecutorService;
45import java.util.concurrent.Executors;
46import java.util.concurrent.ScheduledExecutorService;
47import java.util.concurrent.TimeUnit;
48import java.util.stream.Collectors;
49
50import static com.google.common.base.Preconditions.checkArgument;
51import static org.onlab.util.Tools.groupedThreads;
52import static org.slf4j.LoggerFactory.getLogger;
Madan Jampanid14166a2015-02-24 17:37:51 -080053
/**
 * Distributed leadership manager implemented on top of ConsistentMap.
 * <p>
 * This implementation makes use of the cluster manager's failure
 * detection capabilities to detect and purge stale locks.
 * TODO: Ensure lock safety and liveness.
 */
Madan Jampani7f72c3f2015-03-01 17:34:59 -080061@Component(immediate = true, enabled = true)
Madan Jampanid14166a2015-02-24 17:37:51 -080062@Service
63public class DistributedLeadershipManager implements LeadershipService {
64
65 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
66 protected StorageService storageService;
67
68 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
69 protected ClusterService clusterService;
70
71 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
72 protected ClusterCommunicationService clusterCommunicator;
73
74 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
75 protected EventDeliveryService eventDispatcher;
76
77 private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
78 new MessageSubject("distributed-leadership-manager-events");
79
80 private final Logger log = getLogger(getClass());
81 private ExecutorService messageHandlingExecutor;
82 private ScheduledExecutorService retryLeaderLockExecutor;
83 private ScheduledExecutorService deadLockDetectionExecutor;
84 private ScheduledExecutorService leadershipStatusBroadcaster;
85
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070086 private ConsistentMap<String, NodeId> leaderMap;
87 private ConsistentMap<String, List<NodeId>> candidateMap;
88
Madan Jampanid14166a2015-02-24 17:37:51 -080089 private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
90 listenerRegistry;
91 private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
Ayaka Koshibefd26a302015-04-13 13:59:54 -070092 private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
Madan Jampanid14166a2015-02-24 17:37:51 -080093 private NodeId localNodeId;
94
95 private Set<String> activeTopics = Sets.newConcurrentHashSet();
96
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070097 private static final int ELECTION_JOIN_ATTEMPT_INTERVAL_SEC = 2;
Madan Jampanid14166a2015-02-24 17:37:51 -080098 private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
99 private static final int DEADLOCK_DETECTION_INTERVAL_SEC = 2;
Madan Jampania14047d2015-02-25 12:23:02 -0800100 private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC = 2;
Madan Jampanid14166a2015-02-24 17:37:51 -0800101
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700102 private static final int LEADER_CANDIDATE_POS = 0;
103
104 private static final Serializer SERIALIZER = Serializer.using(
105 new KryoNamespace.Builder().register(KryoNamespaces.API).build());
Madan Jampanid14166a2015-02-24 17:37:51 -0800106
107 @Activate
108 public void activate() {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700109 leaderMap = storageService.<String, NodeId>consistentMapBuilder()
110 .withName("onos-topic-leaders")
111 .withSerializer(SERIALIZER)
112 .withPartitionsDisabled().build();
113 candidateMap = storageService.<String, List<NodeId>>consistentMapBuilder()
114 .withName("onos-topic-candidates")
115 .withSerializer(SERIALIZER)
116 .withPartitionsDisabled().build();
Madan Jampanid14166a2015-02-24 17:37:51 -0800117
118 localNodeId = clusterService.getLocalNode().id();
119
120 messageHandlingExecutor = Executors.newSingleThreadExecutor(
121 groupedThreads("onos/store/leadership", "message-handler"));
122 retryLeaderLockExecutor = Executors.newScheduledThreadPool(
123 4, groupedThreads("onos/store/leadership", "election-thread-%d"));
124 deadLockDetectionExecutor = Executors.newSingleThreadScheduledExecutor(
125 groupedThreads("onos/store/leadership", "dead-lock-detector"));
126 leadershipStatusBroadcaster = Executors.newSingleThreadScheduledExecutor(
127 groupedThreads("onos/store/leadership", "peer-updater"));
128 clusterCommunicator.addSubscriber(
129 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
130 new InternalLeadershipEventListener(),
131 messageHandlingExecutor);
132
133 deadLockDetectionExecutor.scheduleWithFixedDelay(
134 this::purgeStaleLocks, 0, DEADLOCK_DETECTION_INTERVAL_SEC, TimeUnit.SECONDS);
135 leadershipStatusBroadcaster.scheduleWithFixedDelay(
Madan Jampania14047d2015-02-25 12:23:02 -0800136 this::sendLeadershipStatus, 0, LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC, TimeUnit.SECONDS);
Madan Jampanid14166a2015-02-24 17:37:51 -0800137
138 listenerRegistry = new AbstractListenerRegistry<>();
139 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
140
141 log.info("Started.");
142 }
143
144 @Deactivate
145 public void deactivate() {
146 leaderBoard.forEach((topic, leadership) -> {
147 if (localNodeId.equals(leadership.leader())) {
148 withdraw(topic);
149 }
150 });
151
152 eventDispatcher.removeSink(LeadershipEvent.class);
153 clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
154
155 messageHandlingExecutor.shutdown();
156 retryLeaderLockExecutor.shutdown();
157 deadLockDetectionExecutor.shutdown();
158 leadershipStatusBroadcaster.shutdown();
159
160 log.info("Stopped.");
161 }
162
163 @Override
164 public Map<String, Leadership> getLeaderBoard() {
165 return ImmutableMap.copyOf(leaderBoard);
166 }
167
168 @Override
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700169 public Map<String, Leadership> getCandidates() {
170 return ImmutableMap.copyOf(candidateBoard);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700171 }
172
173 @Override
174 public List<NodeId> getCandidates(String path) {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700175 Leadership current = candidateBoard.get(path);
176 return current == null ? ImmutableList.of() : ImmutableList.copyOf(current.candidates());
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700177 }
178
179 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800180 public NodeId getLeader(String path) {
181 Leadership leadership = leaderBoard.get(path);
182 return leadership != null ? leadership.leader() : null;
183 }
184
185 @Override
Madan Jampani59610512015-02-25 15:25:43 -0800186 public Leadership getLeadership(String path) {
187 checkArgument(path != null);
188 return leaderBoard.get(path);
189 }
190
191 @Override
192 public Set<String> ownedTopics(NodeId nodeId) {
193 checkArgument(nodeId != null);
194 return leaderBoard.entrySet()
195 .stream()
196 .filter(entry -> nodeId.equals(entry.getValue().leader()))
197 .map(Entry::getKey)
198 .collect(Collectors.toSet());
199 }
200
201 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800202 public void runForLeadership(String path) {
Madan Jampani52860be2015-02-27 12:52:37 -0800203 log.debug("Running for leadership for topic: {}", path);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700204 try {
205 Versioned<List<NodeId>> candidates = candidateMap.get(path);
206 if (candidates != null) {
207 List<NodeId> candidateList = Lists.newArrayList(candidates.value());
208 if (!candidateList.contains(localNodeId)) {
209 candidateList.add(localNodeId);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700210 if (candidateMap.replace(path, candidates.version(), candidateList)) {
211 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
212 notifyCandidateAdded(
213 path, candidateList, newCandidates.version(), newCandidates.creationTime());
214 } else {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700215 rerunForLeadership(path);
216 return;
217 }
218 }
219 } else {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700220 List<NodeId> candidateList = ImmutableList.of(localNodeId);
221 if ((candidateMap.putIfAbsent(path, candidateList) == null)) {
222 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
223 notifyCandidateAdded(path, candidateList, newCandidates.version(), newCandidates.creationTime());
224 } else {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700225 rerunForLeadership(path);
226 return;
227 }
228 }
229 log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
230 activeTopics.add(path);
231 tryLeaderLock(path);
232 } catch (ConsistentMapException e) {
233 log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
234 rerunForLeadership(path);
235 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800236 }
237
238 @Override
239 public void withdraw(String path) {
240 activeTopics.remove(path);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700241
Madan Jampanid14166a2015-02-24 17:37:51 -0800242 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700243 Versioned<NodeId> leader = leaderMap.get(path);
244 if (leader != null && Objects.equals(leader.value(), localNodeId)) {
245 if (leaderMap.remove(path, leader.version())) {
Jonathan Harte649c752015-03-03 18:04:25 -0800246 log.info("Gave up leadership for {}", path);
247 notifyRemovedLeader(path, localNodeId, leader.version(), leader.creationTime());
248 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800249 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700250 // else we are not the current leader, can still be a candidate.
251 Versioned<List<NodeId>> candidates = candidateMap.get(path);
252 List<NodeId> candidateList = candidates != null
253 ? Lists.newArrayList(candidates.value())
254 : Lists.newArrayList();
255 if (!candidateList.remove(localNodeId)) {
256 return;
257 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700258 boolean success = false;
259 if (candidateList.isEmpty()) {
260 if (candidateMap.remove(path, candidates.version())) {
261 success = true;
262 }
263 } else {
264 if (candidateMap.replace(path, candidates.version(), candidateList)) {
265 success = true;
266 }
267 }
268 if (success) {
Ayaka Koshibe67cf7de2015-04-17 11:47:27 -0700269 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
270 notifyCandidateRemoved(path, candidates.version(), candidates.creationTime(), newCandidates);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700271 } else {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700272 log.warn("Failed to withdraw from candidates list. Will retry");
273 retryWithdraw(path);
274 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800275 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800276 log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700277 retryWithdraw(path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800278 }
279 }
280
281 @Override
282 public void addListener(LeadershipEventListener listener) {
283 listenerRegistry.addListener(listener);
284 }
285
286 @Override
287 public void removeListener(LeadershipEventListener listener) {
288 listenerRegistry.removeListener(listener);
289 }
290
291 private void tryLeaderLock(String path) {
292 if (!activeTopics.contains(path)) {
293 return;
294 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700295
296 Versioned<List<NodeId>> candidates = candidateMap.get(path);
297 if (candidates != null) {
298 List<NodeId> activeNodes = candidates.value().stream()
299 .filter(n -> clusterService.getState(n) == State.ACTIVE)
300 .collect(Collectors.toList());
301 if (localNodeId.equals(activeNodes.get(LEADER_CANDIDATE_POS))) {
302 leaderLockAttempt(path, candidates.value());
303 } else {
304 retryLock(path);
305 }
306 } else {
307 throw new IllegalStateException("should not be here");
308 }
309 }
310
311 private void leaderLockAttempt(String path, List<NodeId> candidates) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800312 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700313 Versioned<NodeId> currentLeader = leaderMap.get(path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800314 if (currentLeader != null) {
315 if (localNodeId.equals(currentLeader.value())) {
316 log.info("Already has leadership for {}", path);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700317 // FIXME: candidates can get out of sync.
318 notifyNewLeader(
319 path, localNodeId, candidates, currentLeader.version(), currentLeader.creationTime());
Madan Jampanid14166a2015-02-24 17:37:51 -0800320 } else {
321 // someone else has leadership. will retry after sometime.
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700322 retryLock(path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800323 }
324 } else {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700325 if (leaderMap.putIfAbsent(path, localNodeId) == null) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800326 log.info("Assumed leadership for {}", path);
327 // do a get again to get the version (epoch)
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700328 Versioned<NodeId> newLeader = leaderMap.get(path);
329 // FIXME: candidates can get out of sync
330 notifyNewLeader(path, localNodeId, candidates, newLeader.version(), newLeader.creationTime());
Madan Jampanid14166a2015-02-24 17:37:51 -0800331 } else {
332 // someone beat us to it.
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700333 retryLock(path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800334 }
335 }
336 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800337 log.debug("Attempt to acquire leadership lock for topic {} failed", path, e);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700338 retryLock(path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800339 }
340 }
341
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700342 private void notifyCandidateAdded(
343 String path, List<NodeId> candidates, long epoch, long electedTime) {
344 Leadership newInfo = new Leadership(path, candidates, epoch, electedTime);
345 final MutableBoolean updated = new MutableBoolean(false);
346 candidateBoard.compute(path, (k, current) -> {
347 if (current == null || current.epoch() < newInfo.epoch()) {
348 log.info("updating candidateboard with {}", newInfo);
349 updated.setTrue();
350 return newInfo;
351 }
352 return current;
353 });
354 // maybe rethink types of candidates events
355 if (updated.booleanValue()) {
356 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, newInfo);
357 notifyPeers(event);
358 }
359 }
360
361 private void notifyCandidateRemoved(
Ayaka Koshibe67cf7de2015-04-17 11:47:27 -0700362 String path, long oldEpoch, long oldTime, Versioned<List<NodeId>> candidates) {
363 Leadership newInfo = (candidates == null)
364 ? new Leadership(path, ImmutableList.of(), oldEpoch, oldTime)
365 : new Leadership(path, candidates.value(), candidates.version(), candidates.creationTime());
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700366 final MutableBoolean updated = new MutableBoolean(false);
Ayaka Koshibe67cf7de2015-04-17 11:47:27 -0700367
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700368 candidateBoard.compute(path, (k, current) -> {
Ayaka Koshibe67cf7de2015-04-17 11:47:27 -0700369 if (candidates != null) {
370 if (current != null && current.epoch() < newInfo.epoch()) {
371 updated.setTrue();
372 if (candidates.value().isEmpty()) {
373 return null;
374 } else {
375 return newInfo;
376 }
377 }
378 } else {
379 if (current != null && current.epoch() == oldEpoch) {
380 updated.setTrue();
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700381 return null;
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700382 }
383 }
384 return current;
385 });
386 // maybe rethink types of candidates events
387 if (updated.booleanValue()) {
Ayaka Koshibe67cf7de2015-04-17 11:47:27 -0700388 log.debug("updated candidateboard with removal: {}", newInfo);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700389 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, newInfo);
390 notifyPeers(event);
391 }
392 }
393
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700394 private void notifyNewLeader(String path, NodeId leader,
395 List<NodeId> candidates, long epoch, long electedTime) {
396 Leadership newLeadership = new Leadership(path, leader, candidates, epoch, electedTime);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700397 final MutableBoolean updatedLeader = new MutableBoolean(false);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700398 log.debug("candidates for new Leadership {}", candidates);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700399 leaderBoard.compute(path, (k, currentLeader) -> {
Madan Jampanid14166a2015-02-24 17:37:51 -0800400 if (currentLeader == null || currentLeader.epoch() < epoch) {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700401 log.debug("updating leaderboard with new {}", newLeadership);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700402 updatedLeader.setTrue();
403 return newLeadership;
Madan Jampanid14166a2015-02-24 17:37:51 -0800404 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700405 return currentLeader;
406 });
Madan Jampanid14166a2015-02-24 17:37:51 -0800407
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700408 if (updatedLeader.booleanValue()) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800409 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700410 notifyPeers(event);
Madan Jampanid14166a2015-02-24 17:37:51 -0800411 }
412 }
413
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700414 private void notifyPeers(LeadershipEvent event) {
415 eventDispatcher.post(event);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700416 clusterCommunicator.broadcast(event,
417 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
418 SERIALIZER::encode);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700419 }
420
Madan Jampani30a57f82015-03-02 12:19:41 -0800421 private void notifyRemovedLeader(String path, NodeId leader, long epoch, long electedTime) {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700422 Versioned<List<NodeId>> candidates = candidateMap.get(path);
423 Leadership oldLeadership = new Leadership(
424 path, leader, candidates.value(), epoch, electedTime);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700425 final MutableBoolean updatedLeader = new MutableBoolean(false);
426 leaderBoard.compute(path, (k, currentLeader) -> {
Madan Jampanid14166a2015-02-24 17:37:51 -0800427 if (currentLeader != null && currentLeader.epoch() == oldLeadership.epoch()) {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700428 updatedLeader.setTrue();
429 return null;
Madan Jampanid14166a2015-02-24 17:37:51 -0800430 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700431 return currentLeader;
432 });
Madan Jampanid14166a2015-02-24 17:37:51 -0800433
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700434 if (updatedLeader.booleanValue()) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800435 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, oldLeadership);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700436 notifyPeers(event);
Madan Jampanid14166a2015-02-24 17:37:51 -0800437 }
438 }
439
440 private class InternalLeadershipEventListener implements ClusterMessageHandler {
441
442 @Override
443 public void handle(ClusterMessage message) {
444 LeadershipEvent leadershipEvent =
445 SERIALIZER.decode(message.payload());
446
Jonathan Hartb59ac4e2015-03-24 18:00:34 -0700447 log.trace("Leadership Event: time = {} type = {} event = {}",
Madan Jampanid14166a2015-02-24 17:37:51 -0800448 leadershipEvent.time(), leadershipEvent.type(),
449 leadershipEvent);
450
451 Leadership leadershipUpdate = leadershipEvent.subject();
452 LeadershipEvent.Type eventType = leadershipEvent.type();
453 String topic = leadershipUpdate.topic();
454
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700455 MutableBoolean updateAccepted = new MutableBoolean(false);
456 if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
457 leaderBoard.compute(topic, (k, currentLeadership) -> {
Madan Jampanid14166a2015-02-24 17:37:51 -0800458 if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700459 updateAccepted.setTrue();
460 return leadershipUpdate;
Madan Jampanid14166a2015-02-24 17:37:51 -0800461 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700462 return currentLeadership;
463 });
464 } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
465 leaderBoard.compute(topic, (k, currentLeadership) -> {
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700466 if (currentLeadership == null || currentLeadership.epoch() == leadershipUpdate.epoch()) {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700467 updateAccepted.setTrue();
468 return null;
Madan Jampanid14166a2015-02-24 17:37:51 -0800469 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700470 return currentLeadership;
471 });
472 } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
473 candidateBoard.compute(topic, (k, currentInfo) -> {
474 if (currentInfo == null || currentInfo.epoch() <= leadershipUpdate.epoch()) {
475 updateAccepted.setTrue();
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700476 if (leadershipUpdate.candidates().isEmpty()) {
477 return null;
478 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700479 return leadershipUpdate;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700480 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700481 return currentInfo;
482 });
483 } else {
484 throw new IllegalStateException("Unknown event type.");
485 }
486
487 if (updateAccepted.booleanValue()) {
488 eventDispatcher.post(leadershipEvent);
Madan Jampanid14166a2015-02-24 17:37:51 -0800489 }
490 }
491 }
492
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700493 private void rerunForLeadership(String path) {
494 retryLeaderLockExecutor.schedule(
495 () -> runForLeadership(path),
496 ELECTION_JOIN_ATTEMPT_INTERVAL_SEC,
497 TimeUnit.SECONDS);
498 }
499
500 private void retryLock(String path) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800501 retryLeaderLockExecutor.schedule(
502 () -> tryLeaderLock(path),
503 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
504 TimeUnit.SECONDS);
505 }
506
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700507 private void retryWithdraw(String path) {
508 retryLeaderLockExecutor.schedule(
509 () -> withdraw(path),
510 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
511 TimeUnit.SECONDS);
512 }
513
Madan Jampanid14166a2015-02-24 17:37:51 -0800514 private void purgeStaleLocks() {
515 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700516 leaderMap.entrySet()
517 .stream()
518 .filter(e -> clusterService.getState(e.getValue().value()) == ControllerNode.State.INACTIVE)
519 .filter(e -> localNodeId.equals(e.getValue().value()) && !activeTopics.contains(e.getKey()))
520 .forEach(entry -> {
521 String path = entry.getKey();
522 NodeId nodeId = entry.getValue().value();
523 long epoch = entry.getValue().version();
524 long creationTime = entry.getValue().creationTime();
Madan Jampanid14166a2015-02-24 17:37:51 -0800525 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700526 if (leaderMap.remove(path, epoch)) {
Madan Jampania14047d2015-02-25 12:23:02 -0800527 log.info("Purged stale lock held by {} for {}", nodeId, path);
Madan Jampani30a57f82015-03-02 12:19:41 -0800528 notifyRemovedLeader(path, nodeId, epoch, creationTime);
Madan Jampanid14166a2015-02-24 17:37:51 -0800529 }
530 } catch (Exception e) {
531 log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
532 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700533 });
Madan Jampanid14166a2015-02-24 17:37:51 -0800534 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800535 log.debug("Failed cleaning up stale locks", e);
Madan Jampanid14166a2015-02-24 17:37:51 -0800536 }
537 }
538
539 private void sendLeadershipStatus() {
Madan Jampania14047d2015-02-25 12:23:02 -0800540 try {
541 leaderBoard.forEach((path, leadership) -> {
542 if (leadership.leader().equals(localNodeId)) {
543 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700544 clusterCommunicator.broadcast(event,
545 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
546 SERIALIZER::encode);
Madan Jampania14047d2015-02-25 12:23:02 -0800547 }
548 });
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700549 candidateBoard.forEach((path, leadership) -> {
550 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, leadership);
551 clusterCommunicator.broadcast(event,
552 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
553 SERIALIZER::encode);
554 });
Madan Jampania14047d2015-02-25 12:23:02 -0800555 } catch (Exception e) {
556 log.debug("Failed to send leadership updates", e);
557 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800558 }
Madan Jampania14047d2015-02-25 12:23:02 -0800559}