blob: c3a39771a5fe087cbf05846e71e62dad9272caa7 [file] [log] [blame]
Madan Jampani620f70d2016-01-30 22:22:47 -08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Madan Jampani620f70d2016-01-30 22:22:47 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.cluster.impl;
17
Jordan Halterman1f2e8042017-10-18 16:38:03 -070018import java.util.Dictionary;
Madan Jampani620f70d2016-01-30 22:22:47 -080019import java.util.Map;
Madan Jampani78be2492016-06-03 23:27:07 -070020import java.util.Objects;
Jon Hall7a8bfc62016-05-26 17:59:04 -070021import java.util.concurrent.ExecutorService;
22import java.util.concurrent.Executors;
Madan Jampani78be2492016-06-03 23:27:07 -070023import java.util.function.Consumer;
Jordan Halterman980a8c12017-09-22 18:01:19 -070024import java.util.stream.Collectors;
Madan Jampani620f70d2016-01-30 22:22:47 -080025
Jon Hall7a8bfc62016-05-26 17:59:04 -070026import com.google.common.collect.Maps;
Madan Jampani620f70d2016-01-30 22:22:47 -080027import org.apache.felix.scr.annotations.Activate;
28import org.apache.felix.scr.annotations.Component;
29import org.apache.felix.scr.annotations.Deactivate;
Jordan Halterman1f2e8042017-10-18 16:38:03 -070030import org.apache.felix.scr.annotations.Modified;
31import org.apache.felix.scr.annotations.Property;
Madan Jampani620f70d2016-01-30 22:22:47 -080032import org.apache.felix.scr.annotations.Reference;
33import org.apache.felix.scr.annotations.ReferenceCardinality;
34import org.apache.felix.scr.annotations.Service;
Jordan Halterman1f2e8042017-10-18 16:38:03 -070035import org.onosproject.cfg.ComponentConfigService;
Madan Jampani620f70d2016-01-30 22:22:47 -080036import org.onosproject.cluster.ClusterService;
Madan Jampani620f70d2016-01-30 22:22:47 -080037import org.onosproject.cluster.Leadership;
38import org.onosproject.cluster.LeadershipEvent;
39import org.onosproject.cluster.LeadershipStore;
40import org.onosproject.cluster.LeadershipStoreDelegate;
41import org.onosproject.cluster.NodeId;
Jordan Halterman980a8c12017-09-22 18:01:19 -070042import org.onosproject.core.Version;
43import org.onosproject.core.VersionService;
Madan Jampani78be2492016-06-03 23:27:07 -070044import org.onosproject.event.Change;
Madan Jampani620f70d2016-01-30 22:22:47 -080045import org.onosproject.store.AbstractStore;
Jon Hall7a8bfc62016-05-26 17:59:04 -070046import org.onosproject.store.service.DistributedPrimitive.Status;
Jordan Halterman980a8c12017-09-22 18:01:19 -070047import org.onosproject.store.service.CoordinationService;
Madan Jampani78be2492016-06-03 23:27:07 -070048import org.onosproject.store.service.LeaderElector;
Jordan Halterman980a8c12017-09-22 18:01:19 -070049import org.onosproject.upgrade.UpgradeEvent;
50import org.onosproject.upgrade.UpgradeEventListener;
51import org.onosproject.upgrade.UpgradeService;
Jordan Halterman1f2e8042017-10-18 16:38:03 -070052import org.osgi.service.component.ComponentContext;
Madan Jampani620f70d2016-01-30 22:22:47 -080053import org.slf4j.Logger;
54
Jordan Halterman1f2e8042017-10-18 16:38:03 -070055import static com.google.common.base.Strings.isNullOrEmpty;
56import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
57import static org.onlab.util.Tools.get;
Jordan Halterman980a8c12017-09-22 18:01:19 -070058import static org.onlab.util.Tools.groupedThreads;
59import static org.slf4j.LoggerFactory.getLogger;
60
Madan Jampani620f70d2016-01-30 22:22:47 -080061/**
Madan Jampani78be2492016-06-03 23:27:07 -070062 * Implementation of {@code LeadershipStore} that makes use of a {@link LeaderElector}
63 * primitive.
Madan Jampani620f70d2016-01-30 22:22:47 -080064 */
65@Service
Sho SHIMIZU5c396e32016-08-12 15:19:12 -070066@Component(immediate = true)
Madan Jampani620f70d2016-01-30 22:22:47 -080067public class DistributedLeadershipStore
68 extends AbstractStore<LeadershipEvent, LeadershipStoreDelegate>
69 implements LeadershipStore {
70
Jordan Halterman980a8c12017-09-22 18:01:19 -070071 private static final char VERSION_SEP = '|';
72
Madan Jampani78be2492016-06-03 23:27:07 -070073 private final Logger log = getLogger(getClass());
Madan Jampani620f70d2016-01-30 22:22:47 -080074
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76 protected ClusterService clusterService;
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jordan Halterman980a8c12017-09-22 18:01:19 -070079 protected CoordinationService storageService;
80
Jordan Halterman1f2e8042017-10-18 16:38:03 -070081 @Reference(cardinality = MANDATORY_UNARY)
82 protected ComponentConfigService configService;
83
Jordan Halterman980a8c12017-09-22 18:01:19 -070084 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85 protected VersionService versionService;
86
87 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
88 protected UpgradeService upgradeService;
Madan Jampani620f70d2016-01-30 22:22:47 -080089
Jordan Halterman19486e32017-11-02 15:00:06 -070090 private static final long DEFAULT_ELECTION_TIMEOUT_MILLIS = 250;
Jordan Halterman1f2e8042017-10-18 16:38:03 -070091 @Property(name = "electionTimeoutMillis", longValue = DEFAULT_ELECTION_TIMEOUT_MILLIS,
92 label = "the leader election timeout in milliseconds")
93 private long electionTimeoutMillis = DEFAULT_ELECTION_TIMEOUT_MILLIS;
94
Jon Hall7a8bfc62016-05-26 17:59:04 -070095 private ExecutorService statusChangeHandler;
Madan Jampani78be2492016-06-03 23:27:07 -070096 private NodeId localNodeId;
97 private LeaderElector leaderElector;
Jon Hall7a8bfc62016-05-26 17:59:04 -070098 private final Map<String, Leadership> localLeaderCache = Maps.newConcurrentMap();
Jordan Halterman980a8c12017-09-22 18:01:19 -070099 private final UpgradeEventListener upgradeListener = new InternalUpgradeEventListener();
Madan Jampani72282af2016-02-23 14:23:52 -0800100
Madan Jampani78be2492016-06-03 23:27:07 -0700101 private final Consumer<Change<Leadership>> leadershipChangeListener =
102 change -> {
103 Leadership oldValue = change.oldValue();
104 Leadership newValue = change.newValue();
Jordan Halterman980a8c12017-09-22 18:01:19 -0700105
106 // If the topic is not relevant to this version, skip the event.
107 if (!isLocalTopic(newValue.topic())) {
108 return;
109 }
110
Madan Jampani78be2492016-06-03 23:27:07 -0700111 boolean leaderChanged = !Objects.equals(oldValue.leader(), newValue.leader());
112 boolean candidatesChanged = !Objects.equals(oldValue.candidates(), newValue.candidates());
Jordan Halterman980a8c12017-09-22 18:01:19 -0700113
Madan Jampani620f70d2016-01-30 22:22:47 -0800114 LeadershipEvent.Type eventType = null;
115 if (leaderChanged && candidatesChanged) {
116 eventType = LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED;
117 }
118 if (leaderChanged && !candidatesChanged) {
119 eventType = LeadershipEvent.Type.LEADER_CHANGED;
120 }
121 if (!leaderChanged && candidatesChanged) {
122 eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
123 }
Jordan Halterman980a8c12017-09-22 18:01:19 -0700124 notifyDelegate(new LeadershipEvent(eventType, new Leadership(
125 parseTopic(change.newValue().topic()),
126 change.newValue().leader(),
127 change.newValue().candidates())));
Jon Hall7a8bfc62016-05-26 17:59:04 -0700128 // Update local cache of currently held leaderships
129 if (Objects.equals(newValue.leaderNodeId(), localNodeId)) {
130 localLeaderCache.put(newValue.topic(), newValue);
131 } else {
132 localLeaderCache.remove(newValue.topic());
133 }
Madan Jampani620f70d2016-01-30 22:22:47 -0800134 };
135
Jon Hall7a8bfc62016-05-26 17:59:04 -0700136 private final Consumer<Status> clientStatusListener = status ->
137 statusChangeHandler.execute(() -> handleStatusChange(status));
138
139 private void handleStatusChange(Status status) {
140 // Notify mastership Service of disconnect and reconnect
141 if (status == Status.ACTIVE) {
142 // Service Restored
143 localLeaderCache.forEach((topic, leadership) -> leaderElector.run(topic, localNodeId));
144 leaderElector.getLeaderships().forEach((topic, leadership) ->
Jordan Halterman980a8c12017-09-22 18:01:19 -0700145 notifyDelegate(new LeadershipEvent(
146 LeadershipEvent.Type.SERVICE_RESTORED,
147 new Leadership(
148 parseTopic(leadership.topic()),
149 leadership.leader(),
150 leadership.candidates()))));
Jon Hall7a8bfc62016-05-26 17:59:04 -0700151 } else if (status == Status.SUSPENDED) {
152 // Service Suspended
153 localLeaderCache.forEach((topic, leadership) ->
Jordan Halterman980a8c12017-09-22 18:01:19 -0700154 notifyDelegate(new LeadershipEvent(
155 LeadershipEvent.Type.SERVICE_DISRUPTED,
156 new Leadership(
157 parseTopic(leadership.topic()),
158 leadership.leader(),
159 leadership.candidates()))));
Jon Hall7a8bfc62016-05-26 17:59:04 -0700160 } else {
161 // Should be only inactive state
162 return;
163 }
164 }
165
Madan Jampani620f70d2016-01-30 22:22:47 -0800166 @Activate
167 public void activate() {
Jordan Halterman1f2e8042017-10-18 16:38:03 -0700168 configService.registerProperties(getClass());
Jon Hall7a8bfc62016-05-26 17:59:04 -0700169 statusChangeHandler = Executors.newSingleThreadExecutor(
170 groupedThreads("onos/store/dist/cluster/leadership", "status-change-handler", log));
Madan Jampani620f70d2016-01-30 22:22:47 -0800171 localNodeId = clusterService.getLocalNode().id();
Madan Jampani78be2492016-06-03 23:27:07 -0700172 leaderElector = storageService.leaderElectorBuilder()
Jordan Halterman46c5eaa2018-01-24 16:46:55 -0800173 .withName("onos-leadership-elections")
174 .withElectionTimeout(electionTimeoutMillis)
175 .withRelaxedReadConsistency()
176 .build()
177 .asLeaderElector();
Madan Jampani78be2492016-06-03 23:27:07 -0700178 leaderElector.addChangeListener(leadershipChangeListener);
Jon Hall7a8bfc62016-05-26 17:59:04 -0700179 leaderElector.addStatusChangeListener(clientStatusListener);
Jordan Halterman980a8c12017-09-22 18:01:19 -0700180 upgradeService.addListener(upgradeListener);
Madan Jampani620f70d2016-01-30 22:22:47 -0800181 log.info("Started");
182 }
183
Jordan Halterman1f2e8042017-10-18 16:38:03 -0700184 @Modified
185 public void modified(ComponentContext context) {
186 if (context == null) {
187 return;
188 }
189
190 Dictionary<?, ?> properties = context.getProperties();
191 long newElectionTimeoutMillis;
192 try {
193 String s = get(properties, "electionTimeoutMillis");
194 newElectionTimeoutMillis = isNullOrEmpty(s) ? electionTimeoutMillis : Long.parseLong(s.trim());
195 } catch (NumberFormatException | ClassCastException e) {
196 log.warn("Malformed configuration detected; using defaults", e);
197 newElectionTimeoutMillis = DEFAULT_ELECTION_TIMEOUT_MILLIS;
198 }
199
200 if (newElectionTimeoutMillis != electionTimeoutMillis) {
201 electionTimeoutMillis = newElectionTimeoutMillis;
202 leaderElector = storageService.leaderElectorBuilder()
203 .withName("onos-leadership-elections")
204 .withElectionTimeout(electionTimeoutMillis)
Jordan Haltermane74e6292018-05-15 00:52:31 -0700205 .withRelaxedReadConsistency()
Jordan Halterman1f2e8042017-10-18 16:38:03 -0700206 .build()
207 .asLeaderElector();
208 }
209 }
210
Madan Jampani620f70d2016-01-30 22:22:47 -0800211 @Deactivate
212 public void deactivate() {
Madan Jampani78be2492016-06-03 23:27:07 -0700213 leaderElector.removeChangeListener(leadershipChangeListener);
Jon Hall7a8bfc62016-05-26 17:59:04 -0700214 leaderElector.removeStatusChangeListener(clientStatusListener);
Jordan Halterman980a8c12017-09-22 18:01:19 -0700215 upgradeService.removeListener(upgradeListener);
Jon Hall7a8bfc62016-05-26 17:59:04 -0700216 statusChangeHandler.shutdown();
Jordan Halterman1f2e8042017-10-18 16:38:03 -0700217 configService.unregisterProperties(getClass(), false);
Madan Jampani620f70d2016-01-30 22:22:47 -0800218 log.info("Stopped");
219 }
220
221 @Override
222 public Leadership addRegistration(String topic) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700223 leaderElector.run(getLocalTopic(topic), localNodeId);
224 return getLeadership(topic);
Madan Jampani620f70d2016-01-30 22:22:47 -0800225 }
226
227 @Override
228 public void removeRegistration(String topic) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700229 leaderElector.withdraw(getLocalTopic(topic));
Madan Jampani620f70d2016-01-30 22:22:47 -0800230 }
231
232 @Override
233 public void removeRegistration(NodeId nodeId) {
Madan Jampani78be2492016-06-03 23:27:07 -0700234 leaderElector.evict(nodeId);
Madan Jampani620f70d2016-01-30 22:22:47 -0800235 }
236
237 @Override
238 public boolean moveLeadership(String topic, NodeId toNodeId) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700239 return leaderElector.anoint(getTopicFor(topic, toNodeId), toNodeId);
Madan Jampani620f70d2016-01-30 22:22:47 -0800240 }
241
242 @Override
243 public boolean makeTopCandidate(String topic, NodeId nodeId) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700244 return leaderElector.promote(getTopicFor(topic, nodeId), nodeId);
Madan Jampani620f70d2016-01-30 22:22:47 -0800245 }
246
247 @Override
248 public Leadership getLeadership(String topic) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700249 Leadership leadership = leaderElector.getLeadership(getActiveTopic(topic));
250 return leadership != null ? new Leadership(
251 parseTopic(leadership.topic()),
252 leadership.leader(),
253 leadership.candidates()) : null;
Madan Jampani620f70d2016-01-30 22:22:47 -0800254 }
255
256 @Override
257 public Map<String, Leadership> getLeaderships() {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700258 Map<String, Leadership> leaderships = leaderElector.getLeaderships();
259 return leaderships.entrySet().stream()
260 .filter(e -> isActiveTopic(e.getKey()))
261 .collect(Collectors.toMap(e -> parseTopic(e.getKey()),
262 e -> new Leadership(parseTopic(e.getKey()), e.getValue().leader(), e.getValue().candidates())));
263 }
264
265 /**
266 * Returns a leader elector topic namespaced with the local node's version.
267 *
268 * @param topic the base topic
269 * @return a topic string namespaced with the local node's version
270 */
271 private String getLocalTopic(String topic) {
272 return topic + VERSION_SEP + versionService.version();
273 }
274
275 /**
276 * Returns a leader elector topic namespaced with the current cluster version.
277 *
278 * @param topic the base topic
279 * @return a topic string namespaced with the current cluster version
280 */
281 private String getActiveTopic(String topic) {
282 return topic + VERSION_SEP + upgradeService.getVersion();
283 }
284
285 /**
286 * Returns whether the given topic is a topic for the local version.
287 *
288 * @param topic the topic to check
289 * @return whether the given topic is relevant to the local version
290 */
291 private boolean isLocalTopic(String topic) {
292 return topic.endsWith(versionService.version().toString());
293 }
294
295 /**
296 * Returns whether the given topic is a topic for the current cluster version.
297 *
298 * @param topic the topic to check
299 * @return whether the given topic is relevant to the current cluster version
300 */
301 private boolean isActiveTopic(String topic) {
302 return topic.endsWith(VERSION_SEP + upgradeService.getVersion().toString());
303 }
304
305 /**
306 * Parses a topic string, returning the base topic.
307 *
308 * @param topic the topic string to parse
309 * @return the base topic string
310 */
311 private String parseTopic(String topic) {
312 return topic.substring(0, topic.lastIndexOf(VERSION_SEP));
313 }
314
315 /**
316 * Returns the versioned topic for the given node.
317 *
318 * @param topic the topic for the given node
319 * @param nodeId the node for which to return the namespaced topic
320 * @return the versioned topic for the given node
321 */
322 private String getTopicFor(String topic, NodeId nodeId) {
323 Version nodeVersion = clusterService.getVersion(nodeId);
324 return nodeVersion != null ? topic + VERSION_SEP + nodeVersion : topic + VERSION_SEP + versionService.version();
325 }
326
327 /**
328 * Internal upgrade event listener.
329 */
330 private class InternalUpgradeEventListener implements UpgradeEventListener {
331 @Override
332 public void event(UpgradeEvent event) {
333 if (event.type() == UpgradeEvent.Type.UPGRADED || event.type() == UpgradeEvent.Type.ROLLED_BACK) {
334 // Iterate through all current leaderships for the new version and trigger events.
335 for (Leadership leadership : getLeaderships().values()) {
336 notifyDelegate(new LeadershipEvent(
337 LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED,
338 leadership));
339 }
340 }
341 }
Madan Jampani620f70d2016-01-30 22:22:47 -0800342 }
343}