blob: 90d75dd32fd8b832b48cdf8619288d417c8b4488 [file] [log] [blame]
/*
* Copyright 2023-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.netflow.impl;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.onlab.util.KryoNamespace;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.HashSet;
import java.util.Set;
import java.util.List;
import java.util.LinkedList;
import java.util.ArrayList;
import java.util.Optional;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.onosproject.netflow.NetflowStore;
import org.onosproject.netflow.Flow;
import org.onosproject.netflow.FlowField;
import org.onosproject.netflow.FlowTemplateField;
import org.onosproject.netflow.SourceId;
import org.onosproject.netflow.TemplateId;
import org.onosproject.netflow.DataRecord;
import org.onosproject.netflow.DataFlowRecord;
import org.onosproject.netflow.FlowSet;
import org.onosproject.netflow.DataFlowSet;
import org.onosproject.netflow.TemplateRecord;
import org.onosproject.netflow.DataTemplateRecord;
import org.onosproject.netflow.OptionalTemplateFlowSet;
import org.onosproject.netflow.TemplateFlowSet;
import org.onlab.packet.BasePacket;
/**
* Manages inventory of Netflow template and data flowset to distribute
* information.
*/
@Component(immediate = true, service = NetflowStore.class)
public class DistributedNetflowStore implements NetflowStore {
private final Logger log = LoggerFactory.getLogger(getClass());
private EventuallyConsistentMap<TemplateId, DataTemplateRecord> templateFlowSet;
private EventuallyConsistentMap<TemplateId, OptionalTemplateFlowSet> optionalTemplateFlowSet;
private EventuallyConsistentMap<TemplateId, List<DataFlowRecord>> dataFlowSet;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected StorageService storageService;
@Activate
protected void activate() {
KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.MISC)
.register(List.class)
.register(LinkedList.class)
.register(ArrayList.class)
.register(HashSet.class)
.register(BasePacket.class)
.register(Flow.class)
.register(FlowField.class)
.register(FlowTemplateField.class)
.register(SourceId.class)
.register(TemplateId.class)
.register(DataRecord.class)
.register(DataFlowRecord.class)
.register(FlowSet.class)
.register(DataFlowSet.class)
.register(TemplateRecord.class)
.register(DataTemplateRecord.class)
.register(NetFlowPacket.class)
.register(OptionalTemplateFlowSet.class)
.register(TemplateFlowSet.class)
.register(TemplateRecord.class);
templateFlowSet = storageService.<TemplateId, DataTemplateRecord>eventuallyConsistentMapBuilder()
.withSerializer(serializer)
.withName("netflow-templateflowset")
.withAntiEntropyPeriod(10, TimeUnit.SECONDS)
.withTimestampProvider((k, v) -> new org.onosproject.store.service.WallClockTimestamp())
.withTombstonesDisabled()
.build();
optionalTemplateFlowSet = storageService.<TemplateId, OptionalTemplateFlowSet>eventuallyConsistentMapBuilder()
.withSerializer(serializer)
.withName("netflow-optionaltemplateflowset")
.withAntiEntropyPeriod(10, TimeUnit.SECONDS)
.withTimestampProvider((k, v) -> new org.onosproject.store.service.WallClockTimestamp())
.withTombstonesDisabled()
.build();
dataFlowSet = storageService.<TemplateId, List<DataFlowRecord>>eventuallyConsistentMapBuilder()
.withSerializer(serializer)
.withName("netflow-dataflowset")
.withAntiEntropyPeriod(10, TimeUnit.SECONDS)
.withTimestampProvider((k, v) -> new org.onosproject.store.service.WallClockTimestamp())
.withTombstonesDisabled()
.build();
log.info("Started");
}
@Deactivate
public void deactive() {
log.info("Stopped");
}
/**
* Get template flowset from store.
* it will fetch current template which is matching to the template id from store.
*
* @param templateId template id.
* @return optional of data template record, optional will be empty if template not found.
*/
@Override
public Optional<DataTemplateRecord> getTemplateFlowSet(TemplateId templateId) {
return Optional.ofNullable(templateFlowSet.get(templateId));
}
/**
* Get set of template flowsets from store.
* it will fetch all template flowsets from store.
*
* @return set of data template record, set will be empty if templates not found in the store.
*/
@Override
public Set<DataTemplateRecord> getTemplateFlowSet() {
return templateFlowSet.values().stream()
.collect(Collectors.toSet());
}
/**
* Get optional template flowset from store.
* it will fetch current optional template which is matching to the template id from store.
*
* @param templateId template id.
* @return optional of optional template flowset, optional will be empty if template not found.
*/
@Override
public Optional<OptionalTemplateFlowSet> getOptionalTemplateFlowSet(TemplateId templateId) {
return Optional.ofNullable(optionalTemplateFlowSet.get(templateId));
}
/**
* Get set of optional template flowsets from store.
* it will fetch all optional template flowsets from store.
*
* @return set of optional template flowsets, set will be empty if templates not found in the store.
*/
@Override
public Set<OptionalTemplateFlowSet> getOptionalTemplateFlowSet() {
return ImmutableSet.copyOf(optionalTemplateFlowSet.values());
}
/**
* Get data flowset from store.
* it will fetch current data flowset which is matching to the template id from store.
*
* @param templateId template id.
* @return list of data flow record, list will be empty if template id not matched.
*/
@Override
public List<DataFlowRecord> getDataFlowSet(TemplateId templateId) {
List<DataFlowRecord> dataRecord = dataFlowSet.get(templateId);
if (!Objects.isNull(dataRecord)) {
return ImmutableList.copyOf(dataRecord);
}
return Lists.newArrayList();
}
/**
* Get data flowset from store.
* it will fetch current data flowset which is matching to the template id from store.
*
* @return mapping from a template id to data flow record.
*/
@Override
public Map<TemplateId, List<DataFlowRecord>> getDataFlowSet() {
return dataFlowSet.entrySet().stream().collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue()));
}
/**
* Update template flowset to the store.
* Add new template to store if it not exist,
* otherwise it will replace the existing template.
*
* @param templateRecord template flowset record.
*/
@Override
public void updateTemplateFlowSet(DataTemplateRecord templateRecord) {
templateFlowSet.put(templateRecord.getTemplateId(), templateRecord);
}
/**
* Update optional template flowset to the store.
* Add new optional template to store if it not exist,
* otherwise it will replace the existing optional template.
*
* @param templateFlowSet optional template flowset.
*/
@Override
public void updateOptionalTemplateFlowSet(OptionalTemplateFlowSet templateFlowSet) {
optionalTemplateFlowSet.put(templateFlowSet.getTemplateId(), templateFlowSet);
}
/**
* Add data flow record to the store.
* Add new data flow record to store
*
* @param dataFlowRecord data flow record.
*/
@Override
public void addDataFlowSet(DataFlowRecord dataFlowRecord) {
dataFlowSet.compute(dataFlowRecord.getTemplateId(),
(id, flowSet) -> {
List<DataFlowRecord> newSet = new ArrayList<>();
if (flowSet != null) {
newSet.addAll(flowSet);
}
newSet.add(dataFlowRecord);
return newSet;
});
}
/**
* Remove template flowset from store.
* it will remove template flowset which is matching to the given template id from store.
*
* @param templateId template id.
*/
@Override
public void clearTemplateFlowSet(TemplateId templateId) {
if (templateFlowSet.containsKey(templateId)) {
templateFlowSet.remove(templateId);
}
}
/**
* Remove all template flowset from store.
* it will remove all template flowsets from store.
*/
@Override
public void clearTemplateFlowSet() {
templateFlowSet.clear();
}
/**
* Remove optional template flowset from store.
* it will remove optional template which is matching to the given template id from store.
*
* @param templateId template id.
*/
@Override
public void clearOptionalTemplateFlowSet(TemplateId templateId) {
if (optionalTemplateFlowSet.containsKey(templateId)) {
optionalTemplateFlowSet.remove(templateId);
}
}
/**
* Remove all optional template flowset from store.
* it will remove all optional template flowsets from store.
*/
@Override
public void clearOptionalTemplateFlowSet() {
optionalTemplateFlowSet.clear();
}
/**
* Remove data flowset from store.
* it will remove dataflowset which is matching to the given template id from store.
*
* @param templateId template id.
*/
@Override
public void clearDataFlowSet(TemplateId templateId) {
if (dataFlowSet.containsKey(templateId)) {
dataFlowSet.remove(templateId);
}
}
/**
* Remove all data flowset from store.
* it will remove all data flowsets from store.
*/
@Override
public void clearDataFlowSet() {
dataFlowSet.clear();
}
/**
* Remove template, optional template and data flowsets from store.
* it will remove all flowsets from store.
*/
@Override
public void clearAllFlowSet() {
templateFlowSet.clear();
optionalTemplateFlowSet.clear();
dataFlowSet.clear();
}
}