blob: 7f984abebc54caf7627a1b9c6125a0ec17fffb6e [file] [log] [blame]
/*
* Copyright 2015-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.statistic.impl;
import com.google.common.base.MoreObjects;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableSet;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Device;
import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.DefaultFlowEntry;
import org.onosproject.net.flow.DefaultTypedFlowEntry;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.net.flow.TypedStoredFlowEntry;
import org.onosproject.net.flow.instructions.Instruction;
import org.onosproject.net.statistic.DefaultLoad;
import org.onosproject.net.statistic.FlowEntryWithLoad;
import org.onosproject.net.statistic.FlowStatisticService;
import org.onosproject.net.statistic.Load;
import org.onosproject.net.statistic.PollInterval;
import org.onosproject.net.statistic.StatisticStore;
import org.onosproject.net.statistic.SummaryFlowEntryWithLoad;
import org.onosproject.net.statistic.TypedFlowEntryWithLoad;
import org.onosproject.utils.Comparators;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.security.AppGuard.checkPermission;
import static org.onosproject.security.AppPermission.Type.STATISTIC_READ;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Provides an implementation of the Flow Statistic Service.
*/
@Component(immediate = true, service = FlowStatisticService.class)
public class FlowStatisticManager implements FlowStatisticService {
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected StatisticStore statisticStore;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DeviceService deviceService;
@Activate
public void activate() {
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public Map<ConnectPoint, SummaryFlowEntryWithLoad> loadSummary(Device device) {
checkPermission(STATISTIC_READ);
Map<ConnectPoint, SummaryFlowEntryWithLoad> summaryLoad =
new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);
if (device == null) {
return summaryLoad;
}
List<Port> ports = new ArrayList<>(deviceService.getPorts(device.id()));
for (Port port : ports) {
ConnectPoint cp = new ConnectPoint(device.id(), port.number());
SummaryFlowEntryWithLoad sfe = loadSummaryPortInternal(cp);
summaryLoad.put(cp, sfe);
}
return summaryLoad;
}
@Override
public SummaryFlowEntryWithLoad loadSummary(Device device, PortNumber pNumber) {
checkPermission(STATISTIC_READ);
ConnectPoint cp = new ConnectPoint(device.id(), pNumber);
return loadSummaryPortInternal(cp);
}
@Override
public Map<ConnectPoint, List<FlowEntryWithLoad>> loadAllByType(Device device,
FlowEntry.FlowLiveType liveType,
Instruction.Type instType) {
checkPermission(STATISTIC_READ);
Map<ConnectPoint, List<FlowEntryWithLoad>> allLoad =
new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);
if (device == null) {
return allLoad;
}
List<Port> ports = new ArrayList<>(deviceService.getPorts(device.id()));
for (Port port : ports) {
ConnectPoint cp = new ConnectPoint(device.id(), port.number());
List<FlowEntryWithLoad> fel = loadAllPortInternal(cp, liveType, instType);
allLoad.put(cp, fel);
}
return allLoad;
}
@Override
public List<FlowEntryWithLoad> loadAllByType(Device device, PortNumber pNumber,
FlowEntry.FlowLiveType liveType,
Instruction.Type instType) {
checkPermission(STATISTIC_READ);
ConnectPoint cp = new ConnectPoint(device.id(), pNumber);
return loadAllPortInternal(cp, liveType, instType);
}
@Override
public Map<ConnectPoint, List<FlowEntryWithLoad>> loadTopnByType(Device device,
FlowEntry.FlowLiveType liveType,
Instruction.Type instType,
int topn) {
checkPermission(STATISTIC_READ);
Map<ConnectPoint, List<FlowEntryWithLoad>> allLoad =
new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);
if (device == null) {
return allLoad;
}
List<Port> ports = new ArrayList<>(deviceService.getPorts(device.id()));
for (Port port : ports) {
ConnectPoint cp = new ConnectPoint(device.id(), port.number());
List<FlowEntryWithLoad> fel = loadTopnPortInternal(cp, liveType, instType, topn);
allLoad.put(cp, fel);
}
return allLoad;
}
@Override
public List<FlowEntryWithLoad> loadTopnByType(Device device, PortNumber pNumber,
FlowEntry.FlowLiveType liveType,
Instruction.Type instType,
int topn) {
checkPermission(STATISTIC_READ);
ConnectPoint cp = new ConnectPoint(device.id(), pNumber);
return loadTopnPortInternal(cp, liveType, instType, topn);
}
private SummaryFlowEntryWithLoad loadSummaryPortInternal(ConnectPoint cp) {
checkPermission(STATISTIC_READ);
Set<FlowEntry> currentStats;
Set<FlowEntry> previousStats;
TypedStatistics typedStatistics;
synchronized (statisticStore) {
currentStats = statisticStore.getCurrentStatistic(cp);
if (currentStats == null) {
return new SummaryFlowEntryWithLoad(cp, new DefaultLoad());
}
previousStats = statisticStore.getPreviousStatistic(cp);
if (previousStats == null) {
return new SummaryFlowEntryWithLoad(cp, new DefaultLoad());
}
// copy to local flow entry
typedStatistics = new TypedStatistics(currentStats, previousStats);
// Check for validity of this stats data
checkLoadValidity(currentStats, previousStats);
}
// current and previous set is not empty!
Set<FlowEntry> currentSet = typedStatistics.current();
Set<FlowEntry> previousSet = typedStatistics.previous();
PollInterval pollIntervalInstance = PollInterval.getInstance();
// We assume that default pollInterval is flowPollFrequency in case adaptiveFlowSampling is true or false
Load totalLoad = new DefaultLoad(aggregateBytesSet(currentSet), aggregateBytesSet(previousSet),
pollIntervalInstance.getPollInterval());
Map<FlowRule, FlowEntry> currentMap;
Map<FlowRule, FlowEntry> previousMap;
currentMap = typedStatistics.currentImmediate();
previousMap = typedStatistics.previousImmediate();
Load immediateLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
pollIntervalInstance.getPollInterval());
currentMap = typedStatistics.currentShort();
previousMap = typedStatistics.previousShort();
Load shortLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
pollIntervalInstance.getPollInterval());
currentMap = typedStatistics.currentMid();
previousMap = typedStatistics.previousMid();
Load midLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
pollIntervalInstance.getMidPollInterval());
currentMap = typedStatistics.currentLong();
previousMap = typedStatistics.previousLong();
Load longLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
pollIntervalInstance.getLongPollInterval());
currentMap = typedStatistics.currentUnknown();
previousMap = typedStatistics.previousUnknown();
Load unknownLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
pollIntervalInstance.getPollInterval());
return new SummaryFlowEntryWithLoad(cp, totalLoad, immediateLoad, shortLoad, midLoad, longLoad, unknownLoad);
}
private List<FlowEntryWithLoad> loadAllPortInternal(ConnectPoint cp,
FlowEntry.FlowLiveType liveType,
Instruction.Type instType) {
checkPermission(STATISTIC_READ);
List<FlowEntryWithLoad> retFel = new ArrayList<>();
Set<FlowEntry> currentStats;
Set<FlowEntry> previousStats;
TypedStatistics typedStatistics;
synchronized (statisticStore) {
currentStats = statisticStore.getCurrentStatistic(cp);
if (currentStats == null) {
return retFel;
}
previousStats = statisticStore.getPreviousStatistic(cp);
if (previousStats == null) {
return retFel;
}
// copy to local flow entry set
typedStatistics = new TypedStatistics(currentStats, previousStats);
// Check for validity of this stats data
checkLoadValidity(currentStats, previousStats);
}
// current and previous set is not empty!
boolean isAllInstType = (instType == null ? true : false); // null is all inst type
boolean isAllLiveType = (liveType == null ? true : false); // null is all live type
Map<FlowRule, FlowEntry> currentMap;
Map<FlowRule, FlowEntry> previousMap;
if (isAllLiveType) {
currentMap = typedStatistics.currentAll();
previousMap = typedStatistics.previousAll();
} else {
switch (liveType) {
case IMMEDIATE:
currentMap = typedStatistics.currentImmediate();
previousMap = typedStatistics.previousImmediate();
break;
case SHORT:
currentMap = typedStatistics.currentShort();
previousMap = typedStatistics.previousShort();
break;
case MID:
currentMap = typedStatistics.currentMid();
previousMap = typedStatistics.previousMid();
break;
case LONG:
currentMap = typedStatistics.currentLong();
previousMap = typedStatistics.previousLong();
break;
case UNKNOWN:
currentMap = typedStatistics.currentUnknown();
previousMap = typedStatistics.previousUnknown();
break;
default:
currentMap = new HashMap<>();
previousMap = new HashMap<>();
break;
}
}
return typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap, isAllInstType, instType);
}
private List<FlowEntryWithLoad> typedFlowEntryLoadByInstInternal(ConnectPoint cp,
Map<FlowRule, FlowEntry> currentMap,
Map<FlowRule, FlowEntry> previousMap,
boolean isAllInstType,
Instruction.Type instType) {
List<FlowEntryWithLoad> fel = new ArrayList<>();
currentMap.values().forEach(fe -> {
if (isAllInstType ||
fe.treatment().allInstructions().stream().
filter(i -> i.type() == instType).
findAny().isPresent()) {
long currentBytes = fe.bytes();
long previousBytes = previousMap.getOrDefault(fe, new DefaultFlowEntry(fe)).bytes();
long liveTypePollInterval = getLiveTypePollInterval(fe.liveType());
Load fLoad = new DefaultLoad(currentBytes, previousBytes, liveTypePollInterval);
fel.add(new FlowEntryWithLoad(cp, fe, fLoad));
}
});
return fel;
}
private List<FlowEntryWithLoad> loadTopnPortInternal(ConnectPoint cp,
FlowEntry.FlowLiveType liveType,
Instruction.Type instType,
int topn) {
List<FlowEntryWithLoad> fel = loadAllPortInternal(cp, liveType, instType);
// Sort with descending order of load
List<FlowEntryWithLoad> retFel =
fel.stream().sorted(Comparators.FLOWENTRY_WITHLOAD_COMPARATOR).
limit(topn).collect(Collectors.toList());
return retFel;
}
private long aggregateBytesSet(Set<FlowEntry> setFE) {
return setFE.stream().mapToLong(FlowEntry::bytes).sum();
}
private long aggregateBytesMap(Map<FlowRule, FlowEntry> mapFE) {
return mapFE.values().stream().mapToLong(FlowEntry::bytes).sum();
}
private long getLiveTypePollInterval(FlowEntry.FlowLiveType liveType) {
// returns the flow live type poll interval value
PollInterval pollIntervalInstance = PollInterval.getInstance();
switch (liveType) {
case LONG:
return pollIntervalInstance.getLongPollInterval();
case MID:
return pollIntervalInstance.getMidPollInterval();
case SHORT:
case IMMEDIATE:
default: // UNKNOWN
return pollIntervalInstance.getPollInterval();
}
}
private TypedStoredFlowEntry.FlowLiveType toTypedStoredFlowEntryLiveType(FlowEntry.FlowLiveType liveType) {
if (liveType == null) {
return null;
}
// convert TypedStoredFlowEntry flow live type to FlowEntry one
switch (liveType) {
case IMMEDIATE:
return TypedStoredFlowEntry.FlowLiveType.IMMEDIATE_FLOW;
case SHORT:
return TypedStoredFlowEntry.FlowLiveType.SHORT_FLOW;
case MID:
return TypedStoredFlowEntry.FlowLiveType.MID_FLOW;
case LONG:
return TypedStoredFlowEntry.FlowLiveType.LONG_FLOW;
default:
return TypedStoredFlowEntry.FlowLiveType.UNKNOWN_FLOW;
}
}
private Map<ConnectPoint, List<TypedFlowEntryWithLoad>> toFlowEntryWithLoadMap(
Map<ConnectPoint, List<FlowEntryWithLoad>> loadMap) {
// convert FlowEntryWithLoad list to TypedFlowEntryWithLoad list
Map<ConnectPoint, List<TypedFlowEntryWithLoad>> allLoad =
new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);
loadMap.forEach((k, v) -> {
List<TypedFlowEntryWithLoad> tfelList =
toFlowEntryWithLoad(v);
allLoad.put(k, tfelList);
});
return allLoad;
}
private List<TypedFlowEntryWithLoad> toFlowEntryWithLoad(List<FlowEntryWithLoad> loadList) {
// convert FlowEntryWithLoad list to TypedFlowEntryWithLoad list
List<TypedFlowEntryWithLoad> tfelList = new ArrayList<>();
loadList.forEach(fel -> {
StoredFlowEntry sfe = fel.storedFlowEntry();
TypedStoredFlowEntry.FlowLiveType liveType = toTypedStoredFlowEntryLiveType(sfe.liveType());
TypedStoredFlowEntry tfe = new DefaultTypedFlowEntry(sfe, liveType);
TypedFlowEntryWithLoad tfel = new TypedFlowEntryWithLoad(fel.connectPoint(), tfe, fel.load());
tfelList.add(tfel);
});
return tfelList;
}
/**
* Internal data class holding two set of flow entries included flow liveType.
*/
private static class TypedStatistics {
private final ImmutableSet<FlowEntry> current;
private final ImmutableSet<FlowEntry> previous;
private final Map<FlowRule, FlowEntry> currentAll = new HashMap<>();
private final Map<FlowRule, FlowEntry> previousAll = new HashMap<>();
private final Map<FlowRule, FlowEntry> currentImmediate = new HashMap<>();
private final Map<FlowRule, FlowEntry> previousImmediate = new HashMap<>();
private final Map<FlowRule, FlowEntry> currentShort = new HashMap<>();
private final Map<FlowRule, FlowEntry> previousShort = new HashMap<>();
private final Map<FlowRule, FlowEntry> currentMid = new HashMap<>();
private final Map<FlowRule, FlowEntry> previousMid = new HashMap<>();
private final Map<FlowRule, FlowEntry> currentLong = new HashMap<>();
private final Map<FlowRule, FlowEntry> previousLong = new HashMap<>();
private final Map<FlowRule, FlowEntry> currentUnknown = new HashMap<>();
private final Map<FlowRule, FlowEntry> previousUnknown = new HashMap<>();
public TypedStatistics(Set<FlowEntry> current, Set<FlowEntry> previous) {
this.current = ImmutableSet.copyOf(checkNotNull(current));
this.previous = ImmutableSet.copyOf(checkNotNull(previous));
current.forEach(fe -> {
switch (fe.liveType()) {
case IMMEDIATE:
currentImmediate.put(fe, fe);
break;
case SHORT:
currentShort.put(fe, fe);
break;
case MID:
currentMid.put(fe, fe);
break;
case LONG:
currentLong.put(fe, fe);
break;
default: // unknown
currentUnknown.put(fe, fe);
break;
}
currentAll.put(fe, fe);
});
previous.forEach(fe -> {
switch (fe.liveType()) {
case IMMEDIATE:
if (currentImmediate.containsKey(fe)) {
previousImmediate.put(fe, fe);
} else if (currentShort.containsKey(fe)) {
previousShort.put(fe, fe);
} else if (currentMid.containsKey(fe)) {
previousMid.put(fe, fe);
} else if (currentLong.containsKey(fe)) {
previousLong.put(fe, fe);
} else {
previousUnknown.put(fe, fe);
}
break;
case SHORT:
if (currentShort.containsKey(fe)) {
previousShort.put(fe, fe);
} else if (currentMid.containsKey(fe)) {
previousMid.put(fe, fe);
} else if (currentLong.containsKey(fe)) {
previousLong.put(fe, fe);
} else {
previousUnknown.put(fe, fe);
}
break;
case MID:
if (currentMid.containsKey(fe)) {
previousMid.put(fe, fe);
} else if (currentLong.containsKey(fe)) {
previousLong.put(fe, fe);
} else {
previousUnknown.put(fe, fe);
}
break;
case LONG:
if (currentLong.containsKey(fe)) {
previousLong.put(fe, fe);
} else {
previousUnknown.put(fe, fe);
}
break;
default: // unknown
previousUnknown.put(fe, fe);
break;
}
previousAll.put(fe, fe);
});
}
/**
* Returns flow entries as the current value.
*
* @return flow entries as the current value
*/
public ImmutableSet<FlowEntry> current() {
return current;
}
/**
* Returns flow entries as the previous value.
*
* @return flow entries as the previous value
*/
public ImmutableSet<FlowEntry> previous() {
return previous;
}
public Map<FlowRule, FlowEntry> currentAll() {
return currentAll;
}
public Map<FlowRule, FlowEntry> previousAll() {
return previousAll;
}
public Map<FlowRule, FlowEntry> currentImmediate() {
return currentImmediate;
}
public Map<FlowRule, FlowEntry> previousImmediate() {
return previousImmediate;
}
public Map<FlowRule, FlowEntry> currentShort() {
return currentShort;
}
public Map<FlowRule, FlowEntry> previousShort() {
return previousShort;
}
public Map<FlowRule, FlowEntry> currentMid() {
return currentMid;
}
public Map<FlowRule, FlowEntry> previousMid() {
return previousMid;
}
public Map<FlowRule, FlowEntry> currentLong() {
return currentLong;
}
public Map<FlowRule, FlowEntry> previousLong() {
return previousLong;
}
public Map<FlowRule, FlowEntry> currentUnknown() {
return currentUnknown;
}
public Map<FlowRule, FlowEntry> previousUnknown() {
return previousUnknown;
}
/**
* Validates values are not empty.
*
* @return false if either of the sets is empty. Otherwise, true.
*/
public boolean isValid() {
return !(currentAll.isEmpty() || previousAll.isEmpty());
}
@Override
public int hashCode() {
return Objects.hash(currentAll, previousAll);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof TypedStatistics)) {
return false;
}
final TypedStatistics other = (TypedStatistics) obj;
return Objects.equals(this.currentAll, other.currentAll) &&
Objects.equals(this.previousAll, other.previousAll);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("current", currentAll)
.add("previous", previousAll)
.toString();
}
}
private void checkLoadValidity(Set<FlowEntry> current, Set<FlowEntry> previous) {
current.forEach(c -> {
FlowEntry f = previous.stream().filter(p -> c.equals(p)).
findAny().orElse(null);
if (f != null && c.bytes() < f.bytes()) {
log.debug("FlowStatisticManager:checkLoadValidity():" +
"Error: " + c + " :Previous bytes=" + f.bytes() +
" is larger than current bytes=" + c.bytes() + " !!!");
}
});
}
/**
* Creates a predicate that checks the instruction type of a flow entry is the same as
* the specified instruction type.
*
* @param instType instruction type to be checked
* @return predicate
*/
private static Predicate<FlowEntry> hasInstructionType(Instruction.Type instType) {
return new Predicate<FlowEntry>() {
@Override
public boolean apply(FlowEntry flowEntry) {
List<Instruction> allInstructions = flowEntry.treatment().allInstructions();
return allInstructions.stream().filter(i -> i.type() == instType).findAny().isPresent();
}
};
}
/**
* Creates a predicate that checks the flow type of a flow entry is the same as
* the specified live type.
*
* @param liveType flow live type to be checked
* @return predicate
*/
private static Predicate<FlowEntry> hasLiveType(FlowEntry.FlowLiveType liveType) {
return flowEntry -> flowEntry.liveType() == liveType;
}
}