blob: b1bae21db4380260d9497b43d12bfb6d94137c74 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.cluster.impl;
17
Jordan Halterman1f2e8042017-10-18 16:38:03 -070018import java.util.Dictionary;
Madan Jampani620f70d2016-01-30 22:22:47 -080019import java.util.Map;
Madan Jampani78be2492016-06-03 23:27:07 -070020import java.util.Objects;
Jon Hall7a8bfc62016-05-26 17:59:04 -070021import java.util.concurrent.ExecutorService;
22import java.util.concurrent.Executors;
Madan Jampani78be2492016-06-03 23:27:07 -070023import java.util.function.Consumer;
Jordan Halterman980a8c12017-09-22 18:01:19 -070024import java.util.stream.Collectors;
Madan Jampani620f70d2016-01-30 22:22:47 -080025
Jon Hall7a8bfc62016-05-26 17:59:04 -070026import com.google.common.collect.Maps;
Madan Jampani620f70d2016-01-30 22:22:47 -080027import org.apache.felix.scr.annotations.Activate;
28import org.apache.felix.scr.annotations.Component;
29import org.apache.felix.scr.annotations.Deactivate;
Jordan Halterman1f2e8042017-10-18 16:38:03 -070030import org.apache.felix.scr.annotations.Modified;
31import org.apache.felix.scr.annotations.Property;
Madan Jampani620f70d2016-01-30 22:22:47 -080032import org.apache.felix.scr.annotations.Reference;
33import org.apache.felix.scr.annotations.ReferenceCardinality;
34import org.apache.felix.scr.annotations.Service;
Jordan Halterman1f2e8042017-10-18 16:38:03 -070035import org.onosproject.cfg.ComponentConfigService;
Madan Jampani620f70d2016-01-30 22:22:47 -080036import org.onosproject.cluster.ClusterService;
Madan Jampani620f70d2016-01-30 22:22:47 -080037import org.onosproject.cluster.Leadership;
38import org.onosproject.cluster.LeadershipEvent;
39import org.onosproject.cluster.LeadershipStore;
40import org.onosproject.cluster.LeadershipStoreDelegate;
41import org.onosproject.cluster.NodeId;
Jordan Halterman980a8c12017-09-22 18:01:19 -070042import org.onosproject.core.Version;
43import org.onosproject.core.VersionService;
Madan Jampani78be2492016-06-03 23:27:07 -070044import org.onosproject.event.Change;
Madan Jampani620f70d2016-01-30 22:22:47 -080045import org.onosproject.store.AbstractStore;
Jon Hall7a8bfc62016-05-26 17:59:04 -070046import org.onosproject.store.service.DistributedPrimitive.Status;
Jordan Halterman980a8c12017-09-22 18:01:19 -070047import org.onosproject.store.service.CoordinationService;
Madan Jampani78be2492016-06-03 23:27:07 -070048import org.onosproject.store.service.LeaderElector;
Jordan Halterman980a8c12017-09-22 18:01:19 -070049import org.onosproject.upgrade.UpgradeEvent;
50import org.onosproject.upgrade.UpgradeEventListener;
51import org.onosproject.upgrade.UpgradeService;
Jordan Halterman1f2e8042017-10-18 16:38:03 -070052import org.osgi.service.component.ComponentContext;
Madan Jampani620f70d2016-01-30 22:22:47 -080053import org.slf4j.Logger;
54
Jordan Halterman1f2e8042017-10-18 16:38:03 -070055import static com.google.common.base.Strings.isNullOrEmpty;
56import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
57import static org.onlab.util.Tools.get;
Jordan Halterman980a8c12017-09-22 18:01:19 -070058import static org.onlab.util.Tools.groupedThreads;
59import static org.slf4j.LoggerFactory.getLogger;
60
Madan Jampani620f70d2016-01-30 22:22:47 -080061/**
Madan Jampani78be2492016-06-03 23:27:07 -070062 * Implementation of {@code LeadershipStore} that makes use of a {@link LeaderElector}
63 * primitive.
Madan Jampani620f70d2016-01-30 22:22:47 -080064 */
65@Service
Sho SHIMIZU5c396e32016-08-12 15:19:12 -070066@Component(immediate = true)
Madan Jampani620f70d2016-01-30 22:22:47 -080067public class DistributedLeadershipStore
68 extends AbstractStore<LeadershipEvent, LeadershipStoreDelegate>
69 implements LeadershipStore {
70
Jordan Halterman980a8c12017-09-22 18:01:19 -070071 private static final char VERSION_SEP = '|';
72
Madan Jampani78be2492016-06-03 23:27:07 -070073 private final Logger log = getLogger(getClass());
Madan Jampani620f70d2016-01-30 22:22:47 -080074
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76 protected ClusterService clusterService;
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jordan Halterman980a8c12017-09-22 18:01:19 -070079 protected CoordinationService storageService;
80
Jordan Halterman1f2e8042017-10-18 16:38:03 -070081 @Reference(cardinality = MANDATORY_UNARY)
82 protected ComponentConfigService configService;
83
Jordan Halterman980a8c12017-09-22 18:01:19 -070084 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85 protected VersionService versionService;
86
87 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
88 protected UpgradeService upgradeService;
Madan Jampani620f70d2016-01-30 22:22:47 -080089
Jordan Halterman19486e32017-11-02 15:00:06 -070090 private static final long DEFAULT_ELECTION_TIMEOUT_MILLIS = 250;
Jordan Halterman1f2e8042017-10-18 16:38:03 -070091 @Property(name = "electionTimeoutMillis", longValue = DEFAULT_ELECTION_TIMEOUT_MILLIS,
92 label = "the leader election timeout in milliseconds")
93 private long electionTimeoutMillis = DEFAULT_ELECTION_TIMEOUT_MILLIS;
94
Jon Hall7a8bfc62016-05-26 17:59:04 -070095 private ExecutorService statusChangeHandler;
Madan Jampani78be2492016-06-03 23:27:07 -070096 private NodeId localNodeId;
97 private LeaderElector leaderElector;
Jon Hall7a8bfc62016-05-26 17:59:04 -070098 private final Map<String, Leadership> localLeaderCache = Maps.newConcurrentMap();
Jordan Halterman980a8c12017-09-22 18:01:19 -070099 private final UpgradeEventListener upgradeListener = new InternalUpgradeEventListener();
Madan Jampani72282af2016-02-23 14:23:52 -0800100
Madan Jampani78be2492016-06-03 23:27:07 -0700101 private final Consumer<Change<Leadership>> leadershipChangeListener =
102 change -> {
103 Leadership oldValue = change.oldValue();
104 Leadership newValue = change.newValue();
Jordan Halterman980a8c12017-09-22 18:01:19 -0700105
106 // If the topic is not relevant to this version, skip the event.
107 if (!isLocalTopic(newValue.topic())) {
108 return;
109 }
110
Madan Jampani78be2492016-06-03 23:27:07 -0700111 boolean leaderChanged = !Objects.equals(oldValue.leader(), newValue.leader());
112 boolean candidatesChanged = !Objects.equals(oldValue.candidates(), newValue.candidates());
Jordan Halterman980a8c12017-09-22 18:01:19 -0700113
Madan Jampani620f70d2016-01-30 22:22:47 -0800114 LeadershipEvent.Type eventType = null;
115 if (leaderChanged && candidatesChanged) {
116 eventType = LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED;
117 }
118 if (leaderChanged && !candidatesChanged) {
119 eventType = LeadershipEvent.Type.LEADER_CHANGED;
120 }
121 if (!leaderChanged && candidatesChanged) {
122 eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
123 }
Jordan Halterman980a8c12017-09-22 18:01:19 -0700124 notifyDelegate(new LeadershipEvent(eventType, new Leadership(
125 parseTopic(change.newValue().topic()),
126 change.newValue().leader(),
127 change.newValue().candidates())));
Jon Hall7a8bfc62016-05-26 17:59:04 -0700128 // Update local cache of currently held leaderships
129 if (Objects.equals(newValue.leaderNodeId(), localNodeId)) {
130 localLeaderCache.put(newValue.topic(), newValue);
131 } else {
132 localLeaderCache.remove(newValue.topic());
133 }
Madan Jampani620f70d2016-01-30 22:22:47 -0800134 };
135
Jon Hall7a8bfc62016-05-26 17:59:04 -0700136 private final Consumer<Status> clientStatusListener = status ->
137 statusChangeHandler.execute(() -> handleStatusChange(status));
138
139 private void handleStatusChange(Status status) {
140 // Notify mastership Service of disconnect and reconnect
141 if (status == Status.ACTIVE) {
142 // Service Restored
143 localLeaderCache.forEach((topic, leadership) -> leaderElector.run(topic, localNodeId));
144 leaderElector.getLeaderships().forEach((topic, leadership) ->
Jordan Halterman980a8c12017-09-22 18:01:19 -0700145 notifyDelegate(new LeadershipEvent(
146 LeadershipEvent.Type.SERVICE_RESTORED,
147 new Leadership(
148 parseTopic(leadership.topic()),
149 leadership.leader(),
150 leadership.candidates()))));
Jon Hall7a8bfc62016-05-26 17:59:04 -0700151 } else if (status == Status.SUSPENDED) {
152 // Service Suspended
153 localLeaderCache.forEach((topic, leadership) ->
Jordan Halterman980a8c12017-09-22 18:01:19 -0700154 notifyDelegate(new LeadershipEvent(
155 LeadershipEvent.Type.SERVICE_DISRUPTED,
156 new Leadership(
157 parseTopic(leadership.topic()),
158 leadership.leader(),
159 leadership.candidates()))));
Jon Hall7a8bfc62016-05-26 17:59:04 -0700160 } else {
161 // Should be only inactive state
162 return;
163 }
164 }
165
Madan Jampani620f70d2016-01-30 22:22:47 -0800166 @Activate
167 public void activate() {
Jordan Halterman1f2e8042017-10-18 16:38:03 -0700168 configService.registerProperties(getClass());
Jon Hall7a8bfc62016-05-26 17:59:04 -0700169 statusChangeHandler = Executors.newSingleThreadExecutor(
170 groupedThreads("onos/store/dist/cluster/leadership", "status-change-handler", log));
Madan Jampani620f70d2016-01-30 22:22:47 -0800171 localNodeId = clusterService.getLocalNode().id();
Madan Jampani78be2492016-06-03 23:27:07 -0700172 leaderElector = storageService.leaderElectorBuilder()
Jordan Halterman1f2e8042017-10-18 16:38:03 -0700173 .withName("onos-leadership-elections")
174 .withElectionTimeout(electionTimeoutMillis)
175 .build()
176 .asLeaderElector();
Madan Jampani78be2492016-06-03 23:27:07 -0700177 leaderElector.addChangeListener(leadershipChangeListener);
Jon Hall7a8bfc62016-05-26 17:59:04 -0700178 leaderElector.addStatusChangeListener(clientStatusListener);
Jordan Halterman980a8c12017-09-22 18:01:19 -0700179 upgradeService.addListener(upgradeListener);
Madan Jampani620f70d2016-01-30 22:22:47 -0800180 log.info("Started");
181 }
182
Jordan Halterman1f2e8042017-10-18 16:38:03 -0700183 @Modified
184 public void modified(ComponentContext context) {
185 if (context == null) {
186 return;
187 }
188
189 Dictionary<?, ?> properties = context.getProperties();
190 long newElectionTimeoutMillis;
191 try {
192 String s = get(properties, "electionTimeoutMillis");
193 newElectionTimeoutMillis = isNullOrEmpty(s) ? electionTimeoutMillis : Long.parseLong(s.trim());
194 } catch (NumberFormatException | ClassCastException e) {
195 log.warn("Malformed configuration detected; using defaults", e);
196 newElectionTimeoutMillis = DEFAULT_ELECTION_TIMEOUT_MILLIS;
197 }
198
199 if (newElectionTimeoutMillis != electionTimeoutMillis) {
200 electionTimeoutMillis = newElectionTimeoutMillis;
201 leaderElector = storageService.leaderElectorBuilder()
202 .withName("onos-leadership-elections")
203 .withElectionTimeout(electionTimeoutMillis)
204 .build()
205 .asLeaderElector();
206 }
207 }
208
Madan Jampani620f70d2016-01-30 22:22:47 -0800209 @Deactivate
210 public void deactivate() {
Madan Jampani78be2492016-06-03 23:27:07 -0700211 leaderElector.removeChangeListener(leadershipChangeListener);
Jon Hall7a8bfc62016-05-26 17:59:04 -0700212 leaderElector.removeStatusChangeListener(clientStatusListener);
Jordan Halterman980a8c12017-09-22 18:01:19 -0700213 upgradeService.removeListener(upgradeListener);
Jon Hall7a8bfc62016-05-26 17:59:04 -0700214 statusChangeHandler.shutdown();
Jordan Halterman1f2e8042017-10-18 16:38:03 -0700215 configService.unregisterProperties(getClass(), false);
Madan Jampani620f70d2016-01-30 22:22:47 -0800216 log.info("Stopped");
217 }
218
219 @Override
220 public Leadership addRegistration(String topic) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700221 leaderElector.run(getLocalTopic(topic), localNodeId);
222 return getLeadership(topic);
Madan Jampani620f70d2016-01-30 22:22:47 -0800223 }
224
225 @Override
226 public void removeRegistration(String topic) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700227 leaderElector.withdraw(getLocalTopic(topic));
Madan Jampani620f70d2016-01-30 22:22:47 -0800228 }
229
230 @Override
231 public void removeRegistration(NodeId nodeId) {
Madan Jampani78be2492016-06-03 23:27:07 -0700232 leaderElector.evict(nodeId);
Madan Jampani620f70d2016-01-30 22:22:47 -0800233 }
234
235 @Override
236 public boolean moveLeadership(String topic, NodeId toNodeId) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700237 return leaderElector.anoint(getTopicFor(topic, toNodeId), toNodeId);
Madan Jampani620f70d2016-01-30 22:22:47 -0800238 }
239
240 @Override
241 public boolean makeTopCandidate(String topic, NodeId nodeId) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700242 return leaderElector.promote(getTopicFor(topic, nodeId), nodeId);
Madan Jampani620f70d2016-01-30 22:22:47 -0800243 }
244
245 @Override
246 public Leadership getLeadership(String topic) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700247 Leadership leadership = leaderElector.getLeadership(getActiveTopic(topic));
248 return leadership != null ? new Leadership(
249 parseTopic(leadership.topic()),
250 leadership.leader(),
251 leadership.candidates()) : null;
Madan Jampani620f70d2016-01-30 22:22:47 -0800252 }
253
254 @Override
255 public Map<String, Leadership> getLeaderships() {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700256 Map<String, Leadership> leaderships = leaderElector.getLeaderships();
257 return leaderships.entrySet().stream()
258 .filter(e -> isActiveTopic(e.getKey()))
259 .collect(Collectors.toMap(e -> parseTopic(e.getKey()),
260 e -> new Leadership(parseTopic(e.getKey()), e.getValue().leader(), e.getValue().candidates())));
261 }
262
263 /**
264 * Returns a leader elector topic namespaced with the local node's version.
265 *
266 * @param topic the base topic
267 * @return a topic string namespaced with the local node's version
268 */
269 private String getLocalTopic(String topic) {
270 return topic + VERSION_SEP + versionService.version();
271 }
272
273 /**
274 * Returns a leader elector topic namespaced with the current cluster version.
275 *
276 * @param topic the base topic
277 * @return a topic string namespaced with the current cluster version
278 */
279 private String getActiveTopic(String topic) {
280 return topic + VERSION_SEP + upgradeService.getVersion();
281 }
282
283 /**
284 * Returns whether the given topic is a topic for the local version.
285 *
286 * @param topic the topic to check
287 * @return whether the given topic is relevant to the local version
288 */
289 private boolean isLocalTopic(String topic) {
290 return topic.endsWith(versionService.version().toString());
291 }
292
293 /**
294 * Returns whether the given topic is a topic for the current cluster version.
295 *
296 * @param topic the topic to check
297 * @return whether the given topic is relevant to the current cluster version
298 */
299 private boolean isActiveTopic(String topic) {
300 return topic.endsWith(VERSION_SEP + upgradeService.getVersion().toString());
301 }
302
303 /**
304 * Parses a topic string, returning the base topic.
305 *
306 * @param topic the topic string to parse
307 * @return the base topic string
308 */
309 private String parseTopic(String topic) {
310 return topic.substring(0, topic.lastIndexOf(VERSION_SEP));
311 }
312
313 /**
314 * Returns the versioned topic for the given node.
315 *
316 * @param topic the topic for the given node
317 * @param nodeId the node for which to return the namespaced topic
318 * @return the versioned topic for the given node
319 */
320 private String getTopicFor(String topic, NodeId nodeId) {
321 Version nodeVersion = clusterService.getVersion(nodeId);
322 return nodeVersion != null ? topic + VERSION_SEP + nodeVersion : topic + VERSION_SEP + versionService.version();
323 }
324
325 /**
326 * Internal upgrade event listener.
327 */
328 private class InternalUpgradeEventListener implements UpgradeEventListener {
329 @Override
330 public void event(UpgradeEvent event) {
331 if (event.type() == UpgradeEvent.Type.UPGRADED || event.type() == UpgradeEvent.Type.ROLLED_BACK) {
332 // Iterate through all current leaderships for the new version and trigger events.
333 for (Leadership leadership : getLeaderships().values()) {
334 notifyDelegate(new LeadershipEvent(
335 LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED,
336 leadership));
337 }
338 }
339 }
Madan Jampani620f70d2016-01-30 22:22:47 -0800340 }
341}