blob: 5606c6004fc1ffbce1b08df94caf04ffc74e01b1 [file] [log] [blame]
Madan Jampani84b6b402015-02-25 17:49:54 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.mastership.impl;
17
18import static org.onlab.util.Tools.groupedThreads;
Ayaka Koshibe941f8602015-04-15 14:17:08 -070019import static org.onlab.util.Tools.futureGetOrElse;
Madan Jampani84b6b402015-02-25 17:49:54 -080020import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
21import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
22import static org.slf4j.LoggerFactory.getLogger;
23import static com.google.common.base.Preconditions.checkArgument;
24
Madan Jampani84b6b402015-02-25 17:49:54 -080025import java.util.List;
26import java.util.Map;
27import java.util.Set;
Madan Jampani84b6b402015-02-25 17:49:54 -080028import java.util.concurrent.ExecutorService;
29import java.util.concurrent.Executors;
Madan Jampani84b6b402015-02-25 17:49:54 -080030import java.util.regex.Matcher;
31import java.util.regex.Pattern;
32import java.util.stream.Collectors;
33
34import org.apache.felix.scr.annotations.Activate;
35import org.apache.felix.scr.annotations.Component;
36import org.apache.felix.scr.annotations.Deactivate;
37import org.apache.felix.scr.annotations.Reference;
38import org.apache.felix.scr.annotations.ReferenceCardinality;
39import org.apache.felix.scr.annotations.Service;
40import org.onlab.util.KryoNamespace;
41import org.onosproject.cluster.ClusterService;
42import org.onosproject.cluster.Leadership;
43import org.onosproject.cluster.LeadershipEvent;
44import org.onosproject.cluster.LeadershipEventListener;
45import org.onosproject.cluster.LeadershipService;
46import org.onosproject.cluster.NodeId;
47import org.onosproject.cluster.RoleInfo;
48import org.onosproject.mastership.MastershipEvent;
49import org.onosproject.mastership.MastershipStore;
50import org.onosproject.mastership.MastershipStoreDelegate;
51import org.onosproject.mastership.MastershipTerm;
52import org.onosproject.net.DeviceId;
53import org.onosproject.net.MastershipRole;
54import org.onosproject.store.AbstractStore;
55import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
56import org.onosproject.store.cluster.messaging.ClusterMessage;
57import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
58import org.onosproject.store.cluster.messaging.MessageSubject;
59import org.onosproject.store.serializers.KryoNamespaces;
60import org.onosproject.store.serializers.KryoSerializer;
61import org.onosproject.store.serializers.StoreSerializer;
62import org.slf4j.Logger;
63
64import com.google.common.base.Objects;
65import com.google.common.collect.Lists;
66import com.google.common.collect.Maps;
67import com.google.common.collect.Sets;
68
69/**
70 * Implementation of the MastershipStore on top of Leadership Service.
71 */
Madan Jampani5756c352015-04-29 00:23:58 -070072@Component(immediate = true, enabled = true)
Madan Jampani84b6b402015-02-25 17:49:54 -080073@Service
74public class ConsistentDeviceMastershipStore
75 extends AbstractStore<MastershipEvent, MastershipStoreDelegate>
76 implements MastershipStore {
77
78 private final Logger log = getLogger(getClass());
79
80 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
81 protected LeadershipService leadershipService;
82
83 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
84 protected ClusterService clusterService;
85
86 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
87 protected ClusterCommunicationService clusterCommunicator;
88
89 private NodeId localNodeId;
90 private final Set<DeviceId> connectedDevices = Sets.newHashSet();
91
92 private static final MessageSubject ROLE_QUERY_SUBJECT =
93 new MessageSubject("mastership-store-device-role-query");
94 private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
95 new MessageSubject("mastership-store-device-role-relinquish");
96
97 private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
Madan Jampani5756c352015-04-29 00:23:58 -070098 Pattern.compile("device:(.*)");
Madan Jampani84b6b402015-02-25 17:49:54 -080099
Madan Jampani84b6b402015-02-25 17:49:54 -0800100 private ExecutorService messageHandlingExecutor;
101 private final LeadershipEventListener leadershipEventListener =
102 new InternalDeviceMastershipEventListener();
103
104 private static final String NODE_ID_NULL = "Node ID cannot be null";
105 private static final String DEVICE_ID_NULL = "Device ID cannot be null";;
106
107 public static final StoreSerializer SERIALIZER = new KryoSerializer() {
108 @Override
109 protected void setupKryoPool() {
110 serializerPool = KryoNamespace.newBuilder()
111 .register(KryoNamespaces.API)
112 .register(MastershipRole.class)
113 .register(MastershipEvent.class)
114 .build();
115 }
116 };
117
118 @Activate
119 public void activate() {
120 messageHandlingExecutor =
121 Executors.newSingleThreadExecutor(groupedThreads("onos/store/device/mastership", "message-handler"));
122 clusterCommunicator.addSubscriber(ROLE_QUERY_SUBJECT,
123 new RoleQueryHandler(),
124 messageHandlingExecutor);
125 clusterCommunicator.addSubscriber(ROLE_RELINQUISH_SUBJECT,
126 new RoleRelinquishHandler(),
127 messageHandlingExecutor);
128 localNodeId = clusterService.getLocalNode().id();
129 leadershipService.addListener(leadershipEventListener);
130
131 log.info("Started.");
132 }
133
134 @Deactivate
135 public void deactivate() {
136 clusterCommunicator.removeSubscriber(ROLE_QUERY_SUBJECT);
137 clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
138 messageHandlingExecutor.shutdown();
139 leadershipService.removeListener(leadershipEventListener);
140
141 log.info("Stoppped.");
142 }
143
144 @Override
145 public MastershipRole requestRole(DeviceId deviceId) {
146 checkArgument(deviceId != null, DEVICE_ID_NULL);
147
148 String leadershipTopic = createDeviceMastershipTopic(deviceId);
149 if (connectedDevices.add(deviceId)) {
150 leadershipService.runForLeadership(leadershipTopic);
151 return MastershipRole.STANDBY;
152 } else {
153 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
154 if (leadership != null && leadership.leader().equals(localNodeId)) {
155 return MastershipRole.MASTER;
156 } else {
157 return MastershipRole.STANDBY;
158 }
159 }
160 }
161
162 @Override
163 public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
164 checkArgument(nodeId != null, NODE_ID_NULL);
165 checkArgument(deviceId != null, DEVICE_ID_NULL);
166
167 String leadershipTopic = createDeviceMastershipTopic(deviceId);
168 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
169 if (leadership != null && nodeId.equals(leadership.leader())) {
170 return MastershipRole.MASTER;
171 }
172
173 if (localNodeId.equals(nodeId)) {
174 if (connectedDevices.contains(deviceId)) {
175 return MastershipRole.STANDBY;
176 } else {
177 return MastershipRole.NONE;
178 }
Madan Jampani84b6b402015-02-25 17:49:54 -0800179 }
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700180 MastershipRole role = futureGetOrElse(clusterCommunicator.sendAndReceive(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700181 deviceId,
182 ROLE_QUERY_SUBJECT,
183 SERIALIZER::encode,
184 SERIALIZER::decode,
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700185 nodeId), null);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700186 return role == null ? MastershipRole.NONE : role;
Madan Jampani84b6b402015-02-25 17:49:54 -0800187 }
188
189 @Override
190 public NodeId getMaster(DeviceId deviceId) {
191 checkArgument(deviceId != null, DEVICE_ID_NULL);
192
193 String leadershipTopic = createDeviceMastershipTopic(deviceId);
194 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
195 return leadership != null ? leadership.leader() : null;
196 }
197
198 @Override
199 public RoleInfo getNodes(DeviceId deviceId) {
200 checkArgument(deviceId != null, DEVICE_ID_NULL);
201
202 Map<NodeId, MastershipRole> roles = Maps.newHashMap();
203 clusterService
204 .getNodes()
205 .stream()
206 .parallel()
207 .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
208
209 NodeId master = null;
210 final List<NodeId> standbys = Lists.newLinkedList();
211
212 for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
213 if (entry.getValue() == MastershipRole.MASTER) {
214 master = entry.getKey();
215 } else if (entry.getValue() == MastershipRole.STANDBY) {
216 standbys.add(entry.getKey());
217 }
218 }
219
220 return new RoleInfo(master, standbys);
221 }
222
223 @Override
224 public Set<DeviceId> getDevices(NodeId nodeId) {
225 checkArgument(nodeId != null, NODE_ID_NULL);
226
227 return leadershipService
228 .ownedTopics(nodeId)
229 .stream()
230 .filter(this::isDeviceMastershipTopic)
231 .map(this::extractDeviceIdFromTopic)
232 .collect(Collectors.toSet());
233 }
234
235 @Override
236 public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
237 checkArgument(nodeId != null, NODE_ID_NULL);
238 checkArgument(deviceId != null, DEVICE_ID_NULL);
239
240 throw new UnsupportedOperationException("This operation is not supported in " + this.getClass().getName());
241 }
242
243 @Override
244 public MastershipTerm getTermFor(DeviceId deviceId) {
245 checkArgument(deviceId != null, DEVICE_ID_NULL);
246
247 String leadershipTopic = createDeviceMastershipTopic(deviceId);
248 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
249 return leadership != null ? MastershipTerm.of(leadership.leader(), leadership.epoch()) : null;
250 }
251
252 @Override
253 public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
254 checkArgument(nodeId != null, NODE_ID_NULL);
255 checkArgument(deviceId != null, DEVICE_ID_NULL);
256
257 throw new UnsupportedOperationException("This operation is not supported in " + this.getClass().getName());
258 }
259
260 @Override
261 public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
262 checkArgument(nodeId != null, NODE_ID_NULL);
263 checkArgument(deviceId != null, DEVICE_ID_NULL);
264
265 if (!nodeId.equals(localNodeId)) {
266 log.debug("Forwarding request to relinquish "
267 + "role for device {} to {}", deviceId, nodeId);
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700268 return futureGetOrElse(clusterCommunicator.sendAndReceive(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700269 deviceId,
270 ROLE_RELINQUISH_SUBJECT,
271 SERIALIZER::encode,
272 SERIALIZER::decode,
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700273 nodeId), null);
Madan Jampani84b6b402015-02-25 17:49:54 -0800274 }
275
276 // Check if this node is can be managed by this node.
277 if (!connectedDevices.contains(deviceId)) {
278 return null;
279 }
280
281 String leadershipTopic = createDeviceMastershipTopic(deviceId);
282 Leadership currentLeadership = leadershipService.getLeadership(leadershipTopic);
283
284 MastershipEvent.Type eventType = null;
285 if (currentLeadership != null && currentLeadership.leader().equals(localNodeId)) {
286 eventType = MastershipEvent.Type.MASTER_CHANGED;
287 } else {
288 eventType = MastershipEvent.Type.BACKUPS_CHANGED;
289 }
290
291 connectedDevices.remove(deviceId);
292 leadershipService.withdraw(leadershipTopic);
293
294 return new MastershipEvent(eventType, deviceId, getNodes(deviceId));
295 }
296
297 private class RoleQueryHandler implements ClusterMessageHandler {
298 @Override
299 public void handle(ClusterMessage message) {
300 DeviceId deviceId = SERIALIZER.decode(message.payload());
Madan Jampanic26eede2015-04-16 11:42:16 -0700301 message.respond(SERIALIZER.encode(getRole(localNodeId, deviceId)));
Madan Jampani84b6b402015-02-25 17:49:54 -0800302 }
303 }
304
305
306 @Override
307 public void relinquishAllRole(NodeId nodeId) {
308 // Noop. LeadershipService already takes care of detecting and purging deadlocks.
309 }
310
311 private class RoleRelinquishHandler implements ClusterMessageHandler {
312 @Override
313 public void handle(ClusterMessage message) {
314 DeviceId deviceId = SERIALIZER.decode(message.payload());
Madan Jampanic26eede2015-04-16 11:42:16 -0700315 message.respond(SERIALIZER.encode(relinquishRole(localNodeId, deviceId)));
Madan Jampani84b6b402015-02-25 17:49:54 -0800316 }
317 }
318
319 private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
320 @Override
321 public void event(LeadershipEvent event) {
322 Leadership leadership = event.subject();
323 if (!isDeviceMastershipTopic(leadership.topic())) {
324 return;
325 }
326 NodeId nodeId = leadership.leader();
327 DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
328 if (Objects.equal(nodeId, localNodeId) && connectedDevices.contains(deviceId)) {
329 switch (event.type()) {
330 case LEADER_ELECTED:
331 notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
332 break;
333 case LEADER_REELECTED:
334 // There is no concept of leader re-election in the new distributed leadership manager.
335 throw new IllegalStateException("Unexpected event type");
336 case LEADER_BOOTED:
337 notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
338 break;
339 default:
340 return;
341 }
342 }
343 }
344 }
345
346 private String createDeviceMastershipTopic(DeviceId deviceId) {
Madan Jampani5756c352015-04-29 00:23:58 -0700347 return String.format("device:%s", deviceId.toString());
Madan Jampani84b6b402015-02-25 17:49:54 -0800348 }
349
350 private DeviceId extractDeviceIdFromTopic(String topic) {
351 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
352 if (m.matches()) {
353 return DeviceId.deviceId(m.group(1));
354 } else {
355 throw new IllegalArgumentException("Invalid device mastership topic: " + topic);
356 }
357 }
358
359 private boolean isDeviceMastershipTopic(String topic) {
360 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
361 return m.matches();
362 }
363
Madan Jampanic26eede2015-04-16 11:42:16 -0700364}