blob: 2bb1ea330b5aab78510f973d2db9079f06cb2d61 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.cluster.impl;
17
Madan Jampani620f70d2016-01-30 22:22:47 -080018import java.util.Map;
Madan Jampani78be2492016-06-03 23:27:07 -070019import java.util.Objects;
Jon Hall7a8bfc62016-05-26 17:59:04 -070020import java.util.concurrent.ExecutorService;
21import java.util.concurrent.Executors;
Madan Jampani78be2492016-06-03 23:27:07 -070022import java.util.function.Consumer;
Jordan Halterman980a8c12017-09-22 18:01:19 -070023import java.util.stream.Collectors;
Madan Jampani620f70d2016-01-30 22:22:47 -080024
Jon Hall7a8bfc62016-05-26 17:59:04 -070025import com.google.common.collect.Maps;
Madan Jampani620f70d2016-01-30 22:22:47 -080026import org.apache.felix.scr.annotations.Activate;
27import org.apache.felix.scr.annotations.Component;
28import org.apache.felix.scr.annotations.Deactivate;
29import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
31import org.apache.felix.scr.annotations.Service;
32import org.onosproject.cluster.ClusterService;
Madan Jampani620f70d2016-01-30 22:22:47 -080033import org.onosproject.cluster.Leadership;
34import org.onosproject.cluster.LeadershipEvent;
35import org.onosproject.cluster.LeadershipStore;
36import org.onosproject.cluster.LeadershipStoreDelegate;
37import org.onosproject.cluster.NodeId;
Jordan Halterman980a8c12017-09-22 18:01:19 -070038import org.onosproject.core.Version;
39import org.onosproject.core.VersionService;
Madan Jampani78be2492016-06-03 23:27:07 -070040import org.onosproject.event.Change;
Madan Jampani620f70d2016-01-30 22:22:47 -080041import org.onosproject.store.AbstractStore;
Jon Hall7a8bfc62016-05-26 17:59:04 -070042import org.onosproject.store.service.DistributedPrimitive.Status;
Jordan Halterman980a8c12017-09-22 18:01:19 -070043import org.onosproject.store.service.CoordinationService;
Madan Jampani78be2492016-06-03 23:27:07 -070044import org.onosproject.store.service.LeaderElector;
Jordan Halterman980a8c12017-09-22 18:01:19 -070045import org.onosproject.upgrade.UpgradeEvent;
46import org.onosproject.upgrade.UpgradeEventListener;
47import org.onosproject.upgrade.UpgradeService;
Madan Jampani620f70d2016-01-30 22:22:47 -080048import org.slf4j.Logger;
49
Jordan Halterman980a8c12017-09-22 18:01:19 -070050import static org.onlab.util.Tools.groupedThreads;
51import static org.slf4j.LoggerFactory.getLogger;
52
Madan Jampani620f70d2016-01-30 22:22:47 -080053/**
Madan Jampani78be2492016-06-03 23:27:07 -070054 * Implementation of {@code LeadershipStore} that makes use of a {@link LeaderElector}
55 * primitive.
Madan Jampani620f70d2016-01-30 22:22:47 -080056 */
57@Service
Sho SHIMIZU5c396e32016-08-12 15:19:12 -070058@Component(immediate = true)
Madan Jampani620f70d2016-01-30 22:22:47 -080059public class DistributedLeadershipStore
60 extends AbstractStore<LeadershipEvent, LeadershipStoreDelegate>
61 implements LeadershipStore {
62
Jordan Halterman980a8c12017-09-22 18:01:19 -070063 private static final char VERSION_SEP = '|';
64
Madan Jampani78be2492016-06-03 23:27:07 -070065 private final Logger log = getLogger(getClass());
Madan Jampani620f70d2016-01-30 22:22:47 -080066
67 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
68 protected ClusterService clusterService;
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jordan Halterman980a8c12017-09-22 18:01:19 -070071 protected CoordinationService storageService;
72
73 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
74 protected VersionService versionService;
75
76 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
77 protected UpgradeService upgradeService;
Madan Jampani620f70d2016-01-30 22:22:47 -080078
Jon Hall7a8bfc62016-05-26 17:59:04 -070079 private ExecutorService statusChangeHandler;
Madan Jampani78be2492016-06-03 23:27:07 -070080 private NodeId localNodeId;
81 private LeaderElector leaderElector;
Jon Hall7a8bfc62016-05-26 17:59:04 -070082 private final Map<String, Leadership> localLeaderCache = Maps.newConcurrentMap();
Jordan Halterman980a8c12017-09-22 18:01:19 -070083 private final UpgradeEventListener upgradeListener = new InternalUpgradeEventListener();
Madan Jampani72282af2016-02-23 14:23:52 -080084
Madan Jampani78be2492016-06-03 23:27:07 -070085 private final Consumer<Change<Leadership>> leadershipChangeListener =
86 change -> {
87 Leadership oldValue = change.oldValue();
88 Leadership newValue = change.newValue();
Jordan Halterman980a8c12017-09-22 18:01:19 -070089
90 // If the topic is not relevant to this version, skip the event.
91 if (!isLocalTopic(newValue.topic())) {
92 return;
93 }
94
Madan Jampani78be2492016-06-03 23:27:07 -070095 boolean leaderChanged = !Objects.equals(oldValue.leader(), newValue.leader());
96 boolean candidatesChanged = !Objects.equals(oldValue.candidates(), newValue.candidates());
Jordan Halterman980a8c12017-09-22 18:01:19 -070097
Madan Jampani620f70d2016-01-30 22:22:47 -080098 LeadershipEvent.Type eventType = null;
99 if (leaderChanged && candidatesChanged) {
100 eventType = LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED;
101 }
102 if (leaderChanged && !candidatesChanged) {
103 eventType = LeadershipEvent.Type.LEADER_CHANGED;
104 }
105 if (!leaderChanged && candidatesChanged) {
106 eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
107 }
Jordan Halterman980a8c12017-09-22 18:01:19 -0700108 notifyDelegate(new LeadershipEvent(eventType, new Leadership(
109 parseTopic(change.newValue().topic()),
110 change.newValue().leader(),
111 change.newValue().candidates())));
Jon Hall7a8bfc62016-05-26 17:59:04 -0700112 // Update local cache of currently held leaderships
113 if (Objects.equals(newValue.leaderNodeId(), localNodeId)) {
114 localLeaderCache.put(newValue.topic(), newValue);
115 } else {
116 localLeaderCache.remove(newValue.topic());
117 }
Madan Jampani620f70d2016-01-30 22:22:47 -0800118 };
119
Jon Hall7a8bfc62016-05-26 17:59:04 -0700120 private final Consumer<Status> clientStatusListener = status ->
121 statusChangeHandler.execute(() -> handleStatusChange(status));
122
123 private void handleStatusChange(Status status) {
124 // Notify mastership Service of disconnect and reconnect
125 if (status == Status.ACTIVE) {
126 // Service Restored
127 localLeaderCache.forEach((topic, leadership) -> leaderElector.run(topic, localNodeId));
128 leaderElector.getLeaderships().forEach((topic, leadership) ->
Jordan Halterman980a8c12017-09-22 18:01:19 -0700129 notifyDelegate(new LeadershipEvent(
130 LeadershipEvent.Type.SERVICE_RESTORED,
131 new Leadership(
132 parseTopic(leadership.topic()),
133 leadership.leader(),
134 leadership.candidates()))));
Jon Hall7a8bfc62016-05-26 17:59:04 -0700135 } else if (status == Status.SUSPENDED) {
136 // Service Suspended
137 localLeaderCache.forEach((topic, leadership) ->
Jordan Halterman980a8c12017-09-22 18:01:19 -0700138 notifyDelegate(new LeadershipEvent(
139 LeadershipEvent.Type.SERVICE_DISRUPTED,
140 new Leadership(
141 parseTopic(leadership.topic()),
142 leadership.leader(),
143 leadership.candidates()))));
Jon Hall7a8bfc62016-05-26 17:59:04 -0700144 } else {
145 // Should be only inactive state
146 return;
147 }
148 }
149
Madan Jampani620f70d2016-01-30 22:22:47 -0800150 @Activate
151 public void activate() {
Jon Hall7a8bfc62016-05-26 17:59:04 -0700152 statusChangeHandler = Executors.newSingleThreadExecutor(
153 groupedThreads("onos/store/dist/cluster/leadership", "status-change-handler", log));
Madan Jampani620f70d2016-01-30 22:22:47 -0800154 localNodeId = clusterService.getLocalNode().id();
Madan Jampani78be2492016-06-03 23:27:07 -0700155 leaderElector = storageService.leaderElectorBuilder()
156 .withName("onos-leadership-elections")
157 .build()
158 .asLeaderElector();
159 leaderElector.addChangeListener(leadershipChangeListener);
Jon Hall7a8bfc62016-05-26 17:59:04 -0700160 leaderElector.addStatusChangeListener(clientStatusListener);
Jordan Halterman980a8c12017-09-22 18:01:19 -0700161 upgradeService.addListener(upgradeListener);
Madan Jampani620f70d2016-01-30 22:22:47 -0800162 log.info("Started");
163 }
164
165 @Deactivate
166 public void deactivate() {
Madan Jampani78be2492016-06-03 23:27:07 -0700167 leaderElector.removeChangeListener(leadershipChangeListener);
Jon Hall7a8bfc62016-05-26 17:59:04 -0700168 leaderElector.removeStatusChangeListener(clientStatusListener);
Jordan Halterman980a8c12017-09-22 18:01:19 -0700169 upgradeService.removeListener(upgradeListener);
Jon Hall7a8bfc62016-05-26 17:59:04 -0700170 statusChangeHandler.shutdown();
Madan Jampani620f70d2016-01-30 22:22:47 -0800171 log.info("Stopped");
172 }
173
174 @Override
175 public Leadership addRegistration(String topic) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700176 leaderElector.run(getLocalTopic(topic), localNodeId);
177 return getLeadership(topic);
Madan Jampani620f70d2016-01-30 22:22:47 -0800178 }
179
180 @Override
181 public void removeRegistration(String topic) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700182 leaderElector.withdraw(getLocalTopic(topic));
Madan Jampani620f70d2016-01-30 22:22:47 -0800183 }
184
185 @Override
186 public void removeRegistration(NodeId nodeId) {
Madan Jampani78be2492016-06-03 23:27:07 -0700187 leaderElector.evict(nodeId);
Madan Jampani620f70d2016-01-30 22:22:47 -0800188 }
189
190 @Override
191 public boolean moveLeadership(String topic, NodeId toNodeId) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700192 return leaderElector.anoint(getTopicFor(topic, toNodeId), toNodeId);
Madan Jampani620f70d2016-01-30 22:22:47 -0800193 }
194
195 @Override
196 public boolean makeTopCandidate(String topic, NodeId nodeId) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700197 return leaderElector.promote(getTopicFor(topic, nodeId), nodeId);
Madan Jampani620f70d2016-01-30 22:22:47 -0800198 }
199
200 @Override
201 public Leadership getLeadership(String topic) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700202 Leadership leadership = leaderElector.getLeadership(getActiveTopic(topic));
203 return leadership != null ? new Leadership(
204 parseTopic(leadership.topic()),
205 leadership.leader(),
206 leadership.candidates()) : null;
Madan Jampani620f70d2016-01-30 22:22:47 -0800207 }
208
209 @Override
210 public Map<String, Leadership> getLeaderships() {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700211 Map<String, Leadership> leaderships = leaderElector.getLeaderships();
212 return leaderships.entrySet().stream()
213 .filter(e -> isActiveTopic(e.getKey()))
214 .collect(Collectors.toMap(e -> parseTopic(e.getKey()),
215 e -> new Leadership(parseTopic(e.getKey()), e.getValue().leader(), e.getValue().candidates())));
216 }
217
218 /**
219 * Returns a leader elector topic namespaced with the local node's version.
220 *
221 * @param topic the base topic
222 * @return a topic string namespaced with the local node's version
223 */
224 private String getLocalTopic(String topic) {
225 return topic + VERSION_SEP + versionService.version();
226 }
227
228 /**
229 * Returns a leader elector topic namespaced with the current cluster version.
230 *
231 * @param topic the base topic
232 * @return a topic string namespaced with the current cluster version
233 */
234 private String getActiveTopic(String topic) {
235 return topic + VERSION_SEP + upgradeService.getVersion();
236 }
237
238 /**
239 * Returns whether the given topic is a topic for the local version.
240 *
241 * @param topic the topic to check
242 * @return whether the given topic is relevant to the local version
243 */
244 private boolean isLocalTopic(String topic) {
245 return topic.endsWith(versionService.version().toString());
246 }
247
248 /**
249 * Returns whether the given topic is a topic for the current cluster version.
250 *
251 * @param topic the topic to check
252 * @return whether the given topic is relevant to the current cluster version
253 */
254 private boolean isActiveTopic(String topic) {
255 return topic.endsWith(VERSION_SEP + upgradeService.getVersion().toString());
256 }
257
258 /**
259 * Parses a topic string, returning the base topic.
260 *
261 * @param topic the topic string to parse
262 * @return the base topic string
263 */
264 private String parseTopic(String topic) {
265 return topic.substring(0, topic.lastIndexOf(VERSION_SEP));
266 }
267
268 /**
269 * Returns the versioned topic for the given node.
270 *
271 * @param topic the topic for the given node
272 * @param nodeId the node for which to return the namespaced topic
273 * @return the versioned topic for the given node
274 */
275 private String getTopicFor(String topic, NodeId nodeId) {
276 Version nodeVersion = clusterService.getVersion(nodeId);
277 return nodeVersion != null ? topic + VERSION_SEP + nodeVersion : topic + VERSION_SEP + versionService.version();
278 }
279
280 /**
281 * Internal upgrade event listener.
282 */
283 private class InternalUpgradeEventListener implements UpgradeEventListener {
284 @Override
285 public void event(UpgradeEvent event) {
286 if (event.type() == UpgradeEvent.Type.UPGRADED || event.type() == UpgradeEvent.Type.ROLLED_BACK) {
287 // Iterate through all current leaderships for the new version and trigger events.
288 for (Leadership leadership : getLeaderships().values()) {
289 notifyDelegate(new LeadershipEvent(
290 LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED,
291 leadership));
292 }
293 }
294 }
Madan Jampani620f70d2016-01-30 22:22:47 -0800295 }
296}