blob: db55a85f5d1c43af694091c21db67efe495f87e0 [file] [log] [blame]
Madan Jampani619453b2015-07-22 23:47:09 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampanif4c88502016-01-21 12:35:36 -080016package org.onosproject.store.primitives.impl;
Madan Jampanid14166a2015-02-24 17:37:51 -080017
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070018import com.google.common.collect.ImmutableList;
Jonathan Harte649c752015-03-03 18:04:25 -080019import com.google.common.collect.ImmutableMap;
Madan Jampani3650fc42015-05-27 17:01:50 -070020import com.google.common.collect.Iterables;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070021import com.google.common.collect.Lists;
Madan Jampani6f38daf2015-06-04 15:26:38 -070022import com.google.common.collect.MapDifference;
Jonathan Harte649c752015-03-03 18:04:25 -080023import com.google.common.collect.Maps;
24import com.google.common.collect.Sets;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070025
Madan Jampani8d37efc2015-06-01 17:25:48 -070026import org.apache.commons.lang.math.RandomUtils;
Madan Jampanid14166a2015-02-24 17:37:51 -080027import org.apache.felix.scr.annotations.Activate;
28import org.apache.felix.scr.annotations.Component;
29import org.apache.felix.scr.annotations.Deactivate;
30import org.apache.felix.scr.annotations.Reference;
31import org.apache.felix.scr.annotations.ReferenceCardinality;
32import org.apache.felix.scr.annotations.Service;
Madan Jampanicc586752015-05-18 16:19:27 -070033import org.onosproject.cluster.ClusterEvent;
34import org.onosproject.cluster.ClusterEvent.Type;
35import org.onosproject.cluster.ClusterEventListener;
Madan Jampanid14166a2015-02-24 17:37:51 -080036import org.onosproject.cluster.ClusterService;
Madan Jampanid14166a2015-02-24 17:37:51 -080037import org.onosproject.cluster.Leadership;
38import org.onosproject.cluster.LeadershipEvent;
39import org.onosproject.cluster.LeadershipEventListener;
40import org.onosproject.cluster.LeadershipService;
41import org.onosproject.cluster.NodeId;
Simon Huntff663742015-05-14 13:33:05 -070042import org.onosproject.event.ListenerRegistry;
Madan Jampanid14166a2015-02-24 17:37:51 -080043import org.onosproject.event.EventDeliveryService;
44import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Madan Jampanid14166a2015-02-24 17:37:51 -080045import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampanid14166a2015-02-24 17:37:51 -080046import org.onosproject.store.service.ConsistentMap;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070047import org.onosproject.store.service.ConsistentMapException;
Madan Jampanid09441b2015-06-09 14:50:55 -070048import org.onosproject.store.service.MapEvent;
Madan Jampanid14166a2015-02-24 17:37:51 -080049import org.onosproject.store.service.Serializer;
50import org.onosproject.store.service.StorageService;
51import org.onosproject.store.service.Versioned;
52import org.slf4j.Logger;
53
Madan Jampani1af8e132015-04-30 16:41:18 -070054import java.util.ArrayList;
Madan Jampanicc586752015-05-18 16:19:27 -070055import java.util.Collections;
Jonathan Harte649c752015-03-03 18:04:25 -080056import java.util.Map;
57import java.util.Map.Entry;
58import java.util.Objects;
59import java.util.Set;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070060import java.util.List;
Madan Jampanide003d92015-05-11 17:14:20 -070061import java.util.concurrent.CancellationException;
62import java.util.concurrent.CompletableFuture;
Jonathan Harte649c752015-03-03 18:04:25 -080063import java.util.concurrent.Executors;
64import java.util.concurrent.ScheduledExecutorService;
65import java.util.concurrent.TimeUnit;
Madan Jampanid46e18f2015-05-04 23:19:33 -070066import java.util.concurrent.atomic.AtomicBoolean;
Jonathan Harte649c752015-03-03 18:04:25 -080067import java.util.stream.Collectors;
68
69import static com.google.common.base.Preconditions.checkArgument;
70import static org.onlab.util.Tools.groupedThreads;
71import static org.slf4j.LoggerFactory.getLogger;
Ayaka Koshibe4a3c2392015-04-22 18:03:13 -070072import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
73import static org.onosproject.cluster.ControllerNode.State.INACTIVE;
74
Madan Jampanid14166a2015-02-24 17:37:51 -080075/**
76 * Distributed Lock Manager implemented on top of ConsistentMap.
77 * <p>
Madan Jampanicc586752015-05-18 16:19:27 -070078 * This implementation makes use of ClusterService's failure
Madan Jampanid14166a2015-02-24 17:37:51 -080079 * detection capabilities to detect and purge stale locks.
80 * TODO: Ensure lock safety and liveness.
81 */
Madan Jampani7f72c3f2015-03-01 17:34:59 -080082@Component(immediate = true, enabled = true)
Madan Jampanid14166a2015-02-24 17:37:51 -080083@Service
84public class DistributedLeadershipManager implements LeadershipService {
85
86 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
87 protected StorageService storageService;
88
89 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
90 protected ClusterService clusterService;
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
93 protected ClusterCommunicationService clusterCommunicator;
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
96 protected EventDeliveryService eventDispatcher;
97
Madan Jampanid14166a2015-02-24 17:37:51 -080098 private final Logger log = getLogger(getClass());
HIGUCHI Yuta1979f552015-12-28 21:24:26 -080099
Madan Jampani7d243db2015-05-27 13:16:02 -0700100 private ScheduledExecutorService electionRunner;
101 private ScheduledExecutorService lockExecutor;
Madan Jampanicc586752015-05-18 16:19:27 -0700102 private ScheduledExecutorService staleLeadershipPurgeExecutor;
Madan Jampani6f38daf2015-06-04 15:26:38 -0700103 private ScheduledExecutorService leadershipRefresher;
Madan Jampanid14166a2015-02-24 17:37:51 -0800104
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800105 // leader for each topic
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700106 private ConsistentMap<String, NodeId> leaderMap;
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800107 // list of candidates (includes chosen leader) for each topic
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700108 private ConsistentMap<String, List<NodeId>> candidateMap;
109
Madan Jampanicc586752015-05-18 16:19:27 -0700110 private ListenerRegistry<LeadershipEvent, LeadershipEventListener> listenerRegistry;
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800111
112 // cached copy of leaderMap
113 // Note: Map value, Leadership, does not contain proper candidates info
Madan Jampanid14166a2015-02-24 17:37:51 -0800114 private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800115 // cached copy of candidateMap
116 // Note: Map value, Leadership, does not contain proper leader info
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700117 private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800118
Madan Jampanicc586752015-05-18 16:19:27 -0700119 private final ClusterEventListener clusterEventListener = new InternalClusterEventListener();
Madan Jampanid14166a2015-02-24 17:37:51 -0800120
Madan Jampanicc586752015-05-18 16:19:27 -0700121 private NodeId localNodeId;
Madan Jampanid14166a2015-02-24 17:37:51 -0800122 private Set<String> activeTopics = Sets.newConcurrentHashSet();
Madan Jampani7d243db2015-05-27 13:16:02 -0700123 private Map<String, CompletableFuture<Leadership>> pendingFutures = Maps.newConcurrentMap();
Madan Jampanid14166a2015-02-24 17:37:51 -0800124
Madan Jampanid09441b2015-06-09 14:50:55 -0700125 // The actual delay is randomly chosen from the interval [0, WAIT_BEFORE_RETRY_MILLIS)
Madan Jampani8d37efc2015-06-01 17:25:48 -0700126 private static final int WAIT_BEFORE_RETRY_MILLIS = 150;
Madan Jampanid14166a2015-02-24 17:37:51 -0800127 private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
Madan Jampani6f38daf2015-06-04 15:26:38 -0700128 private static final int LEADERSHIP_REFRESH_INTERVAL_SEC = 2;
Madan Jampanicc586752015-05-18 16:19:27 -0700129 private static final int DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC = 2;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700130
Madan Jampanicc586752015-05-18 16:19:27 -0700131 private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false);
132
Madan Jampanid09441b2015-06-09 14:50:55 -0700133 private static final Serializer SERIALIZER = Serializer.using(KryoNamespaces.API);
Madan Jampanid14166a2015-02-24 17:37:51 -0800134
135 @Activate
136 public void activate() {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700137 leaderMap = storageService.<String, NodeId>consistentMapBuilder()
138 .withName("onos-topic-leaders")
139 .withSerializer(SERIALIZER)
140 .withPartitionsDisabled().build();
141 candidateMap = storageService.<String, List<NodeId>>consistentMapBuilder()
142 .withName("onos-topic-candidates")
143 .withSerializer(SERIALIZER)
144 .withPartitionsDisabled().build();
Madan Jampanid14166a2015-02-24 17:37:51 -0800145
Madan Jampanid09441b2015-06-09 14:50:55 -0700146 leaderMap.addListener(event -> {
147 log.debug("Received {}", event);
148 LeadershipEvent.Type leadershipEventType = null;
149 if (event.type() == MapEvent.Type.INSERT || event.type() == MapEvent.Type.UPDATE) {
150 leadershipEventType = LeadershipEvent.Type.LEADER_ELECTED;
151 } else if (event.type() == MapEvent.Type.REMOVE) {
152 leadershipEventType = LeadershipEvent.Type.LEADER_BOOTED;
153 }
154 onLeadershipEvent(new LeadershipEvent(
155 leadershipEventType,
156 new Leadership(event.key(),
157 event.value().value(),
158 event.value().version(),
159 event.value().creationTime())));
160 });
161
162 candidateMap.addListener(event -> {
163 log.debug("Received {}", event);
164 if (event.type() != MapEvent.Type.INSERT && event.type() != MapEvent.Type.UPDATE) {
165 log.error("Entries must not be removed from candidate map");
166 return;
167 }
168 onLeadershipEvent(new LeadershipEvent(
169 LeadershipEvent.Type.CANDIDATES_CHANGED,
170 new Leadership(event.key(),
171 event.value().value(),
172 event.value().version(),
173 event.value().creationTime())));
174 });
175
Madan Jampanid14166a2015-02-24 17:37:51 -0800176 localNodeId = clusterService.getLocalNode().id();
177
Madan Jampani7d243db2015-05-27 13:16:02 -0700178 electionRunner = Executors.newSingleThreadScheduledExecutor(
179 groupedThreads("onos/store/leadership", "election-runner"));
180 lockExecutor = Executors.newScheduledThreadPool(
Madan Jampanid14166a2015-02-24 17:37:51 -0800181 4, groupedThreads("onos/store/leadership", "election-thread-%d"));
Madan Jampanicc586752015-05-18 16:19:27 -0700182 staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor(
183 groupedThreads("onos/store/leadership", "stale-leadership-evictor"));
Madan Jampani6f38daf2015-06-04 15:26:38 -0700184 leadershipRefresher = Executors.newSingleThreadScheduledExecutor(
185 groupedThreads("onos/store/leadership", "refresh-thread"));
Madan Jampanid14166a2015-02-24 17:37:51 -0800186
Madan Jampanicc586752015-05-18 16:19:27 -0700187 clusterService.addListener(clusterEventListener);
188
Madan Jampani7d243db2015-05-27 13:16:02 -0700189 electionRunner.scheduleWithFixedDelay(
190 this::electLeaders, 0, DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC, TimeUnit.SECONDS);
191
Madan Jampani6f38daf2015-06-04 15:26:38 -0700192 leadershipRefresher.scheduleWithFixedDelay(
193 this::refreshLeaderBoard, 0, LEADERSHIP_REFRESH_INTERVAL_SEC, TimeUnit.SECONDS);
Madan Jampanid14166a2015-02-24 17:37:51 -0800194
Simon Huntff663742015-05-14 13:33:05 -0700195 listenerRegistry = new ListenerRegistry<>();
Madan Jampanid14166a2015-02-24 17:37:51 -0800196 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
197
Madan Jampanid46e18f2015-05-04 23:19:33 -0700198 log.info("Started");
Madan Jampanid14166a2015-02-24 17:37:51 -0800199 }
200
201 @Deactivate
202 public void deactivate() {
Thomas Vachuskac0e20132015-07-31 12:15:42 -0700203 if (clusterService.getNodes().size() > 1) {
204 // FIXME: Determine why this takes ~50 seconds to shutdown on a single node!
205 leaderBoard.forEach((topic, leadership) -> {
206 if (localNodeId.equals(leadership.leader())) {
207 withdraw(topic);
208 }
209 });
210 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800211
Madan Jampanicc586752015-05-18 16:19:27 -0700212 clusterService.removeListener(clusterEventListener);
Madan Jampanid14166a2015-02-24 17:37:51 -0800213 eventDispatcher.removeSink(LeadershipEvent.class);
Madan Jampanid14166a2015-02-24 17:37:51 -0800214
Madan Jampani7d243db2015-05-27 13:16:02 -0700215 electionRunner.shutdown();
Madan Jampani7d243db2015-05-27 13:16:02 -0700216 lockExecutor.shutdown();
Madan Jampanicc586752015-05-18 16:19:27 -0700217 staleLeadershipPurgeExecutor.shutdown();
Madan Jampani6f38daf2015-06-04 15:26:38 -0700218 leadershipRefresher.shutdown();
Madan Jampanid14166a2015-02-24 17:37:51 -0800219
Madan Jampanid46e18f2015-05-04 23:19:33 -0700220 log.info("Stopped");
Madan Jampanid14166a2015-02-24 17:37:51 -0800221 }
222
223 @Override
224 public Map<String, Leadership> getLeaderBoard() {
225 return ImmutableMap.copyOf(leaderBoard);
226 }
227
228 @Override
Madan Jampanifd45d5e2015-04-20 13:33:21 -0700229 public Map<String, List<NodeId>> getCandidates() {
230 return Maps.toMap(candidateBoard.keySet(), this::getCandidates);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700231 }
232
233 @Override
234 public List<NodeId> getCandidates(String path) {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700235 Leadership current = candidateBoard.get(path);
236 return current == null ? ImmutableList.of() : ImmutableList.copyOf(current.candidates());
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700237 }
238
239 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800240 public NodeId getLeader(String path) {
241 Leadership leadership = leaderBoard.get(path);
242 return leadership != null ? leadership.leader() : null;
243 }
244
245 @Override
Madan Jampani59610512015-02-25 15:25:43 -0800246 public Leadership getLeadership(String path) {
247 checkArgument(path != null);
248 return leaderBoard.get(path);
249 }
250
251 @Override
252 public Set<String> ownedTopics(NodeId nodeId) {
253 checkArgument(nodeId != null);
254 return leaderBoard.entrySet()
255 .stream()
256 .filter(entry -> nodeId.equals(entry.getValue().leader()))
257 .map(Entry::getKey)
258 .collect(Collectors.toSet());
259 }
260
261 @Override
Madan Jampanide003d92015-05-11 17:14:20 -0700262 public CompletableFuture<Leadership> runForLeadership(String path) {
Madan Jampani52860be2015-02-27 12:52:37 -0800263 log.debug("Running for leadership for topic: {}", path);
Madan Jampanide003d92015-05-11 17:14:20 -0700264 CompletableFuture<Leadership> resultFuture = new CompletableFuture<>();
265 doRunForLeadership(path, resultFuture);
266 return resultFuture;
267 }
268
269 private void doRunForLeadership(String path, CompletableFuture<Leadership> future) {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700270 try {
Madan Jampani346d4f52015-05-04 11:09:39 -0700271 Versioned<List<NodeId>> candidates = candidateMap.computeIf(path,
272 currentList -> currentList == null || !currentList.contains(localNodeId),
273 (topic, currentList) -> {
274 if (currentList == null) {
275 return ImmutableList.of(localNodeId);
276 } else {
277 List<NodeId> newList = Lists.newLinkedList();
278 newList.addAll(currentList);
279 newList.add(localNodeId);
280 return newList;
281 }
282 });
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700283 log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
284 activeTopics.add(path);
Madan Jampani7d243db2015-05-27 13:16:02 -0700285 Leadership leadership = electLeader(path, candidates.value());
286 if (leadership == null) {
287 pendingFutures.put(path, future);
288 } else {
289 future.complete(leadership);
290 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700291 } catch (ConsistentMapException e) {
292 log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
Madan Jampanide003d92015-05-11 17:14:20 -0700293 rerunForLeadership(path, future);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700294 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800295 }
296
297 @Override
Madan Jampanide003d92015-05-11 17:14:20 -0700298 public CompletableFuture<Void> withdraw(String path) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800299 activeTopics.remove(path);
Madan Jampanide003d92015-05-11 17:14:20 -0700300 CompletableFuture<Void> resultFuture = new CompletableFuture<>();
301 doWithdraw(path, resultFuture);
302 return resultFuture;
303 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700304
Madan Jampanide003d92015-05-11 17:14:20 -0700305
306 private void doWithdraw(String path, CompletableFuture<Void> future) {
307 if (activeTopics.contains(path)) {
308 future.completeExceptionally(new CancellationException(String.format("%s is now a active topic", path)));
309 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800310 try {
Madan Jampanid09441b2015-06-09 14:50:55 -0700311 leaderMap.computeIf(path,
312 localNodeId::equals,
313 (topic, leader) -> null);
314 candidateMap.computeIf(path,
315 candidates -> candidates != null && candidates.contains(localNodeId),
316 (topic, candidates) -> candidates.stream()
317 .filter(nodeId -> !localNodeId.equals(nodeId))
318 .collect(Collectors.toList()));
Madan Jampani052ad652015-06-10 13:20:32 -0700319 future.complete(null);
Madan Jampanid14166a2015-02-24 17:37:51 -0800320 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800321 log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
Madan Jampanide003d92015-05-11 17:14:20 -0700322 retryWithdraw(path, future);
Madan Jampanid14166a2015-02-24 17:37:51 -0800323 }
324 }
325
326 @Override
Madan Jampani1af8e132015-04-30 16:41:18 -0700327 public boolean stepdown(String path) {
Madan Jampani9bd1f152015-04-30 23:33:35 -0700328 if (!activeTopics.contains(path) || !Objects.equals(localNodeId, getLeader(path))) {
Madan Jampani1af8e132015-04-30 16:41:18 -0700329 return false;
330 }
331
332 try {
Madan Jampanid09441b2015-06-09 14:50:55 -0700333 return leaderMap.computeIf(path,
334 localNodeId::equals,
335 (topic, leader) -> null) == null;
Madan Jampani1af8e132015-04-30 16:41:18 -0700336 } catch (Exception e) {
337 log.warn("Error executing stepdown for {}", path, e);
338 }
339 return false;
340 }
341
342 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800343 public void addListener(LeadershipEventListener listener) {
344 listenerRegistry.addListener(listener);
345 }
346
347 @Override
348 public void removeListener(LeadershipEventListener listener) {
349 listenerRegistry.removeListener(listener);
350 }
351
Madan Jampani1af8e132015-04-30 16:41:18 -0700352 @Override
353 public boolean makeTopCandidate(String path, NodeId nodeId) {
Madan Jampanid09441b2015-06-09 14:50:55 -0700354 Versioned<List<NodeId>> candidateList = candidateMap.computeIf(path,
Madan Jampani3650fc42015-05-27 17:01:50 -0700355 candidates -> candidates != null &&
356 candidates.contains(nodeId) &&
357 !nodeId.equals(Iterables.getFirst(candidates, null)),
Madan Jampani346d4f52015-05-04 11:09:39 -0700358 (topic, candidates) -> {
359 List<NodeId> updatedCandidates = new ArrayList<>(candidates.size());
360 updatedCandidates.add(nodeId);
361 candidates.stream().filter(id -> !nodeId.equals(id)).forEach(updatedCandidates::add);
362 return updatedCandidates;
363 });
Madan Jampanid09441b2015-06-09 14:50:55 -0700364 List<NodeId> candidates = candidateList != null ? candidateList.value() : Collections.emptyList();
365 return candidates.size() > 0 && nodeId.equals(candidates.get(0));
Madan Jampani1af8e132015-04-30 16:41:18 -0700366 }
367
Madan Jampani7d243db2015-05-27 13:16:02 -0700368 private Leadership electLeader(String path, List<NodeId> candidates) {
369 Leadership currentLeadership = getLeadership(path);
370 if (currentLeadership != null) {
371 return currentLeadership;
372 } else {
373 NodeId topCandidate = candidates
374 .stream()
375 .filter(n -> clusterService.getState(n) == ACTIVE)
376 .findFirst()
377 .orElse(null);
378 try {
379 Versioned<NodeId> leader = localNodeId.equals(topCandidate)
380 ? leaderMap.computeIfAbsent(path, p -> localNodeId) : leaderMap.get(path);
381 if (leader != null) {
382 Leadership newLeadership = new Leadership(path,
383 leader.value(),
384 leader.version(),
385 leader.creationTime());
Madan Jampani565a66a2015-07-25 17:01:13 -0700386 // Since reads only go through the local copy of leader board, we ought to update it
387 // first before returning from this method.
388 // This is to ensure a subsequent read will not read a stale value.
389 onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership));
Madan Jampani7d243db2015-05-27 13:16:02 -0700390 return newLeadership;
Ayaka Koshibe0d886fc2015-04-23 11:53:41 -0700391 }
Madan Jampani7d243db2015-05-27 13:16:02 -0700392 } catch (Exception e) {
393 log.debug("Failed to elect leader for {}", path, e);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700394 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700395 }
Madan Jampani7d243db2015-05-27 13:16:02 -0700396 return null;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700397 }
398
Madan Jampani7d243db2015-05-27 13:16:02 -0700399 private void electLeaders() {
Madan Jampanid14166a2015-02-24 17:37:51 -0800400 try {
Madan Jampani7d243db2015-05-27 13:16:02 -0700401 candidateMap.entrySet().forEach(entry -> {
402 String path = entry.getKey();
Madan Jampanicd40e5b2015-05-27 17:19:48 -0700403 Versioned<List<NodeId>> candidates = entry.getValue();
404 // for active topics, check if this node can become a leader (if it isn't already)
Madan Jampani7d243db2015-05-27 13:16:02 -0700405 if (activeTopics.contains(path)) {
406 lockExecutor.submit(() -> {
Madan Jampanicd40e5b2015-05-27 17:19:48 -0700407 Leadership leadership = electLeader(path, candidates.value());
Madan Jampani7d243db2015-05-27 13:16:02 -0700408 if (leadership != null) {
409 CompletableFuture<Leadership> future = pendingFutures.remove(path);
410 if (future != null) {
411 future.complete(leadership);
412 }
413 }
414 });
415 }
Madan Jampanicd40e5b2015-05-27 17:19:48 -0700416 // Raise a CANDIDATES_CHANGED event to force refresh local candidate board
417 // and also to update local listeners.
418 // Don't worry about duplicate events as they will be suppressed.
419 onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED,
420 new Leadership(path,
421 candidates.value(),
422 candidates.version(),
423 candidates.creationTime())));
Madan Jampani7d243db2015-05-27 13:16:02 -0700424 });
Madan Jampanid14166a2015-02-24 17:37:51 -0800425 } catch (Exception e) {
Madan Jampani7d243db2015-05-27 13:16:02 -0700426 log.debug("Failure electing leaders", e);
Madan Jampanid14166a2015-02-24 17:37:51 -0800427 }
428 }
429
Madan Jampanid46e18f2015-05-04 23:19:33 -0700430 private void onLeadershipEvent(LeadershipEvent leadershipEvent) {
431 log.trace("Leadership Event: time = {} type = {} event = {}",
432 leadershipEvent.time(), leadershipEvent.type(),
433 leadershipEvent);
434
435 Leadership leadershipUpdate = leadershipEvent.subject();
436 LeadershipEvent.Type eventType = leadershipEvent.type();
437 String topic = leadershipUpdate.topic();
438
439 AtomicBoolean updateAccepted = new AtomicBoolean(false);
440 if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
441 leaderBoard.compute(topic, (k, currentLeadership) -> {
442 if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
443 updateAccepted.set(true);
444 return leadershipUpdate;
445 }
446 return currentLeadership;
447 });
448 } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
449 leaderBoard.compute(topic, (k, currentLeadership) -> {
450 if (currentLeadership == null || currentLeadership.epoch() <= leadershipUpdate.epoch()) {
451 updateAccepted.set(true);
Madan Jampani6f38daf2015-06-04 15:26:38 -0700452 // FIXME: Removing entries from leaderboard is not safe and should be visited.
Madan Jampanid46e18f2015-05-04 23:19:33 -0700453 return null;
454 }
455 return currentLeadership;
456 });
457 } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
458 candidateBoard.compute(topic, (k, currentInfo) -> {
459 if (currentInfo == null || currentInfo.epoch() < leadershipUpdate.epoch()) {
460 updateAccepted.set(true);
461 return leadershipUpdate;
462 }
463 return currentInfo;
464 });
465 } else {
466 throw new IllegalStateException("Unknown event type.");
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700467 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700468
Madan Jampanid46e18f2015-05-04 23:19:33 -0700469 if (updateAccepted.get()) {
470 eventDispatcher.post(leadershipEvent);
Madan Jampanid14166a2015-02-24 17:37:51 -0800471 }
472 }
473
Madan Jampanide003d92015-05-11 17:14:20 -0700474 private void rerunForLeadership(String path, CompletableFuture<Leadership> future) {
Madan Jampani7d243db2015-05-27 13:16:02 -0700475 lockExecutor.schedule(
Madan Jampanide003d92015-05-11 17:14:20 -0700476 () -> doRunForLeadership(path, future),
Madan Jampani8d37efc2015-06-01 17:25:48 -0700477 RandomUtils.nextInt(WAIT_BEFORE_RETRY_MILLIS),
478 TimeUnit.MILLISECONDS);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700479 }
480
Madan Jampanide003d92015-05-11 17:14:20 -0700481 private void retryWithdraw(String path, CompletableFuture<Void> future) {
Madan Jampani7d243db2015-05-27 13:16:02 -0700482 lockExecutor.schedule(
Madan Jampanide003d92015-05-11 17:14:20 -0700483 () -> doWithdraw(path, future),
Madan Jampani8d37efc2015-06-01 17:25:48 -0700484 RandomUtils.nextInt(WAIT_BEFORE_RETRY_MILLIS),
485 TimeUnit.MILLISECONDS);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700486 }
487
Madan Jampanicc586752015-05-18 16:19:27 -0700488 private void scheduleStaleLeadershipPurge(int afterDelaySec) {
489 if (staleLeadershipPurgeScheduled.compareAndSet(false, true)) {
490 staleLeadershipPurgeExecutor.schedule(
491 this::purgeStaleLeadership,
492 afterDelaySec,
493 TimeUnit.SECONDS);
494 }
495 }
496
497 /**
498 * Purges locks held by inactive nodes and evicts inactive nodes from candidacy.
499 */
500 private void purgeStaleLeadership() {
501 AtomicBoolean rerunPurge = new AtomicBoolean(false);
Madan Jampanid14166a2015-02-24 17:37:51 -0800502 try {
Madan Jampanicc586752015-05-18 16:19:27 -0700503 staleLeadershipPurgeScheduled.set(false);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700504 leaderMap.entrySet()
505 .stream()
Ayaka Koshibe4a3c2392015-04-22 18:03:13 -0700506 .filter(e -> clusterService.getState(e.getValue().value()) == INACTIVE)
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700507 .forEach(entry -> {
508 String path = entry.getKey();
509 NodeId nodeId = entry.getValue().value();
Madan Jampanid14166a2015-02-24 17:37:51 -0800510 try {
Madan Jampanid09441b2015-06-09 14:50:55 -0700511 leaderMap.computeIf(path, nodeId::equals, (topic, leader) -> null);
Madan Jampanid14166a2015-02-24 17:37:51 -0800512 } catch (Exception e) {
Madan Jampanice8392b2015-05-20 11:13:47 -0700513 log.debug("Failed to purge stale lock held by {} for {}", nodeId, path, e);
Madan Jampanicc586752015-05-18 16:19:27 -0700514 rerunPurge.set(true);
515 }
516 });
517
518 candidateMap.entrySet()
519 .forEach(entry -> {
520 String path = entry.getKey();
521 Versioned<List<NodeId>> candidates = entry.getValue();
522 List<NodeId> candidatesList = candidates != null
523 ? candidates.value() : Collections.emptyList();
524 List<NodeId> activeCandidatesList =
525 candidatesList.stream()
526 .filter(n -> clusterService.getState(n) == ACTIVE)
527 .filter(n -> !localNodeId.equals(n) || activeTopics.contains(path))
528 .collect(Collectors.toList());
529 if (activeCandidatesList.size() < candidatesList.size()) {
530 Set<NodeId> removedCandidates =
531 Sets.difference(Sets.newHashSet(candidatesList),
532 Sets.newHashSet(activeCandidatesList));
533 try {
Madan Jampanid09441b2015-06-09 14:50:55 -0700534 candidateMap.computeIf(path,
535 c -> c.stream()
536 .filter(n -> clusterService.getState(n) == INACTIVE)
537 .count() > 0,
538 (topic, c) -> c.stream()
539 .filter(n -> clusterService.getState(n) == ACTIVE)
540 .filter(n -> !localNodeId.equals(n) ||
541 activeTopics.contains(path))
542 .collect(Collectors.toList()));
Madan Jampanicc586752015-05-18 16:19:27 -0700543 } catch (Exception e) {
Madan Jampanice8392b2015-05-20 11:13:47 -0700544 log.debug("Failed to evict inactive candidates {} from "
Madan Jampanicc586752015-05-18 16:19:27 -0700545 + "candidate list for {}", removedCandidates, path, e);
546 rerunPurge.set(true);
547 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800548 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700549 });
Madan Jampanid14166a2015-02-24 17:37:51 -0800550 } catch (Exception e) {
Madan Jampanice8392b2015-05-20 11:13:47 -0700551 log.debug("Failure purging state leadership.", e);
Madan Jampanicc586752015-05-18 16:19:27 -0700552 rerunPurge.set(true);
553 }
554
555 if (rerunPurge.get()) {
Madan Jampanice8392b2015-05-20 11:13:47 -0700556 log.debug("Rescheduling stale leadership purge due to errors encountered in previous run");
Madan Jampanicc586752015-05-18 16:19:27 -0700557 scheduleStaleLeadershipPurge(DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC);
Madan Jampanid14166a2015-02-24 17:37:51 -0800558 }
559 }
560
Madan Jampani6f38daf2015-06-04 15:26:38 -0700561 private void refreshLeaderBoard() {
Madan Jampania14047d2015-02-25 12:23:02 -0800562 try {
Madan Jampani6f38daf2015-06-04 15:26:38 -0700563 Map<String, Leadership> newLeaderBoard = Maps.newHashMap();
564 leaderMap.entrySet().forEach(entry -> {
565 String path = entry.getKey();
566 Versioned<NodeId> leader = entry.getValue();
567 Leadership leadership = new Leadership(path,
568 leader.value(),
569 leader.version(),
570 leader.creationTime());
571 newLeaderBoard.put(path, leadership);
572 });
573
574 // first take snapshot of current leader board.
575 Map<String, Leadership> currentLeaderBoard = ImmutableMap.copyOf(leaderBoard);
576
577 MapDifference<String, Leadership> diff = Maps.difference(currentLeaderBoard, newLeaderBoard);
578
579 // evict stale leaders
580 diff.entriesOnlyOnLeft().forEach((path, leadership) -> {
581 log.debug("Evicting {} from leaderboard. It is no longer active leader.", leadership);
582 onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, leadership));
583 });
584
585 // add missing leaders
586 diff.entriesOnlyOnRight().forEach((path, leadership) -> {
587 log.debug("Adding {} to leaderboard. It is now the active leader.", leadership);
588 onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership));
589 });
590
591 // add updated leaders
592 diff.entriesDiffering().forEach((path, difference) -> {
593 Leadership current = difference.leftValue();
594 Leadership updated = difference.rightValue();
595 if (current.epoch() < updated.epoch()) {
596 log.debug("Updated {} in leaderboard.", updated);
597 onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, updated));
Madan Jampani096da0e2015-06-04 14:30:19 -0700598 }
Madan Jampania14047d2015-02-25 12:23:02 -0800599 });
600 } catch (Exception e) {
Madan Jampani6f38daf2015-06-04 15:26:38 -0700601 log.debug("Failed to refresh leader board", e);
Madan Jampania14047d2015-02-25 12:23:02 -0800602 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800603 }
Madan Jampanicc586752015-05-18 16:19:27 -0700604
605 private class InternalClusterEventListener implements ClusterEventListener {
606
607 @Override
608 public void event(ClusterEvent event) {
609 if (event.type() == Type.INSTANCE_DEACTIVATED || event.type() == Type.INSTANCE_REMOVED) {
610 scheduleStaleLeadershipPurge(0);
611 }
612 }
613 }
Madan Jampania14047d2015-02-25 12:23:02 -0800614}