blob: fa9d7cb0d678b4959f6ca749c457192ebbab9c07 [file] [log] [blame]
Madan Jampanid14166a2015-02-24 17:37:51 -08001package org.onosproject.store.consistent.impl;
2
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07003import com.google.common.collect.ImmutableList;
Jonathan Harte649c752015-03-03 18:04:25 -08004import com.google.common.collect.ImmutableMap;
Madan Jampani3650fc42015-05-27 17:01:50 -07005import com.google.common.collect.Iterables;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07006import com.google.common.collect.Lists;
Jonathan Harte649c752015-03-03 18:04:25 -08007import com.google.common.collect.Maps;
8import com.google.common.collect.Sets;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07009
Madan Jampanid14166a2015-02-24 17:37:51 -080010import org.apache.felix.scr.annotations.Activate;
11import org.apache.felix.scr.annotations.Component;
12import org.apache.felix.scr.annotations.Deactivate;
13import org.apache.felix.scr.annotations.Reference;
14import org.apache.felix.scr.annotations.ReferenceCardinality;
15import org.apache.felix.scr.annotations.Service;
16import org.onlab.util.KryoNamespace;
Madan Jampanicc586752015-05-18 16:19:27 -070017import org.onosproject.cluster.ClusterEvent;
18import org.onosproject.cluster.ClusterEvent.Type;
19import org.onosproject.cluster.ClusterEventListener;
Madan Jampanid14166a2015-02-24 17:37:51 -080020import org.onosproject.cluster.ClusterService;
Madan Jampanid14166a2015-02-24 17:37:51 -080021import org.onosproject.cluster.Leadership;
22import org.onosproject.cluster.LeadershipEvent;
23import org.onosproject.cluster.LeadershipEventListener;
24import org.onosproject.cluster.LeadershipService;
25import org.onosproject.cluster.NodeId;
Simon Huntff663742015-05-14 13:33:05 -070026import org.onosproject.event.ListenerRegistry;
Madan Jampanid14166a2015-02-24 17:37:51 -080027import org.onosproject.event.EventDeliveryService;
28import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Madan Jampanid14166a2015-02-24 17:37:51 -080029import org.onosproject.store.cluster.messaging.MessageSubject;
30import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampanid14166a2015-02-24 17:37:51 -080031import org.onosproject.store.service.ConsistentMap;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070032import org.onosproject.store.service.ConsistentMapException;
Madan Jampanid14166a2015-02-24 17:37:51 -080033import org.onosproject.store.service.Serializer;
34import org.onosproject.store.service.StorageService;
35import org.onosproject.store.service.Versioned;
36import org.slf4j.Logger;
37
Madan Jampani1af8e132015-04-30 16:41:18 -070038import java.util.ArrayList;
Madan Jampanicc586752015-05-18 16:19:27 -070039import java.util.Collections;
Jonathan Harte649c752015-03-03 18:04:25 -080040import java.util.Map;
41import java.util.Map.Entry;
42import java.util.Objects;
43import java.util.Set;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070044import java.util.List;
Madan Jampanide003d92015-05-11 17:14:20 -070045import java.util.concurrent.CancellationException;
46import java.util.concurrent.CompletableFuture;
Jonathan Harte649c752015-03-03 18:04:25 -080047import java.util.concurrent.ExecutorService;
48import java.util.concurrent.Executors;
49import java.util.concurrent.ScheduledExecutorService;
50import java.util.concurrent.TimeUnit;
Madan Jampanid46e18f2015-05-04 23:19:33 -070051import java.util.concurrent.atomic.AtomicBoolean;
Jonathan Harte649c752015-03-03 18:04:25 -080052import java.util.stream.Collectors;
53
54import static com.google.common.base.Preconditions.checkArgument;
55import static org.onlab.util.Tools.groupedThreads;
56import static org.slf4j.LoggerFactory.getLogger;
Ayaka Koshibe4a3c2392015-04-22 18:03:13 -070057import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
58import static org.onosproject.cluster.ControllerNode.State.INACTIVE;
59
/**
 * Distributed leadership manager (per-topic leader election) implemented on top of ConsistentMap.
 * <p>
 * This implementation makes use of ClusterService's failure
 * detection capabilities to detect and purge stale leaderships (locks)
 * and candidacies held by inactive nodes.
 * TODO: Ensure lock safety and liveness.
 */
Madan Jampani7f72c3f2015-03-01 17:34:59 -080067@Component(immediate = true, enabled = true)
Madan Jampanid14166a2015-02-24 17:37:51 -080068@Service
69public class DistributedLeadershipManager implements LeadershipService {
70
71 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
72 protected StorageService storageService;
73
74 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
75 protected ClusterService clusterService;
76
77 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
78 protected ClusterCommunicationService clusterCommunicator;
79
80 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
81 protected EventDeliveryService eventDispatcher;
82
83 private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
84 new MessageSubject("distributed-leadership-manager-events");
85
86 private final Logger log = getLogger(getClass());
87 private ExecutorService messageHandlingExecutor;
Madan Jampani7d243db2015-05-27 13:16:02 -070088 private ScheduledExecutorService electionRunner;
89 private ScheduledExecutorService lockExecutor;
Madan Jampanicc586752015-05-18 16:19:27 -070090 private ScheduledExecutorService staleLeadershipPurgeExecutor;
Madan Jampanid14166a2015-02-24 17:37:51 -080091 private ScheduledExecutorService leadershipStatusBroadcaster;
92
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070093 private ConsistentMap<String, NodeId> leaderMap;
94 private ConsistentMap<String, List<NodeId>> candidateMap;
95
Madan Jampanicc586752015-05-18 16:19:27 -070096 private ListenerRegistry<LeadershipEvent, LeadershipEventListener> listenerRegistry;
Madan Jampanid14166a2015-02-24 17:37:51 -080097 private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
Ayaka Koshibefd26a302015-04-13 13:59:54 -070098 private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
Madan Jampanicc586752015-05-18 16:19:27 -070099 private final ClusterEventListener clusterEventListener = new InternalClusterEventListener();
Madan Jampanid14166a2015-02-24 17:37:51 -0800100
Madan Jampanicc586752015-05-18 16:19:27 -0700101 private NodeId localNodeId;
Madan Jampanid14166a2015-02-24 17:37:51 -0800102 private Set<String> activeTopics = Sets.newConcurrentHashSet();
Madan Jampani7d243db2015-05-27 13:16:02 -0700103 private Map<String, CompletableFuture<Leadership>> pendingFutures = Maps.newConcurrentMap();
Madan Jampanid14166a2015-02-24 17:37:51 -0800104
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700105 private static final int ELECTION_JOIN_ATTEMPT_INTERVAL_SEC = 2;
Madan Jampanid14166a2015-02-24 17:37:51 -0800106 private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
Madan Jampania14047d2015-02-25 12:23:02 -0800107 private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC = 2;
Madan Jampanicc586752015-05-18 16:19:27 -0700108 private static final int DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC = 2;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700109 private static final int LEADER_CANDIDATE_POS = 0;
110
Madan Jampanicc586752015-05-18 16:19:27 -0700111 private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false);
112
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700113 private static final Serializer SERIALIZER = Serializer.using(
114 new KryoNamespace.Builder().register(KryoNamespaces.API).build());
Madan Jampanid14166a2015-02-24 17:37:51 -0800115
116 @Activate
117 public void activate() {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700118 leaderMap = storageService.<String, NodeId>consistentMapBuilder()
119 .withName("onos-topic-leaders")
120 .withSerializer(SERIALIZER)
121 .withPartitionsDisabled().build();
122 candidateMap = storageService.<String, List<NodeId>>consistentMapBuilder()
123 .withName("onos-topic-candidates")
124 .withSerializer(SERIALIZER)
125 .withPartitionsDisabled().build();
Madan Jampanid14166a2015-02-24 17:37:51 -0800126
127 localNodeId = clusterService.getLocalNode().id();
128
129 messageHandlingExecutor = Executors.newSingleThreadExecutor(
130 groupedThreads("onos/store/leadership", "message-handler"));
Madan Jampani7d243db2015-05-27 13:16:02 -0700131 electionRunner = Executors.newSingleThreadScheduledExecutor(
132 groupedThreads("onos/store/leadership", "election-runner"));
133 lockExecutor = Executors.newScheduledThreadPool(
Madan Jampanid14166a2015-02-24 17:37:51 -0800134 4, groupedThreads("onos/store/leadership", "election-thread-%d"));
Madan Jampanicc586752015-05-18 16:19:27 -0700135 staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor(
136 groupedThreads("onos/store/leadership", "stale-leadership-evictor"));
Madan Jampanid14166a2015-02-24 17:37:51 -0800137 leadershipStatusBroadcaster = Executors.newSingleThreadScheduledExecutor(
138 groupedThreads("onos/store/leadership", "peer-updater"));
139 clusterCommunicator.addSubscriber(
140 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
Madan Jampanid46e18f2015-05-04 23:19:33 -0700141 SERIALIZER::decode,
142 this::onLeadershipEvent,
Madan Jampanid14166a2015-02-24 17:37:51 -0800143 messageHandlingExecutor);
144
Madan Jampanicc586752015-05-18 16:19:27 -0700145 clusterService.addListener(clusterEventListener);
146
Madan Jampani7d243db2015-05-27 13:16:02 -0700147 electionRunner.scheduleWithFixedDelay(
148 this::electLeaders, 0, DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC, TimeUnit.SECONDS);
149
Madan Jampanid14166a2015-02-24 17:37:51 -0800150 leadershipStatusBroadcaster.scheduleWithFixedDelay(
Madan Jampania14047d2015-02-25 12:23:02 -0800151 this::sendLeadershipStatus, 0, LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC, TimeUnit.SECONDS);
Madan Jampanid14166a2015-02-24 17:37:51 -0800152
Simon Huntff663742015-05-14 13:33:05 -0700153 listenerRegistry = new ListenerRegistry<>();
Madan Jampanid14166a2015-02-24 17:37:51 -0800154 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
155
Madan Jampanid46e18f2015-05-04 23:19:33 -0700156 log.info("Started");
Madan Jampanid14166a2015-02-24 17:37:51 -0800157 }
158
159 @Deactivate
160 public void deactivate() {
161 leaderBoard.forEach((topic, leadership) -> {
162 if (localNodeId.equals(leadership.leader())) {
163 withdraw(topic);
164 }
165 });
166
Madan Jampanicc586752015-05-18 16:19:27 -0700167 clusterService.removeListener(clusterEventListener);
Madan Jampanid14166a2015-02-24 17:37:51 -0800168 eventDispatcher.removeSink(LeadershipEvent.class);
169 clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
170
Madan Jampani7d243db2015-05-27 13:16:02 -0700171 electionRunner.shutdown();
Madan Jampanid14166a2015-02-24 17:37:51 -0800172 messageHandlingExecutor.shutdown();
Madan Jampani7d243db2015-05-27 13:16:02 -0700173 lockExecutor.shutdown();
Madan Jampanicc586752015-05-18 16:19:27 -0700174 staleLeadershipPurgeExecutor.shutdown();
Madan Jampanid14166a2015-02-24 17:37:51 -0800175 leadershipStatusBroadcaster.shutdown();
176
Madan Jampanid46e18f2015-05-04 23:19:33 -0700177 log.info("Stopped");
Madan Jampanid14166a2015-02-24 17:37:51 -0800178 }
179
180 @Override
181 public Map<String, Leadership> getLeaderBoard() {
182 return ImmutableMap.copyOf(leaderBoard);
183 }
184
185 @Override
Madan Jampanifd45d5e2015-04-20 13:33:21 -0700186 public Map<String, List<NodeId>> getCandidates() {
187 return Maps.toMap(candidateBoard.keySet(), this::getCandidates);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700188 }
189
190 @Override
191 public List<NodeId> getCandidates(String path) {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700192 Leadership current = candidateBoard.get(path);
193 return current == null ? ImmutableList.of() : ImmutableList.copyOf(current.candidates());
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700194 }
195
196 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800197 public NodeId getLeader(String path) {
198 Leadership leadership = leaderBoard.get(path);
199 return leadership != null ? leadership.leader() : null;
200 }
201
202 @Override
Madan Jampani59610512015-02-25 15:25:43 -0800203 public Leadership getLeadership(String path) {
204 checkArgument(path != null);
205 return leaderBoard.get(path);
206 }
207
208 @Override
209 public Set<String> ownedTopics(NodeId nodeId) {
210 checkArgument(nodeId != null);
211 return leaderBoard.entrySet()
212 .stream()
213 .filter(entry -> nodeId.equals(entry.getValue().leader()))
214 .map(Entry::getKey)
215 .collect(Collectors.toSet());
216 }
217
218 @Override
Madan Jampanide003d92015-05-11 17:14:20 -0700219 public CompletableFuture<Leadership> runForLeadership(String path) {
Madan Jampani52860be2015-02-27 12:52:37 -0800220 log.debug("Running for leadership for topic: {}", path);
Madan Jampanide003d92015-05-11 17:14:20 -0700221 CompletableFuture<Leadership> resultFuture = new CompletableFuture<>();
222 doRunForLeadership(path, resultFuture);
223 return resultFuture;
224 }
225
226 private void doRunForLeadership(String path, CompletableFuture<Leadership> future) {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700227 try {
Madan Jampani346d4f52015-05-04 11:09:39 -0700228 Versioned<List<NodeId>> candidates = candidateMap.computeIf(path,
229 currentList -> currentList == null || !currentList.contains(localNodeId),
230 (topic, currentList) -> {
231 if (currentList == null) {
232 return ImmutableList.of(localNodeId);
233 } else {
234 List<NodeId> newList = Lists.newLinkedList();
235 newList.addAll(currentList);
236 newList.add(localNodeId);
237 return newList;
238 }
239 });
240 publish(new LeadershipEvent(
241 LeadershipEvent.Type.CANDIDATES_CHANGED,
242 new Leadership(path,
243 candidates.value(),
244 candidates.version(),
245 candidates.creationTime())));
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700246 log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
247 activeTopics.add(path);
Madan Jampani7d243db2015-05-27 13:16:02 -0700248 Leadership leadership = electLeader(path, candidates.value());
249 if (leadership == null) {
250 pendingFutures.put(path, future);
251 } else {
252 future.complete(leadership);
253 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700254 } catch (ConsistentMapException e) {
255 log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
Madan Jampanide003d92015-05-11 17:14:20 -0700256 rerunForLeadership(path, future);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700257 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800258 }
259
260 @Override
Madan Jampanide003d92015-05-11 17:14:20 -0700261 public CompletableFuture<Void> withdraw(String path) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800262 activeTopics.remove(path);
Madan Jampanide003d92015-05-11 17:14:20 -0700263 CompletableFuture<Void> resultFuture = new CompletableFuture<>();
264 doWithdraw(path, resultFuture);
265 return resultFuture;
266 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700267
Madan Jampanide003d92015-05-11 17:14:20 -0700268
269 private void doWithdraw(String path, CompletableFuture<Void> future) {
270 if (activeTopics.contains(path)) {
271 future.completeExceptionally(new CancellationException(String.format("%s is now a active topic", path)));
272 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800273 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700274 Versioned<NodeId> leader = leaderMap.get(path);
275 if (leader != null && Objects.equals(leader.value(), localNodeId)) {
276 if (leaderMap.remove(path, leader.version())) {
Madan Jampanieedfe542015-05-26 09:54:09 -0700277 log.debug("Gave up leadership for {}", path);
Madan Jampanide003d92015-05-11 17:14:20 -0700278 future.complete(null);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700279 publish(new LeadershipEvent(
280 LeadershipEvent.Type.LEADER_BOOTED,
281 new Leadership(path,
282 localNodeId,
283 leader.version(),
284 leader.creationTime())));
Jonathan Harte649c752015-03-03 18:04:25 -0800285 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800286 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700287 // else we are not the current leader, can still be a candidate.
288 Versioned<List<NodeId>> candidates = candidateMap.get(path);
289 List<NodeId> candidateList = candidates != null
290 ? Lists.newArrayList(candidates.value())
291 : Lists.newArrayList();
292 if (!candidateList.remove(localNodeId)) {
Madan Jampanide003d92015-05-11 17:14:20 -0700293 future.complete(null);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700294 return;
295 }
Ayaka Koshibe93451b42015-04-21 15:12:03 -0700296 if (candidateMap.replace(path, candidates.version(), candidateList)) {
Ayaka Koshibe67cf7de2015-04-17 11:47:27 -0700297 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
Madan Jampanide003d92015-05-11 17:14:20 -0700298 future.complete(null);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700299 publish(new LeadershipEvent(
300 LeadershipEvent.Type.CANDIDATES_CHANGED,
301 new Leadership(path,
302 newCandidates.value(),
303 newCandidates.version(),
304 newCandidates.creationTime())));
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700305 } else {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700306 log.warn("Failed to withdraw from candidates list. Will retry");
Madan Jampanide003d92015-05-11 17:14:20 -0700307 retryWithdraw(path, future);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700308 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800309 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800310 log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
Madan Jampanide003d92015-05-11 17:14:20 -0700311 retryWithdraw(path, future);
Madan Jampanid14166a2015-02-24 17:37:51 -0800312 }
313 }
314
315 @Override
Madan Jampani1af8e132015-04-30 16:41:18 -0700316 public boolean stepdown(String path) {
Madan Jampani9bd1f152015-04-30 23:33:35 -0700317 if (!activeTopics.contains(path) || !Objects.equals(localNodeId, getLeader(path))) {
Madan Jampani1af8e132015-04-30 16:41:18 -0700318 return false;
319 }
320
321 try {
322 Versioned<NodeId> leader = leaderMap.get(path);
323 if (leader != null && Objects.equals(leader.value(), localNodeId)) {
324 if (leaderMap.remove(path, leader.version())) {
Madan Jampanieedfe542015-05-26 09:54:09 -0700325 log.debug("Stepped down from leadership for {}", path);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700326 publish(new LeadershipEvent(
327 LeadershipEvent.Type.LEADER_BOOTED,
328 new Leadership(path,
329 localNodeId,
330 leader.version(),
331 leader.creationTime())));
Madan Jampani1af8e132015-04-30 16:41:18 -0700332 return true;
333 }
334 }
335 } catch (Exception e) {
336 log.warn("Error executing stepdown for {}", path, e);
337 }
338 return false;
339 }
340
341 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800342 public void addListener(LeadershipEventListener listener) {
343 listenerRegistry.addListener(listener);
344 }
345
346 @Override
347 public void removeListener(LeadershipEventListener listener) {
348 listenerRegistry.removeListener(listener);
349 }
350
Madan Jampani1af8e132015-04-30 16:41:18 -0700351 @Override
352 public boolean makeTopCandidate(String path, NodeId nodeId) {
Madan Jampani346d4f52015-05-04 11:09:39 -0700353 Versioned<List<NodeId>> newCandidates = candidateMap.computeIf(path,
Madan Jampani3650fc42015-05-27 17:01:50 -0700354 candidates -> candidates != null &&
355 candidates.contains(nodeId) &&
356 !nodeId.equals(Iterables.getFirst(candidates, null)),
Madan Jampani346d4f52015-05-04 11:09:39 -0700357 (topic, candidates) -> {
358 List<NodeId> updatedCandidates = new ArrayList<>(candidates.size());
359 updatedCandidates.add(nodeId);
360 candidates.stream().filter(id -> !nodeId.equals(id)).forEach(updatedCandidates::add);
361 return updatedCandidates;
362 });
363 publish(new LeadershipEvent(
Madan Jampanid46e18f2015-05-04 23:19:33 -0700364 LeadershipEvent.Type.CANDIDATES_CHANGED,
365 new Leadership(path,
366 newCandidates.value(),
367 newCandidates.version(),
368 newCandidates.creationTime())));
Madan Jampani346d4f52015-05-04 11:09:39 -0700369 return true;
Madan Jampani1af8e132015-04-30 16:41:18 -0700370 }
371
Madan Jampani7d243db2015-05-27 13:16:02 -0700372 private Leadership electLeader(String path, List<NodeId> candidates) {
373 Leadership currentLeadership = getLeadership(path);
374 if (currentLeadership != null) {
375 return currentLeadership;
376 } else {
377 NodeId topCandidate = candidates
378 .stream()
379 .filter(n -> clusterService.getState(n) == ACTIVE)
380 .findFirst()
381 .orElse(null);
382 try {
383 Versioned<NodeId> leader = localNodeId.equals(topCandidate)
384 ? leaderMap.computeIfAbsent(path, p -> localNodeId) : leaderMap.get(path);
385 if (leader != null) {
386 Leadership newLeadership = new Leadership(path,
387 leader.value(),
388 leader.version(),
389 leader.creationTime());
390 publish(new LeadershipEvent(
391 LeadershipEvent.Type.LEADER_ELECTED,
392 newLeadership));
393 return newLeadership;
Ayaka Koshibe0d886fc2015-04-23 11:53:41 -0700394 }
Madan Jampani7d243db2015-05-27 13:16:02 -0700395 } catch (Exception e) {
396 log.debug("Failed to elect leader for {}", path, e);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700397 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700398 }
Madan Jampani7d243db2015-05-27 13:16:02 -0700399 return null;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700400 }
401
Madan Jampani7d243db2015-05-27 13:16:02 -0700402 private void electLeaders() {
Madan Jampanid14166a2015-02-24 17:37:51 -0800403 try {
Madan Jampani7d243db2015-05-27 13:16:02 -0700404 candidateMap.entrySet().forEach(entry -> {
405 String path = entry.getKey();
406 List<NodeId> candidates = entry.getValue().value();
407 if (activeTopics.contains(path)) {
408 lockExecutor.submit(() -> {
409 Leadership leadership = electLeader(path, candidates);
410 if (leadership != null) {
411 CompletableFuture<Leadership> future = pendingFutures.remove(path);
412 if (future != null) {
413 future.complete(leadership);
414 }
415 }
416 });
417 }
418 });
Madan Jampanid14166a2015-02-24 17:37:51 -0800419 } catch (Exception e) {
Madan Jampani7d243db2015-05-27 13:16:02 -0700420 log.debug("Failure electing leaders", e);
Madan Jampanid14166a2015-02-24 17:37:51 -0800421 }
422 }
423
Madan Jampanid46e18f2015-05-04 23:19:33 -0700424 private void publish(LeadershipEvent event) {
425 onLeadershipEvent(event);
426 clusterCommunicator.broadcast(event, LEADERSHIP_EVENT_MESSAGE_SUBJECT, SERIALIZER::encode);
427 }
428
429 private void onLeadershipEvent(LeadershipEvent leadershipEvent) {
430 log.trace("Leadership Event: time = {} type = {} event = {}",
431 leadershipEvent.time(), leadershipEvent.type(),
432 leadershipEvent);
433
434 Leadership leadershipUpdate = leadershipEvent.subject();
435 LeadershipEvent.Type eventType = leadershipEvent.type();
436 String topic = leadershipUpdate.topic();
437
438 AtomicBoolean updateAccepted = new AtomicBoolean(false);
439 if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
440 leaderBoard.compute(topic, (k, currentLeadership) -> {
441 if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
442 updateAccepted.set(true);
443 return leadershipUpdate;
444 }
445 return currentLeadership;
446 });
447 } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
448 leaderBoard.compute(topic, (k, currentLeadership) -> {
449 if (currentLeadership == null || currentLeadership.epoch() <= leadershipUpdate.epoch()) {
450 updateAccepted.set(true);
451 return null;
452 }
453 return currentLeadership;
454 });
455 } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
456 candidateBoard.compute(topic, (k, currentInfo) -> {
457 if (currentInfo == null || currentInfo.epoch() < leadershipUpdate.epoch()) {
458 updateAccepted.set(true);
459 return leadershipUpdate;
460 }
461 return currentInfo;
462 });
463 } else {
464 throw new IllegalStateException("Unknown event type.");
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700465 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700466
Madan Jampanid46e18f2015-05-04 23:19:33 -0700467 if (updateAccepted.get()) {
468 eventDispatcher.post(leadershipEvent);
Madan Jampanid14166a2015-02-24 17:37:51 -0800469 }
470 }
471
Madan Jampanide003d92015-05-11 17:14:20 -0700472 private void rerunForLeadership(String path, CompletableFuture<Leadership> future) {
Madan Jampani7d243db2015-05-27 13:16:02 -0700473 lockExecutor.schedule(
Madan Jampanide003d92015-05-11 17:14:20 -0700474 () -> doRunForLeadership(path, future),
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700475 ELECTION_JOIN_ATTEMPT_INTERVAL_SEC,
476 TimeUnit.SECONDS);
477 }
478
Madan Jampanide003d92015-05-11 17:14:20 -0700479 private void retryWithdraw(String path, CompletableFuture<Void> future) {
Madan Jampani7d243db2015-05-27 13:16:02 -0700480 lockExecutor.schedule(
Madan Jampanide003d92015-05-11 17:14:20 -0700481 () -> doWithdraw(path, future),
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700482 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
483 TimeUnit.SECONDS);
484 }
485
Madan Jampanicc586752015-05-18 16:19:27 -0700486 private void scheduleStaleLeadershipPurge(int afterDelaySec) {
487 if (staleLeadershipPurgeScheduled.compareAndSet(false, true)) {
488 staleLeadershipPurgeExecutor.schedule(
489 this::purgeStaleLeadership,
490 afterDelaySec,
491 TimeUnit.SECONDS);
492 }
493 }
494
495 /**
496 * Purges locks held by inactive nodes and evicts inactive nodes from candidacy.
497 */
498 private void purgeStaleLeadership() {
499 AtomicBoolean rerunPurge = new AtomicBoolean(false);
Madan Jampanid14166a2015-02-24 17:37:51 -0800500 try {
Madan Jampanicc586752015-05-18 16:19:27 -0700501 staleLeadershipPurgeScheduled.set(false);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700502 leaderMap.entrySet()
503 .stream()
Ayaka Koshibe4a3c2392015-04-22 18:03:13 -0700504 .filter(e -> clusterService.getState(e.getValue().value()) == INACTIVE)
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700505 .forEach(entry -> {
506 String path = entry.getKey();
507 NodeId nodeId = entry.getValue().value();
508 long epoch = entry.getValue().version();
509 long creationTime = entry.getValue().creationTime();
Madan Jampanid14166a2015-02-24 17:37:51 -0800510 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700511 if (leaderMap.remove(path, epoch)) {
Madan Jampanieedfe542015-05-26 09:54:09 -0700512 log.debug("Purged stale lock held by {} for {}", nodeId, path);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700513 publish(new LeadershipEvent(
514 LeadershipEvent.Type.LEADER_BOOTED,
515 new Leadership(path, nodeId, epoch, creationTime)));
Madan Jampanid14166a2015-02-24 17:37:51 -0800516 }
517 } catch (Exception e) {
Madan Jampanice8392b2015-05-20 11:13:47 -0700518 log.debug("Failed to purge stale lock held by {} for {}", nodeId, path, e);
Madan Jampanicc586752015-05-18 16:19:27 -0700519 rerunPurge.set(true);
520 }
521 });
522
523 candidateMap.entrySet()
524 .forEach(entry -> {
525 String path = entry.getKey();
526 Versioned<List<NodeId>> candidates = entry.getValue();
527 List<NodeId> candidatesList = candidates != null
528 ? candidates.value() : Collections.emptyList();
529 List<NodeId> activeCandidatesList =
530 candidatesList.stream()
531 .filter(n -> clusterService.getState(n) == ACTIVE)
532 .filter(n -> !localNodeId.equals(n) || activeTopics.contains(path))
533 .collect(Collectors.toList());
534 if (activeCandidatesList.size() < candidatesList.size()) {
535 Set<NodeId> removedCandidates =
536 Sets.difference(Sets.newHashSet(candidatesList),
537 Sets.newHashSet(activeCandidatesList));
538 try {
539 if (candidateMap.replace(path, entry.getValue().version(), activeCandidatesList)) {
540 log.info("Evicted inactive candidates {} from "
541 + "candidate list for {}", removedCandidates, path);
542 Versioned<List<NodeId>> updatedCandidates = candidateMap.get(path);
543 publish(new LeadershipEvent(
544 LeadershipEvent.Type.CANDIDATES_CHANGED,
545 new Leadership(path,
546 updatedCandidates.value(),
547 updatedCandidates.version(),
548 updatedCandidates.creationTime())));
Madan Jampani3d1727c2015-05-22 11:46:29 -0700549 } else {
550 // Conflicting update detected. Rerun purge to make sure
551 // inactive candidates are evicted.
552 rerunPurge.set(true);
Madan Jampanicc586752015-05-18 16:19:27 -0700553 }
554 } catch (Exception e) {
Madan Jampanice8392b2015-05-20 11:13:47 -0700555 log.debug("Failed to evict inactive candidates {} from "
Madan Jampanicc586752015-05-18 16:19:27 -0700556 + "candidate list for {}", removedCandidates, path, e);
557 rerunPurge.set(true);
558 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800559 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700560 });
Madan Jampanid14166a2015-02-24 17:37:51 -0800561 } catch (Exception e) {
Madan Jampanice8392b2015-05-20 11:13:47 -0700562 log.debug("Failure purging state leadership.", e);
Madan Jampanicc586752015-05-18 16:19:27 -0700563 rerunPurge.set(true);
564 }
565
566 if (rerunPurge.get()) {
Madan Jampanice8392b2015-05-20 11:13:47 -0700567 log.debug("Rescheduling stale leadership purge due to errors encountered in previous run");
Madan Jampanicc586752015-05-18 16:19:27 -0700568 scheduleStaleLeadershipPurge(DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC);
Madan Jampanid14166a2015-02-24 17:37:51 -0800569 }
570 }
571
572 private void sendLeadershipStatus() {
Madan Jampania14047d2015-02-25 12:23:02 -0800573 try {
574 leaderBoard.forEach((path, leadership) -> {
575 if (leadership.leader().equals(localNodeId)) {
576 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700577 clusterCommunicator.broadcast(event,
578 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
579 SERIALIZER::encode);
Madan Jampania14047d2015-02-25 12:23:02 -0800580 }
581 });
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700582 candidateBoard.forEach((path, leadership) -> {
583 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, leadership);
584 clusterCommunicator.broadcast(event,
585 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
586 SERIALIZER::encode);
587 });
Madan Jampania14047d2015-02-25 12:23:02 -0800588 } catch (Exception e) {
589 log.debug("Failed to send leadership updates", e);
590 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800591 }
Madan Jampanicc586752015-05-18 16:19:27 -0700592
593 private class InternalClusterEventListener implements ClusterEventListener {
594
595 @Override
596 public void event(ClusterEvent event) {
597 if (event.type() == Type.INSTANCE_DEACTIVATED || event.type() == Type.INSTANCE_REMOVED) {
598 scheduleStaleLeadershipPurge(0);
599 }
600 }
601 }
Madan Jampania14047d2015-02-25 12:23:02 -0800602}