blob: fb19d4d29eb1889896c54dc9cab1a0412ddbc9ea [file] [log] [blame]
alshabibab984662014-12-04 18:56:18 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.cluster.impl;
Madan Jampani1d3494e2014-11-20 11:24:22 -080017
Jonathan Hart7d656f42015-01-27 14:07:23 -080018import com.google.common.collect.ImmutableMap;
19import com.google.common.collect.Maps;
20import com.google.common.collect.Sets;
Madan Jampani1d3494e2014-11-20 11:24:22 -080021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
Jonathan Hart7d656f42015-01-27 14:07:23 -080027import org.onlab.util.KryoNamespace;
Brian O'Connorabafb502014-12-02 22:26:20 -080028import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.Leadership;
30import org.onosproject.cluster.LeadershipEvent;
31import org.onosproject.cluster.LeadershipEventListener;
32import org.onosproject.cluster.LeadershipService;
33import org.onosproject.cluster.NodeId;
34import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
35import org.onosproject.store.cluster.messaging.ClusterMessage;
36import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
37import org.onosproject.store.cluster.messaging.MessageSubject;
38import org.onosproject.store.serializers.KryoNamespaces;
39import org.onosproject.store.serializers.KryoSerializer;
40import org.onosproject.store.service.Lock;
41import org.onosproject.store.service.LockService;
42import org.onosproject.store.service.impl.DistributedLockManager;
Madan Jampani1d3494e2014-11-20 11:24:22 -080043import org.slf4j.Logger;
44
Jonathan Hart7d656f42015-01-27 14:07:23 -080045import java.util.Map;
46import java.util.Set;
47import java.util.concurrent.Executors;
48import java.util.concurrent.ScheduledExecutorService;
49import java.util.concurrent.TimeUnit;
50
51import static com.google.common.base.Preconditions.checkArgument;
52import static org.onlab.util.Tools.namedThreads;
53import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani1d3494e2014-11-20 11:24:22 -080054
55/**
56 * Distributed implementation of LeadershipService that is based on the primitives exposed by
57 * LockService.
58 */
Jonathan Hart949c2842014-11-28 23:44:09 -080059@Component(enabled = false)
Madan Jampani1d3494e2014-11-20 11:24:22 -080060@Service
61public class LeadershipManager implements LeadershipService {
62
63 private final Logger log = getLogger(getClass());
64
65 // TODO: Remove this dependency
66 private static final int TERM_DURATION_MS =
67 DistributedLockManager.DEAD_LOCK_TIMEOUT_MS;
68
Madan Jampani1ee91782014-11-20 20:24:24 -080069 // Time to wait before retrying leadership after
70 // a unexpected error.
71 private static final int WAIT_BEFORE_RETRY_MS = 2000;
72
Yuta HIGUCHI65934892014-12-04 17:47:44 -080073 // TODO: Make Thread pool size configurable.
Madan Jampaniddaffd02014-11-21 13:12:09 -080074 private final ScheduledExecutorService threadPool =
Thomas Vachuska9ea3e6f2015-01-23 16:34:22 -080075 Executors.newScheduledThreadPool(25, namedThreads("onos-leadership-manager-%d"));
Madan Jampani1d3494e2014-11-20 11:24:22 -080076
Madan Jampani1ee91782014-11-20 20:24:24 -080077 private static final MessageSubject LEADERSHIP_UPDATES =
78 new MessageSubject("leadership-contest-updates");
79
Madan Jampani1d3494e2014-11-20 11:24:22 -080080 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
81 private ClusterService clusterService;
82
83 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani1ee91782014-11-20 20:24:24 -080084 private ClusterCommunicationService clusterCommunicator;
85
86 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani1d3494e2014-11-20 11:24:22 -080087 private LockService lockService;
88
Madan Jampani1ee91782014-11-20 20:24:24 -080089 private final Map<String, Leadership> leaderBoard = Maps.newHashMap();
90
Madan Jampanid1a467f2014-11-24 15:11:09 -080091 private final Map<String, Lock> openContests = Maps.newConcurrentMap();
Madan Jampani1ee91782014-11-20 20:24:24 -080092 private final Set<LeadershipEventListener> listeners = Sets.newIdentityHashSet();
Madan Jampani8d21c792014-12-01 16:31:07 -080093 private NodeId localNodeId;
Madan Jampani1d3494e2014-11-20 11:24:22 -080094
Madan Jampani1ee91782014-11-20 20:24:24 -080095 private final LeadershipEventListener peerAdvertiser = new PeerAdvertiser();
96 private final LeadershipEventListener leaderBoardUpdater = new LeaderBoardUpdater();
97
98 public static final KryoSerializer SERIALIZER = new KryoSerializer() {
99 @Override
100 protected void setupKryoPool() {
101 serializerPool = KryoNamespace.newBuilder()
102 .register(KryoNamespaces.API)
103 .build()
104 .populate(1);
105 }
106 };
107
Madan Jampani1d3494e2014-11-20 11:24:22 -0800108 @Activate
109 public void activate() {
Madan Jampani8d21c792014-12-01 16:31:07 -0800110 localNodeId = clusterService.getLocalNode().id();
Madan Jampani1ee91782014-11-20 20:24:24 -0800111
112 addListener(peerAdvertiser);
113 addListener(leaderBoardUpdater);
114
115 clusterCommunicator.addSubscriber(
116 LEADERSHIP_UPDATES,
117 new PeerAdvertisementHandler());
118
Madan Jampani1d3494e2014-11-20 11:24:22 -0800119 log.info("Started.");
120 }
121
122 @Deactivate
123 public void deactivate() {
Madan Jampani1ee91782014-11-20 20:24:24 -0800124 removeListener(peerAdvertiser);
125 removeListener(leaderBoardUpdater);
126
127 clusterCommunicator.removeSubscriber(LEADERSHIP_UPDATES);
128
Madan Jampaniddaffd02014-11-21 13:12:09 -0800129 threadPool.shutdown();
Madan Jampani1ee91782014-11-20 20:24:24 -0800130
Madan Jampani1d3494e2014-11-20 11:24:22 -0800131 log.info("Stopped.");
132 }
133
Madan Jampani1ee91782014-11-20 20:24:24 -0800134
135 @Override
Madan Jampani8d21c792014-12-01 16:31:07 -0800136 public NodeId getLeader(String path) {
Madan Jampani1ee91782014-11-20 20:24:24 -0800137 synchronized (leaderBoard) {
138 Leadership leadership = leaderBoard.get(path);
139 if (leadership != null) {
140 return leadership.leader();
141 }
142 }
143 return null;
144 }
145
Madan Jampani1d3494e2014-11-20 11:24:22 -0800146 @Override
147 public void runForLeadership(String path) {
148 checkArgument(path != null);
Madan Jampanid1a467f2014-11-24 15:11:09 -0800149
Madan Jampani1d3494e2014-11-20 11:24:22 -0800150 if (openContests.containsKey(path)) {
151 log.info("Already in the leadership contest for {}", path);
152 return;
153 } else {
154 Lock lock = lockService.create(path);
155 openContests.put(path, lock);
Madan Jampanid1a467f2014-11-24 15:11:09 -0800156 threadPool.schedule(new TryLeadership(lock), 0, TimeUnit.MILLISECONDS);
Madan Jampani1d3494e2014-11-20 11:24:22 -0800157 }
158 }
159
160 @Override
161 public void withdraw(String path) {
162 checkArgument(path != null);
Madan Jampanid1a467f2014-11-24 15:11:09 -0800163
Madan Jampani1d3494e2014-11-20 11:24:22 -0800164 Lock lock = openContests.remove(path);
165
166 if (lock != null && lock.isLocked()) {
167 lock.unlock();
168 notifyListeners(
169 new LeadershipEvent(
170 LeadershipEvent.Type.LEADER_BOOTED,
Madan Jampani8d21c792014-12-01 16:31:07 -0800171 new Leadership(lock.path(), localNodeId, lock.epoch())));
Madan Jampani1d3494e2014-11-20 11:24:22 -0800172 }
173 }
174
175 @Override
Yuta HIGUCHIc2bf3d82014-11-28 18:50:41 -0800176 public Map<String, Leadership> getLeaderBoard() {
177 return ImmutableMap.copyOf(leaderBoard);
178 }
179
180 @Override
Madan Jampani1d3494e2014-11-20 11:24:22 -0800181 public void addListener(LeadershipEventListener listener) {
182 checkArgument(listener != null);
183 listeners.add(listener);
184 }
185
186 @Override
187 public void removeListener(LeadershipEventListener listener) {
188 checkArgument(listener != null);
189 listeners.remove(listener);
190 }
191
192 private void notifyListeners(LeadershipEvent event) {
193 for (LeadershipEventListener listener : listeners) {
Madan Jampanid1a467f2014-11-24 15:11:09 -0800194 try {
195 listener.event(event);
196 } catch (Exception e) {
197 log.error("Notifying listener failed with exception.", e);
198 }
Madan Jampani1d3494e2014-11-20 11:24:22 -0800199 }
200 }
201
202 private void tryAcquireLeadership(String path) {
203 Lock lock = openContests.get(path);
Madan Jampanid1a467f2014-11-24 15:11:09 -0800204 if (lock == null) {
205 // withdrew from race.
206 return;
207 }
Madan Jampani1d3494e2014-11-20 11:24:22 -0800208 lock.lockAsync(TERM_DURATION_MS).whenComplete((response, error) -> {
209 if (error == null) {
Madan Jampaniddaffd02014-11-21 13:12:09 -0800210 threadPool.schedule(
Madan Jampani1ee91782014-11-20 20:24:24 -0800211 new ReelectionTask(lock),
Madan Jampani1d3494e2014-11-20 11:24:22 -0800212 TERM_DURATION_MS / 2,
213 TimeUnit.MILLISECONDS);
214 notifyListeners(
215 new LeadershipEvent(
216 LeadershipEvent.Type.LEADER_ELECTED,
Madan Jampani8d21c792014-12-01 16:31:07 -0800217 new Leadership(lock.path(), localNodeId, lock.epoch())));
Madan Jampani1ee91782014-11-20 20:24:24 -0800218 return;
Madan Jampani1d3494e2014-11-20 11:24:22 -0800219 } else {
Madan Jampanid1a467f2014-11-24 15:11:09 -0800220 log.warn("Failed to acquire lock for {}. Will retry in {} ms", path, WAIT_BEFORE_RETRY_MS, error);
221 threadPool.schedule(new TryLeadership(lock), WAIT_BEFORE_RETRY_MS, TimeUnit.MILLISECONDS);
Madan Jampani1d3494e2014-11-20 11:24:22 -0800222 }
223 });
224 }
225
Madan Jampani1ee91782014-11-20 20:24:24 -0800226 private class ReelectionTask implements Runnable {
Madan Jampani1d3494e2014-11-20 11:24:22 -0800227
228 private final Lock lock;
229
Madan Jampani1ee91782014-11-20 20:24:24 -0800230 public ReelectionTask(Lock lock) {
Madan Jampani1d3494e2014-11-20 11:24:22 -0800231 this.lock = lock;
232 }
233
234 @Override
235 public void run() {
Madan Jampanid1a467f2014-11-24 15:11:09 -0800236 if (!openContests.containsKey(lock.path())) {
237 log.debug("Node withdrew from leadership race for {}. Cancelling reelection task.", lock.path());
238 return;
239 }
240
241 boolean lockExtended = false;
242 try {
243 lockExtended = lock.extendExpiration(TERM_DURATION_MS);
244 } catch (Exception e) {
245 log.warn("Attempt to extend lock failed with an exception.", e);
246 }
247
248 if (lockExtended) {
Madan Jampani1d3494e2014-11-20 11:24:22 -0800249 notifyListeners(
250 new LeadershipEvent(
251 LeadershipEvent.Type.LEADER_REELECTED,
Madan Jampani8d21c792014-12-01 16:31:07 -0800252 new Leadership(lock.path(), localNodeId, lock.epoch())));
Madan Jampaniddaffd02014-11-21 13:12:09 -0800253 threadPool.schedule(this, TERM_DURATION_MS / 2, TimeUnit.MILLISECONDS);
Madan Jampani1d3494e2014-11-20 11:24:22 -0800254 } else {
Madan Jampanid1a467f2014-11-24 15:11:09 -0800255 // Check if this node already withdrew from the contest, in which case
256 // we don't need to notify here.
Madan Jampani1d3494e2014-11-20 11:24:22 -0800257 if (openContests.containsKey(lock.path())) {
258 notifyListeners(
259 new LeadershipEvent(
260 LeadershipEvent.Type.LEADER_BOOTED,
Madan Jampani8d21c792014-12-01 16:31:07 -0800261 new Leadership(lock.path(), localNodeId, lock.epoch())));
Madan Jampanid1a467f2014-11-24 15:11:09 -0800262 // Retry leadership after a brief wait.
263 threadPool.schedule(new TryLeadership(lock), WAIT_BEFORE_RETRY_MS, TimeUnit.MILLISECONDS);
Madan Jampani1d3494e2014-11-20 11:24:22 -0800264 }
265 }
266 }
267 }
Madan Jampani1ee91782014-11-20 20:24:24 -0800268
Madan Jampanid1a467f2014-11-24 15:11:09 -0800269 private class TryLeadership implements Runnable {
270 private final Lock lock;
271
272 public TryLeadership(Lock lock) {
273 this.lock = lock;
274 }
275
276 @Override
277 public void run() {
278 tryAcquireLeadership(lock.path());
279 }
280 }
281
Madan Jampani1ee91782014-11-20 20:24:24 -0800282 private class PeerAdvertiser implements LeadershipEventListener {
283 @Override
284 public void event(LeadershipEvent event) {
285 // publish events originating on this host.
Madan Jampani8d21c792014-12-01 16:31:07 -0800286 if (event.subject().leader().equals(localNodeId)) {
Jonathan Hart7d656f42015-01-27 14:07:23 -0800287 clusterCommunicator.broadcast(
288 new ClusterMessage(
289 localNodeId,
290 LEADERSHIP_UPDATES,
291 SERIALIZER.encode(event)));
Madan Jampani1ee91782014-11-20 20:24:24 -0800292 }
293 }
294 }
295
296 private class PeerAdvertisementHandler implements ClusterMessageHandler {
297 @Override
298 public void handle(ClusterMessage message) {
299 LeadershipEvent event = SERIALIZER.decode(message.payload());
Yuta HIGUCHI47b2f552014-11-28 20:13:15 -0800300 log.trace("Received {} from {}", event, message.sender());
Madan Jampani1ee91782014-11-20 20:24:24 -0800301 notifyListeners(event);
302 }
303 }
304
305 private class LeaderBoardUpdater implements LeadershipEventListener {
306 @Override
307 public void event(LeadershipEvent event) {
308 Leadership leadershipUpdate = event.subject();
309 synchronized (leaderBoard) {
310 Leadership currentLeadership = leaderBoard.get(leadershipUpdate.topic());
311 switch (event.type()) {
312 case LEADER_ELECTED:
313 case LEADER_REELECTED:
314 if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
315 leaderBoard.put(leadershipUpdate.topic(), leadershipUpdate);
316 }
317 break;
318 case LEADER_BOOTED:
319 if (currentLeadership != null && currentLeadership.epoch() <= leadershipUpdate.epoch()) {
320 leaderBoard.remove(leadershipUpdate.topic());
321 }
322 break;
323 default:
324 break;
325 }
326 }
327 }
328 }
Brian O'Connorabafb502014-12-02 22:26:20 -0800329}