/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onlab.onos.sdnip;
17
18import java.util.Map;
19import java.util.concurrent.ExecutorService;
20import java.util.concurrent.Executors;
21import java.util.concurrent.Future;
22import java.util.concurrent.locks.Lock;
23
24import org.onlab.onos.cluster.ClusterService;
25import org.onlab.onos.cluster.ControllerNode;
26import org.onlab.onos.cluster.Leadership;
27import org.onlab.onos.cluster.LeadershipEvent;
28import org.onlab.onos.cluster.LeadershipEventListener;
29import org.onlab.onos.cluster.LeadershipService;
30import org.onlab.onos.cluster.NodeId;
31import org.onlab.onos.event.AbstractListenerRegistry;
32import org.onlab.onos.event.EventDeliveryService;
33import org.onlab.onos.store.hz.StoreService;
34import org.onlab.onos.store.serializers.KryoNamespaces;
35import org.onlab.onos.store.serializers.KryoSerializer;
36import org.onlab.util.KryoNamespace;
37import static org.onlab.util.Tools.namedThreads;
38
39import org.slf4j.Logger;
40import org.slf4j.LoggerFactory;
41import com.google.common.collect.Maps;
42import com.hazelcast.config.TopicConfig;
43import com.hazelcast.core.ITopic;
44import com.hazelcast.core.Message;
45import com.hazelcast.core.MessageListener;
46
47import static com.google.common.base.Preconditions.checkArgument;
48
/**
 * Distributed implementation of LeadershipService that is based on Hazelcast.
 * <p>
 * The election is eventually-consistent: if there is Hazelcast partitioning,
 * and the partitioning is healed, there could be a short window of time
 * until the leaders in each partition discover each other. If this happens,
 * the leaders release the leadership and run again for election.
 * </p>
 * <p>
 * The leader election is based on Hazelcast's Global Lock, which is strongly
 * consistent. In addition, each leader periodically advertises events
 * (using a Hazelcast Topic) that it is the elected leader. Those events are
 * used for two purposes: (1) Discover multi-leader collisions (in case of
 * healed Hazelcast partitions), and (2) Inform all listeners who is
 * the current leader (e.g., for informational purpose).
 * </p>
 */
66public class SdnIpLeadershipService implements LeadershipService {
67 private static final Logger log =
68 LoggerFactory.getLogger(SdnIpLeadershipService.class);
69
70 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
71 @Override
72 protected void setupKryoPool() {
73 serializerPool = KryoNamespace.newBuilder()
74 .register(KryoNamespaces.API)
75 .build()
76 .populate(1);
77 }
78 };
79
80 private static final long LEADERSHIP_PERIODIC_INTERVAL_MS = 5 * 1000; // 5s
81 private static final long LEADERSHIP_REMOTE_TIMEOUT_MS = 15 * 1000; // 15s
82
83 private ClusterService clusterService;
84 private StoreService storeService;
85 private EventDeliveryService eventDispatcher;
86
87 private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
88 listenerRegistry;
89 private final Map<String, Topic> topics = Maps.newConcurrentMap();
90 private ControllerNode localNode;
91
92 /**
93 * Constructor.
94 *
95 * @param clusterService the cluster service to use
96 * @param storeService the store service to use
97 * @param eventDispatcher the event dispacher to use
98 */
99 SdnIpLeadershipService(ClusterService clusterService,
100 StoreService storeService,
101 EventDeliveryService eventDispatcher) {
102 this.clusterService = clusterService;
103 this.storeService = storeService;
104 this.eventDispatcher = eventDispatcher;
105 }
106
107 /**
108 * Starts operation.
109 */
110 void start() {
111 localNode = clusterService.getLocalNode();
112 listenerRegistry = new AbstractListenerRegistry<>();
113 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
114
115 log.info("SDN-IP Leadership Service started");
116 }
117
118 /**
119 * Stops operation.
120 */
121 void stop() {
122 eventDispatcher.removeSink(LeadershipEvent.class);
123
124 for (Topic topic : topics.values()) {
125 topic.stop();
126 }
127 topics.clear();
128
129 log.info("SDN-IP Leadership Service stopped");
130 }
131
132 @Override
133 public ControllerNode getLeader(String path) {
134 Topic topic = topics.get(path);
135 if (topic == null) {
136 return null;
137 }
138 return topic.leader();
139 }
140
141 @Override
142 public void runForLeadership(String path) {
143 checkArgument(path != null);
144 Topic topic = new Topic(path);
145 Topic oldTopic = topics.putIfAbsent(path, topic);
146 if (oldTopic == null) {
147 topic.start();
148 }
149 }
150
151 @Override
152 public void withdraw(String path) {
153 checkArgument(path != null);
154 Topic topic = topics.get(path);
155 if (topic != null) {
156 topic.stop();
157 topics.remove(path, topic);
158 }
159 }
160
161 @Override
Yuta HIGUCHIc2bf3d82014-11-28 18:50:41 -0800162 public Map<String, Leadership> getLeaderBoard() {
163 throw new UnsupportedOperationException("I don't know what to do." +
164 " I wish you luck.");
165 }
166
167 @Override
Pavlin Radoslavovc91eebe2014-11-25 18:45:46 -0800168 public void addListener(LeadershipEventListener listener) {
169 listenerRegistry.addListener(listener);
170 }
171
172 @Override
173 public void removeListener(LeadershipEventListener listener) {
174 listenerRegistry.removeListener(listener);
175 }
176
177 /**
178 * Class for keeping per-topic information.
179 */
180 private final class Topic implements MessageListener<byte[]> {
181 private final String topicName;
182 private volatile boolean isShutdown = true;
183 private volatile long lastLeadershipUpdateMs = 0;
184 private ExecutorService leaderElectionExecutor;
185
186 private ControllerNode leader;
187 private Lock leaderLock;
188 private Future<?> getLockFuture;
189 private Future<?> periodicProcessingFuture;
190 private ITopic<byte[]> leaderTopic;
191 private String leaderTopicRegistrationId;
192
193 /**
194 * Constructor.
195 *
196 * @param topicName the topic name
197 */
198 private Topic(String topicName) {
199 this.topicName = topicName;
200 }
201
202 /**
203 * Gets the leader for the topic.
204 *
205 * @return the leader for the topic
206 */
207 private ControllerNode leader() {
208 return leader;
209 }
210
211 /**
212 * Starts leadership election for the topic.
213 */
214 private void start() {
215 isShutdown = false;
216 String lockHzId = "LeadershipService/" + topicName + "/lock";
217 String topicHzId = "LeadershipService/" + topicName + "/topic";
218 leaderLock = storeService.getHazelcastInstance().getLock(lockHzId);
219
220 String threadPoolName =
221 "sdnip-leader-election-" + topicName + "-%d";
222 leaderElectionExecutor = Executors.newScheduledThreadPool(2,
223 namedThreads(threadPoolName));
224
225 TopicConfig topicConfig = new TopicConfig();
226 topicConfig.setGlobalOrderingEnabled(true);
227 topicConfig.setName(topicHzId);
228 storeService.getHazelcastInstance().getConfig().addTopicConfig(topicConfig);
229
230 leaderTopic =
231 storeService.getHazelcastInstance().getTopic(topicHzId);
232 leaderTopicRegistrationId = leaderTopic.addMessageListener(this);
233
234 getLockFuture = leaderElectionExecutor.submit(new Runnable() {
235 @Override
236 public void run() {
237 doLeaderElectionThread();
238 }
239 });
240 periodicProcessingFuture =
241 leaderElectionExecutor.submit(new Runnable() {
242 @Override
243 public void run() {
244 doPeriodicProcessing();
245 }
246 });
247 }
248
249 /**
250 * Stops leadership election for the topic.
251 */
252 private void stop() {
253 isShutdown = true;
254 leaderTopic.removeMessageListener(leaderTopicRegistrationId);
255 // getLockFuture.cancel(true);
256 // periodicProcessingFuture.cancel(true);
257 leaderElectionExecutor.shutdownNow();
258 }
259
260 @Override
261 public void onMessage(Message<byte[]> message) {
262 LeadershipEvent leadershipEvent =
263 SERIALIZER.decode(message.getMessageObject());
264 NodeId eventLeaderId = leadershipEvent.subject().leader().id();
265
266 log.debug("SDN-IP Leadership Event: time = {} type = {} event = {}",
267 leadershipEvent.time(), leadershipEvent.type(),
268 leadershipEvent);
269 if (!leadershipEvent.subject().topic().equals(topicName)) {
270 return; // Not our topic: ignore
271 }
272 if (eventLeaderId.equals(localNode.id())) {
273 return; // My own message: ignore
274 }
275
276 synchronized (this) {
277 switch (leadershipEvent.type()) {
278 case LEADER_ELECTED:
279 // FALLTHROUGH
280 case LEADER_REELECTED:
281 //
282 // Another leader: if we are also a leader, then give up
283 // leadership and run for re-election.
284 //
285 if ((leader != null) &&
286 leader.id().equals(localNode.id())) {
287 getLockFuture.cancel(true);
288 } else {
289 // Just update the current leader
290 leader = leadershipEvent.subject().leader();
291 lastLeadershipUpdateMs = System.currentTimeMillis();
292 }
293 eventDispatcher.post(leadershipEvent);
294 break;
295 case LEADER_BOOTED:
296 // Remove the state for the current leader
297 if ((leader != null) &&
298 eventLeaderId.equals(leader.id())) {
299 leader = null;
300 }
301 eventDispatcher.post(leadershipEvent);
302 break;
303 default:
304 break;
305 }
306 }
307 }
308
309 private void doPeriodicProcessing() {
310
311 while (!isShutdown) {
312
313 //
314 // Periodic tasks:
315 // (a) Advertise ourselves as the leader
316 // OR
317 // (b) Expire a stale (remote) leader
318 //
319 synchronized (this) {
320 LeadershipEvent leadershipEvent;
321 if (leader != null) {
322 if (leader.id().equals(localNode.id())) {
323 //
324 // Advertise ourselves as the leader
325 //
326 leadershipEvent = new LeadershipEvent(
327 LeadershipEvent.Type.LEADER_REELECTED,
328 new Leadership(topicName, localNode, 0));
329 // Dispatch to all remote instances
330 leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
331 } else {
332 //
333 // Test if time to expire a stale leader
334 //
335 long delta = System.currentTimeMillis() -
336 lastLeadershipUpdateMs;
337 if (delta > LEADERSHIP_REMOTE_TIMEOUT_MS) {
338 leadershipEvent = new LeadershipEvent(
339 LeadershipEvent.Type.LEADER_BOOTED,
340 new Leadership(topicName, leader, 0));
341 // Dispatch only to the local listener(s)
342 eventDispatcher.post(leadershipEvent);
343 leader = null;
344 }
345 }
346 }
347 }
348
349 // Sleep before re-advertising
350 try {
351 Thread.sleep(LEADERSHIP_PERIODIC_INTERVAL_MS);
352 } catch (InterruptedException e) {
353 log.debug("SDN-IP Leader Election periodic thread interrupted");
354 }
355 }
356 }
357
358 /**
359 * Performs the leader election by using Hazelcast.
360 */
361 private void doLeaderElectionThread() {
362
363 while (!isShutdown) {
364 LeadershipEvent leadershipEvent;
365 //
366 // Try to acquire the lock and keep it until the instance is
367 // shutdown.
368 //
369 log.debug("SDN-IP Leader Election begin for topic {}",
370 topicName);
371 try {
372 // Block until it becomes the leader
373 leaderLock.lockInterruptibly();
374 } catch (InterruptedException e) {
375 //
376 // Thread interrupted. Either shutdown or run for
377 // re-election.
378 //
379 log.debug("SDN-IP Election interrupted for topic {}",
380 topicName);
381 continue;
382 }
383
384 synchronized (this) {
385 //
386 // This instance is now the leader
387 //
388 log.info("SDN-IP Leader Elected for topic {}", topicName);
389 leader = localNode;
390 leadershipEvent = new LeadershipEvent(
391 LeadershipEvent.Type.LEADER_ELECTED,
392 new Leadership(topicName, localNode, 0));
393 eventDispatcher.post(leadershipEvent);
394 leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
395 }
396
397 try {
398 // Sleep forever until interrupted
399 Thread.sleep(Long.MAX_VALUE);
400 } catch (InterruptedException e) {
401 //
402 // Thread interrupted. Either shutdown or run for
403 // re-election.
404 //
405 log.debug("SDN-IP Leader Interrupted for topic {}",
406 topicName);
407 }
408
409 synchronized (this) {
410 // If we reach here, we should release the leadership
411 log.debug("SDN-IP Leader Lock Released for topic {}",
412 topicName);
413 if ((leader != null) &&
414 leader.id().equals(localNode.id())) {
415 leader = null;
416 }
417 leadershipEvent = new LeadershipEvent(
418 LeadershipEvent.Type.LEADER_BOOTED,
419 new Leadership(topicName, localNode, 0));
420 eventDispatcher.post(leadershipEvent);
421 leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
422 leaderLock.unlock();
423 }
424 }
425 }
426 }
427}