blob: 3a92bdf0ce054fd9df24cfc0e1b0adf68ff0130d [file] [log] [blame]
Boyoung Jeong9e8faec2018-06-17 21:19:23 +09001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.openstacktelemetry.impl;
17
Jian Li0bbbb1c2018-06-22 22:01:17 +090018import com.google.common.collect.Sets;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090019import org.apache.felix.scr.annotations.Activate;
20import org.apache.felix.scr.annotations.Component;
21import org.apache.felix.scr.annotations.Deactivate;
Jian Li753280e2018-07-03 02:24:34 +090022import org.apache.felix.scr.annotations.Modified;
23import org.apache.felix.scr.annotations.Property;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090024import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090027import org.onlab.packet.IpAddress;
Jian Li753280e2018-07-03 02:24:34 +090028import org.onlab.packet.IpPrefix;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090029import org.onlab.packet.MacAddress;
30import org.onlab.packet.VlanId;
Jian Lia4947682018-07-07 14:53:32 +090031import org.onlab.util.SharedScheduledExecutors;
Jian Lie6110b72018-07-06 19:06:36 +090032import org.onosproject.cfg.ComponentConfigService;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090033import org.onosproject.core.ApplicationId;
34import org.onosproject.core.CoreService;
Jian Li85573f42018-06-27 22:29:14 +090035import org.onosproject.mastership.MastershipService;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090036import org.onosproject.net.DeviceId;
37import org.onosproject.net.Host;
Jian Lia4947682018-07-07 14:53:32 +090038import org.onosproject.net.PortNumber;
39import org.onosproject.net.device.DeviceService;
40import org.onosproject.net.device.PortStatistics;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090041import org.onosproject.net.flow.DefaultFlowRule;
42import org.onosproject.net.flow.DefaultTrafficSelector;
43import org.onosproject.net.flow.DefaultTrafficTreatment;
44import org.onosproject.net.flow.FlowEntry;
45import org.onosproject.net.flow.FlowRule;
46import org.onosproject.net.flow.FlowRuleOperations;
47import org.onosproject.net.flow.FlowRuleOperationsContext;
48import org.onosproject.net.flow.FlowRuleService;
49import org.onosproject.net.flow.IndexTableId;
50import org.onosproject.net.flow.TrafficSelector;
51import org.onosproject.net.flow.TrafficTreatment;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090052import org.onosproject.net.flow.criteria.IPCriterion;
53import org.onosproject.net.flow.criteria.IPProtocolCriterion;
54import org.onosproject.net.flow.criteria.TcpPortCriterion;
55import org.onosproject.net.flow.criteria.UdpPortCriterion;
56import org.onosproject.net.host.HostService;
Jian Lia4947682018-07-07 14:53:32 +090057import org.onosproject.openstacknetworking.api.InstancePort;
58import org.onosproject.openstacknetworking.api.InstancePortService;
Jian Li753280e2018-07-03 02:24:34 +090059import org.onosproject.openstacknetworking.api.OpenstackNetworkService;
Jian Lia4947682018-07-07 14:53:32 +090060import org.onosproject.openstacknode.api.OpenstackNode;
61import org.onosproject.openstacknode.api.OpenstackNodeService;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090062import org.onosproject.openstacktelemetry.api.FlowInfo;
Jian Li0bbbb1c2018-06-22 22:01:17 +090063import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090064import org.onosproject.openstacktelemetry.api.StatsFlowRule;
65import org.onosproject.openstacktelemetry.api.StatsFlowRuleAdminService;
66import org.onosproject.openstacktelemetry.api.StatsInfo;
Jian Li753280e2018-07-03 02:24:34 +090067import org.osgi.service.component.ComponentContext;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090068import org.slf4j.Logger;
69import org.slf4j.LoggerFactory;
70
Jian Li753280e2018-07-03 02:24:34 +090071import java.util.Dictionary;
Jian Lia4947682018-07-07 14:53:32 +090072import java.util.List;
Jian Li85573f42018-06-27 22:29:14 +090073import java.util.Optional;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090074import java.util.Set;
Jian Lia4947682018-07-07 14:53:32 +090075import java.util.concurrent.ScheduledFuture;
76import java.util.concurrent.TimeUnit;
77import java.util.stream.Collectors;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090078
Jian Li0bbbb1c2018-06-22 22:01:17 +090079import static org.onlab.packet.Ethernet.TYPE_IPV4;
80import static org.onlab.packet.IPv4.PROTOCOL_TCP;
81import static org.onlab.packet.IPv4.PROTOCOL_UDP;
82import static org.onosproject.net.flow.criteria.Criterion.Type.IPV4_DST;
83import static org.onosproject.net.flow.criteria.Criterion.Type.IPV4_SRC;
84import static org.onosproject.net.flow.criteria.Criterion.Type.IP_PROTO;
85import static org.onosproject.net.flow.criteria.Criterion.Type.TCP_DST;
86import static org.onosproject.net.flow.criteria.Criterion.Type.TCP_SRC;
87import static org.onosproject.net.flow.criteria.Criterion.Type.UDP_DST;
88import static org.onosproject.net.flow.criteria.Criterion.Type.UDP_SRC;
Jian Li753280e2018-07-03 02:24:34 +090089import static org.onosproject.openstacknetworking.api.Constants.STAT_FLAT_OUTBOUND_TABLE;
Jian Li0bbbb1c2018-06-22 22:01:17 +090090import static org.onosproject.openstacknetworking.api.Constants.STAT_INBOUND_TABLE;
91import static org.onosproject.openstacknetworking.api.Constants.STAT_OUTBOUND_TABLE;
Jian Li753280e2018-07-03 02:24:34 +090092import static org.onosproject.openstacknetworking.api.Constants.VTAP_FLAT_OUTBOUND_TABLE;
Jian Li87ded822018-07-02 18:31:22 +090093import static org.onosproject.openstacknetworking.api.Constants.VTAP_INBOUND_TABLE;
94import static org.onosproject.openstacknetworking.api.Constants.VTAP_OUTBOUND_TABLE;
Jian Lia4947682018-07-07 14:53:32 +090095import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.COMPUTE;
Jian Lie6110b72018-07-06 19:06:36 +090096import static org.onosproject.openstacktelemetry.api.Constants.FLAT;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090097import static org.onosproject.openstacktelemetry.api.Constants.OPENSTACK_TELEMETRY_APP_ID;
Jian Lie6110b72018-07-06 19:06:36 +090098import static org.onosproject.openstacktelemetry.api.Constants.VLAN;
99import static org.onosproject.openstacktelemetry.api.Constants.VXLAN;
Jian Li753280e2018-07-03 02:24:34 +0900100import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getBooleanProperty;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900101
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900102/**
103 * Flow rule manager for network statistics of a VM.
104 */
105@Component(immediate = true)
106@Service
107public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
108
109 private final Logger log = LoggerFactory.getLogger(getClass());
110
111 private static final byte FLOW_TYPE_SONA = 1; // VLAN
112
Ray Milkeybcc53d32018-07-02 10:22:57 -0700113 private static final long MILLISECONDS = 1000L;
Jian Lia4947682018-07-07 14:53:32 +0900114 private static final long INITIAL_DELAY = 5L;
Ray Milkeybcc53d32018-07-02 10:22:57 -0700115 private static final long REFRESH_INTERVAL = 5L;
Jian Lia4947682018-07-07 14:53:32 +0900116 private static final TimeUnit TIME_UNIT_SECOND = TimeUnit.SECONDS;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900117
Jian Li753280e2018-07-03 02:24:34 +0900118 private static final String REVERSE_PATH_STATS = "reversePathStats";
119 private static final String EGRESS_STATS = "egressStats";
Jian Lia4947682018-07-07 14:53:32 +0900120 private static final String PORT_STATS = "portStats";
Jian Li753280e2018-07-03 02:24:34 +0900121
122 private static final boolean DEFAULT_REVERSE_PATH_STATS = false;
123 private static final boolean DEFAULT_EGRESS_STATS = false;
Jian Lia4947682018-07-07 14:53:32 +0900124 private static final boolean DEFAULT_PORT_STATS = true;
Jian Li753280e2018-07-03 02:24:34 +0900125
126 private static final String MAC_NOT_NULL = "MAC should not be null";
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900127
Jian Lia4947682018-07-07 14:53:32 +0900128 private static final String ARBITRARY_IP = "0.0.0.0/32";
129 private static final int ARBITRARY_LENGTH = 32;
130 private static final String ARBITRARY_MAC = "00:00:00:00:00:00";
131 private static final int ARBITRARY_IN_INTF = 0;
132 private static final int ARBITRARY_OUT_INTF = 0;
133
134 private static final boolean RECOVER_FROM_FAILURE = true;
135
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900136 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
137 protected CoreService coreService;
138
139 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
140 protected FlowRuleService flowRuleService;
141
142 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
143 protected HostService hostService;
144
Jian Li0bbbb1c2018-06-22 22:01:17 +0900145 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jian Lia4947682018-07-07 14:53:32 +0900146 protected DeviceService deviceService;
147
148 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jian Lie6110b72018-07-06 19:06:36 +0900149 protected ComponentConfigService componentConfigService;
Jian Li0bbbb1c2018-06-22 22:01:17 +0900150
151 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jian Li85573f42018-06-27 22:29:14 +0900152 protected MastershipService mastershipService;
153
154 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jian Li753280e2018-07-03 02:24:34 +0900155 protected OpenstackNetworkService osNetworkService;
156
157 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jian Lia4947682018-07-07 14:53:32 +0900158 protected InstancePortService instPortService;
159
160 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
161 protected OpenstackNodeService osNodeService;
162
163 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jian Li0bbbb1c2018-06-22 22:01:17 +0900164 protected OpenstackTelemetryService telemetryService;
165
Jian Li753280e2018-07-03 02:24:34 +0900166 @Property(name = REVERSE_PATH_STATS, boolValue = DEFAULT_REVERSE_PATH_STATS,
167 label = "A flag which indicates whether to install the rules for " +
168 "collecting the flow-based stats for reversed path.")
169 private boolean reversePathStats = DEFAULT_REVERSE_PATH_STATS;
170
171 @Property(name = EGRESS_STATS, boolValue = DEFAULT_EGRESS_STATS,
172 label = "A flag which indicates whether to install the rules for " +
173 "collecting the flow-based stats for egress port.")
174 private boolean egressStats = DEFAULT_EGRESS_STATS;
175
Jian Lia4947682018-07-07 14:53:32 +0900176 @Property(name = PORT_STATS, boolValue = DEFAULT_PORT_STATS,
177 label = "A flag which indicates whether to collect port TX & RX stats.")
178 private boolean portStats = DEFAULT_PORT_STATS;
179
Jian Li753280e2018-07-03 02:24:34 +0900180 private ApplicationId appId;
Jian Lia4947682018-07-07 14:53:32 +0900181 private TelemetryCollector collector;
Jian Lia4947682018-07-07 14:53:32 +0900182 private ScheduledFuture result;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900183
Jian Li0bbbb1c2018-06-22 22:01:17 +0900184 private final Set<FlowInfo> gFlowInfoSet = Sets.newHashSet();
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900185
186 private static final int SOURCE_ID = 1;
187 private static final int TARGET_ID = 2;
188 private static final int PRIORITY_BASE = 10000;
189 private static final int METRIC_PRIORITY_SOURCE = SOURCE_ID * PRIORITY_BASE;
190 private static final int METRIC_PRIORITY_TARGET = TARGET_ID * PRIORITY_BASE;
191
Jian Li0bbbb1c2018-06-22 22:01:17 +0900192 private static final MacAddress NO_HOST_MAC = MacAddress.valueOf("00:00:00:00:00:00");
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900193
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900194 @Activate
195 protected void activate() {
196 appId = coreService.registerApplication(OPENSTACK_TELEMETRY_APP_ID);
Jian Lie6110b72018-07-06 19:06:36 +0900197 componentConfigService.registerProperties(getClass());
Jian Libd295cd2018-07-22 11:53:57 +0900198 start();
Jian Li0bbbb1c2018-06-22 22:01:17 +0900199
200 log.info("Started");
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900201 }
202
203 @Deactivate
204 protected void deactivate() {
Jian Lie6110b72018-07-06 19:06:36 +0900205 componentConfigService.unregisterProperties(getClass(), false);
Jian Lia4947682018-07-07 14:53:32 +0900206 flowRuleService.removeFlowRulesById(appId);
Jian Libd295cd2018-07-22 11:53:57 +0900207 stop();
Jian Lia4947682018-07-07 14:53:32 +0900208
Jian Li0bbbb1c2018-06-22 22:01:17 +0900209 log.info("Stopped");
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900210 }
211
Jian Li753280e2018-07-03 02:24:34 +0900212 @Modified
213 protected void modified(ComponentContext context) {
214 readComponentConfiguration(context);
215
216 log.info("Modified");
217 }
218
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900219 @Override
220 public void start() {
221 log.info("Start publishing thread");
Jian Lia4947682018-07-07 14:53:32 +0900222 collector = new TelemetryCollector();
223
Jian Libd295cd2018-07-22 11:53:57 +0900224 result = SharedScheduledExecutors.getSingleThreadExecutor()
225 .scheduleAtFixedRate(collector, INITIAL_DELAY,
Jian Lia4947682018-07-07 14:53:32 +0900226 REFRESH_INTERVAL, TIME_UNIT_SECOND, RECOVER_FROM_FAILURE);
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900227 }
228
229 @Override
230 public void stop() {
231 log.info("Stop data publishing thread");
Jian Lia4947682018-07-07 14:53:32 +0900232 result.cancel(true);
233 collector = null;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900234 }
235
Jian Li0bbbb1c2018-06-22 22:01:17 +0900236 @Override
237 public void createStatFlowRule(StatsFlowRule statsFlowRule) {
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900238
Jian Li0bbbb1c2018-06-22 22:01:17 +0900239 setStatFlowRule(statsFlowRule, true);
Jian Li0bbbb1c2018-06-22 22:01:17 +0900240 }
241
242 @Override
243 public void deleteStatFlowRule(StatsFlowRule statsFlowRule) {
Jian Li0bbbb1c2018-06-22 22:01:17 +0900244
245 setStatFlowRule(statsFlowRule, false);
Jian Li0bbbb1c2018-06-22 22:01:17 +0900246 }
247
Jian Lia4947682018-07-07 14:53:32 +0900248 /**
249 * Gets a set of the flow infos.
250 *
251 * @return a set of flow infos
252 */
253 @Override
254 public Set<FlowInfo> getFlowInfos() {
255 Set<FlowInfo> flowInfos = Sets.newConcurrentHashSet();
256
257 // obtain all flow rule entries installed by telemetry app
258 for (FlowEntry entry : flowRuleService.getFlowEntriesById(appId)) {
259 FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
260 TrafficSelector selector = entry.selector();
261
262 IPCriterion srcIp = (IPCriterion) selector.getCriterion(IPV4_SRC);
263 IPCriterion dstIp = (IPCriterion) selector.getCriterion(IPV4_DST);
264 IPProtocolCriterion ipProtocol =
265 (IPProtocolCriterion) selector.getCriterion(IP_PROTO);
266
267 log.debug("[FlowInfo] TableID:{} SRC_IP:{} DST_IP:{} Pkt:{} Byte:{}",
268 ((IndexTableId) entry.table()).id(),
269 srcIp.ip().toString(),
270 dstIp.ip().toString(),
271 entry.packets(),
272 entry.bytes());
273
274 fBuilder.withFlowType(FLOW_TYPE_SONA)
275 .withSrcIp(srcIp.ip())
276 .withDstIp(dstIp.ip());
277
278 if (ipProtocol != null) {
279 fBuilder.withProtocol((byte) ipProtocol.protocol());
280
281 if (ipProtocol.protocol() == PROTOCOL_TCP) {
282 TcpPortCriterion tcpSrc =
283 (TcpPortCriterion) selector.getCriterion(TCP_SRC);
284 TcpPortCriterion tcpDst =
285 (TcpPortCriterion) selector.getCriterion(TCP_DST);
286
287 log.debug("TCP SRC Port: {}, DST Port: {}",
288 tcpSrc.tcpPort().toInt(),
289 tcpDst.tcpPort().toInt());
290
291 fBuilder.withSrcPort(tcpSrc.tcpPort());
292 fBuilder.withDstPort(tcpDst.tcpPort());
293
294 } else if (ipProtocol.protocol() == PROTOCOL_UDP) {
295
296 UdpPortCriterion udpSrc =
297 (UdpPortCriterion) selector.getCriterion(UDP_SRC);
298 UdpPortCriterion udpDst =
299 (UdpPortCriterion) selector.getCriterion(UDP_DST);
300
301 log.debug("UDP SRC Port: {}, DST Port: {}",
302 udpSrc.udpPort().toInt(),
303 udpDst.udpPort().toInt());
304
305 fBuilder.withSrcPort(udpSrc.udpPort());
306 fBuilder.withDstPort(udpDst.udpPort());
307 } else {
308 log.debug("Other protocol: {}", ipProtocol.protocol());
309 }
310 }
311
312 fBuilder.withSrcMac(getMacAddress(srcIp.ip().address()))
313 .withDstMac(getMacAddress(dstIp.ip().address()))
314 .withInputInterfaceId(getInterfaceId(srcIp.ip().address()))
315 .withOutputInterfaceId(getInterfaceId(dstIp.ip().address()))
316 .withVlanId(getVlanId(srcIp.ip().address()))
317 .withDeviceId(entry.deviceId());
318
319 StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
320
321 // TODO: need to collect error and drop packets stats
322 // TODO: need to make the refresh interval configurable
323 sBuilder.withStartupTime(System.currentTimeMillis())
324 .withFstPktArrTime(System.currentTimeMillis())
325 .withLstPktOffset((int) (REFRESH_INTERVAL * MILLISECONDS))
326 .withCurrAccPkts((int) entry.packets())
327 .withCurrAccBytes(entry.bytes())
328 .withErrorPkts((short) 0)
329 .withDropPkts((short) 0);
330
331 fBuilder.withStatsInfo(sBuilder.build());
332
333 FlowInfo flowInfo = mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
334
335 flowInfos.add(flowInfo);
336
337 log.debug("FlowInfo: \n{}", flowInfo.toString());
338 }
339
340 return flowInfos;
341 }
342
343 /**
344 * Gets a set of flow infos by referring to destination VM port.
345 *
346 * @return flow infos
347 */
348 private Set<FlowInfo> getDstPortBasedFlowInfos() {
349 Set<FlowInfo> flowInfos = Sets.newConcurrentHashSet();
350 Set<PortNumber> instPortNums = instPortService.instancePorts()
351 .stream()
352 .map(InstancePort::portNumber)
353 .collect(Collectors.toSet());
354 Set<DeviceId> deviceIds = osNodeService.completeNodes(COMPUTE)
355 .stream()
356 .map(OpenstackNode::intgBridge)
357 .collect(Collectors.toSet());
358
359 deviceIds.forEach(d -> {
360 List<PortStatistics> stats =
361 deviceService.getPortStatistics(d)
362 .stream()
363 .filter(s -> instPortNums.contains(s.portNumber()))
364 .collect(Collectors.toList());
365
366 stats.forEach(s -> {
367 InstancePort instPort = getInstancePort(d, s.portNumber());
368 flowInfos.add(buildTxPortInfo(instPort, s));
369 flowInfos.add(buildRxPortInfo(instPort, s));
370 });
371 });
372
373 return flowInfos;
374 }
375
376 /**
377 * Obtains the flow info generated by TX port.
378 *
379 * @param instPort instance port
380 * @param stat port statistics
381 * @return flow info
382 */
383 private FlowInfo buildTxPortInfo(InstancePort instPort, PortStatistics stat) {
384 FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
385
386 fBuilder.withFlowType(FLOW_TYPE_SONA)
387 .withSrcIp(IpPrefix.valueOf(instPort.ipAddress(), ARBITRARY_LENGTH))
388 .withDstIp(IpPrefix.valueOf(ARBITRARY_IP))
389 .withSrcMac(instPort.macAddress())
390 .withDstMac(MacAddress.valueOf(ARBITRARY_MAC))
391 .withDeviceId(instPort.deviceId())
392 .withInputInterfaceId(ARBITRARY_IN_INTF)
393 .withOutputInterfaceId(ARBITRARY_OUT_INTF)
394 .withVlanId(VlanId.vlanId());
395
396 StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
397 sBuilder.withStartupTime(System.currentTimeMillis())
398 .withFstPktArrTime(System.currentTimeMillis())
399 .withLstPktOffset((int) (REFRESH_INTERVAL * MILLISECONDS))
400 .withCurrAccPkts((int) stat.packetsSent())
401 .withCurrAccBytes(stat.bytesSent())
402 .withErrorPkts((short) stat.packetsTxErrors())
403 .withDropPkts((short) stat.packetsTxDropped());
404
405 fBuilder.withStatsInfo(sBuilder.build());
406
407 return mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
408 }
409
410 /**
411 * Obtains the flow info generated by RX port.
412 *
413 * @param instPort instance port
414 * @param stat port statistics
415 * @return flow info
416 */
417 private FlowInfo buildRxPortInfo(InstancePort instPort, PortStatistics stat) {
418 FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
419
420 fBuilder.withFlowType(FLOW_TYPE_SONA)
421 .withSrcIp(IpPrefix.valueOf(ARBITRARY_IP))
422 .withDstIp(IpPrefix.valueOf(instPort.ipAddress(), ARBITRARY_LENGTH))
423 .withSrcMac(MacAddress.valueOf(ARBITRARY_MAC))
424 .withDstMac(instPort.macAddress())
425 .withDeviceId(instPort.deviceId())
426 .withInputInterfaceId(ARBITRARY_IN_INTF)
427 .withOutputInterfaceId(ARBITRARY_OUT_INTF)
428 .withVlanId(VlanId.vlanId());
429
430 StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
431 sBuilder.withStartupTime(System.currentTimeMillis())
432 .withFstPktArrTime(System.currentTimeMillis())
433 .withLstPktOffset((int) (REFRESH_INTERVAL * MILLISECONDS))
434 .withCurrAccPkts((int) stat.packetsReceived())
435 .withCurrAccBytes(stat.bytesReceived())
436 .withErrorPkts((short) stat.packetsRxErrors())
437 .withDropPkts((short) stat.packetsRxDropped());
438
439 fBuilder.withStatsInfo(sBuilder.build());
440
441 return mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
442 }
443
444 /**
445 * Obtains instance port which associated with the given device identifier
446 * and port number.
447 *
448 * @param deviceId device identifier
449 * @param portNumber port number
450 * @return instance port
451 */
452 private InstancePort getInstancePort(DeviceId deviceId, PortNumber portNumber) {
453 return instPortService.instancePorts().stream()
454 .filter(p -> p.deviceId().equals(deviceId))
455 .filter(p -> p.portNumber().equals(portNumber))
456 .findFirst().orElse(null);
457 }
458
Jian Li0bbbb1c2018-06-22 22:01:17 +0900459 private void connectTables(DeviceId deviceId, int fromTable, int toTable,
460 StatsFlowRule statsFlowRule, int rulePriority,
461 boolean install) {
462
463 log.debug("Table Transition: {} -> {}", fromTable, toTable);
464 int srcPrefixLength = statsFlowRule.srcIpPrefix().prefixLength();
465 int dstPrefixLength = statsFlowRule.dstIpPrefix().prefixLength();
466 int prefixLength = rulePriority + srcPrefixLength + dstPrefixLength;
467 byte protocol = statsFlowRule.ipProtocol();
468
469 TrafficSelector.Builder selectorBuilder =
Jian Li753280e2018-07-03 02:24:34 +0900470 DefaultTrafficSelector.builder()
471 .matchEthType(TYPE_IPV4)
472 .matchIPSrc(statsFlowRule.srcIpPrefix())
473 .matchIPDst(statsFlowRule.dstIpPrefix());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900474
475 if (protocol == PROTOCOL_TCP) {
476 selectorBuilder = selectorBuilder
Jian Li753280e2018-07-03 02:24:34 +0900477 .matchIPProtocol(statsFlowRule.ipProtocol())
478 .matchTcpSrc(statsFlowRule.srcTpPort())
479 .matchTcpDst(statsFlowRule.dstTpPort());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900480
481 } else if (protocol == PROTOCOL_UDP) {
482 selectorBuilder = selectorBuilder
Jian Li753280e2018-07-03 02:24:34 +0900483 .matchIPProtocol(statsFlowRule.ipProtocol())
484 .matchUdpSrc(statsFlowRule.srcTpPort())
485 .matchUdpDst(statsFlowRule.dstTpPort());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900486 } else {
487 log.warn("Unsupported protocol {}", statsFlowRule.ipProtocol());
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900488 }
Jian Li0bbbb1c2018-06-22 22:01:17 +0900489
490 TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
491
492 treatmentBuilder.transition(toTable);
493
494 FlowRule flowRule = DefaultFlowRule.builder()
Jian Li753280e2018-07-03 02:24:34 +0900495 .forDevice(deviceId)
496 .withSelector(selectorBuilder.build())
497 .withTreatment(treatmentBuilder.build())
498 .withPriority(prefixLength)
499 .fromApp(appId)
500 .makePermanent()
501 .forTable(fromTable)
502 .build();
Jian Li0bbbb1c2018-06-22 22:01:17 +0900503
504 applyRule(flowRule, install);
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900505 }
506
507 /**
Jian Li0bbbb1c2018-06-22 22:01:17 +0900508 * Installs stats related flow rule to switch.
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900509 *
Jian Li0bbbb1c2018-06-22 22:01:17 +0900510 * @param flowRule flow rule
511 * @param install flag to install or not
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900512 */
513 private void applyRule(FlowRule flowRule, boolean install) {
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900514 FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
Jian Li0bbbb1c2018-06-22 22:01:17 +0900515 flowOpsBuilder = install ?
516 flowOpsBuilder.add(flowRule) : flowOpsBuilder.remove(flowRule);
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900517
518 flowRuleService.apply(flowOpsBuilder.build(new FlowRuleOperationsContext() {
519 @Override
520 public void onSuccess(FlowRuleOperations ops) {
Jian Lia4947682018-07-07 14:53:32 +0900521 log.debug("Install rules for telemetry stats: \n {}",
522 ops.toString());
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900523 }
524
525 @Override
526 public void onError(FlowRuleOperations ops) {
Jian Lia4947682018-07-07 14:53:32 +0900527 log.debug("Failed to install rules for telemetry stats: \n {}",
528 ops.toString());
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900529 }
530 }));
531 }
532
533 /**
Jian Li0bbbb1c2018-06-22 22:01:17 +0900534 * Merges old FlowInfo.StatsInfo and current FlowInfo.StatsInfo.
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900535 *
536 * @param flowInfo current FlowInfo object
537 * @param fBuilder Builder for FlowInfo
538 * @param sBuilder Builder for StatsInfo
539 * @return Merged FlowInfo object
540 */
541 private FlowInfo mergeFlowInfo(FlowInfo flowInfo,
542 FlowInfo.Builder fBuilder,
543 StatsInfo.Builder sBuilder) {
Jian Li0bbbb1c2018-06-22 22:01:17 +0900544 for (FlowInfo gFlowInfo : gFlowInfoSet) {
545 log.debug("Old FlowInfo:\n{}", gFlowInfo.toString());
546 if (gFlowInfo.roughEquals(flowInfo)) {
547
548 // Get old StatsInfo object and merge the value to current object.
549 StatsInfo oldStatsInfo = gFlowInfo.statsInfo();
550 sBuilder.withPrevAccPkts(oldStatsInfo.currAccPkts());
551 sBuilder.withPrevAccBytes(oldStatsInfo.currAccBytes());
552 FlowInfo newFlowInfo = fBuilder.withStatsInfo(sBuilder.build())
553 .build();
554
555 gFlowInfoSet.remove(gFlowInfo);
556 gFlowInfoSet.add(newFlowInfo);
Jian Li85573f42018-06-27 22:29:14 +0900557 log.debug("Old FlowInfo found, Merge this {}", newFlowInfo.toString());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900558 return newFlowInfo;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900559 }
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900560 }
Jian Li0bbbb1c2018-06-22 22:01:17 +0900561
562 // No such record, then build the FlowInfo object and return this object.
Jian Li85573f42018-06-27 22:29:14 +0900563 log.debug("No FlowInfo found, add new FlowInfo {}", flowInfo.toString());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900564 FlowInfo newFlowInfo = fBuilder.withStatsInfo(sBuilder.build()).build();
565 gFlowInfoSet.add(newFlowInfo);
566 return newFlowInfo;
567 }
568
Jian Li753280e2018-07-03 02:24:34 +0900569 /**
570 * Installs flow rules for collecting both normal and reverse path flow stats.
571 *
572 * @param statsFlowRule flow rule used for collecting stats
573 * @param install flow rule installation flag
574 */
Jian Li0bbbb1c2018-06-22 22:01:17 +0900575 private void setStatFlowRule(StatsFlowRule statsFlowRule, boolean install) {
Jian Li753280e2018-07-03 02:24:34 +0900576 setStatFlowRuleBase(statsFlowRule, install);
Jian Li0bbbb1c2018-06-22 22:01:17 +0900577
Jian Li753280e2018-07-03 02:24:34 +0900578 // if reverse path stats is enabled, we will install flow rules for
579 // collecting reverse path vFlow stats
580 if (reversePathStats) {
581 StatsFlowRule reverseFlowRule = DefaultStatsFlowRule.builder()
582 .srcIpPrefix(statsFlowRule.dstIpPrefix())
583 .dstIpPrefix(statsFlowRule.srcIpPrefix())
584 .ipProtocol(statsFlowRule.ipProtocol())
585 .srcTpPort(statsFlowRule.dstTpPort())
586 .dstTpPort(statsFlowRule.srcTpPort())
587 .build();
588 setStatFlowRuleBase(reverseFlowRule, install);
589 }
590 }
591
592 /**
593 * A base method which is for installing flow rules for collecting stats.
594 *
595 * @param statsFlowRule flow rule used for collecting stats
596 * @param install flow rule installation flag
597 */
598 private void setStatFlowRuleBase(StatsFlowRule statsFlowRule, boolean install) {
Jian Lie6110b72018-07-06 19:06:36 +0900599
600 IpPrefix srcIp = statsFlowRule.srcIpPrefix();
601 IpPrefix dstIp = statsFlowRule.dstIpPrefix();
602 DeviceId srcDeviceId = getDeviceId(srcIp.address());
603 DeviceId dstDeviceId = getDeviceId(dstIp.address());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900604
Jian Li998ec7b2018-06-29 15:15:49 +0900605 if (srcDeviceId == null && dstDeviceId == null) {
Jian Li85573f42018-06-27 22:29:14 +0900606 return;
607 }
608
Jian Li998ec7b2018-06-29 15:15:49 +0900609 if (srcDeviceId != null) {
Jian Li87ded822018-07-02 18:31:22 +0900610 connectTables(srcDeviceId, STAT_INBOUND_TABLE, VTAP_INBOUND_TABLE,
Jian Li998ec7b2018-06-29 15:15:49 +0900611 statsFlowRule, METRIC_PRIORITY_SOURCE, install);
Jian Li998ec7b2018-06-29 15:15:49 +0900612
Jian Lie6110b72018-07-06 19:06:36 +0900613 if (install) {
614 log.info("Install ingress stat flow rule for SrcIp:{} DstIp:{}",
615 srcIp.toString(), dstIp.toString());
616 } else {
617 log.info("Remove ingress stat flow rule for SrcIp:{} DstIp:{}",
618 srcIp.toString(), dstIp.toString());
Jian Li753280e2018-07-03 02:24:34 +0900619 }
Jian Li998ec7b2018-06-29 15:15:49 +0900620 }
Jian Li85573f42018-06-27 22:29:14 +0900621
Jian Lie6110b72018-07-06 19:06:36 +0900622 Set<IpPrefix> vxlanIps = osNetworkService.getFixedIpsByNetworkType(VXLAN);
623 Set<IpPrefix> vlanIps = osNetworkService.getFixedIpsByNetworkType(VLAN);
624 Set<IpPrefix> flatIps = osNetworkService.getFixedIpsByNetworkType(FLAT);
Jian Li753280e2018-07-03 02:24:34 +0900625
Jian Lie6110b72018-07-06 19:06:36 +0900626 int fromTable, toTable;
Jian Li753280e2018-07-03 02:24:34 +0900627
Jian Lie6110b72018-07-06 19:06:36 +0900628 if (dstDeviceId != null && egressStats) {
629
630 IpPrefix dstIpPrefix = statsFlowRule.dstIpPrefix();
631
632 if (vxlanIps.contains(dstIpPrefix) || vlanIps.contains(dstIpPrefix)) {
633 fromTable = STAT_OUTBOUND_TABLE;
634 toTable = VTAP_OUTBOUND_TABLE;
635 } else if (flatIps.contains(dstIpPrefix)) {
636 fromTable = STAT_FLAT_OUTBOUND_TABLE;
637 toTable = VTAP_FLAT_OUTBOUND_TABLE;
638 } else {
639 return;
640 }
641
642 connectTables(dstDeviceId, fromTable, toTable,
643 statsFlowRule, METRIC_PRIORITY_TARGET, install);
644
645 if (install) {
646 log.info("Install egress stat flow rule for SrcIp:{} DstIp:{}",
647 srcIp.toString(), dstIp.toString());
648 } else {
649 log.info("Remove egress stat flow rule for SrcIp:{} DstIp:{}",
650 srcIp.toString(), dstIp.toString());
651 }
Jian Li753280e2018-07-03 02:24:34 +0900652 }
Jian Li753280e2018-07-03 02:24:34 +0900653 }
654
655 /**
Jian Li85573f42018-06-27 22:29:14 +0900656 * Get Device ID which the VM is located.
657 *
658 * @param ipAddress IP Address of host
659 * @return Device ID
660 */
661 private DeviceId getDeviceId(IpAddress ipAddress) {
662 if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
663 Optional<Host> host = hostService.getHostsByIp(ipAddress).stream().findAny();
664 return host.map(host1 -> host1.location().deviceId()).orElse(null);
665 } else {
Jian Lia4947682018-07-07 14:53:32 +0900666 log.debug("No DeviceID is associated to {}", ipAddress.toString());
Jian Li85573f42018-06-27 22:29:14 +0900667 return null;
Jian Li0bbbb1c2018-06-22 22:01:17 +0900668 }
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900669 }
670
671 /**
672 * Get VLAN ID with respect to IP Address.
673 *
674 * @param ipAddress IP Address of host
675 * @return VLAN ID
676 */
Jian Li0bbbb1c2018-06-22 22:01:17 +0900677 private VlanId getVlanId(IpAddress ipAddress) {
678 if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
679 Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
680 return host.vlan();
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900681 }
682 return VlanId.vlanId();
683 }
684
685 /**
686 * Get Interface ID of Switch which is connected to a host.
687 *
688 * @param ipAddress IP Address of host
689 * @return Interface ID of Switch
690 */
Jian Li0bbbb1c2018-06-22 22:01:17 +0900691 private int getInterfaceId(IpAddress ipAddress) {
692 if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
693 Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
694 return (int) host.location().port().toLong();
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900695 }
696 return -1;
697 }
698
699 /**
700 * Get MAC Address of host.
701 *
702 * @param ipAddress IP Address of host
703 * @return MAC Address of host
704 */
Jian Li0bbbb1c2018-06-22 22:01:17 +0900705 private MacAddress getMacAddress(IpAddress ipAddress) {
706 if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
707 Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
708 return host.mac();
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900709 }
Jian Li0bbbb1c2018-06-22 22:01:17 +0900710
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900711 return NO_HOST_MAC;
712 }
Jian Li0bbbb1c2018-06-22 22:01:17 +0900713
Jian Li753280e2018-07-03 02:24:34 +0900714 /**
715 * Extracts properties from the component configuration context.
716 *
717 * @param context the component context
718 */
719 private void readComponentConfiguration(ComponentContext context) {
720 Dictionary<?, ?> properties = context.getProperties();
721
722 Boolean reversePathStatsConfigured =
723 getBooleanProperty(properties, REVERSE_PATH_STATS);
724 if (reversePathStatsConfigured == null) {
725 reversePathStats = DEFAULT_REVERSE_PATH_STATS;
726 log.info("Reversed path stats flag is NOT " +
727 "configured, default value is {}", reversePathStats);
728 } else {
729 reversePathStats = reversePathStatsConfigured;
730 log.info("Configured. Reversed path stats flag is {}", reversePathStats);
731 }
732
733 Boolean egressStatsConfigured = getBooleanProperty(properties, EGRESS_STATS);
734 if (egressStatsConfigured == null) {
735 egressStats = DEFAULT_EGRESS_STATS;
736 log.info("Egress stats flag is NOT " +
737 "configured, default value is {}", egressStats);
738 } else {
739 egressStats = egressStatsConfigured;
740 log.info("Configured. Egress stats flag is {}", egressStats);
741 }
Jian Lia4947682018-07-07 14:53:32 +0900742
743 Boolean portStatsConfigured = getBooleanProperty(properties, PORT_STATS);
744 if (portStatsConfigured == null) {
745 portStats = DEFAULT_PORT_STATS;
746 log.info("Port stats flag is NOT " +
747 "configured, default value is {}", portStats);
748 } else {
749 portStats = portStatsConfigured;
750 log.info("Configured. Port stats flag is {}", portStats);
751 }
Jian Li753280e2018-07-03 02:24:34 +0900752 }
753
Jian Lia4947682018-07-07 14:53:32 +0900754 private class TelemetryCollector implements Runnable {
Jian Li0bbbb1c2018-06-22 22:01:17 +0900755 @Override
756 public void run() {
Jian Li85573f42018-06-27 22:29:14 +0900757 Set<FlowInfo> filteredFlowInfos = Sets.newConcurrentHashSet();
758
759 // we only let the master controller of the device where the
Jian Lia4947682018-07-07 14:53:32 +0900760 // stats flow rules are installed send stats message
761 getFlowInfos().forEach(f -> {
762 if (checkSrcDstLocalMaster(f)) {
Jian Li85573f42018-06-27 22:29:14 +0900763 filteredFlowInfos.add(f);
764 }
765 });
766
Jian Lia4947682018-07-07 14:53:32 +0900767 // we only let the master controller of the device where the port
768 // is located to send stats message
769 if (portStats) {
770 getDstPortBasedFlowInfos().forEach(f -> {
771 if (checkSrcDstLocalMaster(f)) {
772 filteredFlowInfos.add(f);
773 }
774 });
Jian Li0bbbb1c2018-06-22 22:01:17 +0900775 }
Jian Lia4947682018-07-07 14:53:32 +0900776
777 telemetryService.publish(filteredFlowInfos);
778 }
779
780 private boolean checkSrcDstLocalMaster(FlowInfo info) {
781 DeviceId srcDeviceId = getDeviceId(info.srcIp().address());
782 DeviceId dstDeviceId = getDeviceId(info.dstIp().address());
783
784 boolean isSrcLocalMaster = srcDeviceId != null &&
785 mastershipService.isLocalMaster(srcDeviceId);
786 boolean isDstLocalMaster = dstDeviceId != null &&
787 mastershipService.isLocalMaster(dstDeviceId);
788
789 return isSrcLocalMaster || isDstLocalMaster;
Jian Li0bbbb1c2018-06-22 22:01:17 +0900790 }
791 }
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900792}