blob: e9373c2cbf75df5a46890624c5bb91fe17d5952c [file] [log] [blame]
/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Jonathan Hart949c2842014-11-28 23:44:09 -080016package org.onlab.onos.store.cluster.impl;
17
18import static com.google.common.base.Preconditions.checkArgument;
19import static org.onlab.util.Tools.namedThreads;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080020
21import java.util.Map;
22import java.util.concurrent.ExecutorService;
23import java.util.concurrent.Executors;
24import java.util.concurrent.Future;
25import java.util.concurrent.locks.Lock;
26
Jonathan Hart949c2842014-11-28 23:44:09 -080027import org.apache.felix.scr.annotations.Activate;
28import org.apache.felix.scr.annotations.Component;
29import org.apache.felix.scr.annotations.Deactivate;
30import org.apache.felix.scr.annotations.Reference;
31import org.apache.felix.scr.annotations.ReferenceCardinality;
32import org.apache.felix.scr.annotations.Service;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080033import org.onlab.onos.cluster.ClusterService;
34import org.onlab.onos.cluster.ControllerNode;
35import org.onlab.onos.cluster.Leadership;
36import org.onlab.onos.cluster.LeadershipEvent;
37import org.onlab.onos.cluster.LeadershipEventListener;
38import org.onlab.onos.cluster.LeadershipService;
39import org.onlab.onos.cluster.NodeId;
40import org.onlab.onos.event.AbstractListenerRegistry;
41import org.onlab.onos.event.EventDeliveryService;
42import org.onlab.onos.store.hz.StoreService;
43import org.onlab.onos.store.serializers.KryoNamespaces;
44import org.onlab.onos.store.serializers.KryoSerializer;
45import org.onlab.util.KryoNamespace;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080046import org.slf4j.Logger;
47import org.slf4j.LoggerFactory;
Jonathan Hart949c2842014-11-28 23:44:09 -080048
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080049import com.google.common.collect.Maps;
50import com.hazelcast.config.TopicConfig;
51import com.hazelcast.core.ITopic;
52import com.hazelcast.core.Message;
53import com.hazelcast.core.MessageListener;
54
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080055/**
56 * Distributed implementation of LeadershipService that is based on Hazelcast.
57 * <p>
58 * The election is eventually-consistent: if there is Hazelcast partitioning,
59 * and the partitioning is healed, there could be a short window of time
60 * until the leaders in each partition discover each other. If this happens,
61 * the leaders release the leadership and run again for election.
62 * </p>
63 * <p>
64 * The leader election is based on Hazelcast's Global Lock, which is stongly
65 * consistent. In addition, each leader periodically advertises events
66 * (using a Hazelcast Topic) that it is the elected leader. Those events are
67 * used for two purposes: (1) Discover multi-leader collisions (in case of
68 * healed Hazelcast partitions), and (2) Inform all listeners who is
69 * the current leader (e.g., for informational purpose).
70 * </p>
71 */
Jonathan Hart949c2842014-11-28 23:44:09 -080072@Component(immediate = true)
73@Service
74public class HazelcastLeadershipService implements LeadershipService {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080075 private static final Logger log =
Jonathan Hart949c2842014-11-28 23:44:09 -080076 LoggerFactory.getLogger(HazelcastLeadershipService.class);
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080077
78 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
79 @Override
80 protected void setupKryoPool() {
81 serializerPool = KryoNamespace.newBuilder()
82 .register(KryoNamespaces.API)
83 .build()
84 .populate(1);
85 }
86 };
87
88 private static final long LEADERSHIP_PERIODIC_INTERVAL_MS = 5 * 1000; // 5s
89 private static final long LEADERSHIP_REMOTE_TIMEOUT_MS = 15 * 1000; // 15s
90
Jonathan Hart949c2842014-11-28 23:44:09 -080091 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
92 protected ClusterService clusterService;
93
94 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
95 protected StoreService storeService;
96
97 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
98 protected EventDeliveryService eventDispatcher;
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -080099
100 private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
101 listenerRegistry;
102 private final Map<String, Topic> topics = Maps.newConcurrentMap();
103 private ControllerNode localNode;
104
Jonathan Hart949c2842014-11-28 23:44:09 -0800105 @Activate
106 protected void activate() {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800107 localNode = clusterService.getLocalNode();
108 listenerRegistry = new AbstractListenerRegistry<>();
109 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
110
Jonathan Hart949c2842014-11-28 23:44:09 -0800111 log.info("Hazelcast Leadership Service started");
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800112 }
113
Jonathan Hart949c2842014-11-28 23:44:09 -0800114 @Deactivate
115 protected void deactivate() {
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800116 eventDispatcher.removeSink(LeadershipEvent.class);
117
118 for (Topic topic : topics.values()) {
119 topic.stop();
120 }
121 topics.clear();
122
Jonathan Hart949c2842014-11-28 23:44:09 -0800123 log.info("Hazelcast Leadership Service stopped");
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800124 }
125
126 @Override
127 public ControllerNode getLeader(String path) {
128 Topic topic = topics.get(path);
129 if (topic == null) {
130 return null;
131 }
132 return topic.leader();
133 }
134
135 @Override
136 public void runForLeadership(String path) {
137 checkArgument(path != null);
138 Topic topic = new Topic(path);
139 Topic oldTopic = topics.putIfAbsent(path, topic);
140 if (oldTopic == null) {
141 topic.start();
142 }
143 }
144
145 @Override
146 public void withdraw(String path) {
147 checkArgument(path != null);
148 Topic topic = topics.get(path);
149 if (topic != null) {
150 topic.stop();
151 topics.remove(path, topic);
152 }
153 }
154
155 @Override
Yuta HIGUCHIc2bf3d82014-11-28 18:50:41 -0800156 public Map<String, Leadership> getLeaderBoard() {
157 throw new UnsupportedOperationException("I don't know what to do." +
158 " I wish you luck.");
159 }
160
161 @Override
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800162 public void addListener(LeadershipEventListener listener) {
163 listenerRegistry.addListener(listener);
164 }
165
166 @Override
167 public void removeListener(LeadershipEventListener listener) {
168 listenerRegistry.removeListener(listener);
169 }
170
171 /**
172 * Class for keeping per-topic information.
173 */
174 private final class Topic implements MessageListener<byte[]> {
175 private final String topicName;
176 private volatile boolean isShutdown = true;
177 private volatile long lastLeadershipUpdateMs = 0;
178 private ExecutorService leaderElectionExecutor;
179
180 private ControllerNode leader;
181 private Lock leaderLock;
182 private Future<?> getLockFuture;
183 private Future<?> periodicProcessingFuture;
184 private ITopic<byte[]> leaderTopic;
185 private String leaderTopicRegistrationId;
186
187 /**
188 * Constructor.
189 *
190 * @param topicName the topic name
191 */
192 private Topic(String topicName) {
193 this.topicName = topicName;
194 }
195
196 /**
197 * Gets the leader for the topic.
198 *
199 * @return the leader for the topic
200 */
201 private ControllerNode leader() {
202 return leader;
203 }
204
205 /**
206 * Starts leadership election for the topic.
207 */
208 private void start() {
209 isShutdown = false;
210 String lockHzId = "LeadershipService/" + topicName + "/lock";
211 String topicHzId = "LeadershipService/" + topicName + "/topic";
212 leaderLock = storeService.getHazelcastInstance().getLock(lockHzId);
213
214 String threadPoolName =
215 "sdnip-leader-election-" + topicName + "-%d";
216 leaderElectionExecutor = Executors.newScheduledThreadPool(2,
217 namedThreads(threadPoolName));
218
219 TopicConfig topicConfig = new TopicConfig();
220 topicConfig.setGlobalOrderingEnabled(true);
221 topicConfig.setName(topicHzId);
222 storeService.getHazelcastInstance().getConfig().addTopicConfig(topicConfig);
223
224 leaderTopic =
225 storeService.getHazelcastInstance().getTopic(topicHzId);
226 leaderTopicRegistrationId = leaderTopic.addMessageListener(this);
227
228 getLockFuture = leaderElectionExecutor.submit(new Runnable() {
229 @Override
230 public void run() {
231 doLeaderElectionThread();
232 }
233 });
234 periodicProcessingFuture =
235 leaderElectionExecutor.submit(new Runnable() {
236 @Override
237 public void run() {
238 doPeriodicProcessing();
239 }
240 });
241 }
242
243 /**
244 * Stops leadership election for the topic.
245 */
246 private void stop() {
247 isShutdown = true;
248 leaderTopic.removeMessageListener(leaderTopicRegistrationId);
249 // getLockFuture.cancel(true);
250 // periodicProcessingFuture.cancel(true);
251 leaderElectionExecutor.shutdownNow();
252 }
253
254 @Override
255 public void onMessage(Message<byte[]> message) {
256 LeadershipEvent leadershipEvent =
257 SERIALIZER.decode(message.getMessageObject());
258 NodeId eventLeaderId = leadershipEvent.subject().leader().id();
259
260 log.debug("SDN-IP Leadership Event: time = {} type = {} event = {}",
261 leadershipEvent.time(), leadershipEvent.type(),
262 leadershipEvent);
263 if (!leadershipEvent.subject().topic().equals(topicName)) {
264 return; // Not our topic: ignore
265 }
266 if (eventLeaderId.equals(localNode.id())) {
267 return; // My own message: ignore
268 }
269
270 synchronized (this) {
271 switch (leadershipEvent.type()) {
272 case LEADER_ELECTED:
273 // FALLTHROUGH
274 case LEADER_REELECTED:
275 //
276 // Another leader: if we are also a leader, then give up
277 // leadership and run for re-election.
278 //
279 if ((leader != null) &&
280 leader.id().equals(localNode.id())) {
281 getLockFuture.cancel(true);
282 } else {
283 // Just update the current leader
284 leader = leadershipEvent.subject().leader();
285 lastLeadershipUpdateMs = System.currentTimeMillis();
286 }
287 eventDispatcher.post(leadershipEvent);
288 break;
289 case LEADER_BOOTED:
290 // Remove the state for the current leader
291 if ((leader != null) &&
292 eventLeaderId.equals(leader.id())) {
293 leader = null;
294 }
295 eventDispatcher.post(leadershipEvent);
296 break;
297 default:
298 break;
299 }
300 }
301 }
302
303 private void doPeriodicProcessing() {
304
305 while (!isShutdown) {
306
307 //
308 // Periodic tasks:
309 // (a) Advertise ourselves as the leader
310 // OR
311 // (b) Expire a stale (remote) leader
312 //
313 synchronized (this) {
314 LeadershipEvent leadershipEvent;
315 if (leader != null) {
316 if (leader.id().equals(localNode.id())) {
317 //
318 // Advertise ourselves as the leader
319 //
320 leadershipEvent = new LeadershipEvent(
321 LeadershipEvent.Type.LEADER_REELECTED,
322 new Leadership(topicName, localNode, 0));
323 // Dispatch to all remote instances
324 leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
325 } else {
326 //
327 // Test if time to expire a stale leader
328 //
329 long delta = System.currentTimeMillis() -
330 lastLeadershipUpdateMs;
331 if (delta > LEADERSHIP_REMOTE_TIMEOUT_MS) {
332 leadershipEvent = new LeadershipEvent(
333 LeadershipEvent.Type.LEADER_BOOTED,
334 new Leadership(topicName, leader, 0));
335 // Dispatch only to the local listener(s)
336 eventDispatcher.post(leadershipEvent);
337 leader = null;
338 }
339 }
340 }
341 }
342
343 // Sleep before re-advertising
344 try {
345 Thread.sleep(LEADERSHIP_PERIODIC_INTERVAL_MS);
346 } catch (InterruptedException e) {
347 log.debug("SDN-IP Leader Election periodic thread interrupted");
348 }
349 }
350 }
351
352 /**
353 * Performs the leader election by using Hazelcast.
354 */
355 private void doLeaderElectionThread() {
356
357 while (!isShutdown) {
358 LeadershipEvent leadershipEvent;
359 //
360 // Try to acquire the lock and keep it until the instance is
361 // shutdown.
362 //
363 log.debug("SDN-IP Leader Election begin for topic {}",
364 topicName);
365 try {
366 // Block until it becomes the leader
367 leaderLock.lockInterruptibly();
368 } catch (InterruptedException e) {
369 //
370 // Thread interrupted. Either shutdown or run for
371 // re-election.
372 //
373 log.debug("SDN-IP Election interrupted for topic {}",
374 topicName);
375 continue;
376 }
377
378 synchronized (this) {
379 //
380 // This instance is now the leader
381 //
382 log.info("SDN-IP Leader Elected for topic {}", topicName);
383 leader = localNode;
384 leadershipEvent = new LeadershipEvent(
385 LeadershipEvent.Type.LEADER_ELECTED,
386 new Leadership(topicName, localNode, 0));
387 eventDispatcher.post(leadershipEvent);
388 leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
389 }
390
391 try {
392 // Sleep forever until interrupted
393 Thread.sleep(Long.MAX_VALUE);
394 } catch (InterruptedException e) {
395 //
396 // Thread interrupted. Either shutdown or run for
397 // re-election.
398 //
399 log.debug("SDN-IP Leader Interrupted for topic {}",
400 topicName);
401 }
402
403 synchronized (this) {
404 // If we reach here, we should release the leadership
405 log.debug("SDN-IP Leader Lock Released for topic {}",
406 topicName);
407 if ((leader != null) &&
408 leader.id().equals(localNode.id())) {
409 leader = null;
410 }
411 leadershipEvent = new LeadershipEvent(
412 LeadershipEvent.Type.LEADER_BOOTED,
413 new Leadership(topicName, localNode, 0));
414 eventDispatcher.post(leadershipEvent);
415 leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
416 leaderLock.unlock();
417 }
418 }
419 }
420 }
421}