blob: d7d2bc0a403f66aeffc311b2de56bae8bbdd5501 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.mastership.impl;
17
18import static org.onlab.util.Tools.groupedThreads;
Ayaka Koshibe941f8602015-04-15 14:17:08 -070019import static org.onlab.util.Tools.futureGetOrElse;
Madan Jampani84b6b402015-02-25 17:49:54 -080020import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
21import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
22import static org.slf4j.LoggerFactory.getLogger;
23import static com.google.common.base.Preconditions.checkArgument;
24
Madan Jampani84b6b402015-02-25 17:49:54 -080025import java.util.List;
26import java.util.Map;
27import java.util.Set;
Madan Jampani84b6b402015-02-25 17:49:54 -080028import java.util.concurrent.ExecutorService;
29import java.util.concurrent.Executors;
Madan Jampani84b6b402015-02-25 17:49:54 -080030import java.util.regex.Matcher;
31import java.util.regex.Pattern;
32import java.util.stream.Collectors;
33
34import org.apache.felix.scr.annotations.Activate;
35import org.apache.felix.scr.annotations.Component;
36import org.apache.felix.scr.annotations.Deactivate;
37import org.apache.felix.scr.annotations.Reference;
38import org.apache.felix.scr.annotations.ReferenceCardinality;
39import org.apache.felix.scr.annotations.Service;
40import org.onlab.util.KryoNamespace;
41import org.onosproject.cluster.ClusterService;
42import org.onosproject.cluster.Leadership;
43import org.onosproject.cluster.LeadershipEvent;
44import org.onosproject.cluster.LeadershipEventListener;
45import org.onosproject.cluster.LeadershipService;
46import org.onosproject.cluster.NodeId;
47import org.onosproject.cluster.RoleInfo;
48import org.onosproject.mastership.MastershipEvent;
49import org.onosproject.mastership.MastershipStore;
50import org.onosproject.mastership.MastershipStoreDelegate;
51import org.onosproject.mastership.MastershipTerm;
52import org.onosproject.net.DeviceId;
53import org.onosproject.net.MastershipRole;
54import org.onosproject.store.AbstractStore;
55import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
56import org.onosproject.store.cluster.messaging.ClusterMessage;
57import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
58import org.onosproject.store.cluster.messaging.MessageSubject;
59import org.onosproject.store.serializers.KryoNamespaces;
60import org.onosproject.store.serializers.KryoSerializer;
61import org.onosproject.store.serializers.StoreSerializer;
62import org.slf4j.Logger;
63
64import com.google.common.base.Objects;
65import com.google.common.collect.Lists;
66import com.google.common.collect.Maps;
67import com.google.common.collect.Sets;
68
69/**
70 * Implementation of the MastershipStore on top of Leadership Service.
71 */
Madan Jampani5756c352015-04-29 00:23:58 -070072@Component(immediate = true, enabled = true)
Madan Jampani84b6b402015-02-25 17:49:54 -080073@Service
74public class ConsistentDeviceMastershipStore
75 extends AbstractStore<MastershipEvent, MastershipStoreDelegate>
76 implements MastershipStore {
77
78 private final Logger log = getLogger(getClass());
79
80 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
81 protected LeadershipService leadershipService;
82
83 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
84 protected ClusterService clusterService;
85
86 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
87 protected ClusterCommunicationService clusterCommunicator;
88
89 private NodeId localNodeId;
90 private final Set<DeviceId> connectedDevices = Sets.newHashSet();
91
92 private static final MessageSubject ROLE_QUERY_SUBJECT =
93 new MessageSubject("mastership-store-device-role-query");
94 private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
95 new MessageSubject("mastership-store-device-role-relinquish");
Madan Jampani1af8e132015-04-30 16:41:18 -070096 private static final MessageSubject MASTERSHIP_RELINQUISH_SUBJECT =
97 new MessageSubject("mastership-store-device-mastership-relinquish");
Madan Jampani84b6b402015-02-25 17:49:54 -080098
99 private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
Madan Jampani5756c352015-04-29 00:23:58 -0700100 Pattern.compile("device:(.*)");
Madan Jampani84b6b402015-02-25 17:49:54 -0800101
Madan Jampani84b6b402015-02-25 17:49:54 -0800102 private ExecutorService messageHandlingExecutor;
103 private final LeadershipEventListener leadershipEventListener =
104 new InternalDeviceMastershipEventListener();
105
106 private static final String NODE_ID_NULL = "Node ID cannot be null";
107 private static final String DEVICE_ID_NULL = "Device ID cannot be null";;
108
109 public static final StoreSerializer SERIALIZER = new KryoSerializer() {
110 @Override
111 protected void setupKryoPool() {
112 serializerPool = KryoNamespace.newBuilder()
113 .register(KryoNamespaces.API)
114 .register(MastershipRole.class)
115 .register(MastershipEvent.class)
Madan Jampani1af8e132015-04-30 16:41:18 -0700116 .register(MastershipEvent.Type.class)
Madan Jampani84b6b402015-02-25 17:49:54 -0800117 .build();
118 }
119 };
120
121 @Activate
122 public void activate() {
123 messageHandlingExecutor =
124 Executors.newSingleThreadExecutor(groupedThreads("onos/store/device/mastership", "message-handler"));
125 clusterCommunicator.addSubscriber(ROLE_QUERY_SUBJECT,
126 new RoleQueryHandler(),
127 messageHandlingExecutor);
128 clusterCommunicator.addSubscriber(ROLE_RELINQUISH_SUBJECT,
129 new RoleRelinquishHandler(),
130 messageHandlingExecutor);
Madan Jampani1af8e132015-04-30 16:41:18 -0700131 clusterCommunicator.addSubscriber(MASTERSHIP_RELINQUISH_SUBJECT,
132 SERIALIZER::decode,
133 this::relinquishMastership,
134 SERIALIZER::encode,
135 messageHandlingExecutor);
Madan Jampani84b6b402015-02-25 17:49:54 -0800136 localNodeId = clusterService.getLocalNode().id();
137 leadershipService.addListener(leadershipEventListener);
138
139 log.info("Started.");
140 }
141
142 @Deactivate
143 public void deactivate() {
144 clusterCommunicator.removeSubscriber(ROLE_QUERY_SUBJECT);
145 clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
Madan Jampani1af8e132015-04-30 16:41:18 -0700146 clusterCommunicator.removeSubscriber(MASTERSHIP_RELINQUISH_SUBJECT);
Madan Jampani84b6b402015-02-25 17:49:54 -0800147 messageHandlingExecutor.shutdown();
148 leadershipService.removeListener(leadershipEventListener);
149
150 log.info("Stoppped.");
151 }
152
153 @Override
154 public MastershipRole requestRole(DeviceId deviceId) {
155 checkArgument(deviceId != null, DEVICE_ID_NULL);
156
157 String leadershipTopic = createDeviceMastershipTopic(deviceId);
158 if (connectedDevices.add(deviceId)) {
159 leadershipService.runForLeadership(leadershipTopic);
160 return MastershipRole.STANDBY;
161 } else {
162 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
163 if (leadership != null && leadership.leader().equals(localNodeId)) {
164 return MastershipRole.MASTER;
165 } else {
166 return MastershipRole.STANDBY;
167 }
168 }
169 }
170
171 @Override
172 public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
173 checkArgument(nodeId != null, NODE_ID_NULL);
174 checkArgument(deviceId != null, DEVICE_ID_NULL);
175
176 String leadershipTopic = createDeviceMastershipTopic(deviceId);
177 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
178 if (leadership != null && nodeId.equals(leadership.leader())) {
179 return MastershipRole.MASTER;
180 }
181
182 if (localNodeId.equals(nodeId)) {
183 if (connectedDevices.contains(deviceId)) {
184 return MastershipRole.STANDBY;
185 } else {
186 return MastershipRole.NONE;
187 }
Madan Jampani84b6b402015-02-25 17:49:54 -0800188 }
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700189 MastershipRole role = futureGetOrElse(clusterCommunicator.sendAndReceive(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700190 deviceId,
191 ROLE_QUERY_SUBJECT,
192 SERIALIZER::encode,
193 SERIALIZER::decode,
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700194 nodeId), null);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700195 return role == null ? MastershipRole.NONE : role;
Madan Jampani84b6b402015-02-25 17:49:54 -0800196 }
197
198 @Override
199 public NodeId getMaster(DeviceId deviceId) {
200 checkArgument(deviceId != null, DEVICE_ID_NULL);
201
202 String leadershipTopic = createDeviceMastershipTopic(deviceId);
203 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
204 return leadership != null ? leadership.leader() : null;
205 }
206
207 @Override
208 public RoleInfo getNodes(DeviceId deviceId) {
209 checkArgument(deviceId != null, DEVICE_ID_NULL);
210
211 Map<NodeId, MastershipRole> roles = Maps.newHashMap();
212 clusterService
213 .getNodes()
214 .stream()
215 .parallel()
216 .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
217
218 NodeId master = null;
219 final List<NodeId> standbys = Lists.newLinkedList();
220
221 for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
222 if (entry.getValue() == MastershipRole.MASTER) {
223 master = entry.getKey();
224 } else if (entry.getValue() == MastershipRole.STANDBY) {
225 standbys.add(entry.getKey());
226 }
227 }
228
229 return new RoleInfo(master, standbys);
230 }
231
232 @Override
233 public Set<DeviceId> getDevices(NodeId nodeId) {
234 checkArgument(nodeId != null, NODE_ID_NULL);
235
236 return leadershipService
237 .ownedTopics(nodeId)
238 .stream()
239 .filter(this::isDeviceMastershipTopic)
240 .map(this::extractDeviceIdFromTopic)
241 .collect(Collectors.toSet());
242 }
243
244 @Override
245 public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
246 checkArgument(nodeId != null, NODE_ID_NULL);
247 checkArgument(deviceId != null, DEVICE_ID_NULL);
248
Madan Jampani1af8e132015-04-30 16:41:18 -0700249 NodeId currentMaster = getMaster(deviceId);
250 if (nodeId.equals(currentMaster)) {
251 return null;
252 } else {
253 String leadershipTopic = createDeviceMastershipTopic(deviceId);
254 List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
255 if (candidates.isEmpty()) {
256 return null;
257 }
258 if (leadershipService.makeTopCandidate(leadershipTopic, nodeId)) {
259 return relinquishMastership(deviceId);
260 } else {
261 log.warn("Failed to promote {} to mastership for {}", nodeId, deviceId);
262 }
263 }
264 return null;
Madan Jampani84b6b402015-02-25 17:49:54 -0800265 }
266
267 @Override
268 public MastershipTerm getTermFor(DeviceId deviceId) {
269 checkArgument(deviceId != null, DEVICE_ID_NULL);
270
271 String leadershipTopic = createDeviceMastershipTopic(deviceId);
272 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
273 return leadership != null ? MastershipTerm.of(leadership.leader(), leadership.epoch()) : null;
274 }
275
276 @Override
277 public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
278 checkArgument(nodeId != null, NODE_ID_NULL);
279 checkArgument(deviceId != null, DEVICE_ID_NULL);
280
Madan Jampani1af8e132015-04-30 16:41:18 -0700281 NodeId currentMaster = getMaster(deviceId);
282 if (!nodeId.equals(currentMaster)) {
283 return null;
284 }
285 // FIXME: This can becomes the master again unless it
286 // is demoted to the end of candidates list.
287 return relinquishMastership(deviceId);
Madan Jampani84b6b402015-02-25 17:49:54 -0800288 }
289
290 @Override
291 public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
292 checkArgument(nodeId != null, NODE_ID_NULL);
293 checkArgument(deviceId != null, DEVICE_ID_NULL);
294
295 if (!nodeId.equals(localNodeId)) {
296 log.debug("Forwarding request to relinquish "
297 + "role for device {} to {}", deviceId, nodeId);
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700298 return futureGetOrElse(clusterCommunicator.sendAndReceive(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700299 deviceId,
300 ROLE_RELINQUISH_SUBJECT,
301 SERIALIZER::encode,
302 SERIALIZER::decode,
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700303 nodeId), null);
Madan Jampani84b6b402015-02-25 17:49:54 -0800304 }
305
306 // Check if this node is can be managed by this node.
307 if (!connectedDevices.contains(deviceId)) {
308 return null;
309 }
310
311 String leadershipTopic = createDeviceMastershipTopic(deviceId);
312 Leadership currentLeadership = leadershipService.getLeadership(leadershipTopic);
313
314 MastershipEvent.Type eventType = null;
315 if (currentLeadership != null && currentLeadership.leader().equals(localNodeId)) {
316 eventType = MastershipEvent.Type.MASTER_CHANGED;
317 } else {
318 eventType = MastershipEvent.Type.BACKUPS_CHANGED;
319 }
320
321 connectedDevices.remove(deviceId);
322 leadershipService.withdraw(leadershipTopic);
323
324 return new MastershipEvent(eventType, deviceId, getNodes(deviceId));
325 }
326
Madan Jampani1af8e132015-04-30 16:41:18 -0700327 private MastershipEvent relinquishMastership(DeviceId deviceId) {
328 checkArgument(deviceId != null, DEVICE_ID_NULL);
329
330 NodeId currentMaster = getMaster(deviceId);
331 if (currentMaster == null) {
332 return null;
333 }
334
335 if (!currentMaster.equals(localNodeId)) {
336 log.info("Forwarding request to relinquish "
337 + "mastership for device {} to {}", deviceId, currentMaster);
338 return futureGetOrElse(clusterCommunicator.sendAndReceive(
339 deviceId,
340 MASTERSHIP_RELINQUISH_SUBJECT,
341 SERIALIZER::encode,
342 SERIALIZER::decode,
343 currentMaster), null);
344 }
345
346 String leadershipTopic = createDeviceMastershipTopic(deviceId);
347 Leadership currentLeadership = leadershipService.getLeadership(leadershipTopic);
348
349 MastershipEvent.Type eventType = null;
350 if (currentLeadership != null && currentLeadership.leader().equals(localNodeId)) {
351 eventType = MastershipEvent.Type.MASTER_CHANGED;
352 }
353
354 return leadershipService.stepdown(leadershipTopic)
355 ? new MastershipEvent(eventType, deviceId, getNodes(deviceId)) : null;
356 }
357
Madan Jampani84b6b402015-02-25 17:49:54 -0800358 private class RoleQueryHandler implements ClusterMessageHandler {
359 @Override
360 public void handle(ClusterMessage message) {
361 DeviceId deviceId = SERIALIZER.decode(message.payload());
Madan Jampanic26eede2015-04-16 11:42:16 -0700362 message.respond(SERIALIZER.encode(getRole(localNodeId, deviceId)));
Madan Jampani84b6b402015-02-25 17:49:54 -0800363 }
364 }
365
366
367 @Override
368 public void relinquishAllRole(NodeId nodeId) {
369 // Noop. LeadershipService already takes care of detecting and purging deadlocks.
370 }
371
372 private class RoleRelinquishHandler implements ClusterMessageHandler {
373 @Override
374 public void handle(ClusterMessage message) {
375 DeviceId deviceId = SERIALIZER.decode(message.payload());
Madan Jampanic26eede2015-04-16 11:42:16 -0700376 message.respond(SERIALIZER.encode(relinquishRole(localNodeId, deviceId)));
Madan Jampani84b6b402015-02-25 17:49:54 -0800377 }
378 }
379
380 private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
381 @Override
382 public void event(LeadershipEvent event) {
383 Leadership leadership = event.subject();
384 if (!isDeviceMastershipTopic(leadership.topic())) {
385 return;
386 }
387 NodeId nodeId = leadership.leader();
388 DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
389 if (Objects.equal(nodeId, localNodeId) && connectedDevices.contains(deviceId)) {
390 switch (event.type()) {
391 case LEADER_ELECTED:
392 notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
393 break;
394 case LEADER_REELECTED:
395 // There is no concept of leader re-election in the new distributed leadership manager.
396 throw new IllegalStateException("Unexpected event type");
397 case LEADER_BOOTED:
398 notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
399 break;
400 default:
401 return;
402 }
403 }
404 }
405 }
406
407 private String createDeviceMastershipTopic(DeviceId deviceId) {
Madan Jampani5756c352015-04-29 00:23:58 -0700408 return String.format("device:%s", deviceId.toString());
Madan Jampani84b6b402015-02-25 17:49:54 -0800409 }
410
411 private DeviceId extractDeviceIdFromTopic(String topic) {
412 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
413 if (m.matches()) {
414 return DeviceId.deviceId(m.group(1));
415 } else {
416 throw new IllegalArgumentException("Invalid device mastership topic: " + topic);
417 }
418 }
419
420 private boolean isDeviceMastershipTopic(String topic) {
421 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
422 return m.matches();
423 }
424
Madan Jampanic26eede2015-04-16 11:42:16 -0700425}