blob: 27a33b3b6b4c52590834da844fee1d9ad58b8855 [file] [log] [blame]
Madan Jampani84b6b402015-02-25 17:49:54 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.mastership.impl;
17
18import static org.onlab.util.Tools.groupedThreads;
Ayaka Koshibe941f8602015-04-15 14:17:08 -070019import static org.onlab.util.Tools.futureGetOrElse;
Madan Jampani84b6b402015-02-25 17:49:54 -080020import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
21import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
22import static org.slf4j.LoggerFactory.getLogger;
23import static com.google.common.base.Preconditions.checkArgument;
24
25import java.io.IOException;
26import java.util.List;
27import java.util.Map;
28import java.util.Set;
Madan Jampani84b6b402015-02-25 17:49:54 -080029import java.util.concurrent.ExecutorService;
30import java.util.concurrent.Executors;
Madan Jampani84b6b402015-02-25 17:49:54 -080031import java.util.regex.Matcher;
32import java.util.regex.Pattern;
33import java.util.stream.Collectors;
34
35import org.apache.felix.scr.annotations.Activate;
36import org.apache.felix.scr.annotations.Component;
37import org.apache.felix.scr.annotations.Deactivate;
38import org.apache.felix.scr.annotations.Reference;
39import org.apache.felix.scr.annotations.ReferenceCardinality;
40import org.apache.felix.scr.annotations.Service;
41import org.onlab.util.KryoNamespace;
42import org.onosproject.cluster.ClusterService;
43import org.onosproject.cluster.Leadership;
44import org.onosproject.cluster.LeadershipEvent;
45import org.onosproject.cluster.LeadershipEventListener;
46import org.onosproject.cluster.LeadershipService;
47import org.onosproject.cluster.NodeId;
48import org.onosproject.cluster.RoleInfo;
49import org.onosproject.mastership.MastershipEvent;
50import org.onosproject.mastership.MastershipStore;
51import org.onosproject.mastership.MastershipStoreDelegate;
52import org.onosproject.mastership.MastershipTerm;
53import org.onosproject.net.DeviceId;
54import org.onosproject.net.MastershipRole;
55import org.onosproject.store.AbstractStore;
56import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
57import org.onosproject.store.cluster.messaging.ClusterMessage;
58import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
59import org.onosproject.store.cluster.messaging.MessageSubject;
60import org.onosproject.store.serializers.KryoNamespaces;
61import org.onosproject.store.serializers.KryoSerializer;
62import org.onosproject.store.serializers.StoreSerializer;
63import org.slf4j.Logger;
64
65import com.google.common.base.Objects;
66import com.google.common.collect.Lists;
67import com.google.common.collect.Maps;
68import com.google.common.collect.Sets;
69
70/**
71 * Implementation of the MastershipStore on top of Leadership Service.
72 */
73@Component(immediate = true, enabled = false)
74@Service
75public class ConsistentDeviceMastershipStore
76 extends AbstractStore<MastershipEvent, MastershipStoreDelegate>
77 implements MastershipStore {
78
79 private final Logger log = getLogger(getClass());
80
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 protected LeadershipService leadershipService;
83
84 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85 protected ClusterService clusterService;
86
87 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
88 protected ClusterCommunicationService clusterCommunicator;
89
90 private NodeId localNodeId;
91 private final Set<DeviceId> connectedDevices = Sets.newHashSet();
92
93 private static final MessageSubject ROLE_QUERY_SUBJECT =
94 new MessageSubject("mastership-store-device-role-query");
95 private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
96 new MessageSubject("mastership-store-device-role-relinquish");
97
98 private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
99 Pattern.compile("/devices/(.*)/mastership");
100
101 private static final long PEER_REQUEST_TIMEOUT_MS = 5000;
102 private ExecutorService messageHandlingExecutor;
103 private final LeadershipEventListener leadershipEventListener =
104 new InternalDeviceMastershipEventListener();
105
106 private static final String NODE_ID_NULL = "Node ID cannot be null";
107 private static final String DEVICE_ID_NULL = "Device ID cannot be null";;
108
109 public static final StoreSerializer SERIALIZER = new KryoSerializer() {
110 @Override
111 protected void setupKryoPool() {
112 serializerPool = KryoNamespace.newBuilder()
113 .register(KryoNamespaces.API)
114 .register(MastershipRole.class)
115 .register(MastershipEvent.class)
116 .build();
117 }
118 };
119
120 @Activate
121 public void activate() {
122 messageHandlingExecutor =
123 Executors.newSingleThreadExecutor(groupedThreads("onos/store/device/mastership", "message-handler"));
124 clusterCommunicator.addSubscriber(ROLE_QUERY_SUBJECT,
125 new RoleQueryHandler(),
126 messageHandlingExecutor);
127 clusterCommunicator.addSubscriber(ROLE_RELINQUISH_SUBJECT,
128 new RoleRelinquishHandler(),
129 messageHandlingExecutor);
130 localNodeId = clusterService.getLocalNode().id();
131 leadershipService.addListener(leadershipEventListener);
132
133 log.info("Started.");
134 }
135
136 @Deactivate
137 public void deactivate() {
138 clusterCommunicator.removeSubscriber(ROLE_QUERY_SUBJECT);
139 clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
140 messageHandlingExecutor.shutdown();
141 leadershipService.removeListener(leadershipEventListener);
142
143 log.info("Stoppped.");
144 }
145
146 @Override
147 public MastershipRole requestRole(DeviceId deviceId) {
148 checkArgument(deviceId != null, DEVICE_ID_NULL);
149
150 String leadershipTopic = createDeviceMastershipTopic(deviceId);
151 if (connectedDevices.add(deviceId)) {
152 leadershipService.runForLeadership(leadershipTopic);
153 return MastershipRole.STANDBY;
154 } else {
155 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
156 if (leadership != null && leadership.leader().equals(localNodeId)) {
157 return MastershipRole.MASTER;
158 } else {
159 return MastershipRole.STANDBY;
160 }
161 }
162 }
163
164 @Override
165 public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
166 checkArgument(nodeId != null, NODE_ID_NULL);
167 checkArgument(deviceId != null, DEVICE_ID_NULL);
168
169 String leadershipTopic = createDeviceMastershipTopic(deviceId);
170 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
171 if (leadership != null && nodeId.equals(leadership.leader())) {
172 return MastershipRole.MASTER;
173 }
174
175 if (localNodeId.equals(nodeId)) {
176 if (connectedDevices.contains(deviceId)) {
177 return MastershipRole.STANDBY;
178 } else {
179 return MastershipRole.NONE;
180 }
Madan Jampani84b6b402015-02-25 17:49:54 -0800181 }
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700182 MastershipRole role = futureGetOrElse(clusterCommunicator.sendAndReceive(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700183 deviceId,
184 ROLE_QUERY_SUBJECT,
185 SERIALIZER::encode,
186 SERIALIZER::decode,
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700187 nodeId), null);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700188 return role == null ? MastershipRole.NONE : role;
Madan Jampani84b6b402015-02-25 17:49:54 -0800189 }
190
191 @Override
192 public NodeId getMaster(DeviceId deviceId) {
193 checkArgument(deviceId != null, DEVICE_ID_NULL);
194
195 String leadershipTopic = createDeviceMastershipTopic(deviceId);
196 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
197 return leadership != null ? leadership.leader() : null;
198 }
199
200 @Override
201 public RoleInfo getNodes(DeviceId deviceId) {
202 checkArgument(deviceId != null, DEVICE_ID_NULL);
203
204 Map<NodeId, MastershipRole> roles = Maps.newHashMap();
205 clusterService
206 .getNodes()
207 .stream()
208 .parallel()
209 .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
210
211 NodeId master = null;
212 final List<NodeId> standbys = Lists.newLinkedList();
213
214 for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
215 if (entry.getValue() == MastershipRole.MASTER) {
216 master = entry.getKey();
217 } else if (entry.getValue() == MastershipRole.STANDBY) {
218 standbys.add(entry.getKey());
219 }
220 }
221
222 return new RoleInfo(master, standbys);
223 }
224
225 @Override
226 public Set<DeviceId> getDevices(NodeId nodeId) {
227 checkArgument(nodeId != null, NODE_ID_NULL);
228
229 return leadershipService
230 .ownedTopics(nodeId)
231 .stream()
232 .filter(this::isDeviceMastershipTopic)
233 .map(this::extractDeviceIdFromTopic)
234 .collect(Collectors.toSet());
235 }
236
237 @Override
238 public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
239 checkArgument(nodeId != null, NODE_ID_NULL);
240 checkArgument(deviceId != null, DEVICE_ID_NULL);
241
242 throw new UnsupportedOperationException("This operation is not supported in " + this.getClass().getName());
243 }
244
245 @Override
246 public MastershipTerm getTermFor(DeviceId deviceId) {
247 checkArgument(deviceId != null, DEVICE_ID_NULL);
248
249 String leadershipTopic = createDeviceMastershipTopic(deviceId);
250 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
251 return leadership != null ? MastershipTerm.of(leadership.leader(), leadership.epoch()) : null;
252 }
253
254 @Override
255 public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
256 checkArgument(nodeId != null, NODE_ID_NULL);
257 checkArgument(deviceId != null, DEVICE_ID_NULL);
258
259 throw new UnsupportedOperationException("This operation is not supported in " + this.getClass().getName());
260 }
261
262 @Override
263 public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
264 checkArgument(nodeId != null, NODE_ID_NULL);
265 checkArgument(deviceId != null, DEVICE_ID_NULL);
266
267 if (!nodeId.equals(localNodeId)) {
268 log.debug("Forwarding request to relinquish "
269 + "role for device {} to {}", deviceId, nodeId);
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700270 return futureGetOrElse(clusterCommunicator.sendAndReceive(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700271 deviceId,
272 ROLE_RELINQUISH_SUBJECT,
273 SERIALIZER::encode,
274 SERIALIZER::decode,
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700275 nodeId), null);
Madan Jampani84b6b402015-02-25 17:49:54 -0800276 }
277
278 // Check if this node is can be managed by this node.
279 if (!connectedDevices.contains(deviceId)) {
280 return null;
281 }
282
283 String leadershipTopic = createDeviceMastershipTopic(deviceId);
284 Leadership currentLeadership = leadershipService.getLeadership(leadershipTopic);
285
286 MastershipEvent.Type eventType = null;
287 if (currentLeadership != null && currentLeadership.leader().equals(localNodeId)) {
288 eventType = MastershipEvent.Type.MASTER_CHANGED;
289 } else {
290 eventType = MastershipEvent.Type.BACKUPS_CHANGED;
291 }
292
293 connectedDevices.remove(deviceId);
294 leadershipService.withdraw(leadershipTopic);
295
296 return new MastershipEvent(eventType, deviceId, getNodes(deviceId));
297 }
298
299 private class RoleQueryHandler implements ClusterMessageHandler {
300 @Override
301 public void handle(ClusterMessage message) {
302 DeviceId deviceId = SERIALIZER.decode(message.payload());
303 try {
304 message.respond(SERIALIZER.encode(getRole(localNodeId, deviceId)));
305 } catch (IOException e) {
306 log.error("Failed to responsd to role query", e);
307 }
308 }
309 }
310
311
312 @Override
313 public void relinquishAllRole(NodeId nodeId) {
314 // Noop. LeadershipService already takes care of detecting and purging deadlocks.
315 }
316
317 private class RoleRelinquishHandler implements ClusterMessageHandler {
318 @Override
319 public void handle(ClusterMessage message) {
320 DeviceId deviceId = SERIALIZER.decode(message.payload());
321 try {
322 message.respond(SERIALIZER.encode(relinquishRole(localNodeId, deviceId)));
323 } catch (IOException e) {
324 log.error("Failed to relinquish role.", e);
325 }
326 }
327 }
328
329 private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
330 @Override
331 public void event(LeadershipEvent event) {
332 Leadership leadership = event.subject();
333 if (!isDeviceMastershipTopic(leadership.topic())) {
334 return;
335 }
336 NodeId nodeId = leadership.leader();
337 DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
338 if (Objects.equal(nodeId, localNodeId) && connectedDevices.contains(deviceId)) {
339 switch (event.type()) {
340 case LEADER_ELECTED:
341 notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
342 break;
343 case LEADER_REELECTED:
344 // There is no concept of leader re-election in the new distributed leadership manager.
345 throw new IllegalStateException("Unexpected event type");
346 case LEADER_BOOTED:
347 notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
348 break;
349 default:
350 return;
351 }
352 }
353 }
354 }
355
356 private String createDeviceMastershipTopic(DeviceId deviceId) {
357 return "/devices/" + deviceId.toString() + "/mastership";
358 }
359
360 private DeviceId extractDeviceIdFromTopic(String topic) {
361 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
362 if (m.matches()) {
363 return DeviceId.deviceId(m.group(1));
364 } else {
365 throw new IllegalArgumentException("Invalid device mastership topic: " + topic);
366 }
367 }
368
369 private boolean isDeviceMastershipTopic(String topic) {
370 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
371 return m.matches();
372 }
373
Madan Jampani84b6b402015-02-25 17:49:54 -0800374}