blob: 434d0e2d57ba445580d2fbe2008836252e853fc2 [file] [log] [blame]
Madan Jampanid14166a2015-02-24 17:37:51 -08001package org.onosproject.store.consistent.impl;
2
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07003import com.google.common.collect.ImmutableList;
Jonathan Harte649c752015-03-03 18:04:25 -08004import com.google.common.collect.ImmutableMap;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07005import com.google.common.collect.Lists;
Jonathan Harte649c752015-03-03 18:04:25 -08006import com.google.common.collect.Maps;
7import com.google.common.collect.Sets;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07008
Madan Jampanid14166a2015-02-24 17:37:51 -08009import org.apache.felix.scr.annotations.Activate;
10import org.apache.felix.scr.annotations.Component;
11import org.apache.felix.scr.annotations.Deactivate;
12import org.apache.felix.scr.annotations.Reference;
13import org.apache.felix.scr.annotations.ReferenceCardinality;
14import org.apache.felix.scr.annotations.Service;
15import org.onlab.util.KryoNamespace;
16import org.onosproject.cluster.ClusterService;
Madan Jampanid14166a2015-02-24 17:37:51 -080017import org.onosproject.cluster.Leadership;
18import org.onosproject.cluster.LeadershipEvent;
19import org.onosproject.cluster.LeadershipEventListener;
20import org.onosproject.cluster.LeadershipService;
21import org.onosproject.cluster.NodeId;
Simon Huntff663742015-05-14 13:33:05 -070022import org.onosproject.event.ListenerRegistry;
Madan Jampanid14166a2015-02-24 17:37:51 -080023import org.onosproject.event.EventDeliveryService;
24import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Madan Jampanid14166a2015-02-24 17:37:51 -080025import org.onosproject.store.cluster.messaging.MessageSubject;
26import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampanid14166a2015-02-24 17:37:51 -080027import org.onosproject.store.service.ConsistentMap;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070028import org.onosproject.store.service.ConsistentMapException;
Madan Jampanid14166a2015-02-24 17:37:51 -080029import org.onosproject.store.service.Serializer;
30import org.onosproject.store.service.StorageService;
31import org.onosproject.store.service.Versioned;
32import org.slf4j.Logger;
33
Madan Jampani1af8e132015-04-30 16:41:18 -070034import java.util.ArrayList;
Jonathan Harte649c752015-03-03 18:04:25 -080035import java.util.Map;
36import java.util.Map.Entry;
37import java.util.Objects;
38import java.util.Set;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070039import java.util.List;
Madan Jampanide003d92015-05-11 17:14:20 -070040import java.util.concurrent.CancellationException;
41import java.util.concurrent.CompletableFuture;
Jonathan Harte649c752015-03-03 18:04:25 -080042import java.util.concurrent.ExecutorService;
43import java.util.concurrent.Executors;
44import java.util.concurrent.ScheduledExecutorService;
45import java.util.concurrent.TimeUnit;
Madan Jampanid46e18f2015-05-04 23:19:33 -070046import java.util.concurrent.atomic.AtomicBoolean;
Jonathan Harte649c752015-03-03 18:04:25 -080047import java.util.stream.Collectors;
48
49import static com.google.common.base.Preconditions.checkArgument;
50import static org.onlab.util.Tools.groupedThreads;
51import static org.slf4j.LoggerFactory.getLogger;
Ayaka Koshibe4a3c2392015-04-22 18:03:13 -070052import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
53import static org.onosproject.cluster.ControllerNode.State.INACTIVE;
54
/**
 * Distributed Lock Manager implemented on top of ConsistentMap.
 * <p>
 * This implementation makes use of the cluster manager's failure
 * detection capabilities to detect and purge stale locks.
 * TODO: Ensure lock safety and liveness.
 */
Madan Jampani7f72c3f2015-03-01 17:34:59 -080062@Component(immediate = true, enabled = true)
Madan Jampanid14166a2015-02-24 17:37:51 -080063@Service
64public class DistributedLeadershipManager implements LeadershipService {
65
66 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
67 protected StorageService storageService;
68
69 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
70 protected ClusterService clusterService;
71
72 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
73 protected ClusterCommunicationService clusterCommunicator;
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76 protected EventDeliveryService eventDispatcher;
77
78 private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
79 new MessageSubject("distributed-leadership-manager-events");
80
81 private final Logger log = getLogger(getClass());
82 private ExecutorService messageHandlingExecutor;
83 private ScheduledExecutorService retryLeaderLockExecutor;
84 private ScheduledExecutorService deadLockDetectionExecutor;
85 private ScheduledExecutorService leadershipStatusBroadcaster;
86
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070087 private ConsistentMap<String, NodeId> leaderMap;
88 private ConsistentMap<String, List<NodeId>> candidateMap;
89
Simon Huntff663742015-05-14 13:33:05 -070090 private ListenerRegistry<LeadershipEvent, LeadershipEventListener>
Madan Jampanid14166a2015-02-24 17:37:51 -080091 listenerRegistry;
92 private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
Ayaka Koshibefd26a302015-04-13 13:59:54 -070093 private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
Madan Jampanid14166a2015-02-24 17:37:51 -080094 private NodeId localNodeId;
95
96 private Set<String> activeTopics = Sets.newConcurrentHashSet();
97
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070098 private static final int ELECTION_JOIN_ATTEMPT_INTERVAL_SEC = 2;
Madan Jampanid14166a2015-02-24 17:37:51 -080099 private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
100 private static final int DEADLOCK_DETECTION_INTERVAL_SEC = 2;
Madan Jampania14047d2015-02-25 12:23:02 -0800101 private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC = 2;
Madan Jampanid14166a2015-02-24 17:37:51 -0800102
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700103 private static final int LEADER_CANDIDATE_POS = 0;
104
105 private static final Serializer SERIALIZER = Serializer.using(
106 new KryoNamespace.Builder().register(KryoNamespaces.API).build());
Madan Jampanid14166a2015-02-24 17:37:51 -0800107
108 @Activate
109 public void activate() {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700110 leaderMap = storageService.<String, NodeId>consistentMapBuilder()
111 .withName("onos-topic-leaders")
112 .withSerializer(SERIALIZER)
113 .withPartitionsDisabled().build();
114 candidateMap = storageService.<String, List<NodeId>>consistentMapBuilder()
115 .withName("onos-topic-candidates")
116 .withSerializer(SERIALIZER)
117 .withPartitionsDisabled().build();
Madan Jampanid14166a2015-02-24 17:37:51 -0800118
119 localNodeId = clusterService.getLocalNode().id();
120
121 messageHandlingExecutor = Executors.newSingleThreadExecutor(
122 groupedThreads("onos/store/leadership", "message-handler"));
123 retryLeaderLockExecutor = Executors.newScheduledThreadPool(
124 4, groupedThreads("onos/store/leadership", "election-thread-%d"));
125 deadLockDetectionExecutor = Executors.newSingleThreadScheduledExecutor(
126 groupedThreads("onos/store/leadership", "dead-lock-detector"));
127 leadershipStatusBroadcaster = Executors.newSingleThreadScheduledExecutor(
128 groupedThreads("onos/store/leadership", "peer-updater"));
129 clusterCommunicator.addSubscriber(
130 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
Madan Jampanid46e18f2015-05-04 23:19:33 -0700131 SERIALIZER::decode,
132 this::onLeadershipEvent,
Madan Jampanid14166a2015-02-24 17:37:51 -0800133 messageHandlingExecutor);
134
135 deadLockDetectionExecutor.scheduleWithFixedDelay(
136 this::purgeStaleLocks, 0, DEADLOCK_DETECTION_INTERVAL_SEC, TimeUnit.SECONDS);
137 leadershipStatusBroadcaster.scheduleWithFixedDelay(
Madan Jampania14047d2015-02-25 12:23:02 -0800138 this::sendLeadershipStatus, 0, LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC, TimeUnit.SECONDS);
Madan Jampanid14166a2015-02-24 17:37:51 -0800139
Simon Huntff663742015-05-14 13:33:05 -0700140 listenerRegistry = new ListenerRegistry<>();
Madan Jampanid14166a2015-02-24 17:37:51 -0800141 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
142
Madan Jampanid46e18f2015-05-04 23:19:33 -0700143 log.info("Started");
Madan Jampanid14166a2015-02-24 17:37:51 -0800144 }
145
146 @Deactivate
147 public void deactivate() {
148 leaderBoard.forEach((topic, leadership) -> {
149 if (localNodeId.equals(leadership.leader())) {
150 withdraw(topic);
151 }
152 });
153
154 eventDispatcher.removeSink(LeadershipEvent.class);
155 clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
156
157 messageHandlingExecutor.shutdown();
158 retryLeaderLockExecutor.shutdown();
159 deadLockDetectionExecutor.shutdown();
160 leadershipStatusBroadcaster.shutdown();
161
Madan Jampanid46e18f2015-05-04 23:19:33 -0700162 log.info("Stopped");
Madan Jampanid14166a2015-02-24 17:37:51 -0800163 }
164
165 @Override
166 public Map<String, Leadership> getLeaderBoard() {
167 return ImmutableMap.copyOf(leaderBoard);
168 }
169
170 @Override
Madan Jampanifd45d5e2015-04-20 13:33:21 -0700171 public Map<String, List<NodeId>> getCandidates() {
172 return Maps.toMap(candidateBoard.keySet(), this::getCandidates);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700173 }
174
175 @Override
176 public List<NodeId> getCandidates(String path) {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700177 Leadership current = candidateBoard.get(path);
178 return current == null ? ImmutableList.of() : ImmutableList.copyOf(current.candidates());
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700179 }
180
181 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800182 public NodeId getLeader(String path) {
183 Leadership leadership = leaderBoard.get(path);
184 return leadership != null ? leadership.leader() : null;
185 }
186
187 @Override
Madan Jampani59610512015-02-25 15:25:43 -0800188 public Leadership getLeadership(String path) {
189 checkArgument(path != null);
190 return leaderBoard.get(path);
191 }
192
193 @Override
194 public Set<String> ownedTopics(NodeId nodeId) {
195 checkArgument(nodeId != null);
196 return leaderBoard.entrySet()
197 .stream()
198 .filter(entry -> nodeId.equals(entry.getValue().leader()))
199 .map(Entry::getKey)
200 .collect(Collectors.toSet());
201 }
202
203 @Override
Madan Jampanide003d92015-05-11 17:14:20 -0700204 public CompletableFuture<Leadership> runForLeadership(String path) {
Madan Jampani52860be2015-02-27 12:52:37 -0800205 log.debug("Running for leadership for topic: {}", path);
Madan Jampanide003d92015-05-11 17:14:20 -0700206 CompletableFuture<Leadership> resultFuture = new CompletableFuture<>();
207 doRunForLeadership(path, resultFuture);
208 return resultFuture;
209 }
210
211 private void doRunForLeadership(String path, CompletableFuture<Leadership> future) {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700212 try {
213 Versioned<List<NodeId>> candidates = candidateMap.get(path);
214 if (candidates != null) {
215 List<NodeId> candidateList = Lists.newArrayList(candidates.value());
216 if (!candidateList.contains(localNodeId)) {
217 candidateList.add(localNodeId);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700218 if (candidateMap.replace(path, candidates.version(), candidateList)) {
219 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700220 publish(new LeadershipEvent(
221 LeadershipEvent.Type.CANDIDATES_CHANGED,
222 new Leadership(path,
223 newCandidates.value(),
224 newCandidates.version(),
225 newCandidates.creationTime())));
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700226 } else {
Madan Jampanide003d92015-05-11 17:14:20 -0700227 rerunForLeadership(path, future);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700228 return;
229 }
230 }
231 } else {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700232 List<NodeId> candidateList = ImmutableList.of(localNodeId);
233 if ((candidateMap.putIfAbsent(path, candidateList) == null)) {
234 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700235 publish(new LeadershipEvent(
236 LeadershipEvent.Type.CANDIDATES_CHANGED,
237 new Leadership(path,
238 newCandidates.value(),
239 newCandidates.version(),
240 newCandidates.creationTime())));
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700241 } else {
Madan Jampanide003d92015-05-11 17:14:20 -0700242 rerunForLeadership(path, future);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700243 return;
244 }
245 }
246 log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
247 activeTopics.add(path);
Madan Jampanide003d92015-05-11 17:14:20 -0700248 tryLeaderLock(path, future);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700249 } catch (ConsistentMapException e) {
250 log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
Madan Jampanide003d92015-05-11 17:14:20 -0700251 rerunForLeadership(path, future);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700252 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800253 }
254
255 @Override
Madan Jampanide003d92015-05-11 17:14:20 -0700256 public CompletableFuture<Void> withdraw(String path) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800257 activeTopics.remove(path);
Madan Jampanide003d92015-05-11 17:14:20 -0700258 CompletableFuture<Void> resultFuture = new CompletableFuture<>();
259 doWithdraw(path, resultFuture);
260 return resultFuture;
261 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700262
Madan Jampanide003d92015-05-11 17:14:20 -0700263
264 private void doWithdraw(String path, CompletableFuture<Void> future) {
265 if (activeTopics.contains(path)) {
266 future.completeExceptionally(new CancellationException(String.format("%s is now a active topic", path)));
267 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800268 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700269 Versioned<NodeId> leader = leaderMap.get(path);
270 if (leader != null && Objects.equals(leader.value(), localNodeId)) {
271 if (leaderMap.remove(path, leader.version())) {
Jonathan Harte649c752015-03-03 18:04:25 -0800272 log.info("Gave up leadership for {}", path);
Madan Jampanide003d92015-05-11 17:14:20 -0700273 future.complete(null);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700274 publish(new LeadershipEvent(
275 LeadershipEvent.Type.LEADER_BOOTED,
276 new Leadership(path,
277 localNodeId,
278 leader.version(),
279 leader.creationTime())));
Jonathan Harte649c752015-03-03 18:04:25 -0800280 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800281 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700282 // else we are not the current leader, can still be a candidate.
283 Versioned<List<NodeId>> candidates = candidateMap.get(path);
284 List<NodeId> candidateList = candidates != null
285 ? Lists.newArrayList(candidates.value())
286 : Lists.newArrayList();
287 if (!candidateList.remove(localNodeId)) {
Madan Jampanide003d92015-05-11 17:14:20 -0700288 future.complete(null);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700289 return;
290 }
Ayaka Koshibe93451b42015-04-21 15:12:03 -0700291 if (candidateMap.replace(path, candidates.version(), candidateList)) {
Ayaka Koshibe67cf7de2015-04-17 11:47:27 -0700292 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
Madan Jampanide003d92015-05-11 17:14:20 -0700293 future.complete(null);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700294 publish(new LeadershipEvent(
295 LeadershipEvent.Type.CANDIDATES_CHANGED,
296 new Leadership(path,
297 newCandidates.value(),
298 newCandidates.version(),
299 newCandidates.creationTime())));
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700300 } else {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700301 log.warn("Failed to withdraw from candidates list. Will retry");
Madan Jampanide003d92015-05-11 17:14:20 -0700302 retryWithdraw(path, future);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700303 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800304 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800305 log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
Madan Jampanide003d92015-05-11 17:14:20 -0700306 retryWithdraw(path, future);
Madan Jampanid14166a2015-02-24 17:37:51 -0800307 }
308 }
309
310 @Override
Madan Jampani1af8e132015-04-30 16:41:18 -0700311 public boolean stepdown(String path) {
Madan Jampani9bd1f152015-04-30 23:33:35 -0700312 if (!activeTopics.contains(path) || !Objects.equals(localNodeId, getLeader(path))) {
Madan Jampani1af8e132015-04-30 16:41:18 -0700313 return false;
314 }
315
316 try {
317 Versioned<NodeId> leader = leaderMap.get(path);
318 if (leader != null && Objects.equals(leader.value(), localNodeId)) {
319 if (leaderMap.remove(path, leader.version())) {
Madan Jampanid46e18f2015-05-04 23:19:33 -0700320 log.info("Stepped down from leadership for {}", path);
321 publish(new LeadershipEvent(
322 LeadershipEvent.Type.LEADER_BOOTED,
323 new Leadership(path,
324 localNodeId,
325 leader.version(),
326 leader.creationTime())));
Madan Jampanide003d92015-05-11 17:14:20 -0700327 retryLock(path, new CompletableFuture<>());
Madan Jampani1af8e132015-04-30 16:41:18 -0700328 return true;
329 }
330 }
331 } catch (Exception e) {
332 log.warn("Error executing stepdown for {}", path, e);
333 }
334 return false;
335 }
336
337 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800338 public void addListener(LeadershipEventListener listener) {
339 listenerRegistry.addListener(listener);
340 }
341
342 @Override
343 public void removeListener(LeadershipEventListener listener) {
344 listenerRegistry.removeListener(listener);
345 }
346
Madan Jampani1af8e132015-04-30 16:41:18 -0700347 @Override
348 public boolean makeTopCandidate(String path, NodeId nodeId) {
349 Versioned<List<NodeId>> candidates = candidateMap.get(path);
350 if (candidates == null || !candidates.value().contains(nodeId)) {
351 return false;
352 }
Madan Jampanid46e18f2015-05-04 23:19:33 -0700353 List<NodeId> currentRoster = candidates.value();
354 if (nodeId.equals(currentRoster.get(LEADER_CANDIDATE_POS))) {
Madan Jampani1af8e132015-04-30 16:41:18 -0700355 return true;
356 }
Madan Jampani1af8e132015-04-30 16:41:18 -0700357 List<NodeId> newRoster = new ArrayList<>(currentRoster.size());
358 newRoster.add(nodeId);
359 currentRoster.stream().filter(id -> !nodeId.equals(id)).forEach(newRoster::add);
360 boolean updated = candidateMap.replace(path, candidates.version(), newRoster);
361 if (updated) {
362 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700363 publish(new LeadershipEvent(
364 LeadershipEvent.Type.CANDIDATES_CHANGED,
365 new Leadership(path,
366 newCandidates.value(),
367 newCandidates.version(),
368 newCandidates.creationTime())));
Madan Jampani1af8e132015-04-30 16:41:18 -0700369 }
370 return updated;
371 }
372
Madan Jampanide003d92015-05-11 17:14:20 -0700373 private void tryLeaderLock(String path, CompletableFuture<Leadership> future) {
Madan Jampanid46e18f2015-05-04 23:19:33 -0700374 if (!activeTopics.contains(path) || Objects.equals(localNodeId, getLeader(path))) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800375 return;
376 }
Ayaka Koshibe0d886fc2015-04-23 11:53:41 -0700377 try {
378 Versioned<List<NodeId>> candidates = candidateMap.get(path);
379 if (candidates != null) {
Madan Jampanid46e18f2015-05-04 23:19:33 -0700380 List<NodeId> activeNodes = candidates.value()
381 .stream()
Ayaka Koshibe0d886fc2015-04-23 11:53:41 -0700382 .filter(n -> clusterService.getState(n) == ACTIVE)
383 .collect(Collectors.toList());
384 if (localNodeId.equals(activeNodes.get(LEADER_CANDIDATE_POS))) {
Madan Jampanide003d92015-05-11 17:14:20 -0700385 leaderLockAttempt(path, candidates.value(), future);
Ayaka Koshibe0d886fc2015-04-23 11:53:41 -0700386 } else {
Madan Jampanide003d92015-05-11 17:14:20 -0700387 retryLock(path, future);
Ayaka Koshibe0d886fc2015-04-23 11:53:41 -0700388 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700389 } else {
Ayaka Koshibe0d886fc2015-04-23 11:53:41 -0700390 throw new IllegalStateException("should not be here");
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700391 }
Ayaka Koshibe0d886fc2015-04-23 11:53:41 -0700392 } catch (Exception e) {
393 log.debug("Failed to fetch candidate information for {}", path, e);
Madan Jampanide003d92015-05-11 17:14:20 -0700394 retryLock(path, future);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700395 }
396 }
397
Madan Jampanide003d92015-05-11 17:14:20 -0700398 private void leaderLockAttempt(String path, List<NodeId> candidates, CompletableFuture<Leadership> future) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800399 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700400 Versioned<NodeId> currentLeader = leaderMap.get(path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800401 if (currentLeader != null) {
402 if (localNodeId.equals(currentLeader.value())) {
403 log.info("Already has leadership for {}", path);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700404 // FIXME: candidates can get out of sync.
Madan Jampanide003d92015-05-11 17:14:20 -0700405 Leadership leadership = new Leadership(path,
406 localNodeId,
407 currentLeader.version(),
408 currentLeader.creationTime());
409 future.complete(leadership);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700410 publish(new LeadershipEvent(
411 LeadershipEvent.Type.LEADER_ELECTED,
Madan Jampanide003d92015-05-11 17:14:20 -0700412 leadership));
Madan Jampanid14166a2015-02-24 17:37:51 -0800413 } else {
414 // someone else has leadership. will retry after sometime.
Madan Jampanide003d92015-05-11 17:14:20 -0700415 retryLock(path, future);
Madan Jampanid14166a2015-02-24 17:37:51 -0800416 }
417 } else {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700418 if (leaderMap.putIfAbsent(path, localNodeId) == null) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800419 log.info("Assumed leadership for {}", path);
420 // do a get again to get the version (epoch)
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700421 Versioned<NodeId> newLeader = leaderMap.get(path);
422 // FIXME: candidates can get out of sync
Madan Jampanide003d92015-05-11 17:14:20 -0700423 Leadership leadership = new Leadership(path,
424 newLeader.value(),
425 newLeader.version(),
426 newLeader.creationTime());
427 future.complete(leadership);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700428 publish(new LeadershipEvent(
429 LeadershipEvent.Type.LEADER_ELECTED,
Madan Jampanide003d92015-05-11 17:14:20 -0700430 leadership));
Madan Jampanid14166a2015-02-24 17:37:51 -0800431 } else {
432 // someone beat us to it.
Madan Jampanide003d92015-05-11 17:14:20 -0700433 retryLock(path, future);
Madan Jampanid14166a2015-02-24 17:37:51 -0800434 }
435 }
436 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800437 log.debug("Attempt to acquire leadership lock for topic {} failed", path, e);
Madan Jampanide003d92015-05-11 17:14:20 -0700438 retryLock(path, future);
Madan Jampanid14166a2015-02-24 17:37:51 -0800439 }
440 }
441
Madan Jampanid46e18f2015-05-04 23:19:33 -0700442 private void publish(LeadershipEvent event) {
443 onLeadershipEvent(event);
444 clusterCommunicator.broadcast(event, LEADERSHIP_EVENT_MESSAGE_SUBJECT, SERIALIZER::encode);
445 }
446
447 private void onLeadershipEvent(LeadershipEvent leadershipEvent) {
448 log.trace("Leadership Event: time = {} type = {} event = {}",
449 leadershipEvent.time(), leadershipEvent.type(),
450 leadershipEvent);
451
452 Leadership leadershipUpdate = leadershipEvent.subject();
453 LeadershipEvent.Type eventType = leadershipEvent.type();
454 String topic = leadershipUpdate.topic();
455
456 AtomicBoolean updateAccepted = new AtomicBoolean(false);
457 if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
458 leaderBoard.compute(topic, (k, currentLeadership) -> {
459 if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
460 updateAccepted.set(true);
461 return leadershipUpdate;
462 }
463 return currentLeadership;
464 });
465 } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
466 leaderBoard.compute(topic, (k, currentLeadership) -> {
467 if (currentLeadership == null || currentLeadership.epoch() <= leadershipUpdate.epoch()) {
468 updateAccepted.set(true);
469 return null;
470 }
471 return currentLeadership;
472 });
473 } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
474 candidateBoard.compute(topic, (k, currentInfo) -> {
475 if (currentInfo == null || currentInfo.epoch() < leadershipUpdate.epoch()) {
476 updateAccepted.set(true);
477 return leadershipUpdate;
478 }
479 return currentInfo;
480 });
481 } else {
482 throw new IllegalStateException("Unknown event type.");
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700483 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700484
Madan Jampanid46e18f2015-05-04 23:19:33 -0700485 if (updateAccepted.get()) {
486 eventDispatcher.post(leadershipEvent);
Madan Jampanid14166a2015-02-24 17:37:51 -0800487 }
488 }
489
Madan Jampanide003d92015-05-11 17:14:20 -0700490 private void rerunForLeadership(String path, CompletableFuture<Leadership> future) {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700491 retryLeaderLockExecutor.schedule(
Madan Jampanide003d92015-05-11 17:14:20 -0700492 () -> doRunForLeadership(path, future),
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700493 ELECTION_JOIN_ATTEMPT_INTERVAL_SEC,
494 TimeUnit.SECONDS);
495 }
496
Madan Jampanide003d92015-05-11 17:14:20 -0700497 private void retryLock(String path, CompletableFuture<Leadership> future) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800498 retryLeaderLockExecutor.schedule(
Madan Jampanide003d92015-05-11 17:14:20 -0700499 () -> tryLeaderLock(path, future),
Madan Jampanid14166a2015-02-24 17:37:51 -0800500 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
501 TimeUnit.SECONDS);
502 }
503
Madan Jampanide003d92015-05-11 17:14:20 -0700504 private void retryWithdraw(String path, CompletableFuture<Void> future) {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700505 retryLeaderLockExecutor.schedule(
Madan Jampanide003d92015-05-11 17:14:20 -0700506 () -> doWithdraw(path, future),
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700507 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
508 TimeUnit.SECONDS);
509 }
510
Madan Jampanid14166a2015-02-24 17:37:51 -0800511 private void purgeStaleLocks() {
512 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700513 leaderMap.entrySet()
514 .stream()
Ayaka Koshibe4a3c2392015-04-22 18:03:13 -0700515 .filter(e -> clusterService.getState(e.getValue().value()) == INACTIVE)
516 .filter(e -> activeTopics.contains(e.getKey()))
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700517 .forEach(entry -> {
518 String path = entry.getKey();
519 NodeId nodeId = entry.getValue().value();
520 long epoch = entry.getValue().version();
521 long creationTime = entry.getValue().creationTime();
Madan Jampanid14166a2015-02-24 17:37:51 -0800522 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700523 if (leaderMap.remove(path, epoch)) {
Madan Jampania14047d2015-02-25 12:23:02 -0800524 log.info("Purged stale lock held by {} for {}", nodeId, path);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700525 publish(new LeadershipEvent(
526 LeadershipEvent.Type.LEADER_BOOTED,
527 new Leadership(path, nodeId, epoch, creationTime)));
Madan Jampanid14166a2015-02-24 17:37:51 -0800528 }
529 } catch (Exception e) {
530 log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
531 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700532 });
Madan Jampanid14166a2015-02-24 17:37:51 -0800533 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800534 log.debug("Failed cleaning up stale locks", e);
Madan Jampanid14166a2015-02-24 17:37:51 -0800535 }
536 }
537
538 private void sendLeadershipStatus() {
Madan Jampania14047d2015-02-25 12:23:02 -0800539 try {
540 leaderBoard.forEach((path, leadership) -> {
541 if (leadership.leader().equals(localNodeId)) {
542 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700543 clusterCommunicator.broadcast(event,
544 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
545 SERIALIZER::encode);
Madan Jampania14047d2015-02-25 12:23:02 -0800546 }
547 });
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700548 candidateBoard.forEach((path, leadership) -> {
549 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, leadership);
550 clusterCommunicator.broadcast(event,
551 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
552 SERIALIZER::encode);
553 });
Madan Jampania14047d2015-02-25 12:23:02 -0800554 } catch (Exception e) {
555 log.debug("Failed to send leadership updates", e);
556 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800557 }
Madan Jampania14047d2015-02-25 12:23:02 -0800558}