blob: d6f63ebea2d7d5c11d0d6440d179df6a24eb7732 [file] [log] [blame]
Jian Li19f25262018-07-03 22:37:12 +09001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.openstackvtap.impl;
17
18import com.google.common.collect.ImmutableSet;
19import com.google.common.collect.Maps;
20import com.google.common.collect.Sets;
21import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
27import org.onlab.util.KryoNamespace;
28import org.onosproject.net.DefaultAnnotations;
29import org.onosproject.net.DeviceId;
30import org.onosproject.net.SparseAnnotations;
31import org.onosproject.openstackvtap.api.OpenstackVtap;
32import org.onosproject.openstackvtap.api.OpenstackVtapCriterion;
33import org.onosproject.openstackvtap.api.OpenstackVtapEvent;
34import org.onosproject.openstackvtap.api.OpenstackVtapId;
35import org.onosproject.openstackvtap.api.OpenstackVtapStore;
36import org.onosproject.openstackvtap.api.OpenstackVtapStoreDelegate;
37import org.onosproject.store.AbstractStore;
38import org.onosproject.store.serializers.KryoNamespaces;
39import org.onosproject.store.service.ConsistentMap;
40import org.onosproject.store.service.DistributedPrimitive.Status;
41import org.onosproject.store.service.MapEvent;
42import org.onosproject.store.service.MapEventListener;
43import org.onosproject.store.service.Serializer;
44import org.onosproject.store.service.StorageService;
45import org.slf4j.Logger;
46
47import java.util.Comparator;
48import java.util.Map;
49import java.util.Objects;
50import java.util.Set;
51import java.util.UUID;
52import java.util.concurrent.ScheduledExecutorService;
53import java.util.function.Consumer;
54import java.util.stream.Collectors;
55
56import static com.google.common.base.Preconditions.checkNotNull;
57import static com.google.common.base.Preconditions.checkState;
58import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
59import static org.onlab.util.Tools.groupedThreads;
60import static org.onosproject.net.DefaultAnnotations.merge;
61import static org.slf4j.LoggerFactory.getLogger;
62
63/**
64 * Manages the inventory of users using a {@code ConsistentMap}.
65 */
66@Component(immediate = true)
67@Service
68public class DistributedOpenstackVtapStore
69 extends AbstractStore<OpenstackVtapEvent, OpenstackVtapStoreDelegate>
70 implements OpenstackVtapStore {
71 private final Logger log = getLogger(getClass());
72
73 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
74 protected StorageService storageService;
75
76 private ConsistentMap<OpenstackVtapId, DefaultOpenstackVtap> vTapConsistentMap;
77 private MapEventListener<OpenstackVtapId, DefaultOpenstackVtap>
78 vTapListener = new VTapEventListener();
79 private Map<OpenstackVtapId, DefaultOpenstackVtap> vTapMap;
80
81 private static final Serializer SERIALIZER = Serializer
82 .using(new KryoNamespace.Builder().register(KryoNamespaces.API)
83 .register(OpenstackVtapId.class)
84 .register(UUID.class)
85 .register(DefaultOpenstackVtap.class)
86 .register(OpenstackVtap.Type.class)
87 .register(DefaultOpenstackVtapCriterion.class)
88 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
89 .build());
90
91 private Map<DeviceId, Set<OpenstackVtapId>>
92 vTapIdsByTxDeviceId = Maps.newConcurrentMap();
93 private Map<DeviceId, Set<OpenstackVtapId>>
94 vTapIdsByRxDeviceId = Maps.newConcurrentMap();
95
96 private ScheduledExecutorService eventExecutor;
97
98 private Consumer<Status> vTapStatusListener;
99
100 public static final String INVALID_DESCRIPTION = "Invalid create/update parameter";
101
102 @Activate
103 public void activate() {
104 eventExecutor = newSingleThreadScheduledExecutor(
105 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
106
107 vTapConsistentMap = storageService.<OpenstackVtapId, DefaultOpenstackVtap>
108 consistentMapBuilder()
109 .withName("vTapMap")
110 .withSerializer(SERIALIZER)
111 .build();
112
113 vTapMap = vTapConsistentMap.asJavaMap();
114 vTapConsistentMap.addListener(vTapListener);
115
116 vTapStatusListener = status -> {
117 if (status == Status.ACTIVE) {
118 eventExecutor.execute(this::loadVTapIds);
119 }
120 };
121 vTapConsistentMap.addStatusChangeListener(vTapStatusListener);
122
123 log.info("Started {} - {}", this.getClass().getSimpleName());
124 }
125
126 @Deactivate
127 public void deactivate() {
128 vTapConsistentMap.removeStatusChangeListener(vTapStatusListener);
129 vTapConsistentMap.removeListener(vTapListener);
130 eventExecutor.shutdown();
131
132 log.info("Stopped {} - {}", this.getClass().getSimpleName());
133 }
134
135 private void loadVTapIds() {
136 vTapIdsByTxDeviceId.clear();
137 vTapIdsByRxDeviceId.clear();
138 vTapMap.values().forEach(vTap -> refreshDeviceIdsByVtap(null, vTap));
139 }
140
141 private boolean shouldUpdate(DefaultOpenstackVtap existing,
142 OpenstackVtap description,
143 boolean replaceDevices) {
144 if (existing == null) {
145 return true;
146 }
147
148 if ((description.type() != null && !description.type().equals(existing.type()))
149 || (description.vTapCriterion() != null &&
150 !description.vTapCriterion().equals(existing.vTapCriterion()))) {
151 return true;
152 }
153
154 if (description.txDeviceIds() != null) {
155 if (replaceDevices) {
156 if (!existing.txDeviceIds().equals(description.txDeviceIds())) {
157 return true;
158 }
159 } else {
160 if (!existing.txDeviceIds().containsAll(description.txDeviceIds())) {
161 return true;
162 }
163 }
164 }
165
166 if (description.rxDeviceIds() != null) {
167 if (replaceDevices) {
168 if (!existing.rxDeviceIds().equals(description.rxDeviceIds())) {
169 return true;
170 }
171 } else {
172 if (!existing.rxDeviceIds().containsAll(description.rxDeviceIds())) {
173 return true;
174 }
175 }
176 }
177
178 // check to see if any of the annotations provided by vTap
179 // differ from those in the existing vTap
180 return description.annotations().keys().stream()
181 .anyMatch(k -> !Objects.equals(description.annotations().value(k),
182 existing.annotations().value(k)));
183 }
184
185 @Override
186 public OpenstackVtap createOrUpdateVtap(OpenstackVtapId vTapId,
187 OpenstackVtap description,
188 boolean replaceFlag) {
189
190 return vTapMap.compute(vTapId, (id, existing) -> {
191 if (existing == null &&
192 (description.type() == null ||
193 description.vTapCriterion() == null ||
194 description.txDeviceIds() == null ||
195 description.rxDeviceIds() == null)) {
196 checkState(false, INVALID_DESCRIPTION);
197 return null;
198 }
199
200 if (shouldUpdate(existing, description, replaceFlag)) {
201 // Replace items
202 OpenstackVtap.Type type =
203 (description.type() == null ? existing.type() : description.type());
204 OpenstackVtapCriterion vTapCriterion =
205 (description.vTapCriterion() == null ?
206 existing.vTapCriterion() : description.vTapCriterion());
207
208 // Replace or add devices
209 Set<DeviceId> txDeviceIds;
210 if (description.txDeviceIds() == null) {
211 txDeviceIds = existing.txDeviceIds();
212 } else {
213 if (existing == null || replaceFlag) {
214 txDeviceIds = ImmutableSet.copyOf(description.txDeviceIds());
215 } else {
216 txDeviceIds = Sets.newHashSet(existing.txDeviceIds());
217 txDeviceIds.addAll(description.txDeviceIds());
218 }
219 }
220
221 Set<DeviceId> rxDeviceIds;
222 if (description.rxDeviceIds() == null) {
223 rxDeviceIds = existing.rxDeviceIds();
224 } else {
225 if (existing == null || replaceFlag) {
226 rxDeviceIds = ImmutableSet.copyOf(description.rxDeviceIds());
227 } else {
228 rxDeviceIds = Sets.newHashSet(existing.rxDeviceIds());
229 rxDeviceIds.addAll(description.rxDeviceIds());
230 }
231 }
232
233 // Replace or add annotations
234 SparseAnnotations annotations;
235 if (existing != null) {
236 annotations = merge((DefaultAnnotations) existing.annotations(),
237 (SparseAnnotations) description.annotations());
238 } else {
239 annotations = (SparseAnnotations) description.annotations();
240 }
241
242 // Make new changed vTap and return
243 return DefaultOpenstackVtap.builder()
244 .id(vTapId)
245 .type(type)
246 .vTapCriterion(vTapCriterion)
247 .txDeviceIds(txDeviceIds)
248 .rxDeviceIds(rxDeviceIds)
249 .annotations(annotations)
250 .build();
251 }
252 return existing;
253 });
254 }
255
256 @Override
257 public OpenstackVtap removeVtapById(OpenstackVtapId vTapId) {
258 return vTapMap.remove(vTapId);
259 }
260
261 @Override
262 public boolean addDeviceToVtap(OpenstackVtapId vTapId,
263 OpenstackVtap.Type type,
264 DeviceId deviceId) {
265 checkNotNull(vTapId);
266 checkNotNull(deviceId);
267
268 OpenstackVtap vTap = vTapMap.compute(vTapId, (id, existing) -> {
269 if (existing == null) {
270 return null;
271 }
272 if (!existing.type().isValid(type)) {
273 log.error("Not valid OpenstackVtap type {} for requested type {}",
274 existing.type(), type);
275 return existing;
276 }
277
278 Set<DeviceId> txDeviceIds = null;
279 if (type.isValid(OpenstackVtap.Type.VTAP_TX) &&
280 !existing.txDeviceIds().contains(deviceId)) {
281 txDeviceIds = Sets.newHashSet(existing.txDeviceIds());
282 txDeviceIds.add(deviceId);
283 }
284
285 Set<DeviceId> rxDeviceIds = null;
286 if (type.isValid(OpenstackVtap.Type.VTAP_RX) &&
287 !existing.rxDeviceIds().contains(deviceId)) {
288 rxDeviceIds = Sets.newHashSet(existing.rxDeviceIds());
289 rxDeviceIds.add(deviceId);
290 }
291
292 if (txDeviceIds != null || rxDeviceIds != null) {
293 //updateVTapIdFromDeviceId(existing.id(), deviceId); // execute from event listener
294
295 return DefaultOpenstackVtap.builder()
296 .id(vTapId)
297 .type(existing.type())
298 .vTapCriterion(existing.vTapCriterion())
299 .txDeviceIds(txDeviceIds != null ? txDeviceIds : existing.txDeviceIds())
300 .rxDeviceIds(rxDeviceIds != null ? rxDeviceIds : existing.rxDeviceIds())
301 .annotations(existing.annotations())
302 .build();
303 }
304 return existing;
305 });
306 return (vTap != null);
307 }
308
309 @Override
310 public boolean removeDeviceFromVtap(OpenstackVtapId vTapId,
311 OpenstackVtap.Type type,
312 DeviceId deviceId) {
313 checkNotNull(vTapId);
314 checkNotNull(deviceId);
315
316 OpenstackVtap vTap = vTapMap.compute(vTapId, (id, existing) -> {
317 if (existing == null) {
318 return null;
319 }
320 if (!existing.type().isValid(type)) {
321 log.error("Not valid OpenstackVtap type {} for requested type {}",
322 existing.type(), type);
323 return existing;
324 }
325
326 Set<DeviceId> txDeviceIds = null;
327 if (type.isValid(OpenstackVtap.Type.VTAP_TX) &&
328 existing.txDeviceIds().contains(deviceId)) {
329 txDeviceIds = Sets.newHashSet(existing.txDeviceIds());
330 txDeviceIds.remove(deviceId);
331 }
332
333 Set<DeviceId> rxDeviceIds = null;
334 if (type.isValid(OpenstackVtap.Type.VTAP_RX) &&
335 existing.rxDeviceIds().contains(deviceId)) {
336 rxDeviceIds = Sets.newHashSet(existing.rxDeviceIds());
337 rxDeviceIds.remove(deviceId);
338 }
339
340 if (txDeviceIds != null || rxDeviceIds != null) {
341 //removeVTapIdFromDeviceId(existing.id(), deviceId); // execute from event listener
342
343 return DefaultOpenstackVtap.builder()
344 .id(vTapId)
345 .type(existing.type())
346 .vTapCriterion(existing.vTapCriterion())
347 .txDeviceIds(txDeviceIds != null ? txDeviceIds : existing.txDeviceIds())
348 .rxDeviceIds(rxDeviceIds != null ? rxDeviceIds : existing.rxDeviceIds())
349 .annotations(existing.annotations())
350 .build();
351 }
352 return existing;
353 });
354 return (vTap != null);
355 }
356
357 @Override
358 public boolean updateDeviceForVtap(OpenstackVtapId vTapId,
359 Set<DeviceId> txDeviceIds, Set<DeviceId> rxDeviceIds,
360 boolean replaceDevices) {
361 checkNotNull(vTapId);
362 checkNotNull(txDeviceIds);
363 checkNotNull(rxDeviceIds);
364
365 OpenstackVtap vTap = vTapMap.compute(vTapId, (id, existing) -> {
366 if (existing == null) {
367 return null;
368 }
369
370 // Replace or add devices
371 Set<DeviceId> txDS = null;
372 if (replaceDevices) {
373 if (!existing.txDeviceIds().equals(txDeviceIds)) {
374 txDS = ImmutableSet.copyOf(txDeviceIds);
375 }
376 } else {
377 if (!existing.txDeviceIds().containsAll(txDeviceIds)) {
378 txDS = Sets.newHashSet(existing.txDeviceIds());
379 txDS.addAll(txDeviceIds);
380 }
381 }
382
383 Set<DeviceId> rxDS = null;
384 if (replaceDevices) {
385 if (!existing.rxDeviceIds().equals(rxDeviceIds)) {
386 rxDS = ImmutableSet.copyOf(rxDeviceIds);
387 }
388 } else {
389 if (!existing.rxDeviceIds().containsAll(rxDeviceIds)) {
390 rxDS = Sets.newHashSet(existing.rxDeviceIds());
391 rxDS.addAll(rxDeviceIds);
392 }
393 }
394
395 if (txDS != null || rxDS != null) {
396
397 return DefaultOpenstackVtap.builder()
398 .id(vTapId)
399 .type(existing.type())
400 .vTapCriterion(existing.vTapCriterion())
401 .txDeviceIds(txDS != null ? txDS : existing.txDeviceIds())
402 .rxDeviceIds(rxDS != null ? rxDS : existing.rxDeviceIds())
403 .annotations(existing.annotations())
404 .build();
405 }
406 return existing;
407 });
408 return (vTap != null);
409 }
410
411 @Override
412 public int getVtapCount(OpenstackVtap.Type type) {
413 return (int) vTapMap.values().parallelStream()
414 .filter(vTap -> vTap.type().isValid(type))
415 .count();
416 }
417
418 @Override
419 public Set<OpenstackVtap> getVtaps(OpenstackVtap.Type type) {
420 return ImmutableSet.copyOf(
421 vTapMap.values().parallelStream()
422 .filter(vTap -> vTap.type().isValid(type))
423 .collect(Collectors.toSet()));
424 }
425
426 @Override
427 public OpenstackVtap getVtap(OpenstackVtapId vTapId) {
428 return vTapMap.get(vTapId);
429 }
430
431 @Override
432 public Set<OpenstackVtap> getVtapsByDeviceId(OpenstackVtap.Type type,
433 DeviceId deviceId) {
434 Set<OpenstackVtapId> vtapIds = Sets.newHashSet();
435 if (type.isValid(OpenstackVtap.Type.VTAP_TX)) {
436 vtapIds.addAll(vTapIdsByTxDeviceId.get(deviceId));
437 }
438 if (type.isValid(OpenstackVtap.Type.VTAP_RX)) {
439 vtapIds.addAll(vTapIdsByRxDeviceId.get(deviceId));
440 }
441
442 return ImmutableSet.copyOf(
443 vtapIds.parallelStream()
444 .map(vTapId -> vTapMap.get(vTapId))
445 .filter(Objects::nonNull)
446 .collect(Collectors.toSet()));
447 }
448
449 private class VTapComparator implements Comparator<OpenstackVtap> {
450 @Override
451 public int compare(OpenstackVtap v1, OpenstackVtap v2) {
452 int diff = (v2.type().compareTo(v1.type()));
453 if (diff == 0) {
454 return (v2.vTapCriterion().ipProtocol() - v1.vTapCriterion().ipProtocol());
455 }
456 return diff;
457 }
458 }
459
460 private static Set<OpenstackVtapId> addVTapIds(OpenstackVtapId vTapId) {
461 Set<OpenstackVtapId> vtapIds = Sets.newConcurrentHashSet();
462 vtapIds.add(vTapId);
463 return vtapIds;
464 }
465
466 private static Set<OpenstackVtapId> updateVTapIds(Set<OpenstackVtapId> existingVtapIds,
467 OpenstackVtapId vTapId) {
468 existingVtapIds.add(vTapId);
469 return existingVtapIds;
470 }
471
472 private static Set<OpenstackVtapId> removeVTapIds(Set<OpenstackVtapId> existingVtapIds,
473 OpenstackVtapId vTapId) {
474 existingVtapIds.remove(vTapId);
475 if (existingVtapIds.isEmpty()) {
476 return null;
477 }
478 return existingVtapIds;
479 }
480
481 private void updateVTapIdFromTxDeviceId(OpenstackVtapId vTapId, DeviceId deviceId) {
482 vTapIdsByTxDeviceId.compute(deviceId, (k, v) -> v == null ?
483 addVTapIds(vTapId) : updateVTapIds(v, vTapId));
484 }
485
486 private void removeVTapIdFromTxDeviceId(OpenstackVtapId vTapId, DeviceId deviceId) {
487 vTapIdsByTxDeviceId.computeIfPresent(deviceId, (k, v) -> removeVTapIds(v, vTapId));
488 }
489
490 private void updateVTapIdFromRxDeviceId(OpenstackVtapId vTapId, DeviceId deviceId) {
491 vTapIdsByRxDeviceId.compute(deviceId, (k, v) -> v == null ?
492 addVTapIds(vTapId) : updateVTapIds(v, vTapId));
493 }
494
495 private void removeVTapIdFromRxDeviceId(OpenstackVtapId vTapId, DeviceId deviceId) {
496 vTapIdsByRxDeviceId.computeIfPresent(deviceId, (k, v) -> removeVTapIds(v, vTapId));
497 }
498
499 private void refreshDeviceIdsByVtap(OpenstackVtap oldOpenstackVtap,
500 OpenstackVtap newOpenstackVtap) {
501 if (oldOpenstackVtap != null) {
502 Set<DeviceId> removeDeviceIds;
503
504 // Remove TX vTap
505 removeDeviceIds = (newOpenstackVtap != null) ?
506 Sets.difference(oldOpenstackVtap.txDeviceIds(),
507 newOpenstackVtap.txDeviceIds()) : oldOpenstackVtap.txDeviceIds();
508 removeDeviceIds.forEach(id -> removeVTapIdFromTxDeviceId(oldOpenstackVtap.id(), id));
509
510 // Remove RX vTap
511 removeDeviceIds = (newOpenstackVtap != null) ?
512 Sets.difference(oldOpenstackVtap.rxDeviceIds(),
513 newOpenstackVtap.rxDeviceIds()) : oldOpenstackVtap.rxDeviceIds();
514 removeDeviceIds.forEach(id -> removeVTapIdFromRxDeviceId(oldOpenstackVtap.id(), id));
515 }
516
517 if (newOpenstackVtap != null) {
518 Set<DeviceId> addDeviceIds;
519
520 // Add TX vTap
521 addDeviceIds = (oldOpenstackVtap != null) ?
522 Sets.difference(newOpenstackVtap.txDeviceIds(),
523 oldOpenstackVtap.txDeviceIds()) : newOpenstackVtap.txDeviceIds();
524 addDeviceIds.forEach(id -> updateVTapIdFromTxDeviceId(newOpenstackVtap.id(), id));
525
526 // Add RX vTap
527 addDeviceIds = (oldOpenstackVtap != null) ?
528 Sets.difference(newOpenstackVtap.rxDeviceIds(),
529 oldOpenstackVtap.rxDeviceIds()) : newOpenstackVtap.rxDeviceIds();
530 addDeviceIds.forEach(id -> updateVTapIdFromRxDeviceId(newOpenstackVtap.id(), id));
531 }
532 }
533
534 private class VTapEventListener
535 implements MapEventListener<OpenstackVtapId, DefaultOpenstackVtap> {
536 @Override
537 public void event(MapEvent<OpenstackVtapId, DefaultOpenstackVtap> event) {
538 DefaultOpenstackVtap newValue =
539 event.newValue() != null ? event.newValue().value() : null;
540 DefaultOpenstackVtap oldValue =
541 event.oldValue() != null ? event.oldValue().value() : null;
542
543 log.debug("VTapEventListener {} -> {}, {}", event.type(), oldValue, newValue);
544 switch (event.type()) {
545 case INSERT:
546 refreshDeviceIdsByVtap(oldValue, newValue);
547 notifyDelegate(new OpenstackVtapEvent(
548 OpenstackVtapEvent.Type.VTAP_ADDED, newValue));
549 break;
550
551 case UPDATE:
552 if (!Objects.equals(newValue, oldValue)) {
553 refreshDeviceIdsByVtap(oldValue, newValue);
554 notifyDelegate(new OpenstackVtapEvent(
555 OpenstackVtapEvent.Type.VTAP_UPDATED, newValue, oldValue));
556 }
557 break;
558
559 case REMOVE:
560 refreshDeviceIdsByVtap(oldValue, newValue);
561 notifyDelegate(new OpenstackVtapEvent(
562 OpenstackVtapEvent.Type.VTAP_REMOVED, oldValue));
563 break;
564
565 default:
566 log.warn("Unknown map event type: {}", event.type());
567 }
568 }
569 }
570}