blob: 5453a273997a0bcf60102ffbb45ed210bb199d03 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.openstacktelemetry.impl;
17
Jian Li0bbbb1c2018-06-22 22:01:17 +090018import com.google.common.collect.Sets;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090019import org.apache.commons.lang3.exception.ExceptionUtils;
20import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
Jian Li753280e2018-07-03 02:24:34 +090023import org.apache.felix.scr.annotations.Modified;
24import org.apache.felix.scr.annotations.Property;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090025import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
27import org.apache.felix.scr.annotations.Service;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090028import org.onlab.packet.IpAddress;
Jian Li753280e2018-07-03 02:24:34 +090029import org.onlab.packet.IpPrefix;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090030import org.onlab.packet.MacAddress;
31import org.onlab.packet.VlanId;
32import org.onosproject.core.ApplicationId;
33import org.onosproject.core.CoreService;
Jian Li85573f42018-06-27 22:29:14 +090034import org.onosproject.mastership.MastershipService;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090035import org.onosproject.net.DeviceId;
36import org.onosproject.net.Host;
37import org.onosproject.net.device.DeviceService;
38import org.onosproject.net.flow.DefaultFlowRule;
39import org.onosproject.net.flow.DefaultTrafficSelector;
40import org.onosproject.net.flow.DefaultTrafficTreatment;
41import org.onosproject.net.flow.FlowEntry;
42import org.onosproject.net.flow.FlowRule;
43import org.onosproject.net.flow.FlowRuleOperations;
44import org.onosproject.net.flow.FlowRuleOperationsContext;
45import org.onosproject.net.flow.FlowRuleService;
46import org.onosproject.net.flow.IndexTableId;
47import org.onosproject.net.flow.TrafficSelector;
48import org.onosproject.net.flow.TrafficTreatment;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090049import org.onosproject.net.flow.criteria.IPCriterion;
50import org.onosproject.net.flow.criteria.IPProtocolCriterion;
51import org.onosproject.net.flow.criteria.TcpPortCriterion;
52import org.onosproject.net.flow.criteria.UdpPortCriterion;
53import org.onosproject.net.host.HostService;
Jian Li753280e2018-07-03 02:24:34 +090054import org.onosproject.openstacknetworking.api.InstancePort;
55import org.onosproject.openstacknetworking.api.InstancePortService;
56import org.onosproject.openstacknetworking.api.OpenstackNetworkService;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090057import org.onosproject.openstacktelemetry.api.FlowInfo;
Jian Li0bbbb1c2018-06-22 22:01:17 +090058import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090059import org.onosproject.openstacktelemetry.api.StatsFlowRule;
60import org.onosproject.openstacktelemetry.api.StatsFlowRuleAdminService;
61import org.onosproject.openstacktelemetry.api.StatsInfo;
Jian Li753280e2018-07-03 02:24:34 +090062import org.openstack4j.model.network.Network;
63import org.openstack4j.model.network.NetworkType;
64import org.osgi.service.component.ComponentContext;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090065import org.slf4j.Logger;
66import org.slf4j.LoggerFactory;
67
Jian Li753280e2018-07-03 02:24:34 +090068import java.util.Dictionary;
Jian Li85573f42018-06-27 22:29:14 +090069import java.util.Optional;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090070import java.util.Set;
71import java.util.Timer;
72import java.util.TimerTask;
73
Jian Li753280e2018-07-03 02:24:34 +090074import static com.google.common.base.Preconditions.checkNotNull;
Jian Li0bbbb1c2018-06-22 22:01:17 +090075import static org.onlab.packet.Ethernet.TYPE_IPV4;
76import static org.onlab.packet.IPv4.PROTOCOL_TCP;
77import static org.onlab.packet.IPv4.PROTOCOL_UDP;
78import static org.onosproject.net.flow.criteria.Criterion.Type.IPV4_DST;
79import static org.onosproject.net.flow.criteria.Criterion.Type.IPV4_SRC;
80import static org.onosproject.net.flow.criteria.Criterion.Type.IP_PROTO;
81import static org.onosproject.net.flow.criteria.Criterion.Type.TCP_DST;
82import static org.onosproject.net.flow.criteria.Criterion.Type.TCP_SRC;
83import static org.onosproject.net.flow.criteria.Criterion.Type.UDP_DST;
84import static org.onosproject.net.flow.criteria.Criterion.Type.UDP_SRC;
Jian Li753280e2018-07-03 02:24:34 +090085import static org.onosproject.openstacknetworking.api.Constants.STAT_FLAT_OUTBOUND_TABLE;
Jian Li0bbbb1c2018-06-22 22:01:17 +090086import static org.onosproject.openstacknetworking.api.Constants.STAT_INBOUND_TABLE;
87import static org.onosproject.openstacknetworking.api.Constants.STAT_OUTBOUND_TABLE;
Jian Li753280e2018-07-03 02:24:34 +090088import static org.onosproject.openstacknetworking.api.Constants.VTAP_FLAT_OUTBOUND_TABLE;
Jian Li87ded822018-07-02 18:31:22 +090089import static org.onosproject.openstacknetworking.api.Constants.VTAP_INBOUND_TABLE;
90import static org.onosproject.openstacknetworking.api.Constants.VTAP_OUTBOUND_TABLE;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090091import static org.onosproject.openstacktelemetry.api.Constants.OPENSTACK_TELEMETRY_APP_ID;
Jian Li753280e2018-07-03 02:24:34 +090092import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getBooleanProperty;
93import static org.openstack4j.model.network.NetworkType.FLAT;
94import static org.openstack4j.model.network.NetworkType.VLAN;
95import static org.openstack4j.model.network.NetworkType.VXLAN;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090096
/**
 * Flow rule manager for network statistics of a VM.
 */
100@Component(immediate = true)
101@Service
102public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
103
104 private final Logger log = LoggerFactory.getLogger(getClass());
105
106 private static final byte FLOW_TYPE_SONA = 1; // VLAN
107
Ray Milkeybcc53d32018-07-02 10:22:57 -0700108 private static final long MILLISECONDS = 1000L;
109 private static final long REFRESH_INTERVAL = 5L;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900110
Jian Li753280e2018-07-03 02:24:34 +0900111 private static final String REVERSE_PATH_STATS = "reversePathStats";
112 private static final String EGRESS_STATS = "egressStats";
113
114 private static final boolean DEFAULT_REVERSE_PATH_STATS = false;
115 private static final boolean DEFAULT_EGRESS_STATS = false;
116
117 private static final String MAC_NOT_NULL = "MAC should not be null";
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900118
119 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
120 protected CoreService coreService;
121
122 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
123 protected FlowRuleService flowRuleService;
124
125 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
126 protected HostService hostService;
127
Jian Li0bbbb1c2018-06-22 22:01:17 +0900128 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
129 protected DeviceService deviceService;
130
131 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jian Li85573f42018-06-27 22:29:14 +0900132 protected MastershipService mastershipService;
133
134 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jian Li753280e2018-07-03 02:24:34 +0900135 protected OpenstackNetworkService osNetworkService;
136
137 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
138 protected InstancePortService instancePortService;
139
140 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jian Li0bbbb1c2018-06-22 22:01:17 +0900141 protected OpenstackTelemetryService telemetryService;
142
Jian Li753280e2018-07-03 02:24:34 +0900143 @Property(name = REVERSE_PATH_STATS, boolValue = DEFAULT_REVERSE_PATH_STATS,
144 label = "A flag which indicates whether to install the rules for " +
145 "collecting the flow-based stats for reversed path.")
146 private boolean reversePathStats = DEFAULT_REVERSE_PATH_STATS;
147
148 @Property(name = EGRESS_STATS, boolValue = DEFAULT_EGRESS_STATS,
149 label = "A flag which indicates whether to install the rules for " +
150 "collecting the flow-based stats for egress port.")
151 private boolean egressStats = DEFAULT_EGRESS_STATS;
152
153 private ApplicationId appId;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900154 private Timer timer;
155 private TimerTask task;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900156
Jian Li0bbbb1c2018-06-22 22:01:17 +0900157 private final Set<FlowInfo> gFlowInfoSet = Sets.newHashSet();
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900158 private int loopCount = 0;
159
160 private static final int SOURCE_ID = 1;
161 private static final int TARGET_ID = 2;
162 private static final int PRIORITY_BASE = 10000;
163 private static final int METRIC_PRIORITY_SOURCE = SOURCE_ID * PRIORITY_BASE;
164 private static final int METRIC_PRIORITY_TARGET = TARGET_ID * PRIORITY_BASE;
165
Jian Li0bbbb1c2018-06-22 22:01:17 +0900166 private static final MacAddress NO_HOST_MAC = MacAddress.valueOf("00:00:00:00:00:00");
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900167
/**
 * Creates the manager along with the named timer on which the periodic
 * publishing task is later scheduled.
 */
public StatsFlowRuleManager() {
    timer = new Timer("openstack-telemetry-sender");
}
171
172 @Activate
173 protected void activate() {
174 appId = coreService.registerApplication(OPENSTACK_TELEMETRY_APP_ID);
Jian Li0bbbb1c2018-06-22 22:01:17 +0900175
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900176 this.start();
Jian Li0bbbb1c2018-06-22 22:01:17 +0900177
178 log.info("Started");
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900179 }
180
181 @Deactivate
182 protected void deactivate() {
Jian Li0bbbb1c2018-06-22 22:01:17 +0900183 log.info("Stopped");
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900184 }
185
Jian Li753280e2018-07-03 02:24:34 +0900186 @Modified
187 protected void modified(ComponentContext context) {
188 readComponentConfiguration(context);
189
190 log.info("Modified");
191 }
192
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900193 @Override
194 public void start() {
195 log.info("Start publishing thread");
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900196 task = new InternalTimerTask();
197 timer.scheduleAtFixedRate(task, MILLISECONDS * REFRESH_INTERVAL,
Jian Li753280e2018-07-03 02:24:34 +0900198 MILLISECONDS * REFRESH_INTERVAL);
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900199 }
200
201 @Override
202 public void stop() {
203 log.info("Stop data publishing thread");
204 task.cancel();
205 task = null;
206 }
207
Jian Li0bbbb1c2018-06-22 22:01:17 +0900208 @Override
209 public void createStatFlowRule(StatsFlowRule statsFlowRule) {
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900210
Jian Li0bbbb1c2018-06-22 22:01:17 +0900211 setStatFlowRule(statsFlowRule, true);
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900212
Jian Li0bbbb1c2018-06-22 22:01:17 +0900213 log.info("Install stat flow rule for SrcIp:{} DstIp:{}",
Jian Li753280e2018-07-03 02:24:34 +0900214 statsFlowRule.srcIpPrefix().toString(),
215 statsFlowRule.dstIpPrefix().toString());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900216 }
217
218 @Override
219 public void deleteStatFlowRule(StatsFlowRule statsFlowRule) {
220 // FIXME: following code might not be necessary
221 flowRuleService.removeFlowRulesById(appId);
222
223 setStatFlowRule(statsFlowRule, false);
224
225 log.info("Remove stat flow rule for SrcIp:{} DstIp:{}",
Jian Li753280e2018-07-03 02:24:34 +0900226 statsFlowRule.srcIpPrefix().toString(),
227 statsFlowRule.dstIpPrefix().toString());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900228 }
229
230 private void connectTables(DeviceId deviceId, int fromTable, int toTable,
231 StatsFlowRule statsFlowRule, int rulePriority,
232 boolean install) {
233
234 log.debug("Table Transition: {} -> {}", fromTable, toTable);
235 int srcPrefixLength = statsFlowRule.srcIpPrefix().prefixLength();
236 int dstPrefixLength = statsFlowRule.dstIpPrefix().prefixLength();
237 int prefixLength = rulePriority + srcPrefixLength + dstPrefixLength;
238 byte protocol = statsFlowRule.ipProtocol();
239
240 TrafficSelector.Builder selectorBuilder =
Jian Li753280e2018-07-03 02:24:34 +0900241 DefaultTrafficSelector.builder()
242 .matchEthType(TYPE_IPV4)
243 .matchIPSrc(statsFlowRule.srcIpPrefix())
244 .matchIPDst(statsFlowRule.dstIpPrefix());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900245
246 if (protocol == PROTOCOL_TCP) {
247 selectorBuilder = selectorBuilder
Jian Li753280e2018-07-03 02:24:34 +0900248 .matchIPProtocol(statsFlowRule.ipProtocol())
249 .matchTcpSrc(statsFlowRule.srcTpPort())
250 .matchTcpDst(statsFlowRule.dstTpPort());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900251
252 } else if (protocol == PROTOCOL_UDP) {
253 selectorBuilder = selectorBuilder
Jian Li753280e2018-07-03 02:24:34 +0900254 .matchIPProtocol(statsFlowRule.ipProtocol())
255 .matchUdpSrc(statsFlowRule.srcTpPort())
256 .matchUdpDst(statsFlowRule.dstTpPort());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900257 } else {
258 log.warn("Unsupported protocol {}", statsFlowRule.ipProtocol());
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900259 }
Jian Li0bbbb1c2018-06-22 22:01:17 +0900260
261 TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
262
263 treatmentBuilder.transition(toTable);
264
265 FlowRule flowRule = DefaultFlowRule.builder()
Jian Li753280e2018-07-03 02:24:34 +0900266 .forDevice(deviceId)
267 .withSelector(selectorBuilder.build())
268 .withTreatment(treatmentBuilder.build())
269 .withPriority(prefixLength)
270 .fromApp(appId)
271 .makePermanent()
272 .forTable(fromTable)
273 .build();
Jian Li0bbbb1c2018-06-22 22:01:17 +0900274
275 applyRule(flowRule, install);
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900276 }
277
278 /**
Jian Li0bbbb1c2018-06-22 22:01:17 +0900279 * Installs stats related flow rule to switch.
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900280 *
Jian Li0bbbb1c2018-06-22 22:01:17 +0900281 * @param flowRule flow rule
282 * @param install flag to install or not
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900283 */
284 private void applyRule(FlowRule flowRule, boolean install) {
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900285 FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
Jian Li0bbbb1c2018-06-22 22:01:17 +0900286 flowOpsBuilder = install ?
287 flowOpsBuilder.add(flowRule) : flowOpsBuilder.remove(flowRule);
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900288
289 flowRuleService.apply(flowOpsBuilder.build(new FlowRuleOperationsContext() {
290 @Override
291 public void onSuccess(FlowRuleOperations ops) {
292 log.debug("Provisioned vni or forwarding table: \n {}", ops.toString());
293 }
294
295 @Override
296 public void onError(FlowRuleOperations ops) {
297 log.debug("Failed to provision vni or forwarding table: \n {}", ops.toString());
298 }
299 }));
300 }
301
302 /**
Jian Li0bbbb1c2018-06-22 22:01:17 +0900303 * Gets a set of the flow infos.
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900304 *
Jian Li0bbbb1c2018-06-22 22:01:17 +0900305 * @return a set of flow infos
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900306 */
Jian Li0bbbb1c2018-06-22 22:01:17 +0900307 public Set<FlowInfo> getFlowInfo() {
308 Set<FlowInfo> flowInfos = Sets.newConcurrentHashSet();
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900309
Jian Li0bbbb1c2018-06-22 22:01:17 +0900310 // obtain all flow rule entries installed by telemetry app
311 for (FlowEntry entry : flowRuleService.getFlowEntriesById(appId)) {
312 FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
313 TrafficSelector selector = entry.selector();
314
315 IPCriterion srcIp = (IPCriterion) selector.getCriterion(IPV4_SRC);
316 IPCriterion dstIp = (IPCriterion) selector.getCriterion(IPV4_DST);
317 IPProtocolCriterion ipProtocol =
Jian Li753280e2018-07-03 02:24:34 +0900318 (IPProtocolCriterion) selector.getCriterion(IP_PROTO);
Jian Li0bbbb1c2018-06-22 22:01:17 +0900319
320 log.debug("[FlowInfo] TableID:{} SRC_IP:{} DST_IP:{} Pkt:{} Byte:{}",
Jian Li753280e2018-07-03 02:24:34 +0900321 ((IndexTableId) entry.table()).id(),
322 srcIp.ip().toString(),
323 dstIp.ip().toString(),
324 entry.packets(),
325 entry.bytes());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900326
327 fBuilder.withFlowType(FLOW_TYPE_SONA)
328 .withSrcIp(srcIp.ip())
Jian Li85573f42018-06-27 22:29:14 +0900329 .withDstIp(dstIp.ip());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900330
Jian Li85573f42018-06-27 22:29:14 +0900331 if (ipProtocol != null) {
332 fBuilder.withProtocol((byte) ipProtocol.protocol());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900333
Jian Li85573f42018-06-27 22:29:14 +0900334 if (ipProtocol.protocol() == PROTOCOL_TCP) {
335 TcpPortCriterion tcpSrc =
336 (TcpPortCriterion) selector.getCriterion(TCP_SRC);
337 TcpPortCriterion tcpDst =
338 (TcpPortCriterion) selector.getCriterion(TCP_DST);
Jian Li0bbbb1c2018-06-22 22:01:17 +0900339
Jian Li85573f42018-06-27 22:29:14 +0900340 log.debug("TCP SRC Port: {}, DST Port: {}",
341 tcpSrc.tcpPort().toInt(),
342 tcpDst.tcpPort().toInt());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900343
Jian Li85573f42018-06-27 22:29:14 +0900344 fBuilder.withSrcPort(tcpSrc.tcpPort());
345 fBuilder.withDstPort(tcpDst.tcpPort());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900346
Jian Li85573f42018-06-27 22:29:14 +0900347 } else if (ipProtocol.protocol() == PROTOCOL_UDP) {
Jian Li0bbbb1c2018-06-22 22:01:17 +0900348
Jian Li85573f42018-06-27 22:29:14 +0900349 UdpPortCriterion udpSrc =
350 (UdpPortCriterion) selector.getCriterion(UDP_SRC);
351 UdpPortCriterion udpDst =
352 (UdpPortCriterion) selector.getCriterion(UDP_DST);
Jian Li0bbbb1c2018-06-22 22:01:17 +0900353
Jian Li85573f42018-06-27 22:29:14 +0900354 log.debug("UDP SRC Port: {}, DST Port: {}",
355 udpSrc.udpPort().toInt(),
356 udpDst.udpPort().toInt());
357
358 fBuilder.withSrcPort(udpSrc.udpPort());
359 fBuilder.withDstPort(udpDst.udpPort());
360 } else {
361 log.debug("Other protocol: {}", ipProtocol.protocol());
362 }
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900363 }
Jian Li0bbbb1c2018-06-22 22:01:17 +0900364
365 fBuilder.withSrcMac(getMacAddress(srcIp.ip().address()))
366 .withDstMac(getMacAddress(dstIp.ip().address()))
367 .withInputInterfaceId(getInterfaceId(srcIp.ip().address()))
368 .withOutputInterfaceId(getInterfaceId(dstIp.ip().address()))
369 .withVlanId(getVlanId(srcIp.ip().address()))
370 .withDeviceId(entry.deviceId());
371
372 StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
373
374 // TODO: need to collect error and drop packets stats
375 // TODO: need to make the refresh interval configurable
Jian Lide4ef402018-06-27 19:21:14 +0900376 sBuilder.withStartupTime(System.currentTimeMillis())
377 .withFstPktArrTime(System.currentTimeMillis())
Ray Milkeybcc53d32018-07-02 10:22:57 -0700378 .withLstPktOffset((int) (REFRESH_INTERVAL * MILLISECONDS))
Jian Li0bbbb1c2018-06-22 22:01:17 +0900379 .withCurrAccPkts((int) entry.packets())
380 .withCurrAccBytes(entry.bytes())
381 .withErrorPkts((short) 0)
Jian Lide4ef402018-06-27 19:21:14 +0900382 .withDropPkts((short) 0);
Jian Li0bbbb1c2018-06-22 22:01:17 +0900383
384 fBuilder.withStatsInfo(sBuilder.build());
385
386 FlowInfo flowInfo = mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
387
388 flowInfos.add(flowInfo);
389
390 log.debug("FlowInfo: \n{}", flowInfo.toString());
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900391 }
Jian Li0bbbb1c2018-06-22 22:01:17 +0900392
393 return flowInfos;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900394 }
395
396 /**
Jian Li0bbbb1c2018-06-22 22:01:17 +0900397 * Merges old FlowInfo.StatsInfo and current FlowInfo.StatsInfo.
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900398 *
399 * @param flowInfo current FlowInfo object
400 * @param fBuilder Builder for FlowInfo
401 * @param sBuilder Builder for StatsInfo
402 * @return Merged FlowInfo object
403 */
404 private FlowInfo mergeFlowInfo(FlowInfo flowInfo,
405 FlowInfo.Builder fBuilder,
406 StatsInfo.Builder sBuilder) {
Jian Li0bbbb1c2018-06-22 22:01:17 +0900407 for (FlowInfo gFlowInfo : gFlowInfoSet) {
408 log.debug("Old FlowInfo:\n{}", gFlowInfo.toString());
409 if (gFlowInfo.roughEquals(flowInfo)) {
410
411 // Get old StatsInfo object and merge the value to current object.
412 StatsInfo oldStatsInfo = gFlowInfo.statsInfo();
413 sBuilder.withPrevAccPkts(oldStatsInfo.currAccPkts());
414 sBuilder.withPrevAccBytes(oldStatsInfo.currAccBytes());
415 FlowInfo newFlowInfo = fBuilder.withStatsInfo(sBuilder.build())
416 .build();
417
418 gFlowInfoSet.remove(gFlowInfo);
419 gFlowInfoSet.add(newFlowInfo);
Jian Li85573f42018-06-27 22:29:14 +0900420 log.debug("Old FlowInfo found, Merge this {}", newFlowInfo.toString());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900421 return newFlowInfo;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900422 }
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900423 }
Jian Li0bbbb1c2018-06-22 22:01:17 +0900424
425 // No such record, then build the FlowInfo object and return this object.
Jian Li85573f42018-06-27 22:29:14 +0900426 log.debug("No FlowInfo found, add new FlowInfo {}", flowInfo.toString());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900427 FlowInfo newFlowInfo = fBuilder.withStatsInfo(sBuilder.build()).build();
428 gFlowInfoSet.add(newFlowInfo);
429 return newFlowInfo;
430 }
431
Jian Li753280e2018-07-03 02:24:34 +0900432 /**
433 * Installs flow rules for collecting both normal and reverse path flow stats.
434 *
435 * @param statsFlowRule flow rule used for collecting stats
436 * @param install flow rule installation flag
437 */
Jian Li0bbbb1c2018-06-22 22:01:17 +0900438 private void setStatFlowRule(StatsFlowRule statsFlowRule, boolean install) {
Jian Li753280e2018-07-03 02:24:34 +0900439 setStatFlowRuleBase(statsFlowRule, install);
Jian Li0bbbb1c2018-06-22 22:01:17 +0900440
Jian Li753280e2018-07-03 02:24:34 +0900441 // if reverse path stats is enabled, we will install flow rules for
442 // collecting reverse path vFlow stats
443 if (reversePathStats) {
444 StatsFlowRule reverseFlowRule = DefaultStatsFlowRule.builder()
445 .srcIpPrefix(statsFlowRule.dstIpPrefix())
446 .dstIpPrefix(statsFlowRule.srcIpPrefix())
447 .ipProtocol(statsFlowRule.ipProtocol())
448 .srcTpPort(statsFlowRule.dstTpPort())
449 .dstTpPort(statsFlowRule.srcTpPort())
450 .build();
451 setStatFlowRuleBase(reverseFlowRule, install);
452 }
453 }
454
455 /**
456 * A base method which is for installing flow rules for collecting stats.
457 *
458 * @param statsFlowRule flow rule used for collecting stats
459 * @param install flow rule installation flag
460 */
461 private void setStatFlowRuleBase(StatsFlowRule statsFlowRule, boolean install) {
Jian Li85573f42018-06-27 22:29:14 +0900462 DeviceId srcDeviceId = getDeviceId(statsFlowRule.srcIpPrefix().address());
463 DeviceId dstDeviceId = getDeviceId(statsFlowRule.dstIpPrefix().address());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900464
Jian Li998ec7b2018-06-29 15:15:49 +0900465 if (srcDeviceId == null && dstDeviceId == null) {
Jian Li85573f42018-06-27 22:29:14 +0900466 return;
467 }
468
Jian Li998ec7b2018-06-29 15:15:49 +0900469 if (srcDeviceId != null) {
Jian Li87ded822018-07-02 18:31:22 +0900470 connectTables(srcDeviceId, STAT_INBOUND_TABLE, VTAP_INBOUND_TABLE,
Jian Li998ec7b2018-06-29 15:15:49 +0900471 statsFlowRule, METRIC_PRIORITY_SOURCE, install);
472 }
473
Jian Li753280e2018-07-03 02:24:34 +0900474 if (dstDeviceId != null && egressStats) {
475 NetworkType type = getNetworkType(statsFlowRule.dstIpPrefix());
476 if (type == VXLAN || type == VLAN) {
477 connectTables(dstDeviceId, STAT_OUTBOUND_TABLE, VTAP_OUTBOUND_TABLE,
478 statsFlowRule, METRIC_PRIORITY_TARGET, install);
479 } else if (type == FLAT) {
480 connectTables(dstDeviceId, STAT_FLAT_OUTBOUND_TABLE, VTAP_FLAT_OUTBOUND_TABLE,
481 statsFlowRule, METRIC_PRIORITY_TARGET, install);
482 }
Jian Li998ec7b2018-06-29 15:15:49 +0900483 }
Jian Li85573f42018-06-27 22:29:14 +0900484 }
485
486 /**
Jian Li753280e2018-07-03 02:24:34 +0900487 * Obtains the network type that is used by the VM which has the given IP.
488 *
489 * @param ipPrefix IP prefix
490 * @return network type
491 */
492 private NetworkType getNetworkType(IpPrefix ipPrefix) {
493
494 MacAddress mac = checkNotNull(getMacAddress(ipPrefix.address()), MAC_NOT_NULL);
495 InstancePort instPort = instancePortService.instancePort(mac);
496
497 if (instPort == null) {
498 return null;
499 }
500
501 Network network = osNetworkService.network(instPort.networkId());
502
503 if (network != null) {
504 return network.getNetworkType();
505 }
506
507 return null;
508 }
509
510 /**
Jian Li85573f42018-06-27 22:29:14 +0900511 * Get Device ID which the VM is located.
512 *
513 * @param ipAddress IP Address of host
514 * @return Device ID
515 */
516 private DeviceId getDeviceId(IpAddress ipAddress) {
517 if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
518 Optional<Host> host = hostService.getHostsByIp(ipAddress).stream().findAny();
519 return host.map(host1 -> host1.location().deviceId()).orElse(null);
520 } else {
Jian Li998ec7b2018-06-29 15:15:49 +0900521 log.warn("Failed to get DeviceID which is connected to {}. " +
522 "The destination is either a bare-metal or located out of DC",
Jian Li85573f42018-06-27 22:29:14 +0900523 ipAddress.toString());
524 return null;
Jian Li0bbbb1c2018-06-22 22:01:17 +0900525 }
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900526 }
527
528 /**
529 * Get VLAN ID with respect to IP Address.
530 *
531 * @param ipAddress IP Address of host
532 * @return VLAN ID
533 */
Jian Li0bbbb1c2018-06-22 22:01:17 +0900534 private VlanId getVlanId(IpAddress ipAddress) {
535 if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
536 Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
537 return host.vlan();
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900538 }
539 return VlanId.vlanId();
540 }
541
542 /**
543 * Get Interface ID of Switch which is connected to a host.
544 *
545 * @param ipAddress IP Address of host
546 * @return Interface ID of Switch
547 */
Jian Li0bbbb1c2018-06-22 22:01:17 +0900548 private int getInterfaceId(IpAddress ipAddress) {
549 if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
550 Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
551 return (int) host.location().port().toLong();
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900552 }
553 return -1;
554 }
555
556 /**
557 * Get MAC Address of host.
558 *
559 * @param ipAddress IP Address of host
560 * @return MAC Address of host
561 */
Jian Li0bbbb1c2018-06-22 22:01:17 +0900562 private MacAddress getMacAddress(IpAddress ipAddress) {
563 if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
564 Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
565 return host.mac();
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900566 }
Jian Li0bbbb1c2018-06-22 22:01:17 +0900567
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900568 return NO_HOST_MAC;
569 }
Jian Li0bbbb1c2018-06-22 22:01:17 +0900570
Jian Li753280e2018-07-03 02:24:34 +0900571 /**
572 * Extracts properties from the component configuration context.
573 *
574 * @param context the component context
575 */
576 private void readComponentConfiguration(ComponentContext context) {
577 Dictionary<?, ?> properties = context.getProperties();
578
579 Boolean reversePathStatsConfigured =
580 getBooleanProperty(properties, REVERSE_PATH_STATS);
581 if (reversePathStatsConfigured == null) {
582 reversePathStats = DEFAULT_REVERSE_PATH_STATS;
583 log.info("Reversed path stats flag is NOT " +
584 "configured, default value is {}", reversePathStats);
585 } else {
586 reversePathStats = reversePathStatsConfigured;
587 log.info("Configured. Reversed path stats flag is {}", reversePathStats);
588 }
589
590 Boolean egressStatsConfigured = getBooleanProperty(properties, EGRESS_STATS);
591 if (egressStatsConfigured == null) {
592 egressStats = DEFAULT_EGRESS_STATS;
593 log.info("Egress stats flag is NOT " +
594 "configured, default value is {}", egressStats);
595 } else {
596 egressStats = egressStatsConfigured;
597 log.info("Configured. Egress stats flag is {}", egressStats);
598 }
599 }
600
Jian Li0bbbb1c2018-06-22 22:01:17 +0900601 private class InternalTimerTask extends TimerTask {
602 @Override
603 public void run() {
Jian Li753280e2018-07-03 02:24:34 +0900604 log.debug("Timer task thread starts ({})", loopCount++);
Jian Li85573f42018-06-27 22:29:14 +0900605
606 Set<FlowInfo> filteredFlowInfos = Sets.newConcurrentHashSet();
607
608 // we only let the master controller of the device where the
609 // stats flow rules are installed send kafka message
610 getFlowInfo().forEach(f -> {
611 DeviceId deviceId = getDeviceId(f.srcIp().address());
612 if (mastershipService.isLocalMaster(deviceId)) {
613 filteredFlowInfos.add(f);
614 }
615 });
616
Jian Li0bbbb1c2018-06-22 22:01:17 +0900617 try {
Jian Li85573f42018-06-27 22:29:14 +0900618 telemetryService.publish(filteredFlowInfos);
Jian Li0bbbb1c2018-06-22 22:01:17 +0900619 } catch (Exception ex) {
620 log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
621 }
622 }
623 }
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900624}