blob: 41761309e77ecb05c925b4faec85405817dadb80 [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.incubator.store.virtual.impl;
18
Jordan Halterman0a2bd452018-06-13 17:24:58 -070019import com.google.common.collect.ImmutableMap;
Yoonseon Han9b71b2c2017-05-26 15:17:29 -070020import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
25import org.apache.felix.scr.annotations.Service;
26import org.onlab.util.KryoNamespace;
27import org.onosproject.cluster.ClusterService;
28import org.onosproject.cluster.Leadership;
29import org.onosproject.cluster.LeadershipAdminService;
30import org.onosproject.cluster.LeadershipEvent;
31import org.onosproject.cluster.LeadershipEventListener;
32import org.onosproject.cluster.LeadershipService;
33import org.onosproject.cluster.NodeId;
34import org.onosproject.cluster.RoleInfo;
35import org.onosproject.incubator.net.virtual.NetworkId;
36import org.onosproject.incubator.net.virtual.VirtualNetworkMastershipStore;
37import org.onosproject.mastership.MastershipEvent;
Jordan Halterman0a2bd452018-06-13 17:24:58 -070038import org.onosproject.mastership.MastershipInfo;
Yoonseon Han9b71b2c2017-05-26 15:17:29 -070039import org.onosproject.mastership.MastershipStoreDelegate;
40import org.onosproject.mastership.MastershipTerm;
41import org.onosproject.net.DeviceId;
42import org.onosproject.net.MastershipRole;
43import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
44import org.onosproject.store.cluster.messaging.MessageSubject;
45import org.onosproject.store.serializers.KryoNamespaces;
Jordan Halterman2c83a102017-08-20 17:11:41 -070046import org.onosproject.store.service.Serializer;
Yoonseon Han9b71b2c2017-05-26 15:17:29 -070047import org.slf4j.Logger;
48
49import java.util.List;
Jordan Halterman0a2bd452018-06-13 17:24:58 -070050import java.util.Optional;
Yoonseon Han9b71b2c2017-05-26 15:17:29 -070051import java.util.Set;
52import java.util.concurrent.CompletableFuture;
53import java.util.concurrent.ExecutorService;
54import java.util.concurrent.Executors;
55import java.util.concurrent.ScheduledExecutorService;
56import java.util.concurrent.TimeUnit;
57import java.util.regex.Matcher;
58import java.util.regex.Pattern;
59import java.util.stream.Collectors;
60
61import static org.onlab.util.Tools.groupedThreads;
62import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
63import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
64import static org.onosproject.mastership.MastershipEvent.Type.SUSPENDED;
65import static org.slf4j.LoggerFactory.getLogger;
66
67import com.google.common.base.Objects;
68import com.google.common.collect.ImmutableList;
Yoonseon Han9b71b2c2017-05-26 15:17:29 -070069import static com.google.common.base.Preconditions.checkArgument;
70
71@Component(immediate = true, enabled = false)
72@Service
73public class ConsistentVirtualDeviceMastershipStore
74 extends AbstractVirtualStore<MastershipEvent, MastershipStoreDelegate>
75 implements VirtualNetworkMastershipStore {
76
77 private final Logger log = getLogger(getClass());
78
79 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
80 protected LeadershipService leadershipService;
81
82 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
83 protected LeadershipAdminService leadershipAdminService;
84
85 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
86 protected ClusterService clusterService;
87
88 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
89 protected ClusterCommunicationService clusterCommunicator;
90
91 private NodeId localNodeId;
92
93 private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
94 new MessageSubject("virtual-mastership-store-device-role-relinquish");
95
96 private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
97 Pattern.compile("vnet:(.*),device:(.*)");
98
99 private ExecutorService eventHandler;
100 private ExecutorService messageHandlingExecutor;
101 private ScheduledExecutorService transferExecutor;
102 private final LeadershipEventListener leadershipEventListener =
103 new InternalDeviceMastershipEventListener();
104
105 private static final String NODE_ID_NULL = "Node ID cannot be null";
106 private static final String NETWORK_ID_NULL = "Network ID cannot be null";
107 private static final String DEVICE_ID_NULL = "Device ID cannot be null";
108 private static final int WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS = 3000;
109
Jordan Halterman2c83a102017-08-20 17:11:41 -0700110 public static final Serializer SERIALIZER = Serializer.using(
Yoonseon Han9b71b2c2017-05-26 15:17:29 -0700111 KryoNamespace.newBuilder()
112 .register(KryoNamespaces.API)
113 .register(MastershipRole.class)
114 .register(MastershipEvent.class)
115 .register(MastershipEvent.Type.class)
116 .register(VirtualDeviceId.class)
117 .build("VirtualMastershipStore"));
118
119 @Activate
120 public void activate() {
121 eventHandler = Executors.newSingleThreadExecutor(
122 groupedThreads("onos/store/virtual/mastership", "event-handler", log));
123
124 messageHandlingExecutor =
125 Executors.newSingleThreadExecutor(
126 groupedThreads("onos/store/virtual/mastership", "message-handler", log));
127 transferExecutor =
128 Executors.newSingleThreadScheduledExecutor(
129 groupedThreads("onos/store/virtual/mastership", "mastership-transfer-executor", log));
130 clusterCommunicator.addSubscriber(ROLE_RELINQUISH_SUBJECT,
131 SERIALIZER::decode,
132 this::relinquishLocalRole,
133 SERIALIZER::encode,
134 messageHandlingExecutor);
135 localNodeId = clusterService.getLocalNode().id();
136 leadershipService.addListener(leadershipEventListener);
137
138 log.info("Started");
139 }
140
141 @Deactivate
142 public void deactivate() {
143 clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
144 leadershipService.removeListener(leadershipEventListener);
145 messageHandlingExecutor.shutdown();
146 transferExecutor.shutdown();
147 eventHandler.shutdown();
148 log.info("Stopped");
149 }
150
151 @Override
152 public CompletableFuture<MastershipRole> requestRole(NetworkId networkId,
153 DeviceId deviceId) {
154 checkArgument(networkId != null, NETWORK_ID_NULL);
155 checkArgument(deviceId != null, DEVICE_ID_NULL);
156
157 String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
158 Leadership leadership = leadershipService.runForLeadership(leadershipTopic);
159 return CompletableFuture
160 .completedFuture(localNodeId.equals(leadership.leaderNodeId()) ?
161 MastershipRole.MASTER : MastershipRole.STANDBY);
162 }
163
164 @Override
165 public MastershipRole getRole(NetworkId networkId, NodeId nodeId, DeviceId deviceId) {
166 checkArgument(networkId != null, NETWORK_ID_NULL);
167 checkArgument(nodeId != null, NODE_ID_NULL);
168 checkArgument(deviceId != null, DEVICE_ID_NULL);
169
170 String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
171 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
172 NodeId leader = leadership == null ? null : leadership.leaderNodeId();
173 List<NodeId> candidates = leadership == null ?
174 ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
175 return Objects.equal(nodeId, leader) ?
176 MastershipRole.MASTER : candidates.contains(nodeId) ?
177 MastershipRole.STANDBY : MastershipRole.NONE;
178 }
179
180 @Override
181 public NodeId getMaster(NetworkId networkId, DeviceId deviceId) {
182 checkArgument(deviceId != null, DEVICE_ID_NULL);
183
184 return leadershipService.getLeader(createDeviceMastershipTopic(networkId, deviceId));
185 }
186
187 @Override
188 public RoleInfo getNodes(NetworkId networkId, DeviceId deviceId) {
189 checkArgument(networkId != null, NETWORK_ID_NULL);
190 checkArgument(deviceId != null, DEVICE_ID_NULL);
Jordan Halterman0a2bd452018-06-13 17:24:58 -0700191 Leadership leadership = leadershipService.getLeadership(createDeviceMastershipTopic(networkId, deviceId));
192 return new RoleInfo(leadership.leaderNodeId(), leadership.candidates());
193 }
Yoonseon Han9b71b2c2017-05-26 15:17:29 -0700194
Jordan Halterman0a2bd452018-06-13 17:24:58 -0700195 @Override
196 public MastershipInfo getMastership(NetworkId networkId, DeviceId deviceId) {
197 checkArgument(networkId != null, NETWORK_ID_NULL);
198 checkArgument(deviceId != null, DEVICE_ID_NULL);
199 Leadership leadership = leadershipService.getLeadership(createDeviceMastershipTopic(networkId, deviceId));
200 return buildMastershipFromLeadership(leadership);
Yoonseon Han9b71b2c2017-05-26 15:17:29 -0700201 }
202
203 @Override
204 public Set<DeviceId> getDevices(NetworkId networkId, NodeId nodeId) {
205 checkArgument(networkId != null, NETWORK_ID_NULL);
206 checkArgument(nodeId != null, NODE_ID_NULL);
207
208 // FIXME This result contains REMOVED device.
209 // MastershipService cannot listen to DeviceEvent to GC removed topic,
210 // since DeviceManager depend on it.
211 // Reference count, etc. at LeadershipService layer?
212 return leadershipService
213 .ownedTopics(nodeId)
214 .stream()
215 .filter(this::isVirtualMastershipTopic)
216 .map(this::extractDeviceIdFromTopic)
217 .collect(Collectors.toSet());
218 }
219
220 @Override
221 public CompletableFuture<MastershipEvent> setMaster(NetworkId networkId,
222 NodeId nodeId, DeviceId deviceId) {
223 checkArgument(networkId != null, NETWORK_ID_NULL);
224 checkArgument(nodeId != null, NODE_ID_NULL);
225 checkArgument(deviceId != null, DEVICE_ID_NULL);
226
227 String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
228 if (leadershipAdminService.promoteToTopOfCandidateList(leadershipTopic, nodeId)) {
229 transferExecutor.schedule(() -> leadershipAdminService.transferLeadership(leadershipTopic, nodeId),
230 WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS);
231 }
232 return CompletableFuture.completedFuture(null);
233 }
234
235 @Override
236 public MastershipTerm getTermFor(NetworkId networkId, DeviceId deviceId) {
237 checkArgument(networkId != null, NETWORK_ID_NULL);
238 checkArgument(deviceId != null, DEVICE_ID_NULL);
239
240 String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
241 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
242 return leadership != null && leadership.leaderNodeId() != null ?
243 MastershipTerm.of(leadership.leaderNodeId(),
244 leadership.leader().term()) : null;
245 }
246
247 @Override
248 public CompletableFuture<MastershipEvent> setStandby(NetworkId networkId,
249 NodeId nodeId,
250 DeviceId deviceId) {
251 checkArgument(networkId != null, NETWORK_ID_NULL);
252 checkArgument(nodeId != null, NODE_ID_NULL);
253 checkArgument(deviceId != null, DEVICE_ID_NULL);
254
255 NodeId currentMaster = getMaster(networkId, deviceId);
256 if (!nodeId.equals(currentMaster)) {
257 return CompletableFuture.completedFuture(null);
258 }
259
260 String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
261 List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
262
263 NodeId newMaster = candidates.stream()
264 .filter(candidate -> !Objects.equal(nodeId, candidate))
265 .findFirst()
266 .orElse(null);
267 log.info("Transitioning to role {} for {}. Next master: {}",
268 newMaster != null ? MastershipRole.STANDBY : MastershipRole.NONE,
269 deviceId, newMaster);
270
271 if (newMaster != null) {
272 return setMaster(networkId, newMaster, deviceId);
273 }
274 return relinquishRole(networkId, nodeId, deviceId);
275 }
276
277 @Override
278 public CompletableFuture<MastershipEvent> relinquishRole(NetworkId networkId,
279 NodeId nodeId,
280 DeviceId deviceId) {
281 checkArgument(networkId != null, NETWORK_ID_NULL);
282 checkArgument(nodeId != null, NODE_ID_NULL);
283 checkArgument(deviceId != null, DEVICE_ID_NULL);
284
285 if (nodeId.equals(localNodeId)) {
286 return relinquishLocalRoleByNetwork(networkId, deviceId);
287 }
288
289 log.debug("Forwarding request to relinquish "
290 + "role for vnet {} device {} to {}", deviceId, nodeId);
291 return clusterCommunicator.sendAndReceive(
292 new VirtualDeviceId(networkId, deviceId),
293 ROLE_RELINQUISH_SUBJECT,
294 SERIALIZER::encode,
295 SERIALIZER::decode,
296 nodeId);
297 }
298
299 private CompletableFuture<MastershipEvent> relinquishLocalRoleByNetwork(NetworkId networkId,
300 DeviceId deviceId) {
301 checkArgument(networkId != null, NETWORK_ID_NULL);
302 checkArgument(deviceId != null, DEVICE_ID_NULL);
303
304 String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
305 if (!leadershipService.getCandidates(leadershipTopic).contains(localNodeId)) {
306 return CompletableFuture.completedFuture(null);
307 }
308 MastershipEvent.Type eventType = localNodeId.equals(leadershipService.getLeader(leadershipTopic)) ?
309 MastershipEvent.Type.MASTER_CHANGED : MastershipEvent.Type.BACKUPS_CHANGED;
310 leadershipService.withdraw(leadershipTopic);
Jordan Halterman0a2bd452018-06-13 17:24:58 -0700311 return CompletableFuture.completedFuture(
312 new MastershipEvent(eventType, deviceId, getMastership(networkId, deviceId)));
Yoonseon Han9b71b2c2017-05-26 15:17:29 -0700313 }
314
315 private CompletableFuture<MastershipEvent>
316 relinquishLocalRole(VirtualDeviceId virtualDeviceId) {
317 return relinquishLocalRoleByNetwork(virtualDeviceId.networkId,
318 virtualDeviceId.deviceId);
319 }
320
321 @Override
322 public void relinquishAllRole(NetworkId networkId, NodeId nodeId) {
323 // Noop. LeadershipService already takes care of detecting and purging stale locks.
324 }
325
Jordan Halterman0a2bd452018-06-13 17:24:58 -0700326 private MastershipInfo buildMastershipFromLeadership(Leadership leadership) {
327 ImmutableMap.Builder<NodeId, MastershipRole> builder = ImmutableMap.builder();
328 if (leadership.leaderNodeId() != null) {
329 builder.put(leadership.leaderNodeId(), MastershipRole.MASTER);
330 }
331 leadership.candidates().forEach(nodeId -> builder.put(nodeId, MastershipRole.STANDBY));
332 clusterService.getNodes().stream()
333 .filter(node -> !leadership.candidates().contains(node.id()))
334 .forEach(node -> builder.put(node.id(), MastershipRole.NONE));
335
336 return new MastershipInfo(
337 leadership.leader() != null ? leadership.leader().term() : 0,
338 leadership.leader() != null
339 ? Optional.of(leadership.leader().nodeId())
340 : Optional.empty(),
341 builder.build());
342 }
343
Yoonseon Han9b71b2c2017-05-26 15:17:29 -0700344 private class InternalDeviceMastershipEventListener
345 implements LeadershipEventListener {
346
347 @Override
348 public boolean isRelevant(LeadershipEvent event) {
349 Leadership leadership = event.subject();
350 return isVirtualMastershipTopic(leadership.topic());
351 }
352
353 @Override
354 public void event(LeadershipEvent event) {
355 eventHandler.execute(() -> handleEvent(event));
356 }
357
358 private void handleEvent(LeadershipEvent event) {
359 Leadership leadership = event.subject();
360
361 NetworkId networkId = extractNetworkIdFromTopic(leadership.topic());
362 DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
Jordan Halterman0a2bd452018-06-13 17:24:58 -0700363 MastershipInfo mastershipInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED
364 ? buildMastershipFromLeadership(event.subject())
365 : new MastershipInfo();
Yoonseon Han9b71b2c2017-05-26 15:17:29 -0700366
367 switch (event.type()) {
368 case LEADER_AND_CANDIDATES_CHANGED:
Jordan Halterman0a2bd452018-06-13 17:24:58 -0700369 notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED, deviceId, mastershipInfo));
370 notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId, mastershipInfo));
Yoonseon Han9b71b2c2017-05-26 15:17:29 -0700371 break;
372 case LEADER_CHANGED:
Jordan Halterman0a2bd452018-06-13 17:24:58 -0700373 notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId, mastershipInfo));
Yoonseon Han9b71b2c2017-05-26 15:17:29 -0700374 break;
375 case CANDIDATES_CHANGED:
Jordan Halterman0a2bd452018-06-13 17:24:58 -0700376 notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED, deviceId, mastershipInfo));
Yoonseon Han9b71b2c2017-05-26 15:17:29 -0700377 break;
378 case SERVICE_DISRUPTED:
Jordan Halterman0a2bd452018-06-13 17:24:58 -0700379 notifyDelegate(networkId, new MastershipEvent(SUSPENDED, deviceId, mastershipInfo));
Yoonseon Han9b71b2c2017-05-26 15:17:29 -0700380 break;
381 case SERVICE_RESTORED:
382 // Do nothing, wait for updates from peers
383 break;
384 default:
385 }
386 }
387 }
388
389 private String createDeviceMastershipTopic(NetworkId networkId, DeviceId deviceId) {
390 return String.format("vnet:%s,device:%s", networkId.toString(), deviceId.toString());
391 }
392
393 /**
394 * Returns the virtual network identifier extracted from the topic.
395 *
396 * @param topic topic to extract virtual network identifier
397 * @return an extracted virtual network identifier
398 * @throws IllegalArgumentException the topic not match with the pattern
399 * used for virtual network mastership store
400 */
401 private NetworkId extractNetworkIdFromTopic(String topic) {
402 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
403 if (m.matches()) {
404 return NetworkId.networkId(Long.getLong(m.group(1)));
405 } else {
406 throw new IllegalArgumentException("Invalid virtual mastership topic: "
407 + topic);
408 }
409 }
410
411 /**
412 * Returns the device identifier extracted from the topic.
413 *
414 * @param topic topic to extract device identifier
415 * @return an extracted virtual device identifier
416 * @throws IllegalArgumentException the topic not match with the pattern
417 * used for virtual network mastership store
418 */
419 private DeviceId extractDeviceIdFromTopic(String topic) {
420 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
421 if (m.matches()) {
422 return DeviceId.deviceId(m.group(2));
423 } else {
424 throw new IllegalArgumentException("Invalid virtual mastership topic: "
425 + topic);
426 }
427 }
428
429 /**
430 * Returns whether the topic is matched with virtual mastership store topic.
431 *
432 * @param topic topic to match
433 * @return True when the topic matched with virtual network mastership store
434 */
435 private boolean isVirtualMastershipTopic(String topic) {
436 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
437 return m.matches();
438 }
439
440 /**
441 * A wrapper class used for the communication service.
442 */
443 private class VirtualDeviceId {
444 NetworkId networkId;
445 DeviceId deviceId;
446
447 public VirtualDeviceId(NetworkId networkId, DeviceId deviceId) {
448 this.networkId = networkId;
449 this.deviceId = deviceId;
450 }
451
452 public int hashCode() {
453 return Objects.hashCode(networkId, deviceId);
454 }
455
456 @Override
457 public boolean equals(Object obj) {
458 if (this == obj) {
459 return true;
460 }
461 if (obj instanceof VirtualDeviceId) {
462 final VirtualDeviceId that = (VirtualDeviceId) obj;
463 return this.getClass() == that.getClass() &&
464 Objects.equal(this.networkId, that.networkId) &&
465 Objects.equal(this.deviceId, that.deviceId);
466 }
467 return false;
468 }
469 }
470}