blob: bbd7b7a5173e8692b5ccca9222576fe028e7d3b5 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.mastership.impl;
17
18import static org.onlab.util.Tools.groupedThreads;
Ayaka Koshibe941f8602015-04-15 14:17:08 -070019import static org.onlab.util.Tools.futureGetOrElse;
Madan Jampani84b6b402015-02-25 17:49:54 -080020import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
21import static org.slf4j.LoggerFactory.getLogger;
22import static com.google.common.base.Preconditions.checkArgument;
23
Madan Jampani84b6b402015-02-25 17:49:54 -080024import java.util.List;
25import java.util.Map;
26import java.util.Set;
Madan Jampanif7536ab2015-05-07 23:23:23 -070027import java.util.concurrent.CompletableFuture;
Madan Jampani84b6b402015-02-25 17:49:54 -080028import java.util.concurrent.ExecutorService;
29import java.util.concurrent.Executors;
Madan Jampanif7536ab2015-05-07 23:23:23 -070030import java.util.concurrent.ScheduledExecutorService;
31import java.util.concurrent.TimeUnit;
Madan Jampani84b6b402015-02-25 17:49:54 -080032import java.util.regex.Matcher;
33import java.util.regex.Pattern;
34import java.util.stream.Collectors;
35
36import org.apache.felix.scr.annotations.Activate;
37import org.apache.felix.scr.annotations.Component;
38import org.apache.felix.scr.annotations.Deactivate;
39import org.apache.felix.scr.annotations.Reference;
40import org.apache.felix.scr.annotations.ReferenceCardinality;
41import org.apache.felix.scr.annotations.Service;
42import org.onlab.util.KryoNamespace;
43import org.onosproject.cluster.ClusterService;
44import org.onosproject.cluster.Leadership;
45import org.onosproject.cluster.LeadershipEvent;
46import org.onosproject.cluster.LeadershipEventListener;
47import org.onosproject.cluster.LeadershipService;
48import org.onosproject.cluster.NodeId;
49import org.onosproject.cluster.RoleInfo;
50import org.onosproject.mastership.MastershipEvent;
51import org.onosproject.mastership.MastershipStore;
52import org.onosproject.mastership.MastershipStoreDelegate;
53import org.onosproject.mastership.MastershipTerm;
54import org.onosproject.net.DeviceId;
55import org.onosproject.net.MastershipRole;
56import org.onosproject.store.AbstractStore;
57import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani84b6b402015-02-25 17:49:54 -080058import org.onosproject.store.cluster.messaging.MessageSubject;
59import org.onosproject.store.serializers.KryoNamespaces;
60import org.onosproject.store.serializers.KryoSerializer;
61import org.onosproject.store.serializers.StoreSerializer;
62import org.slf4j.Logger;
63
64import com.google.common.base.Objects;
65import com.google.common.collect.Lists;
66import com.google.common.collect.Maps;
67import com.google.common.collect.Sets;
68
69/**
70 * Implementation of the MastershipStore on top of Leadership Service.
71 */
Madan Jampani5756c352015-04-29 00:23:58 -070072@Component(immediate = true, enabled = true)
Madan Jampani84b6b402015-02-25 17:49:54 -080073@Service
74public class ConsistentDeviceMastershipStore
75 extends AbstractStore<MastershipEvent, MastershipStoreDelegate>
76 implements MastershipStore {
77
78 private final Logger log = getLogger(getClass());
79
80 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
81 protected LeadershipService leadershipService;
82
83 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
84 protected ClusterService clusterService;
85
86 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
87 protected ClusterCommunicationService clusterCommunicator;
88
89 private NodeId localNodeId;
90 private final Set<DeviceId> connectedDevices = Sets.newHashSet();
91
92 private static final MessageSubject ROLE_QUERY_SUBJECT =
93 new MessageSubject("mastership-store-device-role-query");
94 private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
95 new MessageSubject("mastership-store-device-role-relinquish");
Madan Jampani9bd1f152015-04-30 23:33:35 -070096 private static final MessageSubject TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT =
Madan Jampani1af8e132015-04-30 16:41:18 -070097 new MessageSubject("mastership-store-device-mastership-relinquish");
Madan Jampani84b6b402015-02-25 17:49:54 -080098
99 private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
Madan Jampani5756c352015-04-29 00:23:58 -0700100 Pattern.compile("device:(.*)");
Madan Jampani84b6b402015-02-25 17:49:54 -0800101
Madan Jampani84b6b402015-02-25 17:49:54 -0800102 private ExecutorService messageHandlingExecutor;
Madan Jampanif7536ab2015-05-07 23:23:23 -0700103 private ScheduledExecutorService transferExecutor;
Madan Jampani84b6b402015-02-25 17:49:54 -0800104 private final LeadershipEventListener leadershipEventListener =
105 new InternalDeviceMastershipEventListener();
106
107 private static final String NODE_ID_NULL = "Node ID cannot be null";
Madan Jampanif7536ab2015-05-07 23:23:23 -0700108 private static final String DEVICE_ID_NULL = "Device ID cannot be null";
109 private static final int WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS = 3000;
Madan Jampani84b6b402015-02-25 17:49:54 -0800110
111 public static final StoreSerializer SERIALIZER = new KryoSerializer() {
112 @Override
113 protected void setupKryoPool() {
114 serializerPool = KryoNamespace.newBuilder()
115 .register(KryoNamespaces.API)
116 .register(MastershipRole.class)
117 .register(MastershipEvent.class)
Madan Jampani1af8e132015-04-30 16:41:18 -0700118 .register(MastershipEvent.Type.class)
Madan Jampani84b6b402015-02-25 17:49:54 -0800119 .build();
120 }
121 };
122
123 @Activate
124 public void activate() {
125 messageHandlingExecutor =
Madan Jampanif7536ab2015-05-07 23:23:23 -0700126 Executors.newSingleThreadExecutor(
127 groupedThreads("onos/store/device/mastership", "message-handler"));
128 transferExecutor =
129 Executors.newSingleThreadScheduledExecutor(
130 groupedThreads("onos/store/device/mastership", "mastership-transfer-executor"));
Madan Jampanid46e18f2015-05-04 23:19:33 -0700131 clusterCommunicator.<DeviceId, MastershipRole>addSubscriber(ROLE_QUERY_SUBJECT,
132 SERIALIZER::decode,
133 deviceId -> getRole(localNodeId, deviceId),
134 SERIALIZER::encode,
Madan Jampani84b6b402015-02-25 17:49:54 -0800135 messageHandlingExecutor);
Madan Jampanid46e18f2015-05-04 23:19:33 -0700136 clusterCommunicator.<DeviceId, MastershipEvent>addSubscriber(ROLE_RELINQUISH_SUBJECT,
137 SERIALIZER::decode,
Madan Jampanif7536ab2015-05-07 23:23:23 -0700138 this::relinquishLocalRole,
Madan Jampanid46e18f2015-05-04 23:19:33 -0700139 SERIALIZER::encode,
140 messageHandlingExecutor);
Madan Jampani9bd1f152015-04-30 23:33:35 -0700141 clusterCommunicator.addSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
Madan Jampani1af8e132015-04-30 16:41:18 -0700142 SERIALIZER::decode,
Madan Jampani9bd1f152015-04-30 23:33:35 -0700143 this::transitionFromMasterToStandby,
Madan Jampani1af8e132015-04-30 16:41:18 -0700144 SERIALIZER::encode,
145 messageHandlingExecutor);
Madan Jampani84b6b402015-02-25 17:49:54 -0800146 localNodeId = clusterService.getLocalNode().id();
147 leadershipService.addListener(leadershipEventListener);
148
149 log.info("Started.");
150 }
151
152 @Deactivate
153 public void deactivate() {
154 clusterCommunicator.removeSubscriber(ROLE_QUERY_SUBJECT);
155 clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
Madan Jampani9bd1f152015-04-30 23:33:35 -0700156 clusterCommunicator.removeSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT);
Madan Jampani84b6b402015-02-25 17:49:54 -0800157 messageHandlingExecutor.shutdown();
Madan Jampanif7536ab2015-05-07 23:23:23 -0700158 transferExecutor.shutdown();
Madan Jampani84b6b402015-02-25 17:49:54 -0800159 leadershipService.removeListener(leadershipEventListener);
160
161 log.info("Stoppped.");
162 }
163
164 @Override
165 public MastershipRole requestRole(DeviceId deviceId) {
166 checkArgument(deviceId != null, DEVICE_ID_NULL);
167
168 String leadershipTopic = createDeviceMastershipTopic(deviceId);
169 if (connectedDevices.add(deviceId)) {
170 leadershipService.runForLeadership(leadershipTopic);
171 return MastershipRole.STANDBY;
172 } else {
173 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
174 if (leadership != null && leadership.leader().equals(localNodeId)) {
175 return MastershipRole.MASTER;
176 } else {
177 return MastershipRole.STANDBY;
178 }
179 }
180 }
181
182 @Override
183 public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
184 checkArgument(nodeId != null, NODE_ID_NULL);
185 checkArgument(deviceId != null, DEVICE_ID_NULL);
186
187 String leadershipTopic = createDeviceMastershipTopic(deviceId);
188 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
189 if (leadership != null && nodeId.equals(leadership.leader())) {
190 return MastershipRole.MASTER;
191 }
192
193 if (localNodeId.equals(nodeId)) {
194 if (connectedDevices.contains(deviceId)) {
195 return MastershipRole.STANDBY;
196 } else {
197 return MastershipRole.NONE;
198 }
Madan Jampani84b6b402015-02-25 17:49:54 -0800199 }
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700200 MastershipRole role = futureGetOrElse(clusterCommunicator.sendAndReceive(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700201 deviceId,
202 ROLE_QUERY_SUBJECT,
203 SERIALIZER::encode,
204 SERIALIZER::decode,
Ayaka Koshibe941f8602015-04-15 14:17:08 -0700205 nodeId), null);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700206 return role == null ? MastershipRole.NONE : role;
Madan Jampani84b6b402015-02-25 17:49:54 -0800207 }
208
209 @Override
210 public NodeId getMaster(DeviceId deviceId) {
211 checkArgument(deviceId != null, DEVICE_ID_NULL);
212
213 String leadershipTopic = createDeviceMastershipTopic(deviceId);
214 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
215 return leadership != null ? leadership.leader() : null;
216 }
217
218 @Override
219 public RoleInfo getNodes(DeviceId deviceId) {
220 checkArgument(deviceId != null, DEVICE_ID_NULL);
221
222 Map<NodeId, MastershipRole> roles = Maps.newHashMap();
223 clusterService
224 .getNodes()
Madan Jampani84b6b402015-02-25 17:49:54 -0800225 .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
226
227 NodeId master = null;
228 final List<NodeId> standbys = Lists.newLinkedList();
229
Madan Jampani86940d92015-05-06 11:47:57 -0700230 List<NodeId> candidates = leadershipService.getCandidates(createDeviceMastershipTopic(deviceId));
231
Madan Jampani84b6b402015-02-25 17:49:54 -0800232 for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
233 if (entry.getValue() == MastershipRole.MASTER) {
234 master = entry.getKey();
235 } else if (entry.getValue() == MastershipRole.STANDBY) {
236 standbys.add(entry.getKey());
237 }
238 }
239
Madan Jampani86940d92015-05-06 11:47:57 -0700240 List<NodeId> sortedStandbyList = candidates.stream().filter(standbys::contains).collect(Collectors.toList());
241
242 return new RoleInfo(master, sortedStandbyList);
Madan Jampani84b6b402015-02-25 17:49:54 -0800243 }
244
245 @Override
246 public Set<DeviceId> getDevices(NodeId nodeId) {
247 checkArgument(nodeId != null, NODE_ID_NULL);
248
249 return leadershipService
250 .ownedTopics(nodeId)
251 .stream()
252 .filter(this::isDeviceMastershipTopic)
253 .map(this::extractDeviceIdFromTopic)
254 .collect(Collectors.toSet());
255 }
256
257 @Override
Madan Jampanif7536ab2015-05-07 23:23:23 -0700258 public CompletableFuture<MastershipEvent> setMaster(NodeId nodeId, DeviceId deviceId) {
Madan Jampani84b6b402015-02-25 17:49:54 -0800259 checkArgument(nodeId != null, NODE_ID_NULL);
260 checkArgument(deviceId != null, DEVICE_ID_NULL);
261
Madan Jampani1af8e132015-04-30 16:41:18 -0700262 NodeId currentMaster = getMaster(deviceId);
263 if (nodeId.equals(currentMaster)) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700264 return CompletableFuture.completedFuture(null);
Madan Jampani1af8e132015-04-30 16:41:18 -0700265 } else {
266 String leadershipTopic = createDeviceMastershipTopic(deviceId);
267 List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
268 if (candidates.isEmpty()) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700269 return CompletableFuture.completedFuture(null);
Madan Jampani1af8e132015-04-30 16:41:18 -0700270 }
271 if (leadershipService.makeTopCandidate(leadershipTopic, nodeId)) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700272 CompletableFuture<MastershipEvent> result = new CompletableFuture<>();
273 // There is brief wait before we step down from mastership.
274 // This is to ensure any work that happens when standby preference
275 // order changes can complete. For example: flow entries need to be backed
276 // to the new top standby (ONOS-1883)
277 // FIXME: This potentially introduces a race-condition.
278 // Right now role changes are only forced via CLI.
279 transferExecutor.schedule(() -> {
280 result.complete(transitionFromMasterToStandby(deviceId));
281 }, WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS);
282 return result;
Madan Jampani1af8e132015-04-30 16:41:18 -0700283 } else {
284 log.warn("Failed to promote {} to mastership for {}", nodeId, deviceId);
285 }
286 }
Madan Jampanif7536ab2015-05-07 23:23:23 -0700287 return CompletableFuture.completedFuture(null);
Madan Jampani84b6b402015-02-25 17:49:54 -0800288 }
289
290 @Override
291 public MastershipTerm getTermFor(DeviceId deviceId) {
292 checkArgument(deviceId != null, DEVICE_ID_NULL);
293
294 String leadershipTopic = createDeviceMastershipTopic(deviceId);
295 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
296 return leadership != null ? MastershipTerm.of(leadership.leader(), leadership.epoch()) : null;
297 }
298
299 @Override
Madan Jampanif7536ab2015-05-07 23:23:23 -0700300 public CompletableFuture<MastershipEvent> setStandby(NodeId nodeId, DeviceId deviceId) {
Madan Jampani84b6b402015-02-25 17:49:54 -0800301 checkArgument(nodeId != null, NODE_ID_NULL);
302 checkArgument(deviceId != null, DEVICE_ID_NULL);
303
Madan Jampani1af8e132015-04-30 16:41:18 -0700304 NodeId currentMaster = getMaster(deviceId);
305 if (!nodeId.equals(currentMaster)) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700306 return CompletableFuture.completedFuture(null);
Madan Jampani1af8e132015-04-30 16:41:18 -0700307 }
Madan Jampanid46e18f2015-05-04 23:19:33 -0700308
309 String leadershipTopic = createDeviceMastershipTopic(deviceId);
310 List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
311
312 NodeId newMaster = candidates.stream()
313 .filter(candidate -> !Objects.equal(nodeId, candidate))
314 .findFirst()
315 .orElse(null);
316 log.info("Transitioning to role {} for {}. Next master: {}",
317 newMaster != null ? MastershipRole.STANDBY : MastershipRole.NONE, deviceId, newMaster);
318
319 if (newMaster != null) {
320 return setMaster(newMaster, deviceId);
321 }
322 return relinquishRole(nodeId, deviceId);
Madan Jampani84b6b402015-02-25 17:49:54 -0800323 }
324
325 @Override
Madan Jampanif7536ab2015-05-07 23:23:23 -0700326 public CompletableFuture<MastershipEvent> relinquishRole(NodeId nodeId, DeviceId deviceId) {
Madan Jampani84b6b402015-02-25 17:49:54 -0800327 checkArgument(nodeId != null, NODE_ID_NULL);
328 checkArgument(deviceId != null, DEVICE_ID_NULL);
329
330 if (!nodeId.equals(localNodeId)) {
331 log.debug("Forwarding request to relinquish "
332 + "role for device {} to {}", deviceId, nodeId);
Madan Jampanif7536ab2015-05-07 23:23:23 -0700333 return clusterCommunicator.sendAndReceive(
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700334 deviceId,
335 ROLE_RELINQUISH_SUBJECT,
336 SERIALIZER::encode,
337 SERIALIZER::decode,
Madan Jampanif7536ab2015-05-07 23:23:23 -0700338 nodeId);
Madan Jampani84b6b402015-02-25 17:49:54 -0800339 }
Madan Jampanif7536ab2015-05-07 23:23:23 -0700340 return CompletableFuture.completedFuture(relinquishLocalRole(deviceId));
341 }
342
343 private MastershipEvent relinquishLocalRole(DeviceId deviceId) {
344 checkArgument(deviceId != null, DEVICE_ID_NULL);
Madan Jampani84b6b402015-02-25 17:49:54 -0800345
346 // Check if this node is can be managed by this node.
347 if (!connectedDevices.contains(deviceId)) {
348 return null;
349 }
350
351 String leadershipTopic = createDeviceMastershipTopic(deviceId);
Madan Jampani9bd1f152015-04-30 23:33:35 -0700352 NodeId currentLeader = leadershipService.getLeader(leadershipTopic);
Madan Jampani84b6b402015-02-25 17:49:54 -0800353
Madan Jampani9bd1f152015-04-30 23:33:35 -0700354 MastershipEvent.Type eventType = Objects.equal(currentLeader, localNodeId)
355 ? MastershipEvent.Type.MASTER_CHANGED
356 : MastershipEvent.Type.BACKUPS_CHANGED;
Madan Jampani84b6b402015-02-25 17:49:54 -0800357
358 connectedDevices.remove(deviceId);
359 leadershipService.withdraw(leadershipTopic);
360
361 return new MastershipEvent(eventType, deviceId, getNodes(deviceId));
362 }
363
Madan Jampani9bd1f152015-04-30 23:33:35 -0700364 private MastershipEvent transitionFromMasterToStandby(DeviceId deviceId) {
Madan Jampani1af8e132015-04-30 16:41:18 -0700365 checkArgument(deviceId != null, DEVICE_ID_NULL);
366
367 NodeId currentMaster = getMaster(deviceId);
368 if (currentMaster == null) {
369 return null;
370 }
371
372 if (!currentMaster.equals(localNodeId)) {
373 log.info("Forwarding request to relinquish "
374 + "mastership for device {} to {}", deviceId, currentMaster);
375 return futureGetOrElse(clusterCommunicator.sendAndReceive(
376 deviceId,
Madan Jampani9bd1f152015-04-30 23:33:35 -0700377 TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
Madan Jampani1af8e132015-04-30 16:41:18 -0700378 SERIALIZER::encode,
379 SERIALIZER::decode,
380 currentMaster), null);
381 }
382
Madan Jampani9bd1f152015-04-30 23:33:35 -0700383 return leadershipService.stepdown(createDeviceMastershipTopic(deviceId))
384 ? new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, getNodes(deviceId)) : null;
Madan Jampani1af8e132015-04-30 16:41:18 -0700385 }
386
Madan Jampani84b6b402015-02-25 17:49:54 -0800387 @Override
388 public void relinquishAllRole(NodeId nodeId) {
389 // Noop. LeadershipService already takes care of detecting and purging deadlocks.
390 }
391
Madan Jampani84b6b402015-02-25 17:49:54 -0800392 private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
393 @Override
394 public void event(LeadershipEvent event) {
395 Leadership leadership = event.subject();
396 if (!isDeviceMastershipTopic(leadership.topic())) {
397 return;
398 }
399 NodeId nodeId = leadership.leader();
400 DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
401 if (Objects.equal(nodeId, localNodeId) && connectedDevices.contains(deviceId)) {
402 switch (event.type()) {
403 case LEADER_ELECTED:
404 notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
405 break;
406 case LEADER_REELECTED:
407 // There is no concept of leader re-election in the new distributed leadership manager.
408 throw new IllegalStateException("Unexpected event type");
409 case LEADER_BOOTED:
Madan Jampania2a229c2015-05-11 17:16:30 -0700410 notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
Madan Jampani84b6b402015-02-25 17:49:54 -0800411 break;
412 default:
413 return;
414 }
415 }
416 }
417 }
418
419 private String createDeviceMastershipTopic(DeviceId deviceId) {
Madan Jampani5756c352015-04-29 00:23:58 -0700420 return String.format("device:%s", deviceId.toString());
Madan Jampani84b6b402015-02-25 17:49:54 -0800421 }
422
423 private DeviceId extractDeviceIdFromTopic(String topic) {
424 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
425 if (m.matches()) {
426 return DeviceId.deviceId(m.group(1));
427 } else {
428 throw new IllegalArgumentException("Invalid device mastership topic: " + topic);
429 }
430 }
431
432 private boolean isDeviceMastershipTopic(String topic) {
433 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
434 return m.matches();
435 }
436
Madan Jampanic26eede2015-04-16 11:42:16 -0700437}