blob: f18907c093bfec2f1ccdf768a8b512ec312c0307 [file] [log] [blame]
Madan Jampani1d3494e2014-11-20 11:24:22 -08001package org.onlab.onos.store.cluster.impl;
2
3import static com.google.common.base.Preconditions.checkArgument;
Madan Jampani1d3494e2014-11-20 11:24:22 -08004import static org.onlab.util.Tools.namedThreads;
5import static org.slf4j.LoggerFactory.getLogger;
6
Madan Jampani1ee91782014-11-20 20:24:24 -08007import java.io.IOException;
Madan Jampani1d3494e2014-11-20 11:24:22 -08008import java.util.Map;
9import java.util.Set;
10import java.util.concurrent.Executors;
11import java.util.concurrent.ScheduledExecutorService;
12import java.util.concurrent.TimeUnit;
13
Yuta HIGUCHIc2bf3d82014-11-28 18:50:41 -080014import com.google.common.collect.ImmutableMap;
Madan Jampani1d3494e2014-11-20 11:24:22 -080015import org.apache.felix.scr.annotations.Activate;
16import org.apache.felix.scr.annotations.Component;
17import org.apache.felix.scr.annotations.Deactivate;
18import org.apache.felix.scr.annotations.Reference;
19import org.apache.felix.scr.annotations.ReferenceCardinality;
20import org.apache.felix.scr.annotations.Service;
21import org.onlab.onos.cluster.ClusterService;
22import org.onlab.onos.cluster.ControllerNode;
23import org.onlab.onos.cluster.Leadership;
24import org.onlab.onos.cluster.LeadershipEvent;
25import org.onlab.onos.cluster.LeadershipEventListener;
26import org.onlab.onos.cluster.LeadershipService;
Madan Jampani1ee91782014-11-20 20:24:24 -080027import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
28import org.onlab.onos.store.cluster.messaging.ClusterMessage;
29import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
30import org.onlab.onos.store.cluster.messaging.MessageSubject;
31import org.onlab.onos.store.serializers.KryoNamespaces;
32import org.onlab.onos.store.serializers.KryoSerializer;
Madan Jampani1d3494e2014-11-20 11:24:22 -080033import org.onlab.onos.store.service.Lock;
34import org.onlab.onos.store.service.LockService;
35import org.onlab.onos.store.service.impl.DistributedLockManager;
Madan Jampani1ee91782014-11-20 20:24:24 -080036import org.onlab.util.KryoNamespace;
Madan Jampani1d3494e2014-11-20 11:24:22 -080037import org.slf4j.Logger;
38
39import com.google.common.collect.Maps;
40import com.google.common.collect.Sets;
41
42/**
43 * Distributed implementation of LeadershipService that is based on the primitives exposed by
44 * LockService.
45 */
46@Component(immediate = true)
47@Service
48public class LeadershipManager implements LeadershipService {
49
50 private final Logger log = getLogger(getClass());
51
52 // TODO: Remove this dependency
53 private static final int TERM_DURATION_MS =
54 DistributedLockManager.DEAD_LOCK_TIMEOUT_MS;
55
Madan Jampani1ee91782014-11-20 20:24:24 -080056 // Time to wait before retrying leadership after
57 // a unexpected error.
58 private static final int WAIT_BEFORE_RETRY_MS = 2000;
59
Madan Jampani1d3494e2014-11-20 11:24:22 -080060 // TODO: Appropriate Thread pool sizing.
Madan Jampaniddaffd02014-11-21 13:12:09 -080061 private final ScheduledExecutorService threadPool =
Madan Jampani1d3494e2014-11-20 11:24:22 -080062 Executors.newScheduledThreadPool(25, namedThreads("leadership-manager-%d"));
63
Madan Jampani1ee91782014-11-20 20:24:24 -080064 private static final MessageSubject LEADERSHIP_UPDATES =
65 new MessageSubject("leadership-contest-updates");
66
Madan Jampani1d3494e2014-11-20 11:24:22 -080067 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
68 private ClusterService clusterService;
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani1ee91782014-11-20 20:24:24 -080071 private ClusterCommunicationService clusterCommunicator;
72
73 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani1d3494e2014-11-20 11:24:22 -080074 private LockService lockService;
75
Madan Jampani1ee91782014-11-20 20:24:24 -080076 private final Map<String, Leadership> leaderBoard = Maps.newHashMap();
77
Madan Jampanid1a467f2014-11-24 15:11:09 -080078 private final Map<String, Lock> openContests = Maps.newConcurrentMap();
Madan Jampani1ee91782014-11-20 20:24:24 -080079 private final Set<LeadershipEventListener> listeners = Sets.newIdentityHashSet();
Madan Jampani1d3494e2014-11-20 11:24:22 -080080 private ControllerNode localNode;
81
Madan Jampani1ee91782014-11-20 20:24:24 -080082 private final LeadershipEventListener peerAdvertiser = new PeerAdvertiser();
83 private final LeadershipEventListener leaderBoardUpdater = new LeaderBoardUpdater();
84
85 public static final KryoSerializer SERIALIZER = new KryoSerializer() {
86 @Override
87 protected void setupKryoPool() {
88 serializerPool = KryoNamespace.newBuilder()
89 .register(KryoNamespaces.API)
90 .build()
91 .populate(1);
92 }
93 };
94
Madan Jampani1d3494e2014-11-20 11:24:22 -080095 @Activate
96 public void activate() {
97 localNode = clusterService.getLocalNode();
Madan Jampani1ee91782014-11-20 20:24:24 -080098
99 addListener(peerAdvertiser);
100 addListener(leaderBoardUpdater);
101
102 clusterCommunicator.addSubscriber(
103 LEADERSHIP_UPDATES,
104 new PeerAdvertisementHandler());
105
Madan Jampani1d3494e2014-11-20 11:24:22 -0800106 log.info("Started.");
107 }
108
109 @Deactivate
110 public void deactivate() {
Madan Jampani1ee91782014-11-20 20:24:24 -0800111 removeListener(peerAdvertiser);
112 removeListener(leaderBoardUpdater);
113
114 clusterCommunicator.removeSubscriber(LEADERSHIP_UPDATES);
115
Madan Jampaniddaffd02014-11-21 13:12:09 -0800116 threadPool.shutdown();
Madan Jampani1ee91782014-11-20 20:24:24 -0800117
Madan Jampani1d3494e2014-11-20 11:24:22 -0800118 log.info("Stopped.");
119 }
120
Madan Jampani1ee91782014-11-20 20:24:24 -0800121
122 @Override
123 public ControllerNode getLeader(String path) {
124 synchronized (leaderBoard) {
125 Leadership leadership = leaderBoard.get(path);
126 if (leadership != null) {
127 return leadership.leader();
128 }
129 }
130 return null;
131 }
132
Madan Jampani1d3494e2014-11-20 11:24:22 -0800133 @Override
134 public void runForLeadership(String path) {
135 checkArgument(path != null);
Madan Jampanid1a467f2014-11-24 15:11:09 -0800136
Madan Jampani1d3494e2014-11-20 11:24:22 -0800137 if (openContests.containsKey(path)) {
138 log.info("Already in the leadership contest for {}", path);
139 return;
140 } else {
141 Lock lock = lockService.create(path);
142 openContests.put(path, lock);
Madan Jampanid1a467f2014-11-24 15:11:09 -0800143 threadPool.schedule(new TryLeadership(lock), 0, TimeUnit.MILLISECONDS);
Madan Jampani1d3494e2014-11-20 11:24:22 -0800144 }
145 }
146
147 @Override
148 public void withdraw(String path) {
149 checkArgument(path != null);
Madan Jampanid1a467f2014-11-24 15:11:09 -0800150
Madan Jampani1d3494e2014-11-20 11:24:22 -0800151 Lock lock = openContests.remove(path);
152
153 if (lock != null && lock.isLocked()) {
154 lock.unlock();
155 notifyListeners(
156 new LeadershipEvent(
157 LeadershipEvent.Type.LEADER_BOOTED,
Madan Jampani1ee91782014-11-20 20:24:24 -0800158 new Leadership(lock.path(), localNode, lock.epoch())));
Madan Jampani1d3494e2014-11-20 11:24:22 -0800159 }
160 }
161
162 @Override
Yuta HIGUCHIc2bf3d82014-11-28 18:50:41 -0800163 public Map<String, Leadership> getLeaderBoard() {
164 return ImmutableMap.copyOf(leaderBoard);
165 }
166
167 @Override
Madan Jampani1d3494e2014-11-20 11:24:22 -0800168 public void addListener(LeadershipEventListener listener) {
169 checkArgument(listener != null);
170 listeners.add(listener);
171 }
172
173 @Override
174 public void removeListener(LeadershipEventListener listener) {
175 checkArgument(listener != null);
176 listeners.remove(listener);
177 }
178
179 private void notifyListeners(LeadershipEvent event) {
180 for (LeadershipEventListener listener : listeners) {
Madan Jampanid1a467f2014-11-24 15:11:09 -0800181 try {
182 listener.event(event);
183 } catch (Exception e) {
184 log.error("Notifying listener failed with exception.", e);
185 }
Madan Jampani1d3494e2014-11-20 11:24:22 -0800186 }
187 }
188
189 private void tryAcquireLeadership(String path) {
190 Lock lock = openContests.get(path);
Madan Jampanid1a467f2014-11-24 15:11:09 -0800191 if (lock == null) {
192 // withdrew from race.
193 return;
194 }
Madan Jampani1d3494e2014-11-20 11:24:22 -0800195 lock.lockAsync(TERM_DURATION_MS).whenComplete((response, error) -> {
196 if (error == null) {
Madan Jampaniddaffd02014-11-21 13:12:09 -0800197 threadPool.schedule(
Madan Jampani1ee91782014-11-20 20:24:24 -0800198 new ReelectionTask(lock),
Madan Jampani1d3494e2014-11-20 11:24:22 -0800199 TERM_DURATION_MS / 2,
200 TimeUnit.MILLISECONDS);
201 notifyListeners(
202 new LeadershipEvent(
203 LeadershipEvent.Type.LEADER_ELECTED,
Madan Jampani1ee91782014-11-20 20:24:24 -0800204 new Leadership(lock.path(), localNode, lock.epoch())));
205 return;
Madan Jampani1d3494e2014-11-20 11:24:22 -0800206 } else {
Madan Jampanid1a467f2014-11-24 15:11:09 -0800207 log.warn("Failed to acquire lock for {}. Will retry in {} ms", path, WAIT_BEFORE_RETRY_MS, error);
208 threadPool.schedule(new TryLeadership(lock), WAIT_BEFORE_RETRY_MS, TimeUnit.MILLISECONDS);
Madan Jampani1d3494e2014-11-20 11:24:22 -0800209 }
210 });
211 }
212
Madan Jampani1ee91782014-11-20 20:24:24 -0800213 private class ReelectionTask implements Runnable {
Madan Jampani1d3494e2014-11-20 11:24:22 -0800214
215 private final Lock lock;
216
Madan Jampani1ee91782014-11-20 20:24:24 -0800217 public ReelectionTask(Lock lock) {
Madan Jampani1d3494e2014-11-20 11:24:22 -0800218 this.lock = lock;
219 }
220
221 @Override
222 public void run() {
Madan Jampanid1a467f2014-11-24 15:11:09 -0800223 if (!openContests.containsKey(lock.path())) {
224 log.debug("Node withdrew from leadership race for {}. Cancelling reelection task.", lock.path());
225 return;
226 }
227
228 boolean lockExtended = false;
229 try {
230 lockExtended = lock.extendExpiration(TERM_DURATION_MS);
231 } catch (Exception e) {
232 log.warn("Attempt to extend lock failed with an exception.", e);
233 }
234
235 if (lockExtended) {
Madan Jampani1d3494e2014-11-20 11:24:22 -0800236 notifyListeners(
237 new LeadershipEvent(
238 LeadershipEvent.Type.LEADER_REELECTED,
Madan Jampani1ee91782014-11-20 20:24:24 -0800239 new Leadership(lock.path(), localNode, lock.epoch())));
Madan Jampaniddaffd02014-11-21 13:12:09 -0800240 threadPool.schedule(this, TERM_DURATION_MS / 2, TimeUnit.MILLISECONDS);
Madan Jampani1d3494e2014-11-20 11:24:22 -0800241 } else {
Madan Jampanid1a467f2014-11-24 15:11:09 -0800242 // Check if this node already withdrew from the contest, in which case
243 // we don't need to notify here.
Madan Jampani1d3494e2014-11-20 11:24:22 -0800244 if (openContests.containsKey(lock.path())) {
245 notifyListeners(
246 new LeadershipEvent(
247 LeadershipEvent.Type.LEADER_BOOTED,
Madan Jampani1ee91782014-11-20 20:24:24 -0800248 new Leadership(lock.path(), localNode, lock.epoch())));
Madan Jampanid1a467f2014-11-24 15:11:09 -0800249 // Retry leadership after a brief wait.
250 threadPool.schedule(new TryLeadership(lock), WAIT_BEFORE_RETRY_MS, TimeUnit.MILLISECONDS);
Madan Jampani1d3494e2014-11-20 11:24:22 -0800251 }
252 }
253 }
254 }
Madan Jampani1ee91782014-11-20 20:24:24 -0800255
Madan Jampanid1a467f2014-11-24 15:11:09 -0800256 private class TryLeadership implements Runnable {
257 private final Lock lock;
258
259 public TryLeadership(Lock lock) {
260 this.lock = lock;
261 }
262
263 @Override
264 public void run() {
265 tryAcquireLeadership(lock.path());
266 }
267 }
268
Madan Jampani1ee91782014-11-20 20:24:24 -0800269 private class PeerAdvertiser implements LeadershipEventListener {
270 @Override
271 public void event(LeadershipEvent event) {
272 // publish events originating on this host.
273 if (event.subject().leader().equals(localNode)) {
274 try {
275 clusterCommunicator.broadcast(
276 new ClusterMessage(
277 localNode.id(),
278 LEADERSHIP_UPDATES,
279 SERIALIZER.encode(event)));
280 } catch (IOException e) {
281 log.error("Failed to broadcast leadership update message", e);
282 }
283 }
284 }
285 }
286
287 private class PeerAdvertisementHandler implements ClusterMessageHandler {
288 @Override
289 public void handle(ClusterMessage message) {
290 LeadershipEvent event = SERIALIZER.decode(message.payload());
Yuta HIGUCHI47b2f552014-11-28 20:13:15 -0800291 log.trace("Received {} from {}", event, message.sender());
Madan Jampani1ee91782014-11-20 20:24:24 -0800292 notifyListeners(event);
293 }
294 }
295
296 private class LeaderBoardUpdater implements LeadershipEventListener {
297 @Override
298 public void event(LeadershipEvent event) {
299 Leadership leadershipUpdate = event.subject();
300 synchronized (leaderBoard) {
301 Leadership currentLeadership = leaderBoard.get(leadershipUpdate.topic());
302 switch (event.type()) {
303 case LEADER_ELECTED:
304 case LEADER_REELECTED:
305 if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
306 leaderBoard.put(leadershipUpdate.topic(), leadershipUpdate);
307 }
308 break;
309 case LEADER_BOOTED:
310 if (currentLeadership != null && currentLeadership.epoch() <= leadershipUpdate.epoch()) {
311 leaderBoard.remove(leadershipUpdate.topic());
312 }
313 break;
314 default:
315 break;
316 }
317 }
318 }
319 }
Madan Jampani1d3494e2014-11-20 11:24:22 -0800320}