blob: 38bb1a3ee873acf8162e5f61b469d5672d77c66d [file] [log] [blame]
Madan Jampani1d3494e2014-11-20 11:24:22 -08001package org.onlab.onos.store.cluster.impl;
2
3import static com.google.common.base.Preconditions.checkArgument;
4import static com.google.common.base.Verify.verifyNotNull;
5import static org.onlab.util.Tools.namedThreads;
6import static org.slf4j.LoggerFactory.getLogger;
7
Madan Jampani1ee91782014-11-20 20:24:24 -08008import java.io.IOException;
Madan Jampani1d3494e2014-11-20 11:24:22 -08009import java.util.Map;
10import java.util.Set;
11import java.util.concurrent.Executors;
12import java.util.concurrent.ScheduledExecutorService;
13import java.util.concurrent.TimeUnit;
14
15import org.apache.felix.scr.annotations.Activate;
16import org.apache.felix.scr.annotations.Component;
17import org.apache.felix.scr.annotations.Deactivate;
18import org.apache.felix.scr.annotations.Reference;
19import org.apache.felix.scr.annotations.ReferenceCardinality;
20import org.apache.felix.scr.annotations.Service;
21import org.onlab.onos.cluster.ClusterService;
22import org.onlab.onos.cluster.ControllerNode;
23import org.onlab.onos.cluster.Leadership;
24import org.onlab.onos.cluster.LeadershipEvent;
25import org.onlab.onos.cluster.LeadershipEventListener;
26import org.onlab.onos.cluster.LeadershipService;
Madan Jampani1ee91782014-11-20 20:24:24 -080027import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
28import org.onlab.onos.store.cluster.messaging.ClusterMessage;
29import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
30import org.onlab.onos.store.cluster.messaging.MessageSubject;
31import org.onlab.onos.store.serializers.KryoNamespaces;
32import org.onlab.onos.store.serializers.KryoSerializer;
Madan Jampani1d3494e2014-11-20 11:24:22 -080033import org.onlab.onos.store.service.Lock;
34import org.onlab.onos.store.service.LockService;
35import org.onlab.onos.store.service.impl.DistributedLockManager;
Madan Jampani1ee91782014-11-20 20:24:24 -080036import org.onlab.util.KryoNamespace;
Madan Jampani1d3494e2014-11-20 11:24:22 -080037import org.slf4j.Logger;
38
39import com.google.common.collect.Maps;
40import com.google.common.collect.Sets;
41
42/**
43 * Distributed implementation of LeadershipService that is based on the primitives exposed by
44 * LockService.
45 */
46@Component(immediate = true)
47@Service
48public class LeadershipManager implements LeadershipService {
49
50 private final Logger log = getLogger(getClass());
51
52 // TODO: Remove this dependency
53 private static final int TERM_DURATION_MS =
54 DistributedLockManager.DEAD_LOCK_TIMEOUT_MS;
55
Madan Jampani1ee91782014-11-20 20:24:24 -080056 // Time to wait before retrying leadership after
57 // a unexpected error.
58 private static final int WAIT_BEFORE_RETRY_MS = 2000;
59
Madan Jampani1d3494e2014-11-20 11:24:22 -080060 // TODO: Appropriate Thread pool sizing.
61 private static final ScheduledExecutorService THREAD_POOL =
62 Executors.newScheduledThreadPool(25, namedThreads("leadership-manager-%d"));
63
Madan Jampani1ee91782014-11-20 20:24:24 -080064 private static final MessageSubject LEADERSHIP_UPDATES =
65 new MessageSubject("leadership-contest-updates");
66
Madan Jampani1d3494e2014-11-20 11:24:22 -080067 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
68 private ClusterService clusterService;
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani1ee91782014-11-20 20:24:24 -080071 private ClusterCommunicationService clusterCommunicator;
72
73 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani1d3494e2014-11-20 11:24:22 -080074 private LockService lockService;
75
Madan Jampani1ee91782014-11-20 20:24:24 -080076 private final Map<String, Leadership> leaderBoard = Maps.newHashMap();
77
78 private final Map<String, Lock> openContests = Maps.newHashMap();
79 private final Set<LeadershipEventListener> listeners = Sets.newIdentityHashSet();
Madan Jampani1d3494e2014-11-20 11:24:22 -080080 private ControllerNode localNode;
81
Madan Jampani1ee91782014-11-20 20:24:24 -080082 private final LeadershipEventListener peerAdvertiser = new PeerAdvertiser();
83 private final LeadershipEventListener leaderBoardUpdater = new LeaderBoardUpdater();
84
85 public static final KryoSerializer SERIALIZER = new KryoSerializer() {
86 @Override
87 protected void setupKryoPool() {
88 serializerPool = KryoNamespace.newBuilder()
89 .register(KryoNamespaces.API)
90 .build()
91 .populate(1);
92 }
93 };
94
Madan Jampani1d3494e2014-11-20 11:24:22 -080095 @Activate
96 public void activate() {
97 localNode = clusterService.getLocalNode();
Madan Jampani1ee91782014-11-20 20:24:24 -080098
99 addListener(peerAdvertiser);
100 addListener(leaderBoardUpdater);
101
102 clusterCommunicator.addSubscriber(
103 LEADERSHIP_UPDATES,
104 new PeerAdvertisementHandler());
105
Madan Jampani1d3494e2014-11-20 11:24:22 -0800106 log.info("Started.");
107 }
108
109 @Deactivate
110 public void deactivate() {
Madan Jampani1ee91782014-11-20 20:24:24 -0800111 removeListener(peerAdvertiser);
112 removeListener(leaderBoardUpdater);
113
114 clusterCommunicator.removeSubscriber(LEADERSHIP_UPDATES);
115
Madan Jampani1d3494e2014-11-20 11:24:22 -0800116 THREAD_POOL.shutdown();
Madan Jampani1ee91782014-11-20 20:24:24 -0800117
Madan Jampani1d3494e2014-11-20 11:24:22 -0800118 log.info("Stopped.");
119 }
120
Madan Jampani1ee91782014-11-20 20:24:24 -0800121
122 @Override
123 public ControllerNode getLeader(String path) {
124 synchronized (leaderBoard) {
125 Leadership leadership = leaderBoard.get(path);
126 if (leadership != null) {
127 return leadership.leader();
128 }
129 }
130 return null;
131 }
132
Madan Jampani1d3494e2014-11-20 11:24:22 -0800133 @Override
134 public void runForLeadership(String path) {
135 checkArgument(path != null);
136 if (openContests.containsKey(path)) {
137 log.info("Already in the leadership contest for {}", path);
138 return;
139 } else {
140 Lock lock = lockService.create(path);
141 openContests.put(path, lock);
142 tryAcquireLeadership(path);
143 }
144 }
145
146 @Override
147 public void withdraw(String path) {
148 checkArgument(path != null);
149 Lock lock = openContests.remove(path);
150
151 if (lock != null && lock.isLocked()) {
152 lock.unlock();
153 notifyListeners(
154 new LeadershipEvent(
155 LeadershipEvent.Type.LEADER_BOOTED,
Madan Jampani1ee91782014-11-20 20:24:24 -0800156 new Leadership(lock.path(), localNode, lock.epoch())));
Madan Jampani1d3494e2014-11-20 11:24:22 -0800157 }
158 }
159
160 @Override
161 public void addListener(LeadershipEventListener listener) {
162 checkArgument(listener != null);
163 listeners.add(listener);
164 }
165
166 @Override
167 public void removeListener(LeadershipEventListener listener) {
168 checkArgument(listener != null);
169 listeners.remove(listener);
170 }
171
172 private void notifyListeners(LeadershipEvent event) {
173 for (LeadershipEventListener listener : listeners) {
174 listener.event(event);
175 }
176 }
177
178 private void tryAcquireLeadership(String path) {
179 Lock lock = openContests.get(path);
180 verifyNotNull(lock, "Lock should not be null");
181 lock.lockAsync(TERM_DURATION_MS).whenComplete((response, error) -> {
182 if (error == null) {
183 THREAD_POOL.schedule(
Madan Jampani1ee91782014-11-20 20:24:24 -0800184 new ReelectionTask(lock),
Madan Jampani1d3494e2014-11-20 11:24:22 -0800185 TERM_DURATION_MS / 2,
186 TimeUnit.MILLISECONDS);
187 notifyListeners(
188 new LeadershipEvent(
189 LeadershipEvent.Type.LEADER_ELECTED,
Madan Jampani1ee91782014-11-20 20:24:24 -0800190 new Leadership(lock.path(), localNode, lock.epoch())));
191 return;
Madan Jampani1d3494e2014-11-20 11:24:22 -0800192 } else {
Madan Jampani1ee91782014-11-20 20:24:24 -0800193 log.warn("Failed to acquire lock for {}. Will retry in {} sec", path, WAIT_BEFORE_RETRY_MS, error);
194 try {
195 Thread.sleep(WAIT_BEFORE_RETRY_MS);
196 tryAcquireLeadership(path);
197 } catch (InterruptedException e) {
198 Thread.currentThread().interrupt();
199 }
Madan Jampani1d3494e2014-11-20 11:24:22 -0800200 }
201 });
202 }
203
Madan Jampani1ee91782014-11-20 20:24:24 -0800204 private class ReelectionTask implements Runnable {
Madan Jampani1d3494e2014-11-20 11:24:22 -0800205
206 private final Lock lock;
207
Madan Jampani1ee91782014-11-20 20:24:24 -0800208 public ReelectionTask(Lock lock) {
Madan Jampani1d3494e2014-11-20 11:24:22 -0800209 this.lock = lock;
210 }
211
212 @Override
213 public void run() {
214 if (lock.extendExpiration(TERM_DURATION_MS)) {
215 notifyListeners(
216 new LeadershipEvent(
217 LeadershipEvent.Type.LEADER_REELECTED,
Madan Jampani1ee91782014-11-20 20:24:24 -0800218 new Leadership(lock.path(), localNode, lock.epoch())));
Madan Jampani1d3494e2014-11-20 11:24:22 -0800219 THREAD_POOL.schedule(this, TERM_DURATION_MS / 2, TimeUnit.MILLISECONDS);
220 } else {
221 if (openContests.containsKey(lock.path())) {
222 notifyListeners(
223 new LeadershipEvent(
224 LeadershipEvent.Type.LEADER_BOOTED,
Madan Jampani1ee91782014-11-20 20:24:24 -0800225 new Leadership(lock.path(), localNode, lock.epoch())));
Madan Jampani1d3494e2014-11-20 11:24:22 -0800226 tryAcquireLeadership(lock.path());
227 }
228 }
229 }
230 }
Madan Jampani1ee91782014-11-20 20:24:24 -0800231
232 private class PeerAdvertiser implements LeadershipEventListener {
233 @Override
234 public void event(LeadershipEvent event) {
235 // publish events originating on this host.
236 if (event.subject().leader().equals(localNode)) {
237 try {
238 clusterCommunicator.broadcast(
239 new ClusterMessage(
240 localNode.id(),
241 LEADERSHIP_UPDATES,
242 SERIALIZER.encode(event)));
243 } catch (IOException e) {
244 log.error("Failed to broadcast leadership update message", e);
245 }
246 }
247 }
248 }
249
250 private class PeerAdvertisementHandler implements ClusterMessageHandler {
251 @Override
252 public void handle(ClusterMessage message) {
253 LeadershipEvent event = SERIALIZER.decode(message.payload());
254 log.debug("Received {} from {}", event, message.sender());
255 notifyListeners(event);
256 }
257 }
258
259 private class LeaderBoardUpdater implements LeadershipEventListener {
260 @Override
261 public void event(LeadershipEvent event) {
262 Leadership leadershipUpdate = event.subject();
263 synchronized (leaderBoard) {
264 Leadership currentLeadership = leaderBoard.get(leadershipUpdate.topic());
265 switch (event.type()) {
266 case LEADER_ELECTED:
267 case LEADER_REELECTED:
268 if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
269 leaderBoard.put(leadershipUpdate.topic(), leadershipUpdate);
270 }
271 break;
272 case LEADER_BOOTED:
273 if (currentLeadership != null && currentLeadership.epoch() <= leadershipUpdate.epoch()) {
274 leaderBoard.remove(leadershipUpdate.topic());
275 }
276 break;
277 default:
278 break;
279 }
280 }
281 }
282 }
Madan Jampani1d3494e2014-11-20 11:24:22 -0800283}