blob: 23355e91e9bdc93afc4ec1f629263495683a2679 [file] [log] [blame]
Madan Jampanid14166a2015-02-24 17:37:51 -08001package org.onosproject.store.consistent.impl;
2
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07003import com.google.common.collect.ImmutableList;
Jonathan Harte649c752015-03-03 18:04:25 -08004import com.google.common.collect.ImmutableMap;
Madan Jampani3650fc42015-05-27 17:01:50 -07005import com.google.common.collect.Iterables;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07006import com.google.common.collect.Lists;
Jonathan Harte649c752015-03-03 18:04:25 -08007import com.google.common.collect.Maps;
8import com.google.common.collect.Sets;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07009
Madan Jampani8d37efc2015-06-01 17:25:48 -070010import org.apache.commons.lang.math.RandomUtils;
Madan Jampanid14166a2015-02-24 17:37:51 -080011import org.apache.felix.scr.annotations.Activate;
12import org.apache.felix.scr.annotations.Component;
13import org.apache.felix.scr.annotations.Deactivate;
14import org.apache.felix.scr.annotations.Reference;
15import org.apache.felix.scr.annotations.ReferenceCardinality;
16import org.apache.felix.scr.annotations.Service;
17import org.onlab.util.KryoNamespace;
Madan Jampanicc586752015-05-18 16:19:27 -070018import org.onosproject.cluster.ClusterEvent;
19import org.onosproject.cluster.ClusterEvent.Type;
20import org.onosproject.cluster.ClusterEventListener;
Madan Jampanid14166a2015-02-24 17:37:51 -080021import org.onosproject.cluster.ClusterService;
Madan Jampanid14166a2015-02-24 17:37:51 -080022import org.onosproject.cluster.Leadership;
23import org.onosproject.cluster.LeadershipEvent;
24import org.onosproject.cluster.LeadershipEventListener;
25import org.onosproject.cluster.LeadershipService;
26import org.onosproject.cluster.NodeId;
Simon Huntff663742015-05-14 13:33:05 -070027import org.onosproject.event.ListenerRegistry;
Madan Jampanid14166a2015-02-24 17:37:51 -080028import org.onosproject.event.EventDeliveryService;
29import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Madan Jampanid14166a2015-02-24 17:37:51 -080030import org.onosproject.store.cluster.messaging.MessageSubject;
31import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampanid14166a2015-02-24 17:37:51 -080032import org.onosproject.store.service.ConsistentMap;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070033import org.onosproject.store.service.ConsistentMapException;
Madan Jampanid14166a2015-02-24 17:37:51 -080034import org.onosproject.store.service.Serializer;
35import org.onosproject.store.service.StorageService;
36import org.onosproject.store.service.Versioned;
37import org.slf4j.Logger;
38
Madan Jampani1af8e132015-04-30 16:41:18 -070039import java.util.ArrayList;
Madan Jampanicc586752015-05-18 16:19:27 -070040import java.util.Collections;
Jonathan Harte649c752015-03-03 18:04:25 -080041import java.util.Map;
42import java.util.Map.Entry;
43import java.util.Objects;
44import java.util.Set;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070045import java.util.List;
Madan Jampanide003d92015-05-11 17:14:20 -070046import java.util.concurrent.CancellationException;
47import java.util.concurrent.CompletableFuture;
Jonathan Harte649c752015-03-03 18:04:25 -080048import java.util.concurrent.ExecutorService;
49import java.util.concurrent.Executors;
50import java.util.concurrent.ScheduledExecutorService;
51import java.util.concurrent.TimeUnit;
Madan Jampanid46e18f2015-05-04 23:19:33 -070052import java.util.concurrent.atomic.AtomicBoolean;
Jonathan Harte649c752015-03-03 18:04:25 -080053import java.util.stream.Collectors;
54
55import static com.google.common.base.Preconditions.checkArgument;
56import static org.onlab.util.Tools.groupedThreads;
57import static org.slf4j.LoggerFactory.getLogger;
Ayaka Koshibe4a3c2392015-04-22 18:03:13 -070058import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
59import static org.onosproject.cluster.ControllerNode.State.INACTIVE;
60
/**
 * Distributed leadership manager implemented on top of ConsistentMap.
 * <p>
 * This implementation makes use of ClusterService's failure
 * detection capabilities to detect and purge stale leadership
 * entries (locks) held by inactive nodes.
 * TODO: Ensure lock safety and liveness.
 */
Madan Jampani7f72c3f2015-03-01 17:34:59 -080068@Component(immediate = true, enabled = true)
Madan Jampanid14166a2015-02-24 17:37:51 -080069@Service
70public class DistributedLeadershipManager implements LeadershipService {
71
72 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
73 protected StorageService storageService;
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76 protected ClusterService clusterService;
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
79 protected ClusterCommunicationService clusterCommunicator;
80
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 protected EventDeliveryService eventDispatcher;
83
84 private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
85 new MessageSubject("distributed-leadership-manager-events");
86
87 private final Logger log = getLogger(getClass());
88 private ExecutorService messageHandlingExecutor;
Madan Jampani7d243db2015-05-27 13:16:02 -070089 private ScheduledExecutorService electionRunner;
90 private ScheduledExecutorService lockExecutor;
Madan Jampanicc586752015-05-18 16:19:27 -070091 private ScheduledExecutorService staleLeadershipPurgeExecutor;
Madan Jampani08960a42015-06-04 09:59:29 -070092 private ScheduledExecutorService leadershipRefresher;
Madan Jampanid14166a2015-02-24 17:37:51 -080093
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070094 private ConsistentMap<String, NodeId> leaderMap;
95 private ConsistentMap<String, List<NodeId>> candidateMap;
96
Madan Jampanicc586752015-05-18 16:19:27 -070097 private ListenerRegistry<LeadershipEvent, LeadershipEventListener> listenerRegistry;
Madan Jampanid14166a2015-02-24 17:37:51 -080098 private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
Ayaka Koshibefd26a302015-04-13 13:59:54 -070099 private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
Madan Jampanicc586752015-05-18 16:19:27 -0700100 private final ClusterEventListener clusterEventListener = new InternalClusterEventListener();
Madan Jampanid14166a2015-02-24 17:37:51 -0800101
Madan Jampanicc586752015-05-18 16:19:27 -0700102 private NodeId localNodeId;
Madan Jampanid14166a2015-02-24 17:37:51 -0800103 private Set<String> activeTopics = Sets.newConcurrentHashSet();
Madan Jampani7d243db2015-05-27 13:16:02 -0700104 private Map<String, CompletableFuture<Leadership>> pendingFutures = Maps.newConcurrentMap();
Madan Jampanid14166a2015-02-24 17:37:51 -0800105
Madan Jampani8d37efc2015-06-01 17:25:48 -0700106 // The actual delay is randomly chosen between the interval [0, WAIT_BEFORE_RETRY_MILLIS)
107 private static final int WAIT_BEFORE_RETRY_MILLIS = 150;
Madan Jampanid14166a2015-02-24 17:37:51 -0800108 private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
Madan Jampani08960a42015-06-04 09:59:29 -0700109 private static final int LEADERSHIP_REFRESH_INTERVAL_SEC = 2;
Madan Jampanicc586752015-05-18 16:19:27 -0700110 private static final int DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC = 2;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700111
Madan Jampanicc586752015-05-18 16:19:27 -0700112 private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false);
113
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700114 private static final Serializer SERIALIZER = Serializer.using(
115 new KryoNamespace.Builder().register(KryoNamespaces.API).build());
Madan Jampanid14166a2015-02-24 17:37:51 -0800116
117 @Activate
118 public void activate() {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700119 leaderMap = storageService.<String, NodeId>consistentMapBuilder()
120 .withName("onos-topic-leaders")
121 .withSerializer(SERIALIZER)
122 .withPartitionsDisabled().build();
123 candidateMap = storageService.<String, List<NodeId>>consistentMapBuilder()
124 .withName("onos-topic-candidates")
125 .withSerializer(SERIALIZER)
126 .withPartitionsDisabled().build();
Madan Jampanid14166a2015-02-24 17:37:51 -0800127
128 localNodeId = clusterService.getLocalNode().id();
129
130 messageHandlingExecutor = Executors.newSingleThreadExecutor(
131 groupedThreads("onos/store/leadership", "message-handler"));
Madan Jampani7d243db2015-05-27 13:16:02 -0700132 electionRunner = Executors.newSingleThreadScheduledExecutor(
133 groupedThreads("onos/store/leadership", "election-runner"));
134 lockExecutor = Executors.newScheduledThreadPool(
Madan Jampanid14166a2015-02-24 17:37:51 -0800135 4, groupedThreads("onos/store/leadership", "election-thread-%d"));
Madan Jampanicc586752015-05-18 16:19:27 -0700136 staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor(
137 groupedThreads("onos/store/leadership", "stale-leadership-evictor"));
Madan Jampani08960a42015-06-04 09:59:29 -0700138 leadershipRefresher = Executors.newSingleThreadScheduledExecutor(
139 groupedThreads("onos/store/leadership", "refresh-thread"));
Madan Jampanid14166a2015-02-24 17:37:51 -0800140 clusterCommunicator.addSubscriber(
141 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
Madan Jampanid46e18f2015-05-04 23:19:33 -0700142 SERIALIZER::decode,
143 this::onLeadershipEvent,
Madan Jampanid14166a2015-02-24 17:37:51 -0800144 messageHandlingExecutor);
145
Madan Jampanicc586752015-05-18 16:19:27 -0700146 clusterService.addListener(clusterEventListener);
147
Madan Jampani7d243db2015-05-27 13:16:02 -0700148 electionRunner.scheduleWithFixedDelay(
149 this::electLeaders, 0, DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC, TimeUnit.SECONDS);
150
Madan Jampani08960a42015-06-04 09:59:29 -0700151 leadershipRefresher.scheduleWithFixedDelay(
152 this::refreshLeaderBoard, 0, LEADERSHIP_REFRESH_INTERVAL_SEC, TimeUnit.SECONDS);
Madan Jampanid14166a2015-02-24 17:37:51 -0800153
Simon Huntff663742015-05-14 13:33:05 -0700154 listenerRegistry = new ListenerRegistry<>();
Madan Jampanid14166a2015-02-24 17:37:51 -0800155 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
156
Madan Jampanid46e18f2015-05-04 23:19:33 -0700157 log.info("Started");
Madan Jampanid14166a2015-02-24 17:37:51 -0800158 }
159
160 @Deactivate
161 public void deactivate() {
162 leaderBoard.forEach((topic, leadership) -> {
163 if (localNodeId.equals(leadership.leader())) {
164 withdraw(topic);
165 }
166 });
167
Madan Jampanicc586752015-05-18 16:19:27 -0700168 clusterService.removeListener(clusterEventListener);
Madan Jampanid14166a2015-02-24 17:37:51 -0800169 eventDispatcher.removeSink(LeadershipEvent.class);
170 clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
171
Madan Jampani7d243db2015-05-27 13:16:02 -0700172 electionRunner.shutdown();
Madan Jampanid14166a2015-02-24 17:37:51 -0800173 messageHandlingExecutor.shutdown();
Madan Jampani7d243db2015-05-27 13:16:02 -0700174 lockExecutor.shutdown();
Madan Jampanicc586752015-05-18 16:19:27 -0700175 staleLeadershipPurgeExecutor.shutdown();
Madan Jampani08960a42015-06-04 09:59:29 -0700176 leadershipRefresher.shutdown();
Madan Jampanid14166a2015-02-24 17:37:51 -0800177
Madan Jampanid46e18f2015-05-04 23:19:33 -0700178 log.info("Stopped");
Madan Jampanid14166a2015-02-24 17:37:51 -0800179 }
180
181 @Override
182 public Map<String, Leadership> getLeaderBoard() {
183 return ImmutableMap.copyOf(leaderBoard);
184 }
185
186 @Override
Madan Jampanifd45d5e2015-04-20 13:33:21 -0700187 public Map<String, List<NodeId>> getCandidates() {
188 return Maps.toMap(candidateBoard.keySet(), this::getCandidates);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700189 }
190
191 @Override
192 public List<NodeId> getCandidates(String path) {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700193 Leadership current = candidateBoard.get(path);
194 return current == null ? ImmutableList.of() : ImmutableList.copyOf(current.candidates());
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700195 }
196
197 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800198 public NodeId getLeader(String path) {
199 Leadership leadership = leaderBoard.get(path);
200 return leadership != null ? leadership.leader() : null;
201 }
202
203 @Override
Madan Jampani59610512015-02-25 15:25:43 -0800204 public Leadership getLeadership(String path) {
205 checkArgument(path != null);
206 return leaderBoard.get(path);
207 }
208
209 @Override
210 public Set<String> ownedTopics(NodeId nodeId) {
211 checkArgument(nodeId != null);
212 return leaderBoard.entrySet()
213 .stream()
214 .filter(entry -> nodeId.equals(entry.getValue().leader()))
215 .map(Entry::getKey)
216 .collect(Collectors.toSet());
217 }
218
219 @Override
Madan Jampanide003d92015-05-11 17:14:20 -0700220 public CompletableFuture<Leadership> runForLeadership(String path) {
Madan Jampani52860be2015-02-27 12:52:37 -0800221 log.debug("Running for leadership for topic: {}", path);
Madan Jampanide003d92015-05-11 17:14:20 -0700222 CompletableFuture<Leadership> resultFuture = new CompletableFuture<>();
223 doRunForLeadership(path, resultFuture);
224 return resultFuture;
225 }
226
227 private void doRunForLeadership(String path, CompletableFuture<Leadership> future) {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700228 try {
Madan Jampani346d4f52015-05-04 11:09:39 -0700229 Versioned<List<NodeId>> candidates = candidateMap.computeIf(path,
230 currentList -> currentList == null || !currentList.contains(localNodeId),
231 (topic, currentList) -> {
232 if (currentList == null) {
233 return ImmutableList.of(localNodeId);
234 } else {
235 List<NodeId> newList = Lists.newLinkedList();
236 newList.addAll(currentList);
237 newList.add(localNodeId);
238 return newList;
239 }
240 });
241 publish(new LeadershipEvent(
242 LeadershipEvent.Type.CANDIDATES_CHANGED,
243 new Leadership(path,
244 candidates.value(),
245 candidates.version(),
246 candidates.creationTime())));
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700247 log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
248 activeTopics.add(path);
Madan Jampani7d243db2015-05-27 13:16:02 -0700249 Leadership leadership = electLeader(path, candidates.value());
250 if (leadership == null) {
251 pendingFutures.put(path, future);
252 } else {
253 future.complete(leadership);
254 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700255 } catch (ConsistentMapException e) {
256 log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
Madan Jampanide003d92015-05-11 17:14:20 -0700257 rerunForLeadership(path, future);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700258 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800259 }
260
261 @Override
Madan Jampanide003d92015-05-11 17:14:20 -0700262 public CompletableFuture<Void> withdraw(String path) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800263 activeTopics.remove(path);
Madan Jampanide003d92015-05-11 17:14:20 -0700264 CompletableFuture<Void> resultFuture = new CompletableFuture<>();
265 doWithdraw(path, resultFuture);
266 return resultFuture;
267 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700268
Madan Jampanide003d92015-05-11 17:14:20 -0700269
270 private void doWithdraw(String path, CompletableFuture<Void> future) {
271 if (activeTopics.contains(path)) {
272 future.completeExceptionally(new CancellationException(String.format("%s is now a active topic", path)));
273 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800274 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700275 Versioned<NodeId> leader = leaderMap.get(path);
276 if (leader != null && Objects.equals(leader.value(), localNodeId)) {
277 if (leaderMap.remove(path, leader.version())) {
Madan Jampanieedfe542015-05-26 09:54:09 -0700278 log.debug("Gave up leadership for {}", path);
Madan Jampanide003d92015-05-11 17:14:20 -0700279 future.complete(null);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700280 publish(new LeadershipEvent(
281 LeadershipEvent.Type.LEADER_BOOTED,
282 new Leadership(path,
283 localNodeId,
284 leader.version(),
285 leader.creationTime())));
Jonathan Harte649c752015-03-03 18:04:25 -0800286 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800287 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700288 // else we are not the current leader, can still be a candidate.
289 Versioned<List<NodeId>> candidates = candidateMap.get(path);
290 List<NodeId> candidateList = candidates != null
291 ? Lists.newArrayList(candidates.value())
292 : Lists.newArrayList();
293 if (!candidateList.remove(localNodeId)) {
Madan Jampanide003d92015-05-11 17:14:20 -0700294 future.complete(null);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700295 return;
296 }
Ayaka Koshibe93451b42015-04-21 15:12:03 -0700297 if (candidateMap.replace(path, candidates.version(), candidateList)) {
Ayaka Koshibe67cf7de2015-04-17 11:47:27 -0700298 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
Madan Jampanide003d92015-05-11 17:14:20 -0700299 future.complete(null);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700300 publish(new LeadershipEvent(
301 LeadershipEvent.Type.CANDIDATES_CHANGED,
302 new Leadership(path,
303 newCandidates.value(),
304 newCandidates.version(),
305 newCandidates.creationTime())));
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700306 } else {
Madan Jampani8d37efc2015-06-01 17:25:48 -0700307 log.debug("Failed to withdraw from candidates list for {}. Will retry", path);
Madan Jampanide003d92015-05-11 17:14:20 -0700308 retryWithdraw(path, future);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700309 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800310 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800311 log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
Madan Jampanide003d92015-05-11 17:14:20 -0700312 retryWithdraw(path, future);
Madan Jampanid14166a2015-02-24 17:37:51 -0800313 }
314 }
315
316 @Override
Madan Jampani1af8e132015-04-30 16:41:18 -0700317 public boolean stepdown(String path) {
Madan Jampani9bd1f152015-04-30 23:33:35 -0700318 if (!activeTopics.contains(path) || !Objects.equals(localNodeId, getLeader(path))) {
Madan Jampani1af8e132015-04-30 16:41:18 -0700319 return false;
320 }
321
322 try {
323 Versioned<NodeId> leader = leaderMap.get(path);
324 if (leader != null && Objects.equals(leader.value(), localNodeId)) {
325 if (leaderMap.remove(path, leader.version())) {
Madan Jampanieedfe542015-05-26 09:54:09 -0700326 log.debug("Stepped down from leadership for {}", path);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700327 publish(new LeadershipEvent(
328 LeadershipEvent.Type.LEADER_BOOTED,
329 new Leadership(path,
330 localNodeId,
331 leader.version(),
332 leader.creationTime())));
Madan Jampani1af8e132015-04-30 16:41:18 -0700333 return true;
334 }
335 }
336 } catch (Exception e) {
337 log.warn("Error executing stepdown for {}", path, e);
338 }
339 return false;
340 }
341
342 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800343 public void addListener(LeadershipEventListener listener) {
344 listenerRegistry.addListener(listener);
345 }
346
347 @Override
348 public void removeListener(LeadershipEventListener listener) {
349 listenerRegistry.removeListener(listener);
350 }
351
Madan Jampani1af8e132015-04-30 16:41:18 -0700352 @Override
353 public boolean makeTopCandidate(String path, NodeId nodeId) {
Madan Jampani346d4f52015-05-04 11:09:39 -0700354 Versioned<List<NodeId>> newCandidates = candidateMap.computeIf(path,
Madan Jampani3650fc42015-05-27 17:01:50 -0700355 candidates -> candidates != null &&
356 candidates.contains(nodeId) &&
357 !nodeId.equals(Iterables.getFirst(candidates, null)),
Madan Jampani346d4f52015-05-04 11:09:39 -0700358 (topic, candidates) -> {
359 List<NodeId> updatedCandidates = new ArrayList<>(candidates.size());
360 updatedCandidates.add(nodeId);
361 candidates.stream().filter(id -> !nodeId.equals(id)).forEach(updatedCandidates::add);
362 return updatedCandidates;
363 });
364 publish(new LeadershipEvent(
Madan Jampanid46e18f2015-05-04 23:19:33 -0700365 LeadershipEvent.Type.CANDIDATES_CHANGED,
366 new Leadership(path,
367 newCandidates.value(),
368 newCandidates.version(),
369 newCandidates.creationTime())));
Madan Jampani346d4f52015-05-04 11:09:39 -0700370 return true;
Madan Jampani1af8e132015-04-30 16:41:18 -0700371 }
372
Madan Jampani7d243db2015-05-27 13:16:02 -0700373 private Leadership electLeader(String path, List<NodeId> candidates) {
374 Leadership currentLeadership = getLeadership(path);
375 if (currentLeadership != null) {
376 return currentLeadership;
377 } else {
378 NodeId topCandidate = candidates
379 .stream()
380 .filter(n -> clusterService.getState(n) == ACTIVE)
381 .findFirst()
382 .orElse(null);
383 try {
384 Versioned<NodeId> leader = localNodeId.equals(topCandidate)
385 ? leaderMap.computeIfAbsent(path, p -> localNodeId) : leaderMap.get(path);
386 if (leader != null) {
387 Leadership newLeadership = new Leadership(path,
388 leader.value(),
389 leader.version(),
390 leader.creationTime());
391 publish(new LeadershipEvent(
392 LeadershipEvent.Type.LEADER_ELECTED,
393 newLeadership));
394 return newLeadership;
Ayaka Koshibe0d886fc2015-04-23 11:53:41 -0700395 }
Madan Jampani7d243db2015-05-27 13:16:02 -0700396 } catch (Exception e) {
397 log.debug("Failed to elect leader for {}", path, e);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700398 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700399 }
Madan Jampani7d243db2015-05-27 13:16:02 -0700400 return null;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700401 }
402
Madan Jampani7d243db2015-05-27 13:16:02 -0700403 private void electLeaders() {
Madan Jampanid14166a2015-02-24 17:37:51 -0800404 try {
Madan Jampani7d243db2015-05-27 13:16:02 -0700405 candidateMap.entrySet().forEach(entry -> {
406 String path = entry.getKey();
Madan Jampanicd40e5b2015-05-27 17:19:48 -0700407 Versioned<List<NodeId>> candidates = entry.getValue();
408 // for active topics, check if this node can become a leader (if it isn't already)
Madan Jampani7d243db2015-05-27 13:16:02 -0700409 if (activeTopics.contains(path)) {
410 lockExecutor.submit(() -> {
Madan Jampanicd40e5b2015-05-27 17:19:48 -0700411 Leadership leadership = electLeader(path, candidates.value());
Madan Jampani7d243db2015-05-27 13:16:02 -0700412 if (leadership != null) {
413 CompletableFuture<Leadership> future = pendingFutures.remove(path);
414 if (future != null) {
415 future.complete(leadership);
416 }
417 }
418 });
419 }
Madan Jampanicd40e5b2015-05-27 17:19:48 -0700420 // Raise a CANDIDATES_CHANGED event to force refresh local candidate board
421 // and also to update local listeners.
422 // Don't worry about duplicate events as they will be suppressed.
423 onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED,
424 new Leadership(path,
425 candidates.value(),
426 candidates.version(),
427 candidates.creationTime())));
Madan Jampani7d243db2015-05-27 13:16:02 -0700428 });
Madan Jampanid14166a2015-02-24 17:37:51 -0800429 } catch (Exception e) {
Madan Jampani7d243db2015-05-27 13:16:02 -0700430 log.debug("Failure electing leaders", e);
Madan Jampanid14166a2015-02-24 17:37:51 -0800431 }
432 }
433
Madan Jampanid46e18f2015-05-04 23:19:33 -0700434 private void publish(LeadershipEvent event) {
435 onLeadershipEvent(event);
436 clusterCommunicator.broadcast(event, LEADERSHIP_EVENT_MESSAGE_SUBJECT, SERIALIZER::encode);
437 }
438
439 private void onLeadershipEvent(LeadershipEvent leadershipEvent) {
440 log.trace("Leadership Event: time = {} type = {} event = {}",
441 leadershipEvent.time(), leadershipEvent.type(),
442 leadershipEvent);
443
444 Leadership leadershipUpdate = leadershipEvent.subject();
445 LeadershipEvent.Type eventType = leadershipEvent.type();
446 String topic = leadershipUpdate.topic();
447
448 AtomicBoolean updateAccepted = new AtomicBoolean(false);
449 if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
450 leaderBoard.compute(topic, (k, currentLeadership) -> {
451 if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
452 updateAccepted.set(true);
453 return leadershipUpdate;
454 }
455 return currentLeadership;
456 });
457 } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
458 leaderBoard.compute(topic, (k, currentLeadership) -> {
459 if (currentLeadership == null || currentLeadership.epoch() <= leadershipUpdate.epoch()) {
460 updateAccepted.set(true);
Madan Jampani08960a42015-06-04 09:59:29 -0700461 // FIXME: Removing entries from leaderboard is not safe and should be visited.
Madan Jampanid46e18f2015-05-04 23:19:33 -0700462 return null;
463 }
464 return currentLeadership;
465 });
466 } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
467 candidateBoard.compute(topic, (k, currentInfo) -> {
468 if (currentInfo == null || currentInfo.epoch() < leadershipUpdate.epoch()) {
469 updateAccepted.set(true);
470 return leadershipUpdate;
471 }
472 return currentInfo;
473 });
474 } else {
475 throw new IllegalStateException("Unknown event type.");
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700476 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700477
Madan Jampanid46e18f2015-05-04 23:19:33 -0700478 if (updateAccepted.get()) {
479 eventDispatcher.post(leadershipEvent);
Madan Jampanid14166a2015-02-24 17:37:51 -0800480 }
481 }
482
Madan Jampanide003d92015-05-11 17:14:20 -0700483 private void rerunForLeadership(String path, CompletableFuture<Leadership> future) {
Madan Jampani7d243db2015-05-27 13:16:02 -0700484 lockExecutor.schedule(
Madan Jampanide003d92015-05-11 17:14:20 -0700485 () -> doRunForLeadership(path, future),
Madan Jampani8d37efc2015-06-01 17:25:48 -0700486 RandomUtils.nextInt(WAIT_BEFORE_RETRY_MILLIS),
487 TimeUnit.MILLISECONDS);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700488 }
489
Madan Jampanide003d92015-05-11 17:14:20 -0700490 private void retryWithdraw(String path, CompletableFuture<Void> future) {
Madan Jampani7d243db2015-05-27 13:16:02 -0700491 lockExecutor.schedule(
Madan Jampanide003d92015-05-11 17:14:20 -0700492 () -> doWithdraw(path, future),
Madan Jampani8d37efc2015-06-01 17:25:48 -0700493 RandomUtils.nextInt(WAIT_BEFORE_RETRY_MILLIS),
494 TimeUnit.MILLISECONDS);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700495 }
496
Madan Jampanicc586752015-05-18 16:19:27 -0700497 private void scheduleStaleLeadershipPurge(int afterDelaySec) {
498 if (staleLeadershipPurgeScheduled.compareAndSet(false, true)) {
499 staleLeadershipPurgeExecutor.schedule(
500 this::purgeStaleLeadership,
501 afterDelaySec,
502 TimeUnit.SECONDS);
503 }
504 }
505
506 /**
507 * Purges locks held by inactive nodes and evicts inactive nodes from candidacy.
508 */
509 private void purgeStaleLeadership() {
510 AtomicBoolean rerunPurge = new AtomicBoolean(false);
Madan Jampanid14166a2015-02-24 17:37:51 -0800511 try {
Madan Jampanicc586752015-05-18 16:19:27 -0700512 staleLeadershipPurgeScheduled.set(false);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700513 leaderMap.entrySet()
514 .stream()
Ayaka Koshibe4a3c2392015-04-22 18:03:13 -0700515 .filter(e -> clusterService.getState(e.getValue().value()) == INACTIVE)
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700516 .forEach(entry -> {
517 String path = entry.getKey();
518 NodeId nodeId = entry.getValue().value();
519 long epoch = entry.getValue().version();
520 long creationTime = entry.getValue().creationTime();
Madan Jampanid14166a2015-02-24 17:37:51 -0800521 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700522 if (leaderMap.remove(path, epoch)) {
Madan Jampanieedfe542015-05-26 09:54:09 -0700523 log.debug("Purged stale lock held by {} for {}", nodeId, path);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700524 publish(new LeadershipEvent(
525 LeadershipEvent.Type.LEADER_BOOTED,
526 new Leadership(path, nodeId, epoch, creationTime)));
Madan Jampanid14166a2015-02-24 17:37:51 -0800527 }
528 } catch (Exception e) {
Madan Jampanice8392b2015-05-20 11:13:47 -0700529 log.debug("Failed to purge stale lock held by {} for {}", nodeId, path, e);
Madan Jampanicc586752015-05-18 16:19:27 -0700530 rerunPurge.set(true);
531 }
532 });
533
534 candidateMap.entrySet()
535 .forEach(entry -> {
536 String path = entry.getKey();
537 Versioned<List<NodeId>> candidates = entry.getValue();
538 List<NodeId> candidatesList = candidates != null
539 ? candidates.value() : Collections.emptyList();
540 List<NodeId> activeCandidatesList =
541 candidatesList.stream()
542 .filter(n -> clusterService.getState(n) == ACTIVE)
543 .filter(n -> !localNodeId.equals(n) || activeTopics.contains(path))
544 .collect(Collectors.toList());
545 if (activeCandidatesList.size() < candidatesList.size()) {
546 Set<NodeId> removedCandidates =
547 Sets.difference(Sets.newHashSet(candidatesList),
548 Sets.newHashSet(activeCandidatesList));
549 try {
550 if (candidateMap.replace(path, entry.getValue().version(), activeCandidatesList)) {
551 log.info("Evicted inactive candidates {} from "
552 + "candidate list for {}", removedCandidates, path);
553 Versioned<List<NodeId>> updatedCandidates = candidateMap.get(path);
554 publish(new LeadershipEvent(
555 LeadershipEvent.Type.CANDIDATES_CHANGED,
556 new Leadership(path,
557 updatedCandidates.value(),
558 updatedCandidates.version(),
559 updatedCandidates.creationTime())));
Madan Jampani3d1727c2015-05-22 11:46:29 -0700560 } else {
561 // Conflicting update detected. Rerun purge to make sure
562 // inactive candidates are evicted.
563 rerunPurge.set(true);
Madan Jampanicc586752015-05-18 16:19:27 -0700564 }
565 } catch (Exception e) {
Madan Jampanice8392b2015-05-20 11:13:47 -0700566 log.debug("Failed to evict inactive candidates {} from "
Madan Jampanicc586752015-05-18 16:19:27 -0700567 + "candidate list for {}", removedCandidates, path, e);
568 rerunPurge.set(true);
569 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800570 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700571 });
Madan Jampanid14166a2015-02-24 17:37:51 -0800572 } catch (Exception e) {
Madan Jampanice8392b2015-05-20 11:13:47 -0700573 log.debug("Failure purging state leadership.", e);
Madan Jampanicc586752015-05-18 16:19:27 -0700574 rerunPurge.set(true);
575 }
576
577 if (rerunPurge.get()) {
Madan Jampanice8392b2015-05-20 11:13:47 -0700578 log.debug("Rescheduling stale leadership purge due to errors encountered in previous run");
Madan Jampanicc586752015-05-18 16:19:27 -0700579 scheduleStaleLeadershipPurge(DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC);
Madan Jampanid14166a2015-02-24 17:37:51 -0800580 }
581 }
582
Madan Jampani08960a42015-06-04 09:59:29 -0700583 private void refreshLeaderBoard() {
Madan Jampania14047d2015-02-25 12:23:02 -0800584 try {
Madan Jampani08960a42015-06-04 09:59:29 -0700585 Map<String, Leadership> newLeaderBoard = Maps.newHashMap();
586 leaderMap.entrySet().forEach(entry -> {
587 String path = entry.getKey();
588 Versioned<NodeId> leader = entry.getValue();
589 Leadership leadership = new Leadership(path,
590 leader.value(),
591 leader.version(),
592 leader.creationTime());
593 newLeaderBoard.put(path, leadership);
594 });
595
596 // first take snapshot of current leader board.
597 Map<String, Leadership> currentLeaderBoard = ImmutableMap.copyOf(leaderBoard);
598
599 // evict stale leaders
600 Maps.difference(currentLeaderBoard, newLeaderBoard).entriesOnlyOnLeft().forEach((path, leadership) -> {
601 log.debug("Evicting {} from leaderboard. It is no longer active leader.", leadership);
602 onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, leadership));
603 });
604
605 // add missing leaders
606 Maps.difference(currentLeaderBoard, newLeaderBoard).entriesDiffering().forEach((path, difference) -> {
607 Leadership leadership = difference.rightValue();
608 log.debug("Adding {} to leaderboard. It is now the active leader.", leadership);
609 onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership));
Madan Jampania14047d2015-02-25 12:23:02 -0800610 });
611 } catch (Exception e) {
Madan Jampani08960a42015-06-04 09:59:29 -0700612 log.debug("Failed to refresh leader board", e);
Madan Jampania14047d2015-02-25 12:23:02 -0800613 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800614 }
Madan Jampanicc586752015-05-18 16:19:27 -0700615
616 private class InternalClusterEventListener implements ClusterEventListener {
617
618 @Override
619 public void event(ClusterEvent event) {
620 if (event.type() == Type.INSTANCE_DEACTIVATED || event.type() == Type.INSTANCE_REMOVED) {
621 scheduleStaleLeadershipPurge(0);
622 }
623 }
624 }
Madan Jampania14047d2015-02-25 12:23:02 -0800625}