/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.cluster.impl;

import static com.google.common.base.Preconditions.checkArgument;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.Lock;
import org.onosproject.store.service.LockService;
import org.onosproject.store.service.impl.DistributedLockManager;
import org.slf4j.Logger;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

57/**
58 * Distributed implementation of LeadershipService that is based on the primitives exposed by
59 * LockService.
60 */
Jonathan Hart949c2842014-11-28 23:44:09 -080061@Component(enabled = false)
Madan Jampani1d3494e2014-11-20 11:24:22 -080062@Service
63public class LeadershipManager implements LeadershipService {
64
65 private final Logger log = getLogger(getClass());
66
67 // TODO: Remove this dependency
68 private static final int TERM_DURATION_MS =
69 DistributedLockManager.DEAD_LOCK_TIMEOUT_MS;
70
Madan Jampani1ee91782014-11-20 20:24:24 -080071 // Time to wait before retrying leadership after
72 // a unexpected error.
73 private static final int WAIT_BEFORE_RETRY_MS = 2000;
74
Yuta HIGUCHI65934892014-12-04 17:47:44 -080075 // TODO: Make Thread pool size configurable.
Madan Jampaniddaffd02014-11-21 13:12:09 -080076 private final ScheduledExecutorService threadPool =
Madan Jampani1d3494e2014-11-20 11:24:22 -080077 Executors.newScheduledThreadPool(25, namedThreads("leadership-manager-%d"));
78
Madan Jampani1ee91782014-11-20 20:24:24 -080079 private static final MessageSubject LEADERSHIP_UPDATES =
80 new MessageSubject("leadership-contest-updates");
81
Madan Jampani1d3494e2014-11-20 11:24:22 -080082 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
83 private ClusterService clusterService;
84
85 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani1ee91782014-11-20 20:24:24 -080086 private ClusterCommunicationService clusterCommunicator;
87
88 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani1d3494e2014-11-20 11:24:22 -080089 private LockService lockService;
90
Madan Jampani1ee91782014-11-20 20:24:24 -080091 private final Map<String, Leadership> leaderBoard = Maps.newHashMap();
92
Madan Jampanid1a467f2014-11-24 15:11:09 -080093 private final Map<String, Lock> openContests = Maps.newConcurrentMap();
Madan Jampani1ee91782014-11-20 20:24:24 -080094 private final Set<LeadershipEventListener> listeners = Sets.newIdentityHashSet();
Madan Jampani8d21c792014-12-01 16:31:07 -080095 private NodeId localNodeId;
Madan Jampani1d3494e2014-11-20 11:24:22 -080096
Madan Jampani1ee91782014-11-20 20:24:24 -080097 private final LeadershipEventListener peerAdvertiser = new PeerAdvertiser();
98 private final LeadershipEventListener leaderBoardUpdater = new LeaderBoardUpdater();
99
100 public static final KryoSerializer SERIALIZER = new KryoSerializer() {
101 @Override
102 protected void setupKryoPool() {
103 serializerPool = KryoNamespace.newBuilder()
104 .register(KryoNamespaces.API)
105 .build()
106 .populate(1);
107 }
108 };
109
Madan Jampani1d3494e2014-11-20 11:24:22 -0800110 @Activate
111 public void activate() {
Madan Jampani8d21c792014-12-01 16:31:07 -0800112 localNodeId = clusterService.getLocalNode().id();
Madan Jampani1ee91782014-11-20 20:24:24 -0800113
114 addListener(peerAdvertiser);
115 addListener(leaderBoardUpdater);
116
117 clusterCommunicator.addSubscriber(
118 LEADERSHIP_UPDATES,
119 new PeerAdvertisementHandler());
120
Madan Jampani1d3494e2014-11-20 11:24:22 -0800121 log.info("Started.");
122 }
123
124 @Deactivate
125 public void deactivate() {
Madan Jampani1ee91782014-11-20 20:24:24 -0800126 removeListener(peerAdvertiser);
127 removeListener(leaderBoardUpdater);
128
129 clusterCommunicator.removeSubscriber(LEADERSHIP_UPDATES);
130
Madan Jampaniddaffd02014-11-21 13:12:09 -0800131 threadPool.shutdown();
Madan Jampani1ee91782014-11-20 20:24:24 -0800132
Madan Jampani1d3494e2014-11-20 11:24:22 -0800133 log.info("Stopped.");
134 }
135
Madan Jampani1ee91782014-11-20 20:24:24 -0800136
137 @Override
Madan Jampani8d21c792014-12-01 16:31:07 -0800138 public NodeId getLeader(String path) {
Madan Jampani1ee91782014-11-20 20:24:24 -0800139 synchronized (leaderBoard) {
140 Leadership leadership = leaderBoard.get(path);
141 if (leadership != null) {
142 return leadership.leader();
143 }
144 }
145 return null;
146 }
147
Madan Jampani1d3494e2014-11-20 11:24:22 -0800148 @Override
149 public void runForLeadership(String path) {
150 checkArgument(path != null);
Madan Jampanid1a467f2014-11-24 15:11:09 -0800151
Madan Jampani1d3494e2014-11-20 11:24:22 -0800152 if (openContests.containsKey(path)) {
153 log.info("Already in the leadership contest for {}", path);
154 return;
155 } else {
156 Lock lock = lockService.create(path);
157 openContests.put(path, lock);
Madan Jampanid1a467f2014-11-24 15:11:09 -0800158 threadPool.schedule(new TryLeadership(lock), 0, TimeUnit.MILLISECONDS);
Madan Jampani1d3494e2014-11-20 11:24:22 -0800159 }
160 }
161
162 @Override
163 public void withdraw(String path) {
164 checkArgument(path != null);
Madan Jampanid1a467f2014-11-24 15:11:09 -0800165
Madan Jampani1d3494e2014-11-20 11:24:22 -0800166 Lock lock = openContests.remove(path);
167
168 if (lock != null && lock.isLocked()) {
169 lock.unlock();
170 notifyListeners(
171 new LeadershipEvent(
172 LeadershipEvent.Type.LEADER_BOOTED,
Madan Jampani8d21c792014-12-01 16:31:07 -0800173 new Leadership(lock.path(), localNodeId, lock.epoch())));
Madan Jampani1d3494e2014-11-20 11:24:22 -0800174 }
175 }
176
177 @Override
Yuta HIGUCHIc2bf3d82014-11-28 18:50:41 -0800178 public Map<String, Leadership> getLeaderBoard() {
179 return ImmutableMap.copyOf(leaderBoard);
180 }
181
182 @Override
Madan Jampani1d3494e2014-11-20 11:24:22 -0800183 public void addListener(LeadershipEventListener listener) {
184 checkArgument(listener != null);
185 listeners.add(listener);
186 }
187
188 @Override
189 public void removeListener(LeadershipEventListener listener) {
190 checkArgument(listener != null);
191 listeners.remove(listener);
192 }
193
194 private void notifyListeners(LeadershipEvent event) {
195 for (LeadershipEventListener listener : listeners) {
Madan Jampanid1a467f2014-11-24 15:11:09 -0800196 try {
197 listener.event(event);
198 } catch (Exception e) {
199 log.error("Notifying listener failed with exception.", e);
200 }
Madan Jampani1d3494e2014-11-20 11:24:22 -0800201 }
202 }
203
204 private void tryAcquireLeadership(String path) {
205 Lock lock = openContests.get(path);
Madan Jampanid1a467f2014-11-24 15:11:09 -0800206 if (lock == null) {
207 // withdrew from race.
208 return;
209 }
Madan Jampani1d3494e2014-11-20 11:24:22 -0800210 lock.lockAsync(TERM_DURATION_MS).whenComplete((response, error) -> {
211 if (error == null) {
Madan Jampaniddaffd02014-11-21 13:12:09 -0800212 threadPool.schedule(
Madan Jampani1ee91782014-11-20 20:24:24 -0800213 new ReelectionTask(lock),
Madan Jampani1d3494e2014-11-20 11:24:22 -0800214 TERM_DURATION_MS / 2,
215 TimeUnit.MILLISECONDS);
216 notifyListeners(
217 new LeadershipEvent(
218 LeadershipEvent.Type.LEADER_ELECTED,
Madan Jampani8d21c792014-12-01 16:31:07 -0800219 new Leadership(lock.path(), localNodeId, lock.epoch())));
Madan Jampani1ee91782014-11-20 20:24:24 -0800220 return;
Madan Jampani1d3494e2014-11-20 11:24:22 -0800221 } else {
Madan Jampanid1a467f2014-11-24 15:11:09 -0800222 log.warn("Failed to acquire lock for {}. Will retry in {} ms", path, WAIT_BEFORE_RETRY_MS, error);
223 threadPool.schedule(new TryLeadership(lock), WAIT_BEFORE_RETRY_MS, TimeUnit.MILLISECONDS);
Madan Jampani1d3494e2014-11-20 11:24:22 -0800224 }
225 });
226 }
227
Madan Jampani1ee91782014-11-20 20:24:24 -0800228 private class ReelectionTask implements Runnable {
Madan Jampani1d3494e2014-11-20 11:24:22 -0800229
230 private final Lock lock;
231
Madan Jampani1ee91782014-11-20 20:24:24 -0800232 public ReelectionTask(Lock lock) {
Madan Jampani1d3494e2014-11-20 11:24:22 -0800233 this.lock = lock;
234 }
235
236 @Override
237 public void run() {
Madan Jampanid1a467f2014-11-24 15:11:09 -0800238 if (!openContests.containsKey(lock.path())) {
239 log.debug("Node withdrew from leadership race for {}. Cancelling reelection task.", lock.path());
240 return;
241 }
242
243 boolean lockExtended = false;
244 try {
245 lockExtended = lock.extendExpiration(TERM_DURATION_MS);
246 } catch (Exception e) {
247 log.warn("Attempt to extend lock failed with an exception.", e);
248 }
249
250 if (lockExtended) {
Madan Jampani1d3494e2014-11-20 11:24:22 -0800251 notifyListeners(
252 new LeadershipEvent(
253 LeadershipEvent.Type.LEADER_REELECTED,
Madan Jampani8d21c792014-12-01 16:31:07 -0800254 new Leadership(lock.path(), localNodeId, lock.epoch())));
Madan Jampaniddaffd02014-11-21 13:12:09 -0800255 threadPool.schedule(this, TERM_DURATION_MS / 2, TimeUnit.MILLISECONDS);
Madan Jampani1d3494e2014-11-20 11:24:22 -0800256 } else {
Madan Jampanid1a467f2014-11-24 15:11:09 -0800257 // Check if this node already withdrew from the contest, in which case
258 // we don't need to notify here.
Madan Jampani1d3494e2014-11-20 11:24:22 -0800259 if (openContests.containsKey(lock.path())) {
260 notifyListeners(
261 new LeadershipEvent(
262 LeadershipEvent.Type.LEADER_BOOTED,
Madan Jampani8d21c792014-12-01 16:31:07 -0800263 new Leadership(lock.path(), localNodeId, lock.epoch())));
Madan Jampanid1a467f2014-11-24 15:11:09 -0800264 // Retry leadership after a brief wait.
265 threadPool.schedule(new TryLeadership(lock), WAIT_BEFORE_RETRY_MS, TimeUnit.MILLISECONDS);
Madan Jampani1d3494e2014-11-20 11:24:22 -0800266 }
267 }
268 }
269 }
Madan Jampani1ee91782014-11-20 20:24:24 -0800270
Madan Jampanid1a467f2014-11-24 15:11:09 -0800271 private class TryLeadership implements Runnable {
272 private final Lock lock;
273
274 public TryLeadership(Lock lock) {
275 this.lock = lock;
276 }
277
278 @Override
279 public void run() {
280 tryAcquireLeadership(lock.path());
281 }
282 }
283
Madan Jampani1ee91782014-11-20 20:24:24 -0800284 private class PeerAdvertiser implements LeadershipEventListener {
285 @Override
286 public void event(LeadershipEvent event) {
287 // publish events originating on this host.
Madan Jampani8d21c792014-12-01 16:31:07 -0800288 if (event.subject().leader().equals(localNodeId)) {
Madan Jampani1ee91782014-11-20 20:24:24 -0800289 try {
290 clusterCommunicator.broadcast(
291 new ClusterMessage(
Madan Jampani8d21c792014-12-01 16:31:07 -0800292 localNodeId,
Madan Jampani1ee91782014-11-20 20:24:24 -0800293 LEADERSHIP_UPDATES,
294 SERIALIZER.encode(event)));
295 } catch (IOException e) {
296 log.error("Failed to broadcast leadership update message", e);
297 }
298 }
299 }
300 }
301
302 private class PeerAdvertisementHandler implements ClusterMessageHandler {
303 @Override
304 public void handle(ClusterMessage message) {
305 LeadershipEvent event = SERIALIZER.decode(message.payload());
Yuta HIGUCHI47b2f552014-11-28 20:13:15 -0800306 log.trace("Received {} from {}", event, message.sender());
Madan Jampani1ee91782014-11-20 20:24:24 -0800307 notifyListeners(event);
308 }
309 }
310
311 private class LeaderBoardUpdater implements LeadershipEventListener {
312 @Override
313 public void event(LeadershipEvent event) {
314 Leadership leadershipUpdate = event.subject();
315 synchronized (leaderBoard) {
316 Leadership currentLeadership = leaderBoard.get(leadershipUpdate.topic());
317 switch (event.type()) {
318 case LEADER_ELECTED:
319 case LEADER_REELECTED:
320 if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
321 leaderBoard.put(leadershipUpdate.topic(), leadershipUpdate);
322 }
323 break;
324 case LEADER_BOOTED:
325 if (currentLeadership != null && currentLeadership.epoch() <= leadershipUpdate.epoch()) {
326 leaderBoard.remove(leadershipUpdate.topic());
327 }
328 break;
329 default:
330 break;
331 }
332 }
333 }
334 }
Brian O'Connorabafb502014-12-02 22:26:20 -0800335}