[ONOS-7731] Add implementation of openstack vtap store and criterion
Change-Id: I7f41652f127038af9d3f79b34d427d28ce162d50
diff --git a/apps/openstackvtap/app/src/main/java/org/onosproject/openstackvtap/impl/DistributedOpenstackVtapStore.java b/apps/openstackvtap/app/src/main/java/org/onosproject/openstackvtap/impl/DistributedOpenstackVtapStore.java
new file mode 100644
index 0000000..d6f63eb
--- /dev/null
+++ b/apps/openstackvtap/app/src/main/java/org/onosproject/openstackvtap/impl/DistributedOpenstackVtapStore.java
@@ -0,0 +1,570 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstackvtap.impl;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.SparseAnnotations;
+import org.onosproject.openstackvtap.api.OpenstackVtap;
+import org.onosproject.openstackvtap.api.OpenstackVtapCriterion;
+import org.onosproject.openstackvtap.api.OpenstackVtapEvent;
+import org.onosproject.openstackvtap.api.OpenstackVtapId;
+import org.onosproject.openstackvtap.api.OpenstackVtapStore;
+import org.onosproject.openstackvtap.api.OpenstackVtapStoreDelegate;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.DistributedPrimitive.Status;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.net.DefaultAnnotations.merge;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Manages the inventory of users using a {@code ConsistentMap}.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedOpenstackVtapStore
+ extends AbstractStore<OpenstackVtapEvent, OpenstackVtapStoreDelegate>
+ implements OpenstackVtapStore {
+ private final Logger log = getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ private ConsistentMap<OpenstackVtapId, DefaultOpenstackVtap> vTapConsistentMap;
+ private MapEventListener<OpenstackVtapId, DefaultOpenstackVtap>
+ vTapListener = new VTapEventListener();
+ private Map<OpenstackVtapId, DefaultOpenstackVtap> vTapMap;
+
+ private static final Serializer SERIALIZER = Serializer
+ .using(new KryoNamespace.Builder().register(KryoNamespaces.API)
+ .register(OpenstackVtapId.class)
+ .register(UUID.class)
+ .register(DefaultOpenstackVtap.class)
+ .register(OpenstackVtap.Type.class)
+ .register(DefaultOpenstackVtapCriterion.class)
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
+ .build());
+
+ private Map<DeviceId, Set<OpenstackVtapId>>
+ vTapIdsByTxDeviceId = Maps.newConcurrentMap();
+ private Map<DeviceId, Set<OpenstackVtapId>>
+ vTapIdsByRxDeviceId = Maps.newConcurrentMap();
+
+ private ScheduledExecutorService eventExecutor;
+
+ private Consumer<Status> vTapStatusListener;
+
+ public static final String INVALID_DESCRIPTION = "Invalid create/update parameter";
+
+ @Activate
+ public void activate() {
+ eventExecutor = newSingleThreadScheduledExecutor(
+ groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+
+ vTapConsistentMap = storageService.<OpenstackVtapId, DefaultOpenstackVtap>
+ consistentMapBuilder()
+ .withName("vTapMap")
+ .withSerializer(SERIALIZER)
+ .build();
+
+ vTapMap = vTapConsistentMap.asJavaMap();
+ vTapConsistentMap.addListener(vTapListener);
+
+ vTapStatusListener = status -> {
+ if (status == Status.ACTIVE) {
+ eventExecutor.execute(this::loadVTapIds);
+ }
+ };
+ vTapConsistentMap.addStatusChangeListener(vTapStatusListener);
+
+ log.info("Started {} - {}", this.getClass().getSimpleName());
+ }
+
+ @Deactivate
+ public void deactivate() {
+ vTapConsistentMap.removeStatusChangeListener(vTapStatusListener);
+ vTapConsistentMap.removeListener(vTapListener);
+ eventExecutor.shutdown();
+
+ log.info("Stopped {} - {}", this.getClass().getSimpleName());
+ }
+
+ private void loadVTapIds() {
+ vTapIdsByTxDeviceId.clear();
+ vTapIdsByRxDeviceId.clear();
+ vTapMap.values().forEach(vTap -> refreshDeviceIdsByVtap(null, vTap));
+ }
+
+ private boolean shouldUpdate(DefaultOpenstackVtap existing,
+ OpenstackVtap description,
+ boolean replaceDevices) {
+ if (existing == null) {
+ return true;
+ }
+
+ if ((description.type() != null && !description.type().equals(existing.type()))
+ || (description.vTapCriterion() != null &&
+ !description.vTapCriterion().equals(existing.vTapCriterion()))) {
+ return true;
+ }
+
+ if (description.txDeviceIds() != null) {
+ if (replaceDevices) {
+ if (!existing.txDeviceIds().equals(description.txDeviceIds())) {
+ return true;
+ }
+ } else {
+ if (!existing.txDeviceIds().containsAll(description.txDeviceIds())) {
+ return true;
+ }
+ }
+ }
+
+ if (description.rxDeviceIds() != null) {
+ if (replaceDevices) {
+ if (!existing.rxDeviceIds().equals(description.rxDeviceIds())) {
+ return true;
+ }
+ } else {
+ if (!existing.rxDeviceIds().containsAll(description.rxDeviceIds())) {
+ return true;
+ }
+ }
+ }
+
+ // check to see if any of the annotations provided by vTap
+ // differ from those in the existing vTap
+ return description.annotations().keys().stream()
+ .anyMatch(k -> !Objects.equals(description.annotations().value(k),
+ existing.annotations().value(k)));
+ }
+
+ @Override
+ public OpenstackVtap createOrUpdateVtap(OpenstackVtapId vTapId,
+ OpenstackVtap description,
+ boolean replaceFlag) {
+
+ return vTapMap.compute(vTapId, (id, existing) -> {
+ if (existing == null &&
+ (description.type() == null ||
+ description.vTapCriterion() == null ||
+ description.txDeviceIds() == null ||
+ description.rxDeviceIds() == null)) {
+ checkState(false, INVALID_DESCRIPTION);
+ return null;
+ }
+
+ if (shouldUpdate(existing, description, replaceFlag)) {
+ // Replace items
+ OpenstackVtap.Type type =
+ (description.type() == null ? existing.type() : description.type());
+ OpenstackVtapCriterion vTapCriterion =
+ (description.vTapCriterion() == null ?
+ existing.vTapCriterion() : description.vTapCriterion());
+
+ // Replace or add devices
+ Set<DeviceId> txDeviceIds;
+ if (description.txDeviceIds() == null) {
+ txDeviceIds = existing.txDeviceIds();
+ } else {
+ if (existing == null || replaceFlag) {
+ txDeviceIds = ImmutableSet.copyOf(description.txDeviceIds());
+ } else {
+ txDeviceIds = Sets.newHashSet(existing.txDeviceIds());
+ txDeviceIds.addAll(description.txDeviceIds());
+ }
+ }
+
+ Set<DeviceId> rxDeviceIds;
+ if (description.rxDeviceIds() == null) {
+ rxDeviceIds = existing.rxDeviceIds();
+ } else {
+ if (existing == null || replaceFlag) {
+ rxDeviceIds = ImmutableSet.copyOf(description.rxDeviceIds());
+ } else {
+ rxDeviceIds = Sets.newHashSet(existing.rxDeviceIds());
+ rxDeviceIds.addAll(description.rxDeviceIds());
+ }
+ }
+
+ // Replace or add annotations
+ SparseAnnotations annotations;
+ if (existing != null) {
+ annotations = merge((DefaultAnnotations) existing.annotations(),
+ (SparseAnnotations) description.annotations());
+ } else {
+ annotations = (SparseAnnotations) description.annotations();
+ }
+
+ // Make new changed vTap and return
+ return DefaultOpenstackVtap.builder()
+ .id(vTapId)
+ .type(type)
+ .vTapCriterion(vTapCriterion)
+ .txDeviceIds(txDeviceIds)
+ .rxDeviceIds(rxDeviceIds)
+ .annotations(annotations)
+ .build();
+ }
+ return existing;
+ });
+ }
+
+ @Override
+ public OpenstackVtap removeVtapById(OpenstackVtapId vTapId) {
+ return vTapMap.remove(vTapId);
+ }
+
+ @Override
+ public boolean addDeviceToVtap(OpenstackVtapId vTapId,
+ OpenstackVtap.Type type,
+ DeviceId deviceId) {
+ checkNotNull(vTapId);
+ checkNotNull(deviceId);
+
+ OpenstackVtap vTap = vTapMap.compute(vTapId, (id, existing) -> {
+ if (existing == null) {
+ return null;
+ }
+ if (!existing.type().isValid(type)) {
+ log.error("Not valid OpenstackVtap type {} for requested type {}",
+ existing.type(), type);
+ return existing;
+ }
+
+ Set<DeviceId> txDeviceIds = null;
+ if (type.isValid(OpenstackVtap.Type.VTAP_TX) &&
+ !existing.txDeviceIds().contains(deviceId)) {
+ txDeviceIds = Sets.newHashSet(existing.txDeviceIds());
+ txDeviceIds.add(deviceId);
+ }
+
+ Set<DeviceId> rxDeviceIds = null;
+ if (type.isValid(OpenstackVtap.Type.VTAP_RX) &&
+ !existing.rxDeviceIds().contains(deviceId)) {
+ rxDeviceIds = Sets.newHashSet(existing.rxDeviceIds());
+ rxDeviceIds.add(deviceId);
+ }
+
+ if (txDeviceIds != null || rxDeviceIds != null) {
+ //updateVTapIdFromDeviceId(existing.id(), deviceId); // execute from event listener
+
+ return DefaultOpenstackVtap.builder()
+ .id(vTapId)
+ .type(existing.type())
+ .vTapCriterion(existing.vTapCriterion())
+ .txDeviceIds(txDeviceIds != null ? txDeviceIds : existing.txDeviceIds())
+ .rxDeviceIds(rxDeviceIds != null ? rxDeviceIds : existing.rxDeviceIds())
+ .annotations(existing.annotations())
+ .build();
+ }
+ return existing;
+ });
+ return (vTap != null);
+ }
+
+ @Override
+ public boolean removeDeviceFromVtap(OpenstackVtapId vTapId,
+ OpenstackVtap.Type type,
+ DeviceId deviceId) {
+ checkNotNull(vTapId);
+ checkNotNull(deviceId);
+
+ OpenstackVtap vTap = vTapMap.compute(vTapId, (id, existing) -> {
+ if (existing == null) {
+ return null;
+ }
+ if (!existing.type().isValid(type)) {
+ log.error("Not valid OpenstackVtap type {} for requested type {}",
+ existing.type(), type);
+ return existing;
+ }
+
+ Set<DeviceId> txDeviceIds = null;
+ if (type.isValid(OpenstackVtap.Type.VTAP_TX) &&
+ existing.txDeviceIds().contains(deviceId)) {
+ txDeviceIds = Sets.newHashSet(existing.txDeviceIds());
+ txDeviceIds.remove(deviceId);
+ }
+
+ Set<DeviceId> rxDeviceIds = null;
+ if (type.isValid(OpenstackVtap.Type.VTAP_RX) &&
+ existing.rxDeviceIds().contains(deviceId)) {
+ rxDeviceIds = Sets.newHashSet(existing.rxDeviceIds());
+ rxDeviceIds.remove(deviceId);
+ }
+
+ if (txDeviceIds != null || rxDeviceIds != null) {
+ //removeVTapIdFromDeviceId(existing.id(), deviceId); // execute from event listener
+
+ return DefaultOpenstackVtap.builder()
+ .id(vTapId)
+ .type(existing.type())
+ .vTapCriterion(existing.vTapCriterion())
+ .txDeviceIds(txDeviceIds != null ? txDeviceIds : existing.txDeviceIds())
+ .rxDeviceIds(rxDeviceIds != null ? rxDeviceIds : existing.rxDeviceIds())
+ .annotations(existing.annotations())
+ .build();
+ }
+ return existing;
+ });
+ return (vTap != null);
+ }
+
+ @Override
+ public boolean updateDeviceForVtap(OpenstackVtapId vTapId,
+ Set<DeviceId> txDeviceIds, Set<DeviceId> rxDeviceIds,
+ boolean replaceDevices) {
+ checkNotNull(vTapId);
+ checkNotNull(txDeviceIds);
+ checkNotNull(rxDeviceIds);
+
+ OpenstackVtap vTap = vTapMap.compute(vTapId, (id, existing) -> {
+ if (existing == null) {
+ return null;
+ }
+
+ // Replace or add devices
+ Set<DeviceId> txDS = null;
+ if (replaceDevices) {
+ if (!existing.txDeviceIds().equals(txDeviceIds)) {
+ txDS = ImmutableSet.copyOf(txDeviceIds);
+ }
+ } else {
+ if (!existing.txDeviceIds().containsAll(txDeviceIds)) {
+ txDS = Sets.newHashSet(existing.txDeviceIds());
+ txDS.addAll(txDeviceIds);
+ }
+ }
+
+ Set<DeviceId> rxDS = null;
+ if (replaceDevices) {
+ if (!existing.rxDeviceIds().equals(rxDeviceIds)) {
+ rxDS = ImmutableSet.copyOf(rxDeviceIds);
+ }
+ } else {
+ if (!existing.rxDeviceIds().containsAll(rxDeviceIds)) {
+ rxDS = Sets.newHashSet(existing.rxDeviceIds());
+ rxDS.addAll(rxDeviceIds);
+ }
+ }
+
+ if (txDS != null || rxDS != null) {
+
+ return DefaultOpenstackVtap.builder()
+ .id(vTapId)
+ .type(existing.type())
+ .vTapCriterion(existing.vTapCriterion())
+ .txDeviceIds(txDS != null ? txDS : existing.txDeviceIds())
+ .rxDeviceIds(rxDS != null ? rxDS : existing.rxDeviceIds())
+ .annotations(existing.annotations())
+ .build();
+ }
+ return existing;
+ });
+ return (vTap != null);
+ }
+
+ @Override
+ public int getVtapCount(OpenstackVtap.Type type) {
+ return (int) vTapMap.values().parallelStream()
+ .filter(vTap -> vTap.type().isValid(type))
+ .count();
+ }
+
+ @Override
+ public Set<OpenstackVtap> getVtaps(OpenstackVtap.Type type) {
+ return ImmutableSet.copyOf(
+ vTapMap.values().parallelStream()
+ .filter(vTap -> vTap.type().isValid(type))
+ .collect(Collectors.toSet()));
+ }
+
+ @Override
+ public OpenstackVtap getVtap(OpenstackVtapId vTapId) {
+ return vTapMap.get(vTapId);
+ }
+
+ @Override
+ public Set<OpenstackVtap> getVtapsByDeviceId(OpenstackVtap.Type type,
+ DeviceId deviceId) {
+ Set<OpenstackVtapId> vtapIds = Sets.newHashSet();
+ if (type.isValid(OpenstackVtap.Type.VTAP_TX)) {
+ vtapIds.addAll(vTapIdsByTxDeviceId.get(deviceId));
+ }
+ if (type.isValid(OpenstackVtap.Type.VTAP_RX)) {
+ vtapIds.addAll(vTapIdsByRxDeviceId.get(deviceId));
+ }
+
+ return ImmutableSet.copyOf(
+ vtapIds.parallelStream()
+ .map(vTapId -> vTapMap.get(vTapId))
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet()));
+ }
+
+ private class VTapComparator implements Comparator<OpenstackVtap> {
+ @Override
+ public int compare(OpenstackVtap v1, OpenstackVtap v2) {
+ int diff = (v2.type().compareTo(v1.type()));
+ if (diff == 0) {
+ return (v2.vTapCriterion().ipProtocol() - v1.vTapCriterion().ipProtocol());
+ }
+ return diff;
+ }
+ }
+
+ private static Set<OpenstackVtapId> addVTapIds(OpenstackVtapId vTapId) {
+ Set<OpenstackVtapId> vtapIds = Sets.newConcurrentHashSet();
+ vtapIds.add(vTapId);
+ return vtapIds;
+ }
+
+ private static Set<OpenstackVtapId> updateVTapIds(Set<OpenstackVtapId> existingVtapIds,
+ OpenstackVtapId vTapId) {
+ existingVtapIds.add(vTapId);
+ return existingVtapIds;
+ }
+
+ private static Set<OpenstackVtapId> removeVTapIds(Set<OpenstackVtapId> existingVtapIds,
+ OpenstackVtapId vTapId) {
+ existingVtapIds.remove(vTapId);
+ if (existingVtapIds.isEmpty()) {
+ return null;
+ }
+ return existingVtapIds;
+ }
+
+ private void updateVTapIdFromTxDeviceId(OpenstackVtapId vTapId, DeviceId deviceId) {
+ vTapIdsByTxDeviceId.compute(deviceId, (k, v) -> v == null ?
+ addVTapIds(vTapId) : updateVTapIds(v, vTapId));
+ }
+
+ private void removeVTapIdFromTxDeviceId(OpenstackVtapId vTapId, DeviceId deviceId) {
+ vTapIdsByTxDeviceId.computeIfPresent(deviceId, (k, v) -> removeVTapIds(v, vTapId));
+ }
+
+ private void updateVTapIdFromRxDeviceId(OpenstackVtapId vTapId, DeviceId deviceId) {
+ vTapIdsByRxDeviceId.compute(deviceId, (k, v) -> v == null ?
+ addVTapIds(vTapId) : updateVTapIds(v, vTapId));
+ }
+
+ private void removeVTapIdFromRxDeviceId(OpenstackVtapId vTapId, DeviceId deviceId) {
+ vTapIdsByRxDeviceId.computeIfPresent(deviceId, (k, v) -> removeVTapIds(v, vTapId));
+ }
+
+ private void refreshDeviceIdsByVtap(OpenstackVtap oldOpenstackVtap,
+ OpenstackVtap newOpenstackVtap) {
+ if (oldOpenstackVtap != null) {
+ Set<DeviceId> removeDeviceIds;
+
+ // Remove TX vTap
+ removeDeviceIds = (newOpenstackVtap != null) ?
+ Sets.difference(oldOpenstackVtap.txDeviceIds(),
+ newOpenstackVtap.txDeviceIds()) : oldOpenstackVtap.txDeviceIds();
+ removeDeviceIds.forEach(id -> removeVTapIdFromTxDeviceId(oldOpenstackVtap.id(), id));
+
+ // Remove RX vTap
+ removeDeviceIds = (newOpenstackVtap != null) ?
+ Sets.difference(oldOpenstackVtap.rxDeviceIds(),
+ newOpenstackVtap.rxDeviceIds()) : oldOpenstackVtap.rxDeviceIds();
+ removeDeviceIds.forEach(id -> removeVTapIdFromRxDeviceId(oldOpenstackVtap.id(), id));
+ }
+
+ if (newOpenstackVtap != null) {
+ Set<DeviceId> addDeviceIds;
+
+ // Add TX vTap
+ addDeviceIds = (oldOpenstackVtap != null) ?
+ Sets.difference(newOpenstackVtap.txDeviceIds(),
+ oldOpenstackVtap.txDeviceIds()) : newOpenstackVtap.txDeviceIds();
+ addDeviceIds.forEach(id -> updateVTapIdFromTxDeviceId(newOpenstackVtap.id(), id));
+
+ // Add RX vTap
+ addDeviceIds = (oldOpenstackVtap != null) ?
+ Sets.difference(newOpenstackVtap.rxDeviceIds(),
+ oldOpenstackVtap.rxDeviceIds()) : newOpenstackVtap.rxDeviceIds();
+ addDeviceIds.forEach(id -> updateVTapIdFromRxDeviceId(newOpenstackVtap.id(), id));
+ }
+ }
+
+ private class VTapEventListener
+ implements MapEventListener<OpenstackVtapId, DefaultOpenstackVtap> {
+ @Override
+ public void event(MapEvent<OpenstackVtapId, DefaultOpenstackVtap> event) {
+ DefaultOpenstackVtap newValue =
+ event.newValue() != null ? event.newValue().value() : null;
+ DefaultOpenstackVtap oldValue =
+ event.oldValue() != null ? event.oldValue().value() : null;
+
+ log.debug("VTapEventListener {} -> {}, {}", event.type(), oldValue, newValue);
+ switch (event.type()) {
+ case INSERT:
+ refreshDeviceIdsByVtap(oldValue, newValue);
+ notifyDelegate(new OpenstackVtapEvent(
+ OpenstackVtapEvent.Type.VTAP_ADDED, newValue));
+ break;
+
+ case UPDATE:
+ if (!Objects.equals(newValue, oldValue)) {
+ refreshDeviceIdsByVtap(oldValue, newValue);
+ notifyDelegate(new OpenstackVtapEvent(
+ OpenstackVtapEvent.Type.VTAP_UPDATED, newValue, oldValue));
+ }
+ break;
+
+ case REMOVE:
+ refreshDeviceIdsByVtap(oldValue, newValue);
+ notifyDelegate(new OpenstackVtapEvent(
+ OpenstackVtapEvent.Type.VTAP_REMOVED, oldValue));
+ break;
+
+ default:
+ log.warn("Unknown map event type: {}", event.type());
+ }
+ }
+ }
+}