blob: 141af0bc9415a06159bdbf73b2c45ce6ce16c166 [file] [log] [blame]
Jian Li19f25262018-07-03 22:37:12 +09001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.openstackvtap.impl;
17
18import com.google.common.collect.ImmutableSet;
19import com.google.common.collect.Maps;
20import com.google.common.collect.Sets;
21import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
27import org.onlab.util.KryoNamespace;
28import org.onosproject.net.DefaultAnnotations;
29import org.onosproject.net.DeviceId;
30import org.onosproject.net.SparseAnnotations;
31import org.onosproject.openstackvtap.api.OpenstackVtap;
32import org.onosproject.openstackvtap.api.OpenstackVtapCriterion;
33import org.onosproject.openstackvtap.api.OpenstackVtapEvent;
34import org.onosproject.openstackvtap.api.OpenstackVtapId;
35import org.onosproject.openstackvtap.api.OpenstackVtapStore;
36import org.onosproject.openstackvtap.api.OpenstackVtapStoreDelegate;
37import org.onosproject.store.AbstractStore;
38import org.onosproject.store.serializers.KryoNamespaces;
39import org.onosproject.store.service.ConsistentMap;
40import org.onosproject.store.service.DistributedPrimitive.Status;
41import org.onosproject.store.service.MapEvent;
42import org.onosproject.store.service.MapEventListener;
43import org.onosproject.store.service.Serializer;
44import org.onosproject.store.service.StorageService;
45import org.slf4j.Logger;
46
47import java.util.Comparator;
48import java.util.Map;
49import java.util.Objects;
50import java.util.Set;
51import java.util.UUID;
52import java.util.concurrent.ScheduledExecutorService;
53import java.util.function.Consumer;
54import java.util.stream.Collectors;
55
56import static com.google.common.base.Preconditions.checkNotNull;
57import static com.google.common.base.Preconditions.checkState;
58import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
59import static org.onlab.util.Tools.groupedThreads;
60import static org.onosproject.net.DefaultAnnotations.merge;
61import static org.slf4j.LoggerFactory.getLogger;
62
/**
 * Manages the inventory of OpenStack vTaps using a {@code ConsistentMap}.
 */
66@Component(immediate = true)
67@Service
68public class DistributedOpenstackVtapStore
69 extends AbstractStore<OpenstackVtapEvent, OpenstackVtapStoreDelegate>
70 implements OpenstackVtapStore {
71 private final Logger log = getLogger(getClass());
72
73 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
74 protected StorageService storageService;
75
76 private ConsistentMap<OpenstackVtapId, DefaultOpenstackVtap> vTapConsistentMap;
77 private MapEventListener<OpenstackVtapId, DefaultOpenstackVtap>
Jian Li26ef1302018-07-04 14:37:06 +090078 vTapListener = new VtapEventListener();
Jian Li19f25262018-07-03 22:37:12 +090079 private Map<OpenstackVtapId, DefaultOpenstackVtap> vTapMap;
80
81 private static final Serializer SERIALIZER = Serializer
82 .using(new KryoNamespace.Builder().register(KryoNamespaces.API)
83 .register(OpenstackVtapId.class)
84 .register(UUID.class)
85 .register(DefaultOpenstackVtap.class)
86 .register(OpenstackVtap.Type.class)
87 .register(DefaultOpenstackVtapCriterion.class)
88 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
89 .build());
90
91 private Map<DeviceId, Set<OpenstackVtapId>>
92 vTapIdsByTxDeviceId = Maps.newConcurrentMap();
93 private Map<DeviceId, Set<OpenstackVtapId>>
94 vTapIdsByRxDeviceId = Maps.newConcurrentMap();
95
96 private ScheduledExecutorService eventExecutor;
97
98 private Consumer<Status> vTapStatusListener;
99
100 public static final String INVALID_DESCRIPTION = "Invalid create/update parameter";
101
102 @Activate
103 public void activate() {
104 eventExecutor = newSingleThreadScheduledExecutor(
105 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
106
107 vTapConsistentMap = storageService.<OpenstackVtapId, DefaultOpenstackVtap>
108 consistentMapBuilder()
109 .withName("vTapMap")
110 .withSerializer(SERIALIZER)
111 .build();
112
113 vTapMap = vTapConsistentMap.asJavaMap();
114 vTapConsistentMap.addListener(vTapListener);
115
116 vTapStatusListener = status -> {
117 if (status == Status.ACTIVE) {
Jian Li26ef1302018-07-04 14:37:06 +0900118 eventExecutor.execute(this::loadVtapIds);
Jian Li19f25262018-07-03 22:37:12 +0900119 }
120 };
121 vTapConsistentMap.addStatusChangeListener(vTapStatusListener);
122
123 log.info("Started {} - {}", this.getClass().getSimpleName());
124 }
125
126 @Deactivate
127 public void deactivate() {
128 vTapConsistentMap.removeStatusChangeListener(vTapStatusListener);
129 vTapConsistentMap.removeListener(vTapListener);
130 eventExecutor.shutdown();
131
132 log.info("Stopped {} - {}", this.getClass().getSimpleName());
133 }
134
Jian Li19f25262018-07-03 22:37:12 +0900135 @Override
136 public OpenstackVtap createOrUpdateVtap(OpenstackVtapId vTapId,
137 OpenstackVtap description,
138 boolean replaceFlag) {
139
140 return vTapMap.compute(vTapId, (id, existing) -> {
141 if (existing == null &&
142 (description.type() == null ||
143 description.vTapCriterion() == null ||
144 description.txDeviceIds() == null ||
145 description.rxDeviceIds() == null)) {
146 checkState(false, INVALID_DESCRIPTION);
147 return null;
148 }
149
150 if (shouldUpdate(existing, description, replaceFlag)) {
151 // Replace items
152 OpenstackVtap.Type type =
153 (description.type() == null ? existing.type() : description.type());
154 OpenstackVtapCriterion vTapCriterion =
155 (description.vTapCriterion() == null ?
156 existing.vTapCriterion() : description.vTapCriterion());
157
158 // Replace or add devices
159 Set<DeviceId> txDeviceIds;
160 if (description.txDeviceIds() == null) {
161 txDeviceIds = existing.txDeviceIds();
162 } else {
163 if (existing == null || replaceFlag) {
164 txDeviceIds = ImmutableSet.copyOf(description.txDeviceIds());
165 } else {
166 txDeviceIds = Sets.newHashSet(existing.txDeviceIds());
167 txDeviceIds.addAll(description.txDeviceIds());
168 }
169 }
170
171 Set<DeviceId> rxDeviceIds;
172 if (description.rxDeviceIds() == null) {
173 rxDeviceIds = existing.rxDeviceIds();
174 } else {
175 if (existing == null || replaceFlag) {
176 rxDeviceIds = ImmutableSet.copyOf(description.rxDeviceIds());
177 } else {
178 rxDeviceIds = Sets.newHashSet(existing.rxDeviceIds());
179 rxDeviceIds.addAll(description.rxDeviceIds());
180 }
181 }
182
183 // Replace or add annotations
184 SparseAnnotations annotations;
185 if (existing != null) {
186 annotations = merge((DefaultAnnotations) existing.annotations(),
187 (SparseAnnotations) description.annotations());
188 } else {
189 annotations = (SparseAnnotations) description.annotations();
190 }
191
192 // Make new changed vTap and return
193 return DefaultOpenstackVtap.builder()
194 .id(vTapId)
195 .type(type)
196 .vTapCriterion(vTapCriterion)
197 .txDeviceIds(txDeviceIds)
198 .rxDeviceIds(rxDeviceIds)
199 .annotations(annotations)
200 .build();
201 }
202 return existing;
203 });
204 }
205
206 @Override
207 public OpenstackVtap removeVtapById(OpenstackVtapId vTapId) {
208 return vTapMap.remove(vTapId);
209 }
210
211 @Override
212 public boolean addDeviceToVtap(OpenstackVtapId vTapId,
213 OpenstackVtap.Type type,
214 DeviceId deviceId) {
215 checkNotNull(vTapId);
216 checkNotNull(deviceId);
217
218 OpenstackVtap vTap = vTapMap.compute(vTapId, (id, existing) -> {
219 if (existing == null) {
220 return null;
221 }
222 if (!existing.type().isValid(type)) {
223 log.error("Not valid OpenstackVtap type {} for requested type {}",
224 existing.type(), type);
225 return existing;
226 }
227
228 Set<DeviceId> txDeviceIds = null;
229 if (type.isValid(OpenstackVtap.Type.VTAP_TX) &&
230 !existing.txDeviceIds().contains(deviceId)) {
231 txDeviceIds = Sets.newHashSet(existing.txDeviceIds());
232 txDeviceIds.add(deviceId);
233 }
234
235 Set<DeviceId> rxDeviceIds = null;
236 if (type.isValid(OpenstackVtap.Type.VTAP_RX) &&
237 !existing.rxDeviceIds().contains(deviceId)) {
238 rxDeviceIds = Sets.newHashSet(existing.rxDeviceIds());
239 rxDeviceIds.add(deviceId);
240 }
241
242 if (txDeviceIds != null || rxDeviceIds != null) {
243 //updateVTapIdFromDeviceId(existing.id(), deviceId); // execute from event listener
244
245 return DefaultOpenstackVtap.builder()
246 .id(vTapId)
247 .type(existing.type())
248 .vTapCriterion(existing.vTapCriterion())
249 .txDeviceIds(txDeviceIds != null ? txDeviceIds : existing.txDeviceIds())
250 .rxDeviceIds(rxDeviceIds != null ? rxDeviceIds : existing.rxDeviceIds())
251 .annotations(existing.annotations())
252 .build();
253 }
254 return existing;
255 });
256 return (vTap != null);
257 }
258
259 @Override
260 public boolean removeDeviceFromVtap(OpenstackVtapId vTapId,
261 OpenstackVtap.Type type,
262 DeviceId deviceId) {
263 checkNotNull(vTapId);
264 checkNotNull(deviceId);
265
266 OpenstackVtap vTap = vTapMap.compute(vTapId, (id, existing) -> {
267 if (existing == null) {
268 return null;
269 }
270 if (!existing.type().isValid(type)) {
271 log.error("Not valid OpenstackVtap type {} for requested type {}",
272 existing.type(), type);
273 return existing;
274 }
275
276 Set<DeviceId> txDeviceIds = null;
277 if (type.isValid(OpenstackVtap.Type.VTAP_TX) &&
278 existing.txDeviceIds().contains(deviceId)) {
279 txDeviceIds = Sets.newHashSet(existing.txDeviceIds());
280 txDeviceIds.remove(deviceId);
281 }
282
283 Set<DeviceId> rxDeviceIds = null;
284 if (type.isValid(OpenstackVtap.Type.VTAP_RX) &&
285 existing.rxDeviceIds().contains(deviceId)) {
286 rxDeviceIds = Sets.newHashSet(existing.rxDeviceIds());
287 rxDeviceIds.remove(deviceId);
288 }
289
290 if (txDeviceIds != null || rxDeviceIds != null) {
291 //removeVTapIdFromDeviceId(existing.id(), deviceId); // execute from event listener
292
293 return DefaultOpenstackVtap.builder()
294 .id(vTapId)
295 .type(existing.type())
296 .vTapCriterion(existing.vTapCriterion())
297 .txDeviceIds(txDeviceIds != null ? txDeviceIds : existing.txDeviceIds())
298 .rxDeviceIds(rxDeviceIds != null ? rxDeviceIds : existing.rxDeviceIds())
299 .annotations(existing.annotations())
300 .build();
301 }
302 return existing;
303 });
304 return (vTap != null);
305 }
306
307 @Override
308 public boolean updateDeviceForVtap(OpenstackVtapId vTapId,
309 Set<DeviceId> txDeviceIds, Set<DeviceId> rxDeviceIds,
310 boolean replaceDevices) {
311 checkNotNull(vTapId);
312 checkNotNull(txDeviceIds);
313 checkNotNull(rxDeviceIds);
314
315 OpenstackVtap vTap = vTapMap.compute(vTapId, (id, existing) -> {
316 if (existing == null) {
317 return null;
318 }
319
320 // Replace or add devices
321 Set<DeviceId> txDS = null;
322 if (replaceDevices) {
323 if (!existing.txDeviceIds().equals(txDeviceIds)) {
324 txDS = ImmutableSet.copyOf(txDeviceIds);
325 }
326 } else {
327 if (!existing.txDeviceIds().containsAll(txDeviceIds)) {
328 txDS = Sets.newHashSet(existing.txDeviceIds());
329 txDS.addAll(txDeviceIds);
330 }
331 }
332
333 Set<DeviceId> rxDS = null;
334 if (replaceDevices) {
335 if (!existing.rxDeviceIds().equals(rxDeviceIds)) {
336 rxDS = ImmutableSet.copyOf(rxDeviceIds);
337 }
338 } else {
339 if (!existing.rxDeviceIds().containsAll(rxDeviceIds)) {
340 rxDS = Sets.newHashSet(existing.rxDeviceIds());
341 rxDS.addAll(rxDeviceIds);
342 }
343 }
344
345 if (txDS != null || rxDS != null) {
346
347 return DefaultOpenstackVtap.builder()
348 .id(vTapId)
349 .type(existing.type())
350 .vTapCriterion(existing.vTapCriterion())
351 .txDeviceIds(txDS != null ? txDS : existing.txDeviceIds())
352 .rxDeviceIds(rxDS != null ? rxDS : existing.rxDeviceIds())
353 .annotations(existing.annotations())
354 .build();
355 }
356 return existing;
357 });
358 return (vTap != null);
359 }
360
361 @Override
362 public int getVtapCount(OpenstackVtap.Type type) {
363 return (int) vTapMap.values().parallelStream()
364 .filter(vTap -> vTap.type().isValid(type))
365 .count();
366 }
367
368 @Override
369 public Set<OpenstackVtap> getVtaps(OpenstackVtap.Type type) {
370 return ImmutableSet.copyOf(
371 vTapMap.values().parallelStream()
372 .filter(vTap -> vTap.type().isValid(type))
373 .collect(Collectors.toSet()));
374 }
375
376 @Override
377 public OpenstackVtap getVtap(OpenstackVtapId vTapId) {
378 return vTapMap.get(vTapId);
379 }
380
381 @Override
382 public Set<OpenstackVtap> getVtapsByDeviceId(OpenstackVtap.Type type,
383 DeviceId deviceId) {
384 Set<OpenstackVtapId> vtapIds = Sets.newHashSet();
385 if (type.isValid(OpenstackVtap.Type.VTAP_TX)) {
Jian Lib1ca1a22018-07-06 13:31:39 +0900386 if (vTapIdsByTxDeviceId.get(deviceId) != null) {
387 vtapIds.addAll(vTapIdsByTxDeviceId.get(deviceId));
388 }
Jian Li19f25262018-07-03 22:37:12 +0900389 }
390 if (type.isValid(OpenstackVtap.Type.VTAP_RX)) {
Jian Lib1ca1a22018-07-06 13:31:39 +0900391 if (vTapIdsByRxDeviceId.get(deviceId) != null) {
392 vtapIds.addAll(vTapIdsByRxDeviceId.get(deviceId));
393 }
Jian Li19f25262018-07-03 22:37:12 +0900394 }
395
396 return ImmutableSet.copyOf(
397 vtapIds.parallelStream()
398 .map(vTapId -> vTapMap.get(vTapId))
399 .filter(Objects::nonNull)
400 .collect(Collectors.toSet()));
401 }
402
Jian Li26ef1302018-07-04 14:37:06 +0900403 private void loadVtapIds() {
404 vTapIdsByTxDeviceId.clear();
405 vTapIdsByRxDeviceId.clear();
406 vTapMap.values().forEach(vTap -> refreshDeviceIdsByVtap(null, vTap));
407 }
408
409 private boolean shouldUpdate(DefaultOpenstackVtap existing,
410 OpenstackVtap description,
411 boolean replaceDevices) {
412 if (existing == null) {
413 return true;
414 }
415
416 if ((description.type() != null && !description.type().equals(existing.type()))
417 || (description.vTapCriterion() != null &&
418 !description.vTapCriterion().equals(existing.vTapCriterion()))) {
419 return true;
420 }
421
422 if (description.txDeviceIds() != null) {
423 if (replaceDevices) {
424 if (!existing.txDeviceIds().equals(description.txDeviceIds())) {
425 return true;
426 }
427 } else {
428 if (!existing.txDeviceIds().containsAll(description.txDeviceIds())) {
429 return true;
430 }
431 }
432 }
433
434 if (description.rxDeviceIds() != null) {
435 if (replaceDevices) {
436 if (!existing.rxDeviceIds().equals(description.rxDeviceIds())) {
437 return true;
438 }
439 } else {
440 if (!existing.rxDeviceIds().containsAll(description.rxDeviceIds())) {
441 return true;
442 }
443 }
444 }
445
446 // check to see if any of the annotations provided by vTap
447 // differ from those in the existing vTap
448 return description.annotations().keys().stream()
449 .anyMatch(k -> !Objects.equals(description.annotations().value(k),
450 existing.annotations().value(k)));
451 }
452
453 private class VtapComparator implements Comparator<OpenstackVtap> {
Jian Li19f25262018-07-03 22:37:12 +0900454 @Override
455 public int compare(OpenstackVtap v1, OpenstackVtap v2) {
456 int diff = (v2.type().compareTo(v1.type()));
457 if (diff == 0) {
458 return (v2.vTapCriterion().ipProtocol() - v1.vTapCriterion().ipProtocol());
459 }
460 return diff;
461 }
462 }
463
464 private static Set<OpenstackVtapId> addVTapIds(OpenstackVtapId vTapId) {
465 Set<OpenstackVtapId> vtapIds = Sets.newConcurrentHashSet();
466 vtapIds.add(vTapId);
467 return vtapIds;
468 }
469
470 private static Set<OpenstackVtapId> updateVTapIds(Set<OpenstackVtapId> existingVtapIds,
471 OpenstackVtapId vTapId) {
472 existingVtapIds.add(vTapId);
473 return existingVtapIds;
474 }
475
476 private static Set<OpenstackVtapId> removeVTapIds(Set<OpenstackVtapId> existingVtapIds,
477 OpenstackVtapId vTapId) {
478 existingVtapIds.remove(vTapId);
479 if (existingVtapIds.isEmpty()) {
480 return null;
481 }
482 return existingVtapIds;
483 }
484
485 private void updateVTapIdFromTxDeviceId(OpenstackVtapId vTapId, DeviceId deviceId) {
486 vTapIdsByTxDeviceId.compute(deviceId, (k, v) -> v == null ?
487 addVTapIds(vTapId) : updateVTapIds(v, vTapId));
488 }
489
490 private void removeVTapIdFromTxDeviceId(OpenstackVtapId vTapId, DeviceId deviceId) {
491 vTapIdsByTxDeviceId.computeIfPresent(deviceId, (k, v) -> removeVTapIds(v, vTapId));
492 }
493
494 private void updateVTapIdFromRxDeviceId(OpenstackVtapId vTapId, DeviceId deviceId) {
495 vTapIdsByRxDeviceId.compute(deviceId, (k, v) -> v == null ?
496 addVTapIds(vTapId) : updateVTapIds(v, vTapId));
497 }
498
499 private void removeVTapIdFromRxDeviceId(OpenstackVtapId vTapId, DeviceId deviceId) {
500 vTapIdsByRxDeviceId.computeIfPresent(deviceId, (k, v) -> removeVTapIds(v, vTapId));
501 }
502
503 private void refreshDeviceIdsByVtap(OpenstackVtap oldOpenstackVtap,
504 OpenstackVtap newOpenstackVtap) {
505 if (oldOpenstackVtap != null) {
506 Set<DeviceId> removeDeviceIds;
507
508 // Remove TX vTap
509 removeDeviceIds = (newOpenstackVtap != null) ?
510 Sets.difference(oldOpenstackVtap.txDeviceIds(),
511 newOpenstackVtap.txDeviceIds()) : oldOpenstackVtap.txDeviceIds();
512 removeDeviceIds.forEach(id -> removeVTapIdFromTxDeviceId(oldOpenstackVtap.id(), id));
513
514 // Remove RX vTap
515 removeDeviceIds = (newOpenstackVtap != null) ?
516 Sets.difference(oldOpenstackVtap.rxDeviceIds(),
517 newOpenstackVtap.rxDeviceIds()) : oldOpenstackVtap.rxDeviceIds();
518 removeDeviceIds.forEach(id -> removeVTapIdFromRxDeviceId(oldOpenstackVtap.id(), id));
519 }
520
521 if (newOpenstackVtap != null) {
522 Set<DeviceId> addDeviceIds;
523
524 // Add TX vTap
525 addDeviceIds = (oldOpenstackVtap != null) ?
526 Sets.difference(newOpenstackVtap.txDeviceIds(),
527 oldOpenstackVtap.txDeviceIds()) : newOpenstackVtap.txDeviceIds();
528 addDeviceIds.forEach(id -> updateVTapIdFromTxDeviceId(newOpenstackVtap.id(), id));
529
530 // Add RX vTap
531 addDeviceIds = (oldOpenstackVtap != null) ?
532 Sets.difference(newOpenstackVtap.rxDeviceIds(),
533 oldOpenstackVtap.rxDeviceIds()) : newOpenstackVtap.rxDeviceIds();
534 addDeviceIds.forEach(id -> updateVTapIdFromRxDeviceId(newOpenstackVtap.id(), id));
535 }
536 }
537
Jian Li26ef1302018-07-04 14:37:06 +0900538 private class VtapEventListener
Jian Li19f25262018-07-03 22:37:12 +0900539 implements MapEventListener<OpenstackVtapId, DefaultOpenstackVtap> {
540 @Override
541 public void event(MapEvent<OpenstackVtapId, DefaultOpenstackVtap> event) {
542 DefaultOpenstackVtap newValue =
543 event.newValue() != null ? event.newValue().value() : null;
544 DefaultOpenstackVtap oldValue =
545 event.oldValue() != null ? event.oldValue().value() : null;
546
Jian Li26ef1302018-07-04 14:37:06 +0900547 log.debug("VtapEventListener {} -> {}, {}", event.type(), oldValue, newValue);
Jian Li19f25262018-07-03 22:37:12 +0900548 switch (event.type()) {
549 case INSERT:
550 refreshDeviceIdsByVtap(oldValue, newValue);
551 notifyDelegate(new OpenstackVtapEvent(
552 OpenstackVtapEvent.Type.VTAP_ADDED, newValue));
553 break;
554
555 case UPDATE:
556 if (!Objects.equals(newValue, oldValue)) {
557 refreshDeviceIdsByVtap(oldValue, newValue);
558 notifyDelegate(new OpenstackVtapEvent(
559 OpenstackVtapEvent.Type.VTAP_UPDATED, newValue, oldValue));
560 }
561 break;
562
563 case REMOVE:
564 refreshDeviceIdsByVtap(oldValue, newValue);
565 notifyDelegate(new OpenstackVtapEvent(
566 OpenstackVtapEvent.Type.VTAP_REMOVED, oldValue));
567 break;
568
569 default:
570 log.warn("Unknown map event type: {}", event.type());
571 }
572 }
573 }
574}