blob: 1a659ef828b9a9f5e6d0d3f017444fb7e0ee27ac [file] [log] [blame]
Madan Jampanid14166a2015-02-24 17:37:51 -08001package org.onosproject.store.consistent.impl;
2
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07003import com.google.common.collect.ImmutableList;
Jonathan Harte649c752015-03-03 18:04:25 -08004import com.google.common.collect.ImmutableMap;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07005import com.google.common.collect.Lists;
Jonathan Harte649c752015-03-03 18:04:25 -08006import com.google.common.collect.Maps;
7import com.google.common.collect.Sets;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -07008
Madan Jampanid14166a2015-02-24 17:37:51 -08009import org.apache.felix.scr.annotations.Activate;
10import org.apache.felix.scr.annotations.Component;
11import org.apache.felix.scr.annotations.Deactivate;
12import org.apache.felix.scr.annotations.Reference;
13import org.apache.felix.scr.annotations.ReferenceCardinality;
14import org.apache.felix.scr.annotations.Service;
Ayaka Koshibefd26a302015-04-13 13:59:54 -070015import org.apache.commons.lang3.mutable.MutableBoolean;
Madan Jampanid14166a2015-02-24 17:37:51 -080016import org.onlab.util.KryoNamespace;
17import org.onosproject.cluster.ClusterService;
18import org.onosproject.cluster.ControllerNode;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070019import org.onosproject.cluster.ControllerNode.State;
Madan Jampanid14166a2015-02-24 17:37:51 -080020import org.onosproject.cluster.Leadership;
21import org.onosproject.cluster.LeadershipEvent;
22import org.onosproject.cluster.LeadershipEventListener;
23import org.onosproject.cluster.LeadershipService;
24import org.onosproject.cluster.NodeId;
25import org.onosproject.event.AbstractListenerRegistry;
26import org.onosproject.event.EventDeliveryService;
27import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
28import org.onosproject.store.cluster.messaging.ClusterMessage;
29import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
30import org.onosproject.store.cluster.messaging.MessageSubject;
31import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampanid14166a2015-02-24 17:37:51 -080032import org.onosproject.store.service.ConsistentMap;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070033import org.onosproject.store.service.ConsistentMapException;
Madan Jampanid14166a2015-02-24 17:37:51 -080034import org.onosproject.store.service.Serializer;
35import org.onosproject.store.service.StorageService;
36import org.onosproject.store.service.Versioned;
37import org.slf4j.Logger;
38
Jonathan Harte649c752015-03-03 18:04:25 -080039import java.util.Map;
40import java.util.Map.Entry;
41import java.util.Objects;
42import java.util.Set;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070043import java.util.List;
Jonathan Harte649c752015-03-03 18:04:25 -080044import java.util.concurrent.ExecutorService;
45import java.util.concurrent.Executors;
46import java.util.concurrent.ScheduledExecutorService;
47import java.util.concurrent.TimeUnit;
48import java.util.stream.Collectors;
49
50import static com.google.common.base.Preconditions.checkArgument;
51import static org.onlab.util.Tools.groupedThreads;
52import static org.slf4j.LoggerFactory.getLogger;
Madan Jampanid14166a2015-02-24 17:37:51 -080053
54/**
55 * Distributed Lock Manager implemented on top of ConsistentMap.
56 * <p>
57 * This implementation makes use of cluster manager's failure
58 * detection capabilities to detect and purge stale locks.
59 * TODO: Ensure lock safety and liveness.
60 */
Madan Jampani7f72c3f2015-03-01 17:34:59 -080061@Component(immediate = true, enabled = true)
Madan Jampanid14166a2015-02-24 17:37:51 -080062@Service
63public class DistributedLeadershipManager implements LeadershipService {
64
65 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
66 protected StorageService storageService;
67
68 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
69 protected ClusterService clusterService;
70
71 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
72 protected ClusterCommunicationService clusterCommunicator;
73
74 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
75 protected EventDeliveryService eventDispatcher;
76
77 private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
78 new MessageSubject("distributed-leadership-manager-events");
79
80 private final Logger log = getLogger(getClass());
81 private ExecutorService messageHandlingExecutor;
82 private ScheduledExecutorService retryLeaderLockExecutor;
83 private ScheduledExecutorService deadLockDetectionExecutor;
84 private ScheduledExecutorService leadershipStatusBroadcaster;
85
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070086 private ConsistentMap<String, NodeId> leaderMap;
87 private ConsistentMap<String, List<NodeId>> candidateMap;
88
Madan Jampanid14166a2015-02-24 17:37:51 -080089 private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
90 listenerRegistry;
91 private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
Ayaka Koshibefd26a302015-04-13 13:59:54 -070092 private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
Madan Jampanid14166a2015-02-24 17:37:51 -080093 private NodeId localNodeId;
94
95 private Set<String> activeTopics = Sets.newConcurrentHashSet();
96
Ayaka Koshibec19b8b82015-04-08 15:18:24 -070097 private static final int ELECTION_JOIN_ATTEMPT_INTERVAL_SEC = 2;
Madan Jampanid14166a2015-02-24 17:37:51 -080098 private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
99 private static final int DEADLOCK_DETECTION_INTERVAL_SEC = 2;
Madan Jampania14047d2015-02-25 12:23:02 -0800100 private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC = 2;
Madan Jampanid14166a2015-02-24 17:37:51 -0800101
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700102 private static final int LEADER_CANDIDATE_POS = 0;
103
104 private static final Serializer SERIALIZER = Serializer.using(
105 new KryoNamespace.Builder().register(KryoNamespaces.API).build());
Madan Jampanid14166a2015-02-24 17:37:51 -0800106
107 @Activate
108 public void activate() {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700109 leaderMap = storageService.<String, NodeId>consistentMapBuilder()
110 .withName("onos-topic-leaders")
111 .withSerializer(SERIALIZER)
112 .withPartitionsDisabled().build();
113 candidateMap = storageService.<String, List<NodeId>>consistentMapBuilder()
114 .withName("onos-topic-candidates")
115 .withSerializer(SERIALIZER)
116 .withPartitionsDisabled().build();
Madan Jampanid14166a2015-02-24 17:37:51 -0800117
118 localNodeId = clusterService.getLocalNode().id();
119
120 messageHandlingExecutor = Executors.newSingleThreadExecutor(
121 groupedThreads("onos/store/leadership", "message-handler"));
122 retryLeaderLockExecutor = Executors.newScheduledThreadPool(
123 4, groupedThreads("onos/store/leadership", "election-thread-%d"));
124 deadLockDetectionExecutor = Executors.newSingleThreadScheduledExecutor(
125 groupedThreads("onos/store/leadership", "dead-lock-detector"));
126 leadershipStatusBroadcaster = Executors.newSingleThreadScheduledExecutor(
127 groupedThreads("onos/store/leadership", "peer-updater"));
128 clusterCommunicator.addSubscriber(
129 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
130 new InternalLeadershipEventListener(),
131 messageHandlingExecutor);
132
133 deadLockDetectionExecutor.scheduleWithFixedDelay(
134 this::purgeStaleLocks, 0, DEADLOCK_DETECTION_INTERVAL_SEC, TimeUnit.SECONDS);
135 leadershipStatusBroadcaster.scheduleWithFixedDelay(
Madan Jampania14047d2015-02-25 12:23:02 -0800136 this::sendLeadershipStatus, 0, LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC, TimeUnit.SECONDS);
Madan Jampanid14166a2015-02-24 17:37:51 -0800137
138 listenerRegistry = new AbstractListenerRegistry<>();
139 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
140
141 log.info("Started.");
142 }
143
144 @Deactivate
145 public void deactivate() {
146 leaderBoard.forEach((topic, leadership) -> {
147 if (localNodeId.equals(leadership.leader())) {
148 withdraw(topic);
149 }
150 });
151
152 eventDispatcher.removeSink(LeadershipEvent.class);
153 clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
154
155 messageHandlingExecutor.shutdown();
156 retryLeaderLockExecutor.shutdown();
157 deadLockDetectionExecutor.shutdown();
158 leadershipStatusBroadcaster.shutdown();
159
160 log.info("Stopped.");
161 }
162
163 @Override
164 public Map<String, Leadership> getLeaderBoard() {
165 return ImmutableMap.copyOf(leaderBoard);
166 }
167
168 @Override
Madan Jampanifd45d5e2015-04-20 13:33:21 -0700169 public Map<String, List<NodeId>> getCandidates() {
170 return Maps.toMap(candidateBoard.keySet(), this::getCandidates);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700171 }
172
173 @Override
174 public List<NodeId> getCandidates(String path) {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700175 Leadership current = candidateBoard.get(path);
176 return current == null ? ImmutableList.of() : ImmutableList.copyOf(current.candidates());
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700177 }
178
179 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800180 public NodeId getLeader(String path) {
181 Leadership leadership = leaderBoard.get(path);
182 return leadership != null ? leadership.leader() : null;
183 }
184
185 @Override
Madan Jampani59610512015-02-25 15:25:43 -0800186 public Leadership getLeadership(String path) {
187 checkArgument(path != null);
188 return leaderBoard.get(path);
189 }
190
191 @Override
192 public Set<String> ownedTopics(NodeId nodeId) {
193 checkArgument(nodeId != null);
194 return leaderBoard.entrySet()
195 .stream()
196 .filter(entry -> nodeId.equals(entry.getValue().leader()))
197 .map(Entry::getKey)
198 .collect(Collectors.toSet());
199 }
200
201 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800202 public void runForLeadership(String path) {
Madan Jampani52860be2015-02-27 12:52:37 -0800203 log.debug("Running for leadership for topic: {}", path);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700204 try {
205 Versioned<List<NodeId>> candidates = candidateMap.get(path);
206 if (candidates != null) {
207 List<NodeId> candidateList = Lists.newArrayList(candidates.value());
208 if (!candidateList.contains(localNodeId)) {
209 candidateList.add(localNodeId);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700210 if (candidateMap.replace(path, candidates.version(), candidateList)) {
211 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
212 notifyCandidateAdded(
213 path, candidateList, newCandidates.version(), newCandidates.creationTime());
214 } else {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700215 rerunForLeadership(path);
216 return;
217 }
218 }
219 } else {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700220 List<NodeId> candidateList = ImmutableList.of(localNodeId);
221 if ((candidateMap.putIfAbsent(path, candidateList) == null)) {
222 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
223 notifyCandidateAdded(path, candidateList, newCandidates.version(), newCandidates.creationTime());
224 } else {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700225 rerunForLeadership(path);
226 return;
227 }
228 }
229 log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
230 activeTopics.add(path);
231 tryLeaderLock(path);
232 } catch (ConsistentMapException e) {
233 log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
234 rerunForLeadership(path);
235 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800236 }
237
238 @Override
239 public void withdraw(String path) {
240 activeTopics.remove(path);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700241
Madan Jampanid14166a2015-02-24 17:37:51 -0800242 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700243 Versioned<NodeId> leader = leaderMap.get(path);
244 if (leader != null && Objects.equals(leader.value(), localNodeId)) {
245 if (leaderMap.remove(path, leader.version())) {
Jonathan Harte649c752015-03-03 18:04:25 -0800246 log.info("Gave up leadership for {}", path);
247 notifyRemovedLeader(path, localNodeId, leader.version(), leader.creationTime());
248 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800249 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700250 // else we are not the current leader, can still be a candidate.
251 Versioned<List<NodeId>> candidates = candidateMap.get(path);
252 List<NodeId> candidateList = candidates != null
253 ? Lists.newArrayList(candidates.value())
254 : Lists.newArrayList();
255 if (!candidateList.remove(localNodeId)) {
256 return;
257 }
Ayaka Koshibe93451b42015-04-21 15:12:03 -0700258 if (candidateMap.replace(path, candidates.version(), candidateList)) {
Ayaka Koshibe67cf7de2015-04-17 11:47:27 -0700259 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
260 notifyCandidateRemoved(path, candidates.version(), candidates.creationTime(), newCandidates);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700261 } else {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700262 log.warn("Failed to withdraw from candidates list. Will retry");
263 retryWithdraw(path);
264 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800265 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800266 log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700267 retryWithdraw(path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800268 }
269 }
270
271 @Override
272 public void addListener(LeadershipEventListener listener) {
273 listenerRegistry.addListener(listener);
274 }
275
276 @Override
277 public void removeListener(LeadershipEventListener listener) {
278 listenerRegistry.removeListener(listener);
279 }
280
281 private void tryLeaderLock(String path) {
282 if (!activeTopics.contains(path)) {
283 return;
284 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700285
286 Versioned<List<NodeId>> candidates = candidateMap.get(path);
287 if (candidates != null) {
288 List<NodeId> activeNodes = candidates.value().stream()
289 .filter(n -> clusterService.getState(n) == State.ACTIVE)
290 .collect(Collectors.toList());
291 if (localNodeId.equals(activeNodes.get(LEADER_CANDIDATE_POS))) {
292 leaderLockAttempt(path, candidates.value());
293 } else {
294 retryLock(path);
295 }
296 } else {
297 throw new IllegalStateException("should not be here");
298 }
299 }
300
301 private void leaderLockAttempt(String path, List<NodeId> candidates) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800302 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700303 Versioned<NodeId> currentLeader = leaderMap.get(path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800304 if (currentLeader != null) {
305 if (localNodeId.equals(currentLeader.value())) {
306 log.info("Already has leadership for {}", path);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700307 // FIXME: candidates can get out of sync.
308 notifyNewLeader(
309 path, localNodeId, candidates, currentLeader.version(), currentLeader.creationTime());
Madan Jampanid14166a2015-02-24 17:37:51 -0800310 } else {
311 // someone else has leadership. will retry after sometime.
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700312 retryLock(path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800313 }
314 } else {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700315 if (leaderMap.putIfAbsent(path, localNodeId) == null) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800316 log.info("Assumed leadership for {}", path);
317 // do a get again to get the version (epoch)
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700318 Versioned<NodeId> newLeader = leaderMap.get(path);
319 // FIXME: candidates can get out of sync
320 notifyNewLeader(path, localNodeId, candidates, newLeader.version(), newLeader.creationTime());
Madan Jampanid14166a2015-02-24 17:37:51 -0800321 } else {
322 // someone beat us to it.
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700323 retryLock(path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800324 }
325 }
326 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800327 log.debug("Attempt to acquire leadership lock for topic {} failed", path, e);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700328 retryLock(path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800329 }
330 }
331
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700332 private void notifyCandidateAdded(
333 String path, List<NodeId> candidates, long epoch, long electedTime) {
334 Leadership newInfo = new Leadership(path, candidates, epoch, electedTime);
335 final MutableBoolean updated = new MutableBoolean(false);
336 candidateBoard.compute(path, (k, current) -> {
337 if (current == null || current.epoch() < newInfo.epoch()) {
338 log.info("updating candidateboard with {}", newInfo);
339 updated.setTrue();
340 return newInfo;
341 }
342 return current;
343 });
344 // maybe rethink types of candidates events
345 if (updated.booleanValue()) {
346 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, newInfo);
347 notifyPeers(event);
348 }
349 }
350
351 private void notifyCandidateRemoved(
Ayaka Koshibe67cf7de2015-04-17 11:47:27 -0700352 String path, long oldEpoch, long oldTime, Versioned<List<NodeId>> candidates) {
353 Leadership newInfo = (candidates == null)
354 ? new Leadership(path, ImmutableList.of(), oldEpoch, oldTime)
355 : new Leadership(path, candidates.value(), candidates.version(), candidates.creationTime());
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700356 final MutableBoolean updated = new MutableBoolean(false);
Ayaka Koshibe67cf7de2015-04-17 11:47:27 -0700357
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700358 candidateBoard.compute(path, (k, current) -> {
Ayaka Koshibe93451b42015-04-21 15:12:03 -0700359 if (current != null && current.epoch() < newInfo.epoch()) {
360 updated.setTrue();
361 return newInfo;
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700362 }
363 return current;
364 });
365 // maybe rethink types of candidates events
366 if (updated.booleanValue()) {
Ayaka Koshibe67cf7de2015-04-17 11:47:27 -0700367 log.debug("updated candidateboard with removal: {}", newInfo);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700368 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, newInfo);
369 notifyPeers(event);
370 }
371 }
372
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700373 private void notifyNewLeader(String path, NodeId leader,
374 List<NodeId> candidates, long epoch, long electedTime) {
375 Leadership newLeadership = new Leadership(path, leader, candidates, epoch, electedTime);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700376 final MutableBoolean updatedLeader = new MutableBoolean(false);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700377 log.debug("candidates for new Leadership {}", candidates);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700378 leaderBoard.compute(path, (k, currentLeader) -> {
Madan Jampanid14166a2015-02-24 17:37:51 -0800379 if (currentLeader == null || currentLeader.epoch() < epoch) {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700380 log.debug("updating leaderboard with new {}", newLeadership);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700381 updatedLeader.setTrue();
382 return newLeadership;
Madan Jampanid14166a2015-02-24 17:37:51 -0800383 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700384 return currentLeader;
385 });
Madan Jampanid14166a2015-02-24 17:37:51 -0800386
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700387 if (updatedLeader.booleanValue()) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800388 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700389 notifyPeers(event);
Madan Jampanid14166a2015-02-24 17:37:51 -0800390 }
391 }
392
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700393 private void notifyPeers(LeadershipEvent event) {
394 eventDispatcher.post(event);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700395 clusterCommunicator.broadcast(event,
396 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
397 SERIALIZER::encode);
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700398 }
399
Madan Jampani30a57f82015-03-02 12:19:41 -0800400 private void notifyRemovedLeader(String path, NodeId leader, long epoch, long electedTime) {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700401 Versioned<List<NodeId>> candidates = candidateMap.get(path);
402 Leadership oldLeadership = new Leadership(
403 path, leader, candidates.value(), epoch, electedTime);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700404 final MutableBoolean updatedLeader = new MutableBoolean(false);
405 leaderBoard.compute(path, (k, currentLeader) -> {
Madan Jampanid14166a2015-02-24 17:37:51 -0800406 if (currentLeader != null && currentLeader.epoch() == oldLeadership.epoch()) {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700407 updatedLeader.setTrue();
408 return null;
Madan Jampanid14166a2015-02-24 17:37:51 -0800409 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700410 return currentLeader;
411 });
Madan Jampanid14166a2015-02-24 17:37:51 -0800412
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700413 if (updatedLeader.booleanValue()) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800414 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, oldLeadership);
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700415 notifyPeers(event);
Madan Jampanid14166a2015-02-24 17:37:51 -0800416 }
417 }
418
419 private class InternalLeadershipEventListener implements ClusterMessageHandler {
420
421 @Override
422 public void handle(ClusterMessage message) {
423 LeadershipEvent leadershipEvent =
424 SERIALIZER.decode(message.payload());
425
Jonathan Hartb59ac4e2015-03-24 18:00:34 -0700426 log.trace("Leadership Event: time = {} type = {} event = {}",
Madan Jampanid14166a2015-02-24 17:37:51 -0800427 leadershipEvent.time(), leadershipEvent.type(),
428 leadershipEvent);
429
430 Leadership leadershipUpdate = leadershipEvent.subject();
431 LeadershipEvent.Type eventType = leadershipEvent.type();
432 String topic = leadershipUpdate.topic();
433
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700434 MutableBoolean updateAccepted = new MutableBoolean(false);
435 if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
436 leaderBoard.compute(topic, (k, currentLeadership) -> {
Madan Jampanid14166a2015-02-24 17:37:51 -0800437 if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700438 updateAccepted.setTrue();
439 return leadershipUpdate;
Madan Jampanid14166a2015-02-24 17:37:51 -0800440 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700441 return currentLeadership;
442 });
443 } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
444 leaderBoard.compute(topic, (k, currentLeadership) -> {
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700445 if (currentLeadership == null || currentLeadership.epoch() == leadershipUpdate.epoch()) {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700446 updateAccepted.setTrue();
447 return null;
Madan Jampanid14166a2015-02-24 17:37:51 -0800448 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700449 return currentLeadership;
450 });
451 } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
452 candidateBoard.compute(topic, (k, currentInfo) -> {
Ayaka Koshibe93451b42015-04-21 15:12:03 -0700453 if (currentInfo == null || currentInfo.epoch() < leadershipUpdate.epoch()) {
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700454 updateAccepted.setTrue();
455 return leadershipUpdate;
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700456 }
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700457 return currentInfo;
458 });
459 } else {
460 throw new IllegalStateException("Unknown event type.");
461 }
462
463 if (updateAccepted.booleanValue()) {
464 eventDispatcher.post(leadershipEvent);
Madan Jampanid14166a2015-02-24 17:37:51 -0800465 }
466 }
467 }
468
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700469 private void rerunForLeadership(String path) {
470 retryLeaderLockExecutor.schedule(
471 () -> runForLeadership(path),
472 ELECTION_JOIN_ATTEMPT_INTERVAL_SEC,
473 TimeUnit.SECONDS);
474 }
475
476 private void retryLock(String path) {
Madan Jampanid14166a2015-02-24 17:37:51 -0800477 retryLeaderLockExecutor.schedule(
478 () -> tryLeaderLock(path),
479 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
480 TimeUnit.SECONDS);
481 }
482
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700483 private void retryWithdraw(String path) {
484 retryLeaderLockExecutor.schedule(
485 () -> withdraw(path),
486 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
487 TimeUnit.SECONDS);
488 }
489
Madan Jampanid14166a2015-02-24 17:37:51 -0800490 private void purgeStaleLocks() {
491 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700492 leaderMap.entrySet()
493 .stream()
494 .filter(e -> clusterService.getState(e.getValue().value()) == ControllerNode.State.INACTIVE)
495 .filter(e -> localNodeId.equals(e.getValue().value()) && !activeTopics.contains(e.getKey()))
496 .forEach(entry -> {
497 String path = entry.getKey();
498 NodeId nodeId = entry.getValue().value();
499 long epoch = entry.getValue().version();
500 long creationTime = entry.getValue().creationTime();
Madan Jampanid14166a2015-02-24 17:37:51 -0800501 try {
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700502 if (leaderMap.remove(path, epoch)) {
Madan Jampania14047d2015-02-25 12:23:02 -0800503 log.info("Purged stale lock held by {} for {}", nodeId, path);
Madan Jampani30a57f82015-03-02 12:19:41 -0800504 notifyRemovedLeader(path, nodeId, epoch, creationTime);
Madan Jampanid14166a2015-02-24 17:37:51 -0800505 }
506 } catch (Exception e) {
507 log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
508 }
Ayaka Koshibec19b8b82015-04-08 15:18:24 -0700509 });
Madan Jampanid14166a2015-02-24 17:37:51 -0800510 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800511 log.debug("Failed cleaning up stale locks", e);
Madan Jampanid14166a2015-02-24 17:37:51 -0800512 }
513 }
514
515 private void sendLeadershipStatus() {
Madan Jampania14047d2015-02-25 12:23:02 -0800516 try {
517 leaderBoard.forEach((path, leadership) -> {
518 if (leadership.leader().equals(localNodeId)) {
519 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700520 clusterCommunicator.broadcast(event,
521 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
522 SERIALIZER::encode);
Madan Jampania14047d2015-02-25 12:23:02 -0800523 }
524 });
Ayaka Koshibefd26a302015-04-13 13:59:54 -0700525 candidateBoard.forEach((path, leadership) -> {
526 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, leadership);
527 clusterCommunicator.broadcast(event,
528 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
529 SERIALIZER::encode);
530 });
Madan Jampania14047d2015-02-25 12:23:02 -0800531 } catch (Exception e) {
532 log.debug("Failed to send leadership updates", e);
533 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800534 }
Madan Jampania14047d2015-02-25 12:23:02 -0800535}