blob: ed610add81f669b884f225d1b9117be86e949cea [file] [log] [blame]
Madan Jampanid14166a2015-02-24 17:37:51 -08001package org.onosproject.store.consistent.impl;
2
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07003import com.google.common.collect.ImmutableList;
Jonathan Harte649c752015-03-03 18:04:25 -08004import com.google.common.collect.ImmutableMap;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07005import com.google.common.collect.Lists;
Jonathan Harte649c752015-03-03 18:04:25 -08006import com.google.common.collect.Maps;
7import com.google.common.collect.Sets;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07008
Madan Jampanid14166a2015-02-24 17:37:51 -08009import org.apache.felix.scr.annotations.Activate;
10import org.apache.felix.scr.annotations.Component;
11import org.apache.felix.scr.annotations.Deactivate;
12import org.apache.felix.scr.annotations.Reference;
13import org.apache.felix.scr.annotations.ReferenceCardinality;
14import org.apache.felix.scr.annotations.Service;
Ayaka Koshibefd26a302015-04-13 13:59:54 -070015import org.apache.commons.lang3.mutable.MutableBoolean;
Madan Jampanid14166a2015-02-24 17:37:51 -080016import org.onlab.util.KryoNamespace;
17import org.onosproject.cluster.ClusterService;
18import org.onosproject.cluster.ControllerNode;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070019import org.onosproject.cluster.ControllerNode.State;
Madan Jampanid14166a2015-02-24 17:37:51 -080020import org.onosproject.cluster.Leadership;
21import org.onosproject.cluster.LeadershipEvent;
22import org.onosproject.cluster.LeadershipEventListener;
23import org.onosproject.cluster.LeadershipService;
24import org.onosproject.cluster.NodeId;
25import org.onosproject.event.AbstractListenerRegistry;
26import org.onosproject.event.EventDeliveryService;
27import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
28import org.onosproject.store.cluster.messaging.ClusterMessage;
29import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
30import org.onosproject.store.cluster.messaging.MessageSubject;
31import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampanid14166a2015-02-24 17:37:51 -080032import org.onosproject.store.service.ConsistentMap;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070033import org.onosproject.store.service.ConsistentMapException;
Madan Jampanid14166a2015-02-24 17:37:51 -080034import org.onosproject.store.service.Serializer;
35import org.onosproject.store.service.StorageService;
36import org.onosproject.store.service.Versioned;
37import org.slf4j.Logger;
38
Jonathan Harte649c752015-03-03 18:04:25 -080039import java.util.Map;
40import java.util.Map.Entry;
41import java.util.Objects;
42import java.util.Set;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070043import java.util.List;
Jonathan Harte649c752015-03-03 18:04:25 -080044import java.util.concurrent.ExecutorService;
45import java.util.concurrent.Executors;
46import java.util.concurrent.ScheduledExecutorService;
47import java.util.concurrent.TimeUnit;
48import java.util.stream.Collectors;
49
50import static com.google.common.base.Preconditions.checkArgument;
51import static org.onlab.util.Tools.groupedThreads;
52import static org.slf4j.LoggerFactory.getLogger;
Madan Jampanid14166a2015-02-24 17:37:51 -080053
/**
 * Distributed Lock Manager implemented on top of ConsistentMap.
 * <p>
 * This implementation makes use of the cluster manager's failure
 * detection capabilities to detect and purge stale locks.
 * TODO: Ensure lock safety and liveness.
 */
Madan Jampani7f72c3f2015-03-01 17:34:59 -080061@Component(immediate = true, enabled = true)
Madan Jampanid14166a2015-02-24 17:37:51 -080062@Service
63public class DistributedLeadershipManager implements LeadershipService {
64
65 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
66 protected StorageService storageService;
67
68 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
69 protected ClusterService clusterService;
70
71 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
72 protected ClusterCommunicationService clusterCommunicator;
73
74 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
75 protected EventDeliveryService eventDispatcher;
76
77 private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
78 new MessageSubject("distributed-leadership-manager-events");
79
80 private final Logger log = getLogger(getClass());
81 private ExecutorService messageHandlingExecutor;
82 private ScheduledExecutorService retryLeaderLockExecutor;
83 private ScheduledExecutorService deadLockDetectionExecutor;
84 private ScheduledExecutorService leadershipStatusBroadcaster;
85
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070086 private ConsistentMap<String, NodeId> leaderMap;
87 private ConsistentMap<String, List<NodeId>> candidateMap;
88
Madan Jampanid14166a2015-02-24 17:37:51 -080089 private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
90 listenerRegistry;
91 private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
Ayaka Koshibefd26a302015-04-13 13:59:54 -070092 private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
Madan Jampanid14166a2015-02-24 17:37:51 -080093 private NodeId localNodeId;
94
95 private Set<String> activeTopics = Sets.newConcurrentHashSet();
96
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070097 private static final int ELECTION_JOIN_ATTEMPT_INTERVAL_SEC = 2;
Madan Jampanid14166a2015-02-24 17:37:51 -080098 private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
99 private static final int DEADLOCK_DETECTION_INTERVAL_SEC = 2;
Madan Jampania14047d2015-02-25 12:23:02 -0800100 private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC = 2;
Madan Jampanid14166a2015-02-24 17:37:51 -0800101
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700102 private static final int LEADER_CANDIDATE_POS = 0;
103
104 private static final Serializer SERIALIZER = Serializer.using(
105 new KryoNamespace.Builder().register(KryoNamespaces.API).build());
Madan Jampanid14166a2015-02-24 17:37:51 -0800106
107 @Activate
108 public void activate() {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700109 leaderMap = storageService.<String, NodeId>consistentMapBuilder()
110 .withName("onos-topic-leaders")
111 .withSerializer(SERIALIZER)
112 .withPartitionsDisabled().build();
113 candidateMap = storageService.<String, List<NodeId>>consistentMapBuilder()
114 .withName("onos-topic-candidates")
115 .withSerializer(SERIALIZER)
116 .withPartitionsDisabled().build();
Madan Jampanid14166a2015-02-24 17:37:51 -0800117
118 localNodeId = clusterService.getLocalNode().id();
119
120 messageHandlingExecutor = Executors.newSingleThreadExecutor(
121 groupedThreads("onos/store/leadership", "message-handler"));
122 retryLeaderLockExecutor = Executors.newScheduledThreadPool(
123 4, groupedThreads("onos/store/leadership", "election-thread-%d"));
124 deadLockDetectionExecutor = Executors.newSingleThreadScheduledExecutor(
125 groupedThreads("onos/store/leadership", "dead-lock-detector"));
126 leadershipStatusBroadcaster = Executors.newSingleThreadScheduledExecutor(
127 groupedThreads("onos/store/leadership", "peer-updater"));
128 clusterCommunicator.addSubscriber(
129 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
130 new InternalLeadershipEventListener(),
131 messageHandlingExecutor);
132
133 deadLockDetectionExecutor.scheduleWithFixedDelay(
134 this::purgeStaleLocks, 0, DEADLOCK_DETECTION_INTERVAL_SEC, TimeUnit.SECONDS);
135 leadershipStatusBroadcaster.scheduleWithFixedDelay(
Madan Jampania14047d2015-02-25 12:23:02 -0800136 this::sendLeadershipStatus, 0, LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC, TimeUnit.SECONDS);
Madan Jampanid14166a2015-02-24 17:37:51 -0800137
138 listenerRegistry = new AbstractListenerRegistry<>();
139 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
140
141 log.info("Started.");
142 }
143
144 @Deactivate
145 public void deactivate() {
146 leaderBoard.forEach((topic, leadership) -> {
147 if (localNodeId.equals(leadership.leader())) {
148 withdraw(topic);
149 }
150 });
151
152 eventDispatcher.removeSink(LeadershipEvent.class);
153 clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
154
155 messageHandlingExecutor.shutdown();
156 retryLeaderLockExecutor.shutdown();
157 deadLockDetectionExecutor.shutdown();
158 leadershipStatusBroadcaster.shutdown();
159
160 log.info("Stopped.");
161 }
162
163 @Override
164 public Map<String, Leadership> getLeaderBoard() {
165 return ImmutableMap.copyOf(leaderBoard);
166 }
167
168 @Override
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700169 public Map<String, Leadership> getCandidates() {
170 return ImmutableMap.copyOf(candidateBoard);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700171 }
172
173 @Override
174 public List<NodeId> getCandidates(String path) {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700175 Leadership current = candidateBoard.get(path);
176 return current == null ? ImmutableList.of() : ImmutableList.copyOf(current.candidates());
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700177 }
178
179 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800180 public NodeId getLeader(String path) {
181 Leadership leadership = leaderBoard.get(path);
182 return leadership != null ? leadership.leader() : null;
183 }
184
185 @Override
Madan Jampani59610512015-02-25 15:25:43 -0800186 public Leadership getLeadership(String path) {
187 checkArgument(path != null);
188 return leaderBoard.get(path);
189 }
190
191 @Override
192 public Set<String> ownedTopics(NodeId nodeId) {
193 checkArgument(nodeId != null);
194 return leaderBoard.entrySet()
195 .stream()
196 .filter(entry -> nodeId.equals(entry.getValue().leader()))
197 .map(Entry::getKey)
198 .collect(Collectors.toSet());
199 }
200
201 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800202 public void runForLeadership(String path) {
Madan Jampani52860be2015-02-27 12:52:37 -0800203 log.debug("Running for leadership for topic: {}", path);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700204 try {
205 Versioned<List<NodeId>> candidates = candidateMap.get(path);
206 if (candidates != null) {
207 List<NodeId> candidateList = Lists.newArrayList(candidates.value());
208 if (!candidateList.contains(localNodeId)) {
209 candidateList.add(localNodeId);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700210 if (candidateMap.replace(path, candidates.version(), candidateList)) {
211 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
212 notifyCandidateAdded(
213 path, candidateList, newCandidates.version(), newCandidates.creationTime());
214 } else {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700215 rerunForLeadership(path);
216 return;
217 }
218 }
219 } else {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700220 List<NodeId> candidateList = ImmutableList.of(localNodeId);
221 if ((candidateMap.putIfAbsent(path, candidateList) == null)) {
222 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
223 notifyCandidateAdded(path, candidateList, newCandidates.version(), newCandidates.creationTime());
224 } else {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700225 rerunForLeadership(path);
226 return;
227 }
228 }
229 log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
230 activeTopics.add(path);
231 tryLeaderLock(path);
232 } catch (ConsistentMapException e) {
233 log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
234 rerunForLeadership(path);
235 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800236 }
237
238 @Override
239 public void withdraw(String path) {
240 activeTopics.remove(path);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700241
Madan Jampanid14166a2015-02-24 17:37:51 -0800242 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700243 Versioned<NodeId> leader = leaderMap.get(path);
244 if (leader != null && Objects.equals(leader.value(), localNodeId)) {
245 if (leaderMap.remove(path, leader.version())) {
Jonathan Harte649c752015-03-03 18:04:25 -0800246 log.info("Gave up leadership for {}", path);
247 notifyRemovedLeader(path, localNodeId, leader.version(), leader.creationTime());
248 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800249 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700250 // else we are not the current leader, can still be a candidate.
251 Versioned<List<NodeId>> candidates = candidateMap.get(path);
252 List<NodeId> candidateList = candidates != null
253 ? Lists.newArrayList(candidates.value())
254 : Lists.newArrayList();
255 if (!candidateList.remove(localNodeId)) {
256 return;
257 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700258 boolean success = false;
259 if (candidateList.isEmpty()) {
260 if (candidateMap.remove(path, candidates.version())) {
261 success = true;
262 }
263 } else {
264 if (candidateMap.replace(path, candidates.version(), candidateList)) {
265 success = true;
266 }
267 }
268 if (success) {
269 notifyCandidateRemoved(path, candidateList, candidates.version(), candidates.creationTime());
270 } else {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700271 log.warn("Failed to withdraw from candidates list. Will retry");
272 retryWithdraw(path);
273 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800274 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800275 log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700276 retryWithdraw(path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800277 }
278 }
279
280 @Override
281 public void addListener(LeadershipEventListener listener) {
282 listenerRegistry.addListener(listener);
283 }
284
285 @Override
286 public void removeListener(LeadershipEventListener listener) {
287 listenerRegistry.removeListener(listener);
288 }
289
290 private void tryLeaderLock(String path) {
291 if (!activeTopics.contains(path)) {
292 return;
293 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700294
295 Versioned<List<NodeId>> candidates = candidateMap.get(path);
296 if (candidates != null) {
297 List<NodeId> activeNodes = candidates.value().stream()
298 .filter(n -> clusterService.getState(n) == State.ACTIVE)
299 .collect(Collectors.toList());
300 if (localNodeId.equals(activeNodes.get(LEADER_CANDIDATE_POS))) {
301 leaderLockAttempt(path, candidates.value());
302 } else {
303 retryLock(path);
304 }
305 } else {
306 throw new IllegalStateException("should not be here");
307 }
308 }
309
310 private void leaderLockAttempt(String path, List<NodeId> candidates) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800311 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700312 Versioned<NodeId> currentLeader = leaderMap.get(path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800313 if (currentLeader != null) {
314 if (localNodeId.equals(currentLeader.value())) {
315 log.info("Already has leadership for {}", path);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700316 // FIXME: candidates can get out of sync.
317 notifyNewLeader(
318 path, localNodeId, candidates, currentLeader.version(), currentLeader.creationTime());
Madan Jampanid14166a2015-02-24 17:37:51 -0800319 } else {
320 // someone else has leadership. will retry after sometime.
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700321 retryLock(path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800322 }
323 } else {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700324 if (leaderMap.putIfAbsent(path, localNodeId) == null) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800325 log.info("Assumed leadership for {}", path);
326 // do a get again to get the version (epoch)
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700327 Versioned<NodeId> newLeader = leaderMap.get(path);
328 // FIXME: candidates can get out of sync
329 notifyNewLeader(path, localNodeId, candidates, newLeader.version(), newLeader.creationTime());
Madan Jampanid14166a2015-02-24 17:37:51 -0800330 } else {
331 // someone beat us to it.
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700332 retryLock(path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800333 }
334 }
335 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800336 log.debug("Attempt to acquire leadership lock for topic {} failed", path, e);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700337 retryLock(path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800338 }
339 }
340
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700341 private void notifyCandidateAdded(
342 String path, List<NodeId> candidates, long epoch, long electedTime) {
343 Leadership newInfo = new Leadership(path, candidates, epoch, electedTime);
344 final MutableBoolean updated = new MutableBoolean(false);
345 candidateBoard.compute(path, (k, current) -> {
346 if (current == null || current.epoch() < newInfo.epoch()) {
347 log.info("updating candidateboard with {}", newInfo);
348 updated.setTrue();
349 return newInfo;
350 }
351 return current;
352 });
353 // maybe rethink types of candidates events
354 if (updated.booleanValue()) {
355 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, newInfo);
356 notifyPeers(event);
357 }
358 }
359
360 private void notifyCandidateRemoved(
361 String path, List<NodeId> candidates, long epoch, long electedTime) {
362 Leadership newInfo = new Leadership(path, candidates, epoch, electedTime);
363 final MutableBoolean updated = new MutableBoolean(false);
364 candidateBoard.compute(path, (k, current) -> {
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700365 if (current != null && current.epoch() <= newInfo.epoch()) {
366 log.info("updating candidateboard with removal of {}", newInfo);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700367 updated.setTrue();
368 if (candidates.isEmpty()) {
369 return null;
370 } else {
371 return newInfo;
372 }
373 }
374 return current;
375 });
376 // maybe rethink types of candidates events
377 if (updated.booleanValue()) {
378 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, newInfo);
379 notifyPeers(event);
380 }
381 }
382
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700383 private void notifyNewLeader(String path, NodeId leader,
384 List<NodeId> candidates, long epoch, long electedTime) {
385 Leadership newLeadership = new Leadership(path, leader, candidates, epoch, electedTime);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700386 final MutableBoolean updatedLeader = new MutableBoolean(false);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700387 log.debug("candidates for new Leadership {}", candidates);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700388 leaderBoard.compute(path, (k, currentLeader) -> {
Madan Jampanid14166a2015-02-24 17:37:51 -0800389 if (currentLeader == null || currentLeader.epoch() < epoch) {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700390 log.debug("updating leaderboard with new {}", newLeadership);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700391 updatedLeader.setTrue();
392 return newLeadership;
Madan Jampanid14166a2015-02-24 17:37:51 -0800393 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700394 return currentLeader;
395 });
Madan Jampanid14166a2015-02-24 17:37:51 -0800396
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700397 if (updatedLeader.booleanValue()) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800398 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700399 notifyPeers(event);
Madan Jampanid14166a2015-02-24 17:37:51 -0800400 }
401 }
402
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700403 private void notifyPeers(LeadershipEvent event) {
404 eventDispatcher.post(event);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700405 clusterCommunicator.broadcast(event,
406 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
407 SERIALIZER::encode);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700408 }
409
Madan Jampani30a57f82015-03-02 12:19:41 -0800410 private void notifyRemovedLeader(String path, NodeId leader, long epoch, long electedTime) {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700411 Versioned<List<NodeId>> candidates = candidateMap.get(path);
412 Leadership oldLeadership = new Leadership(
413 path, leader, candidates.value(), epoch, electedTime);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700414 final MutableBoolean updatedLeader = new MutableBoolean(false);
415 leaderBoard.compute(path, (k, currentLeader) -> {
Madan Jampanid14166a2015-02-24 17:37:51 -0800416 if (currentLeader != null && currentLeader.epoch() == oldLeadership.epoch()) {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700417 updatedLeader.setTrue();
418 return null;
Madan Jampanid14166a2015-02-24 17:37:51 -0800419 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700420 return currentLeader;
421 });
Madan Jampanid14166a2015-02-24 17:37:51 -0800422
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700423 if (updatedLeader.booleanValue()) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800424 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, oldLeadership);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700425 notifyPeers(event);
Madan Jampanid14166a2015-02-24 17:37:51 -0800426 }
427 }
428
429 private class InternalLeadershipEventListener implements ClusterMessageHandler {
430
431 @Override
432 public void handle(ClusterMessage message) {
433 LeadershipEvent leadershipEvent =
434 SERIALIZER.decode(message.payload());
435
Jonathan Hartb59ac4e2015-03-24 18:00:34 -0700436 log.trace("Leadership Event: time = {} type = {} event = {}",
Madan Jampanid14166a2015-02-24 17:37:51 -0800437 leadershipEvent.time(), leadershipEvent.type(),
438 leadershipEvent);
439
440 Leadership leadershipUpdate = leadershipEvent.subject();
441 LeadershipEvent.Type eventType = leadershipEvent.type();
442 String topic = leadershipUpdate.topic();
443
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700444 MutableBoolean updateAccepted = new MutableBoolean(false);
445 if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
446 leaderBoard.compute(topic, (k, currentLeadership) -> {
Madan Jampanid14166a2015-02-24 17:37:51 -0800447 if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700448 updateAccepted.setTrue();
449 return leadershipUpdate;
Madan Jampanid14166a2015-02-24 17:37:51 -0800450 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700451 return currentLeadership;
452 });
453 } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
454 leaderBoard.compute(topic, (k, currentLeadership) -> {
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700455 if (currentLeadership == null || currentLeadership.epoch() == leadershipUpdate.epoch()) {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700456 updateAccepted.setTrue();
457 return null;
Madan Jampanid14166a2015-02-24 17:37:51 -0800458 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700459 return currentLeadership;
460 });
461 } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
462 candidateBoard.compute(topic, (k, currentInfo) -> {
463 if (currentInfo == null || currentInfo.epoch() <= leadershipUpdate.epoch()) {
464 updateAccepted.setTrue();
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700465 if (leadershipUpdate.candidates().isEmpty()) {
466 return null;
467 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700468 return leadershipUpdate;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700469 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700470 return currentInfo;
471 });
472 } else {
473 throw new IllegalStateException("Unknown event type.");
474 }
475
476 if (updateAccepted.booleanValue()) {
477 eventDispatcher.post(leadershipEvent);
Madan Jampanid14166a2015-02-24 17:37:51 -0800478 }
479 }
480 }
481
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700482 private void rerunForLeadership(String path) {
483 retryLeaderLockExecutor.schedule(
484 () -> runForLeadership(path),
485 ELECTION_JOIN_ATTEMPT_INTERVAL_SEC,
486 TimeUnit.SECONDS);
487 }
488
489 private void retryLock(String path) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800490 retryLeaderLockExecutor.schedule(
491 () -> tryLeaderLock(path),
492 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
493 TimeUnit.SECONDS);
494 }
495
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700496 private void retryWithdraw(String path) {
497 retryLeaderLockExecutor.schedule(
498 () -> withdraw(path),
499 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
500 TimeUnit.SECONDS);
501 }
502
Madan Jampanid14166a2015-02-24 17:37:51 -0800503 private void purgeStaleLocks() {
504 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700505 leaderMap.entrySet()
506 .stream()
507 .filter(e -> clusterService.getState(e.getValue().value()) == ControllerNode.State.INACTIVE)
508 .filter(e -> localNodeId.equals(e.getValue().value()) && !activeTopics.contains(e.getKey()))
509 .forEach(entry -> {
510 String path = entry.getKey();
511 NodeId nodeId = entry.getValue().value();
512 long epoch = entry.getValue().version();
513 long creationTime = entry.getValue().creationTime();
Madan Jampanid14166a2015-02-24 17:37:51 -0800514 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700515 if (leaderMap.remove(path, epoch)) {
Madan Jampania14047d2015-02-25 12:23:02 -0800516 log.info("Purged stale lock held by {} for {}", nodeId, path);
Madan Jampani30a57f82015-03-02 12:19:41 -0800517 notifyRemovedLeader(path, nodeId, epoch, creationTime);
Madan Jampanid14166a2015-02-24 17:37:51 -0800518 }
519 } catch (Exception e) {
520 log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
521 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700522 });
Madan Jampanid14166a2015-02-24 17:37:51 -0800523 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800524 log.debug("Failed cleaning up stale locks", e);
Madan Jampanid14166a2015-02-24 17:37:51 -0800525 }
526 }
527
528 private void sendLeadershipStatus() {
Madan Jampania14047d2015-02-25 12:23:02 -0800529 try {
530 leaderBoard.forEach((path, leadership) -> {
531 if (leadership.leader().equals(localNodeId)) {
532 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700533 clusterCommunicator.broadcast(event,
534 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
535 SERIALIZER::encode);
Madan Jampania14047d2015-02-25 12:23:02 -0800536 }
537 });
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700538 candidateBoard.forEach((path, leadership) -> {
539 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, leadership);
540 clusterCommunicator.broadcast(event,
541 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
542 SERIALIZER::encode);
543 });
Madan Jampania14047d2015-02-25 12:23:02 -0800544 } catch (Exception e) {
545 log.debug("Failed to send leadership updates", e);
546 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800547 }
Madan Jampania14047d2015-02-25 12:23:02 -0800548}