blob: c90571f7b4a884e5a631ee458f301ddfae5deeb8 [file] [log] [blame]
Jian Li19f25262018-07-03 22:37:12 +09001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.openstackvtap.impl;
17
18import com.google.common.collect.ImmutableSet;
19import com.google.common.collect.Maps;
20import com.google.common.collect.Sets;
21import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
27import org.onlab.util.KryoNamespace;
28import org.onosproject.net.DefaultAnnotations;
29import org.onosproject.net.DeviceId;
30import org.onosproject.net.SparseAnnotations;
31import org.onosproject.openstackvtap.api.OpenstackVtap;
32import org.onosproject.openstackvtap.api.OpenstackVtapCriterion;
33import org.onosproject.openstackvtap.api.OpenstackVtapEvent;
34import org.onosproject.openstackvtap.api.OpenstackVtapId;
35import org.onosproject.openstackvtap.api.OpenstackVtapStore;
36import org.onosproject.openstackvtap.api.OpenstackVtapStoreDelegate;
37import org.onosproject.store.AbstractStore;
38import org.onosproject.store.serializers.KryoNamespaces;
39import org.onosproject.store.service.ConsistentMap;
40import org.onosproject.store.service.DistributedPrimitive.Status;
41import org.onosproject.store.service.MapEvent;
42import org.onosproject.store.service.MapEventListener;
43import org.onosproject.store.service.Serializer;
44import org.onosproject.store.service.StorageService;
45import org.slf4j.Logger;
46
47import java.util.Comparator;
48import java.util.Map;
49import java.util.Objects;
50import java.util.Set;
51import java.util.UUID;
52import java.util.concurrent.ScheduledExecutorService;
53import java.util.function.Consumer;
54import java.util.stream.Collectors;
55
56import static com.google.common.base.Preconditions.checkNotNull;
57import static com.google.common.base.Preconditions.checkState;
58import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
59import static org.onlab.util.Tools.groupedThreads;
60import static org.onosproject.net.DefaultAnnotations.merge;
61import static org.slf4j.LoggerFactory.getLogger;
62
/**
 * Manages the inventory of OpenStack vTaps using a {@code ConsistentMap}.
 */
66@Component(immediate = true)
67@Service
68public class DistributedOpenstackVtapStore
69 extends AbstractStore<OpenstackVtapEvent, OpenstackVtapStoreDelegate>
70 implements OpenstackVtapStore {
71 private final Logger log = getLogger(getClass());
72
73 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
74 protected StorageService storageService;
75
76 private ConsistentMap<OpenstackVtapId, DefaultOpenstackVtap> vTapConsistentMap;
77 private MapEventListener<OpenstackVtapId, DefaultOpenstackVtap>
Jian Li26ef1302018-07-04 14:37:06 +090078 vTapListener = new VtapEventListener();
Jian Li19f25262018-07-03 22:37:12 +090079 private Map<OpenstackVtapId, DefaultOpenstackVtap> vTapMap;
80
81 private static final Serializer SERIALIZER = Serializer
82 .using(new KryoNamespace.Builder().register(KryoNamespaces.API)
83 .register(OpenstackVtapId.class)
84 .register(UUID.class)
85 .register(DefaultOpenstackVtap.class)
86 .register(OpenstackVtap.Type.class)
87 .register(DefaultOpenstackVtapCriterion.class)
88 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
89 .build());
90
91 private Map<DeviceId, Set<OpenstackVtapId>>
92 vTapIdsByTxDeviceId = Maps.newConcurrentMap();
93 private Map<DeviceId, Set<OpenstackVtapId>>
94 vTapIdsByRxDeviceId = Maps.newConcurrentMap();
95
96 private ScheduledExecutorService eventExecutor;
97
98 private Consumer<Status> vTapStatusListener;
99
100 public static final String INVALID_DESCRIPTION = "Invalid create/update parameter";
101
102 @Activate
103 public void activate() {
104 eventExecutor = newSingleThreadScheduledExecutor(
105 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
106
107 vTapConsistentMap = storageService.<OpenstackVtapId, DefaultOpenstackVtap>
108 consistentMapBuilder()
109 .withName("vTapMap")
110 .withSerializer(SERIALIZER)
111 .build();
112
113 vTapMap = vTapConsistentMap.asJavaMap();
114 vTapConsistentMap.addListener(vTapListener);
115
116 vTapStatusListener = status -> {
117 if (status == Status.ACTIVE) {
Jian Li26ef1302018-07-04 14:37:06 +0900118 eventExecutor.execute(this::loadVtapIds);
Jian Li19f25262018-07-03 22:37:12 +0900119 }
120 };
121 vTapConsistentMap.addStatusChangeListener(vTapStatusListener);
122
123 log.info("Started {} - {}", this.getClass().getSimpleName());
124 }
125
126 @Deactivate
127 public void deactivate() {
128 vTapConsistentMap.removeStatusChangeListener(vTapStatusListener);
129 vTapConsistentMap.removeListener(vTapListener);
130 eventExecutor.shutdown();
131
132 log.info("Stopped {} - {}", this.getClass().getSimpleName());
133 }
134
Jian Li19f25262018-07-03 22:37:12 +0900135 @Override
136 public OpenstackVtap createOrUpdateVtap(OpenstackVtapId vTapId,
137 OpenstackVtap description,
138 boolean replaceFlag) {
139
140 return vTapMap.compute(vTapId, (id, existing) -> {
141 if (existing == null &&
142 (description.type() == null ||
143 description.vTapCriterion() == null ||
144 description.txDeviceIds() == null ||
145 description.rxDeviceIds() == null)) {
146 checkState(false, INVALID_DESCRIPTION);
147 return null;
148 }
149
150 if (shouldUpdate(existing, description, replaceFlag)) {
151 // Replace items
152 OpenstackVtap.Type type =
153 (description.type() == null ? existing.type() : description.type());
154 OpenstackVtapCriterion vTapCriterion =
155 (description.vTapCriterion() == null ?
156 existing.vTapCriterion() : description.vTapCriterion());
157
158 // Replace or add devices
159 Set<DeviceId> txDeviceIds;
160 if (description.txDeviceIds() == null) {
161 txDeviceIds = existing.txDeviceIds();
162 } else {
163 if (existing == null || replaceFlag) {
164 txDeviceIds = ImmutableSet.copyOf(description.txDeviceIds());
165 } else {
166 txDeviceIds = Sets.newHashSet(existing.txDeviceIds());
167 txDeviceIds.addAll(description.txDeviceIds());
168 }
169 }
170
171 Set<DeviceId> rxDeviceIds;
172 if (description.rxDeviceIds() == null) {
173 rxDeviceIds = existing.rxDeviceIds();
174 } else {
175 if (existing == null || replaceFlag) {
176 rxDeviceIds = ImmutableSet.copyOf(description.rxDeviceIds());
177 } else {
178 rxDeviceIds = Sets.newHashSet(existing.rxDeviceIds());
179 rxDeviceIds.addAll(description.rxDeviceIds());
180 }
181 }
182
183 // Replace or add annotations
184 SparseAnnotations annotations;
185 if (existing != null) {
186 annotations = merge((DefaultAnnotations) existing.annotations(),
187 (SparseAnnotations) description.annotations());
188 } else {
189 annotations = (SparseAnnotations) description.annotations();
190 }
191
192 // Make new changed vTap and return
193 return DefaultOpenstackVtap.builder()
194 .id(vTapId)
195 .type(type)
196 .vTapCriterion(vTapCriterion)
197 .txDeviceIds(txDeviceIds)
198 .rxDeviceIds(rxDeviceIds)
199 .annotations(annotations)
200 .build();
201 }
202 return existing;
203 });
204 }
205
206 @Override
207 public OpenstackVtap removeVtapById(OpenstackVtapId vTapId) {
208 return vTapMap.remove(vTapId);
209 }
210
211 @Override
212 public boolean addDeviceToVtap(OpenstackVtapId vTapId,
213 OpenstackVtap.Type type,
214 DeviceId deviceId) {
215 checkNotNull(vTapId);
216 checkNotNull(deviceId);
217
218 OpenstackVtap vTap = vTapMap.compute(vTapId, (id, existing) -> {
219 if (existing == null) {
220 return null;
221 }
222 if (!existing.type().isValid(type)) {
223 log.error("Not valid OpenstackVtap type {} for requested type {}",
224 existing.type(), type);
225 return existing;
226 }
227
228 Set<DeviceId> txDeviceIds = null;
229 if (type.isValid(OpenstackVtap.Type.VTAP_TX) &&
230 !existing.txDeviceIds().contains(deviceId)) {
231 txDeviceIds = Sets.newHashSet(existing.txDeviceIds());
232 txDeviceIds.add(deviceId);
233 }
234
235 Set<DeviceId> rxDeviceIds = null;
236 if (type.isValid(OpenstackVtap.Type.VTAP_RX) &&
237 !existing.rxDeviceIds().contains(deviceId)) {
238 rxDeviceIds = Sets.newHashSet(existing.rxDeviceIds());
239 rxDeviceIds.add(deviceId);
240 }
241
242 if (txDeviceIds != null || rxDeviceIds != null) {
243 //updateVTapIdFromDeviceId(existing.id(), deviceId); // execute from event listener
244
245 return DefaultOpenstackVtap.builder()
246 .id(vTapId)
247 .type(existing.type())
248 .vTapCriterion(existing.vTapCriterion())
249 .txDeviceIds(txDeviceIds != null ? txDeviceIds : existing.txDeviceIds())
250 .rxDeviceIds(rxDeviceIds != null ? rxDeviceIds : existing.rxDeviceIds())
251 .annotations(existing.annotations())
252 .build();
253 }
254 return existing;
255 });
256 return (vTap != null);
257 }
258
259 @Override
260 public boolean removeDeviceFromVtap(OpenstackVtapId vTapId,
261 OpenstackVtap.Type type,
262 DeviceId deviceId) {
263 checkNotNull(vTapId);
264 checkNotNull(deviceId);
265
266 OpenstackVtap vTap = vTapMap.compute(vTapId, (id, existing) -> {
267 if (existing == null) {
268 return null;
269 }
270 if (!existing.type().isValid(type)) {
271 log.error("Not valid OpenstackVtap type {} for requested type {}",
272 existing.type(), type);
273 return existing;
274 }
275
276 Set<DeviceId> txDeviceIds = null;
277 if (type.isValid(OpenstackVtap.Type.VTAP_TX) &&
278 existing.txDeviceIds().contains(deviceId)) {
279 txDeviceIds = Sets.newHashSet(existing.txDeviceIds());
280 txDeviceIds.remove(deviceId);
281 }
282
283 Set<DeviceId> rxDeviceIds = null;
284 if (type.isValid(OpenstackVtap.Type.VTAP_RX) &&
285 existing.rxDeviceIds().contains(deviceId)) {
286 rxDeviceIds = Sets.newHashSet(existing.rxDeviceIds());
287 rxDeviceIds.remove(deviceId);
288 }
289
290 if (txDeviceIds != null || rxDeviceIds != null) {
291 //removeVTapIdFromDeviceId(existing.id(), deviceId); // execute from event listener
292
293 return DefaultOpenstackVtap.builder()
294 .id(vTapId)
295 .type(existing.type())
296 .vTapCriterion(existing.vTapCriterion())
297 .txDeviceIds(txDeviceIds != null ? txDeviceIds : existing.txDeviceIds())
298 .rxDeviceIds(rxDeviceIds != null ? rxDeviceIds : existing.rxDeviceIds())
299 .annotations(existing.annotations())
300 .build();
301 }
302 return existing;
303 });
304 return (vTap != null);
305 }
306
307 @Override
308 public boolean updateDeviceForVtap(OpenstackVtapId vTapId,
309 Set<DeviceId> txDeviceIds, Set<DeviceId> rxDeviceIds,
310 boolean replaceDevices) {
311 checkNotNull(vTapId);
312 checkNotNull(txDeviceIds);
313 checkNotNull(rxDeviceIds);
314
315 OpenstackVtap vTap = vTapMap.compute(vTapId, (id, existing) -> {
316 if (existing == null) {
317 return null;
318 }
319
320 // Replace or add devices
321 Set<DeviceId> txDS = null;
322 if (replaceDevices) {
323 if (!existing.txDeviceIds().equals(txDeviceIds)) {
324 txDS = ImmutableSet.copyOf(txDeviceIds);
325 }
326 } else {
327 if (!existing.txDeviceIds().containsAll(txDeviceIds)) {
328 txDS = Sets.newHashSet(existing.txDeviceIds());
329 txDS.addAll(txDeviceIds);
330 }
331 }
332
333 Set<DeviceId> rxDS = null;
334 if (replaceDevices) {
335 if (!existing.rxDeviceIds().equals(rxDeviceIds)) {
336 rxDS = ImmutableSet.copyOf(rxDeviceIds);
337 }
338 } else {
339 if (!existing.rxDeviceIds().containsAll(rxDeviceIds)) {
340 rxDS = Sets.newHashSet(existing.rxDeviceIds());
341 rxDS.addAll(rxDeviceIds);
342 }
343 }
344
345 if (txDS != null || rxDS != null) {
346
347 return DefaultOpenstackVtap.builder()
348 .id(vTapId)
349 .type(existing.type())
350 .vTapCriterion(existing.vTapCriterion())
351 .txDeviceIds(txDS != null ? txDS : existing.txDeviceIds())
352 .rxDeviceIds(rxDS != null ? rxDS : existing.rxDeviceIds())
353 .annotations(existing.annotations())
354 .build();
355 }
356 return existing;
357 });
358 return (vTap != null);
359 }
360
361 @Override
362 public int getVtapCount(OpenstackVtap.Type type) {
363 return (int) vTapMap.values().parallelStream()
364 .filter(vTap -> vTap.type().isValid(type))
365 .count();
366 }
367
368 @Override
369 public Set<OpenstackVtap> getVtaps(OpenstackVtap.Type type) {
370 return ImmutableSet.copyOf(
371 vTapMap.values().parallelStream()
372 .filter(vTap -> vTap.type().isValid(type))
373 .collect(Collectors.toSet()));
374 }
375
376 @Override
377 public OpenstackVtap getVtap(OpenstackVtapId vTapId) {
378 return vTapMap.get(vTapId);
379 }
380
381 @Override
382 public Set<OpenstackVtap> getVtapsByDeviceId(OpenstackVtap.Type type,
383 DeviceId deviceId) {
384 Set<OpenstackVtapId> vtapIds = Sets.newHashSet();
385 if (type.isValid(OpenstackVtap.Type.VTAP_TX)) {
386 vtapIds.addAll(vTapIdsByTxDeviceId.get(deviceId));
387 }
388 if (type.isValid(OpenstackVtap.Type.VTAP_RX)) {
389 vtapIds.addAll(vTapIdsByRxDeviceId.get(deviceId));
390 }
391
392 return ImmutableSet.copyOf(
393 vtapIds.parallelStream()
394 .map(vTapId -> vTapMap.get(vTapId))
395 .filter(Objects::nonNull)
396 .collect(Collectors.toSet()));
397 }
398
Jian Li26ef1302018-07-04 14:37:06 +0900399 private void loadVtapIds() {
400 vTapIdsByTxDeviceId.clear();
401 vTapIdsByRxDeviceId.clear();
402 vTapMap.values().forEach(vTap -> refreshDeviceIdsByVtap(null, vTap));
403 }
404
405 private boolean shouldUpdate(DefaultOpenstackVtap existing,
406 OpenstackVtap description,
407 boolean replaceDevices) {
408 if (existing == null) {
409 return true;
410 }
411
412 if ((description.type() != null && !description.type().equals(existing.type()))
413 || (description.vTapCriterion() != null &&
414 !description.vTapCriterion().equals(existing.vTapCriterion()))) {
415 return true;
416 }
417
418 if (description.txDeviceIds() != null) {
419 if (replaceDevices) {
420 if (!existing.txDeviceIds().equals(description.txDeviceIds())) {
421 return true;
422 }
423 } else {
424 if (!existing.txDeviceIds().containsAll(description.txDeviceIds())) {
425 return true;
426 }
427 }
428 }
429
430 if (description.rxDeviceIds() != null) {
431 if (replaceDevices) {
432 if (!existing.rxDeviceIds().equals(description.rxDeviceIds())) {
433 return true;
434 }
435 } else {
436 if (!existing.rxDeviceIds().containsAll(description.rxDeviceIds())) {
437 return true;
438 }
439 }
440 }
441
442 // check to see if any of the annotations provided by vTap
443 // differ from those in the existing vTap
444 return description.annotations().keys().stream()
445 .anyMatch(k -> !Objects.equals(description.annotations().value(k),
446 existing.annotations().value(k)));
447 }
448
449 private class VtapComparator implements Comparator<OpenstackVtap> {
Jian Li19f25262018-07-03 22:37:12 +0900450 @Override
451 public int compare(OpenstackVtap v1, OpenstackVtap v2) {
452 int diff = (v2.type().compareTo(v1.type()));
453 if (diff == 0) {
454 return (v2.vTapCriterion().ipProtocol() - v1.vTapCriterion().ipProtocol());
455 }
456 return diff;
457 }
458 }
459
460 private static Set<OpenstackVtapId> addVTapIds(OpenstackVtapId vTapId) {
461 Set<OpenstackVtapId> vtapIds = Sets.newConcurrentHashSet();
462 vtapIds.add(vTapId);
463 return vtapIds;
464 }
465
466 private static Set<OpenstackVtapId> updateVTapIds(Set<OpenstackVtapId> existingVtapIds,
467 OpenstackVtapId vTapId) {
468 existingVtapIds.add(vTapId);
469 return existingVtapIds;
470 }
471
472 private static Set<OpenstackVtapId> removeVTapIds(Set<OpenstackVtapId> existingVtapIds,
473 OpenstackVtapId vTapId) {
474 existingVtapIds.remove(vTapId);
475 if (existingVtapIds.isEmpty()) {
476 return null;
477 }
478 return existingVtapIds;
479 }
480
481 private void updateVTapIdFromTxDeviceId(OpenstackVtapId vTapId, DeviceId deviceId) {
482 vTapIdsByTxDeviceId.compute(deviceId, (k, v) -> v == null ?
483 addVTapIds(vTapId) : updateVTapIds(v, vTapId));
484 }
485
486 private void removeVTapIdFromTxDeviceId(OpenstackVtapId vTapId, DeviceId deviceId) {
487 vTapIdsByTxDeviceId.computeIfPresent(deviceId, (k, v) -> removeVTapIds(v, vTapId));
488 }
489
490 private void updateVTapIdFromRxDeviceId(OpenstackVtapId vTapId, DeviceId deviceId) {
491 vTapIdsByRxDeviceId.compute(deviceId, (k, v) -> v == null ?
492 addVTapIds(vTapId) : updateVTapIds(v, vTapId));
493 }
494
495 private void removeVTapIdFromRxDeviceId(OpenstackVtapId vTapId, DeviceId deviceId) {
496 vTapIdsByRxDeviceId.computeIfPresent(deviceId, (k, v) -> removeVTapIds(v, vTapId));
497 }
498
499 private void refreshDeviceIdsByVtap(OpenstackVtap oldOpenstackVtap,
500 OpenstackVtap newOpenstackVtap) {
501 if (oldOpenstackVtap != null) {
502 Set<DeviceId> removeDeviceIds;
503
504 // Remove TX vTap
505 removeDeviceIds = (newOpenstackVtap != null) ?
506 Sets.difference(oldOpenstackVtap.txDeviceIds(),
507 newOpenstackVtap.txDeviceIds()) : oldOpenstackVtap.txDeviceIds();
508 removeDeviceIds.forEach(id -> removeVTapIdFromTxDeviceId(oldOpenstackVtap.id(), id));
509
510 // Remove RX vTap
511 removeDeviceIds = (newOpenstackVtap != null) ?
512 Sets.difference(oldOpenstackVtap.rxDeviceIds(),
513 newOpenstackVtap.rxDeviceIds()) : oldOpenstackVtap.rxDeviceIds();
514 removeDeviceIds.forEach(id -> removeVTapIdFromRxDeviceId(oldOpenstackVtap.id(), id));
515 }
516
517 if (newOpenstackVtap != null) {
518 Set<DeviceId> addDeviceIds;
519
520 // Add TX vTap
521 addDeviceIds = (oldOpenstackVtap != null) ?
522 Sets.difference(newOpenstackVtap.txDeviceIds(),
523 oldOpenstackVtap.txDeviceIds()) : newOpenstackVtap.txDeviceIds();
524 addDeviceIds.forEach(id -> updateVTapIdFromTxDeviceId(newOpenstackVtap.id(), id));
525
526 // Add RX vTap
527 addDeviceIds = (oldOpenstackVtap != null) ?
528 Sets.difference(newOpenstackVtap.rxDeviceIds(),
529 oldOpenstackVtap.rxDeviceIds()) : newOpenstackVtap.rxDeviceIds();
530 addDeviceIds.forEach(id -> updateVTapIdFromRxDeviceId(newOpenstackVtap.id(), id));
531 }
532 }
533
Jian Li26ef1302018-07-04 14:37:06 +0900534 private class VtapEventListener
Jian Li19f25262018-07-03 22:37:12 +0900535 implements MapEventListener<OpenstackVtapId, DefaultOpenstackVtap> {
536 @Override
537 public void event(MapEvent<OpenstackVtapId, DefaultOpenstackVtap> event) {
538 DefaultOpenstackVtap newValue =
539 event.newValue() != null ? event.newValue().value() : null;
540 DefaultOpenstackVtap oldValue =
541 event.oldValue() != null ? event.oldValue().value() : null;
542
Jian Li26ef1302018-07-04 14:37:06 +0900543 log.debug("VtapEventListener {} -> {}, {}", event.type(), oldValue, newValue);
Jian Li19f25262018-07-03 22:37:12 +0900544 switch (event.type()) {
545 case INSERT:
546 refreshDeviceIdsByVtap(oldValue, newValue);
547 notifyDelegate(new OpenstackVtapEvent(
548 OpenstackVtapEvent.Type.VTAP_ADDED, newValue));
549 break;
550
551 case UPDATE:
552 if (!Objects.equals(newValue, oldValue)) {
553 refreshDeviceIdsByVtap(oldValue, newValue);
554 notifyDelegate(new OpenstackVtapEvent(
555 OpenstackVtapEvent.Type.VTAP_UPDATED, newValue, oldValue));
556 }
557 break;
558
559 case REMOVE:
560 refreshDeviceIdsByVtap(oldValue, newValue);
561 notifyDelegate(new OpenstackVtapEvent(
562 OpenstackVtapEvent.Type.VTAP_REMOVED, oldValue));
563 break;
564
565 default:
566 log.warn("Unknown map event type: {}", event.type());
567 }
568 }
569 }
570}