blob: bc31e0896a4991ce17c426b90659d6e04ed7d289 [file] [log] [blame]
Madan Jampani84b6b402015-02-25 17:49:54 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.mastership.impl;
17
18import static org.onlab.util.Tools.groupedThreads;
Ayaka Koshibe941f8602015-04-15 14:17:08 -070019import static org.onlab.util.Tools.futureGetOrElse;
Madan Jampani84b6b402015-02-25 17:49:54 -080020import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
21import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
22import static org.slf4j.LoggerFactory.getLogger;
23import static com.google.common.base.Preconditions.checkArgument;
24
Madan Jampani84b6b402015-02-25 17:49:54 -080025import java.util.List;
26import java.util.Map;
27import java.util.Set;
Madan Jampani84b6b402015-02-25 17:49:54 -080028import java.util.concurrent.ExecutorService;
29import java.util.concurrent.Executors;
Madan Jampani84b6b402015-02-25 17:49:54 -080030import java.util.regex.Matcher;
31import java.util.regex.Pattern;
32import java.util.stream.Collectors;
33
34import org.apache.felix.scr.annotations.Activate;
35import org.apache.felix.scr.annotations.Component;
36import org.apache.felix.scr.annotations.Deactivate;
37import org.apache.felix.scr.annotations.Reference;
38import org.apache.felix.scr.annotations.ReferenceCardinality;
39import org.apache.felix.scr.annotations.Service;
40import org.onlab.util.KryoNamespace;
41import org.onosproject.cluster.ClusterService;
42import org.onosproject.cluster.Leadership;
43import org.onosproject.cluster.LeadershipEvent;
44import org.onosproject.cluster.LeadershipEventListener;
45import org.onosproject.cluster.LeadershipService;
46import org.onosproject.cluster.NodeId;
47import org.onosproject.cluster.RoleInfo;
48import org.onosproject.mastership.MastershipEvent;
49import org.onosproject.mastership.MastershipStore;
50import org.onosproject.mastership.MastershipStoreDelegate;
51import org.onosproject.mastership.MastershipTerm;
52import org.onosproject.net.DeviceId;
53import org.onosproject.net.MastershipRole;
54import org.onosproject.store.AbstractStore;
55import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
56import org.onosproject.store.cluster.messaging.ClusterMessage;
57import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
58import org.onosproject.store.cluster.messaging.MessageSubject;
59import org.onosproject.store.serializers.KryoNamespaces;
60import org.onosproject.store.serializers.KryoSerializer;
61import org.onosproject.store.serializers.StoreSerializer;
62import org.slf4j.Logger;
63
64import com.google.common.base.Objects;
65import com.google.common.collect.Lists;
66import com.google.common.collect.Maps;
67import com.google.common.collect.Sets;
68
69/**
70 * Implementation of the MastershipStore on top of Leadership Service.
71 */
72@Component(immediate = true, enabled = false)
73@Service
74public class ConsistentDeviceMastershipStore
75 extends AbstractStore<MastershipEvent, MastershipStoreDelegate>
76 implements MastershipStore {
77
78 private final Logger log = getLogger(getClass());
79
80 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
81 protected LeadershipService leadershipService;
82
83 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
84 protected ClusterService clusterService;
85
86 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
87 protected ClusterCommunicationService clusterCommunicator;
88
89 private NodeId localNodeId;
90 private final Set<DeviceId> connectedDevices = Sets.newHashSet();
91
92 private static final MessageSubject ROLE_QUERY_SUBJECT =
93 new MessageSubject("mastership-store-device-role-query");
94 private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
95 new MessageSubject("mastership-store-device-role-relinquish");
96
97 private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
98 Pattern.compile("/devices/(.*)/mastership");
99
100 private static final long PEER_REQUEST_TIMEOUT_MS = 5000;
101 private ExecutorService messageHandlingExecutor;
102 private final LeadershipEventListener leadershipEventListener =
103 new InternalDeviceMastershipEventListener();
104
105 private static final String NODE_ID_NULL = "Node ID cannot be null";
106 private static final String DEVICE_ID_NULL = "Device ID cannot be null";;
107
108 public static final StoreSerializer SERIALIZER = new KryoSerializer() {
109 @Override
110 protected void setupKryoPool() {
111 serializerPool = KryoNamespace.newBuilder()
112 .register(KryoNamespaces.API)
113 .register(MastershipRole.class)
114 .register(MastershipEvent.class)
115 .build();
116 }
117 };
118
119 @Activate
120 public void activate() {
121 messageHandlingExecutor =
122 Executors.newSingleThreadExecutor(groupedThreads("onos/store/device/mastership", "message-handler"));
123 clusterCommunicator.addSubscriber(ROLE_QUERY_SUBJECT,
124 new RoleQueryHandler(),
125 messageHandlingExecutor);
126 clusterCommunicator.addSubscriber(ROLE_RELINQUISH_SUBJECT,
127 new RoleRelinquishHandler(),
128 messageHandlingExecutor);
129 localNodeId = clusterService.getLocalNode().id();
130 leadershipService.addListener(leadershipEventListener);
131
132 log.info("Started.");
133 }
134
135 @Deactivate
136 public void deactivate() {
137 clusterCommunicator.removeSubscriber(ROLE_QUERY_SUBJECT);
138 clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
139 messageHandlingExecutor.shutdown();
140 leadershipService.removeListener(leadershipEventListener);
141
142 log.info("Stoppped.");
143 }
144
145 @Override
146 public MastershipRole requestRole(DeviceId deviceId) {
147 checkArgument(deviceId != null, DEVICE_ID_NULL);
148
149 String leadershipTopic = createDeviceMastershipTopic(deviceId);
150 if (connectedDevices.add(deviceId)) {
151 leadershipService.runForLeadership(leadershipTopic);
152 return MastershipRole.STANDBY;
153 } else {
154 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
155 if (leadership != null && leadership.leader().equals(localNodeId)) {
156 return MastershipRole.MASTER;
157 } else {
158 return MastershipRole.STANDBY;
159 }
160 }
161 }
162
163 @Override
164 public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
165 checkArgument(nodeId != null, NODE_ID_NULL);
166 checkArgument(deviceId != null, DEVICE_ID_NULL);
167
168 String leadershipTopic = createDeviceMastershipTopic(deviceId);
169 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
170 if (leadership != null && nodeId.equals(leadership.leader())) {
171 return MastershipRole.MASTER;
172 }
173
174 if (localNodeId.equals(nodeId)) {
175 if (connectedDevices.contains(deviceId)) {
176 return MastershipRole.STANDBY;
177 } else {
178 return MastershipRole.NONE;
179 }
Madan Jampani84b6b402015-02-25 17:49:54 -0800180 }
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700181 MastershipRole role = futureGetOrElse(clusterCommunicator.sendAndReceive(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700182 deviceId,
183 ROLE_QUERY_SUBJECT,
184 SERIALIZER::encode,
185 SERIALIZER::decode,
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700186 nodeId), null);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700187 return role == null ? MastershipRole.NONE : role;
Madan Jampani84b6b402015-02-25 17:49:54 -0800188 }
189
190 @Override
191 public NodeId getMaster(DeviceId deviceId) {
192 checkArgument(deviceId != null, DEVICE_ID_NULL);
193
194 String leadershipTopic = createDeviceMastershipTopic(deviceId);
195 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
196 return leadership != null ? leadership.leader() : null;
197 }
198
199 @Override
200 public RoleInfo getNodes(DeviceId deviceId) {
201 checkArgument(deviceId != null, DEVICE_ID_NULL);
202
203 Map<NodeId, MastershipRole> roles = Maps.newHashMap();
204 clusterService
205 .getNodes()
206 .stream()
207 .parallel()
208 .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
209
210 NodeId master = null;
211 final List<NodeId> standbys = Lists.newLinkedList();
212
213 for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
214 if (entry.getValue() == MastershipRole.MASTER) {
215 master = entry.getKey();
216 } else if (entry.getValue() == MastershipRole.STANDBY) {
217 standbys.add(entry.getKey());
218 }
219 }
220
221 return new RoleInfo(master, standbys);
222 }
223
224 @Override
225 public Set<DeviceId> getDevices(NodeId nodeId) {
226 checkArgument(nodeId != null, NODE_ID_NULL);
227
228 return leadershipService
229 .ownedTopics(nodeId)
230 .stream()
231 .filter(this::isDeviceMastershipTopic)
232 .map(this::extractDeviceIdFromTopic)
233 .collect(Collectors.toSet());
234 }
235
236 @Override
237 public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
238 checkArgument(nodeId != null, NODE_ID_NULL);
239 checkArgument(deviceId != null, DEVICE_ID_NULL);
240
241 throw new UnsupportedOperationException("This operation is not supported in " + this.getClass().getName());
242 }
243
244 @Override
245 public MastershipTerm getTermFor(DeviceId deviceId) {
246 checkArgument(deviceId != null, DEVICE_ID_NULL);
247
248 String leadershipTopic = createDeviceMastershipTopic(deviceId);
249 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
250 return leadership != null ? MastershipTerm.of(leadership.leader(), leadership.epoch()) : null;
251 }
252
253 @Override
254 public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
255 checkArgument(nodeId != null, NODE_ID_NULL);
256 checkArgument(deviceId != null, DEVICE_ID_NULL);
257
258 throw new UnsupportedOperationException("This operation is not supported in " + this.getClass().getName());
259 }
260
261 @Override
262 public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
263 checkArgument(nodeId != null, NODE_ID_NULL);
264 checkArgument(deviceId != null, DEVICE_ID_NULL);
265
266 if (!nodeId.equals(localNodeId)) {
267 log.debug("Forwarding request to relinquish "
268 + "role for device {} to {}", deviceId, nodeId);
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700269 return futureGetOrElse(clusterCommunicator.sendAndReceive(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700270 deviceId,
271 ROLE_RELINQUISH_SUBJECT,
272 SERIALIZER::encode,
273 SERIALIZER::decode,
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700274 nodeId), null);
Madan Jampani84b6b402015-02-25 17:49:54 -0800275 }
276
277 // Check if this node is can be managed by this node.
278 if (!connectedDevices.contains(deviceId)) {
279 return null;
280 }
281
282 String leadershipTopic = createDeviceMastershipTopic(deviceId);
283 Leadership currentLeadership = leadershipService.getLeadership(leadershipTopic);
284
285 MastershipEvent.Type eventType = null;
286 if (currentLeadership != null && currentLeadership.leader().equals(localNodeId)) {
287 eventType = MastershipEvent.Type.MASTER_CHANGED;
288 } else {
289 eventType = MastershipEvent.Type.BACKUPS_CHANGED;
290 }
291
292 connectedDevices.remove(deviceId);
293 leadershipService.withdraw(leadershipTopic);
294
295 return new MastershipEvent(eventType, deviceId, getNodes(deviceId));
296 }
297
298 private class RoleQueryHandler implements ClusterMessageHandler {
299 @Override
300 public void handle(ClusterMessage message) {
301 DeviceId deviceId = SERIALIZER.decode(message.payload());
Madan Jampanic26eede2015-04-16 11:42:16 -0700302 message.respond(SERIALIZER.encode(getRole(localNodeId, deviceId)));
Madan Jampani84b6b402015-02-25 17:49:54 -0800303 }
304 }
305
306
307 @Override
308 public void relinquishAllRole(NodeId nodeId) {
309 // Noop. LeadershipService already takes care of detecting and purging deadlocks.
310 }
311
312 private class RoleRelinquishHandler implements ClusterMessageHandler {
313 @Override
314 public void handle(ClusterMessage message) {
315 DeviceId deviceId = SERIALIZER.decode(message.payload());
Madan Jampanic26eede2015-04-16 11:42:16 -0700316 message.respond(SERIALIZER.encode(relinquishRole(localNodeId, deviceId)));
Madan Jampani84b6b402015-02-25 17:49:54 -0800317 }
318 }
319
320 private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
321 @Override
322 public void event(LeadershipEvent event) {
323 Leadership leadership = event.subject();
324 if (!isDeviceMastershipTopic(leadership.topic())) {
325 return;
326 }
327 NodeId nodeId = leadership.leader();
328 DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
329 if (Objects.equal(nodeId, localNodeId) && connectedDevices.contains(deviceId)) {
330 switch (event.type()) {
331 case LEADER_ELECTED:
332 notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
333 break;
334 case LEADER_REELECTED:
335 // There is no concept of leader re-election in the new distributed leadership manager.
336 throw new IllegalStateException("Unexpected event type");
337 case LEADER_BOOTED:
338 notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
339 break;
340 default:
341 return;
342 }
343 }
344 }
345 }
346
347 private String createDeviceMastershipTopic(DeviceId deviceId) {
348 return "/devices/" + deviceId.toString() + "/mastership";
349 }
350
351 private DeviceId extractDeviceIdFromTopic(String topic) {
352 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
353 if (m.matches()) {
354 return DeviceId.deviceId(m.group(1));
355 } else {
356 throw new IllegalArgumentException("Invalid device mastership topic: " + topic);
357 }
358 }
359
360 private boolean isDeviceMastershipTopic(String topic) {
361 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
362 return m.matches();
363 }
364
Madan Jampanic26eede2015-04-16 11:42:16 -0700365}