blob: 3d1c376343980e2780335231ea44548bd198dbc9 [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.incubator.store.virtual.impl;
18
19import org.apache.felix.scr.annotations.Activate;
20import org.apache.felix.scr.annotations.Component;
21import org.apache.felix.scr.annotations.Deactivate;
22import org.apache.felix.scr.annotations.Reference;
23import org.apache.felix.scr.annotations.ReferenceCardinality;
24import org.apache.felix.scr.annotations.Service;
25import org.onlab.util.KryoNamespace;
26import org.onosproject.cluster.ClusterService;
27import org.onosproject.cluster.Leadership;
28import org.onosproject.cluster.LeadershipAdminService;
29import org.onosproject.cluster.LeadershipEvent;
30import org.onosproject.cluster.LeadershipEventListener;
31import org.onosproject.cluster.LeadershipService;
32import org.onosproject.cluster.NodeId;
33import org.onosproject.cluster.RoleInfo;
34import org.onosproject.incubator.net.virtual.NetworkId;
35import org.onosproject.incubator.net.virtual.VirtualNetworkMastershipStore;
36import org.onosproject.mastership.MastershipEvent;
37import org.onosproject.mastership.MastershipStoreDelegate;
38import org.onosproject.mastership.MastershipTerm;
39import org.onosproject.net.DeviceId;
40import org.onosproject.net.MastershipRole;
41import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
42import org.onosproject.store.cluster.messaging.MessageSubject;
43import org.onosproject.store.serializers.KryoNamespaces;
Jordan Haltermanc6c6ef22017-08-20 17:11:41 -070044import org.onosproject.store.service.Serializer;
Yoonseon Han9b71b2c2017-05-26 15:17:29 -070045import org.slf4j.Logger;
46
47import java.util.List;
48import java.util.Map;
49import java.util.Set;
50import java.util.concurrent.CompletableFuture;
51import java.util.concurrent.ExecutorService;
52import java.util.concurrent.Executors;
53import java.util.concurrent.ScheduledExecutorService;
54import java.util.concurrent.TimeUnit;
55import java.util.regex.Matcher;
56import java.util.regex.Pattern;
57import java.util.stream.Collectors;
58
59import static org.onlab.util.Tools.groupedThreads;
60import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
61import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
62import static org.onosproject.mastership.MastershipEvent.Type.SUSPENDED;
63import static org.slf4j.LoggerFactory.getLogger;
64
65import com.google.common.base.Objects;
66import com.google.common.collect.ImmutableList;
67import com.google.common.collect.Lists;
68import com.google.common.collect.Maps;
69import static com.google.common.base.Preconditions.checkArgument;
70
71@Component(immediate = true, enabled = false)
72@Service
73public class ConsistentVirtualDeviceMastershipStore
74 extends AbstractVirtualStore<MastershipEvent, MastershipStoreDelegate>
75 implements VirtualNetworkMastershipStore {
76
77 private final Logger log = getLogger(getClass());
78
79 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
80 protected LeadershipService leadershipService;
81
82 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
83 protected LeadershipAdminService leadershipAdminService;
84
85 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
86 protected ClusterService clusterService;
87
88 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
89 protected ClusterCommunicationService clusterCommunicator;
90
91 private NodeId localNodeId;
92
93 private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
94 new MessageSubject("virtual-mastership-store-device-role-relinquish");
95
96 private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
97 Pattern.compile("vnet:(.*),device:(.*)");
98
99 private ExecutorService eventHandler;
100 private ExecutorService messageHandlingExecutor;
101 private ScheduledExecutorService transferExecutor;
102 private final LeadershipEventListener leadershipEventListener =
103 new InternalDeviceMastershipEventListener();
104
105 private static final String NODE_ID_NULL = "Node ID cannot be null";
106 private static final String NETWORK_ID_NULL = "Network ID cannot be null";
107 private static final String DEVICE_ID_NULL = "Device ID cannot be null";
108 private static final int WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS = 3000;
109
Jordan Haltermanc6c6ef22017-08-20 17:11:41 -0700110 public static final Serializer SERIALIZER = Serializer.using(
Yoonseon Han9b71b2c2017-05-26 15:17:29 -0700111 KryoNamespace.newBuilder()
112 .register(KryoNamespaces.API)
113 .register(MastershipRole.class)
114 .register(MastershipEvent.class)
115 .register(MastershipEvent.Type.class)
116 .register(VirtualDeviceId.class)
117 .build("VirtualMastershipStore"));
118
119 @Activate
120 public void activate() {
121 eventHandler = Executors.newSingleThreadExecutor(
122 groupedThreads("onos/store/virtual/mastership", "event-handler", log));
123
124 messageHandlingExecutor =
125 Executors.newSingleThreadExecutor(
126 groupedThreads("onos/store/virtual/mastership", "message-handler", log));
127 transferExecutor =
128 Executors.newSingleThreadScheduledExecutor(
129 groupedThreads("onos/store/virtual/mastership", "mastership-transfer-executor", log));
130 clusterCommunicator.addSubscriber(ROLE_RELINQUISH_SUBJECT,
131 SERIALIZER::decode,
132 this::relinquishLocalRole,
133 SERIALIZER::encode,
134 messageHandlingExecutor);
135 localNodeId = clusterService.getLocalNode().id();
136 leadershipService.addListener(leadershipEventListener);
137
138 log.info("Started");
139 }
140
141 @Deactivate
142 public void deactivate() {
143 clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
144 leadershipService.removeListener(leadershipEventListener);
145 messageHandlingExecutor.shutdown();
146 transferExecutor.shutdown();
147 eventHandler.shutdown();
148 log.info("Stopped");
149 }
150
151 @Override
152 public CompletableFuture<MastershipRole> requestRole(NetworkId networkId,
153 DeviceId deviceId) {
154 checkArgument(networkId != null, NETWORK_ID_NULL);
155 checkArgument(deviceId != null, DEVICE_ID_NULL);
156
157 String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
158 Leadership leadership = leadershipService.runForLeadership(leadershipTopic);
159 return CompletableFuture
160 .completedFuture(localNodeId.equals(leadership.leaderNodeId()) ?
161 MastershipRole.MASTER : MastershipRole.STANDBY);
162 }
163
164 @Override
165 public MastershipRole getRole(NetworkId networkId, NodeId nodeId, DeviceId deviceId) {
166 checkArgument(networkId != null, NETWORK_ID_NULL);
167 checkArgument(nodeId != null, NODE_ID_NULL);
168 checkArgument(deviceId != null, DEVICE_ID_NULL);
169
170 String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
171 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
172 NodeId leader = leadership == null ? null : leadership.leaderNodeId();
173 List<NodeId> candidates = leadership == null ?
174 ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
175 return Objects.equal(nodeId, leader) ?
176 MastershipRole.MASTER : candidates.contains(nodeId) ?
177 MastershipRole.STANDBY : MastershipRole.NONE;
178 }
179
180 @Override
181 public NodeId getMaster(NetworkId networkId, DeviceId deviceId) {
182 checkArgument(deviceId != null, DEVICE_ID_NULL);
183
184 return leadershipService.getLeader(createDeviceMastershipTopic(networkId, deviceId));
185 }
186
187 @Override
188 public RoleInfo getNodes(NetworkId networkId, DeviceId deviceId) {
189 checkArgument(networkId != null, NETWORK_ID_NULL);
190 checkArgument(deviceId != null, DEVICE_ID_NULL);
191
192 Map<NodeId, MastershipRole> roles = Maps.newHashMap();
193 clusterService.getNodes()
194 .forEach((node) -> roles.put(node.id(),
195 getRole(networkId, node.id(), deviceId)));
196
197 NodeId master = null;
198 final List<NodeId> standbys = Lists.newLinkedList();
199
200 List<NodeId> candidates = leadershipService
201 .getCandidates(createDeviceMastershipTopic(networkId, deviceId));
202
203 for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
204 if (entry.getValue() == MastershipRole.MASTER) {
205 master = entry.getKey();
206 } else if (entry.getValue() == MastershipRole.STANDBY) {
207 standbys.add(entry.getKey());
208 }
209 }
210
211 List<NodeId> sortedStandbyList = candidates.stream()
212 .filter(standbys::contains).collect(Collectors.toList());
213
214 return new RoleInfo(master, sortedStandbyList);
215 }
216
217 @Override
218 public Set<DeviceId> getDevices(NetworkId networkId, NodeId nodeId) {
219 checkArgument(networkId != null, NETWORK_ID_NULL);
220 checkArgument(nodeId != null, NODE_ID_NULL);
221
222 // FIXME This result contains REMOVED device.
223 // MastershipService cannot listen to DeviceEvent to GC removed topic,
224 // since DeviceManager depend on it.
225 // Reference count, etc. at LeadershipService layer?
226 return leadershipService
227 .ownedTopics(nodeId)
228 .stream()
229 .filter(this::isVirtualMastershipTopic)
230 .map(this::extractDeviceIdFromTopic)
231 .collect(Collectors.toSet());
232 }
233
234 @Override
235 public CompletableFuture<MastershipEvent> setMaster(NetworkId networkId,
236 NodeId nodeId, DeviceId deviceId) {
237 checkArgument(networkId != null, NETWORK_ID_NULL);
238 checkArgument(nodeId != null, NODE_ID_NULL);
239 checkArgument(deviceId != null, DEVICE_ID_NULL);
240
241 String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
242 if (leadershipAdminService.promoteToTopOfCandidateList(leadershipTopic, nodeId)) {
243 transferExecutor.schedule(() -> leadershipAdminService.transferLeadership(leadershipTopic, nodeId),
244 WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS);
245 }
246 return CompletableFuture.completedFuture(null);
247 }
248
249 @Override
250 public MastershipTerm getTermFor(NetworkId networkId, DeviceId deviceId) {
251 checkArgument(networkId != null, NETWORK_ID_NULL);
252 checkArgument(deviceId != null, DEVICE_ID_NULL);
253
254 String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
255 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
256 return leadership != null && leadership.leaderNodeId() != null ?
257 MastershipTerm.of(leadership.leaderNodeId(),
258 leadership.leader().term()) : null;
259 }
260
261 @Override
262 public CompletableFuture<MastershipEvent> setStandby(NetworkId networkId,
263 NodeId nodeId,
264 DeviceId deviceId) {
265 checkArgument(networkId != null, NETWORK_ID_NULL);
266 checkArgument(nodeId != null, NODE_ID_NULL);
267 checkArgument(deviceId != null, DEVICE_ID_NULL);
268
269 NodeId currentMaster = getMaster(networkId, deviceId);
270 if (!nodeId.equals(currentMaster)) {
271 return CompletableFuture.completedFuture(null);
272 }
273
274 String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
275 List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
276
277 NodeId newMaster = candidates.stream()
278 .filter(candidate -> !Objects.equal(nodeId, candidate))
279 .findFirst()
280 .orElse(null);
281 log.info("Transitioning to role {} for {}. Next master: {}",
282 newMaster != null ? MastershipRole.STANDBY : MastershipRole.NONE,
283 deviceId, newMaster);
284
285 if (newMaster != null) {
286 return setMaster(networkId, newMaster, deviceId);
287 }
288 return relinquishRole(networkId, nodeId, deviceId);
289 }
290
291 @Override
292 public CompletableFuture<MastershipEvent> relinquishRole(NetworkId networkId,
293 NodeId nodeId,
294 DeviceId deviceId) {
295 checkArgument(networkId != null, NETWORK_ID_NULL);
296 checkArgument(nodeId != null, NODE_ID_NULL);
297 checkArgument(deviceId != null, DEVICE_ID_NULL);
298
299 if (nodeId.equals(localNodeId)) {
300 return relinquishLocalRoleByNetwork(networkId, deviceId);
301 }
302
303 log.debug("Forwarding request to relinquish "
304 + "role for vnet {} device {} to {}", deviceId, nodeId);
305 return clusterCommunicator.sendAndReceive(
306 new VirtualDeviceId(networkId, deviceId),
307 ROLE_RELINQUISH_SUBJECT,
308 SERIALIZER::encode,
309 SERIALIZER::decode,
310 nodeId);
311 }
312
313 private CompletableFuture<MastershipEvent> relinquishLocalRoleByNetwork(NetworkId networkId,
314 DeviceId deviceId) {
315 checkArgument(networkId != null, NETWORK_ID_NULL);
316 checkArgument(deviceId != null, DEVICE_ID_NULL);
317
318 String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
319 if (!leadershipService.getCandidates(leadershipTopic).contains(localNodeId)) {
320 return CompletableFuture.completedFuture(null);
321 }
322 MastershipEvent.Type eventType = localNodeId.equals(leadershipService.getLeader(leadershipTopic)) ?
323 MastershipEvent.Type.MASTER_CHANGED : MastershipEvent.Type.BACKUPS_CHANGED;
324 leadershipService.withdraw(leadershipTopic);
325 return CompletableFuture.completedFuture(new MastershipEvent(eventType,
326 deviceId,
327 getNodes(networkId, deviceId)));
328 }
329
330 private CompletableFuture<MastershipEvent>
331 relinquishLocalRole(VirtualDeviceId virtualDeviceId) {
332 return relinquishLocalRoleByNetwork(virtualDeviceId.networkId,
333 virtualDeviceId.deviceId);
334 }
335
336 @Override
337 public void relinquishAllRole(NetworkId networkId, NodeId nodeId) {
338 // Noop. LeadershipService already takes care of detecting and purging stale locks.
339 }
340
341 private class InternalDeviceMastershipEventListener
342 implements LeadershipEventListener {
343
344 @Override
345 public boolean isRelevant(LeadershipEvent event) {
346 Leadership leadership = event.subject();
347 return isVirtualMastershipTopic(leadership.topic());
348 }
349
350 @Override
351 public void event(LeadershipEvent event) {
352 eventHandler.execute(() -> handleEvent(event));
353 }
354
355 private void handleEvent(LeadershipEvent event) {
356 Leadership leadership = event.subject();
357
358 NetworkId networkId = extractNetworkIdFromTopic(leadership.topic());
359 DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
360
361 RoleInfo roleInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED ?
362 getNodes(networkId, deviceId) : new RoleInfo();
363
364 switch (event.type()) {
365 case LEADER_AND_CANDIDATES_CHANGED:
366 notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED,
367 deviceId, roleInfo));
368 notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED,
369 deviceId, roleInfo));
370 break;
371 case LEADER_CHANGED:
372 notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED,
373 deviceId, roleInfo));
374 break;
375 case CANDIDATES_CHANGED:
376 notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED,
377 deviceId, roleInfo));
378 break;
379 case SERVICE_DISRUPTED:
380 notifyDelegate(networkId, new MastershipEvent(SUSPENDED,
381 deviceId, roleInfo));
382 break;
383 case SERVICE_RESTORED:
384 // Do nothing, wait for updates from peers
385 break;
386 default:
387 }
388 }
389 }
390
391 private String createDeviceMastershipTopic(NetworkId networkId, DeviceId deviceId) {
392 return String.format("vnet:%s,device:%s", networkId.toString(), deviceId.toString());
393 }
394
395 /**
396 * Returns the virtual network identifier extracted from the topic.
397 *
398 * @param topic topic to extract virtual network identifier
399 * @return an extracted virtual network identifier
400 * @throws IllegalArgumentException the topic not match with the pattern
401 * used for virtual network mastership store
402 */
403 private NetworkId extractNetworkIdFromTopic(String topic) {
404 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
405 if (m.matches()) {
406 return NetworkId.networkId(Long.getLong(m.group(1)));
407 } else {
408 throw new IllegalArgumentException("Invalid virtual mastership topic: "
409 + topic);
410 }
411 }
412
413 /**
414 * Returns the device identifier extracted from the topic.
415 *
416 * @param topic topic to extract device identifier
417 * @return an extracted virtual device identifier
418 * @throws IllegalArgumentException the topic not match with the pattern
419 * used for virtual network mastership store
420 */
421 private DeviceId extractDeviceIdFromTopic(String topic) {
422 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
423 if (m.matches()) {
424 return DeviceId.deviceId(m.group(2));
425 } else {
426 throw new IllegalArgumentException("Invalid virtual mastership topic: "
427 + topic);
428 }
429 }
430
431 /**
432 * Returns whether the topic is matched with virtual mastership store topic.
433 *
434 * @param topic topic to match
435 * @return True when the topic matched with virtual network mastership store
436 */
437 private boolean isVirtualMastershipTopic(String topic) {
438 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
439 return m.matches();
440 }
441
442 /**
443 * A wrapper class used for the communication service.
444 */
445 private class VirtualDeviceId {
446 NetworkId networkId;
447 DeviceId deviceId;
448
449 public VirtualDeviceId(NetworkId networkId, DeviceId deviceId) {
450 this.networkId = networkId;
451 this.deviceId = deviceId;
452 }
453
454 public int hashCode() {
455 return Objects.hashCode(networkId, deviceId);
456 }
457
458 @Override
459 public boolean equals(Object obj) {
460 if (this == obj) {
461 return true;
462 }
463 if (obj instanceof VirtualDeviceId) {
464 final VirtualDeviceId that = (VirtualDeviceId) obj;
465 return this.getClass() == that.getClass() &&
466 Objects.equal(this.networkId, that.networkId) &&
467 Objects.equal(this.deviceId, that.deviceId);
468 }
469 return false;
470 }
471 }
472}