blob: 47b9c6f780028d534a1ce7adf51eaf27003f20fc [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.incubator.store.virtual.impl;
18
Yoonseon Hana3277012017-05-22 12:26:21 -070019import com.google.common.collect.ImmutableList;
20import com.google.common.collect.ImmutableSet;
Yoonseon Hana578d762017-05-08 13:42:02 -070021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
Yoonseon Hana3277012017-05-22 12:26:21 -070024import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
Yoonseon Hana578d762017-05-08 13:42:02 -070026import org.apache.felix.scr.annotations.Service;
Yoonseon Hana3277012017-05-22 12:26:21 -070027import org.joda.time.DateTime;
28import org.onlab.packet.IpAddress;
29import org.onosproject.cluster.ClusterEventListener;
30import org.onosproject.cluster.ClusterService;
31import org.onosproject.cluster.ControllerNode;
32import org.onosproject.cluster.DefaultControllerNode;
Yoonseon Hana578d762017-05-08 13:42:02 -070033import org.onosproject.cluster.NodeId;
34import org.onosproject.cluster.RoleInfo;
35import org.onosproject.incubator.net.virtual.NetworkId;
36import org.onosproject.incubator.net.virtual.VirtualNetworkMastershipStore;
37import org.onosproject.mastership.MastershipEvent;
38import org.onosproject.mastership.MastershipStoreDelegate;
39import org.onosproject.mastership.MastershipTerm;
40import org.onosproject.net.DeviceId;
41import org.onosproject.net.MastershipRole;
42import org.slf4j.Logger;
43
Yoonseon Hana3277012017-05-22 12:26:21 -070044import java.util.ArrayList;
45import java.util.Collections;
46import java.util.HashMap;
47import java.util.HashSet;
48import java.util.List;
49import java.util.Map;
50import java.util.Objects;
Yoonseon Hana578d762017-05-08 13:42:02 -070051import java.util.Set;
52import java.util.concurrent.CompletableFuture;
Yoonseon Hana3277012017-05-22 12:26:21 -070053import java.util.concurrent.atomic.AtomicInteger;
Yoonseon Hana578d762017-05-08 13:42:02 -070054
Yoonseon Hana3277012017-05-22 12:26:21 -070055import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
56import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
Yoonseon Hana578d762017-05-08 13:42:02 -070057import static org.slf4j.LoggerFactory.getLogger;
58
/**
 * Implementation of the virtual network mastership store that manages the
 * inventory of mastership using a trivial in-memory implementation.
 */
63@Component(immediate = true)
64@Service
65public class SimpleVirtualMastershipStore
66 extends AbstractVirtualStore<MastershipEvent, MastershipStoreDelegate>
67 implements VirtualNetworkMastershipStore {
68
69 private final Logger log = getLogger(getClass());
70
Yoonseon Hana3277012017-05-22 12:26:21 -070071 private static final int NOTHING = 0;
72 private static final int INIT = 1;
73
74 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
75 protected ClusterService clusterService;
76
77 //devices mapped to their masters, to emulate multiple nodes
78 protected final Map<NetworkId, Map<DeviceId, NodeId>> masterMapByNetwork =
79 new HashMap<>();
80 //emulate backups with pile of nodes
81 protected final Map<NetworkId, Map<DeviceId, List<NodeId>>> backupsByNetwork =
82 new HashMap<>();
83 //terms
84 protected final Map<NetworkId, Map<DeviceId, AtomicInteger>> termMapByNetwork =
85 new HashMap<>();
86
Yoonseon Hana578d762017-05-08 13:42:02 -070087 @Activate
88 public void activate() {
Yoonseon Hana3277012017-05-22 12:26:21 -070089 if (clusterService == null) {
90 clusterService = createFakeClusterService();
91 }
Yoonseon Hana578d762017-05-08 13:42:02 -070092 log.info("Started");
93 }
94
95 @Deactivate
96 public void deactivate() {
97 log.info("Stopped");
98 }
99
100 @Override
Yoonseon Hana3277012017-05-22 12:26:21 -0700101 public CompletableFuture<MastershipRole> requestRole(NetworkId networkId,
102 DeviceId deviceId) {
103 //query+possible reelection
104 NodeId node = clusterService.getLocalNode().id();
105 MastershipRole role = getRole(networkId, node, deviceId);
106
107 Map<DeviceId, NodeId> masterMap = getMasterMap(networkId);
108
109 switch (role) {
110 case MASTER:
111 return CompletableFuture.completedFuture(MastershipRole.MASTER);
112 case STANDBY:
113 if (getMaster(networkId, deviceId) == null) {
114 // no master => become master
115 masterMap.put(deviceId, node);
116 incrementTerm(networkId, deviceId);
117 // remove from backup list
118 removeFromBackups(networkId, deviceId, node);
119 notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId,
120 getNodes(networkId, deviceId)));
121 return CompletableFuture.completedFuture(MastershipRole.MASTER);
122 }
123 return CompletableFuture.completedFuture(MastershipRole.STANDBY);
124 case NONE:
125 if (getMaster(networkId, deviceId) == null) {
126 // no master => become master
127 masterMap.put(deviceId, node);
128 incrementTerm(networkId, deviceId);
129 notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId,
130 getNodes(networkId, deviceId)));
131 return CompletableFuture.completedFuture(MastershipRole.MASTER);
132 }
133 // add to backup list
134 if (addToBackup(networkId, deviceId, node)) {
135 notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED, deviceId,
136 getNodes(networkId, deviceId)));
137 }
138 return CompletableFuture.completedFuture(MastershipRole.STANDBY);
139 default:
140 log.warn("unknown Mastership Role {}", role);
141 }
142 return CompletableFuture.completedFuture(role);
Yoonseon Hana578d762017-05-08 13:42:02 -0700143 }
144
145 @Override
146 public MastershipRole getRole(NetworkId networkId, NodeId nodeId, DeviceId deviceId) {
Yoonseon Hana3277012017-05-22 12:26:21 -0700147 Map<DeviceId, NodeId> masterMap = getMasterMap(networkId);
148 Map<DeviceId, List<NodeId>> backups = getBackups(networkId);
149
150 //just query
151 NodeId current = masterMap.get(deviceId);
152 MastershipRole role;
153
154 if (current != null && current.equals(nodeId)) {
155 return MastershipRole.MASTER;
156 }
157
158 if (backups.getOrDefault(deviceId, Collections.emptyList()).contains(nodeId)) {
159 role = MastershipRole.STANDBY;
160 } else {
161 role = MastershipRole.NONE;
162 }
163 return role;
Yoonseon Hana578d762017-05-08 13:42:02 -0700164 }
165
166 @Override
167 public NodeId getMaster(NetworkId networkId, DeviceId deviceId) {
Yoonseon Hana3277012017-05-22 12:26:21 -0700168 Map<DeviceId, NodeId> masterMap = getMasterMap(networkId);
169 return masterMap.get(deviceId);
Yoonseon Hana578d762017-05-08 13:42:02 -0700170 }
171
172 @Override
173 public RoleInfo getNodes(NetworkId networkId, DeviceId deviceId) {
Yoonseon Hana3277012017-05-22 12:26:21 -0700174 Map<DeviceId, NodeId> masterMap = getMasterMap(networkId);
175 Map<DeviceId, List<NodeId>> backups = getBackups(networkId);
176
177 return new RoleInfo(masterMap.get(deviceId),
178 backups.getOrDefault(deviceId, ImmutableList.of()));
Yoonseon Hana578d762017-05-08 13:42:02 -0700179 }
180
181 @Override
182 public Set<DeviceId> getDevices(NetworkId networkId, NodeId nodeId) {
Yoonseon Hana3277012017-05-22 12:26:21 -0700183 Map<DeviceId, NodeId> masterMap = getMasterMap(networkId);
184
185 Set<DeviceId> ids = new HashSet<>();
186 for (Map.Entry<DeviceId, NodeId> d : masterMap.entrySet()) {
187 if (Objects.equals(d.getValue(), nodeId)) {
188 ids.add(d.getKey());
189 }
190 }
191 return ids;
Yoonseon Hana578d762017-05-08 13:42:02 -0700192 }
193
194 @Override
Yoonseon Hana3277012017-05-22 12:26:21 -0700195 public synchronized CompletableFuture<MastershipEvent> setMaster(NetworkId networkId,
Yoonseon Hana578d762017-05-08 13:42:02 -0700196 NodeId nodeId, DeviceId deviceId) {
Yoonseon Hana3277012017-05-22 12:26:21 -0700197 Map<DeviceId, NodeId> masterMap = getMasterMap(networkId);
198
199 MastershipRole role = getRole(networkId, nodeId, deviceId);
200 switch (role) {
201 case MASTER:
202 // no-op
203 return CompletableFuture.completedFuture(null);
204 case STANDBY:
205 case NONE:
206 NodeId prevMaster = masterMap.put(deviceId, nodeId);
207 incrementTerm(networkId, deviceId);
208 removeFromBackups(networkId, deviceId, nodeId);
209 addToBackup(networkId, deviceId, prevMaster);
210 break;
211 default:
212 log.warn("unknown Mastership Role {}", role);
213 return null;
214 }
215
216 return CompletableFuture.completedFuture(
217 new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(networkId, deviceId)));
Yoonseon Hana578d762017-05-08 13:42:02 -0700218 }
219
220 @Override
221 public MastershipTerm getTermFor(NetworkId networkId, DeviceId deviceId) {
Yoonseon Hana3277012017-05-22 12:26:21 -0700222 Map<DeviceId, NodeId> masterMap = getMasterMap(networkId);
223 Map<DeviceId, AtomicInteger> termMap = getTermMap(networkId);
224
225 if ((termMap.get(deviceId) == null)) {
226 return MastershipTerm.of(masterMap.get(deviceId), NOTHING);
227 }
228 return MastershipTerm.of(
229 masterMap.get(deviceId), termMap.get(deviceId).get());
Yoonseon Hana578d762017-05-08 13:42:02 -0700230 }
231
232 @Override
233 public CompletableFuture<MastershipEvent> setStandby(NetworkId networkId,
234 NodeId nodeId, DeviceId deviceId) {
Yoonseon Hana3277012017-05-22 12:26:21 -0700235 Map<DeviceId, NodeId> masterMap = getMasterMap(networkId);
236
237 MastershipRole role = getRole(networkId, nodeId, deviceId);
238 switch (role) {
239 case MASTER:
240 NodeId backup = reelect(networkId, deviceId, nodeId);
241 if (backup == null) {
242 // no master alternative
243 masterMap.remove(deviceId);
244 // TODO: Should there be new event type for no MASTER?
245 return CompletableFuture.completedFuture(
246 new MastershipEvent(MASTER_CHANGED, deviceId,
247 getNodes(networkId, deviceId)));
248 } else {
249 NodeId prevMaster = masterMap.put(deviceId, backup);
250 incrementTerm(networkId, deviceId);
251 addToBackup(networkId, deviceId, prevMaster);
252 return CompletableFuture.completedFuture(
253 new MastershipEvent(MASTER_CHANGED, deviceId,
254 getNodes(networkId, deviceId)));
255 }
256
257 case STANDBY:
258 case NONE:
259 boolean modified = addToBackup(networkId, deviceId, nodeId);
260 if (modified) {
261 return CompletableFuture.completedFuture(
262 new MastershipEvent(BACKUPS_CHANGED, deviceId,
263 getNodes(networkId, deviceId)));
264 }
265 break;
266
267 default:
268 log.warn("unknown Mastership Role {}", role);
269 }
Yoonseon Hana578d762017-05-08 13:42:02 -0700270 return null;
271 }
272
Yoonseon Hana3277012017-05-22 12:26:21 -0700273
274 /**
275 * Dumbly selects next-available node that's not the current one.
276 * emulate leader election.
277 *
278 * @param networkId a virtual network identifier
279 * @param deviceId a virtual device identifier
280 * @param nodeId a nod identifier
281 * @return Next available node as a leader
282 */
283 private synchronized NodeId reelect(NetworkId networkId, DeviceId deviceId,
284 NodeId nodeId) {
285 Map<DeviceId, List<NodeId>> backups = getBackups(networkId);
286
287 List<NodeId> stbys = backups.getOrDefault(deviceId, Collections.emptyList());
288 NodeId backup = null;
289 for (NodeId n : stbys) {
290 if (!n.equals(nodeId)) {
291 backup = n;
292 break;
293 }
294 }
295 stbys.remove(backup);
296 return backup;
297 }
298
Yoonseon Hana578d762017-05-08 13:42:02 -0700299 @Override
Yoonseon Hana3277012017-05-22 12:26:21 -0700300 public synchronized CompletableFuture<MastershipEvent>
301 relinquishRole(NetworkId networkId, NodeId nodeId, DeviceId deviceId) {
302 Map<DeviceId, NodeId> masterMap = getMasterMap(networkId);
303
304 MastershipRole role = getRole(networkId, nodeId, deviceId);
305 switch (role) {
306 case MASTER:
307 NodeId backup = reelect(networkId, deviceId, nodeId);
308 masterMap.put(deviceId, backup);
309 incrementTerm(networkId, deviceId);
310 return CompletableFuture.completedFuture(
311 new MastershipEvent(MASTER_CHANGED, deviceId,
312 getNodes(networkId, deviceId)));
313
314 case STANDBY:
315 if (removeFromBackups(networkId, deviceId, nodeId)) {
316 return CompletableFuture.completedFuture(
317 new MastershipEvent(BACKUPS_CHANGED, deviceId,
318 getNodes(networkId, deviceId)));
319 }
320 break;
321
322 case NONE:
323 break;
324
325 default:
326 log.warn("unknown Mastership Role {}", role);
327 }
328 return CompletableFuture.completedFuture(null);
Yoonseon Hana578d762017-05-08 13:42:02 -0700329 }
330
331 @Override
332 public void relinquishAllRole(NetworkId networkId, NodeId nodeId) {
Yoonseon Hana3277012017-05-22 12:26:21 -0700333 Map<DeviceId, NodeId> masterMap = getMasterMap(networkId);
334 Map<DeviceId, List<NodeId>> backups = getBackups(networkId);
Yoonseon Hana578d762017-05-08 13:42:02 -0700335
Yoonseon Hana3277012017-05-22 12:26:21 -0700336 List<CompletableFuture<MastershipEvent>> eventFutures = new ArrayList<>();
337 Set<DeviceId> toRelinquish = new HashSet<>();
338
339 masterMap.entrySet().stream()
340 .filter(entry -> nodeId.equals(entry.getValue()))
341 .forEach(entry -> toRelinquish.add(entry.getKey()));
342
343 backups.entrySet().stream()
344 .filter(entry -> entry.getValue().contains(nodeId))
345 .forEach(entry -> toRelinquish.add(entry.getKey()));
346
347 toRelinquish.forEach(deviceId -> eventFutures.add(
348 relinquishRole(networkId, nodeId, deviceId)));
349
350 eventFutures.forEach(future -> {
351 future.whenComplete((event, error) -> notifyDelegate(networkId, event));
352 });
353 }
354
355 /**
356 * Increase the term for a device, and store it.
357 *
358 * @param networkId a virtual network identifier
359 * @param deviceId a virtual device identifier
360 */
361 private synchronized void incrementTerm(NetworkId networkId, DeviceId deviceId) {
362 Map<DeviceId, AtomicInteger> termMap = getTermMap(networkId);
363
364 AtomicInteger term = termMap.getOrDefault(deviceId, new AtomicInteger(NOTHING));
365 term.incrementAndGet();
366 termMap.put(deviceId, term);
367 }
368
369 /**
370 * Remove backup node for a device.
371 *
372 * @param networkId a virtual network identifier
373 * @param deviceId a virtual device identifier
374 * @param nodeId a node identifier
375 * @return True if success
376 */
377 private synchronized boolean removeFromBackups(NetworkId networkId,
378 DeviceId deviceId, NodeId nodeId) {
379 Map<DeviceId, List<NodeId>> backups = getBackups(networkId);
380
381 List<NodeId> stbys = backups.getOrDefault(deviceId, new ArrayList<>());
382 boolean modified = stbys.remove(nodeId);
383 backups.put(deviceId, stbys);
384 return modified;
385 }
386
387 /**
388 * add to backup if not there already, silently ignores null node.
389 *
390 * @param networkId a virtual network identifier
391 * @param deviceId a virtual device identifier
392 * @param nodeId a node identifier
393 * @return True if success
394 */
395 private synchronized boolean addToBackup(NetworkId networkId,
396 DeviceId deviceId, NodeId nodeId) {
397 Map<DeviceId, List<NodeId>> backups = getBackups(networkId);
398
399 boolean modified = false;
400 List<NodeId> stbys = backups.getOrDefault(deviceId, new ArrayList<>());
401 if (nodeId != null && !stbys.contains(nodeId)) {
402 stbys.add(nodeId);
403 backups.put(deviceId, stbys);
404 modified = true;
405 }
406 return modified;
407 }
408
409 /**
410 * Returns deviceId-master map for a specified virtual network.
411 *
412 * @param networkId a virtual network identifier
413 * @return DeviceId-master map of a given virtual network.
414 */
415 private Map<DeviceId, NodeId> getMasterMap(NetworkId networkId) {
416 return masterMapByNetwork.computeIfAbsent(networkId, k -> new HashMap<>());
417 }
418
419 /**
420 * Returns deviceId-backups map for a specified virtual network.
421 *
422 * @param networkId a virtual network identifier
423 * @return DeviceId-backups map of a given virtual network.
424 */
425 private Map<DeviceId, List<NodeId>> getBackups(NetworkId networkId) {
426 return backupsByNetwork.computeIfAbsent(networkId, k -> new HashMap<>());
427 }
428
429 /**
430 * Returns deviceId-terms map for a specified virtual network.
431 *
432 * @param networkId a virtual network identifier
433 * @return DeviceId-terms map of a given virtual network.
434 */
435 private Map<DeviceId, AtomicInteger> getTermMap(NetworkId networkId) {
436 return termMapByNetwork.computeIfAbsent(networkId, k -> new HashMap<>());
437 }
438
439 /**
440 * Returns a fake cluster service for a test purpose only.
441 *
442 * @return a fake cluster service
443 */
444 private ClusterService createFakeClusterService() {
445 // just for ease of unit test
446 final ControllerNode instance =
447 new DefaultControllerNode(new NodeId("local"),
448 IpAddress.valueOf("127.0.0.1"));
449
450 ClusterService faceClusterService = new ClusterService() {
451
452 private final DateTime creationTime = DateTime.now();
453
454 @Override
455 public ControllerNode getLocalNode() {
456 return instance;
457 }
458
459 @Override
460 public Set<ControllerNode> getNodes() {
461 return ImmutableSet.of(instance);
462 }
463
464 @Override
465 public ControllerNode getNode(NodeId nodeId) {
466 if (instance.id().equals(nodeId)) {
467 return instance;
468 }
469 return null;
470 }
471
472 @Override
473 public ControllerNode.State getState(NodeId nodeId) {
474 if (instance.id().equals(nodeId)) {
475 return ControllerNode.State.ACTIVE;
476 } else {
477 return ControllerNode.State.INACTIVE;
478 }
479 }
480
481 @Override
482 public DateTime getLastUpdated(NodeId nodeId) {
483 return creationTime;
484 }
485
486 @Override
487 public void addListener(ClusterEventListener listener) {
488 }
489
490 @Override
491 public void removeListener(ClusterEventListener listener) {
492 }
493 };
494 return faceClusterService;
Yoonseon Hana578d762017-05-08 13:42:02 -0700495 }
496}