blob: f8897e8386db12364c87eb8b355dce057f602697 [file] [log] [blame]
/*
* Copyright 2018-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.openstackvtap.impl;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.onlab.util.KryoNamespace;
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.DeviceId;
import org.onosproject.net.SparseAnnotations;
import org.onosproject.openstackvtap.api.OpenstackVtap;
import org.onosproject.openstackvtap.api.OpenstackVtapCriterion;
import org.onosproject.openstackvtap.api.OpenstackVtapEvent;
import org.onosproject.openstackvtap.api.OpenstackVtapId;
import org.onosproject.openstackvtap.api.OpenstackVtapStore;
import org.onosproject.openstackvtap.api.OpenstackVtapStoreDelegate;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.DistributedPrimitive.Status;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.DefaultAnnotations.merge;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages the inventory of users using a {@code ConsistentMap}.
*/
@Component(immediate = true, service = OpenstackVtapStore.class)
public class DistributedOpenstackVtapStore
extends AbstractStore<OpenstackVtapEvent, OpenstackVtapStoreDelegate>
implements OpenstackVtapStore {
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected StorageService storageService;
private ConsistentMap<OpenstackVtapId, DefaultOpenstackVtap> vTapConsistentMap;
private MapEventListener<OpenstackVtapId, DefaultOpenstackVtap>
vTapListener = new VtapEventListener();
private Map<OpenstackVtapId, DefaultOpenstackVtap> vTapMap;
private static final Serializer SERIALIZER = Serializer
.using(new KryoNamespace.Builder().register(KryoNamespaces.API)
.register(OpenstackVtapId.class)
.register(UUID.class)
.register(DefaultOpenstackVtap.class)
.register(OpenstackVtap.Type.class)
.register(DefaultOpenstackVtapCriterion.class)
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.build());
private Map<DeviceId, Set<OpenstackVtapId>>
vTapIdsByTxDeviceId = Maps.newConcurrentMap();
private Map<DeviceId, Set<OpenstackVtapId>>
vTapIdsByRxDeviceId = Maps.newConcurrentMap();
private ScheduledExecutorService eventExecutor;
private Consumer<Status> vTapStatusListener;
public static final String INVALID_DESCRIPTION = "Invalid create/update parameter";
@Activate
public void activate() {
eventExecutor = newSingleThreadScheduledExecutor(
groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
vTapConsistentMap = storageService.<OpenstackVtapId, DefaultOpenstackVtap>
consistentMapBuilder()
.withName("vTapMap")
.withSerializer(SERIALIZER)
.build();
vTapMap = vTapConsistentMap.asJavaMap();
vTapConsistentMap.addListener(vTapListener);
vTapStatusListener = status -> {
if (status == Status.ACTIVE) {
eventExecutor.execute(this::loadVtapIds);
}
};
vTapConsistentMap.addStatusChangeListener(vTapStatusListener);
log.info("Started {} - {}", this.getClass().getSimpleName());
}
@Deactivate
public void deactivate() {
vTapConsistentMap.removeStatusChangeListener(vTapStatusListener);
vTapConsistentMap.removeListener(vTapListener);
eventExecutor.shutdown();
log.info("Stopped {} - {}", this.getClass().getSimpleName());
}
@Override
public OpenstackVtap createOrUpdateVtap(OpenstackVtapId vTapId,
OpenstackVtap description,
boolean replaceFlag) {
return vTapMap.compute(vTapId, (id, existing) -> {
if (existing == null &&
(description.type() == null ||
description.vTapCriterion() == null ||
description.txDeviceIds() == null ||
description.rxDeviceIds() == null)) {
checkState(false, INVALID_DESCRIPTION);
return null;
}
if (shouldUpdate(existing, description, replaceFlag)) {
// Replace items
OpenstackVtap.Type type =
(description.type() == null ? existing.type() : description.type());
OpenstackVtapCriterion vTapCriterion =
(description.vTapCriterion() == null ?
existing.vTapCriterion() : description.vTapCriterion());
// Replace or add devices
Set<DeviceId> txDeviceIds;
if (description.txDeviceIds() == null) {
txDeviceIds = existing.txDeviceIds();
} else {
if (existing == null || replaceFlag) {
txDeviceIds = ImmutableSet.copyOf(description.txDeviceIds());
} else {
txDeviceIds = Sets.newHashSet(existing.txDeviceIds());
txDeviceIds.addAll(description.txDeviceIds());
}
}
Set<DeviceId> rxDeviceIds;
if (description.rxDeviceIds() == null) {
rxDeviceIds = existing.rxDeviceIds();
} else {
if (existing == null || replaceFlag) {
rxDeviceIds = ImmutableSet.copyOf(description.rxDeviceIds());
} else {
rxDeviceIds = Sets.newHashSet(existing.rxDeviceIds());
rxDeviceIds.addAll(description.rxDeviceIds());
}
}
// Replace or add annotations
SparseAnnotations annotations;
if (existing != null) {
annotations = merge((DefaultAnnotations) existing.annotations(),
(SparseAnnotations) description.annotations());
} else {
annotations = (SparseAnnotations) description.annotations();
}
// Make new changed vTap and return
return DefaultOpenstackVtap.builder()
.id(vTapId)
.type(type)
.vTapCriterion(vTapCriterion)
.txDeviceIds(txDeviceIds)
.rxDeviceIds(rxDeviceIds)
.annotations(annotations)
.build();
}
return existing;
});
}
@Override
public OpenstackVtap removeVtapById(OpenstackVtapId vTapId) {
return vTapMap.remove(vTapId);
}
@Override
public boolean addDeviceToVtap(OpenstackVtapId vTapId,
OpenstackVtap.Type type,
DeviceId deviceId) {
checkNotNull(vTapId);
checkNotNull(deviceId);
OpenstackVtap vTap = vTapMap.compute(vTapId, (id, existing) -> {
if (existing == null) {
return null;
}
if (!existing.type().isValid(type)) {
log.error("Not valid OpenstackVtap type {} for requested type {}",
existing.type(), type);
return existing;
}
Set<DeviceId> txDeviceIds = null;
if (type.isValid(OpenstackVtap.Type.VTAP_TX) &&
!existing.txDeviceIds().contains(deviceId)) {
txDeviceIds = Sets.newHashSet(existing.txDeviceIds());
txDeviceIds.add(deviceId);
}
Set<DeviceId> rxDeviceIds = null;
if (type.isValid(OpenstackVtap.Type.VTAP_RX) &&
!existing.rxDeviceIds().contains(deviceId)) {
rxDeviceIds = Sets.newHashSet(existing.rxDeviceIds());
rxDeviceIds.add(deviceId);
}
if (txDeviceIds != null || rxDeviceIds != null) {
//updateVTapIdFromDeviceId(existing.id(), deviceId); // execute from event listener
return DefaultOpenstackVtap.builder()
.id(vTapId)
.type(existing.type())
.vTapCriterion(existing.vTapCriterion())
.txDeviceIds(txDeviceIds != null ? txDeviceIds : existing.txDeviceIds())
.rxDeviceIds(rxDeviceIds != null ? rxDeviceIds : existing.rxDeviceIds())
.annotations(existing.annotations())
.build();
}
return existing;
});
return (vTap != null);
}
@Override
public boolean removeDeviceFromVtap(OpenstackVtapId vTapId,
OpenstackVtap.Type type,
DeviceId deviceId) {
checkNotNull(vTapId);
checkNotNull(deviceId);
OpenstackVtap vTap = vTapMap.compute(vTapId, (id, existing) -> {
if (existing == null) {
return null;
}
if (!existing.type().isValid(type)) {
log.error("Not valid OpenstackVtap type {} for requested type {}",
existing.type(), type);
return existing;
}
Set<DeviceId> txDeviceIds = null;
if (type.isValid(OpenstackVtap.Type.VTAP_TX) &&
existing.txDeviceIds().contains(deviceId)) {
txDeviceIds = Sets.newHashSet(existing.txDeviceIds());
txDeviceIds.remove(deviceId);
}
Set<DeviceId> rxDeviceIds = null;
if (type.isValid(OpenstackVtap.Type.VTAP_RX) &&
existing.rxDeviceIds().contains(deviceId)) {
rxDeviceIds = Sets.newHashSet(existing.rxDeviceIds());
rxDeviceIds.remove(deviceId);
}
if (txDeviceIds != null || rxDeviceIds != null) {
//removeVTapIdFromDeviceId(existing.id(), deviceId); // execute from event listener
return DefaultOpenstackVtap.builder()
.id(vTapId)
.type(existing.type())
.vTapCriterion(existing.vTapCriterion())
.txDeviceIds(txDeviceIds != null ? txDeviceIds : existing.txDeviceIds())
.rxDeviceIds(rxDeviceIds != null ? rxDeviceIds : existing.rxDeviceIds())
.annotations(existing.annotations())
.build();
}
return existing;
});
return (vTap != null);
}
@Override
public boolean updateDeviceForVtap(OpenstackVtapId vTapId,
Set<DeviceId> txDeviceIds, Set<DeviceId> rxDeviceIds,
boolean replaceDevices) {
checkNotNull(vTapId);
checkNotNull(txDeviceIds);
checkNotNull(rxDeviceIds);
OpenstackVtap vTap = vTapMap.compute(vTapId, (id, existing) -> {
if (existing == null) {
return null;
}
// Replace or add devices
Set<DeviceId> txDS = null;
if (replaceDevices) {
if (!existing.txDeviceIds().equals(txDeviceIds)) {
txDS = ImmutableSet.copyOf(txDeviceIds);
}
} else {
if (!existing.txDeviceIds().containsAll(txDeviceIds)) {
txDS = Sets.newHashSet(existing.txDeviceIds());
txDS.addAll(txDeviceIds);
}
}
Set<DeviceId> rxDS = null;
if (replaceDevices) {
if (!existing.rxDeviceIds().equals(rxDeviceIds)) {
rxDS = ImmutableSet.copyOf(rxDeviceIds);
}
} else {
if (!existing.rxDeviceIds().containsAll(rxDeviceIds)) {
rxDS = Sets.newHashSet(existing.rxDeviceIds());
rxDS.addAll(rxDeviceIds);
}
}
if (txDS != null || rxDS != null) {
return DefaultOpenstackVtap.builder()
.id(vTapId)
.type(existing.type())
.vTapCriterion(existing.vTapCriterion())
.txDeviceIds(txDS != null ? txDS : existing.txDeviceIds())
.rxDeviceIds(rxDS != null ? rxDS : existing.rxDeviceIds())
.annotations(existing.annotations())
.build();
}
return existing;
});
return (vTap != null);
}
@Override
public int getVtapCount(OpenstackVtap.Type type) {
return (int) vTapMap.values().parallelStream()
.filter(vTap -> vTap.type().isValid(type))
.count();
}
@Override
public Set<OpenstackVtap> getVtaps(OpenstackVtap.Type type) {
return ImmutableSet.copyOf(
vTapMap.values().parallelStream()
.filter(vTap -> vTap.type().isValid(type))
.collect(Collectors.toSet()));
}
@Override
public OpenstackVtap getVtap(OpenstackVtapId vTapId) {
return vTapMap.get(vTapId);
}
@Override
public Set<OpenstackVtap> getVtapsByDeviceId(OpenstackVtap.Type type,
DeviceId deviceId) {
Set<OpenstackVtapId> vtapIds = Sets.newHashSet();
if (type.isValid(OpenstackVtap.Type.VTAP_TX)) {
if (vTapIdsByTxDeviceId.get(deviceId) != null) {
vtapIds.addAll(vTapIdsByTxDeviceId.get(deviceId));
}
}
if (type.isValid(OpenstackVtap.Type.VTAP_RX)) {
if (vTapIdsByRxDeviceId.get(deviceId) != null) {
vtapIds.addAll(vTapIdsByRxDeviceId.get(deviceId));
}
}
return ImmutableSet.copyOf(
vtapIds.parallelStream()
.map(vTapId -> vTapMap.get(vTapId))
.filter(Objects::nonNull)
.collect(Collectors.toSet()));
}
private void loadVtapIds() {
vTapIdsByTxDeviceId.clear();
vTapIdsByRxDeviceId.clear();
vTapMap.values().forEach(vTap -> refreshDeviceIdsByVtap(null, vTap));
}
private boolean shouldUpdate(DefaultOpenstackVtap existing,
OpenstackVtap description,
boolean replaceDevices) {
if (existing == null) {
return true;
}
if ((description.type() != null && !description.type().equals(existing.type()))
|| (description.vTapCriterion() != null &&
!description.vTapCriterion().equals(existing.vTapCriterion()))) {
return true;
}
if (description.txDeviceIds() != null) {
if (replaceDevices) {
if (!existing.txDeviceIds().equals(description.txDeviceIds())) {
return true;
}
} else {
if (!existing.txDeviceIds().containsAll(description.txDeviceIds())) {
return true;
}
}
}
if (description.rxDeviceIds() != null) {
if (replaceDevices) {
if (!existing.rxDeviceIds().equals(description.rxDeviceIds())) {
return true;
}
} else {
if (!existing.rxDeviceIds().containsAll(description.rxDeviceIds())) {
return true;
}
}
}
// check to see if any of the annotations provided by vTap
// differ from those in the existing vTap
return description.annotations().keys().stream()
.anyMatch(k -> !Objects.equals(description.annotations().value(k),
existing.annotations().value(k)));
}
private class VtapComparator implements Comparator<OpenstackVtap> {
@Override
public int compare(OpenstackVtap v1, OpenstackVtap v2) {
int diff = (v2.type().compareTo(v1.type()));
if (diff == 0) {
return (v2.vTapCriterion().ipProtocol() - v1.vTapCriterion().ipProtocol());
}
return diff;
}
}
private static Set<OpenstackVtapId> addVTapIds(OpenstackVtapId vTapId) {
Set<OpenstackVtapId> vtapIds = Sets.newConcurrentHashSet();
vtapIds.add(vTapId);
return vtapIds;
}
private static Set<OpenstackVtapId> updateVTapIds(Set<OpenstackVtapId> existingVtapIds,
OpenstackVtapId vTapId) {
existingVtapIds.add(vTapId);
return existingVtapIds;
}
private static Set<OpenstackVtapId> removeVTapIds(Set<OpenstackVtapId> existingVtapIds,
OpenstackVtapId vTapId) {
existingVtapIds.remove(vTapId);
if (existingVtapIds.isEmpty()) {
return null;
}
return existingVtapIds;
}
private void updateVTapIdFromTxDeviceId(OpenstackVtapId vTapId, DeviceId deviceId) {
vTapIdsByTxDeviceId.compute(deviceId, (k, v) -> v == null ?
addVTapIds(vTapId) : updateVTapIds(v, vTapId));
}
private void removeVTapIdFromTxDeviceId(OpenstackVtapId vTapId, DeviceId deviceId) {
vTapIdsByTxDeviceId.computeIfPresent(deviceId, (k, v) -> removeVTapIds(v, vTapId));
}
private void updateVTapIdFromRxDeviceId(OpenstackVtapId vTapId, DeviceId deviceId) {
vTapIdsByRxDeviceId.compute(deviceId, (k, v) -> v == null ?
addVTapIds(vTapId) : updateVTapIds(v, vTapId));
}
private void removeVTapIdFromRxDeviceId(OpenstackVtapId vTapId, DeviceId deviceId) {
vTapIdsByRxDeviceId.computeIfPresent(deviceId, (k, v) -> removeVTapIds(v, vTapId));
}
private void refreshDeviceIdsByVtap(OpenstackVtap oldOpenstackVtap,
OpenstackVtap newOpenstackVtap) {
if (oldOpenstackVtap != null) {
Set<DeviceId> removeDeviceIds;
// Remove TX vTap
removeDeviceIds = (newOpenstackVtap != null) ?
Sets.difference(oldOpenstackVtap.txDeviceIds(),
newOpenstackVtap.txDeviceIds()) : oldOpenstackVtap.txDeviceIds();
removeDeviceIds.forEach(id -> removeVTapIdFromTxDeviceId(oldOpenstackVtap.id(), id));
// Remove RX vTap
removeDeviceIds = (newOpenstackVtap != null) ?
Sets.difference(oldOpenstackVtap.rxDeviceIds(),
newOpenstackVtap.rxDeviceIds()) : oldOpenstackVtap.rxDeviceIds();
removeDeviceIds.forEach(id -> removeVTapIdFromRxDeviceId(oldOpenstackVtap.id(), id));
}
if (newOpenstackVtap != null) {
Set<DeviceId> addDeviceIds;
// Add TX vTap
addDeviceIds = (oldOpenstackVtap != null) ?
Sets.difference(newOpenstackVtap.txDeviceIds(),
oldOpenstackVtap.txDeviceIds()) : newOpenstackVtap.txDeviceIds();
addDeviceIds.forEach(id -> updateVTapIdFromTxDeviceId(newOpenstackVtap.id(), id));
// Add RX vTap
addDeviceIds = (oldOpenstackVtap != null) ?
Sets.difference(newOpenstackVtap.rxDeviceIds(),
oldOpenstackVtap.rxDeviceIds()) : newOpenstackVtap.rxDeviceIds();
addDeviceIds.forEach(id -> updateVTapIdFromRxDeviceId(newOpenstackVtap.id(), id));
}
}
private class VtapEventListener
implements MapEventListener<OpenstackVtapId, DefaultOpenstackVtap> {
@Override
public void event(MapEvent<OpenstackVtapId, DefaultOpenstackVtap> event) {
DefaultOpenstackVtap newValue =
event.newValue() != null ? event.newValue().value() : null;
DefaultOpenstackVtap oldValue =
event.oldValue() != null ? event.oldValue().value() : null;
log.debug("VtapEventListener {} -> {}, {}", event.type(), oldValue, newValue);
switch (event.type()) {
case INSERT:
refreshDeviceIdsByVtap(oldValue, newValue);
notifyDelegate(new OpenstackVtapEvent(
OpenstackVtapEvent.Type.VTAP_ADDED, newValue));
break;
case UPDATE:
if (!Objects.equals(newValue, oldValue)) {
refreshDeviceIdsByVtap(oldValue, newValue);
notifyDelegate(new OpenstackVtapEvent(
OpenstackVtapEvent.Type.VTAP_UPDATED, newValue, oldValue));
}
break;
case REMOVE:
refreshDeviceIdsByVtap(oldValue, newValue);
notifyDelegate(new OpenstackVtapEvent(
OpenstackVtapEvent.Type.VTAP_REMOVED, oldValue));
break;
default:
log.warn("Unknown map event type: {}", event.type());
}
}
}
}