blob: 31bc47f816b441bff9dfd51d48d7e68875cb274d [file] [log] [blame]
Madan Jampani1d3494e2014-11-20 11:24:22 -08001package org.onlab.onos.store.cluster.impl;
2
3import static com.google.common.base.Preconditions.checkArgument;
4import static com.google.common.base.Verify.verifyNotNull;
5import static org.onlab.util.Tools.namedThreads;
6import static org.slf4j.LoggerFactory.getLogger;
7
8import java.util.Map;
9import java.util.Set;
10import java.util.concurrent.Executors;
11import java.util.concurrent.ScheduledExecutorService;
12import java.util.concurrent.TimeUnit;
13
14import org.apache.felix.scr.annotations.Activate;
15import org.apache.felix.scr.annotations.Component;
16import org.apache.felix.scr.annotations.Deactivate;
17import org.apache.felix.scr.annotations.Reference;
18import org.apache.felix.scr.annotations.ReferenceCardinality;
19import org.apache.felix.scr.annotations.Service;
20import org.onlab.onos.cluster.ClusterService;
21import org.onlab.onos.cluster.ControllerNode;
22import org.onlab.onos.cluster.Leadership;
23import org.onlab.onos.cluster.LeadershipEvent;
24import org.onlab.onos.cluster.LeadershipEventListener;
25import org.onlab.onos.cluster.LeadershipService;
26import org.onlab.onos.store.service.Lock;
27import org.onlab.onos.store.service.LockService;
28import org.onlab.onos.store.service.impl.DistributedLockManager;
29import org.slf4j.Logger;
30
31import com.google.common.collect.Maps;
32import com.google.common.collect.Sets;
33
34/**
35 * Distributed implementation of LeadershipService that is based on the primitives exposed by
36 * LockService.
37 */
38@Component(immediate = true)
39@Service
40public class LeadershipManager implements LeadershipService {
41
42 private final Logger log = getLogger(getClass());
43
44 // TODO: Remove this dependency
45 private static final int TERM_DURATION_MS =
46 DistributedLockManager.DEAD_LOCK_TIMEOUT_MS;
47
48 // TODO: Appropriate Thread pool sizing.
49 private static final ScheduledExecutorService THREAD_POOL =
50 Executors.newScheduledThreadPool(25, namedThreads("leadership-manager-%d"));
51
52 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
53 private ClusterService clusterService;
54
55 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
56 private LockService lockService;
57
58 private Map<String, Lock> openContests = Maps.newHashMap();
59 private Set<LeadershipEventListener> listeners = Sets.newIdentityHashSet();
60 private ControllerNode localNode;
61
62 @Activate
63 public void activate() {
64 localNode = clusterService.getLocalNode();
65 log.info("Started.");
66 }
67
68 @Deactivate
69 public void deactivate() {
70 THREAD_POOL.shutdown();
71 log.info("Stopped.");
72 }
73
74 @Override
75 public void runForLeadership(String path) {
76 checkArgument(path != null);
77 if (openContests.containsKey(path)) {
78 log.info("Already in the leadership contest for {}", path);
79 return;
80 } else {
81 Lock lock = lockService.create(path);
82 openContests.put(path, lock);
83 tryAcquireLeadership(path);
84 }
85 }
86
87 @Override
88 public void withdraw(String path) {
89 checkArgument(path != null);
90 Lock lock = openContests.remove(path);
91
92 if (lock != null && lock.isLocked()) {
93 lock.unlock();
94 notifyListeners(
95 new LeadershipEvent(
96 LeadershipEvent.Type.LEADER_BOOTED,
97 new Leadership(lock.path(), localNode, 0)));
98 // FIXME: Should set the correct term information.
99 }
100 }
101
102 @Override
103 public void addListener(LeadershipEventListener listener) {
104 checkArgument(listener != null);
105 listeners.add(listener);
106 }
107
108 @Override
109 public void removeListener(LeadershipEventListener listener) {
110 checkArgument(listener != null);
111 listeners.remove(listener);
112 }
113
114 private void notifyListeners(LeadershipEvent event) {
115 for (LeadershipEventListener listener : listeners) {
116 listener.event(event);
117 }
118 }
119
120 private void tryAcquireLeadership(String path) {
121 Lock lock = openContests.get(path);
122 verifyNotNull(lock, "Lock should not be null");
123 lock.lockAsync(TERM_DURATION_MS).whenComplete((response, error) -> {
124 if (error == null) {
125 THREAD_POOL.schedule(
126 new RelectionTask(lock),
127 TERM_DURATION_MS / 2,
128 TimeUnit.MILLISECONDS);
129 notifyListeners(
130 new LeadershipEvent(
131 LeadershipEvent.Type.LEADER_ELECTED,
132 new Leadership(lock.path(), localNode, 0)));
133 } else {
134 log.error("Failed to acquire lock for {}", path, error);
135 // retry
136 tryAcquireLeadership(path);
137 }
138 });
139 }
140
141 private class RelectionTask implements Runnable {
142
143 private final Lock lock;
144
145 public RelectionTask(Lock lock) {
146 this.lock = lock;
147 }
148
149 @Override
150 public void run() {
151 if (lock.extendExpiration(TERM_DURATION_MS)) {
152 notifyListeners(
153 new LeadershipEvent(
154 LeadershipEvent.Type.LEADER_REELECTED,
155 new Leadership(lock.path(), localNode, 0)));
156 THREAD_POOL.schedule(this, TERM_DURATION_MS / 2, TimeUnit.MILLISECONDS);
157 } else {
158 if (openContests.containsKey(lock.path())) {
159 notifyListeners(
160 new LeadershipEvent(
161 LeadershipEvent.Type.LEADER_BOOTED,
162 new Leadership(lock.path(), localNode, 0)));
163 tryAcquireLeadership(lock.path());
164 }
165 }
166 }
167 }
168}