blob: 0d85784b124f69ef92597ce2391db0f9e91d68b2 [file] [log] [blame]
Madan Jampani84b6b402015-02-25 17:49:54 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.mastership.impl;
17
18import static org.onlab.util.Tools.groupedThreads;
Ayaka Koshibe941f8602015-04-15 14:17:08 -070019import static org.onlab.util.Tools.futureGetOrElse;
Madan Jampani84b6b402015-02-25 17:49:54 -080020import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
21import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
22import static org.slf4j.LoggerFactory.getLogger;
23import static com.google.common.base.Preconditions.checkArgument;
24
Madan Jampani84b6b402015-02-25 17:49:54 -080025import java.util.List;
26import java.util.Map;
27import java.util.Set;
Madan Jampani84b6b402015-02-25 17:49:54 -080028import java.util.concurrent.ExecutorService;
29import java.util.concurrent.Executors;
Madan Jampani84b6b402015-02-25 17:49:54 -080030import java.util.regex.Matcher;
31import java.util.regex.Pattern;
32import java.util.stream.Collectors;
33
34import org.apache.felix.scr.annotations.Activate;
35import org.apache.felix.scr.annotations.Component;
36import org.apache.felix.scr.annotations.Deactivate;
37import org.apache.felix.scr.annotations.Reference;
38import org.apache.felix.scr.annotations.ReferenceCardinality;
39import org.apache.felix.scr.annotations.Service;
40import org.onlab.util.KryoNamespace;
41import org.onosproject.cluster.ClusterService;
42import org.onosproject.cluster.Leadership;
43import org.onosproject.cluster.LeadershipEvent;
44import org.onosproject.cluster.LeadershipEventListener;
45import org.onosproject.cluster.LeadershipService;
46import org.onosproject.cluster.NodeId;
47import org.onosproject.cluster.RoleInfo;
48import org.onosproject.mastership.MastershipEvent;
49import org.onosproject.mastership.MastershipStore;
50import org.onosproject.mastership.MastershipStoreDelegate;
51import org.onosproject.mastership.MastershipTerm;
52import org.onosproject.net.DeviceId;
53import org.onosproject.net.MastershipRole;
54import org.onosproject.store.AbstractStore;
55import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani84b6b402015-02-25 17:49:54 -080056import org.onosproject.store.cluster.messaging.MessageSubject;
57import org.onosproject.store.serializers.KryoNamespaces;
58import org.onosproject.store.serializers.KryoSerializer;
59import org.onosproject.store.serializers.StoreSerializer;
60import org.slf4j.Logger;
61
62import com.google.common.base.Objects;
63import com.google.common.collect.Lists;
64import com.google.common.collect.Maps;
65import com.google.common.collect.Sets;
66
67/**
68 * Implementation of the MastershipStore on top of Leadership Service.
69 */
Madan Jampani5756c352015-04-29 00:23:58 -070070@Component(immediate = true, enabled = true)
Madan Jampani84b6b402015-02-25 17:49:54 -080071@Service
72public class ConsistentDeviceMastershipStore
73 extends AbstractStore<MastershipEvent, MastershipStoreDelegate>
74 implements MastershipStore {
75
76 private final Logger log = getLogger(getClass());
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
79 protected LeadershipService leadershipService;
80
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 protected ClusterService clusterService;
83
84 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85 protected ClusterCommunicationService clusterCommunicator;
86
87 private NodeId localNodeId;
88 private final Set<DeviceId> connectedDevices = Sets.newHashSet();
89
90 private static final MessageSubject ROLE_QUERY_SUBJECT =
91 new MessageSubject("mastership-store-device-role-query");
92 private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
93 new MessageSubject("mastership-store-device-role-relinquish");
Madan Jampani9bd1f152015-04-30 23:33:35 -070094 private static final MessageSubject TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT =
Madan Jampani1af8e132015-04-30 16:41:18 -070095 new MessageSubject("mastership-store-device-mastership-relinquish");
Madan Jampani84b6b402015-02-25 17:49:54 -080096
97 private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
Madan Jampani5756c352015-04-29 00:23:58 -070098 Pattern.compile("device:(.*)");
Madan Jampani84b6b402015-02-25 17:49:54 -080099
Madan Jampani84b6b402015-02-25 17:49:54 -0800100 private ExecutorService messageHandlingExecutor;
101 private final LeadershipEventListener leadershipEventListener =
102 new InternalDeviceMastershipEventListener();
103
104 private static final String NODE_ID_NULL = "Node ID cannot be null";
105 private static final String DEVICE_ID_NULL = "Device ID cannot be null";;
106
107 public static final StoreSerializer SERIALIZER = new KryoSerializer() {
108 @Override
109 protected void setupKryoPool() {
110 serializerPool = KryoNamespace.newBuilder()
111 .register(KryoNamespaces.API)
112 .register(MastershipRole.class)
113 .register(MastershipEvent.class)
Madan Jampani1af8e132015-04-30 16:41:18 -0700114 .register(MastershipEvent.Type.class)
Madan Jampani84b6b402015-02-25 17:49:54 -0800115 .build();
116 }
117 };
118
119 @Activate
120 public void activate() {
121 messageHandlingExecutor =
122 Executors.newSingleThreadExecutor(groupedThreads("onos/store/device/mastership", "message-handler"));
Madan Jampanid46e18f2015-05-04 23:19:33 -0700123 clusterCommunicator.<DeviceId, MastershipRole>addSubscriber(ROLE_QUERY_SUBJECT,
124 SERIALIZER::decode,
125 deviceId -> getRole(localNodeId, deviceId),
126 SERIALIZER::encode,
Madan Jampani84b6b402015-02-25 17:49:54 -0800127 messageHandlingExecutor);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700128 clusterCommunicator.<DeviceId, MastershipEvent>addSubscriber(ROLE_RELINQUISH_SUBJECT,
129 SERIALIZER::decode,
130 deviceId -> relinquishRole(localNodeId, deviceId),
131 SERIALIZER::encode,
132 messageHandlingExecutor);
Madan Jampani9bd1f152015-04-30 23:33:35 -0700133 clusterCommunicator.addSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
Madan Jampani1af8e132015-04-30 16:41:18 -0700134 SERIALIZER::decode,
Madan Jampani9bd1f152015-04-30 23:33:35 -0700135 this::transitionFromMasterToStandby,
Madan Jampani1af8e132015-04-30 16:41:18 -0700136 SERIALIZER::encode,
137 messageHandlingExecutor);
Madan Jampani84b6b402015-02-25 17:49:54 -0800138 localNodeId = clusterService.getLocalNode().id();
139 leadershipService.addListener(leadershipEventListener);
140
141 log.info("Started.");
142 }
143
144 @Deactivate
145 public void deactivate() {
146 clusterCommunicator.removeSubscriber(ROLE_QUERY_SUBJECT);
147 clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
Madan Jampani9bd1f152015-04-30 23:33:35 -0700148 clusterCommunicator.removeSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT);
Madan Jampani84b6b402015-02-25 17:49:54 -0800149 messageHandlingExecutor.shutdown();
150 leadershipService.removeListener(leadershipEventListener);
151
152 log.info("Stoppped.");
153 }
154
155 @Override
156 public MastershipRole requestRole(DeviceId deviceId) {
157 checkArgument(deviceId != null, DEVICE_ID_NULL);
158
159 String leadershipTopic = createDeviceMastershipTopic(deviceId);
160 if (connectedDevices.add(deviceId)) {
161 leadershipService.runForLeadership(leadershipTopic);
162 return MastershipRole.STANDBY;
163 } else {
164 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
165 if (leadership != null && leadership.leader().equals(localNodeId)) {
166 return MastershipRole.MASTER;
167 } else {
168 return MastershipRole.STANDBY;
169 }
170 }
171 }
172
173 @Override
174 public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
175 checkArgument(nodeId != null, NODE_ID_NULL);
176 checkArgument(deviceId != null, DEVICE_ID_NULL);
177
178 String leadershipTopic = createDeviceMastershipTopic(deviceId);
179 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
180 if (leadership != null && nodeId.equals(leadership.leader())) {
181 return MastershipRole.MASTER;
182 }
183
184 if (localNodeId.equals(nodeId)) {
185 if (connectedDevices.contains(deviceId)) {
186 return MastershipRole.STANDBY;
187 } else {
188 return MastershipRole.NONE;
189 }
Madan Jampani84b6b402015-02-25 17:49:54 -0800190 }
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700191 MastershipRole role = futureGetOrElse(clusterCommunicator.sendAndReceive(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700192 deviceId,
193 ROLE_QUERY_SUBJECT,
194 SERIALIZER::encode,
195 SERIALIZER::decode,
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700196 nodeId), null);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700197 return role == null ? MastershipRole.NONE : role;
Madan Jampani84b6b402015-02-25 17:49:54 -0800198 }
199
200 @Override
201 public NodeId getMaster(DeviceId deviceId) {
202 checkArgument(deviceId != null, DEVICE_ID_NULL);
203
204 String leadershipTopic = createDeviceMastershipTopic(deviceId);
205 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
206 return leadership != null ? leadership.leader() : null;
207 }
208
209 @Override
210 public RoleInfo getNodes(DeviceId deviceId) {
211 checkArgument(deviceId != null, DEVICE_ID_NULL);
212
213 Map<NodeId, MastershipRole> roles = Maps.newHashMap();
214 clusterService
215 .getNodes()
Madan Jampani84b6b402015-02-25 17:49:54 -0800216 .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
217
218 NodeId master = null;
219 final List<NodeId> standbys = Lists.newLinkedList();
220
221 for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
222 if (entry.getValue() == MastershipRole.MASTER) {
223 master = entry.getKey();
224 } else if (entry.getValue() == MastershipRole.STANDBY) {
225 standbys.add(entry.getKey());
226 }
227 }
228
229 return new RoleInfo(master, standbys);
230 }
231
232 @Override
233 public Set<DeviceId> getDevices(NodeId nodeId) {
234 checkArgument(nodeId != null, NODE_ID_NULL);
235
236 return leadershipService
237 .ownedTopics(nodeId)
238 .stream()
239 .filter(this::isDeviceMastershipTopic)
240 .map(this::extractDeviceIdFromTopic)
241 .collect(Collectors.toSet());
242 }
243
244 @Override
245 public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
246 checkArgument(nodeId != null, NODE_ID_NULL);
247 checkArgument(deviceId != null, DEVICE_ID_NULL);
248
Madan Jampani1af8e132015-04-30 16:41:18 -0700249 NodeId currentMaster = getMaster(deviceId);
250 if (nodeId.equals(currentMaster)) {
251 return null;
252 } else {
253 String leadershipTopic = createDeviceMastershipTopic(deviceId);
254 List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
255 if (candidates.isEmpty()) {
256 return null;
257 }
258 if (leadershipService.makeTopCandidate(leadershipTopic, nodeId)) {
Madan Jampani9bd1f152015-04-30 23:33:35 -0700259 return transitionFromMasterToStandby(deviceId);
Madan Jampani1af8e132015-04-30 16:41:18 -0700260 } else {
261 log.warn("Failed to promote {} to mastership for {}", nodeId, deviceId);
262 }
263 }
264 return null;
Madan Jampani84b6b402015-02-25 17:49:54 -0800265 }
266
267 @Override
268 public MastershipTerm getTermFor(DeviceId deviceId) {
269 checkArgument(deviceId != null, DEVICE_ID_NULL);
270
271 String leadershipTopic = createDeviceMastershipTopic(deviceId);
272 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
273 return leadership != null ? MastershipTerm.of(leadership.leader(), leadership.epoch()) : null;
274 }
275
276 @Override
277 public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
278 checkArgument(nodeId != null, NODE_ID_NULL);
279 checkArgument(deviceId != null, DEVICE_ID_NULL);
280
Madan Jampani1af8e132015-04-30 16:41:18 -0700281 NodeId currentMaster = getMaster(deviceId);
282 if (!nodeId.equals(currentMaster)) {
283 return null;
284 }
Madan Jampanid46e18f2015-05-04 23:19:33 -0700285
286 String leadershipTopic = createDeviceMastershipTopic(deviceId);
287 List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
288
289 NodeId newMaster = candidates.stream()
290 .filter(candidate -> !Objects.equal(nodeId, candidate))
291 .findFirst()
292 .orElse(null);
293 log.info("Transitioning to role {} for {}. Next master: {}",
294 newMaster != null ? MastershipRole.STANDBY : MastershipRole.NONE, deviceId, newMaster);
295
296 if (newMaster != null) {
297 return setMaster(newMaster, deviceId);
298 }
299 return relinquishRole(nodeId, deviceId);
Madan Jampani84b6b402015-02-25 17:49:54 -0800300 }
301
302 @Override
303 public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
304 checkArgument(nodeId != null, NODE_ID_NULL);
305 checkArgument(deviceId != null, DEVICE_ID_NULL);
306
307 if (!nodeId.equals(localNodeId)) {
308 log.debug("Forwarding request to relinquish "
309 + "role for device {} to {}", deviceId, nodeId);
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700310 return futureGetOrElse(clusterCommunicator.sendAndReceive(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700311 deviceId,
312 ROLE_RELINQUISH_SUBJECT,
313 SERIALIZER::encode,
314 SERIALIZER::decode,
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700315 nodeId), null);
Madan Jampani84b6b402015-02-25 17:49:54 -0800316 }
317
318 // Check if this node is can be managed by this node.
319 if (!connectedDevices.contains(deviceId)) {
320 return null;
321 }
322
323 String leadershipTopic = createDeviceMastershipTopic(deviceId);
Madan Jampani9bd1f152015-04-30 23:33:35 -0700324 NodeId currentLeader = leadershipService.getLeader(leadershipTopic);
Madan Jampani84b6b402015-02-25 17:49:54 -0800325
Madan Jampani9bd1f152015-04-30 23:33:35 -0700326 MastershipEvent.Type eventType = Objects.equal(currentLeader, localNodeId)
327 ? MastershipEvent.Type.MASTER_CHANGED
328 : MastershipEvent.Type.BACKUPS_CHANGED;
Madan Jampani84b6b402015-02-25 17:49:54 -0800329
330 connectedDevices.remove(deviceId);
331 leadershipService.withdraw(leadershipTopic);
332
333 return new MastershipEvent(eventType, deviceId, getNodes(deviceId));
334 }
335
Madan Jampani9bd1f152015-04-30 23:33:35 -0700336 private MastershipEvent transitionFromMasterToStandby(DeviceId deviceId) {
Madan Jampani1af8e132015-04-30 16:41:18 -0700337 checkArgument(deviceId != null, DEVICE_ID_NULL);
338
339 NodeId currentMaster = getMaster(deviceId);
340 if (currentMaster == null) {
341 return null;
342 }
343
344 if (!currentMaster.equals(localNodeId)) {
345 log.info("Forwarding request to relinquish "
346 + "mastership for device {} to {}", deviceId, currentMaster);
347 return futureGetOrElse(clusterCommunicator.sendAndReceive(
348 deviceId,
Madan Jampani9bd1f152015-04-30 23:33:35 -0700349 TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
Madan Jampani1af8e132015-04-30 16:41:18 -0700350 SERIALIZER::encode,
351 SERIALIZER::decode,
352 currentMaster), null);
353 }
354
Madan Jampani9bd1f152015-04-30 23:33:35 -0700355 return leadershipService.stepdown(createDeviceMastershipTopic(deviceId))
356 ? new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, getNodes(deviceId)) : null;
Madan Jampani1af8e132015-04-30 16:41:18 -0700357 }
358
Madan Jampani84b6b402015-02-25 17:49:54 -0800359 @Override
360 public void relinquishAllRole(NodeId nodeId) {
361 // Noop. LeadershipService already takes care of detecting and purging deadlocks.
362 }
363
Madan Jampani84b6b402015-02-25 17:49:54 -0800364 private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
365 @Override
366 public void event(LeadershipEvent event) {
367 Leadership leadership = event.subject();
368 if (!isDeviceMastershipTopic(leadership.topic())) {
369 return;
370 }
371 NodeId nodeId = leadership.leader();
372 DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
373 if (Objects.equal(nodeId, localNodeId) && connectedDevices.contains(deviceId)) {
374 switch (event.type()) {
375 case LEADER_ELECTED:
376 notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
377 break;
378 case LEADER_REELECTED:
379 // There is no concept of leader re-election in the new distributed leadership manager.
380 throw new IllegalStateException("Unexpected event type");
381 case LEADER_BOOTED:
382 notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
383 break;
384 default:
385 return;
386 }
387 }
388 }
389 }
390
391 private String createDeviceMastershipTopic(DeviceId deviceId) {
Madan Jampani5756c352015-04-29 00:23:58 -0700392 return String.format("device:%s", deviceId.toString());
Madan Jampani84b6b402015-02-25 17:49:54 -0800393 }
394
395 private DeviceId extractDeviceIdFromTopic(String topic) {
396 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
397 if (m.matches()) {
398 return DeviceId.deviceId(m.group(1));
399 } else {
400 throw new IllegalArgumentException("Invalid device mastership topic: " + topic);
401 }
402 }
403
404 private boolean isDeviceMastershipTopic(String topic) {
405 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
406 return m.matches();
407 }
408
Madan Jampanic26eede2015-04-16 11:42:16 -0700409}