blob: 5bc55aff3d0ca2191564c629c5eb39da35983242 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.mastership.impl;
17
18import static org.onlab.util.Tools.groupedThreads;
Ayaka Koshibe941f8602015-04-15 14:17:08 -070019import static org.onlab.util.Tools.futureGetOrElse;
Madan Jampani84b6b402015-02-25 17:49:54 -080020import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
21import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
22import static org.slf4j.LoggerFactory.getLogger;
23import static com.google.common.base.Preconditions.checkArgument;
24
Madan Jampani84b6b402015-02-25 17:49:54 -080025import java.util.List;
26import java.util.Map;
27import java.util.Set;
Madan Jampani84b6b402015-02-25 17:49:54 -080028import java.util.concurrent.ExecutorService;
29import java.util.concurrent.Executors;
Madan Jampani84b6b402015-02-25 17:49:54 -080030import java.util.regex.Matcher;
31import java.util.regex.Pattern;
32import java.util.stream.Collectors;
33
34import org.apache.felix.scr.annotations.Activate;
35import org.apache.felix.scr.annotations.Component;
36import org.apache.felix.scr.annotations.Deactivate;
37import org.apache.felix.scr.annotations.Reference;
38import org.apache.felix.scr.annotations.ReferenceCardinality;
39import org.apache.felix.scr.annotations.Service;
40import org.onlab.util.KryoNamespace;
41import org.onosproject.cluster.ClusterService;
42import org.onosproject.cluster.Leadership;
43import org.onosproject.cluster.LeadershipEvent;
44import org.onosproject.cluster.LeadershipEventListener;
45import org.onosproject.cluster.LeadershipService;
46import org.onosproject.cluster.NodeId;
47import org.onosproject.cluster.RoleInfo;
48import org.onosproject.mastership.MastershipEvent;
49import org.onosproject.mastership.MastershipStore;
50import org.onosproject.mastership.MastershipStoreDelegate;
51import org.onosproject.mastership.MastershipTerm;
52import org.onosproject.net.DeviceId;
53import org.onosproject.net.MastershipRole;
54import org.onosproject.store.AbstractStore;
55import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani84b6b402015-02-25 17:49:54 -080056import org.onosproject.store.cluster.messaging.MessageSubject;
57import org.onosproject.store.serializers.KryoNamespaces;
58import org.onosproject.store.serializers.KryoSerializer;
59import org.onosproject.store.serializers.StoreSerializer;
60import org.slf4j.Logger;
61
62import com.google.common.base.Objects;
63import com.google.common.collect.Lists;
64import com.google.common.collect.Maps;
65import com.google.common.collect.Sets;
66
67/**
68 * Implementation of the MastershipStore on top of Leadership Service.
69 */
Madan Jampani5756c352015-04-29 00:23:58 -070070@Component(immediate = true, enabled = true)
Madan Jampani84b6b402015-02-25 17:49:54 -080071@Service
72public class ConsistentDeviceMastershipStore
73 extends AbstractStore<MastershipEvent, MastershipStoreDelegate>
74 implements MastershipStore {
75
76 private final Logger log = getLogger(getClass());
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
79 protected LeadershipService leadershipService;
80
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 protected ClusterService clusterService;
83
84 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85 protected ClusterCommunicationService clusterCommunicator;
86
87 private NodeId localNodeId;
88 private final Set<DeviceId> connectedDevices = Sets.newHashSet();
89
90 private static final MessageSubject ROLE_QUERY_SUBJECT =
91 new MessageSubject("mastership-store-device-role-query");
92 private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
93 new MessageSubject("mastership-store-device-role-relinquish");
Madan Jampani9bd1f152015-04-30 23:33:35 -070094 private static final MessageSubject TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT =
Madan Jampani1af8e132015-04-30 16:41:18 -070095 new MessageSubject("mastership-store-device-mastership-relinquish");
Madan Jampani84b6b402015-02-25 17:49:54 -080096
97 private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
Madan Jampani5756c352015-04-29 00:23:58 -070098 Pattern.compile("device:(.*)");
Madan Jampani84b6b402015-02-25 17:49:54 -080099
Madan Jampani84b6b402015-02-25 17:49:54 -0800100 private ExecutorService messageHandlingExecutor;
101 private final LeadershipEventListener leadershipEventListener =
102 new InternalDeviceMastershipEventListener();
103
104 private static final String NODE_ID_NULL = "Node ID cannot be null";
105 private static final String DEVICE_ID_NULL = "Device ID cannot be null";;
106
107 public static final StoreSerializer SERIALIZER = new KryoSerializer() {
108 @Override
109 protected void setupKryoPool() {
110 serializerPool = KryoNamespace.newBuilder()
111 .register(KryoNamespaces.API)
112 .register(MastershipRole.class)
113 .register(MastershipEvent.class)
Madan Jampani1af8e132015-04-30 16:41:18 -0700114 .register(MastershipEvent.Type.class)
Madan Jampani84b6b402015-02-25 17:49:54 -0800115 .build();
116 }
117 };
118
119 @Activate
120 public void activate() {
121 messageHandlingExecutor =
122 Executors.newSingleThreadExecutor(groupedThreads("onos/store/device/mastership", "message-handler"));
Madan Jampanid46e18f2015-05-04 23:19:33 -0700123 clusterCommunicator.<DeviceId, MastershipRole>addSubscriber(ROLE_QUERY_SUBJECT,
124 SERIALIZER::decode,
125 deviceId -> getRole(localNodeId, deviceId),
126 SERIALIZER::encode,
Madan Jampani84b6b402015-02-25 17:49:54 -0800127 messageHandlingExecutor);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700128 clusterCommunicator.<DeviceId, MastershipEvent>addSubscriber(ROLE_RELINQUISH_SUBJECT,
129 SERIALIZER::decode,
130 deviceId -> relinquishRole(localNodeId, deviceId),
131 SERIALIZER::encode,
132 messageHandlingExecutor);
Madan Jampani9bd1f152015-04-30 23:33:35 -0700133 clusterCommunicator.addSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
Madan Jampani1af8e132015-04-30 16:41:18 -0700134 SERIALIZER::decode,
Madan Jampani9bd1f152015-04-30 23:33:35 -0700135 this::transitionFromMasterToStandby,
Madan Jampani1af8e132015-04-30 16:41:18 -0700136 SERIALIZER::encode,
137 messageHandlingExecutor);
Madan Jampani84b6b402015-02-25 17:49:54 -0800138 localNodeId = clusterService.getLocalNode().id();
139 leadershipService.addListener(leadershipEventListener);
140
141 log.info("Started.");
142 }
143
144 @Deactivate
145 public void deactivate() {
146 clusterCommunicator.removeSubscriber(ROLE_QUERY_SUBJECT);
147 clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
Madan Jampani9bd1f152015-04-30 23:33:35 -0700148 clusterCommunicator.removeSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT);
Madan Jampani84b6b402015-02-25 17:49:54 -0800149 messageHandlingExecutor.shutdown();
150 leadershipService.removeListener(leadershipEventListener);
151
152 log.info("Stoppped.");
153 }
154
155 @Override
156 public MastershipRole requestRole(DeviceId deviceId) {
157 checkArgument(deviceId != null, DEVICE_ID_NULL);
158
159 String leadershipTopic = createDeviceMastershipTopic(deviceId);
160 if (connectedDevices.add(deviceId)) {
161 leadershipService.runForLeadership(leadershipTopic);
162 return MastershipRole.STANDBY;
163 } else {
164 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
165 if (leadership != null && leadership.leader().equals(localNodeId)) {
166 return MastershipRole.MASTER;
167 } else {
168 return MastershipRole.STANDBY;
169 }
170 }
171 }
172
173 @Override
174 public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
175 checkArgument(nodeId != null, NODE_ID_NULL);
176 checkArgument(deviceId != null, DEVICE_ID_NULL);
177
178 String leadershipTopic = createDeviceMastershipTopic(deviceId);
179 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
180 if (leadership != null && nodeId.equals(leadership.leader())) {
181 return MastershipRole.MASTER;
182 }
183
184 if (localNodeId.equals(nodeId)) {
185 if (connectedDevices.contains(deviceId)) {
186 return MastershipRole.STANDBY;
187 } else {
188 return MastershipRole.NONE;
189 }
Madan Jampani84b6b402015-02-25 17:49:54 -0800190 }
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700191 MastershipRole role = futureGetOrElse(clusterCommunicator.sendAndReceive(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700192 deviceId,
193 ROLE_QUERY_SUBJECT,
194 SERIALIZER::encode,
195 SERIALIZER::decode,
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700196 nodeId), null);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700197 return role == null ? MastershipRole.NONE : role;
Madan Jampani84b6b402015-02-25 17:49:54 -0800198 }
199
200 @Override
201 public NodeId getMaster(DeviceId deviceId) {
202 checkArgument(deviceId != null, DEVICE_ID_NULL);
203
204 String leadershipTopic = createDeviceMastershipTopic(deviceId);
205 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
206 return leadership != null ? leadership.leader() : null;
207 }
208
209 @Override
210 public RoleInfo getNodes(DeviceId deviceId) {
211 checkArgument(deviceId != null, DEVICE_ID_NULL);
212
213 Map<NodeId, MastershipRole> roles = Maps.newHashMap();
214 clusterService
215 .getNodes()
Madan Jampani84b6b402015-02-25 17:49:54 -0800216 .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
217
218 NodeId master = null;
219 final List<NodeId> standbys = Lists.newLinkedList();
220
Madan Jampani86940d92015-05-06 11:47:57 -0700221 List<NodeId> candidates = leadershipService.getCandidates(createDeviceMastershipTopic(deviceId));
222
Madan Jampani84b6b402015-02-25 17:49:54 -0800223 for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
224 if (entry.getValue() == MastershipRole.MASTER) {
225 master = entry.getKey();
226 } else if (entry.getValue() == MastershipRole.STANDBY) {
227 standbys.add(entry.getKey());
228 }
229 }
230
Madan Jampani86940d92015-05-06 11:47:57 -0700231 List<NodeId> sortedStandbyList = candidates.stream().filter(standbys::contains).collect(Collectors.toList());
232
233 return new RoleInfo(master, sortedStandbyList);
Madan Jampani84b6b402015-02-25 17:49:54 -0800234 }
235
236 @Override
237 public Set<DeviceId> getDevices(NodeId nodeId) {
238 checkArgument(nodeId != null, NODE_ID_NULL);
239
240 return leadershipService
241 .ownedTopics(nodeId)
242 .stream()
243 .filter(this::isDeviceMastershipTopic)
244 .map(this::extractDeviceIdFromTopic)
245 .collect(Collectors.toSet());
246 }
247
248 @Override
249 public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
250 checkArgument(nodeId != null, NODE_ID_NULL);
251 checkArgument(deviceId != null, DEVICE_ID_NULL);
252
Madan Jampani1af8e132015-04-30 16:41:18 -0700253 NodeId currentMaster = getMaster(deviceId);
254 if (nodeId.equals(currentMaster)) {
255 return null;
256 } else {
257 String leadershipTopic = createDeviceMastershipTopic(deviceId);
258 List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
259 if (candidates.isEmpty()) {
260 return null;
261 }
262 if (leadershipService.makeTopCandidate(leadershipTopic, nodeId)) {
Madan Jampani9bd1f152015-04-30 23:33:35 -0700263 return transitionFromMasterToStandby(deviceId);
Madan Jampani1af8e132015-04-30 16:41:18 -0700264 } else {
265 log.warn("Failed to promote {} to mastership for {}", nodeId, deviceId);
266 }
267 }
268 return null;
Madan Jampani84b6b402015-02-25 17:49:54 -0800269 }
270
271 @Override
272 public MastershipTerm getTermFor(DeviceId deviceId) {
273 checkArgument(deviceId != null, DEVICE_ID_NULL);
274
275 String leadershipTopic = createDeviceMastershipTopic(deviceId);
276 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
277 return leadership != null ? MastershipTerm.of(leadership.leader(), leadership.epoch()) : null;
278 }
279
280 @Override
281 public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
282 checkArgument(nodeId != null, NODE_ID_NULL);
283 checkArgument(deviceId != null, DEVICE_ID_NULL);
284
Madan Jampani1af8e132015-04-30 16:41:18 -0700285 NodeId currentMaster = getMaster(deviceId);
286 if (!nodeId.equals(currentMaster)) {
287 return null;
288 }
Madan Jampanid46e18f2015-05-04 23:19:33 -0700289
290 String leadershipTopic = createDeviceMastershipTopic(deviceId);
291 List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
292
293 NodeId newMaster = candidates.stream()
294 .filter(candidate -> !Objects.equal(nodeId, candidate))
295 .findFirst()
296 .orElse(null);
297 log.info("Transitioning to role {} for {}. Next master: {}",
298 newMaster != null ? MastershipRole.STANDBY : MastershipRole.NONE, deviceId, newMaster);
299
300 if (newMaster != null) {
301 return setMaster(newMaster, deviceId);
302 }
303 return relinquishRole(nodeId, deviceId);
Madan Jampani84b6b402015-02-25 17:49:54 -0800304 }
305
306 @Override
307 public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
308 checkArgument(nodeId != null, NODE_ID_NULL);
309 checkArgument(deviceId != null, DEVICE_ID_NULL);
310
311 if (!nodeId.equals(localNodeId)) {
312 log.debug("Forwarding request to relinquish "
313 + "role for device {} to {}", deviceId, nodeId);
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700314 return futureGetOrElse(clusterCommunicator.sendAndReceive(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700315 deviceId,
316 ROLE_RELINQUISH_SUBJECT,
317 SERIALIZER::encode,
318 SERIALIZER::decode,
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700319 nodeId), null);
Madan Jampani84b6b402015-02-25 17:49:54 -0800320 }
321
322 // Check if this node is can be managed by this node.
323 if (!connectedDevices.contains(deviceId)) {
324 return null;
325 }
326
327 String leadershipTopic = createDeviceMastershipTopic(deviceId);
Madan Jampani9bd1f152015-04-30 23:33:35 -0700328 NodeId currentLeader = leadershipService.getLeader(leadershipTopic);
Madan Jampani84b6b402015-02-25 17:49:54 -0800329
Madan Jampani9bd1f152015-04-30 23:33:35 -0700330 MastershipEvent.Type eventType = Objects.equal(currentLeader, localNodeId)
331 ? MastershipEvent.Type.MASTER_CHANGED
332 : MastershipEvent.Type.BACKUPS_CHANGED;
Madan Jampani84b6b402015-02-25 17:49:54 -0800333
334 connectedDevices.remove(deviceId);
335 leadershipService.withdraw(leadershipTopic);
336
337 return new MastershipEvent(eventType, deviceId, getNodes(deviceId));
338 }
339
Madan Jampani9bd1f152015-04-30 23:33:35 -0700340 private MastershipEvent transitionFromMasterToStandby(DeviceId deviceId) {
Madan Jampani1af8e132015-04-30 16:41:18 -0700341 checkArgument(deviceId != null, DEVICE_ID_NULL);
342
343 NodeId currentMaster = getMaster(deviceId);
344 if (currentMaster == null) {
345 return null;
346 }
347
348 if (!currentMaster.equals(localNodeId)) {
349 log.info("Forwarding request to relinquish "
350 + "mastership for device {} to {}", deviceId, currentMaster);
351 return futureGetOrElse(clusterCommunicator.sendAndReceive(
352 deviceId,
Madan Jampani9bd1f152015-04-30 23:33:35 -0700353 TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
Madan Jampani1af8e132015-04-30 16:41:18 -0700354 SERIALIZER::encode,
355 SERIALIZER::decode,
356 currentMaster), null);
357 }
358
Madan Jampani9bd1f152015-04-30 23:33:35 -0700359 return leadershipService.stepdown(createDeviceMastershipTopic(deviceId))
360 ? new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, getNodes(deviceId)) : null;
Madan Jampani1af8e132015-04-30 16:41:18 -0700361 }
362
Madan Jampani84b6b402015-02-25 17:49:54 -0800363 @Override
364 public void relinquishAllRole(NodeId nodeId) {
365 // Noop. LeadershipService already takes care of detecting and purging deadlocks.
366 }
367
Madan Jampani84b6b402015-02-25 17:49:54 -0800368 private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
369 @Override
370 public void event(LeadershipEvent event) {
371 Leadership leadership = event.subject();
372 if (!isDeviceMastershipTopic(leadership.topic())) {
373 return;
374 }
375 NodeId nodeId = leadership.leader();
376 DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
377 if (Objects.equal(nodeId, localNodeId) && connectedDevices.contains(deviceId)) {
378 switch (event.type()) {
379 case LEADER_ELECTED:
380 notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
381 break;
382 case LEADER_REELECTED:
383 // There is no concept of leader re-election in the new distributed leadership manager.
384 throw new IllegalStateException("Unexpected event type");
385 case LEADER_BOOTED:
386 notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
387 break;
388 default:
389 return;
390 }
391 }
392 }
393 }
394
395 private String createDeviceMastershipTopic(DeviceId deviceId) {
Madan Jampani5756c352015-04-29 00:23:58 -0700396 return String.format("device:%s", deviceId.toString());
Madan Jampani84b6b402015-02-25 17:49:54 -0800397 }
398
399 private DeviceId extractDeviceIdFromTopic(String topic) {
400 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
401 if (m.matches()) {
402 return DeviceId.deviceId(m.group(1));
403 } else {
404 throw new IllegalArgumentException("Invalid device mastership topic: " + topic);
405 }
406 }
407
408 private boolean isDeviceMastershipTopic(String topic) {
409 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
410 return m.matches();
411 }
412
Madan Jampanic26eede2015-04-16 11:42:16 -0700413}