blob: 79cb4c7d50e3b68ee1853cc7d32edce199d73c14 [file] [log] [blame]
Boyoung Jeong9e8faec2018-06-17 21:19:23 +09001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.openstacktelemetry.impl;
17
Boyoung Jeong1cca5e82018-08-01 21:00:08 +090018import com.google.common.collect.Maps;
Jian Li0bbbb1c2018-06-22 22:01:17 +090019import com.google.common.collect.Sets;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090020import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
Jian Li753280e2018-07-03 02:24:34 +090023import org.apache.felix.scr.annotations.Modified;
24import org.apache.felix.scr.annotations.Property;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090025import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
27import org.apache.felix.scr.annotations.Service;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090028import org.onlab.packet.IpAddress;
Jian Li753280e2018-07-03 02:24:34 +090029import org.onlab.packet.IpPrefix;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090030import org.onlab.packet.MacAddress;
31import org.onlab.packet.VlanId;
Jian Lia4947682018-07-07 14:53:32 +090032import org.onlab.util.SharedScheduledExecutors;
Jian Lie6110b72018-07-06 19:06:36 +090033import org.onosproject.cfg.ComponentConfigService;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090034import org.onosproject.core.ApplicationId;
35import org.onosproject.core.CoreService;
Jian Li85573f42018-06-27 22:29:14 +090036import org.onosproject.mastership.MastershipService;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090037import org.onosproject.net.DeviceId;
38import org.onosproject.net.Host;
Jian Lia4947682018-07-07 14:53:32 +090039import org.onosproject.net.PortNumber;
40import org.onosproject.net.device.DeviceService;
41import org.onosproject.net.device.PortStatistics;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090042import org.onosproject.net.flow.DefaultFlowRule;
43import org.onosproject.net.flow.DefaultTrafficSelector;
44import org.onosproject.net.flow.DefaultTrafficTreatment;
45import org.onosproject.net.flow.FlowEntry;
46import org.onosproject.net.flow.FlowRule;
47import org.onosproject.net.flow.FlowRuleOperations;
48import org.onosproject.net.flow.FlowRuleOperationsContext;
49import org.onosproject.net.flow.FlowRuleService;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090050import org.onosproject.net.flow.TrafficSelector;
51import org.onosproject.net.flow.TrafficTreatment;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090052import org.onosproject.net.flow.criteria.IPCriterion;
53import org.onosproject.net.flow.criteria.IPProtocolCriterion;
54import org.onosproject.net.flow.criteria.TcpPortCriterion;
55import org.onosproject.net.flow.criteria.UdpPortCriterion;
56import org.onosproject.net.host.HostService;
Jian Lia4947682018-07-07 14:53:32 +090057import org.onosproject.openstacknetworking.api.InstancePort;
58import org.onosproject.openstacknetworking.api.InstancePortService;
Jian Li753280e2018-07-03 02:24:34 +090059import org.onosproject.openstacknetworking.api.OpenstackNetworkService;
Jian Lia4947682018-07-07 14:53:32 +090060import org.onosproject.openstacknode.api.OpenstackNode;
61import org.onosproject.openstacknode.api.OpenstackNodeService;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090062import org.onosproject.openstacktelemetry.api.FlowInfo;
Jian Li0bbbb1c2018-06-22 22:01:17 +090063import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090064import org.onosproject.openstacktelemetry.api.StatsFlowRule;
65import org.onosproject.openstacktelemetry.api.StatsFlowRuleAdminService;
66import org.onosproject.openstacktelemetry.api.StatsInfo;
Jian Li753280e2018-07-03 02:24:34 +090067import org.osgi.service.component.ComponentContext;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090068import org.slf4j.Logger;
69import org.slf4j.LoggerFactory;
70
Jian Li753280e2018-07-03 02:24:34 +090071import java.util.Dictionary;
Boyoung Jeong1cca5e82018-08-01 21:00:08 +090072import java.util.LinkedList;
Jian Lia4947682018-07-07 14:53:32 +090073import java.util.List;
Boyoung Jeong1cca5e82018-08-01 21:00:08 +090074import java.util.Map;
Jian Li85573f42018-06-27 22:29:14 +090075import java.util.Optional;
Boyoung Jeong1cca5e82018-08-01 21:00:08 +090076import java.util.Queue;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090077import java.util.Set;
Jian Lia4947682018-07-07 14:53:32 +090078import java.util.concurrent.ScheduledFuture;
79import java.util.concurrent.TimeUnit;
80import java.util.stream.Collectors;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090081
Jian Li0bbbb1c2018-06-22 22:01:17 +090082import static org.onlab.packet.Ethernet.TYPE_IPV4;
83import static org.onlab.packet.IPv4.PROTOCOL_TCP;
84import static org.onlab.packet.IPv4.PROTOCOL_UDP;
85import static org.onosproject.net.flow.criteria.Criterion.Type.IPV4_DST;
86import static org.onosproject.net.flow.criteria.Criterion.Type.IPV4_SRC;
87import static org.onosproject.net.flow.criteria.Criterion.Type.IP_PROTO;
88import static org.onosproject.net.flow.criteria.Criterion.Type.TCP_DST;
89import static org.onosproject.net.flow.criteria.Criterion.Type.TCP_SRC;
90import static org.onosproject.net.flow.criteria.Criterion.Type.UDP_DST;
91import static org.onosproject.net.flow.criteria.Criterion.Type.UDP_SRC;
Jian Li753280e2018-07-03 02:24:34 +090092import static org.onosproject.openstacknetworking.api.Constants.STAT_FLAT_OUTBOUND_TABLE;
Jian Li0bbbb1c2018-06-22 22:01:17 +090093import static org.onosproject.openstacknetworking.api.Constants.STAT_INBOUND_TABLE;
94import static org.onosproject.openstacknetworking.api.Constants.STAT_OUTBOUND_TABLE;
Jian Li753280e2018-07-03 02:24:34 +090095import static org.onosproject.openstacknetworking.api.Constants.VTAP_FLAT_OUTBOUND_TABLE;
Jian Li87ded822018-07-02 18:31:22 +090096import static org.onosproject.openstacknetworking.api.Constants.VTAP_INBOUND_TABLE;
97import static org.onosproject.openstacknetworking.api.Constants.VTAP_OUTBOUND_TABLE;
Jian Lia4947682018-07-07 14:53:32 +090098import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.COMPUTE;
Boyoung Jeong1cca5e82018-08-01 21:00:08 +090099import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_DATA_POINT_SIZE;
Jian Lie6110b72018-07-06 19:06:36 +0900100import static org.onosproject.openstacktelemetry.api.Constants.FLAT;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900101import static org.onosproject.openstacktelemetry.api.Constants.OPENSTACK_TELEMETRY_APP_ID;
Jian Lie6110b72018-07-06 19:06:36 +0900102import static org.onosproject.openstacktelemetry.api.Constants.VLAN;
103import static org.onosproject.openstacktelemetry.api.Constants.VXLAN;
Jian Li753280e2018-07-03 02:24:34 +0900104import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getBooleanProperty;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900105
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900106/**
107 * Flow rule manager for network statistics of a VM.
108 */
109@Component(immediate = true)
110@Service
111public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
112
113 private final Logger log = LoggerFactory.getLogger(getClass());
114
115 private static final byte FLOW_TYPE_SONA = 1; // VLAN
116
Ray Milkeybcc53d32018-07-02 10:22:57 -0700117 private static final long MILLISECONDS = 1000L;
Jian Lia4947682018-07-07 14:53:32 +0900118 private static final long INITIAL_DELAY = 5L;
Ray Milkeybcc53d32018-07-02 10:22:57 -0700119 private static final long REFRESH_INTERVAL = 5L;
Jian Lia4947682018-07-07 14:53:32 +0900120 private static final TimeUnit TIME_UNIT_SECOND = TimeUnit.SECONDS;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900121
Jian Li753280e2018-07-03 02:24:34 +0900122 private static final String REVERSE_PATH_STATS = "reversePathStats";
123 private static final String EGRESS_STATS = "egressStats";
Jian Lia4947682018-07-07 14:53:32 +0900124 private static final String PORT_STATS = "portStats";
Jian Li753280e2018-07-03 02:24:34 +0900125
126 private static final boolean DEFAULT_REVERSE_PATH_STATS = false;
127 private static final boolean DEFAULT_EGRESS_STATS = false;
Jian Lia4947682018-07-07 14:53:32 +0900128 private static final boolean DEFAULT_PORT_STATS = true;
Jian Li753280e2018-07-03 02:24:34 +0900129
Jian Lia4947682018-07-07 14:53:32 +0900130 private static final String ARBITRARY_IP = "0.0.0.0/32";
131 private static final int ARBITRARY_LENGTH = 32;
132 private static final String ARBITRARY_MAC = "00:00:00:00:00:00";
Boyoung Jeong1cca5e82018-08-01 21:00:08 +0900133 private static final MacAddress NO_HOST_MAC = MacAddress.valueOf(ARBITRARY_MAC);
Jian Lia4947682018-07-07 14:53:32 +0900134 private static final int ARBITRARY_IN_INTF = 0;
135 private static final int ARBITRARY_OUT_INTF = 0;
136
137 private static final boolean RECOVER_FROM_FAILURE = true;
138
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900139 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
140 protected CoreService coreService;
141
142 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
143 protected FlowRuleService flowRuleService;
144
145 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
146 protected HostService hostService;
147
Jian Li0bbbb1c2018-06-22 22:01:17 +0900148 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jian Lia4947682018-07-07 14:53:32 +0900149 protected DeviceService deviceService;
150
151 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jian Lie6110b72018-07-06 19:06:36 +0900152 protected ComponentConfigService componentConfigService;
Jian Li0bbbb1c2018-06-22 22:01:17 +0900153
154 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jian Li85573f42018-06-27 22:29:14 +0900155 protected MastershipService mastershipService;
156
157 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jian Li753280e2018-07-03 02:24:34 +0900158 protected OpenstackNetworkService osNetworkService;
159
160 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jian Lia4947682018-07-07 14:53:32 +0900161 protected InstancePortService instPortService;
162
163 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
164 protected OpenstackNodeService osNodeService;
165
166 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jian Li0bbbb1c2018-06-22 22:01:17 +0900167 protected OpenstackTelemetryService telemetryService;
168
Jian Li753280e2018-07-03 02:24:34 +0900169 @Property(name = REVERSE_PATH_STATS, boolValue = DEFAULT_REVERSE_PATH_STATS,
170 label = "A flag which indicates whether to install the rules for " +
171 "collecting the flow-based stats for reversed path.")
172 private boolean reversePathStats = DEFAULT_REVERSE_PATH_STATS;
173
174 @Property(name = EGRESS_STATS, boolValue = DEFAULT_EGRESS_STATS,
175 label = "A flag which indicates whether to install the rules for " +
176 "collecting the flow-based stats for egress port.")
177 private boolean egressStats = DEFAULT_EGRESS_STATS;
178
Jian Lia4947682018-07-07 14:53:32 +0900179 @Property(name = PORT_STATS, boolValue = DEFAULT_PORT_STATS,
180 label = "A flag which indicates whether to collect port TX & RX stats.")
181 private boolean portStats = DEFAULT_PORT_STATS;
182
Jian Li753280e2018-07-03 02:24:34 +0900183 private ApplicationId appId;
Jian Lia4947682018-07-07 14:53:32 +0900184 private TelemetryCollector collector;
Jian Lia4947682018-07-07 14:53:32 +0900185 private ScheduledFuture result;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900186
Jian Li0bbbb1c2018-06-22 22:01:17 +0900187 private final Set<FlowInfo> gFlowInfoSet = Sets.newHashSet();
Boyoung Jeong1cca5e82018-08-01 21:00:08 +0900188 private final Map<String, Queue<FlowInfo>> flowInfoMap = Maps.newConcurrentMap();
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900189
190 private static final int SOURCE_ID = 1;
191 private static final int TARGET_ID = 2;
192 private static final int PRIORITY_BASE = 10000;
193 private static final int METRIC_PRIORITY_SOURCE = SOURCE_ID * PRIORITY_BASE;
194 private static final int METRIC_PRIORITY_TARGET = TARGET_ID * PRIORITY_BASE;
195
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900196 @Activate
197 protected void activate() {
198 appId = coreService.registerApplication(OPENSTACK_TELEMETRY_APP_ID);
Jian Lie6110b72018-07-06 19:06:36 +0900199 componentConfigService.registerProperties(getClass());
Jian Libd295cd2018-07-22 11:53:57 +0900200 start();
Jian Li0bbbb1c2018-06-22 22:01:17 +0900201
202 log.info("Started");
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900203 }
204
205 @Deactivate
206 protected void deactivate() {
Jian Lie6110b72018-07-06 19:06:36 +0900207 componentConfigService.unregisterProperties(getClass(), false);
Jian Lia4947682018-07-07 14:53:32 +0900208 flowRuleService.removeFlowRulesById(appId);
Jian Libd295cd2018-07-22 11:53:57 +0900209 stop();
Jian Lia4947682018-07-07 14:53:32 +0900210
Jian Li0bbbb1c2018-06-22 22:01:17 +0900211 log.info("Stopped");
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900212 }
213
Jian Li753280e2018-07-03 02:24:34 +0900214 @Modified
215 protected void modified(ComponentContext context) {
216 readComponentConfiguration(context);
217
218 log.info("Modified");
219 }
220
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900221 @Override
222 public void start() {
223 log.info("Start publishing thread");
Jian Lia4947682018-07-07 14:53:32 +0900224 collector = new TelemetryCollector();
225
Jian Libd295cd2018-07-22 11:53:57 +0900226 result = SharedScheduledExecutors.getSingleThreadExecutor()
227 .scheduleAtFixedRate(collector, INITIAL_DELAY,
Jian Lia4947682018-07-07 14:53:32 +0900228 REFRESH_INTERVAL, TIME_UNIT_SECOND, RECOVER_FROM_FAILURE);
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900229 }
230
231 @Override
232 public void stop() {
233 log.info("Stop data publishing thread");
Jian Lia4947682018-07-07 14:53:32 +0900234 result.cancel(true);
235 collector = null;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900236 }
237
Jian Li0bbbb1c2018-06-22 22:01:17 +0900238 @Override
239 public void createStatFlowRule(StatsFlowRule statsFlowRule) {
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900240
Jian Li0bbbb1c2018-06-22 22:01:17 +0900241 setStatFlowRule(statsFlowRule, true);
Jian Li0bbbb1c2018-06-22 22:01:17 +0900242 }
243
244 @Override
245 public void deleteStatFlowRule(StatsFlowRule statsFlowRule) {
Jian Li0bbbb1c2018-06-22 22:01:17 +0900246
247 setStatFlowRule(statsFlowRule, false);
Jian Li0bbbb1c2018-06-22 22:01:17 +0900248 }
249
Jian Lia4947682018-07-07 14:53:32 +0900250 /**
251 * Gets a set of the flow infos.
252 *
253 * @return a set of flow infos
254 */
255 @Override
256 public Set<FlowInfo> getFlowInfos() {
257 Set<FlowInfo> flowInfos = Sets.newConcurrentHashSet();
258
259 // obtain all flow rule entries installed by telemetry app
260 for (FlowEntry entry : flowRuleService.getFlowEntriesById(appId)) {
261 FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
262 TrafficSelector selector = entry.selector();
263
264 IPCriterion srcIp = (IPCriterion) selector.getCriterion(IPV4_SRC);
265 IPCriterion dstIp = (IPCriterion) selector.getCriterion(IPV4_DST);
266 IPProtocolCriterion ipProtocol =
267 (IPProtocolCriterion) selector.getCriterion(IP_PROTO);
268
Jian Lia4947682018-07-07 14:53:32 +0900269 fBuilder.withFlowType(FLOW_TYPE_SONA)
270 .withSrcIp(srcIp.ip())
271 .withDstIp(dstIp.ip());
272
273 if (ipProtocol != null) {
274 fBuilder.withProtocol((byte) ipProtocol.protocol());
275
276 if (ipProtocol.protocol() == PROTOCOL_TCP) {
277 TcpPortCriterion tcpSrc =
278 (TcpPortCriterion) selector.getCriterion(TCP_SRC);
279 TcpPortCriterion tcpDst =
280 (TcpPortCriterion) selector.getCriterion(TCP_DST);
Jian Lia4947682018-07-07 14:53:32 +0900281 fBuilder.withSrcPort(tcpSrc.tcpPort());
282 fBuilder.withDstPort(tcpDst.tcpPort());
Jian Lia4947682018-07-07 14:53:32 +0900283 } else if (ipProtocol.protocol() == PROTOCOL_UDP) {
Jian Lia4947682018-07-07 14:53:32 +0900284 UdpPortCriterion udpSrc =
285 (UdpPortCriterion) selector.getCriterion(UDP_SRC);
286 UdpPortCriterion udpDst =
287 (UdpPortCriterion) selector.getCriterion(UDP_DST);
Jian Lia4947682018-07-07 14:53:32 +0900288 fBuilder.withSrcPort(udpSrc.udpPort());
289 fBuilder.withDstPort(udpDst.udpPort());
290 } else {
291 log.debug("Other protocol: {}", ipProtocol.protocol());
292 }
293 }
294
295 fBuilder.withSrcMac(getMacAddress(srcIp.ip().address()))
296 .withDstMac(getMacAddress(dstIp.ip().address()))
297 .withInputInterfaceId(getInterfaceId(srcIp.ip().address()))
298 .withOutputInterfaceId(getInterfaceId(dstIp.ip().address()))
299 .withVlanId(getVlanId(srcIp.ip().address()))
300 .withDeviceId(entry.deviceId());
301
302 StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
303
304 // TODO: need to collect error and drop packets stats
305 // TODO: need to make the refresh interval configurable
306 sBuilder.withStartupTime(System.currentTimeMillis())
307 .withFstPktArrTime(System.currentTimeMillis())
308 .withLstPktOffset((int) (REFRESH_INTERVAL * MILLISECONDS))
309 .withCurrAccPkts((int) entry.packets())
310 .withCurrAccBytes(entry.bytes())
311 .withErrorPkts((short) 0)
312 .withDropPkts((short) 0);
313
314 fBuilder.withStatsInfo(sBuilder.build());
315
316 FlowInfo flowInfo = mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
317
318 flowInfos.add(flowInfo);
319
320 log.debug("FlowInfo: \n{}", flowInfo.toString());
321 }
322
323 return flowInfos;
324 }
325
326 /**
327 * Gets a set of flow infos by referring to destination VM port.
328 *
329 * @return flow infos
330 */
331 private Set<FlowInfo> getDstPortBasedFlowInfos() {
332 Set<FlowInfo> flowInfos = Sets.newConcurrentHashSet();
333 Set<PortNumber> instPortNums = instPortService.instancePorts()
334 .stream()
335 .map(InstancePort::portNumber)
336 .collect(Collectors.toSet());
337 Set<DeviceId> deviceIds = osNodeService.completeNodes(COMPUTE)
338 .stream()
339 .map(OpenstackNode::intgBridge)
340 .collect(Collectors.toSet());
341
342 deviceIds.forEach(d -> {
343 List<PortStatistics> stats =
344 deviceService.getPortStatistics(d)
345 .stream()
346 .filter(s -> instPortNums.contains(s.portNumber()))
347 .collect(Collectors.toList());
348
349 stats.forEach(s -> {
350 InstancePort instPort = getInstancePort(d, s.portNumber());
351 flowInfos.add(buildTxPortInfo(instPort, s));
352 flowInfos.add(buildRxPortInfo(instPort, s));
353 });
354 });
355
356 return flowInfos;
357 }
358
359 /**
360 * Obtains the flow info generated by TX port.
361 *
362 * @param instPort instance port
363 * @param stat port statistics
364 * @return flow info
365 */
366 private FlowInfo buildTxPortInfo(InstancePort instPort, PortStatistics stat) {
367 FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
368
369 fBuilder.withFlowType(FLOW_TYPE_SONA)
370 .withSrcIp(IpPrefix.valueOf(instPort.ipAddress(), ARBITRARY_LENGTH))
371 .withDstIp(IpPrefix.valueOf(ARBITRARY_IP))
372 .withSrcMac(instPort.macAddress())
Boyoung Jeong1cca5e82018-08-01 21:00:08 +0900373 .withDstMac(NO_HOST_MAC)
Jian Lia4947682018-07-07 14:53:32 +0900374 .withDeviceId(instPort.deviceId())
375 .withInputInterfaceId(ARBITRARY_IN_INTF)
376 .withOutputInterfaceId(ARBITRARY_OUT_INTF)
377 .withVlanId(VlanId.vlanId());
378
379 StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
380 sBuilder.withStartupTime(System.currentTimeMillis())
381 .withFstPktArrTime(System.currentTimeMillis())
382 .withLstPktOffset((int) (REFRESH_INTERVAL * MILLISECONDS))
383 .withCurrAccPkts((int) stat.packetsSent())
384 .withCurrAccBytes(stat.bytesSent())
385 .withErrorPkts((short) stat.packetsTxErrors())
386 .withDropPkts((short) stat.packetsTxDropped());
387
388 fBuilder.withStatsInfo(sBuilder.build());
389
390 return mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
391 }
392
393 /**
394 * Obtains the flow info generated by RX port.
395 *
396 * @param instPort instance port
397 * @param stat port statistics
398 * @return flow info
399 */
400 private FlowInfo buildRxPortInfo(InstancePort instPort, PortStatistics stat) {
401 FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
402
403 fBuilder.withFlowType(FLOW_TYPE_SONA)
404 .withSrcIp(IpPrefix.valueOf(ARBITRARY_IP))
405 .withDstIp(IpPrefix.valueOf(instPort.ipAddress(), ARBITRARY_LENGTH))
Boyoung Jeong1cca5e82018-08-01 21:00:08 +0900406 .withSrcMac(NO_HOST_MAC)
Jian Lia4947682018-07-07 14:53:32 +0900407 .withDstMac(instPort.macAddress())
408 .withDeviceId(instPort.deviceId())
409 .withInputInterfaceId(ARBITRARY_IN_INTF)
410 .withOutputInterfaceId(ARBITRARY_OUT_INTF)
411 .withVlanId(VlanId.vlanId());
412
413 StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
414 sBuilder.withStartupTime(System.currentTimeMillis())
415 .withFstPktArrTime(System.currentTimeMillis())
416 .withLstPktOffset((int) (REFRESH_INTERVAL * MILLISECONDS))
417 .withCurrAccPkts((int) stat.packetsReceived())
418 .withCurrAccBytes(stat.bytesReceived())
419 .withErrorPkts((short) stat.packetsRxErrors())
420 .withDropPkts((short) stat.packetsRxDropped());
421
422 fBuilder.withStatsInfo(sBuilder.build());
423
424 return mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
425 }
426
427 /**
428 * Obtains instance port which associated with the given device identifier
429 * and port number.
430 *
431 * @param deviceId device identifier
432 * @param portNumber port number
433 * @return instance port
434 */
435 private InstancePort getInstancePort(DeviceId deviceId, PortNumber portNumber) {
436 return instPortService.instancePorts().stream()
437 .filter(p -> p.deviceId().equals(deviceId))
438 .filter(p -> p.portNumber().equals(portNumber))
439 .findFirst().orElse(null);
440 }
441
Jian Li0bbbb1c2018-06-22 22:01:17 +0900442 private void connectTables(DeviceId deviceId, int fromTable, int toTable,
443 StatsFlowRule statsFlowRule, int rulePriority,
444 boolean install) {
445
Jian Li0bbbb1c2018-06-22 22:01:17 +0900446 int srcPrefixLength = statsFlowRule.srcIpPrefix().prefixLength();
447 int dstPrefixLength = statsFlowRule.dstIpPrefix().prefixLength();
448 int prefixLength = rulePriority + srcPrefixLength + dstPrefixLength;
449 byte protocol = statsFlowRule.ipProtocol();
450
451 TrafficSelector.Builder selectorBuilder =
Jian Li753280e2018-07-03 02:24:34 +0900452 DefaultTrafficSelector.builder()
453 .matchEthType(TYPE_IPV4)
454 .matchIPSrc(statsFlowRule.srcIpPrefix())
455 .matchIPDst(statsFlowRule.dstIpPrefix());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900456
457 if (protocol == PROTOCOL_TCP) {
458 selectorBuilder = selectorBuilder
Jian Li753280e2018-07-03 02:24:34 +0900459 .matchIPProtocol(statsFlowRule.ipProtocol())
460 .matchTcpSrc(statsFlowRule.srcTpPort())
461 .matchTcpDst(statsFlowRule.dstTpPort());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900462
463 } else if (protocol == PROTOCOL_UDP) {
464 selectorBuilder = selectorBuilder
Jian Li753280e2018-07-03 02:24:34 +0900465 .matchIPProtocol(statsFlowRule.ipProtocol())
466 .matchUdpSrc(statsFlowRule.srcTpPort())
467 .matchUdpDst(statsFlowRule.dstTpPort());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900468 } else {
469 log.warn("Unsupported protocol {}", statsFlowRule.ipProtocol());
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900470 }
Jian Li0bbbb1c2018-06-22 22:01:17 +0900471
472 TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
473
474 treatmentBuilder.transition(toTable);
475
476 FlowRule flowRule = DefaultFlowRule.builder()
Jian Li753280e2018-07-03 02:24:34 +0900477 .forDevice(deviceId)
478 .withSelector(selectorBuilder.build())
479 .withTreatment(treatmentBuilder.build())
480 .withPriority(prefixLength)
481 .fromApp(appId)
482 .makePermanent()
483 .forTable(fromTable)
484 .build();
Jian Li0bbbb1c2018-06-22 22:01:17 +0900485
486 applyRule(flowRule, install);
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900487 }
488
489 /**
Jian Li0bbbb1c2018-06-22 22:01:17 +0900490 * Installs stats related flow rule to switch.
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900491 *
Jian Li0bbbb1c2018-06-22 22:01:17 +0900492 * @param flowRule flow rule
493 * @param install flag to install or not
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900494 */
495 private void applyRule(FlowRule flowRule, boolean install) {
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900496 FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
Jian Li0bbbb1c2018-06-22 22:01:17 +0900497 flowOpsBuilder = install ?
498 flowOpsBuilder.add(flowRule) : flowOpsBuilder.remove(flowRule);
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900499
500 flowRuleService.apply(flowOpsBuilder.build(new FlowRuleOperationsContext() {
501 @Override
502 public void onSuccess(FlowRuleOperations ops) {
Jian Lia4947682018-07-07 14:53:32 +0900503 log.debug("Install rules for telemetry stats: \n {}",
504 ops.toString());
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900505 }
506
507 @Override
508 public void onError(FlowRuleOperations ops) {
Jian Lia4947682018-07-07 14:53:32 +0900509 log.debug("Failed to install rules for telemetry stats: \n {}",
510 ops.toString());
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900511 }
512 }));
513 }
514
515 /**
Jian Li0bbbb1c2018-06-22 22:01:17 +0900516 * Merges old FlowInfo.StatsInfo and current FlowInfo.StatsInfo.
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900517 *
518 * @param flowInfo current FlowInfo object
519 * @param fBuilder Builder for FlowInfo
520 * @param sBuilder Builder for StatsInfo
521 * @return Merged FlowInfo object
522 */
523 private FlowInfo mergeFlowInfo(FlowInfo flowInfo,
524 FlowInfo.Builder fBuilder,
525 StatsInfo.Builder sBuilder) {
Jian Li0bbbb1c2018-06-22 22:01:17 +0900526 for (FlowInfo gFlowInfo : gFlowInfoSet) {
527 log.debug("Old FlowInfo:\n{}", gFlowInfo.toString());
528 if (gFlowInfo.roughEquals(flowInfo)) {
529
530 // Get old StatsInfo object and merge the value to current object.
531 StatsInfo oldStatsInfo = gFlowInfo.statsInfo();
532 sBuilder.withPrevAccPkts(oldStatsInfo.currAccPkts());
533 sBuilder.withPrevAccBytes(oldStatsInfo.currAccBytes());
534 FlowInfo newFlowInfo = fBuilder.withStatsInfo(sBuilder.build())
535 .build();
536
537 gFlowInfoSet.remove(gFlowInfo);
538 gFlowInfoSet.add(newFlowInfo);
Jian Li85573f42018-06-27 22:29:14 +0900539 log.debug("Old FlowInfo found, Merge this {}", newFlowInfo.toString());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900540 return newFlowInfo;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900541 }
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900542 }
Jian Li0bbbb1c2018-06-22 22:01:17 +0900543
544 // No such record, then build the FlowInfo object and return this object.
Jian Li85573f42018-06-27 22:29:14 +0900545 log.debug("No FlowInfo found, add new FlowInfo {}", flowInfo.toString());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900546 FlowInfo newFlowInfo = fBuilder.withStatsInfo(sBuilder.build()).build();
547 gFlowInfoSet.add(newFlowInfo);
548 return newFlowInfo;
549 }
550
Jian Li753280e2018-07-03 02:24:34 +0900551 /**
552 * Installs flow rules for collecting both normal and reverse path flow stats.
553 *
554 * @param statsFlowRule flow rule used for collecting stats
555 * @param install flow rule installation flag
556 */
Jian Li0bbbb1c2018-06-22 22:01:17 +0900557 private void setStatFlowRule(StatsFlowRule statsFlowRule, boolean install) {
Jian Li753280e2018-07-03 02:24:34 +0900558 setStatFlowRuleBase(statsFlowRule, install);
Jian Li0bbbb1c2018-06-22 22:01:17 +0900559
Jian Li753280e2018-07-03 02:24:34 +0900560 // if reverse path stats is enabled, we will install flow rules for
561 // collecting reverse path vFlow stats
562 if (reversePathStats) {
563 StatsFlowRule reverseFlowRule = DefaultStatsFlowRule.builder()
564 .srcIpPrefix(statsFlowRule.dstIpPrefix())
565 .dstIpPrefix(statsFlowRule.srcIpPrefix())
566 .ipProtocol(statsFlowRule.ipProtocol())
567 .srcTpPort(statsFlowRule.dstTpPort())
568 .dstTpPort(statsFlowRule.srcTpPort())
569 .build();
570 setStatFlowRuleBase(reverseFlowRule, install);
571 }
572 }
573
574 /**
575 * A base method which is for installing flow rules for collecting stats.
576 *
577 * @param statsFlowRule flow rule used for collecting stats
578 * @param install flow rule installation flag
579 */
580 private void setStatFlowRuleBase(StatsFlowRule statsFlowRule, boolean install) {
Jian Lie6110b72018-07-06 19:06:36 +0900581
582 IpPrefix srcIp = statsFlowRule.srcIpPrefix();
583 IpPrefix dstIp = statsFlowRule.dstIpPrefix();
584 DeviceId srcDeviceId = getDeviceId(srcIp.address());
585 DeviceId dstDeviceId = getDeviceId(dstIp.address());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900586
Jian Li998ec7b2018-06-29 15:15:49 +0900587 if (srcDeviceId == null && dstDeviceId == null) {
Jian Li85573f42018-06-27 22:29:14 +0900588 return;
589 }
590
Jian Li998ec7b2018-06-29 15:15:49 +0900591 if (srcDeviceId != null) {
Jian Li87ded822018-07-02 18:31:22 +0900592 connectTables(srcDeviceId, STAT_INBOUND_TABLE, VTAP_INBOUND_TABLE,
Jian Li998ec7b2018-06-29 15:15:49 +0900593 statsFlowRule, METRIC_PRIORITY_SOURCE, install);
Jian Li998ec7b2018-06-29 15:15:49 +0900594
Jian Lie6110b72018-07-06 19:06:36 +0900595 if (install) {
596 log.info("Install ingress stat flow rule for SrcIp:{} DstIp:{}",
597 srcIp.toString(), dstIp.toString());
598 } else {
599 log.info("Remove ingress stat flow rule for SrcIp:{} DstIp:{}",
600 srcIp.toString(), dstIp.toString());
Jian Li753280e2018-07-03 02:24:34 +0900601 }
Jian Li998ec7b2018-06-29 15:15:49 +0900602 }
Jian Li85573f42018-06-27 22:29:14 +0900603
Jian Lie6110b72018-07-06 19:06:36 +0900604 Set<IpPrefix> vxlanIps = osNetworkService.getFixedIpsByNetworkType(VXLAN);
605 Set<IpPrefix> vlanIps = osNetworkService.getFixedIpsByNetworkType(VLAN);
606 Set<IpPrefix> flatIps = osNetworkService.getFixedIpsByNetworkType(FLAT);
Jian Li753280e2018-07-03 02:24:34 +0900607
Jian Lie6110b72018-07-06 19:06:36 +0900608 int fromTable, toTable;
Jian Li753280e2018-07-03 02:24:34 +0900609
Jian Lie6110b72018-07-06 19:06:36 +0900610 if (dstDeviceId != null && egressStats) {
611
612 IpPrefix dstIpPrefix = statsFlowRule.dstIpPrefix();
613
614 if (vxlanIps.contains(dstIpPrefix) || vlanIps.contains(dstIpPrefix)) {
615 fromTable = STAT_OUTBOUND_TABLE;
616 toTable = VTAP_OUTBOUND_TABLE;
617 } else if (flatIps.contains(dstIpPrefix)) {
618 fromTable = STAT_FLAT_OUTBOUND_TABLE;
619 toTable = VTAP_FLAT_OUTBOUND_TABLE;
620 } else {
621 return;
622 }
623
624 connectTables(dstDeviceId, fromTable, toTable,
625 statsFlowRule, METRIC_PRIORITY_TARGET, install);
626
627 if (install) {
628 log.info("Install egress stat flow rule for SrcIp:{} DstIp:{}",
629 srcIp.toString(), dstIp.toString());
630 } else {
631 log.info("Remove egress stat flow rule for SrcIp:{} DstIp:{}",
632 srcIp.toString(), dstIp.toString());
633 }
Jian Li753280e2018-07-03 02:24:34 +0900634 }
Jian Li753280e2018-07-03 02:24:34 +0900635 }
636
637 /**
Jian Li85573f42018-06-27 22:29:14 +0900638 * Get Device ID which the VM is located.
639 *
640 * @param ipAddress IP Address of host
641 * @return Device ID
642 */
643 private DeviceId getDeviceId(IpAddress ipAddress) {
644 if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
645 Optional<Host> host = hostService.getHostsByIp(ipAddress).stream().findAny();
646 return host.map(host1 -> host1.location().deviceId()).orElse(null);
647 } else {
Jian Lia4947682018-07-07 14:53:32 +0900648 log.debug("No DeviceID is associated to {}", ipAddress.toString());
Jian Li85573f42018-06-27 22:29:14 +0900649 return null;
Jian Li0bbbb1c2018-06-22 22:01:17 +0900650 }
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900651 }
652
653 /**
654 * Get VLAN ID with respect to IP Address.
655 *
656 * @param ipAddress IP Address of host
657 * @return VLAN ID
658 */
Jian Li0bbbb1c2018-06-22 22:01:17 +0900659 private VlanId getVlanId(IpAddress ipAddress) {
660 if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
661 Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
662 return host.vlan();
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900663 }
664 return VlanId.vlanId();
665 }
666
667 /**
668 * Get Interface ID of Switch which is connected to a host.
669 *
670 * @param ipAddress IP Address of host
671 * @return Interface ID of Switch
672 */
Jian Li0bbbb1c2018-06-22 22:01:17 +0900673 private int getInterfaceId(IpAddress ipAddress) {
674 if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
675 Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
676 return (int) host.location().port().toLong();
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900677 }
678 return -1;
679 }
680
681 /**
682 * Get MAC Address of host.
683 *
684 * @param ipAddress IP Address of host
685 * @return MAC Address of host
686 */
Jian Li0bbbb1c2018-06-22 22:01:17 +0900687 private MacAddress getMacAddress(IpAddress ipAddress) {
688 if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
689 Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
690 return host.mac();
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900691 }
Jian Li0bbbb1c2018-06-22 22:01:17 +0900692
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900693 return NO_HOST_MAC;
694 }
Jian Li0bbbb1c2018-06-22 22:01:17 +0900695
Boyoung Jeong1cca5e82018-08-01 21:00:08 +0900696 private void enqFlowInfo(FlowInfo flowInfo) {
697 String key = flowInfo.uniqueFlowInfoKey();
698 Queue<FlowInfo> queue = flowInfoMap.get(key);
699 if (queue == null) {
700 Queue<FlowInfo> newQueue = new LinkedList<FlowInfo>();
701 newQueue.offer(flowInfo);
702 flowInfoMap.put(key, newQueue);
703 return;
704 }
705 queue.offer(flowInfo);
706
707 while (queue.size() > DEFAULT_DATA_POINT_SIZE) {
708 queue.remove(); // Removes a garbage data in the queue.
709 }
710 }
711
712 public Map<String, Queue<FlowInfo>> getFlowInfoMap() {
713 return flowInfoMap;
714 }
715
Jian Li753280e2018-07-03 02:24:34 +0900716 /**
717 * Extracts properties from the component configuration context.
718 *
719 * @param context the component context
720 */
721 private void readComponentConfiguration(ComponentContext context) {
722 Dictionary<?, ?> properties = context.getProperties();
723
724 Boolean reversePathStatsConfigured =
725 getBooleanProperty(properties, REVERSE_PATH_STATS);
726 if (reversePathStatsConfigured == null) {
727 reversePathStats = DEFAULT_REVERSE_PATH_STATS;
728 log.info("Reversed path stats flag is NOT " +
729 "configured, default value is {}", reversePathStats);
730 } else {
731 reversePathStats = reversePathStatsConfigured;
732 log.info("Configured. Reversed path stats flag is {}", reversePathStats);
733 }
734
735 Boolean egressStatsConfigured = getBooleanProperty(properties, EGRESS_STATS);
736 if (egressStatsConfigured == null) {
737 egressStats = DEFAULT_EGRESS_STATS;
738 log.info("Egress stats flag is NOT " +
739 "configured, default value is {}", egressStats);
740 } else {
741 egressStats = egressStatsConfigured;
742 log.info("Configured. Egress stats flag is {}", egressStats);
743 }
Jian Lia4947682018-07-07 14:53:32 +0900744
745 Boolean portStatsConfigured = getBooleanProperty(properties, PORT_STATS);
746 if (portStatsConfigured == null) {
747 portStats = DEFAULT_PORT_STATS;
748 log.info("Port stats flag is NOT " +
749 "configured, default value is {}", portStats);
750 } else {
751 portStats = portStatsConfigured;
752 log.info("Configured. Port stats flag is {}", portStats);
753 }
Jian Li753280e2018-07-03 02:24:34 +0900754 }
755
Jian Lia4947682018-07-07 14:53:32 +0900756 private class TelemetryCollector implements Runnable {
Jian Li0bbbb1c2018-06-22 22:01:17 +0900757 @Override
758 public void run() {
Jian Li85573f42018-06-27 22:29:14 +0900759 Set<FlowInfo> filteredFlowInfos = Sets.newConcurrentHashSet();
760
761 // we only let the master controller of the device where the
Jian Lia4947682018-07-07 14:53:32 +0900762 // stats flow rules are installed send stats message
763 getFlowInfos().forEach(f -> {
764 if (checkSrcDstLocalMaster(f)) {
Jian Li85573f42018-06-27 22:29:14 +0900765 filteredFlowInfos.add(f);
766 }
767 });
768
Jian Lia4947682018-07-07 14:53:32 +0900769 // we only let the master controller of the device where the port
770 // is located to send stats message
771 if (portStats) {
772 getDstPortBasedFlowInfos().forEach(f -> {
773 if (checkSrcDstLocalMaster(f)) {
774 filteredFlowInfos.add(f);
775 }
776 });
Jian Li0bbbb1c2018-06-22 22:01:17 +0900777 }
Jian Lia4947682018-07-07 14:53:32 +0900778
779 telemetryService.publish(filteredFlowInfos);
Boyoung Jeong1cca5e82018-08-01 21:00:08 +0900780
781 // TODO: Refactor the following code to "TelemetryService" style.
782 filteredFlowInfos.forEach(flowInfo -> {
783 enqFlowInfo(flowInfo);
784 });
Jian Lia4947682018-07-07 14:53:32 +0900785 }
786
787 private boolean checkSrcDstLocalMaster(FlowInfo info) {
788 DeviceId srcDeviceId = getDeviceId(info.srcIp().address());
789 DeviceId dstDeviceId = getDeviceId(info.dstIp().address());
790
791 boolean isSrcLocalMaster = srcDeviceId != null &&
792 mastershipService.isLocalMaster(srcDeviceId);
793 boolean isDstLocalMaster = dstDeviceId != null &&
794 mastershipService.isLocalMaster(dstDeviceId);
795
796 return isSrcLocalMaster || isDstLocalMaster;
Jian Li0bbbb1c2018-06-22 22:01:17 +0900797 }
798 }
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900799}