blob: 171e146a4c96e03e8a592e6508d10d4756913e3d [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.cluster.impl;
17
Jon Hall7a8bfc62016-05-26 17:59:04 -070018import com.google.common.collect.Maps;
Jordan Halterman1f2e8042017-10-18 16:38:03 -070019import org.onosproject.cfg.ComponentConfigService;
Madan Jampani620f70d2016-01-30 22:22:47 -080020import org.onosproject.cluster.ClusterService;
Madan Jampani620f70d2016-01-30 22:22:47 -080021import org.onosproject.cluster.Leadership;
22import org.onosproject.cluster.LeadershipEvent;
23import org.onosproject.cluster.LeadershipStore;
24import org.onosproject.cluster.LeadershipStoreDelegate;
25import org.onosproject.cluster.NodeId;
Jordan Halterman980a8c12017-09-22 18:01:19 -070026import org.onosproject.core.Version;
27import org.onosproject.core.VersionService;
Madan Jampani78be2492016-06-03 23:27:07 -070028import org.onosproject.event.Change;
Madan Jampani620f70d2016-01-30 22:22:47 -080029import org.onosproject.store.AbstractStore;
Jordan Halterman980a8c12017-09-22 18:01:19 -070030import org.onosproject.store.service.CoordinationService;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070031import org.onosproject.store.service.DistributedPrimitive.Status;
Madan Jampani78be2492016-06-03 23:27:07 -070032import org.onosproject.store.service.LeaderElector;
Jordan Halterman980a8c12017-09-22 18:01:19 -070033import org.onosproject.upgrade.UpgradeEvent;
34import org.onosproject.upgrade.UpgradeEventListener;
35import org.onosproject.upgrade.UpgradeService;
Jordan Halterman1f2e8042017-10-18 16:38:03 -070036import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070037import org.osgi.service.component.annotations.Activate;
38import org.osgi.service.component.annotations.Component;
39import org.osgi.service.component.annotations.Deactivate;
40import org.osgi.service.component.annotations.Modified;
41import org.osgi.service.component.annotations.Reference;
Madan Jampani620f70d2016-01-30 22:22:47 -080042import org.slf4j.Logger;
43
Ray Milkeyd84f89b2018-08-17 14:54:17 -070044import java.util.Dictionary;
45import java.util.Map;
46import java.util.Objects;
47import java.util.concurrent.ExecutorService;
48import java.util.concurrent.Executors;
49import java.util.function.Consumer;
50import java.util.stream.Collectors;
51
Jordan Halterman1f2e8042017-10-18 16:38:03 -070052import static com.google.common.base.Strings.isNullOrEmpty;
Jordan Halterman1f2e8042017-10-18 16:38:03 -070053import static org.onlab.util.Tools.get;
Jordan Halterman980a8c12017-09-22 18:01:19 -070054import static org.onlab.util.Tools.groupedThreads;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070055import static org.osgi.service.component.annotations.ReferenceCardinality.MANDATORY;
Jordan Halterman980a8c12017-09-22 18:01:19 -070056import static org.slf4j.LoggerFactory.getLogger;
Ray Milkeyb5646e62018-10-16 11:42:18 -070057import static org.onosproject.store.OsgiPropertyConstants.*;
Jordan Halterman980a8c12017-09-22 18:01:19 -070058
Madan Jampani620f70d2016-01-30 22:22:47 -080059/**
Madan Jampani78be2492016-06-03 23:27:07 -070060 * Implementation of {@code LeadershipStore} that makes use of a {@link LeaderElector}
61 * primitive.
Madan Jampani620f70d2016-01-30 22:22:47 -080062 */
Ray Milkeyb5646e62018-10-16 11:42:18 -070063@Component(
64 immediate = true,
65 service = LeadershipStore.class,
66 property = {
Ray Milkey2d7bca12018-10-17 14:51:52 -070067 ELECTION_TIMEOUT_MILLIS + ":Long=" + ELECTION_TIMEOUT_MILLIS_DEFAULT
Ray Milkeyb5646e62018-10-16 11:42:18 -070068 }
69)
Madan Jampani620f70d2016-01-30 22:22:47 -080070public class DistributedLeadershipStore
71 extends AbstractStore<LeadershipEvent, LeadershipStoreDelegate>
72 implements LeadershipStore {
73
Jordan Halterman980a8c12017-09-22 18:01:19 -070074 private static final char VERSION_SEP = '|';
75
Madan Jampani78be2492016-06-03 23:27:07 -070076 private final Logger log = getLogger(getClass());
Madan Jampani620f70d2016-01-30 22:22:47 -080077
Ray Milkeyd84f89b2018-08-17 14:54:17 -070078 @Reference(cardinality = MANDATORY)
Madan Jampani620f70d2016-01-30 22:22:47 -080079 protected ClusterService clusterService;
80
Ray Milkeyd84f89b2018-08-17 14:54:17 -070081 @Reference(cardinality = MANDATORY)
Jordan Halterman980a8c12017-09-22 18:01:19 -070082 protected CoordinationService storageService;
83
Ray Milkeyd84f89b2018-08-17 14:54:17 -070084 @Reference(cardinality = MANDATORY)
Jordan Halterman1f2e8042017-10-18 16:38:03 -070085 protected ComponentConfigService configService;
86
Ray Milkeyd84f89b2018-08-17 14:54:17 -070087 @Reference(cardinality = MANDATORY)
Jordan Halterman980a8c12017-09-22 18:01:19 -070088 protected VersionService versionService;
89
Ray Milkeyd84f89b2018-08-17 14:54:17 -070090 @Reference(cardinality = MANDATORY)
Jordan Halterman980a8c12017-09-22 18:01:19 -070091 protected UpgradeService upgradeService;
Madan Jampani620f70d2016-01-30 22:22:47 -080092
Ray Milkeyd84f89b2018-08-17 14:54:17 -070093 //@Property(name = "electionTimeoutMillis", longValue = DEFAULT_ELECTION_TIMEOUT_MILLIS,
94 // label = "the leader election timeout in milliseconds")
Ray Milkeyb5646e62018-10-16 11:42:18 -070095 private long electionTimeoutMillis = ELECTION_TIMEOUT_MILLIS_DEFAULT;
Jordan Halterman1f2e8042017-10-18 16:38:03 -070096
Jon Hall7a8bfc62016-05-26 17:59:04 -070097 private ExecutorService statusChangeHandler;
Madan Jampani78be2492016-06-03 23:27:07 -070098 private NodeId localNodeId;
99 private LeaderElector leaderElector;
Jon Hall7a8bfc62016-05-26 17:59:04 -0700100 private final Map<String, Leadership> localLeaderCache = Maps.newConcurrentMap();
Jordan Halterman980a8c12017-09-22 18:01:19 -0700101 private final UpgradeEventListener upgradeListener = new InternalUpgradeEventListener();
Madan Jampani72282af2016-02-23 14:23:52 -0800102
Madan Jampani78be2492016-06-03 23:27:07 -0700103 private final Consumer<Change<Leadership>> leadershipChangeListener =
104 change -> {
105 Leadership oldValue = change.oldValue();
106 Leadership newValue = change.newValue();
Jordan Halterman980a8c12017-09-22 18:01:19 -0700107
108 // If the topic is not relevant to this version, skip the event.
109 if (!isLocalTopic(newValue.topic())) {
110 return;
111 }
112
Madan Jampani78be2492016-06-03 23:27:07 -0700113 boolean leaderChanged = !Objects.equals(oldValue.leader(), newValue.leader());
114 boolean candidatesChanged = !Objects.equals(oldValue.candidates(), newValue.candidates());
Jordan Halterman980a8c12017-09-22 18:01:19 -0700115
Madan Jampani620f70d2016-01-30 22:22:47 -0800116 LeadershipEvent.Type eventType = null;
117 if (leaderChanged && candidatesChanged) {
118 eventType = LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED;
119 }
120 if (leaderChanged && !candidatesChanged) {
121 eventType = LeadershipEvent.Type.LEADER_CHANGED;
122 }
123 if (!leaderChanged && candidatesChanged) {
124 eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
125 }
Jordan Halterman980a8c12017-09-22 18:01:19 -0700126 notifyDelegate(new LeadershipEvent(eventType, new Leadership(
127 parseTopic(change.newValue().topic()),
128 change.newValue().leader(),
129 change.newValue().candidates())));
Jon Hall7a8bfc62016-05-26 17:59:04 -0700130 // Update local cache of currently held leaderships
131 if (Objects.equals(newValue.leaderNodeId(), localNodeId)) {
132 localLeaderCache.put(newValue.topic(), newValue);
133 } else {
134 localLeaderCache.remove(newValue.topic());
135 }
Madan Jampani620f70d2016-01-30 22:22:47 -0800136 };
137
Jon Hall7a8bfc62016-05-26 17:59:04 -0700138 private final Consumer<Status> clientStatusListener = status ->
139 statusChangeHandler.execute(() -> handleStatusChange(status));
140
141 private void handleStatusChange(Status status) {
142 // Notify mastership Service of disconnect and reconnect
143 if (status == Status.ACTIVE) {
144 // Service Restored
145 localLeaderCache.forEach((topic, leadership) -> leaderElector.run(topic, localNodeId));
146 leaderElector.getLeaderships().forEach((topic, leadership) ->
Jordan Halterman980a8c12017-09-22 18:01:19 -0700147 notifyDelegate(new LeadershipEvent(
148 LeadershipEvent.Type.SERVICE_RESTORED,
149 new Leadership(
150 parseTopic(leadership.topic()),
151 leadership.leader(),
152 leadership.candidates()))));
Jon Hall7a8bfc62016-05-26 17:59:04 -0700153 } else if (status == Status.SUSPENDED) {
154 // Service Suspended
155 localLeaderCache.forEach((topic, leadership) ->
Jordan Halterman980a8c12017-09-22 18:01:19 -0700156 notifyDelegate(new LeadershipEvent(
157 LeadershipEvent.Type.SERVICE_DISRUPTED,
158 new Leadership(
159 parseTopic(leadership.topic()),
160 leadership.leader(),
161 leadership.candidates()))));
Jon Hall7a8bfc62016-05-26 17:59:04 -0700162 } else {
163 // Should be only inactive state
164 return;
165 }
166 }
167
Madan Jampani620f70d2016-01-30 22:22:47 -0800168 @Activate
169 public void activate() {
Jordan Halterman1f2e8042017-10-18 16:38:03 -0700170 configService.registerProperties(getClass());
Jon Hall7a8bfc62016-05-26 17:59:04 -0700171 statusChangeHandler = Executors.newSingleThreadExecutor(
172 groupedThreads("onos/store/dist/cluster/leadership", "status-change-handler", log));
Madan Jampani620f70d2016-01-30 22:22:47 -0800173 localNodeId = clusterService.getLocalNode().id();
Madan Jampani78be2492016-06-03 23:27:07 -0700174 leaderElector = storageService.leaderElectorBuilder()
Jordan Halterman46c5eaa2018-01-24 16:46:55 -0800175 .withName("onos-leadership-elections")
176 .withElectionTimeout(electionTimeoutMillis)
177 .withRelaxedReadConsistency()
178 .build()
179 .asLeaderElector();
Madan Jampani78be2492016-06-03 23:27:07 -0700180 leaderElector.addChangeListener(leadershipChangeListener);
Jon Hall7a8bfc62016-05-26 17:59:04 -0700181 leaderElector.addStatusChangeListener(clientStatusListener);
Jordan Halterman980a8c12017-09-22 18:01:19 -0700182 upgradeService.addListener(upgradeListener);
Madan Jampani620f70d2016-01-30 22:22:47 -0800183 log.info("Started");
184 }
185
Jordan Halterman1f2e8042017-10-18 16:38:03 -0700186 @Modified
187 public void modified(ComponentContext context) {
188 if (context == null) {
189 return;
190 }
191
192 Dictionary<?, ?> properties = context.getProperties();
193 long newElectionTimeoutMillis;
194 try {
195 String s = get(properties, "electionTimeoutMillis");
196 newElectionTimeoutMillis = isNullOrEmpty(s) ? electionTimeoutMillis : Long.parseLong(s.trim());
197 } catch (NumberFormatException | ClassCastException e) {
198 log.warn("Malformed configuration detected; using defaults", e);
Ray Milkeyb5646e62018-10-16 11:42:18 -0700199 newElectionTimeoutMillis = ELECTION_TIMEOUT_MILLIS_DEFAULT;
Jordan Halterman1f2e8042017-10-18 16:38:03 -0700200 }
201
202 if (newElectionTimeoutMillis != electionTimeoutMillis) {
203 electionTimeoutMillis = newElectionTimeoutMillis;
204 leaderElector = storageService.leaderElectorBuilder()
205 .withName("onos-leadership-elections")
206 .withElectionTimeout(electionTimeoutMillis)
Jordan Haltermane74e6292018-05-15 00:52:31 -0700207 .withRelaxedReadConsistency()
Jordan Halterman1f2e8042017-10-18 16:38:03 -0700208 .build()
209 .asLeaderElector();
210 }
211 }
212
Madan Jampani620f70d2016-01-30 22:22:47 -0800213 @Deactivate
214 public void deactivate() {
Madan Jampani78be2492016-06-03 23:27:07 -0700215 leaderElector.removeChangeListener(leadershipChangeListener);
Jon Hall7a8bfc62016-05-26 17:59:04 -0700216 leaderElector.removeStatusChangeListener(clientStatusListener);
Jordan Halterman980a8c12017-09-22 18:01:19 -0700217 upgradeService.removeListener(upgradeListener);
Jon Hall7a8bfc62016-05-26 17:59:04 -0700218 statusChangeHandler.shutdown();
Jordan Halterman1f2e8042017-10-18 16:38:03 -0700219 configService.unregisterProperties(getClass(), false);
Madan Jampani620f70d2016-01-30 22:22:47 -0800220 log.info("Stopped");
221 }
222
223 @Override
224 public Leadership addRegistration(String topic) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700225 leaderElector.run(getLocalTopic(topic), localNodeId);
226 return getLeadership(topic);
Madan Jampani620f70d2016-01-30 22:22:47 -0800227 }
228
229 @Override
230 public void removeRegistration(String topic) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700231 leaderElector.withdraw(getLocalTopic(topic));
Madan Jampani620f70d2016-01-30 22:22:47 -0800232 }
233
234 @Override
235 public void removeRegistration(NodeId nodeId) {
Madan Jampani78be2492016-06-03 23:27:07 -0700236 leaderElector.evict(nodeId);
Madan Jampani620f70d2016-01-30 22:22:47 -0800237 }
238
239 @Override
240 public boolean moveLeadership(String topic, NodeId toNodeId) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700241 return leaderElector.anoint(getTopicFor(topic, toNodeId), toNodeId);
Madan Jampani620f70d2016-01-30 22:22:47 -0800242 }
243
244 @Override
245 public boolean makeTopCandidate(String topic, NodeId nodeId) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700246 return leaderElector.promote(getTopicFor(topic, nodeId), nodeId);
Madan Jampani620f70d2016-01-30 22:22:47 -0800247 }
248
249 @Override
250 public Leadership getLeadership(String topic) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700251 Leadership leadership = leaderElector.getLeadership(getActiveTopic(topic));
252 return leadership != null ? new Leadership(
253 parseTopic(leadership.topic()),
254 leadership.leader(),
255 leadership.candidates()) : null;
Madan Jampani620f70d2016-01-30 22:22:47 -0800256 }
257
258 @Override
259 public Map<String, Leadership> getLeaderships() {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700260 Map<String, Leadership> leaderships = leaderElector.getLeaderships();
261 return leaderships.entrySet().stream()
262 .filter(e -> isActiveTopic(e.getKey()))
263 .collect(Collectors.toMap(e -> parseTopic(e.getKey()),
264 e -> new Leadership(parseTopic(e.getKey()), e.getValue().leader(), e.getValue().candidates())));
265 }
266
267 /**
268 * Returns a leader elector topic namespaced with the local node's version.
269 *
270 * @param topic the base topic
271 * @return a topic string namespaced with the local node's version
272 */
273 private String getLocalTopic(String topic) {
274 return topic + VERSION_SEP + versionService.version();
275 }
276
277 /**
278 * Returns a leader elector topic namespaced with the current cluster version.
279 *
280 * @param topic the base topic
281 * @return a topic string namespaced with the current cluster version
282 */
283 private String getActiveTopic(String topic) {
284 return topic + VERSION_SEP + upgradeService.getVersion();
285 }
286
287 /**
288 * Returns whether the given topic is a topic for the local version.
289 *
290 * @param topic the topic to check
291 * @return whether the given topic is relevant to the local version
292 */
293 private boolean isLocalTopic(String topic) {
294 return topic.endsWith(versionService.version().toString());
295 }
296
297 /**
298 * Returns whether the given topic is a topic for the current cluster version.
299 *
300 * @param topic the topic to check
301 * @return whether the given topic is relevant to the current cluster version
302 */
303 private boolean isActiveTopic(String topic) {
304 return topic.endsWith(VERSION_SEP + upgradeService.getVersion().toString());
305 }
306
307 /**
308 * Parses a topic string, returning the base topic.
309 *
310 * @param topic the topic string to parse
311 * @return the base topic string
312 */
313 private String parseTopic(String topic) {
314 return topic.substring(0, topic.lastIndexOf(VERSION_SEP));
315 }
316
317 /**
318 * Returns the versioned topic for the given node.
319 *
320 * @param topic the topic for the given node
321 * @param nodeId the node for which to return the namespaced topic
322 * @return the versioned topic for the given node
323 */
324 private String getTopicFor(String topic, NodeId nodeId) {
325 Version nodeVersion = clusterService.getVersion(nodeId);
326 return nodeVersion != null ? topic + VERSION_SEP + nodeVersion : topic + VERSION_SEP + versionService.version();
327 }
328
329 /**
330 * Internal upgrade event listener.
331 */
332 private class InternalUpgradeEventListener implements UpgradeEventListener {
333 @Override
334 public void event(UpgradeEvent event) {
335 if (event.type() == UpgradeEvent.Type.UPGRADED || event.type() == UpgradeEvent.Type.ROLLED_BACK) {
336 // Iterate through all current leaderships for the new version and trigger events.
337 for (Leadership leadership : getLeaderships().values()) {
338 notifyDelegate(new LeadershipEvent(
339 LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED,
340 leadership));
341 }
342 }
343 }
Madan Jampani620f70d2016-01-30 22:22:47 -0800344 }
345}