blob: 91daac2192f51333ade1f4d0aa348fa03848b56d [file] [log] [blame]
Madan Jampanid14166a2015-02-24 17:37:51 -08001package org.onosproject.store.consistent.impl;
2
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07003import com.google.common.collect.ImmutableList;
Jonathan Harte649c752015-03-03 18:04:25 -08004import com.google.common.collect.ImmutableMap;
Madan Jampani3650fc42015-05-27 17:01:50 -07005import com.google.common.collect.Iterables;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07006import com.google.common.collect.Lists;
Madan Jampani6f38daf2015-06-04 15:26:38 -07007import com.google.common.collect.MapDifference;
Jonathan Harte649c752015-03-03 18:04:25 -08008import com.google.common.collect.Maps;
9import com.google.common.collect.Sets;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070010
Madan Jampani8d37efc2015-06-01 17:25:48 -070011import org.apache.commons.lang.math.RandomUtils;
Madan Jampanid14166a2015-02-24 17:37:51 -080012import org.apache.felix.scr.annotations.Activate;
13import org.apache.felix.scr.annotations.Component;
14import org.apache.felix.scr.annotations.Deactivate;
15import org.apache.felix.scr.annotations.Reference;
16import org.apache.felix.scr.annotations.ReferenceCardinality;
17import org.apache.felix.scr.annotations.Service;
Madan Jampanicc586752015-05-18 16:19:27 -070018import org.onosproject.cluster.ClusterEvent;
19import org.onosproject.cluster.ClusterEvent.Type;
20import org.onosproject.cluster.ClusterEventListener;
Madan Jampanid14166a2015-02-24 17:37:51 -080021import org.onosproject.cluster.ClusterService;
Madan Jampanid14166a2015-02-24 17:37:51 -080022import org.onosproject.cluster.Leadership;
23import org.onosproject.cluster.LeadershipEvent;
24import org.onosproject.cluster.LeadershipEventListener;
25import org.onosproject.cluster.LeadershipService;
26import org.onosproject.cluster.NodeId;
Simon Huntff663742015-05-14 13:33:05 -070027import org.onosproject.event.ListenerRegistry;
Madan Jampanid14166a2015-02-24 17:37:51 -080028import org.onosproject.event.EventDeliveryService;
29import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Madan Jampanid14166a2015-02-24 17:37:51 -080030import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampanid14166a2015-02-24 17:37:51 -080031import org.onosproject.store.service.ConsistentMap;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070032import org.onosproject.store.service.ConsistentMapException;
Madan Jampanid09441b2015-06-09 14:50:55 -070033import org.onosproject.store.service.MapEvent;
Madan Jampanid14166a2015-02-24 17:37:51 -080034import org.onosproject.store.service.Serializer;
35import org.onosproject.store.service.StorageService;
36import org.onosproject.store.service.Versioned;
37import org.slf4j.Logger;
38
Madan Jampani1af8e132015-04-30 16:41:18 -070039import java.util.ArrayList;
Madan Jampanicc586752015-05-18 16:19:27 -070040import java.util.Collections;
Jonathan Harte649c752015-03-03 18:04:25 -080041import java.util.Map;
42import java.util.Map.Entry;
43import java.util.Objects;
44import java.util.Set;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070045import java.util.List;
Madan Jampanide003d92015-05-11 17:14:20 -070046import java.util.concurrent.CancellationException;
47import java.util.concurrent.CompletableFuture;
Jonathan Harte649c752015-03-03 18:04:25 -080048import java.util.concurrent.Executors;
49import java.util.concurrent.ScheduledExecutorService;
50import java.util.concurrent.TimeUnit;
Madan Jampanid46e18f2015-05-04 23:19:33 -070051import java.util.concurrent.atomic.AtomicBoolean;
Jonathan Harte649c752015-03-03 18:04:25 -080052import java.util.stream.Collectors;
53
54import static com.google.common.base.Preconditions.checkArgument;
55import static org.onlab.util.Tools.groupedThreads;
56import static org.slf4j.LoggerFactory.getLogger;
Ayaka Koshibe4a3c2392015-04-22 18:03:13 -070057import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
58import static org.onosproject.cluster.ControllerNode.State.INACTIVE;
59
/**
 * Distributed leadership manager implemented on top of ConsistentMap.
 * <p>
 * This implementation makes use of ClusterService's failure
 * detection capabilities to detect and purge stale leaderships and
 * candidacies held by inactive nodes.
 * TODO: Ensure lock safety and liveness.
 */
Madan Jampani7f72c3f2015-03-01 17:34:59 -080067@Component(immediate = true, enabled = true)
Madan Jampanid14166a2015-02-24 17:37:51 -080068@Service
69public class DistributedLeadershipManager implements LeadershipService {
70
71 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
72 protected StorageService storageService;
73
74 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
75 protected ClusterService clusterService;
76
77 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
78 protected ClusterCommunicationService clusterCommunicator;
79
80 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
81 protected EventDeliveryService eventDispatcher;
82
Madan Jampanid14166a2015-02-24 17:37:51 -080083 private final Logger log = getLogger(getClass());
Madan Jampani7d243db2015-05-27 13:16:02 -070084 private ScheduledExecutorService electionRunner;
85 private ScheduledExecutorService lockExecutor;
Madan Jampanicc586752015-05-18 16:19:27 -070086 private ScheduledExecutorService staleLeadershipPurgeExecutor;
Madan Jampani6f38daf2015-06-04 15:26:38 -070087 private ScheduledExecutorService leadershipRefresher;
Madan Jampanid14166a2015-02-24 17:37:51 -080088
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070089 private ConsistentMap<String, NodeId> leaderMap;
90 private ConsistentMap<String, List<NodeId>> candidateMap;
91
Madan Jampanicc586752015-05-18 16:19:27 -070092 private ListenerRegistry<LeadershipEvent, LeadershipEventListener> listenerRegistry;
Madan Jampanid14166a2015-02-24 17:37:51 -080093 private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
Ayaka Koshibefd26a302015-04-13 13:59:54 -070094 private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
Madan Jampanicc586752015-05-18 16:19:27 -070095 private final ClusterEventListener clusterEventListener = new InternalClusterEventListener();
Madan Jampanid14166a2015-02-24 17:37:51 -080096
Madan Jampanicc586752015-05-18 16:19:27 -070097 private NodeId localNodeId;
Madan Jampanid14166a2015-02-24 17:37:51 -080098 private Set<String> activeTopics = Sets.newConcurrentHashSet();
Madan Jampani7d243db2015-05-27 13:16:02 -070099 private Map<String, CompletableFuture<Leadership>> pendingFutures = Maps.newConcurrentMap();
Madan Jampanid14166a2015-02-24 17:37:51 -0800100
Madan Jampanid09441b2015-06-09 14:50:55 -0700101 // The actual delay is randomly chosen from the interval [0, WAIT_BEFORE_RETRY_MILLIS)
Madan Jampani8d37efc2015-06-01 17:25:48 -0700102 private static final int WAIT_BEFORE_RETRY_MILLIS = 150;
Madan Jampanid14166a2015-02-24 17:37:51 -0800103 private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
Madan Jampani6f38daf2015-06-04 15:26:38 -0700104 private static final int LEADERSHIP_REFRESH_INTERVAL_SEC = 2;
Madan Jampanicc586752015-05-18 16:19:27 -0700105 private static final int DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC = 2;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700106
Madan Jampanicc586752015-05-18 16:19:27 -0700107 private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false);
108
Madan Jampanid09441b2015-06-09 14:50:55 -0700109 private static final Serializer SERIALIZER = Serializer.using(KryoNamespaces.API);
Madan Jampanid14166a2015-02-24 17:37:51 -0800110
111 @Activate
112 public void activate() {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700113 leaderMap = storageService.<String, NodeId>consistentMapBuilder()
114 .withName("onos-topic-leaders")
115 .withSerializer(SERIALIZER)
116 .withPartitionsDisabled().build();
117 candidateMap = storageService.<String, List<NodeId>>consistentMapBuilder()
118 .withName("onos-topic-candidates")
119 .withSerializer(SERIALIZER)
120 .withPartitionsDisabled().build();
Madan Jampanid14166a2015-02-24 17:37:51 -0800121
Madan Jampanid09441b2015-06-09 14:50:55 -0700122 leaderMap.addListener(event -> {
123 log.debug("Received {}", event);
124 LeadershipEvent.Type leadershipEventType = null;
125 if (event.type() == MapEvent.Type.INSERT || event.type() == MapEvent.Type.UPDATE) {
126 leadershipEventType = LeadershipEvent.Type.LEADER_ELECTED;
127 } else if (event.type() == MapEvent.Type.REMOVE) {
128 leadershipEventType = LeadershipEvent.Type.LEADER_BOOTED;
129 }
130 onLeadershipEvent(new LeadershipEvent(
131 leadershipEventType,
132 new Leadership(event.key(),
133 event.value().value(),
134 event.value().version(),
135 event.value().creationTime())));
136 });
137
138 candidateMap.addListener(event -> {
139 log.debug("Received {}", event);
140 if (event.type() != MapEvent.Type.INSERT && event.type() != MapEvent.Type.UPDATE) {
141 log.error("Entries must not be removed from candidate map");
142 return;
143 }
144 onLeadershipEvent(new LeadershipEvent(
145 LeadershipEvent.Type.CANDIDATES_CHANGED,
146 new Leadership(event.key(),
147 event.value().value(),
148 event.value().version(),
149 event.value().creationTime())));
150 });
151
Madan Jampanid14166a2015-02-24 17:37:51 -0800152 localNodeId = clusterService.getLocalNode().id();
153
Madan Jampani7d243db2015-05-27 13:16:02 -0700154 electionRunner = Executors.newSingleThreadScheduledExecutor(
155 groupedThreads("onos/store/leadership", "election-runner"));
156 lockExecutor = Executors.newScheduledThreadPool(
Madan Jampanid14166a2015-02-24 17:37:51 -0800157 4, groupedThreads("onos/store/leadership", "election-thread-%d"));
Madan Jampanicc586752015-05-18 16:19:27 -0700158 staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor(
159 groupedThreads("onos/store/leadership", "stale-leadership-evictor"));
Madan Jampani6f38daf2015-06-04 15:26:38 -0700160 leadershipRefresher = Executors.newSingleThreadScheduledExecutor(
161 groupedThreads("onos/store/leadership", "refresh-thread"));
Madan Jampanid14166a2015-02-24 17:37:51 -0800162
Madan Jampanicc586752015-05-18 16:19:27 -0700163 clusterService.addListener(clusterEventListener);
164
Madan Jampani7d243db2015-05-27 13:16:02 -0700165 electionRunner.scheduleWithFixedDelay(
166 this::electLeaders, 0, DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC, TimeUnit.SECONDS);
167
Madan Jampani6f38daf2015-06-04 15:26:38 -0700168 leadershipRefresher.scheduleWithFixedDelay(
169 this::refreshLeaderBoard, 0, LEADERSHIP_REFRESH_INTERVAL_SEC, TimeUnit.SECONDS);
Madan Jampanid14166a2015-02-24 17:37:51 -0800170
Simon Huntff663742015-05-14 13:33:05 -0700171 listenerRegistry = new ListenerRegistry<>();
Madan Jampanid14166a2015-02-24 17:37:51 -0800172 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
173
Madan Jampanid46e18f2015-05-04 23:19:33 -0700174 log.info("Started");
Madan Jampanid14166a2015-02-24 17:37:51 -0800175 }
176
177 @Deactivate
178 public void deactivate() {
179 leaderBoard.forEach((topic, leadership) -> {
180 if (localNodeId.equals(leadership.leader())) {
181 withdraw(topic);
182 }
183 });
184
Madan Jampanicc586752015-05-18 16:19:27 -0700185 clusterService.removeListener(clusterEventListener);
Madan Jampanid14166a2015-02-24 17:37:51 -0800186 eventDispatcher.removeSink(LeadershipEvent.class);
Madan Jampanid14166a2015-02-24 17:37:51 -0800187
Madan Jampani7d243db2015-05-27 13:16:02 -0700188 electionRunner.shutdown();
Madan Jampani7d243db2015-05-27 13:16:02 -0700189 lockExecutor.shutdown();
Madan Jampanicc586752015-05-18 16:19:27 -0700190 staleLeadershipPurgeExecutor.shutdown();
Madan Jampani6f38daf2015-06-04 15:26:38 -0700191 leadershipRefresher.shutdown();
Madan Jampanid14166a2015-02-24 17:37:51 -0800192
Madan Jampanid46e18f2015-05-04 23:19:33 -0700193 log.info("Stopped");
Madan Jampanid14166a2015-02-24 17:37:51 -0800194 }
195
196 @Override
197 public Map<String, Leadership> getLeaderBoard() {
198 return ImmutableMap.copyOf(leaderBoard);
199 }
200
201 @Override
Madan Jampanifd45d5e2015-04-20 13:33:21 -0700202 public Map<String, List<NodeId>> getCandidates() {
203 return Maps.toMap(candidateBoard.keySet(), this::getCandidates);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700204 }
205
206 @Override
207 public List<NodeId> getCandidates(String path) {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700208 Leadership current = candidateBoard.get(path);
209 return current == null ? ImmutableList.of() : ImmutableList.copyOf(current.candidates());
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700210 }
211
212 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800213 public NodeId getLeader(String path) {
214 Leadership leadership = leaderBoard.get(path);
215 return leadership != null ? leadership.leader() : null;
216 }
217
218 @Override
Madan Jampani59610512015-02-25 15:25:43 -0800219 public Leadership getLeadership(String path) {
220 checkArgument(path != null);
221 return leaderBoard.get(path);
222 }
223
224 @Override
225 public Set<String> ownedTopics(NodeId nodeId) {
226 checkArgument(nodeId != null);
227 return leaderBoard.entrySet()
228 .stream()
229 .filter(entry -> nodeId.equals(entry.getValue().leader()))
230 .map(Entry::getKey)
231 .collect(Collectors.toSet());
232 }
233
234 @Override
Madan Jampanide003d92015-05-11 17:14:20 -0700235 public CompletableFuture<Leadership> runForLeadership(String path) {
Madan Jampani52860be2015-02-27 12:52:37 -0800236 log.debug("Running for leadership for topic: {}", path);
Madan Jampanide003d92015-05-11 17:14:20 -0700237 CompletableFuture<Leadership> resultFuture = new CompletableFuture<>();
238 doRunForLeadership(path, resultFuture);
239 return resultFuture;
240 }
241
242 private void doRunForLeadership(String path, CompletableFuture<Leadership> future) {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700243 try {
Madan Jampani346d4f52015-05-04 11:09:39 -0700244 Versioned<List<NodeId>> candidates = candidateMap.computeIf(path,
245 currentList -> currentList == null || !currentList.contains(localNodeId),
246 (topic, currentList) -> {
247 if (currentList == null) {
248 return ImmutableList.of(localNodeId);
249 } else {
250 List<NodeId> newList = Lists.newLinkedList();
251 newList.addAll(currentList);
252 newList.add(localNodeId);
253 return newList;
254 }
255 });
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700256 log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
257 activeTopics.add(path);
Madan Jampani7d243db2015-05-27 13:16:02 -0700258 Leadership leadership = electLeader(path, candidates.value());
259 if (leadership == null) {
260 pendingFutures.put(path, future);
261 } else {
262 future.complete(leadership);
263 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700264 } catch (ConsistentMapException e) {
265 log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
Madan Jampanide003d92015-05-11 17:14:20 -0700266 rerunForLeadership(path, future);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700267 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800268 }
269
270 @Override
Madan Jampanide003d92015-05-11 17:14:20 -0700271 public CompletableFuture<Void> withdraw(String path) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800272 activeTopics.remove(path);
Madan Jampanide003d92015-05-11 17:14:20 -0700273 CompletableFuture<Void> resultFuture = new CompletableFuture<>();
274 doWithdraw(path, resultFuture);
275 return resultFuture;
276 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700277
Madan Jampanide003d92015-05-11 17:14:20 -0700278
279 private void doWithdraw(String path, CompletableFuture<Void> future) {
280 if (activeTopics.contains(path)) {
281 future.completeExceptionally(new CancellationException(String.format("%s is now a active topic", path)));
282 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800283 try {
Madan Jampanid09441b2015-06-09 14:50:55 -0700284 leaderMap.computeIf(path,
285 localNodeId::equals,
286 (topic, leader) -> null);
287 candidateMap.computeIf(path,
288 candidates -> candidates != null && candidates.contains(localNodeId),
289 (topic, candidates) -> candidates.stream()
290 .filter(nodeId -> !localNodeId.equals(nodeId))
291 .collect(Collectors.toList()));
Madan Jampani052ad652015-06-10 13:20:32 -0700292 future.complete(null);
Madan Jampanid14166a2015-02-24 17:37:51 -0800293 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800294 log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
Madan Jampanide003d92015-05-11 17:14:20 -0700295 retryWithdraw(path, future);
Madan Jampanid14166a2015-02-24 17:37:51 -0800296 }
297 }
298
299 @Override
Madan Jampani1af8e132015-04-30 16:41:18 -0700300 public boolean stepdown(String path) {
Madan Jampani9bd1f152015-04-30 23:33:35 -0700301 if (!activeTopics.contains(path) || !Objects.equals(localNodeId, getLeader(path))) {
Madan Jampani1af8e132015-04-30 16:41:18 -0700302 return false;
303 }
304
305 try {
Madan Jampanid09441b2015-06-09 14:50:55 -0700306 return leaderMap.computeIf(path,
307 localNodeId::equals,
308 (topic, leader) -> null) == null;
Madan Jampani1af8e132015-04-30 16:41:18 -0700309 } catch (Exception e) {
310 log.warn("Error executing stepdown for {}", path, e);
311 }
312 return false;
313 }
314
315 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800316 public void addListener(LeadershipEventListener listener) {
317 listenerRegistry.addListener(listener);
318 }
319
320 @Override
321 public void removeListener(LeadershipEventListener listener) {
322 listenerRegistry.removeListener(listener);
323 }
324
Madan Jampani1af8e132015-04-30 16:41:18 -0700325 @Override
326 public boolean makeTopCandidate(String path, NodeId nodeId) {
Madan Jampanid09441b2015-06-09 14:50:55 -0700327 Versioned<List<NodeId>> candidateList = candidateMap.computeIf(path,
Madan Jampani3650fc42015-05-27 17:01:50 -0700328 candidates -> candidates != null &&
329 candidates.contains(nodeId) &&
330 !nodeId.equals(Iterables.getFirst(candidates, null)),
Madan Jampani346d4f52015-05-04 11:09:39 -0700331 (topic, candidates) -> {
332 List<NodeId> updatedCandidates = new ArrayList<>(candidates.size());
333 updatedCandidates.add(nodeId);
334 candidates.stream().filter(id -> !nodeId.equals(id)).forEach(updatedCandidates::add);
335 return updatedCandidates;
336 });
Madan Jampanid09441b2015-06-09 14:50:55 -0700337 List<NodeId> candidates = candidateList != null ? candidateList.value() : Collections.emptyList();
338 return candidates.size() > 0 && nodeId.equals(candidates.get(0));
Madan Jampani1af8e132015-04-30 16:41:18 -0700339 }
340
Madan Jampani7d243db2015-05-27 13:16:02 -0700341 private Leadership electLeader(String path, List<NodeId> candidates) {
342 Leadership currentLeadership = getLeadership(path);
343 if (currentLeadership != null) {
344 return currentLeadership;
345 } else {
346 NodeId topCandidate = candidates
347 .stream()
348 .filter(n -> clusterService.getState(n) == ACTIVE)
349 .findFirst()
350 .orElse(null);
351 try {
352 Versioned<NodeId> leader = localNodeId.equals(topCandidate)
353 ? leaderMap.computeIfAbsent(path, p -> localNodeId) : leaderMap.get(path);
354 if (leader != null) {
355 Leadership newLeadership = new Leadership(path,
356 leader.value(),
357 leader.version(),
358 leader.creationTime());
Madan Jampani7d243db2015-05-27 13:16:02 -0700359 return newLeadership;
Ayaka Koshibe0d886fc2015-04-23 11:53:41 -0700360 }
Madan Jampani7d243db2015-05-27 13:16:02 -0700361 } catch (Exception e) {
362 log.debug("Failed to elect leader for {}", path, e);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700363 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700364 }
Madan Jampani7d243db2015-05-27 13:16:02 -0700365 return null;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700366 }
367
Madan Jampani7d243db2015-05-27 13:16:02 -0700368 private void electLeaders() {
Madan Jampanid14166a2015-02-24 17:37:51 -0800369 try {
Madan Jampani7d243db2015-05-27 13:16:02 -0700370 candidateMap.entrySet().forEach(entry -> {
371 String path = entry.getKey();
Madan Jampanicd40e5b2015-05-27 17:19:48 -0700372 Versioned<List<NodeId>> candidates = entry.getValue();
373 // for active topics, check if this node can become a leader (if it isn't already)
Madan Jampani7d243db2015-05-27 13:16:02 -0700374 if (activeTopics.contains(path)) {
375 lockExecutor.submit(() -> {
Madan Jampanicd40e5b2015-05-27 17:19:48 -0700376 Leadership leadership = electLeader(path, candidates.value());
Madan Jampani7d243db2015-05-27 13:16:02 -0700377 if (leadership != null) {
378 CompletableFuture<Leadership> future = pendingFutures.remove(path);
379 if (future != null) {
380 future.complete(leadership);
381 }
382 }
383 });
384 }
Madan Jampanicd40e5b2015-05-27 17:19:48 -0700385 // Raise a CANDIDATES_CHANGED event to force refresh local candidate board
386 // and also to update local listeners.
387 // Don't worry about duplicate events as they will be suppressed.
388 onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED,
389 new Leadership(path,
390 candidates.value(),
391 candidates.version(),
392 candidates.creationTime())));
Madan Jampani7d243db2015-05-27 13:16:02 -0700393 });
Madan Jampanid14166a2015-02-24 17:37:51 -0800394 } catch (Exception e) {
Madan Jampani7d243db2015-05-27 13:16:02 -0700395 log.debug("Failure electing leaders", e);
Madan Jampanid14166a2015-02-24 17:37:51 -0800396 }
397 }
398
Madan Jampanid46e18f2015-05-04 23:19:33 -0700399 private void onLeadershipEvent(LeadershipEvent leadershipEvent) {
400 log.trace("Leadership Event: time = {} type = {} event = {}",
401 leadershipEvent.time(), leadershipEvent.type(),
402 leadershipEvent);
403
404 Leadership leadershipUpdate = leadershipEvent.subject();
405 LeadershipEvent.Type eventType = leadershipEvent.type();
406 String topic = leadershipUpdate.topic();
407
408 AtomicBoolean updateAccepted = new AtomicBoolean(false);
409 if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
410 leaderBoard.compute(topic, (k, currentLeadership) -> {
411 if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
412 updateAccepted.set(true);
413 return leadershipUpdate;
414 }
415 return currentLeadership;
416 });
417 } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
418 leaderBoard.compute(topic, (k, currentLeadership) -> {
419 if (currentLeadership == null || currentLeadership.epoch() <= leadershipUpdate.epoch()) {
420 updateAccepted.set(true);
Madan Jampani6f38daf2015-06-04 15:26:38 -0700421 // FIXME: Removing entries from leaderboard is not safe and should be visited.
Madan Jampanid46e18f2015-05-04 23:19:33 -0700422 return null;
423 }
424 return currentLeadership;
425 });
426 } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
427 candidateBoard.compute(topic, (k, currentInfo) -> {
428 if (currentInfo == null || currentInfo.epoch() < leadershipUpdate.epoch()) {
429 updateAccepted.set(true);
430 return leadershipUpdate;
431 }
432 return currentInfo;
433 });
434 } else {
435 throw new IllegalStateException("Unknown event type.");
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700436 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700437
Madan Jampanid46e18f2015-05-04 23:19:33 -0700438 if (updateAccepted.get()) {
439 eventDispatcher.post(leadershipEvent);
Madan Jampanid14166a2015-02-24 17:37:51 -0800440 }
441 }
442
Madan Jampanide003d92015-05-11 17:14:20 -0700443 private void rerunForLeadership(String path, CompletableFuture<Leadership> future) {
Madan Jampani7d243db2015-05-27 13:16:02 -0700444 lockExecutor.schedule(
Madan Jampanide003d92015-05-11 17:14:20 -0700445 () -> doRunForLeadership(path, future),
Madan Jampani8d37efc2015-06-01 17:25:48 -0700446 RandomUtils.nextInt(WAIT_BEFORE_RETRY_MILLIS),
447 TimeUnit.MILLISECONDS);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700448 }
449
Madan Jampanide003d92015-05-11 17:14:20 -0700450 private void retryWithdraw(String path, CompletableFuture<Void> future) {
Madan Jampani7d243db2015-05-27 13:16:02 -0700451 lockExecutor.schedule(
Madan Jampanide003d92015-05-11 17:14:20 -0700452 () -> doWithdraw(path, future),
Madan Jampani8d37efc2015-06-01 17:25:48 -0700453 RandomUtils.nextInt(WAIT_BEFORE_RETRY_MILLIS),
454 TimeUnit.MILLISECONDS);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700455 }
456
Madan Jampanicc586752015-05-18 16:19:27 -0700457 private void scheduleStaleLeadershipPurge(int afterDelaySec) {
458 if (staleLeadershipPurgeScheduled.compareAndSet(false, true)) {
459 staleLeadershipPurgeExecutor.schedule(
460 this::purgeStaleLeadership,
461 afterDelaySec,
462 TimeUnit.SECONDS);
463 }
464 }
465
466 /**
467 * Purges locks held by inactive nodes and evicts inactive nodes from candidacy.
468 */
469 private void purgeStaleLeadership() {
470 AtomicBoolean rerunPurge = new AtomicBoolean(false);
Madan Jampanid14166a2015-02-24 17:37:51 -0800471 try {
Madan Jampanicc586752015-05-18 16:19:27 -0700472 staleLeadershipPurgeScheduled.set(false);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700473 leaderMap.entrySet()
474 .stream()
Ayaka Koshibe4a3c2392015-04-22 18:03:13 -0700475 .filter(e -> clusterService.getState(e.getValue().value()) == INACTIVE)
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700476 .forEach(entry -> {
477 String path = entry.getKey();
478 NodeId nodeId = entry.getValue().value();
Madan Jampanid14166a2015-02-24 17:37:51 -0800479 try {
Madan Jampanid09441b2015-06-09 14:50:55 -0700480 leaderMap.computeIf(path, nodeId::equals, (topic, leader) -> null);
Madan Jampanid14166a2015-02-24 17:37:51 -0800481 } catch (Exception e) {
Madan Jampanice8392b2015-05-20 11:13:47 -0700482 log.debug("Failed to purge stale lock held by {} for {}", nodeId, path, e);
Madan Jampanicc586752015-05-18 16:19:27 -0700483 rerunPurge.set(true);
484 }
485 });
486
487 candidateMap.entrySet()
488 .forEach(entry -> {
489 String path = entry.getKey();
490 Versioned<List<NodeId>> candidates = entry.getValue();
491 List<NodeId> candidatesList = candidates != null
492 ? candidates.value() : Collections.emptyList();
493 List<NodeId> activeCandidatesList =
494 candidatesList.stream()
495 .filter(n -> clusterService.getState(n) == ACTIVE)
496 .filter(n -> !localNodeId.equals(n) || activeTopics.contains(path))
497 .collect(Collectors.toList());
498 if (activeCandidatesList.size() < candidatesList.size()) {
499 Set<NodeId> removedCandidates =
500 Sets.difference(Sets.newHashSet(candidatesList),
501 Sets.newHashSet(activeCandidatesList));
502 try {
Madan Jampanid09441b2015-06-09 14:50:55 -0700503 candidateMap.computeIf(path,
504 c -> c.stream()
505 .filter(n -> clusterService.getState(n) == INACTIVE)
506 .count() > 0,
507 (topic, c) -> c.stream()
508 .filter(n -> clusterService.getState(n) == ACTIVE)
509 .filter(n -> !localNodeId.equals(n) ||
510 activeTopics.contains(path))
511 .collect(Collectors.toList()));
Madan Jampanicc586752015-05-18 16:19:27 -0700512 } catch (Exception e) {
Madan Jampanice8392b2015-05-20 11:13:47 -0700513 log.debug("Failed to evict inactive candidates {} from "
Madan Jampanicc586752015-05-18 16:19:27 -0700514 + "candidate list for {}", removedCandidates, path, e);
515 rerunPurge.set(true);
516 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800517 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700518 });
Madan Jampanid14166a2015-02-24 17:37:51 -0800519 } catch (Exception e) {
Madan Jampanice8392b2015-05-20 11:13:47 -0700520 log.debug("Failure purging state leadership.", e);
Madan Jampanicc586752015-05-18 16:19:27 -0700521 rerunPurge.set(true);
522 }
523
524 if (rerunPurge.get()) {
Madan Jampanice8392b2015-05-20 11:13:47 -0700525 log.debug("Rescheduling stale leadership purge due to errors encountered in previous run");
Madan Jampanicc586752015-05-18 16:19:27 -0700526 scheduleStaleLeadershipPurge(DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC);
Madan Jampanid14166a2015-02-24 17:37:51 -0800527 }
528 }
529
Madan Jampani6f38daf2015-06-04 15:26:38 -0700530 private void refreshLeaderBoard() {
Madan Jampania14047d2015-02-25 12:23:02 -0800531 try {
Madan Jampani6f38daf2015-06-04 15:26:38 -0700532 Map<String, Leadership> newLeaderBoard = Maps.newHashMap();
533 leaderMap.entrySet().forEach(entry -> {
534 String path = entry.getKey();
535 Versioned<NodeId> leader = entry.getValue();
536 Leadership leadership = new Leadership(path,
537 leader.value(),
538 leader.version(),
539 leader.creationTime());
540 newLeaderBoard.put(path, leadership);
541 });
542
543 // first take snapshot of current leader board.
544 Map<String, Leadership> currentLeaderBoard = ImmutableMap.copyOf(leaderBoard);
545
546 MapDifference<String, Leadership> diff = Maps.difference(currentLeaderBoard, newLeaderBoard);
547
548 // evict stale leaders
549 diff.entriesOnlyOnLeft().forEach((path, leadership) -> {
550 log.debug("Evicting {} from leaderboard. It is no longer active leader.", leadership);
551 onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, leadership));
552 });
553
554 // add missing leaders
555 diff.entriesOnlyOnRight().forEach((path, leadership) -> {
556 log.debug("Adding {} to leaderboard. It is now the active leader.", leadership);
557 onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership));
558 });
559
560 // add updated leaders
561 diff.entriesDiffering().forEach((path, difference) -> {
562 Leadership current = difference.leftValue();
563 Leadership updated = difference.rightValue();
564 if (current.epoch() < updated.epoch()) {
565 log.debug("Updated {} in leaderboard.", updated);
566 onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, updated));
Madan Jampani096da0e2015-06-04 14:30:19 -0700567 }
Madan Jampania14047d2015-02-25 12:23:02 -0800568 });
569 } catch (Exception e) {
Madan Jampani6f38daf2015-06-04 15:26:38 -0700570 log.debug("Failed to refresh leader board", e);
Madan Jampania14047d2015-02-25 12:23:02 -0800571 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800572 }
Madan Jampanicc586752015-05-18 16:19:27 -0700573
574 private class InternalClusterEventListener implements ClusterEventListener {
575
576 @Override
577 public void event(ClusterEvent event) {
578 if (event.type() == Type.INSTANCE_DEACTIVATED || event.type() == Type.INSTANCE_REMOVED) {
579 scheduleStaleLeadershipPurge(0);
580 }
581 }
582 }
Madan Jampania14047d2015-02-25 12:23:02 -0800583}