blob: 8f3068c51a278fee4ee5ea27addec80e0018a887 [file] [log] [blame]
alshabibab984662014-12-04 18:56:18 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.cluster.impl;
Madan Jampani1d3494e2014-11-20 11:24:22 -080017
Jonathan Hart7d656f42015-01-27 14:07:23 -080018import com.google.common.collect.ImmutableMap;
19import com.google.common.collect.Maps;
20import com.google.common.collect.Sets;
Madan Jampani1d3494e2014-11-20 11:24:22 -080021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
Jonathan Hart7d656f42015-01-27 14:07:23 -080027import org.onlab.util.KryoNamespace;
Brian O'Connorabafb502014-12-02 22:26:20 -080028import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.Leadership;
30import org.onosproject.cluster.LeadershipEvent;
31import org.onosproject.cluster.LeadershipEventListener;
32import org.onosproject.cluster.LeadershipService;
33import org.onosproject.cluster.NodeId;
34import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
35import org.onosproject.store.cluster.messaging.ClusterMessage;
36import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
37import org.onosproject.store.cluster.messaging.MessageSubject;
38import org.onosproject.store.serializers.KryoNamespaces;
39import org.onosproject.store.serializers.KryoSerializer;
40import org.onosproject.store.service.Lock;
41import org.onosproject.store.service.LockService;
Madan Jampani1d3494e2014-11-20 11:24:22 -080042import org.slf4j.Logger;
43
Jonathan Hart7d656f42015-01-27 14:07:23 -080044import java.util.Map;
45import java.util.Set;
46import java.util.concurrent.Executors;
47import java.util.concurrent.ScheduledExecutorService;
48import java.util.concurrent.TimeUnit;
49
50import static com.google.common.base.Preconditions.checkArgument;
51import static org.onlab.util.Tools.namedThreads;
52import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani1d3494e2014-11-20 11:24:22 -080053
54/**
55 * Distributed implementation of LeadershipService that is based on the primitives exposed by
56 * LockService.
57 */
Jonathan Hart949c2842014-11-28 23:44:09 -080058@Component(enabled = false)
Madan Jampani1d3494e2014-11-20 11:24:22 -080059@Service
60public class LeadershipManager implements LeadershipService {
61
62 private final Logger log = getLogger(getClass());
63
Madan Jampani94c23532015-02-05 17:40:01 -080064 private static final int TERM_DURATION_MS = 2000;
Madan Jampani1d3494e2014-11-20 11:24:22 -080065
Madan Jampani1ee91782014-11-20 20:24:24 -080066 // Time to wait before retrying leadership after
67 // a unexpected error.
68 private static final int WAIT_BEFORE_RETRY_MS = 2000;
69
Yuta HIGUCHI65934892014-12-04 17:47:44 -080070 // TODO: Make Thread pool size configurable.
Madan Jampaniddaffd02014-11-21 13:12:09 -080071 private final ScheduledExecutorService threadPool =
Thomas Vachuska9ea3e6f2015-01-23 16:34:22 -080072 Executors.newScheduledThreadPool(25, namedThreads("onos-leadership-manager-%d"));
Madan Jampani1d3494e2014-11-20 11:24:22 -080073
Madan Jampani1ee91782014-11-20 20:24:24 -080074 private static final MessageSubject LEADERSHIP_UPDATES =
75 new MessageSubject("leadership-contest-updates");
76
Madan Jampani1d3494e2014-11-20 11:24:22 -080077 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
78 private ClusterService clusterService;
79
80 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani1ee91782014-11-20 20:24:24 -080081 private ClusterCommunicationService clusterCommunicator;
82
83 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani1d3494e2014-11-20 11:24:22 -080084 private LockService lockService;
85
Madan Jampani1ee91782014-11-20 20:24:24 -080086 private final Map<String, Leadership> leaderBoard = Maps.newHashMap();
87
Madan Jampanid1a467f2014-11-24 15:11:09 -080088 private final Map<String, Lock> openContests = Maps.newConcurrentMap();
Madan Jampani1ee91782014-11-20 20:24:24 -080089 private final Set<LeadershipEventListener> listeners = Sets.newIdentityHashSet();
Madan Jampani8d21c792014-12-01 16:31:07 -080090 private NodeId localNodeId;
Madan Jampani1d3494e2014-11-20 11:24:22 -080091
Madan Jampani1ee91782014-11-20 20:24:24 -080092 private final LeadershipEventListener peerAdvertiser = new PeerAdvertiser();
93 private final LeadershipEventListener leaderBoardUpdater = new LeaderBoardUpdater();
94
95 public static final KryoSerializer SERIALIZER = new KryoSerializer() {
96 @Override
97 protected void setupKryoPool() {
98 serializerPool = KryoNamespace.newBuilder()
99 .register(KryoNamespaces.API)
100 .build()
101 .populate(1);
102 }
103 };
104
Madan Jampani1d3494e2014-11-20 11:24:22 -0800105 @Activate
106 public void activate() {
Madan Jampani8d21c792014-12-01 16:31:07 -0800107 localNodeId = clusterService.getLocalNode().id();
Madan Jampani1ee91782014-11-20 20:24:24 -0800108
109 addListener(peerAdvertiser);
110 addListener(leaderBoardUpdater);
111
112 clusterCommunicator.addSubscriber(
113 LEADERSHIP_UPDATES,
114 new PeerAdvertisementHandler());
115
Madan Jampani1d3494e2014-11-20 11:24:22 -0800116 log.info("Started.");
117 }
118
119 @Deactivate
120 public void deactivate() {
Madan Jampani1ee91782014-11-20 20:24:24 -0800121 removeListener(peerAdvertiser);
122 removeListener(leaderBoardUpdater);
123
124 clusterCommunicator.removeSubscriber(LEADERSHIP_UPDATES);
125
Madan Jampaniddaffd02014-11-21 13:12:09 -0800126 threadPool.shutdown();
Madan Jampani1ee91782014-11-20 20:24:24 -0800127
Madan Jampani1d3494e2014-11-20 11:24:22 -0800128 log.info("Stopped.");
129 }
130
Madan Jampani1ee91782014-11-20 20:24:24 -0800131
132 @Override
Madan Jampani8d21c792014-12-01 16:31:07 -0800133 public NodeId getLeader(String path) {
Madan Jampani1ee91782014-11-20 20:24:24 -0800134 synchronized (leaderBoard) {
135 Leadership leadership = leaderBoard.get(path);
136 if (leadership != null) {
137 return leadership.leader();
138 }
139 }
140 return null;
141 }
142
Madan Jampani1d3494e2014-11-20 11:24:22 -0800143 @Override
144 public void runForLeadership(String path) {
145 checkArgument(path != null);
Madan Jampanid1a467f2014-11-24 15:11:09 -0800146
Madan Jampani1d3494e2014-11-20 11:24:22 -0800147 if (openContests.containsKey(path)) {
148 log.info("Already in the leadership contest for {}", path);
149 return;
150 } else {
151 Lock lock = lockService.create(path);
152 openContests.put(path, lock);
Madan Jampanid1a467f2014-11-24 15:11:09 -0800153 threadPool.schedule(new TryLeadership(lock), 0, TimeUnit.MILLISECONDS);
Madan Jampani1d3494e2014-11-20 11:24:22 -0800154 }
155 }
156
157 @Override
158 public void withdraw(String path) {
159 checkArgument(path != null);
Madan Jampanid1a467f2014-11-24 15:11:09 -0800160
Madan Jampani1d3494e2014-11-20 11:24:22 -0800161 Lock lock = openContests.remove(path);
162
163 if (lock != null && lock.isLocked()) {
164 lock.unlock();
165 notifyListeners(
166 new LeadershipEvent(
167 LeadershipEvent.Type.LEADER_BOOTED,
Madan Jampani8d21c792014-12-01 16:31:07 -0800168 new Leadership(lock.path(), localNodeId, lock.epoch())));
Madan Jampani1d3494e2014-11-20 11:24:22 -0800169 }
170 }
171
172 @Override
Yuta HIGUCHIc2bf3d82014-11-28 18:50:41 -0800173 public Map<String, Leadership> getLeaderBoard() {
174 return ImmutableMap.copyOf(leaderBoard);
175 }
176
177 @Override
Madan Jampani1d3494e2014-11-20 11:24:22 -0800178 public void addListener(LeadershipEventListener listener) {
179 checkArgument(listener != null);
180 listeners.add(listener);
181 }
182
183 @Override
184 public void removeListener(LeadershipEventListener listener) {
185 checkArgument(listener != null);
186 listeners.remove(listener);
187 }
188
189 private void notifyListeners(LeadershipEvent event) {
190 for (LeadershipEventListener listener : listeners) {
Madan Jampanid1a467f2014-11-24 15:11:09 -0800191 try {
192 listener.event(event);
193 } catch (Exception e) {
194 log.error("Notifying listener failed with exception.", e);
195 }
Madan Jampani1d3494e2014-11-20 11:24:22 -0800196 }
197 }
198
199 private void tryAcquireLeadership(String path) {
200 Lock lock = openContests.get(path);
Madan Jampanid1a467f2014-11-24 15:11:09 -0800201 if (lock == null) {
202 // withdrew from race.
203 return;
204 }
Madan Jampani1d3494e2014-11-20 11:24:22 -0800205 lock.lockAsync(TERM_DURATION_MS).whenComplete((response, error) -> {
206 if (error == null) {
Madan Jampaniddaffd02014-11-21 13:12:09 -0800207 threadPool.schedule(
Madan Jampani1ee91782014-11-20 20:24:24 -0800208 new ReelectionTask(lock),
Madan Jampani1d3494e2014-11-20 11:24:22 -0800209 TERM_DURATION_MS / 2,
210 TimeUnit.MILLISECONDS);
211 notifyListeners(
212 new LeadershipEvent(
213 LeadershipEvent.Type.LEADER_ELECTED,
Madan Jampani8d21c792014-12-01 16:31:07 -0800214 new Leadership(lock.path(), localNodeId, lock.epoch())));
Madan Jampani1ee91782014-11-20 20:24:24 -0800215 return;
Madan Jampani1d3494e2014-11-20 11:24:22 -0800216 } else {
Madan Jampanid1a467f2014-11-24 15:11:09 -0800217 log.warn("Failed to acquire lock for {}. Will retry in {} ms", path, WAIT_BEFORE_RETRY_MS, error);
218 threadPool.schedule(new TryLeadership(lock), WAIT_BEFORE_RETRY_MS, TimeUnit.MILLISECONDS);
Madan Jampani1d3494e2014-11-20 11:24:22 -0800219 }
220 });
221 }
222
Madan Jampani1ee91782014-11-20 20:24:24 -0800223 private class ReelectionTask implements Runnable {
Madan Jampani1d3494e2014-11-20 11:24:22 -0800224
225 private final Lock lock;
226
Madan Jampani1ee91782014-11-20 20:24:24 -0800227 public ReelectionTask(Lock lock) {
Madan Jampani1d3494e2014-11-20 11:24:22 -0800228 this.lock = lock;
229 }
230
231 @Override
232 public void run() {
Madan Jampanid1a467f2014-11-24 15:11:09 -0800233 if (!openContests.containsKey(lock.path())) {
234 log.debug("Node withdrew from leadership race for {}. Cancelling reelection task.", lock.path());
235 return;
236 }
237
238 boolean lockExtended = false;
239 try {
240 lockExtended = lock.extendExpiration(TERM_DURATION_MS);
241 } catch (Exception e) {
242 log.warn("Attempt to extend lock failed with an exception.", e);
243 }
244
245 if (lockExtended) {
Madan Jampani1d3494e2014-11-20 11:24:22 -0800246 notifyListeners(
247 new LeadershipEvent(
248 LeadershipEvent.Type.LEADER_REELECTED,
Madan Jampani8d21c792014-12-01 16:31:07 -0800249 new Leadership(lock.path(), localNodeId, lock.epoch())));
Madan Jampaniddaffd02014-11-21 13:12:09 -0800250 threadPool.schedule(this, TERM_DURATION_MS / 2, TimeUnit.MILLISECONDS);
Madan Jampani1d3494e2014-11-20 11:24:22 -0800251 } else {
Madan Jampanid1a467f2014-11-24 15:11:09 -0800252 // Check if this node already withdrew from the contest, in which case
253 // we don't need to notify here.
Madan Jampani1d3494e2014-11-20 11:24:22 -0800254 if (openContests.containsKey(lock.path())) {
255 notifyListeners(
256 new LeadershipEvent(
257 LeadershipEvent.Type.LEADER_BOOTED,
Madan Jampani8d21c792014-12-01 16:31:07 -0800258 new Leadership(lock.path(), localNodeId, lock.epoch())));
Madan Jampanid1a467f2014-11-24 15:11:09 -0800259 // Retry leadership after a brief wait.
260 threadPool.schedule(new TryLeadership(lock), WAIT_BEFORE_RETRY_MS, TimeUnit.MILLISECONDS);
Madan Jampani1d3494e2014-11-20 11:24:22 -0800261 }
262 }
263 }
264 }
Madan Jampani1ee91782014-11-20 20:24:24 -0800265
Madan Jampanid1a467f2014-11-24 15:11:09 -0800266 private class TryLeadership implements Runnable {
267 private final Lock lock;
268
269 public TryLeadership(Lock lock) {
270 this.lock = lock;
271 }
272
273 @Override
274 public void run() {
275 tryAcquireLeadership(lock.path());
276 }
277 }
278
Madan Jampani1ee91782014-11-20 20:24:24 -0800279 private class PeerAdvertiser implements LeadershipEventListener {
280 @Override
281 public void event(LeadershipEvent event) {
282 // publish events originating on this host.
Madan Jampani8d21c792014-12-01 16:31:07 -0800283 if (event.subject().leader().equals(localNodeId)) {
Jonathan Hart7d656f42015-01-27 14:07:23 -0800284 clusterCommunicator.broadcast(
285 new ClusterMessage(
286 localNodeId,
287 LEADERSHIP_UPDATES,
288 SERIALIZER.encode(event)));
Madan Jampani1ee91782014-11-20 20:24:24 -0800289 }
290 }
291 }
292
293 private class PeerAdvertisementHandler implements ClusterMessageHandler {
294 @Override
295 public void handle(ClusterMessage message) {
296 LeadershipEvent event = SERIALIZER.decode(message.payload());
Yuta HIGUCHI47b2f552014-11-28 20:13:15 -0800297 log.trace("Received {} from {}", event, message.sender());
Madan Jampani1ee91782014-11-20 20:24:24 -0800298 notifyListeners(event);
299 }
300 }
301
302 private class LeaderBoardUpdater implements LeadershipEventListener {
303 @Override
304 public void event(LeadershipEvent event) {
305 Leadership leadershipUpdate = event.subject();
306 synchronized (leaderBoard) {
307 Leadership currentLeadership = leaderBoard.get(leadershipUpdate.topic());
308 switch (event.type()) {
309 case LEADER_ELECTED:
310 case LEADER_REELECTED:
311 if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
312 leaderBoard.put(leadershipUpdate.topic(), leadershipUpdate);
313 }
314 break;
315 case LEADER_BOOTED:
316 if (currentLeadership != null && currentLeadership.epoch() <= leadershipUpdate.epoch()) {
317 leaderBoard.remove(leadershipUpdate.topic());
318 }
319 break;
320 default:
321 break;
322 }
323 }
324 }
325 }
Brian O'Connorabafb502014-12-02 22:26:20 -0800326}