blob: f8897e8386db12364c87eb8b355dce057f602697 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.openstackvtap.impl;
17
18import com.google.common.collect.ImmutableSet;
19import com.google.common.collect.Maps;
20import com.google.common.collect.Sets;
Jian Li19f25262018-07-03 22:37:12 +090021import org.onlab.util.KryoNamespace;
22import org.onosproject.net.DefaultAnnotations;
23import org.onosproject.net.DeviceId;
24import org.onosproject.net.SparseAnnotations;
25import org.onosproject.openstackvtap.api.OpenstackVtap;
26import org.onosproject.openstackvtap.api.OpenstackVtapCriterion;
27import org.onosproject.openstackvtap.api.OpenstackVtapEvent;
28import org.onosproject.openstackvtap.api.OpenstackVtapId;
29import org.onosproject.openstackvtap.api.OpenstackVtapStore;
30import org.onosproject.openstackvtap.api.OpenstackVtapStoreDelegate;
31import org.onosproject.store.AbstractStore;
32import org.onosproject.store.serializers.KryoNamespaces;
33import org.onosproject.store.service.ConsistentMap;
34import org.onosproject.store.service.DistributedPrimitive.Status;
35import org.onosproject.store.service.MapEvent;
36import org.onosproject.store.service.MapEventListener;
37import org.onosproject.store.service.Serializer;
38import org.onosproject.store.service.StorageService;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070039import org.osgi.service.component.annotations.Activate;
40import org.osgi.service.component.annotations.Component;
41import org.osgi.service.component.annotations.Deactivate;
42import org.osgi.service.component.annotations.Reference;
43import org.osgi.service.component.annotations.ReferenceCardinality;
Jian Li19f25262018-07-03 22:37:12 +090044import org.slf4j.Logger;
45
46import java.util.Comparator;
47import java.util.Map;
48import java.util.Objects;
49import java.util.Set;
50import java.util.UUID;
51import java.util.concurrent.ScheduledExecutorService;
52import java.util.function.Consumer;
53import java.util.stream.Collectors;
54
55import static com.google.common.base.Preconditions.checkNotNull;
56import static com.google.common.base.Preconditions.checkState;
57import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
58import static org.onlab.util.Tools.groupedThreads;
59import static org.onosproject.net.DefaultAnnotations.merge;
60import static org.slf4j.LoggerFactory.getLogger;
61
/**
 * Manages the inventory of OpenStack vTaps using a {@code ConsistentMap}.
 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070065@Component(immediate = true, service = OpenstackVtapStore.class)
Jian Li19f25262018-07-03 22:37:12 +090066public class DistributedOpenstackVtapStore
67 extends AbstractStore<OpenstackVtapEvent, OpenstackVtapStoreDelegate>
68 implements OpenstackVtapStore {
69 private final Logger log = getLogger(getClass());
70
Ray Milkeyd84f89b2018-08-17 14:54:17 -070071 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li19f25262018-07-03 22:37:12 +090072 protected StorageService storageService;
73
74 private ConsistentMap<OpenstackVtapId, DefaultOpenstackVtap> vTapConsistentMap;
75 private MapEventListener<OpenstackVtapId, DefaultOpenstackVtap>
Jian Li26ef1302018-07-04 14:37:06 +090076 vTapListener = new VtapEventListener();
Jian Li19f25262018-07-03 22:37:12 +090077 private Map<OpenstackVtapId, DefaultOpenstackVtap> vTapMap;
78
79 private static final Serializer SERIALIZER = Serializer
80 .using(new KryoNamespace.Builder().register(KryoNamespaces.API)
81 .register(OpenstackVtapId.class)
82 .register(UUID.class)
83 .register(DefaultOpenstackVtap.class)
84 .register(OpenstackVtap.Type.class)
85 .register(DefaultOpenstackVtapCriterion.class)
86 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
87 .build());
88
89 private Map<DeviceId, Set<OpenstackVtapId>>
90 vTapIdsByTxDeviceId = Maps.newConcurrentMap();
91 private Map<DeviceId, Set<OpenstackVtapId>>
92 vTapIdsByRxDeviceId = Maps.newConcurrentMap();
93
94 private ScheduledExecutorService eventExecutor;
95
96 private Consumer<Status> vTapStatusListener;
97
98 public static final String INVALID_DESCRIPTION = "Invalid create/update parameter";
99
100 @Activate
101 public void activate() {
102 eventExecutor = newSingleThreadScheduledExecutor(
103 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
104
105 vTapConsistentMap = storageService.<OpenstackVtapId, DefaultOpenstackVtap>
106 consistentMapBuilder()
107 .withName("vTapMap")
108 .withSerializer(SERIALIZER)
109 .build();
110
111 vTapMap = vTapConsistentMap.asJavaMap();
112 vTapConsistentMap.addListener(vTapListener);
113
114 vTapStatusListener = status -> {
115 if (status == Status.ACTIVE) {
Jian Li26ef1302018-07-04 14:37:06 +0900116 eventExecutor.execute(this::loadVtapIds);
Jian Li19f25262018-07-03 22:37:12 +0900117 }
118 };
119 vTapConsistentMap.addStatusChangeListener(vTapStatusListener);
120
121 log.info("Started {} - {}", this.getClass().getSimpleName());
122 }
123
124 @Deactivate
125 public void deactivate() {
126 vTapConsistentMap.removeStatusChangeListener(vTapStatusListener);
127 vTapConsistentMap.removeListener(vTapListener);
128 eventExecutor.shutdown();
129
130 log.info("Stopped {} - {}", this.getClass().getSimpleName());
131 }
132
Jian Li19f25262018-07-03 22:37:12 +0900133 @Override
134 public OpenstackVtap createOrUpdateVtap(OpenstackVtapId vTapId,
135 OpenstackVtap description,
136 boolean replaceFlag) {
137
138 return vTapMap.compute(vTapId, (id, existing) -> {
139 if (existing == null &&
140 (description.type() == null ||
141 description.vTapCriterion() == null ||
142 description.txDeviceIds() == null ||
143 description.rxDeviceIds() == null)) {
144 checkState(false, INVALID_DESCRIPTION);
145 return null;
146 }
147
148 if (shouldUpdate(existing, description, replaceFlag)) {
149 // Replace items
150 OpenstackVtap.Type type =
151 (description.type() == null ? existing.type() : description.type());
152 OpenstackVtapCriterion vTapCriterion =
153 (description.vTapCriterion() == null ?
154 existing.vTapCriterion() : description.vTapCriterion());
155
156 // Replace or add devices
157 Set<DeviceId> txDeviceIds;
158 if (description.txDeviceIds() == null) {
159 txDeviceIds = existing.txDeviceIds();
160 } else {
161 if (existing == null || replaceFlag) {
162 txDeviceIds = ImmutableSet.copyOf(description.txDeviceIds());
163 } else {
164 txDeviceIds = Sets.newHashSet(existing.txDeviceIds());
165 txDeviceIds.addAll(description.txDeviceIds());
166 }
167 }
168
169 Set<DeviceId> rxDeviceIds;
170 if (description.rxDeviceIds() == null) {
171 rxDeviceIds = existing.rxDeviceIds();
172 } else {
173 if (existing == null || replaceFlag) {
174 rxDeviceIds = ImmutableSet.copyOf(description.rxDeviceIds());
175 } else {
176 rxDeviceIds = Sets.newHashSet(existing.rxDeviceIds());
177 rxDeviceIds.addAll(description.rxDeviceIds());
178 }
179 }
180
181 // Replace or add annotations
182 SparseAnnotations annotations;
183 if (existing != null) {
184 annotations = merge((DefaultAnnotations) existing.annotations(),
185 (SparseAnnotations) description.annotations());
186 } else {
187 annotations = (SparseAnnotations) description.annotations();
188 }
189
190 // Make new changed vTap and return
191 return DefaultOpenstackVtap.builder()
192 .id(vTapId)
193 .type(type)
194 .vTapCriterion(vTapCriterion)
195 .txDeviceIds(txDeviceIds)
196 .rxDeviceIds(rxDeviceIds)
197 .annotations(annotations)
198 .build();
199 }
200 return existing;
201 });
202 }
203
204 @Override
205 public OpenstackVtap removeVtapById(OpenstackVtapId vTapId) {
206 return vTapMap.remove(vTapId);
207 }
208
209 @Override
210 public boolean addDeviceToVtap(OpenstackVtapId vTapId,
211 OpenstackVtap.Type type,
212 DeviceId deviceId) {
213 checkNotNull(vTapId);
214 checkNotNull(deviceId);
215
216 OpenstackVtap vTap = vTapMap.compute(vTapId, (id, existing) -> {
217 if (existing == null) {
218 return null;
219 }
220 if (!existing.type().isValid(type)) {
221 log.error("Not valid OpenstackVtap type {} for requested type {}",
222 existing.type(), type);
223 return existing;
224 }
225
226 Set<DeviceId> txDeviceIds = null;
227 if (type.isValid(OpenstackVtap.Type.VTAP_TX) &&
228 !existing.txDeviceIds().contains(deviceId)) {
229 txDeviceIds = Sets.newHashSet(existing.txDeviceIds());
230 txDeviceIds.add(deviceId);
231 }
232
233 Set<DeviceId> rxDeviceIds = null;
234 if (type.isValid(OpenstackVtap.Type.VTAP_RX) &&
235 !existing.rxDeviceIds().contains(deviceId)) {
236 rxDeviceIds = Sets.newHashSet(existing.rxDeviceIds());
237 rxDeviceIds.add(deviceId);
238 }
239
240 if (txDeviceIds != null || rxDeviceIds != null) {
241 //updateVTapIdFromDeviceId(existing.id(), deviceId); // execute from event listener
242
243 return DefaultOpenstackVtap.builder()
244 .id(vTapId)
245 .type(existing.type())
246 .vTapCriterion(existing.vTapCriterion())
247 .txDeviceIds(txDeviceIds != null ? txDeviceIds : existing.txDeviceIds())
248 .rxDeviceIds(rxDeviceIds != null ? rxDeviceIds : existing.rxDeviceIds())
249 .annotations(existing.annotations())
250 .build();
251 }
252 return existing;
253 });
254 return (vTap != null);
255 }
256
257 @Override
258 public boolean removeDeviceFromVtap(OpenstackVtapId vTapId,
259 OpenstackVtap.Type type,
260 DeviceId deviceId) {
261 checkNotNull(vTapId);
262 checkNotNull(deviceId);
263
264 OpenstackVtap vTap = vTapMap.compute(vTapId, (id, existing) -> {
265 if (existing == null) {
266 return null;
267 }
268 if (!existing.type().isValid(type)) {
269 log.error("Not valid OpenstackVtap type {} for requested type {}",
270 existing.type(), type);
271 return existing;
272 }
273
274 Set<DeviceId> txDeviceIds = null;
275 if (type.isValid(OpenstackVtap.Type.VTAP_TX) &&
276 existing.txDeviceIds().contains(deviceId)) {
277 txDeviceIds = Sets.newHashSet(existing.txDeviceIds());
278 txDeviceIds.remove(deviceId);
279 }
280
281 Set<DeviceId> rxDeviceIds = null;
282 if (type.isValid(OpenstackVtap.Type.VTAP_RX) &&
283 existing.rxDeviceIds().contains(deviceId)) {
284 rxDeviceIds = Sets.newHashSet(existing.rxDeviceIds());
285 rxDeviceIds.remove(deviceId);
286 }
287
288 if (txDeviceIds != null || rxDeviceIds != null) {
289 //removeVTapIdFromDeviceId(existing.id(), deviceId); // execute from event listener
290
291 return DefaultOpenstackVtap.builder()
292 .id(vTapId)
293 .type(existing.type())
294 .vTapCriterion(existing.vTapCriterion())
295 .txDeviceIds(txDeviceIds != null ? txDeviceIds : existing.txDeviceIds())
296 .rxDeviceIds(rxDeviceIds != null ? rxDeviceIds : existing.rxDeviceIds())
297 .annotations(existing.annotations())
298 .build();
299 }
300 return existing;
301 });
302 return (vTap != null);
303 }
304
305 @Override
306 public boolean updateDeviceForVtap(OpenstackVtapId vTapId,
307 Set<DeviceId> txDeviceIds, Set<DeviceId> rxDeviceIds,
308 boolean replaceDevices) {
309 checkNotNull(vTapId);
310 checkNotNull(txDeviceIds);
311 checkNotNull(rxDeviceIds);
312
313 OpenstackVtap vTap = vTapMap.compute(vTapId, (id, existing) -> {
314 if (existing == null) {
315 return null;
316 }
317
318 // Replace or add devices
319 Set<DeviceId> txDS = null;
320 if (replaceDevices) {
321 if (!existing.txDeviceIds().equals(txDeviceIds)) {
322 txDS = ImmutableSet.copyOf(txDeviceIds);
323 }
324 } else {
325 if (!existing.txDeviceIds().containsAll(txDeviceIds)) {
326 txDS = Sets.newHashSet(existing.txDeviceIds());
327 txDS.addAll(txDeviceIds);
328 }
329 }
330
331 Set<DeviceId> rxDS = null;
332 if (replaceDevices) {
333 if (!existing.rxDeviceIds().equals(rxDeviceIds)) {
334 rxDS = ImmutableSet.copyOf(rxDeviceIds);
335 }
336 } else {
337 if (!existing.rxDeviceIds().containsAll(rxDeviceIds)) {
338 rxDS = Sets.newHashSet(existing.rxDeviceIds());
339 rxDS.addAll(rxDeviceIds);
340 }
341 }
342
343 if (txDS != null || rxDS != null) {
344
345 return DefaultOpenstackVtap.builder()
346 .id(vTapId)
347 .type(existing.type())
348 .vTapCriterion(existing.vTapCriterion())
349 .txDeviceIds(txDS != null ? txDS : existing.txDeviceIds())
350 .rxDeviceIds(rxDS != null ? rxDS : existing.rxDeviceIds())
351 .annotations(existing.annotations())
352 .build();
353 }
354 return existing;
355 });
356 return (vTap != null);
357 }
358
359 @Override
360 public int getVtapCount(OpenstackVtap.Type type) {
361 return (int) vTapMap.values().parallelStream()
362 .filter(vTap -> vTap.type().isValid(type))
363 .count();
364 }
365
366 @Override
367 public Set<OpenstackVtap> getVtaps(OpenstackVtap.Type type) {
368 return ImmutableSet.copyOf(
369 vTapMap.values().parallelStream()
370 .filter(vTap -> vTap.type().isValid(type))
371 .collect(Collectors.toSet()));
372 }
373
374 @Override
375 public OpenstackVtap getVtap(OpenstackVtapId vTapId) {
376 return vTapMap.get(vTapId);
377 }
378
379 @Override
380 public Set<OpenstackVtap> getVtapsByDeviceId(OpenstackVtap.Type type,
381 DeviceId deviceId) {
382 Set<OpenstackVtapId> vtapIds = Sets.newHashSet();
383 if (type.isValid(OpenstackVtap.Type.VTAP_TX)) {
Jian Lib1ca1a22018-07-06 13:31:39 +0900384 if (vTapIdsByTxDeviceId.get(deviceId) != null) {
385 vtapIds.addAll(vTapIdsByTxDeviceId.get(deviceId));
386 }
Jian Li19f25262018-07-03 22:37:12 +0900387 }
388 if (type.isValid(OpenstackVtap.Type.VTAP_RX)) {
Jian Lib1ca1a22018-07-06 13:31:39 +0900389 if (vTapIdsByRxDeviceId.get(deviceId) != null) {
390 vtapIds.addAll(vTapIdsByRxDeviceId.get(deviceId));
391 }
Jian Li19f25262018-07-03 22:37:12 +0900392 }
393
394 return ImmutableSet.copyOf(
395 vtapIds.parallelStream()
396 .map(vTapId -> vTapMap.get(vTapId))
397 .filter(Objects::nonNull)
398 .collect(Collectors.toSet()));
399 }
400
Jian Li26ef1302018-07-04 14:37:06 +0900401 private void loadVtapIds() {
402 vTapIdsByTxDeviceId.clear();
403 vTapIdsByRxDeviceId.clear();
404 vTapMap.values().forEach(vTap -> refreshDeviceIdsByVtap(null, vTap));
405 }
406
407 private boolean shouldUpdate(DefaultOpenstackVtap existing,
408 OpenstackVtap description,
409 boolean replaceDevices) {
410 if (existing == null) {
411 return true;
412 }
413
414 if ((description.type() != null && !description.type().equals(existing.type()))
415 || (description.vTapCriterion() != null &&
416 !description.vTapCriterion().equals(existing.vTapCriterion()))) {
417 return true;
418 }
419
420 if (description.txDeviceIds() != null) {
421 if (replaceDevices) {
422 if (!existing.txDeviceIds().equals(description.txDeviceIds())) {
423 return true;
424 }
425 } else {
426 if (!existing.txDeviceIds().containsAll(description.txDeviceIds())) {
427 return true;
428 }
429 }
430 }
431
432 if (description.rxDeviceIds() != null) {
433 if (replaceDevices) {
434 if (!existing.rxDeviceIds().equals(description.rxDeviceIds())) {
435 return true;
436 }
437 } else {
438 if (!existing.rxDeviceIds().containsAll(description.rxDeviceIds())) {
439 return true;
440 }
441 }
442 }
443
444 // check to see if any of the annotations provided by vTap
445 // differ from those in the existing vTap
446 return description.annotations().keys().stream()
447 .anyMatch(k -> !Objects.equals(description.annotations().value(k),
448 existing.annotations().value(k)));
449 }
450
451 private class VtapComparator implements Comparator<OpenstackVtap> {
Jian Li19f25262018-07-03 22:37:12 +0900452 @Override
453 public int compare(OpenstackVtap v1, OpenstackVtap v2) {
454 int diff = (v2.type().compareTo(v1.type()));
455 if (diff == 0) {
456 return (v2.vTapCriterion().ipProtocol() - v1.vTapCriterion().ipProtocol());
457 }
458 return diff;
459 }
460 }
461
462 private static Set<OpenstackVtapId> addVTapIds(OpenstackVtapId vTapId) {
463 Set<OpenstackVtapId> vtapIds = Sets.newConcurrentHashSet();
464 vtapIds.add(vTapId);
465 return vtapIds;
466 }
467
468 private static Set<OpenstackVtapId> updateVTapIds(Set<OpenstackVtapId> existingVtapIds,
469 OpenstackVtapId vTapId) {
470 existingVtapIds.add(vTapId);
471 return existingVtapIds;
472 }
473
474 private static Set<OpenstackVtapId> removeVTapIds(Set<OpenstackVtapId> existingVtapIds,
475 OpenstackVtapId vTapId) {
476 existingVtapIds.remove(vTapId);
477 if (existingVtapIds.isEmpty()) {
478 return null;
479 }
480 return existingVtapIds;
481 }
482
483 private void updateVTapIdFromTxDeviceId(OpenstackVtapId vTapId, DeviceId deviceId) {
484 vTapIdsByTxDeviceId.compute(deviceId, (k, v) -> v == null ?
485 addVTapIds(vTapId) : updateVTapIds(v, vTapId));
486 }
487
488 private void removeVTapIdFromTxDeviceId(OpenstackVtapId vTapId, DeviceId deviceId) {
489 vTapIdsByTxDeviceId.computeIfPresent(deviceId, (k, v) -> removeVTapIds(v, vTapId));
490 }
491
492 private void updateVTapIdFromRxDeviceId(OpenstackVtapId vTapId, DeviceId deviceId) {
493 vTapIdsByRxDeviceId.compute(deviceId, (k, v) -> v == null ?
494 addVTapIds(vTapId) : updateVTapIds(v, vTapId));
495 }
496
497 private void removeVTapIdFromRxDeviceId(OpenstackVtapId vTapId, DeviceId deviceId) {
498 vTapIdsByRxDeviceId.computeIfPresent(deviceId, (k, v) -> removeVTapIds(v, vTapId));
499 }
500
501 private void refreshDeviceIdsByVtap(OpenstackVtap oldOpenstackVtap,
502 OpenstackVtap newOpenstackVtap) {
503 if (oldOpenstackVtap != null) {
504 Set<DeviceId> removeDeviceIds;
505
506 // Remove TX vTap
507 removeDeviceIds = (newOpenstackVtap != null) ?
508 Sets.difference(oldOpenstackVtap.txDeviceIds(),
509 newOpenstackVtap.txDeviceIds()) : oldOpenstackVtap.txDeviceIds();
510 removeDeviceIds.forEach(id -> removeVTapIdFromTxDeviceId(oldOpenstackVtap.id(), id));
511
512 // Remove RX vTap
513 removeDeviceIds = (newOpenstackVtap != null) ?
514 Sets.difference(oldOpenstackVtap.rxDeviceIds(),
515 newOpenstackVtap.rxDeviceIds()) : oldOpenstackVtap.rxDeviceIds();
516 removeDeviceIds.forEach(id -> removeVTapIdFromRxDeviceId(oldOpenstackVtap.id(), id));
517 }
518
519 if (newOpenstackVtap != null) {
520 Set<DeviceId> addDeviceIds;
521
522 // Add TX vTap
523 addDeviceIds = (oldOpenstackVtap != null) ?
524 Sets.difference(newOpenstackVtap.txDeviceIds(),
525 oldOpenstackVtap.txDeviceIds()) : newOpenstackVtap.txDeviceIds();
526 addDeviceIds.forEach(id -> updateVTapIdFromTxDeviceId(newOpenstackVtap.id(), id));
527
528 // Add RX vTap
529 addDeviceIds = (oldOpenstackVtap != null) ?
530 Sets.difference(newOpenstackVtap.rxDeviceIds(),
531 oldOpenstackVtap.rxDeviceIds()) : newOpenstackVtap.rxDeviceIds();
532 addDeviceIds.forEach(id -> updateVTapIdFromRxDeviceId(newOpenstackVtap.id(), id));
533 }
534 }
535
Jian Li26ef1302018-07-04 14:37:06 +0900536 private class VtapEventListener
Jian Li19f25262018-07-03 22:37:12 +0900537 implements MapEventListener<OpenstackVtapId, DefaultOpenstackVtap> {
538 @Override
539 public void event(MapEvent<OpenstackVtapId, DefaultOpenstackVtap> event) {
540 DefaultOpenstackVtap newValue =
541 event.newValue() != null ? event.newValue().value() : null;
542 DefaultOpenstackVtap oldValue =
543 event.oldValue() != null ? event.oldValue().value() : null;
544
Jian Li26ef1302018-07-04 14:37:06 +0900545 log.debug("VtapEventListener {} -> {}, {}", event.type(), oldValue, newValue);
Jian Li19f25262018-07-03 22:37:12 +0900546 switch (event.type()) {
547 case INSERT:
548 refreshDeviceIdsByVtap(oldValue, newValue);
549 notifyDelegate(new OpenstackVtapEvent(
550 OpenstackVtapEvent.Type.VTAP_ADDED, newValue));
551 break;
552
553 case UPDATE:
554 if (!Objects.equals(newValue, oldValue)) {
555 refreshDeviceIdsByVtap(oldValue, newValue);
556 notifyDelegate(new OpenstackVtapEvent(
557 OpenstackVtapEvent.Type.VTAP_UPDATED, newValue, oldValue));
558 }
559 break;
560
561 case REMOVE:
562 refreshDeviceIdsByVtap(oldValue, newValue);
563 notifyDelegate(new OpenstackVtapEvent(
564 OpenstackVtapEvent.Type.VTAP_REMOVED, oldValue));
565 break;
566
567 default:
568 log.warn("Unknown map event type: {}", event.type());
569 }
570 }
571 }
572}