blob: e63fc9c9263f9eb8af3d76d929a0bd84bdc68196 [file] [log] [blame]
/*
* Copyright 2015-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.incubator.net.impl;
import com.google.common.collect.Maps;
import org.onosproject.incubator.net.PortStatisticsService;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.device.PortStatistics;
import org.onosproject.net.statistic.DefaultLoad;
import org.onosproject.net.statistic.Load;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
import java.util.Map;
import java.util.stream.Collectors;
import static org.onosproject.net.PortNumber.portNumber;
import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED;
import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_REMOVED;
import static org.onosproject.net.device.DeviceEvent.Type.PORT_STATS_UPDATED;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Implementation of the port statistics service.
*/
@Component(immediate = true, service = PortStatisticsService.class)
public class PortStatisticsManager implements PortStatisticsService {
private final Logger log = getLogger(getClass());
private static final long POLL_FREQUENCY = 10_000; // milliseconds
private static final long STALE_LIMIT = (long) (1.5 * POLL_FREQUENCY);
private static final int SECOND = 1_000; // milliseconds
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DeviceService deviceService;
private final DeviceListener deviceListener = new InternalDeviceListener();
private Map<ConnectPoint, DataPoint> current = Maps.newConcurrentMap();
private Map<ConnectPoint, DataPoint> previous = Maps.newConcurrentMap();
@Activate
public void activate() {
deviceService.addListener(deviceListener);
log.info("Started");
}
@Deactivate
public void deactivate() {
deviceService.removeListener(deviceListener);
log.info("Stopped");
}
@Override
public Load load(ConnectPoint connectPoint) {
return load(connectPoint, MetricType.BYTES);
}
@Override
public Load load(ConnectPoint connectPoint, MetricType metricType) {
DataPoint c = current.get(connectPoint);
DataPoint p = previous.get(connectPoint);
long now = System.currentTimeMillis();
if (c != null && p != null && (now - c.time < STALE_LIMIT)) {
if (c.time > p.time + SECOND) {
long cve = getEgressValue(c.stats, metricType);
long cvi = getIngressValue(c.stats, metricType);
long pve = getEgressValue(p.stats, metricType);
long pvi = getIngressValue(p.stats, metricType);
//Use max of either Tx or Rx load as the total load of a port
Load load = null;
if (cve >= pve) {
load = new DefaultLoad(cve, pve, (int) (c.time - p.time) / SECOND);
}
if (cvi >= pvi) {
Load rcvLoad = new DefaultLoad(cvi, pvi, (int) (c.time - p.time) / SECOND);
load = ((load == null) || (rcvLoad.rate() > load.rate())) ? rcvLoad : load;
}
return load;
}
}
return null;
}
private long getEgressValue(PortStatistics stats, MetricType metricType) {
return metricType == MetricType.BYTES ? stats.bytesSent() : stats.packetsSent();
}
private long getIngressValue(PortStatistics stats, MetricType metricType) {
return metricType == MetricType.BYTES ? stats.bytesReceived() : stats.packetsReceived();
}
// Monitors port stats update messages.
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
DeviceEvent.Type type = event.type();
DeviceId deviceId = event.subject().id();
if (type == PORT_STATS_UPDATED) {
// Update port load
updateDeviceData(deviceId);
} else if (type == DEVICE_REMOVED ||
(type == DEVICE_AVAILABILITY_CHANGED &&
!deviceService.isAvailable(deviceId))) {
// Clean-up all port loads
pruneDeviceData(deviceId);
}
}
}
// Updates the port stats for the specified device
private void updateDeviceData(DeviceId deviceId) {
deviceService.getPortStatistics(deviceId)
.forEach(stats -> updatePortData(deviceId, stats));
}
// Updates the port stats for the specified port
private void updatePortData(DeviceId deviceId, PortStatistics stats) {
ConnectPoint cp = new ConnectPoint(deviceId, portNumber(stats.port()));
DataPoint c = current.get(cp);
// Create a new data point and make it the current one
current.put(cp, new DataPoint(stats));
// If we have a current data point, demote it to previous
if (c != null) {
previous.put(cp, c);
}
}
// Cleans all port loads for the specified device
private void pruneDeviceData(DeviceId deviceId) {
pruneMap(current, deviceId);
pruneMap(previous, deviceId);
}
private void pruneMap(Map<ConnectPoint, DataPoint> map, DeviceId deviceId) {
map.keySet().stream().filter(cp -> deviceId.equals(cp.deviceId()))
.collect(Collectors.toSet()).forEach(map::remove);
}
// Auxiliary data point to track when we receive different samples.
private class DataPoint {
long time;
PortStatistics stats;
DataPoint(PortStatistics stats) {
time = System.currentTimeMillis();
this.stats = stats;
}
}
}