blob: 43959de9f1db28e1e2de205729246754843d4edb [file] [log] [blame]
Madan Jampani620f70d2016-01-30 22:22:47 -08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Madan Jampani620f70d2016-01-30 22:22:47 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.cluster.impl;
17
Jon Hall7a8bfc62016-05-26 17:59:04 -070018import com.google.common.collect.Maps;
Jordan Halterman1f2e8042017-10-18 16:38:03 -070019import org.onosproject.cfg.ComponentConfigService;
Madan Jampani620f70d2016-01-30 22:22:47 -080020import org.onosproject.cluster.ClusterService;
Madan Jampani620f70d2016-01-30 22:22:47 -080021import org.onosproject.cluster.Leadership;
22import org.onosproject.cluster.LeadershipEvent;
23import org.onosproject.cluster.LeadershipStore;
24import org.onosproject.cluster.LeadershipStoreDelegate;
25import org.onosproject.cluster.NodeId;
Jordan Halterman980a8c12017-09-22 18:01:19 -070026import org.onosproject.core.Version;
27import org.onosproject.core.VersionService;
Madan Jampani78be2492016-06-03 23:27:07 -070028import org.onosproject.event.Change;
Madan Jampani620f70d2016-01-30 22:22:47 -080029import org.onosproject.store.AbstractStore;
Jordan Halterman980a8c12017-09-22 18:01:19 -070030import org.onosproject.store.service.CoordinationService;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070031import org.onosproject.store.service.DistributedPrimitive.Status;
Madan Jampani78be2492016-06-03 23:27:07 -070032import org.onosproject.store.service.LeaderElector;
Jordan Halterman980a8c12017-09-22 18:01:19 -070033import org.onosproject.upgrade.UpgradeEvent;
34import org.onosproject.upgrade.UpgradeEventListener;
35import org.onosproject.upgrade.UpgradeService;
Jordan Halterman1f2e8042017-10-18 16:38:03 -070036import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070037import org.osgi.service.component.annotations.Activate;
38import org.osgi.service.component.annotations.Component;
39import org.osgi.service.component.annotations.Deactivate;
40import org.osgi.service.component.annotations.Modified;
41import org.osgi.service.component.annotations.Reference;
Madan Jampani620f70d2016-01-30 22:22:47 -080042import org.slf4j.Logger;
43
Ray Milkeyd84f89b2018-08-17 14:54:17 -070044import java.util.Dictionary;
45import java.util.Map;
46import java.util.Objects;
47import java.util.concurrent.ExecutorService;
48import java.util.concurrent.Executors;
49import java.util.function.Consumer;
50import java.util.stream.Collectors;
51
Jordan Halterman1f2e8042017-10-18 16:38:03 -070052import static com.google.common.base.Strings.isNullOrEmpty;
Jordan Halterman1f2e8042017-10-18 16:38:03 -070053import static org.onlab.util.Tools.get;
Jordan Halterman980a8c12017-09-22 18:01:19 -070054import static org.onlab.util.Tools.groupedThreads;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070055import static org.osgi.service.component.annotations.ReferenceCardinality.MANDATORY;
Jordan Halterman980a8c12017-09-22 18:01:19 -070056import static org.slf4j.LoggerFactory.getLogger;
Ray Milkeyb5646e62018-10-16 11:42:18 -070057import static org.onosproject.store.OsgiPropertyConstants.*;
Jordan Halterman980a8c12017-09-22 18:01:19 -070058
Madan Jampani620f70d2016-01-30 22:22:47 -080059/**
Madan Jampani78be2492016-06-03 23:27:07 -070060 * Implementation of {@code LeadershipStore} that makes use of a {@link LeaderElector}
61 * primitive.
Madan Jampani620f70d2016-01-30 22:22:47 -080062 */
Ray Milkeyb5646e62018-10-16 11:42:18 -070063@Component(
64 immediate = true,
65 service = LeadershipStore.class,
66 property = {
Ray Milkey2d7bca12018-10-17 14:51:52 -070067 ELECTION_TIMEOUT_MILLIS + ":Long=" + ELECTION_TIMEOUT_MILLIS_DEFAULT
Ray Milkeyb5646e62018-10-16 11:42:18 -070068 }
69)
Madan Jampani620f70d2016-01-30 22:22:47 -080070public class DistributedLeadershipStore
71 extends AbstractStore<LeadershipEvent, LeadershipStoreDelegate>
72 implements LeadershipStore {
73
Jordan Halterman980a8c12017-09-22 18:01:19 -070074 private static final char VERSION_SEP = '|';
75
Madan Jampani78be2492016-06-03 23:27:07 -070076 private final Logger log = getLogger(getClass());
Madan Jampani620f70d2016-01-30 22:22:47 -080077
Ray Milkeyd84f89b2018-08-17 14:54:17 -070078 @Reference(cardinality = MANDATORY)
Madan Jampani620f70d2016-01-30 22:22:47 -080079 protected ClusterService clusterService;
80
Ray Milkeyd84f89b2018-08-17 14:54:17 -070081 @Reference(cardinality = MANDATORY)
Jordan Halterman980a8c12017-09-22 18:01:19 -070082 protected CoordinationService storageService;
83
Ray Milkeyd84f89b2018-08-17 14:54:17 -070084 @Reference(cardinality = MANDATORY)
Jordan Halterman1f2e8042017-10-18 16:38:03 -070085 protected ComponentConfigService configService;
86
Ray Milkeyd84f89b2018-08-17 14:54:17 -070087 @Reference(cardinality = MANDATORY)
Jordan Halterman980a8c12017-09-22 18:01:19 -070088 protected VersionService versionService;
89
Ray Milkeyd84f89b2018-08-17 14:54:17 -070090 @Reference(cardinality = MANDATORY)
Jordan Halterman980a8c12017-09-22 18:01:19 -070091 protected UpgradeService upgradeService;
Madan Jampani620f70d2016-01-30 22:22:47 -080092
Thomas Vachuskaf566fa22018-10-30 14:03:36 -070093 /** Leader election timeout in milliseconds. */
Ray Milkeyb5646e62018-10-16 11:42:18 -070094 private long electionTimeoutMillis = ELECTION_TIMEOUT_MILLIS_DEFAULT;
Jordan Halterman1f2e8042017-10-18 16:38:03 -070095
Jon Hall7a8bfc62016-05-26 17:59:04 -070096 private ExecutorService statusChangeHandler;
Madan Jampani78be2492016-06-03 23:27:07 -070097 private NodeId localNodeId;
98 private LeaderElector leaderElector;
Jon Hall7a8bfc62016-05-26 17:59:04 -070099 private final Map<String, Leadership> localLeaderCache = Maps.newConcurrentMap();
Jordan Halterman980a8c12017-09-22 18:01:19 -0700100 private final UpgradeEventListener upgradeListener = new InternalUpgradeEventListener();
Madan Jampani72282af2016-02-23 14:23:52 -0800101
Madan Jampani78be2492016-06-03 23:27:07 -0700102 private final Consumer<Change<Leadership>> leadershipChangeListener =
103 change -> {
104 Leadership oldValue = change.oldValue();
105 Leadership newValue = change.newValue();
Jordan Halterman980a8c12017-09-22 18:01:19 -0700106
107 // If the topic is not relevant to this version, skip the event.
108 if (!isLocalTopic(newValue.topic())) {
109 return;
110 }
111
Madan Jampani78be2492016-06-03 23:27:07 -0700112 boolean leaderChanged = !Objects.equals(oldValue.leader(), newValue.leader());
113 boolean candidatesChanged = !Objects.equals(oldValue.candidates(), newValue.candidates());
Jordan Halterman980a8c12017-09-22 18:01:19 -0700114
Madan Jampani620f70d2016-01-30 22:22:47 -0800115 LeadershipEvent.Type eventType = null;
116 if (leaderChanged && candidatesChanged) {
117 eventType = LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED;
118 }
119 if (leaderChanged && !candidatesChanged) {
120 eventType = LeadershipEvent.Type.LEADER_CHANGED;
121 }
122 if (!leaderChanged && candidatesChanged) {
123 eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
124 }
Jordan Halterman980a8c12017-09-22 18:01:19 -0700125 notifyDelegate(new LeadershipEvent(eventType, new Leadership(
126 parseTopic(change.newValue().topic()),
127 change.newValue().leader(),
128 change.newValue().candidates())));
Jon Hall7a8bfc62016-05-26 17:59:04 -0700129 // Update local cache of currently held leaderships
130 if (Objects.equals(newValue.leaderNodeId(), localNodeId)) {
131 localLeaderCache.put(newValue.topic(), newValue);
132 } else {
133 localLeaderCache.remove(newValue.topic());
134 }
Madan Jampani620f70d2016-01-30 22:22:47 -0800135 };
136
Jon Hall7a8bfc62016-05-26 17:59:04 -0700137 private final Consumer<Status> clientStatusListener = status ->
138 statusChangeHandler.execute(() -> handleStatusChange(status));
139
140 private void handleStatusChange(Status status) {
141 // Notify mastership Service of disconnect and reconnect
142 if (status == Status.ACTIVE) {
143 // Service Restored
144 localLeaderCache.forEach((topic, leadership) -> leaderElector.run(topic, localNodeId));
145 leaderElector.getLeaderships().forEach((topic, leadership) ->
Jordan Halterman980a8c12017-09-22 18:01:19 -0700146 notifyDelegate(new LeadershipEvent(
147 LeadershipEvent.Type.SERVICE_RESTORED,
148 new Leadership(
149 parseTopic(leadership.topic()),
150 leadership.leader(),
151 leadership.candidates()))));
Jon Hall7a8bfc62016-05-26 17:59:04 -0700152 } else if (status == Status.SUSPENDED) {
153 // Service Suspended
154 localLeaderCache.forEach((topic, leadership) ->
Jordan Halterman980a8c12017-09-22 18:01:19 -0700155 notifyDelegate(new LeadershipEvent(
156 LeadershipEvent.Type.SERVICE_DISRUPTED,
157 new Leadership(
158 parseTopic(leadership.topic()),
159 leadership.leader(),
160 leadership.candidates()))));
Jon Hall7a8bfc62016-05-26 17:59:04 -0700161 } else {
162 // Should be only inactive state
163 return;
164 }
165 }
166
Madan Jampani620f70d2016-01-30 22:22:47 -0800167 @Activate
168 public void activate() {
Jordan Halterman1f2e8042017-10-18 16:38:03 -0700169 configService.registerProperties(getClass());
Jon Hall7a8bfc62016-05-26 17:59:04 -0700170 statusChangeHandler = Executors.newSingleThreadExecutor(
171 groupedThreads("onos/store/dist/cluster/leadership", "status-change-handler", log));
Madan Jampani620f70d2016-01-30 22:22:47 -0800172 localNodeId = clusterService.getLocalNode().id();
Madan Jampani78be2492016-06-03 23:27:07 -0700173 leaderElector = storageService.leaderElectorBuilder()
Jordan Halterman46c5eaa2018-01-24 16:46:55 -0800174 .withName("onos-leadership-elections")
175 .withElectionTimeout(electionTimeoutMillis)
176 .withRelaxedReadConsistency()
177 .build()
178 .asLeaderElector();
Madan Jampani78be2492016-06-03 23:27:07 -0700179 leaderElector.addChangeListener(leadershipChangeListener);
Jon Hall7a8bfc62016-05-26 17:59:04 -0700180 leaderElector.addStatusChangeListener(clientStatusListener);
Jordan Halterman980a8c12017-09-22 18:01:19 -0700181 upgradeService.addListener(upgradeListener);
Madan Jampani620f70d2016-01-30 22:22:47 -0800182 log.info("Started");
183 }
184
Jordan Halterman1f2e8042017-10-18 16:38:03 -0700185 @Modified
186 public void modified(ComponentContext context) {
187 if (context == null) {
188 return;
189 }
190
191 Dictionary<?, ?> properties = context.getProperties();
192 long newElectionTimeoutMillis;
193 try {
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700194 String s = get(properties, ELECTION_TIMEOUT_MILLIS);
Jordan Halterman1f2e8042017-10-18 16:38:03 -0700195 newElectionTimeoutMillis = isNullOrEmpty(s) ? electionTimeoutMillis : Long.parseLong(s.trim());
196 } catch (NumberFormatException | ClassCastException e) {
197 log.warn("Malformed configuration detected; using defaults", e);
Ray Milkeyb5646e62018-10-16 11:42:18 -0700198 newElectionTimeoutMillis = ELECTION_TIMEOUT_MILLIS_DEFAULT;
Jordan Halterman1f2e8042017-10-18 16:38:03 -0700199 }
200
201 if (newElectionTimeoutMillis != electionTimeoutMillis) {
202 electionTimeoutMillis = newElectionTimeoutMillis;
203 leaderElector = storageService.leaderElectorBuilder()
204 .withName("onos-leadership-elections")
205 .withElectionTimeout(electionTimeoutMillis)
Jordan Haltermane74e6292018-05-15 00:52:31 -0700206 .withRelaxedReadConsistency()
Jordan Halterman1f2e8042017-10-18 16:38:03 -0700207 .build()
208 .asLeaderElector();
209 }
210 }
211
Madan Jampani620f70d2016-01-30 22:22:47 -0800212 @Deactivate
213 public void deactivate() {
Madan Jampani78be2492016-06-03 23:27:07 -0700214 leaderElector.removeChangeListener(leadershipChangeListener);
Jon Hall7a8bfc62016-05-26 17:59:04 -0700215 leaderElector.removeStatusChangeListener(clientStatusListener);
Jordan Halterman980a8c12017-09-22 18:01:19 -0700216 upgradeService.removeListener(upgradeListener);
Jon Hall7a8bfc62016-05-26 17:59:04 -0700217 statusChangeHandler.shutdown();
Jordan Halterman1f2e8042017-10-18 16:38:03 -0700218 configService.unregisterProperties(getClass(), false);
Madan Jampani620f70d2016-01-30 22:22:47 -0800219 log.info("Stopped");
220 }
221
222 @Override
223 public Leadership addRegistration(String topic) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700224 leaderElector.run(getLocalTopic(topic), localNodeId);
225 return getLeadership(topic);
Madan Jampani620f70d2016-01-30 22:22:47 -0800226 }
227
228 @Override
229 public void removeRegistration(String topic) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700230 leaderElector.withdraw(getLocalTopic(topic));
Madan Jampani620f70d2016-01-30 22:22:47 -0800231 }
232
233 @Override
234 public void removeRegistration(NodeId nodeId) {
Madan Jampani78be2492016-06-03 23:27:07 -0700235 leaderElector.evict(nodeId);
Madan Jampani620f70d2016-01-30 22:22:47 -0800236 }
237
238 @Override
239 public boolean moveLeadership(String topic, NodeId toNodeId) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700240 return leaderElector.anoint(getTopicFor(topic, toNodeId), toNodeId);
Madan Jampani620f70d2016-01-30 22:22:47 -0800241 }
242
243 @Override
244 public boolean makeTopCandidate(String topic, NodeId nodeId) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700245 return leaderElector.promote(getTopicFor(topic, nodeId), nodeId);
Madan Jampani620f70d2016-01-30 22:22:47 -0800246 }
247
248 @Override
249 public Leadership getLeadership(String topic) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700250 Leadership leadership = leaderElector.getLeadership(getActiveTopic(topic));
251 return leadership != null ? new Leadership(
252 parseTopic(leadership.topic()),
253 leadership.leader(),
254 leadership.candidates()) : null;
Madan Jampani620f70d2016-01-30 22:22:47 -0800255 }
256
257 @Override
258 public Map<String, Leadership> getLeaderships() {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700259 Map<String, Leadership> leaderships = leaderElector.getLeaderships();
260 return leaderships.entrySet().stream()
261 .filter(e -> isActiveTopic(e.getKey()))
262 .collect(Collectors.toMap(e -> parseTopic(e.getKey()),
263 e -> new Leadership(parseTopic(e.getKey()), e.getValue().leader(), e.getValue().candidates())));
264 }
265
266 /**
267 * Returns a leader elector topic namespaced with the local node's version.
268 *
269 * @param topic the base topic
270 * @return a topic string namespaced with the local node's version
271 */
272 private String getLocalTopic(String topic) {
273 return topic + VERSION_SEP + versionService.version();
274 }
275
276 /**
277 * Returns a leader elector topic namespaced with the current cluster version.
278 *
279 * @param topic the base topic
280 * @return a topic string namespaced with the current cluster version
281 */
282 private String getActiveTopic(String topic) {
283 return topic + VERSION_SEP + upgradeService.getVersion();
284 }
285
286 /**
287 * Returns whether the given topic is a topic for the local version.
288 *
289 * @param topic the topic to check
290 * @return whether the given topic is relevant to the local version
291 */
292 private boolean isLocalTopic(String topic) {
293 return topic.endsWith(versionService.version().toString());
294 }
295
296 /**
297 * Returns whether the given topic is a topic for the current cluster version.
298 *
299 * @param topic the topic to check
300 * @return whether the given topic is relevant to the current cluster version
301 */
302 private boolean isActiveTopic(String topic) {
303 return topic.endsWith(VERSION_SEP + upgradeService.getVersion().toString());
304 }
305
306 /**
307 * Parses a topic string, returning the base topic.
308 *
309 * @param topic the topic string to parse
310 * @return the base topic string
311 */
312 private String parseTopic(String topic) {
313 return topic.substring(0, topic.lastIndexOf(VERSION_SEP));
314 }
315
316 /**
317 * Returns the versioned topic for the given node.
318 *
319 * @param topic the topic for the given node
320 * @param nodeId the node for which to return the namespaced topic
321 * @return the versioned topic for the given node
322 */
323 private String getTopicFor(String topic, NodeId nodeId) {
324 Version nodeVersion = clusterService.getVersion(nodeId);
325 return nodeVersion != null ? topic + VERSION_SEP + nodeVersion : topic + VERSION_SEP + versionService.version();
326 }
327
328 /**
329 * Internal upgrade event listener.
330 */
331 private class InternalUpgradeEventListener implements UpgradeEventListener {
332 @Override
333 public void event(UpgradeEvent event) {
334 if (event.type() == UpgradeEvent.Type.UPGRADED || event.type() == UpgradeEvent.Type.ROLLED_BACK) {
335 // Iterate through all current leaderships for the new version and trigger events.
336 for (Leadership leadership : getLeaderships().values()) {
337 notifyDelegate(new LeadershipEvent(
338 LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED,
339 leadership));
340 }
341 }
342 }
Madan Jampani620f70d2016-01-30 22:22:47 -0800343 }
344}