blob: fd8afb73c6b45d17f6bfdeb00d0d9fcadd3c4654 [file] [log] [blame]
Madan Jampanid14166a2015-02-24 17:37:51 -08001package org.onosproject.store.consistent.impl;
2
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07003import com.google.common.collect.ImmutableList;
Jonathan Harte649c752015-03-03 18:04:25 -08004import com.google.common.collect.ImmutableMap;
Madan Jampani3650fc42015-05-27 17:01:50 -07005import com.google.common.collect.Iterables;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07006import com.google.common.collect.Lists;
Jonathan Harte649c752015-03-03 18:04:25 -08007import com.google.common.collect.Maps;
8import com.google.common.collect.Sets;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07009
Madan Jampanid14166a2015-02-24 17:37:51 -080010import org.apache.felix.scr.annotations.Activate;
11import org.apache.felix.scr.annotations.Component;
12import org.apache.felix.scr.annotations.Deactivate;
13import org.apache.felix.scr.annotations.Reference;
14import org.apache.felix.scr.annotations.ReferenceCardinality;
15import org.apache.felix.scr.annotations.Service;
16import org.onlab.util.KryoNamespace;
Madan Jampanicc586752015-05-18 16:19:27 -070017import org.onosproject.cluster.ClusterEvent;
18import org.onosproject.cluster.ClusterEvent.Type;
19import org.onosproject.cluster.ClusterEventListener;
Madan Jampanid14166a2015-02-24 17:37:51 -080020import org.onosproject.cluster.ClusterService;
Madan Jampanid14166a2015-02-24 17:37:51 -080021import org.onosproject.cluster.Leadership;
22import org.onosproject.cluster.LeadershipEvent;
23import org.onosproject.cluster.LeadershipEventListener;
24import org.onosproject.cluster.LeadershipService;
25import org.onosproject.cluster.NodeId;
Simon Huntff663742015-05-14 13:33:05 -070026import org.onosproject.event.ListenerRegistry;
Madan Jampanid14166a2015-02-24 17:37:51 -080027import org.onosproject.event.EventDeliveryService;
28import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Madan Jampanid14166a2015-02-24 17:37:51 -080029import org.onosproject.store.cluster.messaging.MessageSubject;
30import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampanid14166a2015-02-24 17:37:51 -080031import org.onosproject.store.service.ConsistentMap;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070032import org.onosproject.store.service.ConsistentMapException;
Madan Jampanid14166a2015-02-24 17:37:51 -080033import org.onosproject.store.service.Serializer;
34import org.onosproject.store.service.StorageService;
35import org.onosproject.store.service.Versioned;
36import org.slf4j.Logger;
37
Madan Jampani1af8e132015-04-30 16:41:18 -070038import java.util.ArrayList;
Madan Jampanicc586752015-05-18 16:19:27 -070039import java.util.Collections;
Jonathan Harte649c752015-03-03 18:04:25 -080040import java.util.Map;
41import java.util.Map.Entry;
42import java.util.Objects;
43import java.util.Set;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070044import java.util.List;
Madan Jampanide003d92015-05-11 17:14:20 -070045import java.util.concurrent.CancellationException;
46import java.util.concurrent.CompletableFuture;
Jonathan Harte649c752015-03-03 18:04:25 -080047import java.util.concurrent.ExecutorService;
48import java.util.concurrent.Executors;
49import java.util.concurrent.ScheduledExecutorService;
50import java.util.concurrent.TimeUnit;
Madan Jampanid46e18f2015-05-04 23:19:33 -070051import java.util.concurrent.atomic.AtomicBoolean;
Jonathan Harte649c752015-03-03 18:04:25 -080052import java.util.stream.Collectors;
53
54import static com.google.common.base.Preconditions.checkArgument;
55import static org.onlab.util.Tools.groupedThreads;
56import static org.slf4j.LoggerFactory.getLogger;
Ayaka Koshibe4a3c2392015-04-22 18:03:13 -070057import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
58import static org.onosproject.cluster.ControllerNode.State.INACTIVE;
59
Madan Jampanid14166a2015-02-24 17:37:51 -080060/**
61 * Distributed Lock Manager implemented on top of ConsistentMap.
62 * <p>
Madan Jampanicc586752015-05-18 16:19:27 -070063 * This implementation makes use of ClusterService's failure
Madan Jampanid14166a2015-02-24 17:37:51 -080064 * detection capabilities to detect and purge stale locks.
65 * TODO: Ensure lock safety and liveness.
66 */
Madan Jampani7f72c3f2015-03-01 17:34:59 -080067@Component(immediate = true, enabled = true)
Madan Jampanid14166a2015-02-24 17:37:51 -080068@Service
69public class DistributedLeadershipManager implements LeadershipService {
70
71 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
72 protected StorageService storageService;
73
74 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
75 protected ClusterService clusterService;
76
77 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
78 protected ClusterCommunicationService clusterCommunicator;
79
80 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
81 protected EventDeliveryService eventDispatcher;
82
83 private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
84 new MessageSubject("distributed-leadership-manager-events");
85
86 private final Logger log = getLogger(getClass());
87 private ExecutorService messageHandlingExecutor;
Madan Jampani7d243db2015-05-27 13:16:02 -070088 private ScheduledExecutorService electionRunner;
89 private ScheduledExecutorService lockExecutor;
Madan Jampanicc586752015-05-18 16:19:27 -070090 private ScheduledExecutorService staleLeadershipPurgeExecutor;
Madan Jampanid14166a2015-02-24 17:37:51 -080091 private ScheduledExecutorService leadershipStatusBroadcaster;
92
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070093 private ConsistentMap<String, NodeId> leaderMap;
94 private ConsistentMap<String, List<NodeId>> candidateMap;
95
Madan Jampanicc586752015-05-18 16:19:27 -070096 private ListenerRegistry<LeadershipEvent, LeadershipEventListener> listenerRegistry;
Madan Jampanid14166a2015-02-24 17:37:51 -080097 private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
Ayaka Koshibefd26a302015-04-13 13:59:54 -070098 private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
Madan Jampanicc586752015-05-18 16:19:27 -070099 private final ClusterEventListener clusterEventListener = new InternalClusterEventListener();
Madan Jampanid14166a2015-02-24 17:37:51 -0800100
Madan Jampanicc586752015-05-18 16:19:27 -0700101 private NodeId localNodeId;
Madan Jampanid14166a2015-02-24 17:37:51 -0800102 private Set<String> activeTopics = Sets.newConcurrentHashSet();
Madan Jampani7d243db2015-05-27 13:16:02 -0700103 private Map<String, CompletableFuture<Leadership>> pendingFutures = Maps.newConcurrentMap();
Madan Jampanid14166a2015-02-24 17:37:51 -0800104
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700105 private static final int ELECTION_JOIN_ATTEMPT_INTERVAL_SEC = 2;
Madan Jampanid14166a2015-02-24 17:37:51 -0800106 private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
Madan Jampania14047d2015-02-25 12:23:02 -0800107 private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC = 2;
Madan Jampanicc586752015-05-18 16:19:27 -0700108 private static final int DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC = 2;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700109
Madan Jampanicc586752015-05-18 16:19:27 -0700110 private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false);
111
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700112 private static final Serializer SERIALIZER = Serializer.using(
113 new KryoNamespace.Builder().register(KryoNamespaces.API).build());
Madan Jampanid14166a2015-02-24 17:37:51 -0800114
115 @Activate
116 public void activate() {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700117 leaderMap = storageService.<String, NodeId>consistentMapBuilder()
118 .withName("onos-topic-leaders")
119 .withSerializer(SERIALIZER)
120 .withPartitionsDisabled().build();
121 candidateMap = storageService.<String, List<NodeId>>consistentMapBuilder()
122 .withName("onos-topic-candidates")
123 .withSerializer(SERIALIZER)
124 .withPartitionsDisabled().build();
Madan Jampanid14166a2015-02-24 17:37:51 -0800125
126 localNodeId = clusterService.getLocalNode().id();
127
128 messageHandlingExecutor = Executors.newSingleThreadExecutor(
129 groupedThreads("onos/store/leadership", "message-handler"));
Madan Jampani7d243db2015-05-27 13:16:02 -0700130 electionRunner = Executors.newSingleThreadScheduledExecutor(
131 groupedThreads("onos/store/leadership", "election-runner"));
132 lockExecutor = Executors.newScheduledThreadPool(
Madan Jampanid14166a2015-02-24 17:37:51 -0800133 4, groupedThreads("onos/store/leadership", "election-thread-%d"));
Madan Jampanicc586752015-05-18 16:19:27 -0700134 staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor(
135 groupedThreads("onos/store/leadership", "stale-leadership-evictor"));
Madan Jampanid14166a2015-02-24 17:37:51 -0800136 leadershipStatusBroadcaster = Executors.newSingleThreadScheduledExecutor(
137 groupedThreads("onos/store/leadership", "peer-updater"));
138 clusterCommunicator.addSubscriber(
139 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
Madan Jampanid46e18f2015-05-04 23:19:33 -0700140 SERIALIZER::decode,
141 this::onLeadershipEvent,
Madan Jampanid14166a2015-02-24 17:37:51 -0800142 messageHandlingExecutor);
143
Madan Jampanicc586752015-05-18 16:19:27 -0700144 clusterService.addListener(clusterEventListener);
145
Madan Jampani7d243db2015-05-27 13:16:02 -0700146 electionRunner.scheduleWithFixedDelay(
147 this::electLeaders, 0, DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC, TimeUnit.SECONDS);
148
Madan Jampanid14166a2015-02-24 17:37:51 -0800149 leadershipStatusBroadcaster.scheduleWithFixedDelay(
Madan Jampania14047d2015-02-25 12:23:02 -0800150 this::sendLeadershipStatus, 0, LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC, TimeUnit.SECONDS);
Madan Jampanid14166a2015-02-24 17:37:51 -0800151
Simon Huntff663742015-05-14 13:33:05 -0700152 listenerRegistry = new ListenerRegistry<>();
Madan Jampanid14166a2015-02-24 17:37:51 -0800153 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
154
Madan Jampanid46e18f2015-05-04 23:19:33 -0700155 log.info("Started");
Madan Jampanid14166a2015-02-24 17:37:51 -0800156 }
157
158 @Deactivate
159 public void deactivate() {
160 leaderBoard.forEach((topic, leadership) -> {
161 if (localNodeId.equals(leadership.leader())) {
162 withdraw(topic);
163 }
164 });
165
Madan Jampanicc586752015-05-18 16:19:27 -0700166 clusterService.removeListener(clusterEventListener);
Madan Jampanid14166a2015-02-24 17:37:51 -0800167 eventDispatcher.removeSink(LeadershipEvent.class);
168 clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
169
Madan Jampani7d243db2015-05-27 13:16:02 -0700170 electionRunner.shutdown();
Madan Jampanid14166a2015-02-24 17:37:51 -0800171 messageHandlingExecutor.shutdown();
Madan Jampani7d243db2015-05-27 13:16:02 -0700172 lockExecutor.shutdown();
Madan Jampanicc586752015-05-18 16:19:27 -0700173 staleLeadershipPurgeExecutor.shutdown();
Madan Jampanid14166a2015-02-24 17:37:51 -0800174 leadershipStatusBroadcaster.shutdown();
175
Madan Jampanid46e18f2015-05-04 23:19:33 -0700176 log.info("Stopped");
Madan Jampanid14166a2015-02-24 17:37:51 -0800177 }
178
179 @Override
180 public Map<String, Leadership> getLeaderBoard() {
181 return ImmutableMap.copyOf(leaderBoard);
182 }
183
184 @Override
Madan Jampanifd45d5e2015-04-20 13:33:21 -0700185 public Map<String, List<NodeId>> getCandidates() {
186 return Maps.toMap(candidateBoard.keySet(), this::getCandidates);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700187 }
188
189 @Override
190 public List<NodeId> getCandidates(String path) {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700191 Leadership current = candidateBoard.get(path);
192 return current == null ? ImmutableList.of() : ImmutableList.copyOf(current.candidates());
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700193 }
194
195 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800196 public NodeId getLeader(String path) {
197 Leadership leadership = leaderBoard.get(path);
198 return leadership != null ? leadership.leader() : null;
199 }
200
201 @Override
Madan Jampani59610512015-02-25 15:25:43 -0800202 public Leadership getLeadership(String path) {
203 checkArgument(path != null);
204 return leaderBoard.get(path);
205 }
206
207 @Override
208 public Set<String> ownedTopics(NodeId nodeId) {
209 checkArgument(nodeId != null);
210 return leaderBoard.entrySet()
211 .stream()
212 .filter(entry -> nodeId.equals(entry.getValue().leader()))
213 .map(Entry::getKey)
214 .collect(Collectors.toSet());
215 }
216
217 @Override
Madan Jampanide003d92015-05-11 17:14:20 -0700218 public CompletableFuture<Leadership> runForLeadership(String path) {
Madan Jampani52860be2015-02-27 12:52:37 -0800219 log.debug("Running for leadership for topic: {}", path);
Madan Jampanide003d92015-05-11 17:14:20 -0700220 CompletableFuture<Leadership> resultFuture = new CompletableFuture<>();
221 doRunForLeadership(path, resultFuture);
222 return resultFuture;
223 }
224
225 private void doRunForLeadership(String path, CompletableFuture<Leadership> future) {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700226 try {
Madan Jampani346d4f52015-05-04 11:09:39 -0700227 Versioned<List<NodeId>> candidates = candidateMap.computeIf(path,
228 currentList -> currentList == null || !currentList.contains(localNodeId),
229 (topic, currentList) -> {
230 if (currentList == null) {
231 return ImmutableList.of(localNodeId);
232 } else {
233 List<NodeId> newList = Lists.newLinkedList();
234 newList.addAll(currentList);
235 newList.add(localNodeId);
236 return newList;
237 }
238 });
239 publish(new LeadershipEvent(
240 LeadershipEvent.Type.CANDIDATES_CHANGED,
241 new Leadership(path,
242 candidates.value(),
243 candidates.version(),
244 candidates.creationTime())));
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700245 log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
246 activeTopics.add(path);
Madan Jampani7d243db2015-05-27 13:16:02 -0700247 Leadership leadership = electLeader(path, candidates.value());
248 if (leadership == null) {
249 pendingFutures.put(path, future);
250 } else {
251 future.complete(leadership);
252 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700253 } catch (ConsistentMapException e) {
254 log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
Madan Jampanide003d92015-05-11 17:14:20 -0700255 rerunForLeadership(path, future);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700256 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800257 }
258
259 @Override
Madan Jampanide003d92015-05-11 17:14:20 -0700260 public CompletableFuture<Void> withdraw(String path) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800261 activeTopics.remove(path);
Madan Jampanide003d92015-05-11 17:14:20 -0700262 CompletableFuture<Void> resultFuture = new CompletableFuture<>();
263 doWithdraw(path, resultFuture);
264 return resultFuture;
265 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700266
Madan Jampanide003d92015-05-11 17:14:20 -0700267
268 private void doWithdraw(String path, CompletableFuture<Void> future) {
269 if (activeTopics.contains(path)) {
270 future.completeExceptionally(new CancellationException(String.format("%s is now a active topic", path)));
271 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800272 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700273 Versioned<NodeId> leader = leaderMap.get(path);
274 if (leader != null && Objects.equals(leader.value(), localNodeId)) {
275 if (leaderMap.remove(path, leader.version())) {
Madan Jampanieedfe542015-05-26 09:54:09 -0700276 log.debug("Gave up leadership for {}", path);
Madan Jampanide003d92015-05-11 17:14:20 -0700277 future.complete(null);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700278 publish(new LeadershipEvent(
279 LeadershipEvent.Type.LEADER_BOOTED,
280 new Leadership(path,
281 localNodeId,
282 leader.version(),
283 leader.creationTime())));
Jonathan Harte649c752015-03-03 18:04:25 -0800284 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800285 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700286 // else we are not the current leader, can still be a candidate.
287 Versioned<List<NodeId>> candidates = candidateMap.get(path);
288 List<NodeId> candidateList = candidates != null
289 ? Lists.newArrayList(candidates.value())
290 : Lists.newArrayList();
291 if (!candidateList.remove(localNodeId)) {
Madan Jampanide003d92015-05-11 17:14:20 -0700292 future.complete(null);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700293 return;
294 }
Ayaka Koshibe93451b42015-04-21 15:12:03 -0700295 if (candidateMap.replace(path, candidates.version(), candidateList)) {
Ayaka Koshibe67cf7de2015-04-17 11:47:27 -0700296 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
Madan Jampanide003d92015-05-11 17:14:20 -0700297 future.complete(null);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700298 publish(new LeadershipEvent(
299 LeadershipEvent.Type.CANDIDATES_CHANGED,
300 new Leadership(path,
301 newCandidates.value(),
302 newCandidates.version(),
303 newCandidates.creationTime())));
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700304 } else {
Madan Jampanicd40e5b2015-05-27 17:19:48 -0700305 log.warn("Failed to withdraw from candidates list for {}. Will retry", path);
Madan Jampanide003d92015-05-11 17:14:20 -0700306 retryWithdraw(path, future);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700307 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800308 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800309 log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
Madan Jampanide003d92015-05-11 17:14:20 -0700310 retryWithdraw(path, future);
Madan Jampanid14166a2015-02-24 17:37:51 -0800311 }
312 }
313
314 @Override
Madan Jampani1af8e132015-04-30 16:41:18 -0700315 public boolean stepdown(String path) {
Madan Jampani9bd1f152015-04-30 23:33:35 -0700316 if (!activeTopics.contains(path) || !Objects.equals(localNodeId, getLeader(path))) {
Madan Jampani1af8e132015-04-30 16:41:18 -0700317 return false;
318 }
319
320 try {
321 Versioned<NodeId> leader = leaderMap.get(path);
322 if (leader != null && Objects.equals(leader.value(), localNodeId)) {
323 if (leaderMap.remove(path, leader.version())) {
Madan Jampanieedfe542015-05-26 09:54:09 -0700324 log.debug("Stepped down from leadership for {}", path);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700325 publish(new LeadershipEvent(
326 LeadershipEvent.Type.LEADER_BOOTED,
327 new Leadership(path,
328 localNodeId,
329 leader.version(),
330 leader.creationTime())));
Madan Jampani1af8e132015-04-30 16:41:18 -0700331 return true;
332 }
333 }
334 } catch (Exception e) {
335 log.warn("Error executing stepdown for {}", path, e);
336 }
337 return false;
338 }
339
340 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800341 public void addListener(LeadershipEventListener listener) {
342 listenerRegistry.addListener(listener);
343 }
344
345 @Override
346 public void removeListener(LeadershipEventListener listener) {
347 listenerRegistry.removeListener(listener);
348 }
349
Madan Jampani1af8e132015-04-30 16:41:18 -0700350 @Override
351 public boolean makeTopCandidate(String path, NodeId nodeId) {
Madan Jampani346d4f52015-05-04 11:09:39 -0700352 Versioned<List<NodeId>> newCandidates = candidateMap.computeIf(path,
Madan Jampani3650fc42015-05-27 17:01:50 -0700353 candidates -> candidates != null &&
354 candidates.contains(nodeId) &&
355 !nodeId.equals(Iterables.getFirst(candidates, null)),
Madan Jampani346d4f52015-05-04 11:09:39 -0700356 (topic, candidates) -> {
357 List<NodeId> updatedCandidates = new ArrayList<>(candidates.size());
358 updatedCandidates.add(nodeId);
359 candidates.stream().filter(id -> !nodeId.equals(id)).forEach(updatedCandidates::add);
360 return updatedCandidates;
361 });
362 publish(new LeadershipEvent(
Madan Jampanid46e18f2015-05-04 23:19:33 -0700363 LeadershipEvent.Type.CANDIDATES_CHANGED,
364 new Leadership(path,
365 newCandidates.value(),
366 newCandidates.version(),
367 newCandidates.creationTime())));
Madan Jampani346d4f52015-05-04 11:09:39 -0700368 return true;
Madan Jampani1af8e132015-04-30 16:41:18 -0700369 }
370
Madan Jampani7d243db2015-05-27 13:16:02 -0700371 private Leadership electLeader(String path, List<NodeId> candidates) {
372 Leadership currentLeadership = getLeadership(path);
373 if (currentLeadership != null) {
374 return currentLeadership;
375 } else {
376 NodeId topCandidate = candidates
377 .stream()
378 .filter(n -> clusterService.getState(n) == ACTIVE)
379 .findFirst()
380 .orElse(null);
381 try {
382 Versioned<NodeId> leader = localNodeId.equals(topCandidate)
383 ? leaderMap.computeIfAbsent(path, p -> localNodeId) : leaderMap.get(path);
384 if (leader != null) {
385 Leadership newLeadership = new Leadership(path,
386 leader.value(),
387 leader.version(),
388 leader.creationTime());
389 publish(new LeadershipEvent(
390 LeadershipEvent.Type.LEADER_ELECTED,
391 newLeadership));
392 return newLeadership;
Ayaka Koshibe0d886fc2015-04-23 11:53:41 -0700393 }
Madan Jampani7d243db2015-05-27 13:16:02 -0700394 } catch (Exception e) {
395 log.debug("Failed to elect leader for {}", path, e);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700396 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700397 }
Madan Jampani7d243db2015-05-27 13:16:02 -0700398 return null;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700399 }
400
Madan Jampani7d243db2015-05-27 13:16:02 -0700401 private void electLeaders() {
Madan Jampanid14166a2015-02-24 17:37:51 -0800402 try {
Madan Jampani7d243db2015-05-27 13:16:02 -0700403 candidateMap.entrySet().forEach(entry -> {
404 String path = entry.getKey();
Madan Jampanicd40e5b2015-05-27 17:19:48 -0700405 Versioned<List<NodeId>> candidates = entry.getValue();
406 // for active topics, check if this node can become a leader (if it isn't already)
Madan Jampani7d243db2015-05-27 13:16:02 -0700407 if (activeTopics.contains(path)) {
408 lockExecutor.submit(() -> {
Madan Jampanicd40e5b2015-05-27 17:19:48 -0700409 Leadership leadership = electLeader(path, candidates.value());
Madan Jampani7d243db2015-05-27 13:16:02 -0700410 if (leadership != null) {
411 CompletableFuture<Leadership> future = pendingFutures.remove(path);
412 if (future != null) {
413 future.complete(leadership);
414 }
415 }
416 });
417 }
Madan Jampanicd40e5b2015-05-27 17:19:48 -0700418 // Raise a CANDIDATES_CHANGED event to force refresh local candidate board
419 // and also to update local listeners.
420 // Don't worry about duplicate events as they will be suppressed.
421 onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED,
422 new Leadership(path,
423 candidates.value(),
424 candidates.version(),
425 candidates.creationTime())));
Madan Jampani7d243db2015-05-27 13:16:02 -0700426 });
Madan Jampanid14166a2015-02-24 17:37:51 -0800427 } catch (Exception e) {
Madan Jampani7d243db2015-05-27 13:16:02 -0700428 log.debug("Failure electing leaders", e);
Madan Jampanid14166a2015-02-24 17:37:51 -0800429 }
430 }
431
Madan Jampanid46e18f2015-05-04 23:19:33 -0700432 private void publish(LeadershipEvent event) {
433 onLeadershipEvent(event);
434 clusterCommunicator.broadcast(event, LEADERSHIP_EVENT_MESSAGE_SUBJECT, SERIALIZER::encode);
435 }
436
437 private void onLeadershipEvent(LeadershipEvent leadershipEvent) {
438 log.trace("Leadership Event: time = {} type = {} event = {}",
439 leadershipEvent.time(), leadershipEvent.type(),
440 leadershipEvent);
441
442 Leadership leadershipUpdate = leadershipEvent.subject();
443 LeadershipEvent.Type eventType = leadershipEvent.type();
444 String topic = leadershipUpdate.topic();
445
446 AtomicBoolean updateAccepted = new AtomicBoolean(false);
447 if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
448 leaderBoard.compute(topic, (k, currentLeadership) -> {
449 if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
450 updateAccepted.set(true);
451 return leadershipUpdate;
452 }
453 return currentLeadership;
454 });
455 } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
456 leaderBoard.compute(topic, (k, currentLeadership) -> {
457 if (currentLeadership == null || currentLeadership.epoch() <= leadershipUpdate.epoch()) {
458 updateAccepted.set(true);
459 return null;
460 }
461 return currentLeadership;
462 });
463 } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
464 candidateBoard.compute(topic, (k, currentInfo) -> {
465 if (currentInfo == null || currentInfo.epoch() < leadershipUpdate.epoch()) {
466 updateAccepted.set(true);
467 return leadershipUpdate;
468 }
469 return currentInfo;
470 });
471 } else {
472 throw new IllegalStateException("Unknown event type.");
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700473 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700474
Madan Jampanid46e18f2015-05-04 23:19:33 -0700475 if (updateAccepted.get()) {
476 eventDispatcher.post(leadershipEvent);
Madan Jampanid14166a2015-02-24 17:37:51 -0800477 }
478 }
479
Madan Jampanide003d92015-05-11 17:14:20 -0700480 private void rerunForLeadership(String path, CompletableFuture<Leadership> future) {
Madan Jampani7d243db2015-05-27 13:16:02 -0700481 lockExecutor.schedule(
Madan Jampanide003d92015-05-11 17:14:20 -0700482 () -> doRunForLeadership(path, future),
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700483 ELECTION_JOIN_ATTEMPT_INTERVAL_SEC,
484 TimeUnit.SECONDS);
485 }
486
Madan Jampanide003d92015-05-11 17:14:20 -0700487 private void retryWithdraw(String path, CompletableFuture<Void> future) {
Madan Jampani7d243db2015-05-27 13:16:02 -0700488 lockExecutor.schedule(
Madan Jampanide003d92015-05-11 17:14:20 -0700489 () -> doWithdraw(path, future),
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700490 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
491 TimeUnit.SECONDS);
492 }
493
Madan Jampanicc586752015-05-18 16:19:27 -0700494 private void scheduleStaleLeadershipPurge(int afterDelaySec) {
495 if (staleLeadershipPurgeScheduled.compareAndSet(false, true)) {
496 staleLeadershipPurgeExecutor.schedule(
497 this::purgeStaleLeadership,
498 afterDelaySec,
499 TimeUnit.SECONDS);
500 }
501 }
502
503 /**
504 * Purges locks held by inactive nodes and evicts inactive nodes from candidacy.
505 */
506 private void purgeStaleLeadership() {
507 AtomicBoolean rerunPurge = new AtomicBoolean(false);
Madan Jampanid14166a2015-02-24 17:37:51 -0800508 try {
Madan Jampanicc586752015-05-18 16:19:27 -0700509 staleLeadershipPurgeScheduled.set(false);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700510 leaderMap.entrySet()
511 .stream()
Ayaka Koshibe4a3c2392015-04-22 18:03:13 -0700512 .filter(e -> clusterService.getState(e.getValue().value()) == INACTIVE)
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700513 .forEach(entry -> {
514 String path = entry.getKey();
515 NodeId nodeId = entry.getValue().value();
516 long epoch = entry.getValue().version();
517 long creationTime = entry.getValue().creationTime();
Madan Jampanid14166a2015-02-24 17:37:51 -0800518 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700519 if (leaderMap.remove(path, epoch)) {
Madan Jampanieedfe542015-05-26 09:54:09 -0700520 log.debug("Purged stale lock held by {} for {}", nodeId, path);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700521 publish(new LeadershipEvent(
522 LeadershipEvent.Type.LEADER_BOOTED,
523 new Leadership(path, nodeId, epoch, creationTime)));
Madan Jampanid14166a2015-02-24 17:37:51 -0800524 }
525 } catch (Exception e) {
Madan Jampanice8392b2015-05-20 11:13:47 -0700526 log.debug("Failed to purge stale lock held by {} for {}", nodeId, path, e);
Madan Jampanicc586752015-05-18 16:19:27 -0700527 rerunPurge.set(true);
528 }
529 });
530
531 candidateMap.entrySet()
532 .forEach(entry -> {
533 String path = entry.getKey();
534 Versioned<List<NodeId>> candidates = entry.getValue();
535 List<NodeId> candidatesList = candidates != null
536 ? candidates.value() : Collections.emptyList();
537 List<NodeId> activeCandidatesList =
538 candidatesList.stream()
539 .filter(n -> clusterService.getState(n) == ACTIVE)
540 .filter(n -> !localNodeId.equals(n) || activeTopics.contains(path))
541 .collect(Collectors.toList());
542 if (activeCandidatesList.size() < candidatesList.size()) {
543 Set<NodeId> removedCandidates =
544 Sets.difference(Sets.newHashSet(candidatesList),
545 Sets.newHashSet(activeCandidatesList));
546 try {
547 if (candidateMap.replace(path, entry.getValue().version(), activeCandidatesList)) {
548 log.info("Evicted inactive candidates {} from "
549 + "candidate list for {}", removedCandidates, path);
550 Versioned<List<NodeId>> updatedCandidates = candidateMap.get(path);
551 publish(new LeadershipEvent(
552 LeadershipEvent.Type.CANDIDATES_CHANGED,
553 new Leadership(path,
554 updatedCandidates.value(),
555 updatedCandidates.version(),
556 updatedCandidates.creationTime())));
Madan Jampani3d1727c2015-05-22 11:46:29 -0700557 } else {
558 // Conflicting update detected. Rerun purge to make sure
559 // inactive candidates are evicted.
560 rerunPurge.set(true);
Madan Jampanicc586752015-05-18 16:19:27 -0700561 }
562 } catch (Exception e) {
Madan Jampanice8392b2015-05-20 11:13:47 -0700563 log.debug("Failed to evict inactive candidates {} from "
Madan Jampanicc586752015-05-18 16:19:27 -0700564 + "candidate list for {}", removedCandidates, path, e);
565 rerunPurge.set(true);
566 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800567 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700568 });
Madan Jampanid14166a2015-02-24 17:37:51 -0800569 } catch (Exception e) {
Madan Jampanice8392b2015-05-20 11:13:47 -0700570 log.debug("Failure purging state leadership.", e);
Madan Jampanicc586752015-05-18 16:19:27 -0700571 rerunPurge.set(true);
572 }
573
574 if (rerunPurge.get()) {
Madan Jampanice8392b2015-05-20 11:13:47 -0700575 log.debug("Rescheduling stale leadership purge due to errors encountered in previous run");
Madan Jampanicc586752015-05-18 16:19:27 -0700576 scheduleStaleLeadershipPurge(DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC);
Madan Jampanid14166a2015-02-24 17:37:51 -0800577 }
578 }
579
580 private void sendLeadershipStatus() {
Madan Jampania14047d2015-02-25 12:23:02 -0800581 try {
582 leaderBoard.forEach((path, leadership) -> {
583 if (leadership.leader().equals(localNodeId)) {
584 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700585 clusterCommunicator.broadcast(event,
586 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
587 SERIALIZER::encode);
Madan Jampania14047d2015-02-25 12:23:02 -0800588 }
589 });
590 } catch (Exception e) {
591 log.debug("Failed to send leadership updates", e);
592 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800593 }
Madan Jampanicc586752015-05-18 16:19:27 -0700594
595 private class InternalClusterEventListener implements ClusterEventListener {
596
597 @Override
598 public void event(ClusterEvent event) {
599 if (event.type() == Type.INSTANCE_DEACTIVATED || event.type() == Type.INSTANCE_REMOVED) {
600 scheduleStaleLeadershipPurge(0);
601 }
602 }
603 }
Madan Jampania14047d2015-02-25 12:23:02 -0800604}