/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.mastership.impl;
17
18import static org.onlab.util.Tools.groupedThreads;
19import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
20import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
21import static org.slf4j.LoggerFactory.getLogger;
22import static com.google.common.base.Preconditions.checkArgument;
23
24import java.io.IOException;
25import java.util.List;
26import java.util.Map;
27import java.util.Set;
28import java.util.concurrent.ExecutionException;
29import java.util.concurrent.ExecutorService;
30import java.util.concurrent.Executors;
31import java.util.concurrent.Future;
32import java.util.concurrent.TimeUnit;
33import java.util.concurrent.TimeoutException;
34import java.util.regex.Matcher;
35import java.util.regex.Pattern;
36import java.util.stream.Collectors;
37
38import org.apache.felix.scr.annotations.Activate;
39import org.apache.felix.scr.annotations.Component;
40import org.apache.felix.scr.annotations.Deactivate;
41import org.apache.felix.scr.annotations.Reference;
42import org.apache.felix.scr.annotations.ReferenceCardinality;
43import org.apache.felix.scr.annotations.Service;
44import org.onlab.util.KryoNamespace;
45import org.onosproject.cluster.ClusterService;
46import org.onosproject.cluster.Leadership;
47import org.onosproject.cluster.LeadershipEvent;
48import org.onosproject.cluster.LeadershipEventListener;
49import org.onosproject.cluster.LeadershipService;
50import org.onosproject.cluster.NodeId;
51import org.onosproject.cluster.RoleInfo;
52import org.onosproject.mastership.MastershipEvent;
53import org.onosproject.mastership.MastershipStore;
54import org.onosproject.mastership.MastershipStoreDelegate;
55import org.onosproject.mastership.MastershipTerm;
56import org.onosproject.net.DeviceId;
57import org.onosproject.net.MastershipRole;
58import org.onosproject.store.AbstractStore;
59import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
60import org.onosproject.store.cluster.messaging.ClusterMessage;
61import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
62import org.onosproject.store.cluster.messaging.MessageSubject;
63import org.onosproject.store.serializers.KryoNamespaces;
64import org.onosproject.store.serializers.KryoSerializer;
65import org.onosproject.store.serializers.StoreSerializer;
66import org.slf4j.Logger;
67
68import com.google.common.base.Objects;
69import com.google.common.collect.Lists;
70import com.google.common.collect.Maps;
71import com.google.common.collect.Sets;
72
73/**
74 * Implementation of the MastershipStore on top of Leadership Service.
75 */
76@Component(immediate = true, enabled = false)
77@Service
78public class ConsistentDeviceMastershipStore
79 extends AbstractStore<MastershipEvent, MastershipStoreDelegate>
80 implements MastershipStore {
81
82 private final Logger log = getLogger(getClass());
83
84 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85 protected LeadershipService leadershipService;
86
87 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
88 protected ClusterService clusterService;
89
90 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
91 protected ClusterCommunicationService clusterCommunicator;
92
93 private NodeId localNodeId;
94 private final Set<DeviceId> connectedDevices = Sets.newHashSet();
95
96 private static final MessageSubject ROLE_QUERY_SUBJECT =
97 new MessageSubject("mastership-store-device-role-query");
98 private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
99 new MessageSubject("mastership-store-device-role-relinquish");
100
101 private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
102 Pattern.compile("/devices/(.*)/mastership");
103
104 private static final long PEER_REQUEST_TIMEOUT_MS = 5000;
105 private ExecutorService messageHandlingExecutor;
106 private final LeadershipEventListener leadershipEventListener =
107 new InternalDeviceMastershipEventListener();
108
109 private static final String NODE_ID_NULL = "Node ID cannot be null";
110 private static final String DEVICE_ID_NULL = "Device ID cannot be null";;
111
112 public static final StoreSerializer SERIALIZER = new KryoSerializer() {
113 @Override
114 protected void setupKryoPool() {
115 serializerPool = KryoNamespace.newBuilder()
116 .register(KryoNamespaces.API)
117 .register(MastershipRole.class)
118 .register(MastershipEvent.class)
119 .build();
120 }
121 };
122
123 @Activate
124 public void activate() {
125 messageHandlingExecutor =
126 Executors.newSingleThreadExecutor(groupedThreads("onos/store/device/mastership", "message-handler"));
127 clusterCommunicator.addSubscriber(ROLE_QUERY_SUBJECT,
128 new RoleQueryHandler(),
129 messageHandlingExecutor);
130 clusterCommunicator.addSubscriber(ROLE_RELINQUISH_SUBJECT,
131 new RoleRelinquishHandler(),
132 messageHandlingExecutor);
133 localNodeId = clusterService.getLocalNode().id();
134 leadershipService.addListener(leadershipEventListener);
135
136 log.info("Started.");
137 }
138
139 @Deactivate
140 public void deactivate() {
141 clusterCommunicator.removeSubscriber(ROLE_QUERY_SUBJECT);
142 clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
143 messageHandlingExecutor.shutdown();
144 leadershipService.removeListener(leadershipEventListener);
145
146 log.info("Stoppped.");
147 }
148
149 @Override
150 public MastershipRole requestRole(DeviceId deviceId) {
151 checkArgument(deviceId != null, DEVICE_ID_NULL);
152
153 String leadershipTopic = createDeviceMastershipTopic(deviceId);
154 if (connectedDevices.add(deviceId)) {
155 leadershipService.runForLeadership(leadershipTopic);
156 return MastershipRole.STANDBY;
157 } else {
158 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
159 if (leadership != null && leadership.leader().equals(localNodeId)) {
160 return MastershipRole.MASTER;
161 } else {
162 return MastershipRole.STANDBY;
163 }
164 }
165 }
166
167 @Override
168 public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
169 checkArgument(nodeId != null, NODE_ID_NULL);
170 checkArgument(deviceId != null, DEVICE_ID_NULL);
171
172 String leadershipTopic = createDeviceMastershipTopic(deviceId);
173 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
174 if (leadership != null && nodeId.equals(leadership.leader())) {
175 return MastershipRole.MASTER;
176 }
177
178 if (localNodeId.equals(nodeId)) {
179 if (connectedDevices.contains(deviceId)) {
180 return MastershipRole.STANDBY;
181 } else {
182 return MastershipRole.NONE;
183 }
184 } else {
185 try {
186 MastershipRole role = complete(clusterCommunicator.sendAndReceive(
187 new ClusterMessage(
188 localNodeId,
189 ROLE_QUERY_SUBJECT,
190 SERIALIZER.encode(deviceId)),
191 nodeId));
192 return role == null ? MastershipRole.NONE : role;
193 } catch (IOException e) {
194 log.warn("Failed to query {} for {}'s role. Defaulting to NONE", nodeId, deviceId, e);
195 return MastershipRole.NONE;
196 }
197 }
198 }
199
200 @Override
201 public NodeId getMaster(DeviceId deviceId) {
202 checkArgument(deviceId != null, DEVICE_ID_NULL);
203
204 String leadershipTopic = createDeviceMastershipTopic(deviceId);
205 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
206 return leadership != null ? leadership.leader() : null;
207 }
208
209 @Override
210 public RoleInfo getNodes(DeviceId deviceId) {
211 checkArgument(deviceId != null, DEVICE_ID_NULL);
212
213 Map<NodeId, MastershipRole> roles = Maps.newHashMap();
214 clusterService
215 .getNodes()
216 .stream()
217 .parallel()
218 .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
219
220 NodeId master = null;
221 final List<NodeId> standbys = Lists.newLinkedList();
222
223 for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
224 if (entry.getValue() == MastershipRole.MASTER) {
225 master = entry.getKey();
226 } else if (entry.getValue() == MastershipRole.STANDBY) {
227 standbys.add(entry.getKey());
228 }
229 }
230
231 return new RoleInfo(master, standbys);
232 }
233
234 @Override
235 public Set<DeviceId> getDevices(NodeId nodeId) {
236 checkArgument(nodeId != null, NODE_ID_NULL);
237
238 return leadershipService
239 .ownedTopics(nodeId)
240 .stream()
241 .filter(this::isDeviceMastershipTopic)
242 .map(this::extractDeviceIdFromTopic)
243 .collect(Collectors.toSet());
244 }
245
246 @Override
247 public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
248 checkArgument(nodeId != null, NODE_ID_NULL);
249 checkArgument(deviceId != null, DEVICE_ID_NULL);
250
251 throw new UnsupportedOperationException("This operation is not supported in " + this.getClass().getName());
252 }
253
254 @Override
255 public MastershipTerm getTermFor(DeviceId deviceId) {
256 checkArgument(deviceId != null, DEVICE_ID_NULL);
257
258 String leadershipTopic = createDeviceMastershipTopic(deviceId);
259 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
260 return leadership != null ? MastershipTerm.of(leadership.leader(), leadership.epoch()) : null;
261 }
262
263 @Override
264 public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
265 checkArgument(nodeId != null, NODE_ID_NULL);
266 checkArgument(deviceId != null, DEVICE_ID_NULL);
267
268 throw new UnsupportedOperationException("This operation is not supported in " + this.getClass().getName());
269 }
270
271 @Override
272 public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
273 checkArgument(nodeId != null, NODE_ID_NULL);
274 checkArgument(deviceId != null, DEVICE_ID_NULL);
275
276 if (!nodeId.equals(localNodeId)) {
277 log.debug("Forwarding request to relinquish "
278 + "role for device {} to {}", deviceId, nodeId);
279 try {
280 return complete(clusterCommunicator.sendAndReceive(
281 new ClusterMessage(
282 localNodeId,
283 ROLE_RELINQUISH_SUBJECT,
284 SERIALIZER.encode(deviceId)),
285 nodeId));
286 } catch (IOException e) {
287 log.warn("Failed to send a request to relinquish role for {} to {}", deviceId, nodeId, e);
288 return null;
289 }
290 }
291
292 // Check if this node is can be managed by this node.
293 if (!connectedDevices.contains(deviceId)) {
294 return null;
295 }
296
297 String leadershipTopic = createDeviceMastershipTopic(deviceId);
298 Leadership currentLeadership = leadershipService.getLeadership(leadershipTopic);
299
300 MastershipEvent.Type eventType = null;
301 if (currentLeadership != null && currentLeadership.leader().equals(localNodeId)) {
302 eventType = MastershipEvent.Type.MASTER_CHANGED;
303 } else {
304 eventType = MastershipEvent.Type.BACKUPS_CHANGED;
305 }
306
307 connectedDevices.remove(deviceId);
308 leadershipService.withdraw(leadershipTopic);
309
310 return new MastershipEvent(eventType, deviceId, getNodes(deviceId));
311 }
312
313 private class RoleQueryHandler implements ClusterMessageHandler {
314 @Override
315 public void handle(ClusterMessage message) {
316 DeviceId deviceId = SERIALIZER.decode(message.payload());
317 try {
318 message.respond(SERIALIZER.encode(getRole(localNodeId, deviceId)));
319 } catch (IOException e) {
320 log.error("Failed to responsd to role query", e);
321 }
322 }
323 }
324
325
326 @Override
327 public void relinquishAllRole(NodeId nodeId) {
328 // Noop. LeadershipService already takes care of detecting and purging deadlocks.
329 }
330
331 private class RoleRelinquishHandler implements ClusterMessageHandler {
332 @Override
333 public void handle(ClusterMessage message) {
334 DeviceId deviceId = SERIALIZER.decode(message.payload());
335 try {
336 message.respond(SERIALIZER.encode(relinquishRole(localNodeId, deviceId)));
337 } catch (IOException e) {
338 log.error("Failed to relinquish role.", e);
339 }
340 }
341 }
342
343 private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
344 @Override
345 public void event(LeadershipEvent event) {
346 Leadership leadership = event.subject();
347 if (!isDeviceMastershipTopic(leadership.topic())) {
348 return;
349 }
350 NodeId nodeId = leadership.leader();
351 DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
352 if (Objects.equal(nodeId, localNodeId) && connectedDevices.contains(deviceId)) {
353 switch (event.type()) {
354 case LEADER_ELECTED:
355 notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
356 break;
357 case LEADER_REELECTED:
358 // There is no concept of leader re-election in the new distributed leadership manager.
359 throw new IllegalStateException("Unexpected event type");
360 case LEADER_BOOTED:
361 notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
362 break;
363 default:
364 return;
365 }
366 }
367 }
368 }
369
370 private String createDeviceMastershipTopic(DeviceId deviceId) {
371 return "/devices/" + deviceId.toString() + "/mastership";
372 }
373
374 private DeviceId extractDeviceIdFromTopic(String topic) {
375 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
376 if (m.matches()) {
377 return DeviceId.deviceId(m.group(1));
378 } else {
379 throw new IllegalArgumentException("Invalid device mastership topic: " + topic);
380 }
381 }
382
383 private boolean isDeviceMastershipTopic(String topic) {
384 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
385 return m.matches();
386 }
387
388 private <T> T complete(Future<byte[]> future) {
389 try {
390 return SERIALIZER.decode(future.get(PEER_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS));
391 } catch (InterruptedException e) {
392 Thread.currentThread().interrupt();
393 log.error("Interrupted while waiting for operation to complete.", e);
394 return null;
395 } catch (TimeoutException | ExecutionException e) {
396 log.error("Failed remote operation", e);
397 return null;
398 }
399 }
400}