blob: ea2d2e3128f8a177028079e159f033fe00fa0580 [file] [log] [blame]
Thomas Vachuskaf0397b52015-05-29 13:50:17 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.incubator.net.impl;
17
18import com.google.common.collect.Maps;
19import org.apache.felix.scr.annotations.Activate;
20import org.apache.felix.scr.annotations.Component;
21import org.apache.felix.scr.annotations.Deactivate;
22import org.apache.felix.scr.annotations.Reference;
23import org.apache.felix.scr.annotations.ReferenceCardinality;
24import org.apache.felix.scr.annotations.Service;
25import org.onosproject.incubator.net.PortStatisticsService;
26import org.onosproject.net.ConnectPoint;
27import org.onosproject.net.DeviceId;
28import org.onosproject.net.device.DeviceEvent;
29import org.onosproject.net.device.DeviceListener;
30import org.onosproject.net.device.DeviceService;
31import org.onosproject.net.device.PortStatistics;
32import org.onosproject.net.statistic.DefaultLoad;
33import org.onosproject.net.statistic.Load;
34import org.slf4j.Logger;
35
36import java.util.Map;
37import java.util.stream.Collectors;
38
39import static org.onosproject.net.PortNumber.portNumber;
40import static org.onosproject.net.device.DeviceEvent.Type.*;
41import static org.slf4j.LoggerFactory.getLogger;
42
43/**
44 * Implementation of the port statistics service.
45 */
46@Component(immediate = true)
47@Service
48public class PortStatisticsManager implements PortStatisticsService {
Thomas Vachuskad910a5c2015-05-29 17:09:59 -070049
Thomas Vachuskaf0397b52015-05-29 13:50:17 -070050 private final Logger log = getLogger(getClass());
51
Thomas Vachuska028ddb92015-06-03 17:36:32 -070052 private static final int SECOND = 1_000; // milliseconds
53 private static final long STALE_LIMIT = 15_000; // milliseconds
Thomas Vachuskad910a5c2015-05-29 17:09:59 -070054
Thomas Vachuskaf0397b52015-05-29 13:50:17 -070055 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
56 protected DeviceService deviceService;
57
58 private final DeviceListener deviceListener = new InternalDeviceListener();
59
60 private Map<ConnectPoint, DataPoint> current = Maps.newConcurrentMap();
61 private Map<ConnectPoint, DataPoint> previous = Maps.newConcurrentMap();
62
63 @Activate
64 public void activate() {
65 deviceService.addListener(deviceListener);
66 log.info("Started");
Thomas Vachuskaf0397b52015-05-29 13:50:17 -070067 }
68
69 @Deactivate
70 public void deactivate() {
71 deviceService.removeListener(deviceListener);
72 log.info("Stopped");
73 }
74
75 @Override
76 public Load load(ConnectPoint connectPoint) {
77 DataPoint c = current.get(connectPoint);
78 DataPoint p = previous.get(connectPoint);
Thomas Vachuska028ddb92015-06-03 17:36:32 -070079 long now = System.currentTimeMillis();
80 if (c != null && p != null && (now - c.time < STALE_LIMIT) &&
81 (c.time > p.time + SECOND) &&
82 (c.stats.bytesSent() - p.stats.bytesSent() >= 0)) {
Thomas Vachuskaf0397b52015-05-29 13:50:17 -070083 return new DefaultLoad(c.stats.bytesSent(), p.stats.bytesSent(),
Thomas Vachuskad910a5c2015-05-29 17:09:59 -070084 (int) (c.time - p.time) / SECOND);
Thomas Vachuskaf0397b52015-05-29 13:50:17 -070085 }
86 return null;
87 }
88
89 // Monitors port stats update messages.
90 private class InternalDeviceListener implements DeviceListener {
91 @Override
92 public void event(DeviceEvent event) {
93 DeviceEvent.Type type = event.type();
94 DeviceId deviceId = event.subject().id();
95 if (type == PORT_STATS_UPDATED) {
96 // Update port load
97 updateDeviceData(deviceId);
98
99 } else if (type == DEVICE_REMOVED ||
100 (type == DEVICE_AVAILABILITY_CHANGED &&
101 !deviceService.isAvailable(deviceId))) {
102 // Clean-up all port loads
103 pruneDeviceData(deviceId);
104 }
105 }
106 }
107
108 // Updates the port stats for the specified device
109 private void updateDeviceData(DeviceId deviceId) {
110 deviceService.getPortStatistics(deviceId)
111 .forEach(stats -> updatePortData(deviceId, stats));
112 }
113
114 // Updates the port stats for the specified port
115 private void updatePortData(DeviceId deviceId, PortStatistics stats) {
116 ConnectPoint cp = new ConnectPoint(deviceId, portNumber(stats.port()));
117
118 // If we have a current data point, demote it to previous
119 DataPoint c = current.get(cp);
120 if (c != null) {
121 previous.put(cp, c);
122 }
123
124 // Create a new data point and make it the current one
125 current.put(cp, new DataPoint(stats));
126 }
127
128 // Cleans all port loads for the specified device
129 private void pruneDeviceData(DeviceId deviceId) {
130 pruneMap(current, deviceId);
131 pruneMap(previous, deviceId);
132 }
133
134 private void pruneMap(Map<ConnectPoint, DataPoint> map, DeviceId deviceId) {
135 map.keySet().stream().filter(cp -> deviceId.equals(cp.deviceId()))
136 .collect(Collectors.toSet()).forEach(map::remove);
137 }
138
139 // Auxiliary data point to track when we receive different samples.
140 private class DataPoint {
141 long time;
142 PortStatistics stats;
143
144 DataPoint(PortStatistics stats) {
145 time = System.currentTimeMillis();
146 this.stats = stats;
147 }
148 }
149
150}