blob: bc12374823a99290eca64fa36d31f012946460f2 [file] [log] [blame]
Madan Jampanid14166a2015-02-24 17:37:51 -08001package org.onosproject.store.consistent.impl;
2
Jonathan Harte649c752015-03-03 18:04:25 -08003import com.google.common.collect.ImmutableMap;
4import com.google.common.collect.Maps;
5import com.google.common.collect.Sets;
Madan Jampanid14166a2015-02-24 17:37:51 -08006import org.apache.felix.scr.annotations.Activate;
7import org.apache.felix.scr.annotations.Component;
8import org.apache.felix.scr.annotations.Deactivate;
9import org.apache.felix.scr.annotations.Reference;
10import org.apache.felix.scr.annotations.ReferenceCardinality;
11import org.apache.felix.scr.annotations.Service;
12import org.onlab.util.KryoNamespace;
13import org.onosproject.cluster.ClusterService;
14import org.onosproject.cluster.ControllerNode;
15import org.onosproject.cluster.Leadership;
16import org.onosproject.cluster.LeadershipEvent;
17import org.onosproject.cluster.LeadershipEventListener;
18import org.onosproject.cluster.LeadershipService;
19import org.onosproject.cluster.NodeId;
20import org.onosproject.event.AbstractListenerRegistry;
21import org.onosproject.event.EventDeliveryService;
22import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
23import org.onosproject.store.cluster.messaging.ClusterMessage;
24import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
25import org.onosproject.store.cluster.messaging.MessageSubject;
26import org.onosproject.store.serializers.KryoNamespaces;
27import org.onosproject.store.serializers.KryoSerializer;
28import org.onosproject.store.service.ConsistentMap;
29import org.onosproject.store.service.Serializer;
30import org.onosproject.store.service.StorageService;
31import org.onosproject.store.service.Versioned;
32import org.slf4j.Logger;
33
Jonathan Harte649c752015-03-03 18:04:25 -080034import java.util.Map;
35import java.util.Map.Entry;
36import java.util.Objects;
37import java.util.Set;
38import java.util.concurrent.ExecutorService;
39import java.util.concurrent.Executors;
40import java.util.concurrent.ScheduledExecutorService;
41import java.util.concurrent.TimeUnit;
42import java.util.stream.Collectors;
43
44import static com.google.common.base.Preconditions.checkArgument;
45import static org.onlab.util.Tools.groupedThreads;
46import static org.slf4j.LoggerFactory.getLogger;
Madan Jampanid14166a2015-02-24 17:37:51 -080047
/**
 * Distributed leadership manager implemented on top of {@link ConsistentMap}.
 * <p>
 * Leadership of a topic is represented by a lock entry in the map. This
 * implementation makes use of the cluster manager's failure detection
 * capabilities to detect and purge stale locks.
 * <p>
 * TODO: Ensure lock safety and liveness.
 */
Madan Jampani7f72c3f2015-03-01 17:34:59 -080055@Component(immediate = true, enabled = true)
Madan Jampanid14166a2015-02-24 17:37:51 -080056@Service
57public class DistributedLeadershipManager implements LeadershipService {
58
59 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
60 protected StorageService storageService;
61
62 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
63 protected ClusterService clusterService;
64
65 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
66 protected ClusterCommunicationService clusterCommunicator;
67
68 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
69 protected EventDeliveryService eventDispatcher;
70
71 private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
72 new MessageSubject("distributed-leadership-manager-events");
73
74 private final Logger log = getLogger(getClass());
75 private ExecutorService messageHandlingExecutor;
76 private ScheduledExecutorService retryLeaderLockExecutor;
77 private ScheduledExecutorService deadLockDetectionExecutor;
78 private ScheduledExecutorService leadershipStatusBroadcaster;
79
80 private ConsistentMap<String, NodeId> lockMap;
81 private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
82 listenerRegistry;
83 private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
84 private NodeId localNodeId;
85
86 private Set<String> activeTopics = Sets.newConcurrentHashSet();
87
88 private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
89 private static final int DEADLOCK_DETECTION_INTERVAL_SEC = 2;
Madan Jampania14047d2015-02-25 12:23:02 -080090 private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC = 2;
Madan Jampanid14166a2015-02-24 17:37:51 -080091
92 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
93 @Override
94 protected void setupKryoPool() {
95 serializerPool = KryoNamespace.newBuilder()
96 .register(KryoNamespaces.API)
97 .build()
98 .populate(1);
99 }
100 };
101
102 @Activate
103 public void activate() {
104 lockMap = storageService.createConsistentMap("onos-leader-locks", new Serializer() {
105 KryoNamespace kryo = new KryoNamespace.Builder()
106 .register(KryoNamespaces.API).build();
107
108 @Override
109 public <T> byte[] encode(T object) {
110 return kryo.serialize(object);
111 }
112
113 @Override
114 public <T> T decode(byte[] bytes) {
115 return kryo.deserialize(bytes);
116 }
117 });
118
119 localNodeId = clusterService.getLocalNode().id();
120
121 messageHandlingExecutor = Executors.newSingleThreadExecutor(
122 groupedThreads("onos/store/leadership", "message-handler"));
123 retryLeaderLockExecutor = Executors.newScheduledThreadPool(
124 4, groupedThreads("onos/store/leadership", "election-thread-%d"));
125 deadLockDetectionExecutor = Executors.newSingleThreadScheduledExecutor(
126 groupedThreads("onos/store/leadership", "dead-lock-detector"));
127 leadershipStatusBroadcaster = Executors.newSingleThreadScheduledExecutor(
128 groupedThreads("onos/store/leadership", "peer-updater"));
129 clusterCommunicator.addSubscriber(
130 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
131 new InternalLeadershipEventListener(),
132 messageHandlingExecutor);
133
134 deadLockDetectionExecutor.scheduleWithFixedDelay(
135 this::purgeStaleLocks, 0, DEADLOCK_DETECTION_INTERVAL_SEC, TimeUnit.SECONDS);
136 leadershipStatusBroadcaster.scheduleWithFixedDelay(
Madan Jampania14047d2015-02-25 12:23:02 -0800137 this::sendLeadershipStatus, 0, LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC, TimeUnit.SECONDS);
Madan Jampanid14166a2015-02-24 17:37:51 -0800138
139 listenerRegistry = new AbstractListenerRegistry<>();
140 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
141
142 log.info("Started.");
143 }
144
145 @Deactivate
146 public void deactivate() {
147 leaderBoard.forEach((topic, leadership) -> {
148 if (localNodeId.equals(leadership.leader())) {
149 withdraw(topic);
150 }
151 });
152
153 eventDispatcher.removeSink(LeadershipEvent.class);
154 clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
155
156 messageHandlingExecutor.shutdown();
157 retryLeaderLockExecutor.shutdown();
158 deadLockDetectionExecutor.shutdown();
159 leadershipStatusBroadcaster.shutdown();
160
161 log.info("Stopped.");
162 }
163
164 @Override
165 public Map<String, Leadership> getLeaderBoard() {
166 return ImmutableMap.copyOf(leaderBoard);
167 }
168
169 @Override
170 public NodeId getLeader(String path) {
171 Leadership leadership = leaderBoard.get(path);
172 return leadership != null ? leadership.leader() : null;
173 }
174
175 @Override
Madan Jampani59610512015-02-25 15:25:43 -0800176 public Leadership getLeadership(String path) {
177 checkArgument(path != null);
178 return leaderBoard.get(path);
179 }
180
181 @Override
182 public Set<String> ownedTopics(NodeId nodeId) {
183 checkArgument(nodeId != null);
184 return leaderBoard.entrySet()
185 .stream()
186 .filter(entry -> nodeId.equals(entry.getValue().leader()))
187 .map(Entry::getKey)
188 .collect(Collectors.toSet());
189 }
190
191 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800192 public void runForLeadership(String path) {
Madan Jampani52860be2015-02-27 12:52:37 -0800193 log.debug("Running for leadership for topic: {}", path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800194 activeTopics.add(path);
195 tryLeaderLock(path);
196 }
197
198 @Override
199 public void withdraw(String path) {
200 activeTopics.remove(path);
201 try {
Jonathan Harte649c752015-03-03 18:04:25 -0800202 Versioned<NodeId> leader = lockMap.get(path);
203 if (Objects.equals(leader.value(), localNodeId)) {
204 if (lockMap.remove(path, leader.version())) {
205 log.info("Gave up leadership for {}", path);
206 notifyRemovedLeader(path, localNodeId, leader.version(), leader.creationTime());
207 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800208 }
209 // else we are not the current owner.
210 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800211 log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
Madan Jampanid14166a2015-02-24 17:37:51 -0800212 }
213 }
214
215 @Override
216 public void addListener(LeadershipEventListener listener) {
217 listenerRegistry.addListener(listener);
218 }
219
220 @Override
221 public void removeListener(LeadershipEventListener listener) {
222 listenerRegistry.removeListener(listener);
223 }
224
225 private void tryLeaderLock(String path) {
226 if (!activeTopics.contains(path)) {
227 return;
228 }
229 try {
230 Versioned<NodeId> currentLeader = lockMap.get(path);
231 if (currentLeader != null) {
232 if (localNodeId.equals(currentLeader.value())) {
233 log.info("Already has leadership for {}", path);
Madan Jampani30a57f82015-03-02 12:19:41 -0800234 notifyNewLeader(path, localNodeId, currentLeader.version(), currentLeader.creationTime());
Madan Jampanid14166a2015-02-24 17:37:51 -0800235 } else {
236 // someone else has leadership. will retry after sometime.
237 retry(path);
238 }
239 } else {
240 if (lockMap.putIfAbsent(path, localNodeId) == null) {
241 log.info("Assumed leadership for {}", path);
242 // do a get again to get the version (epoch)
243 Versioned<NodeId> newLeader = lockMap.get(path);
Madan Jampani30a57f82015-03-02 12:19:41 -0800244 notifyNewLeader(path, localNodeId, newLeader.version(), newLeader.creationTime());
Madan Jampanid14166a2015-02-24 17:37:51 -0800245 } else {
246 // someone beat us to it.
247 retry(path);
248 }
249 }
250 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800251 log.debug("Attempt to acquire leadership lock for topic {} failed", path, e);
Madan Jampanid14166a2015-02-24 17:37:51 -0800252 retry(path);
253 }
254 }
255
Madan Jampani30a57f82015-03-02 12:19:41 -0800256 private void notifyNewLeader(String path, NodeId leader, long epoch, long electedTime) {
257 Leadership newLeadership = new Leadership(path, leader, epoch, electedTime);
Madan Jampanid14166a2015-02-24 17:37:51 -0800258 boolean updatedLeader = false;
259 synchronized (leaderBoard) {
260 Leadership currentLeader = leaderBoard.get(path);
261 if (currentLeader == null || currentLeader.epoch() < epoch) {
262 leaderBoard.put(path, newLeadership);
263 updatedLeader = true;
264 }
265 }
266
267 if (updatedLeader) {
268 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership);
269 eventDispatcher.post(event);
270 clusterCommunicator.broadcast(
271 new ClusterMessage(
272 clusterService.getLocalNode().id(),
273 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
274 SERIALIZER.encode(event)));
275 }
276 }
277
Madan Jampani30a57f82015-03-02 12:19:41 -0800278 private void notifyRemovedLeader(String path, NodeId leader, long epoch, long electedTime) {
279 Leadership oldLeadership = new Leadership(path, leader, epoch, electedTime);
Madan Jampanid14166a2015-02-24 17:37:51 -0800280 boolean updatedLeader = false;
281 synchronized (leaderBoard) {
282 Leadership currentLeader = leaderBoard.get(path);
283 if (currentLeader != null && currentLeader.epoch() == oldLeadership.epoch()) {
284 leaderBoard.remove(path);
285 updatedLeader = true;
286 }
287 }
288
289 if (updatedLeader) {
290 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, oldLeadership);
291 eventDispatcher.post(event);
292 clusterCommunicator.broadcast(
293 new ClusterMessage(
294 clusterService.getLocalNode().id(),
295 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
296 SERIALIZER.encode(event)));
297 }
298 }
299
300 private class InternalLeadershipEventListener implements ClusterMessageHandler {
301
302 @Override
303 public void handle(ClusterMessage message) {
304 LeadershipEvent leadershipEvent =
305 SERIALIZER.decode(message.payload());
306
Madan Jampania14047d2015-02-25 12:23:02 -0800307 log.debug("Leadership Event: time = {} type = {} event = {}",
Madan Jampanid14166a2015-02-24 17:37:51 -0800308 leadershipEvent.time(), leadershipEvent.type(),
309 leadershipEvent);
310
311 Leadership leadershipUpdate = leadershipEvent.subject();
312 LeadershipEvent.Type eventType = leadershipEvent.type();
313 String topic = leadershipUpdate.topic();
314
315 boolean updateAccepted = false;
316
317 synchronized (leaderBoard) {
318 Leadership currentLeadership = leaderBoard.get(topic);
319 if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
320 if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
321 leaderBoard.put(topic, leadershipUpdate);
322 updateAccepted = true;
323 }
324 } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
325 if (currentLeadership != null && currentLeadership.epoch() == leadershipUpdate.epoch()) {
326 leaderBoard.remove(topic);
327 updateAccepted = true;
328 }
329 } else {
330 throw new IllegalStateException("Unknown event type.");
331 }
332 if (updateAccepted) {
333 eventDispatcher.post(leadershipEvent);
334 }
335 }
336 }
337 }
338
339 private void retry(String path) {
340 retryLeaderLockExecutor.schedule(
341 () -> tryLeaderLock(path),
342 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
343 TimeUnit.SECONDS);
344 }
345
346 private void purgeStaleLocks() {
347 try {
348 Set<Entry<String, Versioned<NodeId>>> entries = lockMap.entrySet();
349 entries.forEach(entry -> {
350 String path = entry.getKey();
351 NodeId nodeId = entry.getValue().value();
352 long epoch = entry.getValue().version();
Madan Jampani30a57f82015-03-02 12:19:41 -0800353 long creationTime = entry.getValue().creationTime();
Madan Jampanid14166a2015-02-24 17:37:51 -0800354 if (clusterService.getState(nodeId) == ControllerNode.State.INACTIVE) {
355 log.info("Lock for {} is held by {} which is currently inactive", path, nodeId);
356 try {
357 if (lockMap.remove(path, epoch)) {
Madan Jampania14047d2015-02-25 12:23:02 -0800358 log.info("Purged stale lock held by {} for {}", nodeId, path);
Madan Jampani30a57f82015-03-02 12:19:41 -0800359 notifyRemovedLeader(path, nodeId, epoch, creationTime);
Madan Jampanid14166a2015-02-24 17:37:51 -0800360 }
361 } catch (Exception e) {
362 log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
363 }
364 }
365 if (localNodeId.equals(nodeId) && !activeTopics.contains(path)) {
Madan Jampani52860be2015-02-27 12:52:37 -0800366 log.debug("Lock for {} is held by {} when it not running for leadership.", path, nodeId);
Madan Jampanid14166a2015-02-24 17:37:51 -0800367 try {
368 if (lockMap.remove(path, epoch)) {
Madan Jampania14047d2015-02-25 12:23:02 -0800369 log.info("Purged stale lock held by {} for {}", nodeId, path);
Madan Jampani30a57f82015-03-02 12:19:41 -0800370 notifyRemovedLeader(path, nodeId, epoch, creationTime);
Madan Jampanid14166a2015-02-24 17:37:51 -0800371 }
372 } catch (Exception e) {
373 log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
374 }
375 }
376 });
377 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800378 log.debug("Failed cleaning up stale locks", e);
Madan Jampanid14166a2015-02-24 17:37:51 -0800379 }
380 }
381
382 private void sendLeadershipStatus() {
Madan Jampania14047d2015-02-25 12:23:02 -0800383 try {
384 leaderBoard.forEach((path, leadership) -> {
385 if (leadership.leader().equals(localNodeId)) {
386 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership);
387 clusterCommunicator.broadcast(
388 new ClusterMessage(
389 clusterService.getLocalNode().id(),
390 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
391 SERIALIZER.encode(event)));
392 }
393 });
394 } catch (Exception e) {
395 log.debug("Failed to send leadership updates", e);
396 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800397 }
Madan Jampania14047d2015-02-25 12:23:02 -0800398}