blob: bf741b04fe4371f8320832c84959c94d2524e0ab [file] [log] [blame]
Yoonseon Han9b71b2c2017-05-26 15:17:29 -07001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.incubator.store.virtual.impl;
18
Ray Milkeyd84f89b2018-08-17 14:54:17 -070019import com.google.common.base.Objects;
20import com.google.common.collect.ImmutableList;
Jordan Halterman0a2bd452018-06-13 17:24:58 -070021import com.google.common.collect.ImmutableMap;
Yoonseon Han9b71b2c2017-05-26 15:17:29 -070022import org.onlab.util.KryoNamespace;
23import org.onosproject.cluster.ClusterService;
24import org.onosproject.cluster.Leadership;
25import org.onosproject.cluster.LeadershipAdminService;
26import org.onosproject.cluster.LeadershipEvent;
27import org.onosproject.cluster.LeadershipEventListener;
28import org.onosproject.cluster.LeadershipService;
29import org.onosproject.cluster.NodeId;
30import org.onosproject.cluster.RoleInfo;
31import org.onosproject.incubator.net.virtual.NetworkId;
32import org.onosproject.incubator.net.virtual.VirtualNetworkMastershipStore;
33import org.onosproject.mastership.MastershipEvent;
Jordan Halterman0a2bd452018-06-13 17:24:58 -070034import org.onosproject.mastership.MastershipInfo;
Yoonseon Han9b71b2c2017-05-26 15:17:29 -070035import org.onosproject.mastership.MastershipStoreDelegate;
36import org.onosproject.mastership.MastershipTerm;
37import org.onosproject.net.DeviceId;
38import org.onosproject.net.MastershipRole;
39import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
40import org.onosproject.store.cluster.messaging.MessageSubject;
41import org.onosproject.store.serializers.KryoNamespaces;
Jordan Halterman2c83a102017-08-20 17:11:41 -070042import org.onosproject.store.service.Serializer;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070043import org.osgi.service.component.annotations.Activate;
44import org.osgi.service.component.annotations.Component;
45import org.osgi.service.component.annotations.Deactivate;
46import org.osgi.service.component.annotations.Reference;
47import org.osgi.service.component.annotations.ReferenceCardinality;
Yoonseon Han9b71b2c2017-05-26 15:17:29 -070048import org.slf4j.Logger;
49
50import java.util.List;
Jordan Halterman0a2bd452018-06-13 17:24:58 -070051import java.util.Optional;
Yoonseon Han9b71b2c2017-05-26 15:17:29 -070052import java.util.Set;
53import java.util.concurrent.CompletableFuture;
54import java.util.concurrent.ExecutorService;
55import java.util.concurrent.Executors;
56import java.util.concurrent.ScheduledExecutorService;
57import java.util.concurrent.TimeUnit;
58import java.util.regex.Matcher;
59import java.util.regex.Pattern;
60import java.util.stream.Collectors;
61
Ray Milkeyd84f89b2018-08-17 14:54:17 -070062import static com.google.common.base.Preconditions.checkArgument;
Yoonseon Han9b71b2c2017-05-26 15:17:29 -070063import static org.onlab.util.Tools.groupedThreads;
64import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
65import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
66import static org.onosproject.mastership.MastershipEvent.Type.SUSPENDED;
67import static org.slf4j.LoggerFactory.getLogger;
68
Ray Milkeyd84f89b2018-08-17 14:54:17 -070069@Component(immediate = true, enabled = false, service = VirtualNetworkMastershipStore.class)
Yoonseon Han9b71b2c2017-05-26 15:17:29 -070070public class ConsistentVirtualDeviceMastershipStore
71 extends AbstractVirtualStore<MastershipEvent, MastershipStoreDelegate>
72 implements VirtualNetworkMastershipStore {
73
74 private final Logger log = getLogger(getClass());
75
Ray Milkeyd84f89b2018-08-17 14:54:17 -070076 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Yoonseon Han9b71b2c2017-05-26 15:17:29 -070077 protected LeadershipService leadershipService;
78
Ray Milkeyd84f89b2018-08-17 14:54:17 -070079 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Yoonseon Han9b71b2c2017-05-26 15:17:29 -070080 protected LeadershipAdminService leadershipAdminService;
81
Ray Milkeyd84f89b2018-08-17 14:54:17 -070082 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Yoonseon Han9b71b2c2017-05-26 15:17:29 -070083 protected ClusterService clusterService;
84
Ray Milkeyd84f89b2018-08-17 14:54:17 -070085 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Yoonseon Han9b71b2c2017-05-26 15:17:29 -070086 protected ClusterCommunicationService clusterCommunicator;
87
88 private NodeId localNodeId;
89
90 private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
91 new MessageSubject("virtual-mastership-store-device-role-relinquish");
92
93 private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
94 Pattern.compile("vnet:(.*),device:(.*)");
95
96 private ExecutorService eventHandler;
97 private ExecutorService messageHandlingExecutor;
98 private ScheduledExecutorService transferExecutor;
99 private final LeadershipEventListener leadershipEventListener =
100 new InternalDeviceMastershipEventListener();
101
102 private static final String NODE_ID_NULL = "Node ID cannot be null";
103 private static final String NETWORK_ID_NULL = "Network ID cannot be null";
104 private static final String DEVICE_ID_NULL = "Device ID cannot be null";
105 private static final int WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS = 3000;
106
Jordan Halterman2c83a102017-08-20 17:11:41 -0700107 public static final Serializer SERIALIZER = Serializer.using(
Yoonseon Han9b71b2c2017-05-26 15:17:29 -0700108 KryoNamespace.newBuilder()
109 .register(KryoNamespaces.API)
110 .register(MastershipRole.class)
111 .register(MastershipEvent.class)
112 .register(MastershipEvent.Type.class)
113 .register(VirtualDeviceId.class)
114 .build("VirtualMastershipStore"));
115
116 @Activate
117 public void activate() {
118 eventHandler = Executors.newSingleThreadExecutor(
119 groupedThreads("onos/store/virtual/mastership", "event-handler", log));
120
121 messageHandlingExecutor =
122 Executors.newSingleThreadExecutor(
123 groupedThreads("onos/store/virtual/mastership", "message-handler", log));
124 transferExecutor =
125 Executors.newSingleThreadScheduledExecutor(
126 groupedThreads("onos/store/virtual/mastership", "mastership-transfer-executor", log));
127 clusterCommunicator.addSubscriber(ROLE_RELINQUISH_SUBJECT,
128 SERIALIZER::decode,
129 this::relinquishLocalRole,
130 SERIALIZER::encode,
131 messageHandlingExecutor);
132 localNodeId = clusterService.getLocalNode().id();
133 leadershipService.addListener(leadershipEventListener);
134
135 log.info("Started");
136 }
137
138 @Deactivate
139 public void deactivate() {
140 clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
141 leadershipService.removeListener(leadershipEventListener);
142 messageHandlingExecutor.shutdown();
143 transferExecutor.shutdown();
144 eventHandler.shutdown();
145 log.info("Stopped");
146 }
147
148 @Override
149 public CompletableFuture<MastershipRole> requestRole(NetworkId networkId,
150 DeviceId deviceId) {
151 checkArgument(networkId != null, NETWORK_ID_NULL);
152 checkArgument(deviceId != null, DEVICE_ID_NULL);
153
154 String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
155 Leadership leadership = leadershipService.runForLeadership(leadershipTopic);
156 return CompletableFuture
157 .completedFuture(localNodeId.equals(leadership.leaderNodeId()) ?
158 MastershipRole.MASTER : MastershipRole.STANDBY);
159 }
160
161 @Override
162 public MastershipRole getRole(NetworkId networkId, NodeId nodeId, DeviceId deviceId) {
163 checkArgument(networkId != null, NETWORK_ID_NULL);
164 checkArgument(nodeId != null, NODE_ID_NULL);
165 checkArgument(deviceId != null, DEVICE_ID_NULL);
166
167 String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
168 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
169 NodeId leader = leadership == null ? null : leadership.leaderNodeId();
170 List<NodeId> candidates = leadership == null ?
171 ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
172 return Objects.equal(nodeId, leader) ?
173 MastershipRole.MASTER : candidates.contains(nodeId) ?
174 MastershipRole.STANDBY : MastershipRole.NONE;
175 }
176
177 @Override
178 public NodeId getMaster(NetworkId networkId, DeviceId deviceId) {
179 checkArgument(deviceId != null, DEVICE_ID_NULL);
180
181 return leadershipService.getLeader(createDeviceMastershipTopic(networkId, deviceId));
182 }
183
184 @Override
185 public RoleInfo getNodes(NetworkId networkId, DeviceId deviceId) {
186 checkArgument(networkId != null, NETWORK_ID_NULL);
187 checkArgument(deviceId != null, DEVICE_ID_NULL);
Jordan Halterman0a2bd452018-06-13 17:24:58 -0700188 Leadership leadership = leadershipService.getLeadership(createDeviceMastershipTopic(networkId, deviceId));
189 return new RoleInfo(leadership.leaderNodeId(), leadership.candidates());
190 }
Yoonseon Han9b71b2c2017-05-26 15:17:29 -0700191
Jordan Halterman0a2bd452018-06-13 17:24:58 -0700192 @Override
193 public MastershipInfo getMastership(NetworkId networkId, DeviceId deviceId) {
194 checkArgument(networkId != null, NETWORK_ID_NULL);
195 checkArgument(deviceId != null, DEVICE_ID_NULL);
196 Leadership leadership = leadershipService.getLeadership(createDeviceMastershipTopic(networkId, deviceId));
197 return buildMastershipFromLeadership(leadership);
Yoonseon Han9b71b2c2017-05-26 15:17:29 -0700198 }
199
200 @Override
201 public Set<DeviceId> getDevices(NetworkId networkId, NodeId nodeId) {
202 checkArgument(networkId != null, NETWORK_ID_NULL);
203 checkArgument(nodeId != null, NODE_ID_NULL);
204
205 // FIXME This result contains REMOVED device.
206 // MastershipService cannot listen to DeviceEvent to GC removed topic,
207 // since DeviceManager depend on it.
208 // Reference count, etc. at LeadershipService layer?
209 return leadershipService
210 .ownedTopics(nodeId)
211 .stream()
212 .filter(this::isVirtualMastershipTopic)
213 .map(this::extractDeviceIdFromTopic)
214 .collect(Collectors.toSet());
215 }
216
217 @Override
218 public CompletableFuture<MastershipEvent> setMaster(NetworkId networkId,
219 NodeId nodeId, DeviceId deviceId) {
220 checkArgument(networkId != null, NETWORK_ID_NULL);
221 checkArgument(nodeId != null, NODE_ID_NULL);
222 checkArgument(deviceId != null, DEVICE_ID_NULL);
223
224 String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
225 if (leadershipAdminService.promoteToTopOfCandidateList(leadershipTopic, nodeId)) {
226 transferExecutor.schedule(() -> leadershipAdminService.transferLeadership(leadershipTopic, nodeId),
227 WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS);
228 }
229 return CompletableFuture.completedFuture(null);
230 }
231
232 @Override
233 public MastershipTerm getTermFor(NetworkId networkId, DeviceId deviceId) {
234 checkArgument(networkId != null, NETWORK_ID_NULL);
235 checkArgument(deviceId != null, DEVICE_ID_NULL);
236
237 String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
238 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
239 return leadership != null && leadership.leaderNodeId() != null ?
240 MastershipTerm.of(leadership.leaderNodeId(),
241 leadership.leader().term()) : null;
242 }
243
244 @Override
245 public CompletableFuture<MastershipEvent> setStandby(NetworkId networkId,
246 NodeId nodeId,
247 DeviceId deviceId) {
248 checkArgument(networkId != null, NETWORK_ID_NULL);
249 checkArgument(nodeId != null, NODE_ID_NULL);
250 checkArgument(deviceId != null, DEVICE_ID_NULL);
251
252 NodeId currentMaster = getMaster(networkId, deviceId);
253 if (!nodeId.equals(currentMaster)) {
254 return CompletableFuture.completedFuture(null);
255 }
256
257 String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
258 List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
259
260 NodeId newMaster = candidates.stream()
261 .filter(candidate -> !Objects.equal(nodeId, candidate))
262 .findFirst()
263 .orElse(null);
264 log.info("Transitioning to role {} for {}. Next master: {}",
265 newMaster != null ? MastershipRole.STANDBY : MastershipRole.NONE,
266 deviceId, newMaster);
267
268 if (newMaster != null) {
269 return setMaster(networkId, newMaster, deviceId);
270 }
271 return relinquishRole(networkId, nodeId, deviceId);
272 }
273
274 @Override
275 public CompletableFuture<MastershipEvent> relinquishRole(NetworkId networkId,
276 NodeId nodeId,
277 DeviceId deviceId) {
278 checkArgument(networkId != null, NETWORK_ID_NULL);
279 checkArgument(nodeId != null, NODE_ID_NULL);
280 checkArgument(deviceId != null, DEVICE_ID_NULL);
281
282 if (nodeId.equals(localNodeId)) {
283 return relinquishLocalRoleByNetwork(networkId, deviceId);
284 }
285
286 log.debug("Forwarding request to relinquish "
287 + "role for vnet {} device {} to {}", deviceId, nodeId);
288 return clusterCommunicator.sendAndReceive(
289 new VirtualDeviceId(networkId, deviceId),
290 ROLE_RELINQUISH_SUBJECT,
291 SERIALIZER::encode,
292 SERIALIZER::decode,
293 nodeId);
294 }
295
296 private CompletableFuture<MastershipEvent> relinquishLocalRoleByNetwork(NetworkId networkId,
297 DeviceId deviceId) {
298 checkArgument(networkId != null, NETWORK_ID_NULL);
299 checkArgument(deviceId != null, DEVICE_ID_NULL);
300
301 String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
302 if (!leadershipService.getCandidates(leadershipTopic).contains(localNodeId)) {
303 return CompletableFuture.completedFuture(null);
304 }
305 MastershipEvent.Type eventType = localNodeId.equals(leadershipService.getLeader(leadershipTopic)) ?
306 MastershipEvent.Type.MASTER_CHANGED : MastershipEvent.Type.BACKUPS_CHANGED;
307 leadershipService.withdraw(leadershipTopic);
Jordan Halterman0a2bd452018-06-13 17:24:58 -0700308 return CompletableFuture.completedFuture(
309 new MastershipEvent(eventType, deviceId, getMastership(networkId, deviceId)));
Yoonseon Han9b71b2c2017-05-26 15:17:29 -0700310 }
311
312 private CompletableFuture<MastershipEvent>
313 relinquishLocalRole(VirtualDeviceId virtualDeviceId) {
314 return relinquishLocalRoleByNetwork(virtualDeviceId.networkId,
315 virtualDeviceId.deviceId);
316 }
317
318 @Override
319 public void relinquishAllRole(NetworkId networkId, NodeId nodeId) {
320 // Noop. LeadershipService already takes care of detecting and purging stale locks.
321 }
322
Jordan Halterman0a2bd452018-06-13 17:24:58 -0700323 private MastershipInfo buildMastershipFromLeadership(Leadership leadership) {
324 ImmutableMap.Builder<NodeId, MastershipRole> builder = ImmutableMap.builder();
325 if (leadership.leaderNodeId() != null) {
326 builder.put(leadership.leaderNodeId(), MastershipRole.MASTER);
327 }
328 leadership.candidates().forEach(nodeId -> builder.put(nodeId, MastershipRole.STANDBY));
329 clusterService.getNodes().stream()
330 .filter(node -> !leadership.candidates().contains(node.id()))
331 .forEach(node -> builder.put(node.id(), MastershipRole.NONE));
332
333 return new MastershipInfo(
334 leadership.leader() != null ? leadership.leader().term() : 0,
335 leadership.leader() != null
336 ? Optional.of(leadership.leader().nodeId())
337 : Optional.empty(),
338 builder.build());
339 }
340
Yoonseon Han9b71b2c2017-05-26 15:17:29 -0700341 private class InternalDeviceMastershipEventListener
342 implements LeadershipEventListener {
343
344 @Override
345 public boolean isRelevant(LeadershipEvent event) {
346 Leadership leadership = event.subject();
347 return isVirtualMastershipTopic(leadership.topic());
348 }
349
350 @Override
351 public void event(LeadershipEvent event) {
352 eventHandler.execute(() -> handleEvent(event));
353 }
354
355 private void handleEvent(LeadershipEvent event) {
356 Leadership leadership = event.subject();
357
358 NetworkId networkId = extractNetworkIdFromTopic(leadership.topic());
359 DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
Jordan Halterman0a2bd452018-06-13 17:24:58 -0700360 MastershipInfo mastershipInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED
361 ? buildMastershipFromLeadership(event.subject())
362 : new MastershipInfo();
Yoonseon Han9b71b2c2017-05-26 15:17:29 -0700363
364 switch (event.type()) {
365 case LEADER_AND_CANDIDATES_CHANGED:
Jordan Halterman0a2bd452018-06-13 17:24:58 -0700366 notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED, deviceId, mastershipInfo));
367 notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId, mastershipInfo));
Yoonseon Han9b71b2c2017-05-26 15:17:29 -0700368 break;
369 case LEADER_CHANGED:
Jordan Halterman0a2bd452018-06-13 17:24:58 -0700370 notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId, mastershipInfo));
Yoonseon Han9b71b2c2017-05-26 15:17:29 -0700371 break;
372 case CANDIDATES_CHANGED:
Jordan Halterman0a2bd452018-06-13 17:24:58 -0700373 notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED, deviceId, mastershipInfo));
Yoonseon Han9b71b2c2017-05-26 15:17:29 -0700374 break;
375 case SERVICE_DISRUPTED:
Jordan Halterman0a2bd452018-06-13 17:24:58 -0700376 notifyDelegate(networkId, new MastershipEvent(SUSPENDED, deviceId, mastershipInfo));
Yoonseon Han9b71b2c2017-05-26 15:17:29 -0700377 break;
378 case SERVICE_RESTORED:
379 // Do nothing, wait for updates from peers
380 break;
381 default:
382 }
383 }
384 }
385
386 private String createDeviceMastershipTopic(NetworkId networkId, DeviceId deviceId) {
387 return String.format("vnet:%s,device:%s", networkId.toString(), deviceId.toString());
388 }
389
390 /**
391 * Returns the virtual network identifier extracted from the topic.
392 *
393 * @param topic topic to extract virtual network identifier
394 * @return an extracted virtual network identifier
395 * @throws IllegalArgumentException the topic not match with the pattern
396 * used for virtual network mastership store
397 */
398 private NetworkId extractNetworkIdFromTopic(String topic) {
399 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
400 if (m.matches()) {
401 return NetworkId.networkId(Long.getLong(m.group(1)));
402 } else {
403 throw new IllegalArgumentException("Invalid virtual mastership topic: "
404 + topic);
405 }
406 }
407
408 /**
409 * Returns the device identifier extracted from the topic.
410 *
411 * @param topic topic to extract device identifier
412 * @return an extracted virtual device identifier
413 * @throws IllegalArgumentException the topic not match with the pattern
414 * used for virtual network mastership store
415 */
416 private DeviceId extractDeviceIdFromTopic(String topic) {
417 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
418 if (m.matches()) {
419 return DeviceId.deviceId(m.group(2));
420 } else {
421 throw new IllegalArgumentException("Invalid virtual mastership topic: "
422 + topic);
423 }
424 }
425
426 /**
427 * Returns whether the topic is matched with virtual mastership store topic.
428 *
429 * @param topic topic to match
430 * @return True when the topic matched with virtual network mastership store
431 */
432 private boolean isVirtualMastershipTopic(String topic) {
433 Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
434 return m.matches();
435 }
436
437 /**
438 * A wrapper class used for the communication service.
439 */
440 private class VirtualDeviceId {
441 NetworkId networkId;
442 DeviceId deviceId;
443
444 public VirtualDeviceId(NetworkId networkId, DeviceId deviceId) {
445 this.networkId = networkId;
446 this.deviceId = deviceId;
447 }
448
449 public int hashCode() {
450 return Objects.hashCode(networkId, deviceId);
451 }
452
453 @Override
454 public boolean equals(Object obj) {
455 if (this == obj) {
456 return true;
457 }
458 if (obj instanceof VirtualDeviceId) {
459 final VirtualDeviceId that = (VirtualDeviceId) obj;
460 return this.getClass() == that.getClass() &&
461 Objects.equal(this.networkId, that.networkId) &&
462 Objects.equal(this.deviceId, that.deviceId);
463 }
464 return false;
465 }
466 }
467}