blob: 92ef75ab577a6973921719986e48010c25adeded [file] [log] [blame]
Madan Jampani620f70d2016-01-30 22:22:47 -08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Madan Jampani620f70d2016-01-30 22:22:47 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.cluster.impl;
17
Jon Hall7a8bfc62016-05-26 17:59:04 -070018import com.google.common.collect.Maps;
Jordan Halterman1f2e8042017-10-18 16:38:03 -070019import org.onosproject.cfg.ComponentConfigService;
Madan Jampani620f70d2016-01-30 22:22:47 -080020import org.onosproject.cluster.ClusterService;
Madan Jampani620f70d2016-01-30 22:22:47 -080021import org.onosproject.cluster.Leadership;
22import org.onosproject.cluster.LeadershipEvent;
23import org.onosproject.cluster.LeadershipStore;
24import org.onosproject.cluster.LeadershipStoreDelegate;
25import org.onosproject.cluster.NodeId;
Jordan Halterman980a8c12017-09-22 18:01:19 -070026import org.onosproject.core.Version;
27import org.onosproject.core.VersionService;
Madan Jampani78be2492016-06-03 23:27:07 -070028import org.onosproject.event.Change;
Madan Jampani620f70d2016-01-30 22:22:47 -080029import org.onosproject.store.AbstractStore;
Jordan Halterman980a8c12017-09-22 18:01:19 -070030import org.onosproject.store.service.CoordinationService;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070031import org.onosproject.store.service.DistributedPrimitive.Status;
Madan Jampani78be2492016-06-03 23:27:07 -070032import org.onosproject.store.service.LeaderElector;
Jordan Halterman980a8c12017-09-22 18:01:19 -070033import org.onosproject.upgrade.UpgradeEvent;
34import org.onosproject.upgrade.UpgradeEventListener;
35import org.onosproject.upgrade.UpgradeService;
Jordan Halterman1f2e8042017-10-18 16:38:03 -070036import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070037import org.osgi.service.component.annotations.Activate;
38import org.osgi.service.component.annotations.Component;
39import org.osgi.service.component.annotations.Deactivate;
40import org.osgi.service.component.annotations.Modified;
41import org.osgi.service.component.annotations.Reference;
Madan Jampani620f70d2016-01-30 22:22:47 -080042import org.slf4j.Logger;
43
Ray Milkeyd84f89b2018-08-17 14:54:17 -070044import java.util.Dictionary;
45import java.util.Map;
46import java.util.Objects;
47import java.util.concurrent.ExecutorService;
48import java.util.concurrent.Executors;
49import java.util.function.Consumer;
50import java.util.stream.Collectors;
51
Jordan Halterman1f2e8042017-10-18 16:38:03 -070052import static com.google.common.base.Strings.isNullOrEmpty;
Jordan Halterman1f2e8042017-10-18 16:38:03 -070053import static org.onlab.util.Tools.get;
Jordan Halterman980a8c12017-09-22 18:01:19 -070054import static org.onlab.util.Tools.groupedThreads;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070055import static org.osgi.service.component.annotations.ReferenceCardinality.MANDATORY;
Jordan Halterman980a8c12017-09-22 18:01:19 -070056import static org.slf4j.LoggerFactory.getLogger;
57
Madan Jampani620f70d2016-01-30 22:22:47 -080058/**
Madan Jampani78be2492016-06-03 23:27:07 -070059 * Implementation of {@code LeadershipStore} that makes use of a {@link LeaderElector}
60 * primitive.
Madan Jampani620f70d2016-01-30 22:22:47 -080061 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070062@Component(immediate = true, service = LeadershipStore.class)
Madan Jampani620f70d2016-01-30 22:22:47 -080063public class DistributedLeadershipStore
64 extends AbstractStore<LeadershipEvent, LeadershipStoreDelegate>
65 implements LeadershipStore {
66
Jordan Halterman980a8c12017-09-22 18:01:19 -070067 private static final char VERSION_SEP = '|';
68
Madan Jampani78be2492016-06-03 23:27:07 -070069 private final Logger log = getLogger(getClass());
Madan Jampani620f70d2016-01-30 22:22:47 -080070
Ray Milkeyd84f89b2018-08-17 14:54:17 -070071 @Reference(cardinality = MANDATORY)
Madan Jampani620f70d2016-01-30 22:22:47 -080072 protected ClusterService clusterService;
73
Ray Milkeyd84f89b2018-08-17 14:54:17 -070074 @Reference(cardinality = MANDATORY)
Jordan Halterman980a8c12017-09-22 18:01:19 -070075 protected CoordinationService storageService;
76
Ray Milkeyd84f89b2018-08-17 14:54:17 -070077 @Reference(cardinality = MANDATORY)
Jordan Halterman1f2e8042017-10-18 16:38:03 -070078 protected ComponentConfigService configService;
79
Ray Milkeyd84f89b2018-08-17 14:54:17 -070080 @Reference(cardinality = MANDATORY)
Jordan Halterman980a8c12017-09-22 18:01:19 -070081 protected VersionService versionService;
82
Ray Milkeyd84f89b2018-08-17 14:54:17 -070083 @Reference(cardinality = MANDATORY)
Jordan Halterman980a8c12017-09-22 18:01:19 -070084 protected UpgradeService upgradeService;
Madan Jampani620f70d2016-01-30 22:22:47 -080085
You Wang45ce0082018-08-23 14:23:03 -070086 private static final long DEFAULT_ELECTION_TIMEOUT_MILLIS = 2500;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070087 //@Property(name = "electionTimeoutMillis", longValue = DEFAULT_ELECTION_TIMEOUT_MILLIS,
88 // label = "the leader election timeout in milliseconds")
Jordan Halterman1f2e8042017-10-18 16:38:03 -070089 private long electionTimeoutMillis = DEFAULT_ELECTION_TIMEOUT_MILLIS;
90
Jon Hall7a8bfc62016-05-26 17:59:04 -070091 private ExecutorService statusChangeHandler;
Madan Jampani78be2492016-06-03 23:27:07 -070092 private NodeId localNodeId;
93 private LeaderElector leaderElector;
Jon Hall7a8bfc62016-05-26 17:59:04 -070094 private final Map<String, Leadership> localLeaderCache = Maps.newConcurrentMap();
Jordan Halterman980a8c12017-09-22 18:01:19 -070095 private final UpgradeEventListener upgradeListener = new InternalUpgradeEventListener();
Madan Jampani72282af2016-02-23 14:23:52 -080096
Madan Jampani78be2492016-06-03 23:27:07 -070097 private final Consumer<Change<Leadership>> leadershipChangeListener =
98 change -> {
99 Leadership oldValue = change.oldValue();
100 Leadership newValue = change.newValue();
Jordan Halterman980a8c12017-09-22 18:01:19 -0700101
102 // If the topic is not relevant to this version, skip the event.
103 if (!isLocalTopic(newValue.topic())) {
104 return;
105 }
106
Madan Jampani78be2492016-06-03 23:27:07 -0700107 boolean leaderChanged = !Objects.equals(oldValue.leader(), newValue.leader());
108 boolean candidatesChanged = !Objects.equals(oldValue.candidates(), newValue.candidates());
Jordan Halterman980a8c12017-09-22 18:01:19 -0700109
Madan Jampani620f70d2016-01-30 22:22:47 -0800110 LeadershipEvent.Type eventType = null;
111 if (leaderChanged && candidatesChanged) {
112 eventType = LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED;
113 }
114 if (leaderChanged && !candidatesChanged) {
115 eventType = LeadershipEvent.Type.LEADER_CHANGED;
116 }
117 if (!leaderChanged && candidatesChanged) {
118 eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
119 }
Jordan Halterman980a8c12017-09-22 18:01:19 -0700120 notifyDelegate(new LeadershipEvent(eventType, new Leadership(
121 parseTopic(change.newValue().topic()),
122 change.newValue().leader(),
123 change.newValue().candidates())));
Jon Hall7a8bfc62016-05-26 17:59:04 -0700124 // Update local cache of currently held leaderships
125 if (Objects.equals(newValue.leaderNodeId(), localNodeId)) {
126 localLeaderCache.put(newValue.topic(), newValue);
127 } else {
128 localLeaderCache.remove(newValue.topic());
129 }
Madan Jampani620f70d2016-01-30 22:22:47 -0800130 };
131
Jon Hall7a8bfc62016-05-26 17:59:04 -0700132 private final Consumer<Status> clientStatusListener = status ->
133 statusChangeHandler.execute(() -> handleStatusChange(status));
134
135 private void handleStatusChange(Status status) {
136 // Notify mastership Service of disconnect and reconnect
137 if (status == Status.ACTIVE) {
138 // Service Restored
139 localLeaderCache.forEach((topic, leadership) -> leaderElector.run(topic, localNodeId));
140 leaderElector.getLeaderships().forEach((topic, leadership) ->
Jordan Halterman980a8c12017-09-22 18:01:19 -0700141 notifyDelegate(new LeadershipEvent(
142 LeadershipEvent.Type.SERVICE_RESTORED,
143 new Leadership(
144 parseTopic(leadership.topic()),
145 leadership.leader(),
146 leadership.candidates()))));
Jon Hall7a8bfc62016-05-26 17:59:04 -0700147 } else if (status == Status.SUSPENDED) {
148 // Service Suspended
149 localLeaderCache.forEach((topic, leadership) ->
Jordan Halterman980a8c12017-09-22 18:01:19 -0700150 notifyDelegate(new LeadershipEvent(
151 LeadershipEvent.Type.SERVICE_DISRUPTED,
152 new Leadership(
153 parseTopic(leadership.topic()),
154 leadership.leader(),
155 leadership.candidates()))));
Jon Hall7a8bfc62016-05-26 17:59:04 -0700156 } else {
157 // Should be only inactive state
158 return;
159 }
160 }
161
Madan Jampani620f70d2016-01-30 22:22:47 -0800162 @Activate
163 public void activate() {
Jordan Halterman1f2e8042017-10-18 16:38:03 -0700164 configService.registerProperties(getClass());
Jon Hall7a8bfc62016-05-26 17:59:04 -0700165 statusChangeHandler = Executors.newSingleThreadExecutor(
166 groupedThreads("onos/store/dist/cluster/leadership", "status-change-handler", log));
Madan Jampani620f70d2016-01-30 22:22:47 -0800167 localNodeId = clusterService.getLocalNode().id();
Madan Jampani78be2492016-06-03 23:27:07 -0700168 leaderElector = storageService.leaderElectorBuilder()
Jordan Halterman46c5eaa2018-01-24 16:46:55 -0800169 .withName("onos-leadership-elections")
170 .withElectionTimeout(electionTimeoutMillis)
171 .withRelaxedReadConsistency()
172 .build()
173 .asLeaderElector();
Madan Jampani78be2492016-06-03 23:27:07 -0700174 leaderElector.addChangeListener(leadershipChangeListener);
Jon Hall7a8bfc62016-05-26 17:59:04 -0700175 leaderElector.addStatusChangeListener(clientStatusListener);
Jordan Halterman980a8c12017-09-22 18:01:19 -0700176 upgradeService.addListener(upgradeListener);
Madan Jampani620f70d2016-01-30 22:22:47 -0800177 log.info("Started");
178 }
179
Jordan Halterman1f2e8042017-10-18 16:38:03 -0700180 @Modified
181 public void modified(ComponentContext context) {
182 if (context == null) {
183 return;
184 }
185
186 Dictionary<?, ?> properties = context.getProperties();
187 long newElectionTimeoutMillis;
188 try {
189 String s = get(properties, "electionTimeoutMillis");
190 newElectionTimeoutMillis = isNullOrEmpty(s) ? electionTimeoutMillis : Long.parseLong(s.trim());
191 } catch (NumberFormatException | ClassCastException e) {
192 log.warn("Malformed configuration detected; using defaults", e);
193 newElectionTimeoutMillis = DEFAULT_ELECTION_TIMEOUT_MILLIS;
194 }
195
196 if (newElectionTimeoutMillis != electionTimeoutMillis) {
197 electionTimeoutMillis = newElectionTimeoutMillis;
198 leaderElector = storageService.leaderElectorBuilder()
199 .withName("onos-leadership-elections")
200 .withElectionTimeout(electionTimeoutMillis)
Jordan Haltermane74e6292018-05-15 00:52:31 -0700201 .withRelaxedReadConsistency()
Jordan Halterman1f2e8042017-10-18 16:38:03 -0700202 .build()
203 .asLeaderElector();
204 }
205 }
206
Madan Jampani620f70d2016-01-30 22:22:47 -0800207 @Deactivate
208 public void deactivate() {
Madan Jampani78be2492016-06-03 23:27:07 -0700209 leaderElector.removeChangeListener(leadershipChangeListener);
Jon Hall7a8bfc62016-05-26 17:59:04 -0700210 leaderElector.removeStatusChangeListener(clientStatusListener);
Jordan Halterman980a8c12017-09-22 18:01:19 -0700211 upgradeService.removeListener(upgradeListener);
Jon Hall7a8bfc62016-05-26 17:59:04 -0700212 statusChangeHandler.shutdown();
Jordan Halterman1f2e8042017-10-18 16:38:03 -0700213 configService.unregisterProperties(getClass(), false);
Madan Jampani620f70d2016-01-30 22:22:47 -0800214 log.info("Stopped");
215 }
216
217 @Override
218 public Leadership addRegistration(String topic) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700219 leaderElector.run(getLocalTopic(topic), localNodeId);
220 return getLeadership(topic);
Madan Jampani620f70d2016-01-30 22:22:47 -0800221 }
222
223 @Override
224 public void removeRegistration(String topic) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700225 leaderElector.withdraw(getLocalTopic(topic));
Madan Jampani620f70d2016-01-30 22:22:47 -0800226 }
227
228 @Override
229 public void removeRegistration(NodeId nodeId) {
Madan Jampani78be2492016-06-03 23:27:07 -0700230 leaderElector.evict(nodeId);
Madan Jampani620f70d2016-01-30 22:22:47 -0800231 }
232
233 @Override
234 public boolean moveLeadership(String topic, NodeId toNodeId) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700235 return leaderElector.anoint(getTopicFor(topic, toNodeId), toNodeId);
Madan Jampani620f70d2016-01-30 22:22:47 -0800236 }
237
238 @Override
239 public boolean makeTopCandidate(String topic, NodeId nodeId) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700240 return leaderElector.promote(getTopicFor(topic, nodeId), nodeId);
Madan Jampani620f70d2016-01-30 22:22:47 -0800241 }
242
243 @Override
244 public Leadership getLeadership(String topic) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700245 Leadership leadership = leaderElector.getLeadership(getActiveTopic(topic));
246 return leadership != null ? new Leadership(
247 parseTopic(leadership.topic()),
248 leadership.leader(),
249 leadership.candidates()) : null;
Madan Jampani620f70d2016-01-30 22:22:47 -0800250 }
251
252 @Override
253 public Map<String, Leadership> getLeaderships() {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700254 Map<String, Leadership> leaderships = leaderElector.getLeaderships();
255 return leaderships.entrySet().stream()
256 .filter(e -> isActiveTopic(e.getKey()))
257 .collect(Collectors.toMap(e -> parseTopic(e.getKey()),
258 e -> new Leadership(parseTopic(e.getKey()), e.getValue().leader(), e.getValue().candidates())));
259 }
260
261 /**
262 * Returns a leader elector topic namespaced with the local node's version.
263 *
264 * @param topic the base topic
265 * @return a topic string namespaced with the local node's version
266 */
267 private String getLocalTopic(String topic) {
268 return topic + VERSION_SEP + versionService.version();
269 }
270
271 /**
272 * Returns a leader elector topic namespaced with the current cluster version.
273 *
274 * @param topic the base topic
275 * @return a topic string namespaced with the current cluster version
276 */
277 private String getActiveTopic(String topic) {
278 return topic + VERSION_SEP + upgradeService.getVersion();
279 }
280
281 /**
282 * Returns whether the given topic is a topic for the local version.
283 *
284 * @param topic the topic to check
285 * @return whether the given topic is relevant to the local version
286 */
287 private boolean isLocalTopic(String topic) {
288 return topic.endsWith(versionService.version().toString());
289 }
290
291 /**
292 * Returns whether the given topic is a topic for the current cluster version.
293 *
294 * @param topic the topic to check
295 * @return whether the given topic is relevant to the current cluster version
296 */
297 private boolean isActiveTopic(String topic) {
298 return topic.endsWith(VERSION_SEP + upgradeService.getVersion().toString());
299 }
300
301 /**
302 * Parses a topic string, returning the base topic.
303 *
304 * @param topic the topic string to parse
305 * @return the base topic string
306 */
307 private String parseTopic(String topic) {
308 return topic.substring(0, topic.lastIndexOf(VERSION_SEP));
309 }
310
311 /**
312 * Returns the versioned topic for the given node.
313 *
314 * @param topic the topic for the given node
315 * @param nodeId the node for which to return the namespaced topic
316 * @return the versioned topic for the given node
317 */
318 private String getTopicFor(String topic, NodeId nodeId) {
319 Version nodeVersion = clusterService.getVersion(nodeId);
320 return nodeVersion != null ? topic + VERSION_SEP + nodeVersion : topic + VERSION_SEP + versionService.version();
321 }
322
323 /**
324 * Internal upgrade event listener.
325 */
326 private class InternalUpgradeEventListener implements UpgradeEventListener {
327 @Override
328 public void event(UpgradeEvent event) {
329 if (event.type() == UpgradeEvent.Type.UPGRADED || event.type() == UpgradeEvent.Type.ROLLED_BACK) {
330 // Iterate through all current leaderships for the new version and trigger events.
331 for (Leadership leadership : getLeaderships().values()) {
332 notifyDelegate(new LeadershipEvent(
333 LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED,
334 leadership));
335 }
336 }
337 }
Madan Jampani620f70d2016-01-30 22:22:47 -0800338 }
339}