blob: 5bbb4cbf10a6d2fe07e11c894e1d6aba39a40d58 [file] [log] [blame]
Madan Jampanid14166a2015-02-24 17:37:51 -08001package org.onosproject.store.consistent.impl;
2
3import static org.onlab.util.Tools.groupedThreads;
4import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani59610512015-02-25 15:25:43 -08005import static com.google.common.base.Preconditions.checkArgument;
Madan Jampanid14166a2015-02-24 17:37:51 -08006
7import java.util.Map;
8import java.util.Map.Entry;
9import java.util.Set;
10import java.util.concurrent.ExecutorService;
11import java.util.concurrent.Executors;
12import java.util.concurrent.ScheduledExecutorService;
13import java.util.concurrent.TimeUnit;
Madan Jampani59610512015-02-25 15:25:43 -080014import java.util.stream.Collectors;
Madan Jampanid14166a2015-02-24 17:37:51 -080015
16import org.apache.felix.scr.annotations.Activate;
17import org.apache.felix.scr.annotations.Component;
18import org.apache.felix.scr.annotations.Deactivate;
19import org.apache.felix.scr.annotations.Reference;
20import org.apache.felix.scr.annotations.ReferenceCardinality;
21import org.apache.felix.scr.annotations.Service;
22import org.onlab.util.KryoNamespace;
23import org.onosproject.cluster.ClusterService;
24import org.onosproject.cluster.ControllerNode;
25import org.onosproject.cluster.Leadership;
26import org.onosproject.cluster.LeadershipEvent;
27import org.onosproject.cluster.LeadershipEventListener;
28import org.onosproject.cluster.LeadershipService;
29import org.onosproject.cluster.NodeId;
30import org.onosproject.event.AbstractListenerRegistry;
31import org.onosproject.event.EventDeliveryService;
32import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
33import org.onosproject.store.cluster.messaging.ClusterMessage;
34import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
35import org.onosproject.store.cluster.messaging.MessageSubject;
36import org.onosproject.store.serializers.KryoNamespaces;
37import org.onosproject.store.serializers.KryoSerializer;
38import org.onosproject.store.service.ConsistentMap;
39import org.onosproject.store.service.Serializer;
40import org.onosproject.store.service.StorageService;
41import org.onosproject.store.service.Versioned;
42import org.slf4j.Logger;
43
44import com.google.common.collect.ImmutableMap;
45import com.google.common.collect.Maps;
46import com.google.common.collect.Sets;
47
48/**
49 * Distributed Lock Manager implemented on top of ConsistentMap.
50 * <p>
51 * This implementation makes use of cluster manager's failure
52 * detection capabilities to detect and purge stale locks.
53 * TODO: Ensure lock safety and liveness.
54 */
Madan Jampani7f72c3f2015-03-01 17:34:59 -080055@Component(immediate = true, enabled = true)
Madan Jampanid14166a2015-02-24 17:37:51 -080056@Service
57public class DistributedLeadershipManager implements LeadershipService {
58
59 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
60 protected StorageService storageService;
61
62 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
63 protected ClusterService clusterService;
64
65 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
66 protected ClusterCommunicationService clusterCommunicator;
67
68 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
69 protected EventDeliveryService eventDispatcher;
70
71 private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
72 new MessageSubject("distributed-leadership-manager-events");
73
74 private final Logger log = getLogger(getClass());
75 private ExecutorService messageHandlingExecutor;
76 private ScheduledExecutorService retryLeaderLockExecutor;
77 private ScheduledExecutorService deadLockDetectionExecutor;
78 private ScheduledExecutorService leadershipStatusBroadcaster;
79
80 private ConsistentMap<String, NodeId> lockMap;
81 private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
82 listenerRegistry;
83 private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
84 private NodeId localNodeId;
85
86 private Set<String> activeTopics = Sets.newConcurrentHashSet();
87
88 private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
89 private static final int DEADLOCK_DETECTION_INTERVAL_SEC = 2;
Madan Jampania14047d2015-02-25 12:23:02 -080090 private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC = 2;
Madan Jampanid14166a2015-02-24 17:37:51 -080091
92 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
93 @Override
94 protected void setupKryoPool() {
95 serializerPool = KryoNamespace.newBuilder()
96 .register(KryoNamespaces.API)
97 .build()
98 .populate(1);
99 }
100 };
101
102 @Activate
103 public void activate() {
104 lockMap = storageService.createConsistentMap("onos-leader-locks", new Serializer() {
105 KryoNamespace kryo = new KryoNamespace.Builder()
106 .register(KryoNamespaces.API).build();
107
108 @Override
109 public <T> byte[] encode(T object) {
110 return kryo.serialize(object);
111 }
112
113 @Override
114 public <T> T decode(byte[] bytes) {
115 return kryo.deserialize(bytes);
116 }
117 });
118
119 localNodeId = clusterService.getLocalNode().id();
120
121 messageHandlingExecutor = Executors.newSingleThreadExecutor(
122 groupedThreads("onos/store/leadership", "message-handler"));
123 retryLeaderLockExecutor = Executors.newScheduledThreadPool(
124 4, groupedThreads("onos/store/leadership", "election-thread-%d"));
125 deadLockDetectionExecutor = Executors.newSingleThreadScheduledExecutor(
126 groupedThreads("onos/store/leadership", "dead-lock-detector"));
127 leadershipStatusBroadcaster = Executors.newSingleThreadScheduledExecutor(
128 groupedThreads("onos/store/leadership", "peer-updater"));
129 clusterCommunicator.addSubscriber(
130 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
131 new InternalLeadershipEventListener(),
132 messageHandlingExecutor);
133
134 deadLockDetectionExecutor.scheduleWithFixedDelay(
135 this::purgeStaleLocks, 0, DEADLOCK_DETECTION_INTERVAL_SEC, TimeUnit.SECONDS);
136 leadershipStatusBroadcaster.scheduleWithFixedDelay(
Madan Jampania14047d2015-02-25 12:23:02 -0800137 this::sendLeadershipStatus, 0, LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC, TimeUnit.SECONDS);
Madan Jampanid14166a2015-02-24 17:37:51 -0800138
139 listenerRegistry = new AbstractListenerRegistry<>();
140 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
141
142 log.info("Started.");
143 }
144
145 @Deactivate
146 public void deactivate() {
147 leaderBoard.forEach((topic, leadership) -> {
148 if (localNodeId.equals(leadership.leader())) {
149 withdraw(topic);
150 }
151 });
152
153 eventDispatcher.removeSink(LeadershipEvent.class);
154 clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
155
156 messageHandlingExecutor.shutdown();
157 retryLeaderLockExecutor.shutdown();
158 deadLockDetectionExecutor.shutdown();
159 leadershipStatusBroadcaster.shutdown();
160
161 log.info("Stopped.");
162 }
163
164 @Override
165 public Map<String, Leadership> getLeaderBoard() {
166 return ImmutableMap.copyOf(leaderBoard);
167 }
168
169 @Override
170 public NodeId getLeader(String path) {
171 Leadership leadership = leaderBoard.get(path);
172 return leadership != null ? leadership.leader() : null;
173 }
174
175 @Override
Madan Jampani59610512015-02-25 15:25:43 -0800176 public Leadership getLeadership(String path) {
177 checkArgument(path != null);
178 return leaderBoard.get(path);
179 }
180
181 @Override
182 public Set<String> ownedTopics(NodeId nodeId) {
183 checkArgument(nodeId != null);
184 return leaderBoard.entrySet()
185 .stream()
186 .filter(entry -> nodeId.equals(entry.getValue().leader()))
187 .map(Entry::getKey)
188 .collect(Collectors.toSet());
189 }
190
191 @Override
Madan Jampanid14166a2015-02-24 17:37:51 -0800192 public void runForLeadership(String path) {
Madan Jampani52860be2015-02-27 12:52:37 -0800193 log.debug("Running for leadership for topic: {}", path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800194 activeTopics.add(path);
195 tryLeaderLock(path);
196 }
197
198 @Override
199 public void withdraw(String path) {
200 activeTopics.remove(path);
201 try {
202 if (lockMap.remove(path, localNodeId)) {
Madan Jampania14047d2015-02-25 12:23:02 -0800203 log.info("Gave up leadership for {}", path);
Madan Jampanid14166a2015-02-24 17:37:51 -0800204 }
205 // else we are not the current owner.
206 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800207 log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
Madan Jampanid14166a2015-02-24 17:37:51 -0800208 }
209 }
210
211 @Override
212 public void addListener(LeadershipEventListener listener) {
213 listenerRegistry.addListener(listener);
214 }
215
216 @Override
217 public void removeListener(LeadershipEventListener listener) {
218 listenerRegistry.removeListener(listener);
219 }
220
221 private void tryLeaderLock(String path) {
222 if (!activeTopics.contains(path)) {
223 return;
224 }
225 try {
226 Versioned<NodeId> currentLeader = lockMap.get(path);
227 if (currentLeader != null) {
228 if (localNodeId.equals(currentLeader.value())) {
229 log.info("Already has leadership for {}", path);
Madan Jampani30a57f82015-03-02 12:19:41 -0800230 notifyNewLeader(path, localNodeId, currentLeader.version(), currentLeader.creationTime());
Madan Jampanid14166a2015-02-24 17:37:51 -0800231 } else {
232 // someone else has leadership. will retry after sometime.
233 retry(path);
234 }
235 } else {
236 if (lockMap.putIfAbsent(path, localNodeId) == null) {
237 log.info("Assumed leadership for {}", path);
238 // do a get again to get the version (epoch)
239 Versioned<NodeId> newLeader = lockMap.get(path);
Madan Jampani30a57f82015-03-02 12:19:41 -0800240 notifyNewLeader(path, localNodeId, newLeader.version(), newLeader.creationTime());
Madan Jampanid14166a2015-02-24 17:37:51 -0800241 } else {
242 // someone beat us to it.
243 retry(path);
244 }
245 }
246 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800247 log.debug("Attempt to acquire leadership lock for topic {} failed", path, e);
Madan Jampanid14166a2015-02-24 17:37:51 -0800248 retry(path);
249 }
250 }
251
Madan Jampani30a57f82015-03-02 12:19:41 -0800252 private void notifyNewLeader(String path, NodeId leader, long epoch, long electedTime) {
253 Leadership newLeadership = new Leadership(path, leader, epoch, electedTime);
Madan Jampanid14166a2015-02-24 17:37:51 -0800254 boolean updatedLeader = false;
255 synchronized (leaderBoard) {
256 Leadership currentLeader = leaderBoard.get(path);
257 if (currentLeader == null || currentLeader.epoch() < epoch) {
258 leaderBoard.put(path, newLeadership);
259 updatedLeader = true;
260 }
261 }
262
263 if (updatedLeader) {
264 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership);
265 eventDispatcher.post(event);
266 clusterCommunicator.broadcast(
267 new ClusterMessage(
268 clusterService.getLocalNode().id(),
269 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
270 SERIALIZER.encode(event)));
271 }
272 }
273
Madan Jampani30a57f82015-03-02 12:19:41 -0800274 private void notifyRemovedLeader(String path, NodeId leader, long epoch, long electedTime) {
275 Leadership oldLeadership = new Leadership(path, leader, epoch, electedTime);
Madan Jampanid14166a2015-02-24 17:37:51 -0800276 boolean updatedLeader = false;
277 synchronized (leaderBoard) {
278 Leadership currentLeader = leaderBoard.get(path);
279 if (currentLeader != null && currentLeader.epoch() == oldLeadership.epoch()) {
280 leaderBoard.remove(path);
281 updatedLeader = true;
282 }
283 }
284
285 if (updatedLeader) {
286 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, oldLeadership);
287 eventDispatcher.post(event);
288 clusterCommunicator.broadcast(
289 new ClusterMessage(
290 clusterService.getLocalNode().id(),
291 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
292 SERIALIZER.encode(event)));
293 }
294 }
295
296 private class InternalLeadershipEventListener implements ClusterMessageHandler {
297
298 @Override
299 public void handle(ClusterMessage message) {
300 LeadershipEvent leadershipEvent =
301 SERIALIZER.decode(message.payload());
302
Madan Jampania14047d2015-02-25 12:23:02 -0800303 log.debug("Leadership Event: time = {} type = {} event = {}",
Madan Jampanid14166a2015-02-24 17:37:51 -0800304 leadershipEvent.time(), leadershipEvent.type(),
305 leadershipEvent);
306
307 Leadership leadershipUpdate = leadershipEvent.subject();
308 LeadershipEvent.Type eventType = leadershipEvent.type();
309 String topic = leadershipUpdate.topic();
310
311 boolean updateAccepted = false;
312
313 synchronized (leaderBoard) {
314 Leadership currentLeadership = leaderBoard.get(topic);
315 if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
316 if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
317 leaderBoard.put(topic, leadershipUpdate);
318 updateAccepted = true;
319 }
320 } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
321 if (currentLeadership != null && currentLeadership.epoch() == leadershipUpdate.epoch()) {
322 leaderBoard.remove(topic);
323 updateAccepted = true;
324 }
325 } else {
326 throw new IllegalStateException("Unknown event type.");
327 }
328 if (updateAccepted) {
329 eventDispatcher.post(leadershipEvent);
330 }
331 }
332 }
333 }
334
335 private void retry(String path) {
336 retryLeaderLockExecutor.schedule(
337 () -> tryLeaderLock(path),
338 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
339 TimeUnit.SECONDS);
340 }
341
342 private void purgeStaleLocks() {
343 try {
344 Set<Entry<String, Versioned<NodeId>>> entries = lockMap.entrySet();
345 entries.forEach(entry -> {
346 String path = entry.getKey();
347 NodeId nodeId = entry.getValue().value();
348 long epoch = entry.getValue().version();
Madan Jampani30a57f82015-03-02 12:19:41 -0800349 long creationTime = entry.getValue().creationTime();
Madan Jampanid14166a2015-02-24 17:37:51 -0800350 if (clusterService.getState(nodeId) == ControllerNode.State.INACTIVE) {
351 log.info("Lock for {} is held by {} which is currently inactive", path, nodeId);
352 try {
353 if (lockMap.remove(path, epoch)) {
Madan Jampania14047d2015-02-25 12:23:02 -0800354 log.info("Purged stale lock held by {} for {}", nodeId, path);
Madan Jampani30a57f82015-03-02 12:19:41 -0800355 notifyRemovedLeader(path, nodeId, epoch, creationTime);
Madan Jampanid14166a2015-02-24 17:37:51 -0800356 }
357 } catch (Exception e) {
358 log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
359 }
360 }
361 if (localNodeId.equals(nodeId) && !activeTopics.contains(path)) {
Madan Jampani52860be2015-02-27 12:52:37 -0800362 log.debug("Lock for {} is held by {} when it not running for leadership.", path, nodeId);
Madan Jampanid14166a2015-02-24 17:37:51 -0800363 try {
364 if (lockMap.remove(path, epoch)) {
Madan Jampania14047d2015-02-25 12:23:02 -0800365 log.info("Purged stale lock held by {} for {}", nodeId, path);
Madan Jampani30a57f82015-03-02 12:19:41 -0800366 notifyRemovedLeader(path, nodeId, epoch, creationTime);
Madan Jampanid14166a2015-02-24 17:37:51 -0800367 }
368 } catch (Exception e) {
369 log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
370 }
371 }
372 });
373 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800374 log.debug("Failed cleaning up stale locks", e);
Madan Jampanid14166a2015-02-24 17:37:51 -0800375 }
376 }
377
378 private void sendLeadershipStatus() {
Madan Jampania14047d2015-02-25 12:23:02 -0800379 try {
380 leaderBoard.forEach((path, leadership) -> {
381 if (leadership.leader().equals(localNodeId)) {
382 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership);
383 clusterCommunicator.broadcast(
384 new ClusterMessage(
385 clusterService.getLocalNode().id(),
386 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
387 SERIALIZER.encode(event)));
388 }
389 });
390 } catch (Exception e) {
391 log.debug("Failed to send leadership updates", e);
392 }
Madan Jampanid14166a2015-02-24 17:37:51 -0800393 }
Madan Jampania14047d2015-02-25 12:23:02 -0800394}