blob: 5976325e4a8e9736ca79dbac9af32089bb70c123 [file] [log] [blame]
Madan Jampanid14166a2015-02-24 17:37:51 -08001package org.onosproject.store.consistent.impl;
2
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07003import com.google.common.collect.ImmutableList;
Jonathan Harte649c752015-03-03 18:04:25 -08004import com.google.common.collect.ImmutableMap;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07005import com.google.common.collect.Lists;
Jonathan Harte649c752015-03-03 18:04:25 -08006import com.google.common.collect.Maps;
7import com.google.common.collect.Sets;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07008
Madan Jampanid14166a2015-02-24 17:37:51 -08009import org.apache.felix.scr.annotations.Activate;
10import org.apache.felix.scr.annotations.Component;
11import org.apache.felix.scr.annotations.Deactivate;
12import org.apache.felix.scr.annotations.Reference;
13import org.apache.felix.scr.annotations.ReferenceCardinality;
14import org.apache.felix.scr.annotations.Service;
15import org.onlab.util.KryoNamespace;
Madan Jampanicc586752015-05-18 16:19:27 -070016import org.onosproject.cluster.ClusterEvent;
17import org.onosproject.cluster.ClusterEvent.Type;
18import org.onosproject.cluster.ClusterEventListener;
Madan Jampanid14166a2015-02-24 17:37:51 -080019import org.onosproject.cluster.ClusterService;
Madan Jampanid14166a2015-02-24 17:37:51 -080020import org.onosproject.cluster.Leadership;
21import org.onosproject.cluster.LeadershipEvent;
22import org.onosproject.cluster.LeadershipEventListener;
23import org.onosproject.cluster.LeadershipService;
24import org.onosproject.cluster.NodeId;
Simon Huntff663742015-05-14 13:33:05 -070025import org.onosproject.event.ListenerRegistry;
Madan Jampanid14166a2015-02-24 17:37:51 -080026import org.onosproject.event.EventDeliveryService;
27import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Madan Jampanid14166a2015-02-24 17:37:51 -080028import org.onosproject.store.cluster.messaging.MessageSubject;
29import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampanid14166a2015-02-24 17:37:51 -080030import org.onosproject.store.service.ConsistentMap;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070031import org.onosproject.store.service.ConsistentMapException;
Madan Jampanid14166a2015-02-24 17:37:51 -080032import org.onosproject.store.service.Serializer;
33import org.onosproject.store.service.StorageService;
34import org.onosproject.store.service.Versioned;
35import org.slf4j.Logger;
36
Madan Jampani1af8e132015-04-30 16:41:18 -070037import java.util.ArrayList;
Madan Jampanicc586752015-05-18 16:19:27 -070038import java.util.Collections;
Jonathan Harte649c752015-03-03 18:04:25 -080039import java.util.Map;
40import java.util.Map.Entry;
41import java.util.Objects;
42import java.util.Set;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070043import java.util.List;
Madan Jampanide003d92015-05-11 17:14:20 -070044import java.util.concurrent.CancellationException;
45import java.util.concurrent.CompletableFuture;
Jonathan Harte649c752015-03-03 18:04:25 -080046import java.util.concurrent.ExecutorService;
47import java.util.concurrent.Executors;
48import java.util.concurrent.ScheduledExecutorService;
49import java.util.concurrent.TimeUnit;
Madan Jampanid46e18f2015-05-04 23:19:33 -070050import java.util.concurrent.atomic.AtomicBoolean;
Jonathan Harte649c752015-03-03 18:04:25 -080051import java.util.stream.Collectors;
52
53import static com.google.common.base.Preconditions.checkArgument;
54import static org.onlab.util.Tools.groupedThreads;
55import static org.slf4j.LoggerFactory.getLogger;
Ayaka Koshibe4a3c2392015-04-22 18:03:13 -070056import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
57import static org.onosproject.cluster.ControllerNode.State.INACTIVE;
58
Madan Jampanid14166a2015-02-24 17:37:51 -080059/**
60 * Distributed Lock Manager implemented on top of ConsistentMap.
61 * <p>
Madan Jampanicc586752015-05-18 16:19:27 -070062 * This implementation makes use of ClusterService's failure
Madan Jampanid14166a2015-02-24 17:37:51 -080063 * detection capabilities to detect and purge stale locks.
64 * TODO: Ensure lock safety and liveness.
65 */
Madan Jampani7f72c3f2015-03-01 17:34:59 -080066@Component(immediate = true, enabled = true)
Madan Jampanid14166a2015-02-24 17:37:51 -080067@Service
68public class DistributedLeadershipManager implements LeadershipService {
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
71 protected StorageService storageService;
72
73 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
74 protected ClusterService clusterService;
75
76 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
77 protected ClusterCommunicationService clusterCommunicator;
78
79 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
80 protected EventDeliveryService eventDispatcher;
81
82 private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
83 new MessageSubject("distributed-leadership-manager-events");
84
85 private final Logger log = getLogger(getClass());
86 private ExecutorService messageHandlingExecutor;
87 private ScheduledExecutorService retryLeaderLockExecutor;
Madan Jampanicc586752015-05-18 16:19:27 -070088 private ScheduledExecutorService staleLeadershipPurgeExecutor;
Madan Jampanid14166a2015-02-24 17:37:51 -080089 private ScheduledExecutorService leadershipStatusBroadcaster;
90
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070091 private ConsistentMap<String, NodeId> leaderMap;
92 private ConsistentMap<String, List<NodeId>> candidateMap;
93
Madan Jampanicc586752015-05-18 16:19:27 -070094 private ListenerRegistry<LeadershipEvent, LeadershipEventListener> listenerRegistry;
Madan Jampanid14166a2015-02-24 17:37:51 -080095 private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
Ayaka Koshibefd26a302015-04-13 13:59:54 -070096 private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
Madan Jampanicc586752015-05-18 16:19:27 -070097 private final ClusterEventListener clusterEventListener = new InternalClusterEventListener();
Madan Jampanid14166a2015-02-24 17:37:51 -080098
Madan Jampanicc586752015-05-18 16:19:27 -070099 private NodeId localNodeId;
Madan Jampanid14166a2015-02-24 17:37:51 -0800100 private Set<String> activeTopics = Sets.newConcurrentHashSet();
101
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700102 private static final int ELECTION_JOIN_ATTEMPT_INTERVAL_SEC = 2;
Madan Jampanid14166a2015-02-24 17:37:51 -0800103 private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
Madan Jampania14047d2015-02-25 12:23:02 -0800104 private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC = 2;
Madan Jampanicc586752015-05-18 16:19:27 -0700105 private static final int DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC = 2;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700106 private static final int LEADER_CANDIDATE_POS = 0;
107
Madan Jampanicc586752015-05-18 16:19:27 -0700108 private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false);
109
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700110 private static final Serializer SERIALIZER = Serializer.using(
111 new KryoNamespace.Builder().register(KryoNamespaces.API).build());
Madan Jampanid14166a2015-02-24 17:37:51 -0800112
113 @Activate
114 public void activate() {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700115 leaderMap = storageService.<String, NodeId>consistentMapBuilder()
116 .withName("onos-topic-leaders")
117 .withSerializer(SERIALIZER)
118 .withPartitionsDisabled().build();
119 candidateMap = storageService.<String, List<NodeId>>consistentMapBuilder()
120 .withName("onos-topic-candidates")
121 .withSerializer(SERIALIZER)
122 .withPartitionsDisabled().build();
Madan Jampanid14166a2015-02-24 17:37:51 -0800123
124 localNodeId = clusterService.getLocalNode().id();
125
126 messageHandlingExecutor = Executors.newSingleThreadExecutor(
127 groupedThreads("onos/store/leadership", "message-handler"));
128 retryLeaderLockExecutor = Executors.newScheduledThreadPool(
129 4, groupedThreads("onos/store/leadership", "election-thread-%d"));
Madan Jampanicc586752015-05-18 16:19:27 -0700130 staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor(
131 groupedThreads("onos/store/leadership", "stale-leadership-evictor"));
Madan Jampanid14166a2015-02-24 17:37:51 -0800132 leadershipStatusBroadcaster = Executors.newSingleThreadScheduledExecutor(
133 groupedThreads("onos/store/leadership", "peer-updater"));
134 clusterCommunicator.addSubscriber(
135 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
Madan Jampanid46e18f2015-05-04 23:19:33 -0700136 SERIALIZER::decode,
137 this::onLeadershipEvent,
Madan Jampanid14166a2015-02-24 17:37:51 -0800138 messageHandlingExecutor);
139
Madan Jampanicc586752015-05-18 16:19:27 -0700140 clusterService.addListener(clusterEventListener);
141
Madan Jampanid14166a2015-02-24 17:37:51 -0800142 leadershipStatusBroadcaster.scheduleWithFixedDelay(
Madan Jampania14047d2015-02-25 12:23:02 -0800143 this::sendLeadershipStatus, 0, LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC, TimeUnit.SECONDS);
Madan Jampanid14166a2015-02-24 17:37:51 -0800144
Simon Huntff663742015-05-14 13:33:05 -0700145 listenerRegistry = new ListenerRegistry<>();
Madan Jampanid14166a2015-02-24 17:37:51 -0800146 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
147
Madan Jampanid46e18f2015-05-04 23:19:33 -0700148 log.info("Started");
Madan Jampanid14166a2015-02-24 17:37:51 -0800149 }
150
151 @Deactivate
152 public void deactivate() {
153 leaderBoard.forEach((topic, leadership) -> {
154 if (localNodeId.equals(leadership.leader())) {
155 withdraw(topic);
156 }
157 });
158
Madan Jampanicc586752015-05-18 16:19:27 -0700159 clusterService.removeListener(clusterEventListener);
Madan Jampanid14166a2015-02-24 17:37:51 -0800160 eventDispatcher.removeSink(LeadershipEvent.class);
161 clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
162
163 messageHandlingExecutor.shutdown();
164 retryLeaderLockExecutor.shutdown();
Madan Jampanicc586752015-05-18 16:19:27 -0700165 staleLeadershipPurgeExecutor.shutdown();
Madan Jampanid14166a2015-02-24 17:37:51 -0800166 leadershipStatusBroadcaster.shutdown();
167
Madan Jampanid46e18f2015-05-04 23:19:33 -0700168 log.info("Stopped");
Madan Jampanid14166a2015-02-24 17:37:51 -0800169 }
170
171 @Override
172 public Map<String, Leadership> getLeaderBoard() {
173 return ImmutableMap.copyOf(leaderBoard);
174 }
175
176 @Override
Madan Jampanifd45d5e2015-04-20 13:33:21 -0700177 public Map<String, List<NodeId>> getCandidates() {
178 return Maps.toMap(candidateBoard.keySet(), this::getCandidates);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700179 }
180
181 @Override
182 public List<NodeId> getCandidates(String path) {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700183 Leadership current = candidateBoard.get(path);
184 return current == null ? ImmutableList.of() : ImmutableList.copyOf(current.candidates());
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700185 }
186
187 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800188 public NodeId getLeader(String path) {
189 Leadership leadership = leaderBoard.get(path);
190 return leadership != null ? leadership.leader() : null;
191 }
192
193 @Override
Madan Jampani59610512015-02-25 15:25:43 -0800194 public Leadership getLeadership(String path) {
195 checkArgument(path != null);
196 return leaderBoard.get(path);
197 }
198
199 @Override
200 public Set<String> ownedTopics(NodeId nodeId) {
201 checkArgument(nodeId != null);
202 return leaderBoard.entrySet()
203 .stream()
204 .filter(entry -> nodeId.equals(entry.getValue().leader()))
205 .map(Entry::getKey)
206 .collect(Collectors.toSet());
207 }
208
209 @Override
Madan Jampanide003d92015-05-11 17:14:20 -0700210 public CompletableFuture<Leadership> runForLeadership(String path) {
Madan Jampani52860be2015-02-27 12:52:37 -0800211 log.debug("Running for leadership for topic: {}", path);
Madan Jampanide003d92015-05-11 17:14:20 -0700212 CompletableFuture<Leadership> resultFuture = new CompletableFuture<>();
213 doRunForLeadership(path, resultFuture);
214 return resultFuture;
215 }
216
217 private void doRunForLeadership(String path, CompletableFuture<Leadership> future) {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700218 try {
219 Versioned<List<NodeId>> candidates = candidateMap.get(path);
220 if (candidates != null) {
221 List<NodeId> candidateList = Lists.newArrayList(candidates.value());
222 if (!candidateList.contains(localNodeId)) {
223 candidateList.add(localNodeId);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700224 if (candidateMap.replace(path, candidates.version(), candidateList)) {
225 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700226 publish(new LeadershipEvent(
227 LeadershipEvent.Type.CANDIDATES_CHANGED,
228 new Leadership(path,
229 newCandidates.value(),
230 newCandidates.version(),
231 newCandidates.creationTime())));
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700232 } else {
Madan Jampanide003d92015-05-11 17:14:20 -0700233 rerunForLeadership(path, future);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700234 return;
235 }
236 }
237 } else {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700238 List<NodeId> candidateList = ImmutableList.of(localNodeId);
239 if ((candidateMap.putIfAbsent(path, candidateList) == null)) {
240 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700241 publish(new LeadershipEvent(
242 LeadershipEvent.Type.CANDIDATES_CHANGED,
243 new Leadership(path,
244 newCandidates.value(),
245 newCandidates.version(),
246 newCandidates.creationTime())));
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700247 } else {
Madan Jampanide003d92015-05-11 17:14:20 -0700248 rerunForLeadership(path, future);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700249 return;
250 }
251 }
252 log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
253 activeTopics.add(path);
Madan Jampanide003d92015-05-11 17:14:20 -0700254 tryLeaderLock(path, future);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700255 } catch (ConsistentMapException e) {
256 log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
Madan Jampanide003d92015-05-11 17:14:20 -0700257 rerunForLeadership(path, future);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700258 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800259 }
260
261 @Override
Madan Jampanide003d92015-05-11 17:14:20 -0700262 public CompletableFuture<Void> withdraw(String path) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800263 activeTopics.remove(path);
Madan Jampanide003d92015-05-11 17:14:20 -0700264 CompletableFuture<Void> resultFuture = new CompletableFuture<>();
265 doWithdraw(path, resultFuture);
266 return resultFuture;
267 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700268
Madan Jampanide003d92015-05-11 17:14:20 -0700269
270 private void doWithdraw(String path, CompletableFuture<Void> future) {
271 if (activeTopics.contains(path)) {
272 future.completeExceptionally(new CancellationException(String.format("%s is now a active topic", path)));
273 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800274 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700275 Versioned<NodeId> leader = leaderMap.get(path);
276 if (leader != null && Objects.equals(leader.value(), localNodeId)) {
277 if (leaderMap.remove(path, leader.version())) {
Jonathan Harte649c752015-03-03 18:04:25 -0800278 log.info("Gave up leadership for {}", path);
Madan Jampanide003d92015-05-11 17:14:20 -0700279 future.complete(null);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700280 publish(new LeadershipEvent(
281 LeadershipEvent.Type.LEADER_BOOTED,
282 new Leadership(path,
283 localNodeId,
284 leader.version(),
285 leader.creationTime())));
Jonathan Harte649c752015-03-03 18:04:25 -0800286 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800287 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700288 // else we are not the current leader, can still be a candidate.
289 Versioned<List<NodeId>> candidates = candidateMap.get(path);
290 List<NodeId> candidateList = candidates != null
291 ? Lists.newArrayList(candidates.value())
292 : Lists.newArrayList();
293 if (!candidateList.remove(localNodeId)) {
Madan Jampanide003d92015-05-11 17:14:20 -0700294 future.complete(null);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700295 return;
296 }
Ayaka Koshibe93451b42015-04-21 15:12:03 -0700297 if (candidateMap.replace(path, candidates.version(), candidateList)) {
Ayaka Koshibe67cf7de2015-04-17 11:47:27 -0700298 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
Madan Jampanide003d92015-05-11 17:14:20 -0700299 future.complete(null);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700300 publish(new LeadershipEvent(
301 LeadershipEvent.Type.CANDIDATES_CHANGED,
302 new Leadership(path,
303 newCandidates.value(),
304 newCandidates.version(),
305 newCandidates.creationTime())));
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700306 } else {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700307 log.warn("Failed to withdraw from candidates list. Will retry");
Madan Jampanide003d92015-05-11 17:14:20 -0700308 retryWithdraw(path, future);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700309 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800310 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800311 log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
Madan Jampanide003d92015-05-11 17:14:20 -0700312 retryWithdraw(path, future);
Madan Jampanid14166a2015-02-24 17:37:51 -0800313 }
314 }
315
316 @Override
Madan Jampani1af8e132015-04-30 16:41:18 -0700317 public boolean stepdown(String path) {
Madan Jampani9bd1f152015-04-30 23:33:35 -0700318 if (!activeTopics.contains(path) || !Objects.equals(localNodeId, getLeader(path))) {
Madan Jampani1af8e132015-04-30 16:41:18 -0700319 return false;
320 }
321
322 try {
323 Versioned<NodeId> leader = leaderMap.get(path);
324 if (leader != null && Objects.equals(leader.value(), localNodeId)) {
325 if (leaderMap.remove(path, leader.version())) {
Madan Jampanid46e18f2015-05-04 23:19:33 -0700326 log.info("Stepped down from leadership for {}", path);
327 publish(new LeadershipEvent(
328 LeadershipEvent.Type.LEADER_BOOTED,
329 new Leadership(path,
330 localNodeId,
331 leader.version(),
332 leader.creationTime())));
Madan Jampanide003d92015-05-11 17:14:20 -0700333 retryLock(path, new CompletableFuture<>());
Madan Jampani1af8e132015-04-30 16:41:18 -0700334 return true;
335 }
336 }
337 } catch (Exception e) {
338 log.warn("Error executing stepdown for {}", path, e);
339 }
340 return false;
341 }
342
343 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800344 public void addListener(LeadershipEventListener listener) {
345 listenerRegistry.addListener(listener);
346 }
347
348 @Override
349 public void removeListener(LeadershipEventListener listener) {
350 listenerRegistry.removeListener(listener);
351 }
352
Madan Jampani1af8e132015-04-30 16:41:18 -0700353 @Override
354 public boolean makeTopCandidate(String path, NodeId nodeId) {
355 Versioned<List<NodeId>> candidates = candidateMap.get(path);
356 if (candidates == null || !candidates.value().contains(nodeId)) {
357 return false;
358 }
Madan Jampanid46e18f2015-05-04 23:19:33 -0700359 List<NodeId> currentRoster = candidates.value();
360 if (nodeId.equals(currentRoster.get(LEADER_CANDIDATE_POS))) {
Madan Jampani1af8e132015-04-30 16:41:18 -0700361 return true;
362 }
Madan Jampani1af8e132015-04-30 16:41:18 -0700363 List<NodeId> newRoster = new ArrayList<>(currentRoster.size());
364 newRoster.add(nodeId);
365 currentRoster.stream().filter(id -> !nodeId.equals(id)).forEach(newRoster::add);
366 boolean updated = candidateMap.replace(path, candidates.version(), newRoster);
367 if (updated) {
368 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700369 publish(new LeadershipEvent(
370 LeadershipEvent.Type.CANDIDATES_CHANGED,
371 new Leadership(path,
372 newCandidates.value(),
373 newCandidates.version(),
374 newCandidates.creationTime())));
Madan Jampani1af8e132015-04-30 16:41:18 -0700375 }
376 return updated;
377 }
378
Madan Jampanide003d92015-05-11 17:14:20 -0700379 private void tryLeaderLock(String path, CompletableFuture<Leadership> future) {
Madan Jampanid46e18f2015-05-04 23:19:33 -0700380 if (!activeTopics.contains(path) || Objects.equals(localNodeId, getLeader(path))) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800381 return;
382 }
Ayaka Koshibe0d886fc2015-04-23 11:53:41 -0700383 try {
384 Versioned<List<NodeId>> candidates = candidateMap.get(path);
385 if (candidates != null) {
Madan Jampanid46e18f2015-05-04 23:19:33 -0700386 List<NodeId> activeNodes = candidates.value()
387 .stream()
Ayaka Koshibe0d886fc2015-04-23 11:53:41 -0700388 .filter(n -> clusterService.getState(n) == ACTIVE)
389 .collect(Collectors.toList());
390 if (localNodeId.equals(activeNodes.get(LEADER_CANDIDATE_POS))) {
Madan Jampanide003d92015-05-11 17:14:20 -0700391 leaderLockAttempt(path, candidates.value(), future);
Ayaka Koshibe0d886fc2015-04-23 11:53:41 -0700392 } else {
Madan Jampanide003d92015-05-11 17:14:20 -0700393 retryLock(path, future);
Ayaka Koshibe0d886fc2015-04-23 11:53:41 -0700394 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700395 } else {
Ayaka Koshibe0d886fc2015-04-23 11:53:41 -0700396 throw new IllegalStateException("should not be here");
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700397 }
Ayaka Koshibe0d886fc2015-04-23 11:53:41 -0700398 } catch (Exception e) {
399 log.debug("Failed to fetch candidate information for {}", path, e);
Madan Jampanide003d92015-05-11 17:14:20 -0700400 retryLock(path, future);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700401 }
402 }
403
Madan Jampanide003d92015-05-11 17:14:20 -0700404 private void leaderLockAttempt(String path, List<NodeId> candidates, CompletableFuture<Leadership> future) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800405 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700406 Versioned<NodeId> currentLeader = leaderMap.get(path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800407 if (currentLeader != null) {
408 if (localNodeId.equals(currentLeader.value())) {
409 log.info("Already has leadership for {}", path);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700410 // FIXME: candidates can get out of sync.
Madan Jampanide003d92015-05-11 17:14:20 -0700411 Leadership leadership = new Leadership(path,
412 localNodeId,
413 currentLeader.version(),
414 currentLeader.creationTime());
415 future.complete(leadership);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700416 publish(new LeadershipEvent(
417 LeadershipEvent.Type.LEADER_ELECTED,
Madan Jampanide003d92015-05-11 17:14:20 -0700418 leadership));
Madan Jampanid14166a2015-02-24 17:37:51 -0800419 } else {
420 // someone else has leadership. will retry after sometime.
Madan Jampanide003d92015-05-11 17:14:20 -0700421 retryLock(path, future);
Madan Jampanid14166a2015-02-24 17:37:51 -0800422 }
423 } else {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700424 if (leaderMap.putIfAbsent(path, localNodeId) == null) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800425 log.info("Assumed leadership for {}", path);
426 // do a get again to get the version (epoch)
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700427 Versioned<NodeId> newLeader = leaderMap.get(path);
428 // FIXME: candidates can get out of sync
Madan Jampanide003d92015-05-11 17:14:20 -0700429 Leadership leadership = new Leadership(path,
430 newLeader.value(),
431 newLeader.version(),
432 newLeader.creationTime());
433 future.complete(leadership);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700434 publish(new LeadershipEvent(
435 LeadershipEvent.Type.LEADER_ELECTED,
Madan Jampanide003d92015-05-11 17:14:20 -0700436 leadership));
Madan Jampanid14166a2015-02-24 17:37:51 -0800437 } else {
438 // someone beat us to it.
Madan Jampanide003d92015-05-11 17:14:20 -0700439 retryLock(path, future);
Madan Jampanid14166a2015-02-24 17:37:51 -0800440 }
441 }
442 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800443 log.debug("Attempt to acquire leadership lock for topic {} failed", path, e);
Madan Jampanide003d92015-05-11 17:14:20 -0700444 retryLock(path, future);
Madan Jampanid14166a2015-02-24 17:37:51 -0800445 }
446 }
447
Madan Jampanid46e18f2015-05-04 23:19:33 -0700448 private void publish(LeadershipEvent event) {
449 onLeadershipEvent(event);
450 clusterCommunicator.broadcast(event, LEADERSHIP_EVENT_MESSAGE_SUBJECT, SERIALIZER::encode);
451 }
452
453 private void onLeadershipEvent(LeadershipEvent leadershipEvent) {
454 log.trace("Leadership Event: time = {} type = {} event = {}",
455 leadershipEvent.time(), leadershipEvent.type(),
456 leadershipEvent);
457
458 Leadership leadershipUpdate = leadershipEvent.subject();
459 LeadershipEvent.Type eventType = leadershipEvent.type();
460 String topic = leadershipUpdate.topic();
461
462 AtomicBoolean updateAccepted = new AtomicBoolean(false);
463 if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
464 leaderBoard.compute(topic, (k, currentLeadership) -> {
465 if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
466 updateAccepted.set(true);
467 return leadershipUpdate;
468 }
469 return currentLeadership;
470 });
471 } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
472 leaderBoard.compute(topic, (k, currentLeadership) -> {
473 if (currentLeadership == null || currentLeadership.epoch() <= leadershipUpdate.epoch()) {
474 updateAccepted.set(true);
475 return null;
476 }
477 return currentLeadership;
478 });
479 } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
480 candidateBoard.compute(topic, (k, currentInfo) -> {
481 if (currentInfo == null || currentInfo.epoch() < leadershipUpdate.epoch()) {
482 updateAccepted.set(true);
483 return leadershipUpdate;
484 }
485 return currentInfo;
486 });
487 } else {
488 throw new IllegalStateException("Unknown event type.");
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700489 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700490
Madan Jampanid46e18f2015-05-04 23:19:33 -0700491 if (updateAccepted.get()) {
492 eventDispatcher.post(leadershipEvent);
Madan Jampanid14166a2015-02-24 17:37:51 -0800493 }
494 }
495
Madan Jampanide003d92015-05-11 17:14:20 -0700496 private void rerunForLeadership(String path, CompletableFuture<Leadership> future) {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700497 retryLeaderLockExecutor.schedule(
Madan Jampanide003d92015-05-11 17:14:20 -0700498 () -> doRunForLeadership(path, future),
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700499 ELECTION_JOIN_ATTEMPT_INTERVAL_SEC,
500 TimeUnit.SECONDS);
501 }
502
Madan Jampanide003d92015-05-11 17:14:20 -0700503 private void retryLock(String path, CompletableFuture<Leadership> future) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800504 retryLeaderLockExecutor.schedule(
Madan Jampanide003d92015-05-11 17:14:20 -0700505 () -> tryLeaderLock(path, future),
Madan Jampanid14166a2015-02-24 17:37:51 -0800506 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
507 TimeUnit.SECONDS);
508 }
509
Madan Jampanide003d92015-05-11 17:14:20 -0700510 private void retryWithdraw(String path, CompletableFuture<Void> future) {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700511 retryLeaderLockExecutor.schedule(
Madan Jampanide003d92015-05-11 17:14:20 -0700512 () -> doWithdraw(path, future),
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700513 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
514 TimeUnit.SECONDS);
515 }
516
Madan Jampanicc586752015-05-18 16:19:27 -0700517 private void scheduleStaleLeadershipPurge(int afterDelaySec) {
518 if (staleLeadershipPurgeScheduled.compareAndSet(false, true)) {
519 staleLeadershipPurgeExecutor.schedule(
520 this::purgeStaleLeadership,
521 afterDelaySec,
522 TimeUnit.SECONDS);
523 }
524 }
525
526 /**
527 * Purges locks held by inactive nodes and evicts inactive nodes from candidacy.
528 */
529 private void purgeStaleLeadership() {
530 AtomicBoolean rerunPurge = new AtomicBoolean(false);
Madan Jampanid14166a2015-02-24 17:37:51 -0800531 try {
Madan Jampanicc586752015-05-18 16:19:27 -0700532 staleLeadershipPurgeScheduled.set(false);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700533 leaderMap.entrySet()
534 .stream()
Ayaka Koshibe4a3c2392015-04-22 18:03:13 -0700535 .filter(e -> clusterService.getState(e.getValue().value()) == INACTIVE)
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700536 .forEach(entry -> {
537 String path = entry.getKey();
538 NodeId nodeId = entry.getValue().value();
539 long epoch = entry.getValue().version();
540 long creationTime = entry.getValue().creationTime();
Madan Jampanid14166a2015-02-24 17:37:51 -0800541 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700542 if (leaderMap.remove(path, epoch)) {
Madan Jampania14047d2015-02-25 12:23:02 -0800543 log.info("Purged stale lock held by {} for {}", nodeId, path);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700544 publish(new LeadershipEvent(
545 LeadershipEvent.Type.LEADER_BOOTED,
546 new Leadership(path, nodeId, epoch, creationTime)));
Madan Jampanid14166a2015-02-24 17:37:51 -0800547 }
548 } catch (Exception e) {
549 log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
Madan Jampanicc586752015-05-18 16:19:27 -0700550 rerunPurge.set(true);
551 }
552 });
553
554 candidateMap.entrySet()
555 .forEach(entry -> {
556 String path = entry.getKey();
557 Versioned<List<NodeId>> candidates = entry.getValue();
558 List<NodeId> candidatesList = candidates != null
559 ? candidates.value() : Collections.emptyList();
560 List<NodeId> activeCandidatesList =
561 candidatesList.stream()
562 .filter(n -> clusterService.getState(n) == ACTIVE)
563 .filter(n -> !localNodeId.equals(n) || activeTopics.contains(path))
564 .collect(Collectors.toList());
565 if (activeCandidatesList.size() < candidatesList.size()) {
566 Set<NodeId> removedCandidates =
567 Sets.difference(Sets.newHashSet(candidatesList),
568 Sets.newHashSet(activeCandidatesList));
569 try {
570 if (candidateMap.replace(path, entry.getValue().version(), activeCandidatesList)) {
571 log.info("Evicted inactive candidates {} from "
572 + "candidate list for {}", removedCandidates, path);
573 Versioned<List<NodeId>> updatedCandidates = candidateMap.get(path);
574 publish(new LeadershipEvent(
575 LeadershipEvent.Type.CANDIDATES_CHANGED,
576 new Leadership(path,
577 updatedCandidates.value(),
578 updatedCandidates.version(),
579 updatedCandidates.creationTime())));
580 }
581 } catch (Exception e) {
582 log.warn("Failed to evict inactive candidates {} from "
583 + "candidate list for {}", removedCandidates, path, e);
584 rerunPurge.set(true);
585 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800586 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700587 });
Madan Jampanid14166a2015-02-24 17:37:51 -0800588 } catch (Exception e) {
Madan Jampanicc586752015-05-18 16:19:27 -0700589 log.warn("Failure purging state leadership.", e);
590 rerunPurge.set(true);
591 }
592
593 if (rerunPurge.get()) {
594 log.info("Rescheduling stale leadership purge due to errors encountered in previous run");
595 scheduleStaleLeadershipPurge(DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC);
Madan Jampanid14166a2015-02-24 17:37:51 -0800596 }
597 }
598
599 private void sendLeadershipStatus() {
Madan Jampania14047d2015-02-25 12:23:02 -0800600 try {
601 leaderBoard.forEach((path, leadership) -> {
602 if (leadership.leader().equals(localNodeId)) {
603 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700604 clusterCommunicator.broadcast(event,
605 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
606 SERIALIZER::encode);
Madan Jampania14047d2015-02-25 12:23:02 -0800607 }
608 });
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700609 candidateBoard.forEach((path, leadership) -> {
610 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, leadership);
611 clusterCommunicator.broadcast(event,
612 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
613 SERIALIZER::encode);
614 });
Madan Jampania14047d2015-02-25 12:23:02 -0800615 } catch (Exception e) {
616 log.debug("Failed to send leadership updates", e);
617 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800618 }
Madan Jampanicc586752015-05-18 16:19:27 -0700619
620 private class InternalClusterEventListener implements ClusterEventListener {
621
622 @Override
623 public void event(ClusterEvent event) {
624 if (event.type() == Type.INSTANCE_DEACTIVATED || event.type() == Type.INSTANCE_REMOVED) {
625 scheduleStaleLeadershipPurge(0);
626 }
627 }
628 }
Madan Jampania14047d2015-02-25 12:23:02 -0800629}