/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.consistent.impl;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import org.apache.commons.lang.math.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterEvent.Type;
import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.EventDeliveryService;
import org.onosproject.event.ListenerRegistry;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkArgument;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
import static org.onosproject.cluster.ControllerNode.State.INACTIVE;
import static org.slf4j.LoggerFactory.getLogger;

Madan Jampanid14166a2015-02-24 17:37:51 -080075/**
76 * Distributed Lock Manager implemented on top of ConsistentMap.
77 * <p>
Madan Jampanicc586752015-05-18 16:19:27 -070078 * This implementation makes use of ClusterService's failure
Madan Jampanid14166a2015-02-24 17:37:51 -080079 * detection capabilities to detect and purge stale locks.
80 * TODO: Ensure lock safety and liveness.
81 */
Madan Jampani7f72c3f2015-03-01 17:34:59 -080082@Component(immediate = true, enabled = true)
Madan Jampanid14166a2015-02-24 17:37:51 -080083@Service
84public class DistributedLeadershipManager implements LeadershipService {
85
86 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
87 protected StorageService storageService;
88
89 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
90 protected ClusterService clusterService;
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
93 protected ClusterCommunicationService clusterCommunicator;
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
96 protected EventDeliveryService eventDispatcher;
97
Madan Jampanid14166a2015-02-24 17:37:51 -080098 private final Logger log = getLogger(getClass());
Madan Jampani7d243db2015-05-27 13:16:02 -070099 private ScheduledExecutorService electionRunner;
100 private ScheduledExecutorService lockExecutor;
Madan Jampanicc586752015-05-18 16:19:27 -0700101 private ScheduledExecutorService staleLeadershipPurgeExecutor;
Madan Jampani6f38daf2015-06-04 15:26:38 -0700102 private ScheduledExecutorService leadershipRefresher;
Madan Jampanid14166a2015-02-24 17:37:51 -0800103
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700104 private ConsistentMap<String, NodeId> leaderMap;
105 private ConsistentMap<String, List<NodeId>> candidateMap;
106
Madan Jampanicc586752015-05-18 16:19:27 -0700107 private ListenerRegistry<LeadershipEvent, LeadershipEventListener> listenerRegistry;
Madan Jampanid14166a2015-02-24 17:37:51 -0800108 private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700109 private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
Madan Jampanicc586752015-05-18 16:19:27 -0700110 private final ClusterEventListener clusterEventListener = new InternalClusterEventListener();
Madan Jampanid14166a2015-02-24 17:37:51 -0800111
Madan Jampanicc586752015-05-18 16:19:27 -0700112 private NodeId localNodeId;
Madan Jampanid14166a2015-02-24 17:37:51 -0800113 private Set<String> activeTopics = Sets.newConcurrentHashSet();
Madan Jampani7d243db2015-05-27 13:16:02 -0700114 private Map<String, CompletableFuture<Leadership>> pendingFutures = Maps.newConcurrentMap();
Madan Jampanid14166a2015-02-24 17:37:51 -0800115
Madan Jampanid09441b2015-06-09 14:50:55 -0700116 // The actual delay is randomly chosen from the interval [0, WAIT_BEFORE_RETRY_MILLIS)
Madan Jampani8d37efc2015-06-01 17:25:48 -0700117 private static final int WAIT_BEFORE_RETRY_MILLIS = 150;
Madan Jampanid14166a2015-02-24 17:37:51 -0800118 private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
Madan Jampani6f38daf2015-06-04 15:26:38 -0700119 private static final int LEADERSHIP_REFRESH_INTERVAL_SEC = 2;
Madan Jampanicc586752015-05-18 16:19:27 -0700120 private static final int DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC = 2;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700121
Madan Jampanicc586752015-05-18 16:19:27 -0700122 private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false);
123
Madan Jampanid09441b2015-06-09 14:50:55 -0700124 private static final Serializer SERIALIZER = Serializer.using(KryoNamespaces.API);
Madan Jampanid14166a2015-02-24 17:37:51 -0800125
126 @Activate
127 public void activate() {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700128 leaderMap = storageService.<String, NodeId>consistentMapBuilder()
129 .withName("onos-topic-leaders")
130 .withSerializer(SERIALIZER)
131 .withPartitionsDisabled().build();
132 candidateMap = storageService.<String, List<NodeId>>consistentMapBuilder()
133 .withName("onos-topic-candidates")
134 .withSerializer(SERIALIZER)
135 .withPartitionsDisabled().build();
Madan Jampanid14166a2015-02-24 17:37:51 -0800136
Madan Jampanid09441b2015-06-09 14:50:55 -0700137 leaderMap.addListener(event -> {
138 log.debug("Received {}", event);
139 LeadershipEvent.Type leadershipEventType = null;
140 if (event.type() == MapEvent.Type.INSERT || event.type() == MapEvent.Type.UPDATE) {
141 leadershipEventType = LeadershipEvent.Type.LEADER_ELECTED;
142 } else if (event.type() == MapEvent.Type.REMOVE) {
143 leadershipEventType = LeadershipEvent.Type.LEADER_BOOTED;
144 }
145 onLeadershipEvent(new LeadershipEvent(
146 leadershipEventType,
147 new Leadership(event.key(),
148 event.value().value(),
149 event.value().version(),
150 event.value().creationTime())));
151 });
152
153 candidateMap.addListener(event -> {
154 log.debug("Received {}", event);
155 if (event.type() != MapEvent.Type.INSERT && event.type() != MapEvent.Type.UPDATE) {
156 log.error("Entries must not be removed from candidate map");
157 return;
158 }
159 onLeadershipEvent(new LeadershipEvent(
160 LeadershipEvent.Type.CANDIDATES_CHANGED,
161 new Leadership(event.key(),
162 event.value().value(),
163 event.value().version(),
164 event.value().creationTime())));
165 });
166
Madan Jampanid14166a2015-02-24 17:37:51 -0800167 localNodeId = clusterService.getLocalNode().id();
168
Madan Jampani7d243db2015-05-27 13:16:02 -0700169 electionRunner = Executors.newSingleThreadScheduledExecutor(
170 groupedThreads("onos/store/leadership", "election-runner"));
171 lockExecutor = Executors.newScheduledThreadPool(
Madan Jampanid14166a2015-02-24 17:37:51 -0800172 4, groupedThreads("onos/store/leadership", "election-thread-%d"));
Madan Jampanicc586752015-05-18 16:19:27 -0700173 staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor(
174 groupedThreads("onos/store/leadership", "stale-leadership-evictor"));
Madan Jampani6f38daf2015-06-04 15:26:38 -0700175 leadershipRefresher = Executors.newSingleThreadScheduledExecutor(
176 groupedThreads("onos/store/leadership", "refresh-thread"));
Madan Jampanid14166a2015-02-24 17:37:51 -0800177
Madan Jampanicc586752015-05-18 16:19:27 -0700178 clusterService.addListener(clusterEventListener);
179
Madan Jampani7d243db2015-05-27 13:16:02 -0700180 electionRunner.scheduleWithFixedDelay(
181 this::electLeaders, 0, DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC, TimeUnit.SECONDS);
182
Madan Jampani6f38daf2015-06-04 15:26:38 -0700183 leadershipRefresher.scheduleWithFixedDelay(
184 this::refreshLeaderBoard, 0, LEADERSHIP_REFRESH_INTERVAL_SEC, TimeUnit.SECONDS);
Madan Jampanid14166a2015-02-24 17:37:51 -0800185
Simon Huntff663742015-05-14 13:33:05 -0700186 listenerRegistry = new ListenerRegistry<>();
Madan Jampanid14166a2015-02-24 17:37:51 -0800187 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
188
Madan Jampanid46e18f2015-05-04 23:19:33 -0700189 log.info("Started");
Madan Jampanid14166a2015-02-24 17:37:51 -0800190 }
191
192 @Deactivate
193 public void deactivate() {
Thomas Vachuskac0e20132015-07-31 12:15:42 -0700194 if (clusterService.getNodes().size() > 1) {
195 // FIXME: Determine why this takes ~50 seconds to shutdown on a single node!
196 leaderBoard.forEach((topic, leadership) -> {
197 if (localNodeId.equals(leadership.leader())) {
198 withdraw(topic);
199 }
200 });
201 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800202
Madan Jampanicc586752015-05-18 16:19:27 -0700203 clusterService.removeListener(clusterEventListener);
Madan Jampanid14166a2015-02-24 17:37:51 -0800204 eventDispatcher.removeSink(LeadershipEvent.class);
Madan Jampanid14166a2015-02-24 17:37:51 -0800205
Madan Jampani7d243db2015-05-27 13:16:02 -0700206 electionRunner.shutdown();
Madan Jampani7d243db2015-05-27 13:16:02 -0700207 lockExecutor.shutdown();
Madan Jampanicc586752015-05-18 16:19:27 -0700208 staleLeadershipPurgeExecutor.shutdown();
Madan Jampani6f38daf2015-06-04 15:26:38 -0700209 leadershipRefresher.shutdown();
Madan Jampanid14166a2015-02-24 17:37:51 -0800210
Madan Jampanid46e18f2015-05-04 23:19:33 -0700211 log.info("Stopped");
Madan Jampanid14166a2015-02-24 17:37:51 -0800212 }
213
214 @Override
215 public Map<String, Leadership> getLeaderBoard() {
216 return ImmutableMap.copyOf(leaderBoard);
217 }
218
219 @Override
Madan Jampanifd45d5e2015-04-20 13:33:21 -0700220 public Map<String, List<NodeId>> getCandidates() {
221 return Maps.toMap(candidateBoard.keySet(), this::getCandidates);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700222 }
223
224 @Override
225 public List<NodeId> getCandidates(String path) {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700226 Leadership current = candidateBoard.get(path);
227 return current == null ? ImmutableList.of() : ImmutableList.copyOf(current.candidates());
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700228 }
229
230 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800231 public NodeId getLeader(String path) {
232 Leadership leadership = leaderBoard.get(path);
233 return leadership != null ? leadership.leader() : null;
234 }
235
236 @Override
Madan Jampani59610512015-02-25 15:25:43 -0800237 public Leadership getLeadership(String path) {
238 checkArgument(path != null);
239 return leaderBoard.get(path);
240 }
241
242 @Override
243 public Set<String> ownedTopics(NodeId nodeId) {
244 checkArgument(nodeId != null);
245 return leaderBoard.entrySet()
246 .stream()
247 .filter(entry -> nodeId.equals(entry.getValue().leader()))
248 .map(Entry::getKey)
249 .collect(Collectors.toSet());
250 }
251
252 @Override
Madan Jampanide003d92015-05-11 17:14:20 -0700253 public CompletableFuture<Leadership> runForLeadership(String path) {
Madan Jampani52860be2015-02-27 12:52:37 -0800254 log.debug("Running for leadership for topic: {}", path);
Madan Jampanide003d92015-05-11 17:14:20 -0700255 CompletableFuture<Leadership> resultFuture = new CompletableFuture<>();
256 doRunForLeadership(path, resultFuture);
257 return resultFuture;
258 }
259
260 private void doRunForLeadership(String path, CompletableFuture<Leadership> future) {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700261 try {
Madan Jampani346d4f52015-05-04 11:09:39 -0700262 Versioned<List<NodeId>> candidates = candidateMap.computeIf(path,
263 currentList -> currentList == null || !currentList.contains(localNodeId),
264 (topic, currentList) -> {
265 if (currentList == null) {
266 return ImmutableList.of(localNodeId);
267 } else {
268 List<NodeId> newList = Lists.newLinkedList();
269 newList.addAll(currentList);
270 newList.add(localNodeId);
271 return newList;
272 }
273 });
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700274 log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
275 activeTopics.add(path);
Madan Jampani7d243db2015-05-27 13:16:02 -0700276 Leadership leadership = electLeader(path, candidates.value());
277 if (leadership == null) {
278 pendingFutures.put(path, future);
279 } else {
280 future.complete(leadership);
281 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700282 } catch (ConsistentMapException e) {
283 log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
Madan Jampanide003d92015-05-11 17:14:20 -0700284 rerunForLeadership(path, future);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700285 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800286 }
287
288 @Override
Madan Jampanide003d92015-05-11 17:14:20 -0700289 public CompletableFuture<Void> withdraw(String path) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800290 activeTopics.remove(path);
Madan Jampanide003d92015-05-11 17:14:20 -0700291 CompletableFuture<Void> resultFuture = new CompletableFuture<>();
292 doWithdraw(path, resultFuture);
293 return resultFuture;
294 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700295
Madan Jampanide003d92015-05-11 17:14:20 -0700296
297 private void doWithdraw(String path, CompletableFuture<Void> future) {
298 if (activeTopics.contains(path)) {
299 future.completeExceptionally(new CancellationException(String.format("%s is now a active topic", path)));
300 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800301 try {
Madan Jampanid09441b2015-06-09 14:50:55 -0700302 leaderMap.computeIf(path,
303 localNodeId::equals,
304 (topic, leader) -> null);
305 candidateMap.computeIf(path,
306 candidates -> candidates != null && candidates.contains(localNodeId),
307 (topic, candidates) -> candidates.stream()
308 .filter(nodeId -> !localNodeId.equals(nodeId))
309 .collect(Collectors.toList()));
Madan Jampani052ad652015-06-10 13:20:32 -0700310 future.complete(null);
Madan Jampanid14166a2015-02-24 17:37:51 -0800311 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800312 log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
Madan Jampanide003d92015-05-11 17:14:20 -0700313 retryWithdraw(path, future);
Madan Jampanid14166a2015-02-24 17:37:51 -0800314 }
315 }
316
317 @Override
Madan Jampani1af8e132015-04-30 16:41:18 -0700318 public boolean stepdown(String path) {
Madan Jampani9bd1f152015-04-30 23:33:35 -0700319 if (!activeTopics.contains(path) || !Objects.equals(localNodeId, getLeader(path))) {
Madan Jampani1af8e132015-04-30 16:41:18 -0700320 return false;
321 }
322
323 try {
Madan Jampanid09441b2015-06-09 14:50:55 -0700324 return leaderMap.computeIf(path,
325 localNodeId::equals,
326 (topic, leader) -> null) == null;
Madan Jampani1af8e132015-04-30 16:41:18 -0700327 } catch (Exception e) {
328 log.warn("Error executing stepdown for {}", path, e);
329 }
330 return false;
331 }
332
333 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800334 public void addListener(LeadershipEventListener listener) {
335 listenerRegistry.addListener(listener);
336 }
337
338 @Override
339 public void removeListener(LeadershipEventListener listener) {
340 listenerRegistry.removeListener(listener);
341 }
342
Madan Jampani1af8e132015-04-30 16:41:18 -0700343 @Override
344 public boolean makeTopCandidate(String path, NodeId nodeId) {
Madan Jampanid09441b2015-06-09 14:50:55 -0700345 Versioned<List<NodeId>> candidateList = candidateMap.computeIf(path,
Madan Jampani3650fc42015-05-27 17:01:50 -0700346 candidates -> candidates != null &&
347 candidates.contains(nodeId) &&
348 !nodeId.equals(Iterables.getFirst(candidates, null)),
Madan Jampani346d4f52015-05-04 11:09:39 -0700349 (topic, candidates) -> {
350 List<NodeId> updatedCandidates = new ArrayList<>(candidates.size());
351 updatedCandidates.add(nodeId);
352 candidates.stream().filter(id -> !nodeId.equals(id)).forEach(updatedCandidates::add);
353 return updatedCandidates;
354 });
Madan Jampanid09441b2015-06-09 14:50:55 -0700355 List<NodeId> candidates = candidateList != null ? candidateList.value() : Collections.emptyList();
356 return candidates.size() > 0 && nodeId.equals(candidates.get(0));
Madan Jampani1af8e132015-04-30 16:41:18 -0700357 }
358
Madan Jampani7d243db2015-05-27 13:16:02 -0700359 private Leadership electLeader(String path, List<NodeId> candidates) {
360 Leadership currentLeadership = getLeadership(path);
361 if (currentLeadership != null) {
362 return currentLeadership;
363 } else {
364 NodeId topCandidate = candidates
365 .stream()
366 .filter(n -> clusterService.getState(n) == ACTIVE)
367 .findFirst()
368 .orElse(null);
369 try {
370 Versioned<NodeId> leader = localNodeId.equals(topCandidate)
371 ? leaderMap.computeIfAbsent(path, p -> localNodeId) : leaderMap.get(path);
372 if (leader != null) {
373 Leadership newLeadership = new Leadership(path,
374 leader.value(),
375 leader.version(),
376 leader.creationTime());
Madan Jampani565a66a2015-07-25 17:01:13 -0700377 // Since reads only go through the local copy of leader board, we ought to update it
378 // first before returning from this method.
379 // This is to ensure a subsequent read will not read a stale value.
380 onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership));
Madan Jampani7d243db2015-05-27 13:16:02 -0700381 return newLeadership;
Ayaka Koshibe0d886fc2015-04-23 11:53:41 -0700382 }
Madan Jampani7d243db2015-05-27 13:16:02 -0700383 } catch (Exception e) {
384 log.debug("Failed to elect leader for {}", path, e);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700385 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700386 }
Madan Jampani7d243db2015-05-27 13:16:02 -0700387 return null;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700388 }
389
Madan Jampani7d243db2015-05-27 13:16:02 -0700390 private void electLeaders() {
Madan Jampanid14166a2015-02-24 17:37:51 -0800391 try {
Madan Jampani7d243db2015-05-27 13:16:02 -0700392 candidateMap.entrySet().forEach(entry -> {
393 String path = entry.getKey();
Madan Jampanicd40e5b2015-05-27 17:19:48 -0700394 Versioned<List<NodeId>> candidates = entry.getValue();
395 // for active topics, check if this node can become a leader (if it isn't already)
Madan Jampani7d243db2015-05-27 13:16:02 -0700396 if (activeTopics.contains(path)) {
397 lockExecutor.submit(() -> {
Madan Jampanicd40e5b2015-05-27 17:19:48 -0700398 Leadership leadership = electLeader(path, candidates.value());
Madan Jampani7d243db2015-05-27 13:16:02 -0700399 if (leadership != null) {
400 CompletableFuture<Leadership> future = pendingFutures.remove(path);
401 if (future != null) {
402 future.complete(leadership);
403 }
404 }
405 });
406 }
Madan Jampanicd40e5b2015-05-27 17:19:48 -0700407 // Raise a CANDIDATES_CHANGED event to force refresh local candidate board
408 // and also to update local listeners.
409 // Don't worry about duplicate events as they will be suppressed.
410 onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED,
411 new Leadership(path,
412 candidates.value(),
413 candidates.version(),
414 candidates.creationTime())));
Madan Jampani7d243db2015-05-27 13:16:02 -0700415 });
Madan Jampanid14166a2015-02-24 17:37:51 -0800416 } catch (Exception e) {
Madan Jampani7d243db2015-05-27 13:16:02 -0700417 log.debug("Failure electing leaders", e);
Madan Jampanid14166a2015-02-24 17:37:51 -0800418 }
419 }
420
Madan Jampanid46e18f2015-05-04 23:19:33 -0700421 private void onLeadershipEvent(LeadershipEvent leadershipEvent) {
422 log.trace("Leadership Event: time = {} type = {} event = {}",
423 leadershipEvent.time(), leadershipEvent.type(),
424 leadershipEvent);
425
426 Leadership leadershipUpdate = leadershipEvent.subject();
427 LeadershipEvent.Type eventType = leadershipEvent.type();
428 String topic = leadershipUpdate.topic();
429
430 AtomicBoolean updateAccepted = new AtomicBoolean(false);
431 if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
432 leaderBoard.compute(topic, (k, currentLeadership) -> {
433 if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
434 updateAccepted.set(true);
435 return leadershipUpdate;
436 }
437 return currentLeadership;
438 });
439 } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
440 leaderBoard.compute(topic, (k, currentLeadership) -> {
441 if (currentLeadership == null || currentLeadership.epoch() <= leadershipUpdate.epoch()) {
442 updateAccepted.set(true);
Madan Jampani6f38daf2015-06-04 15:26:38 -0700443 // FIXME: Removing entries from leaderboard is not safe and should be visited.
Madan Jampanid46e18f2015-05-04 23:19:33 -0700444 return null;
445 }
446 return currentLeadership;
447 });
448 } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
449 candidateBoard.compute(topic, (k, currentInfo) -> {
450 if (currentInfo == null || currentInfo.epoch() < leadershipUpdate.epoch()) {
451 updateAccepted.set(true);
452 return leadershipUpdate;
453 }
454 return currentInfo;
455 });
456 } else {
457 throw new IllegalStateException("Unknown event type.");
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700458 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700459
Madan Jampanid46e18f2015-05-04 23:19:33 -0700460 if (updateAccepted.get()) {
461 eventDispatcher.post(leadershipEvent);
Madan Jampanid14166a2015-02-24 17:37:51 -0800462 }
463 }
464
Madan Jampanide003d92015-05-11 17:14:20 -0700465 private void rerunForLeadership(String path, CompletableFuture<Leadership> future) {
Madan Jampani7d243db2015-05-27 13:16:02 -0700466 lockExecutor.schedule(
Madan Jampanide003d92015-05-11 17:14:20 -0700467 () -> doRunForLeadership(path, future),
Madan Jampani8d37efc2015-06-01 17:25:48 -0700468 RandomUtils.nextInt(WAIT_BEFORE_RETRY_MILLIS),
469 TimeUnit.MILLISECONDS);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700470 }
471
Madan Jampanide003d92015-05-11 17:14:20 -0700472 private void retryWithdraw(String path, CompletableFuture<Void> future) {
Madan Jampani7d243db2015-05-27 13:16:02 -0700473 lockExecutor.schedule(
Madan Jampanide003d92015-05-11 17:14:20 -0700474 () -> doWithdraw(path, future),
Madan Jampani8d37efc2015-06-01 17:25:48 -0700475 RandomUtils.nextInt(WAIT_BEFORE_RETRY_MILLIS),
476 TimeUnit.MILLISECONDS);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700477 }
478
Madan Jampanicc586752015-05-18 16:19:27 -0700479 private void scheduleStaleLeadershipPurge(int afterDelaySec) {
480 if (staleLeadershipPurgeScheduled.compareAndSet(false, true)) {
481 staleLeadershipPurgeExecutor.schedule(
482 this::purgeStaleLeadership,
483 afterDelaySec,
484 TimeUnit.SECONDS);
485 }
486 }
487
488 /**
489 * Purges locks held by inactive nodes and evicts inactive nodes from candidacy.
490 */
491 private void purgeStaleLeadership() {
492 AtomicBoolean rerunPurge = new AtomicBoolean(false);
Madan Jampanid14166a2015-02-24 17:37:51 -0800493 try {
Madan Jampanicc586752015-05-18 16:19:27 -0700494 staleLeadershipPurgeScheduled.set(false);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700495 leaderMap.entrySet()
496 .stream()
Ayaka Koshibe4a3c2392015-04-22 18:03:13 -0700497 .filter(e -> clusterService.getState(e.getValue().value()) == INACTIVE)
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700498 .forEach(entry -> {
499 String path = entry.getKey();
500 NodeId nodeId = entry.getValue().value();
Madan Jampanid14166a2015-02-24 17:37:51 -0800501 try {
Madan Jampanid09441b2015-06-09 14:50:55 -0700502 leaderMap.computeIf(path, nodeId::equals, (topic, leader) -> null);
Madan Jampanid14166a2015-02-24 17:37:51 -0800503 } catch (Exception e) {
Madan Jampanice8392b2015-05-20 11:13:47 -0700504 log.debug("Failed to purge stale lock held by {} for {}", nodeId, path, e);
Madan Jampanicc586752015-05-18 16:19:27 -0700505 rerunPurge.set(true);
506 }
507 });
508
509 candidateMap.entrySet()
510 .forEach(entry -> {
511 String path = entry.getKey();
512 Versioned<List<NodeId>> candidates = entry.getValue();
513 List<NodeId> candidatesList = candidates != null
514 ? candidates.value() : Collections.emptyList();
515 List<NodeId> activeCandidatesList =
516 candidatesList.stream()
517 .filter(n -> clusterService.getState(n) == ACTIVE)
518 .filter(n -> !localNodeId.equals(n) || activeTopics.contains(path))
519 .collect(Collectors.toList());
520 if (activeCandidatesList.size() < candidatesList.size()) {
521 Set<NodeId> removedCandidates =
522 Sets.difference(Sets.newHashSet(candidatesList),
523 Sets.newHashSet(activeCandidatesList));
524 try {
Madan Jampanid09441b2015-06-09 14:50:55 -0700525 candidateMap.computeIf(path,
526 c -> c.stream()
527 .filter(n -> clusterService.getState(n) == INACTIVE)
528 .count() > 0,
529 (topic, c) -> c.stream()
530 .filter(n -> clusterService.getState(n) == ACTIVE)
531 .filter(n -> !localNodeId.equals(n) ||
532 activeTopics.contains(path))
533 .collect(Collectors.toList()));
Madan Jampanicc586752015-05-18 16:19:27 -0700534 } catch (Exception e) {
Madan Jampanice8392b2015-05-20 11:13:47 -0700535 log.debug("Failed to evict inactive candidates {} from "
Madan Jampanicc586752015-05-18 16:19:27 -0700536 + "candidate list for {}", removedCandidates, path, e);
537 rerunPurge.set(true);
538 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800539 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700540 });
Madan Jampanid14166a2015-02-24 17:37:51 -0800541 } catch (Exception e) {
Madan Jampanice8392b2015-05-20 11:13:47 -0700542 log.debug("Failure purging state leadership.", e);
Madan Jampanicc586752015-05-18 16:19:27 -0700543 rerunPurge.set(true);
544 }
545
546 if (rerunPurge.get()) {
Madan Jampanice8392b2015-05-20 11:13:47 -0700547 log.debug("Rescheduling stale leadership purge due to errors encountered in previous run");
Madan Jampanicc586752015-05-18 16:19:27 -0700548 scheduleStaleLeadershipPurge(DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC);
Madan Jampanid14166a2015-02-24 17:37:51 -0800549 }
550 }
551
Madan Jampani6f38daf2015-06-04 15:26:38 -0700552 private void refreshLeaderBoard() {
Madan Jampania14047d2015-02-25 12:23:02 -0800553 try {
Madan Jampani6f38daf2015-06-04 15:26:38 -0700554 Map<String, Leadership> newLeaderBoard = Maps.newHashMap();
555 leaderMap.entrySet().forEach(entry -> {
556 String path = entry.getKey();
557 Versioned<NodeId> leader = entry.getValue();
558 Leadership leadership = new Leadership(path,
559 leader.value(),
560 leader.version(),
561 leader.creationTime());
562 newLeaderBoard.put(path, leadership);
563 });
564
565 // first take snapshot of current leader board.
566 Map<String, Leadership> currentLeaderBoard = ImmutableMap.copyOf(leaderBoard);
567
568 MapDifference<String, Leadership> diff = Maps.difference(currentLeaderBoard, newLeaderBoard);
569
570 // evict stale leaders
571 diff.entriesOnlyOnLeft().forEach((path, leadership) -> {
572 log.debug("Evicting {} from leaderboard. It is no longer active leader.", leadership);
573 onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, leadership));
574 });
575
576 // add missing leaders
577 diff.entriesOnlyOnRight().forEach((path, leadership) -> {
578 log.debug("Adding {} to leaderboard. It is now the active leader.", leadership);
579 onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership));
580 });
581
582 // add updated leaders
583 diff.entriesDiffering().forEach((path, difference) -> {
584 Leadership current = difference.leftValue();
585 Leadership updated = difference.rightValue();
586 if (current.epoch() < updated.epoch()) {
587 log.debug("Updated {} in leaderboard.", updated);
588 onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, updated));
Madan Jampani096da0e2015-06-04 14:30:19 -0700589 }
Madan Jampania14047d2015-02-25 12:23:02 -0800590 });
591 } catch (Exception e) {
Madan Jampani6f38daf2015-06-04 15:26:38 -0700592 log.debug("Failed to refresh leader board", e);
Madan Jampania14047d2015-02-25 12:23:02 -0800593 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800594 }
Madan Jampanicc586752015-05-18 16:19:27 -0700595
596 private class InternalClusterEventListener implements ClusterEventListener {
597
598 @Override
599 public void event(ClusterEvent event) {
600 if (event.type() == Type.INSTANCE_DEACTIVATED || event.type() == Type.INSTANCE_REMOVED) {
601 scheduleStaleLeadershipPurge(0);
602 }
603 }
604 }
Madan Jampania14047d2015-02-25 12:23:02 -0800605}