blob: fddbdfaba1ee906ddc8787e4dbf7f57e5dd01e49 [file] [log] [blame]
Boyoung Jeong9e8faec2018-06-17 21:19:23 +09001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.openstacktelemetry.impl;
17
Jian Li0bbbb1c2018-06-22 22:01:17 +090018import com.google.common.collect.Sets;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090019import org.apache.commons.lang3.exception.ExceptionUtils;
20import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
25import org.apache.felix.scr.annotations.Service;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090026import org.onlab.packet.IpAddress;
27import org.onlab.packet.MacAddress;
28import org.onlab.packet.VlanId;
29import org.onosproject.core.ApplicationId;
30import org.onosproject.core.CoreService;
Jian Li85573f42018-06-27 22:29:14 +090031import org.onosproject.mastership.MastershipService;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090032import org.onosproject.net.DeviceId;
33import org.onosproject.net.Host;
34import org.onosproject.net.device.DeviceService;
35import org.onosproject.net.flow.DefaultFlowRule;
36import org.onosproject.net.flow.DefaultTrafficSelector;
37import org.onosproject.net.flow.DefaultTrafficTreatment;
38import org.onosproject.net.flow.FlowEntry;
39import org.onosproject.net.flow.FlowRule;
40import org.onosproject.net.flow.FlowRuleOperations;
41import org.onosproject.net.flow.FlowRuleOperationsContext;
42import org.onosproject.net.flow.FlowRuleService;
43import org.onosproject.net.flow.IndexTableId;
44import org.onosproject.net.flow.TrafficSelector;
45import org.onosproject.net.flow.TrafficTreatment;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090046import org.onosproject.net.flow.criteria.IPCriterion;
47import org.onosproject.net.flow.criteria.IPProtocolCriterion;
48import org.onosproject.net.flow.criteria.TcpPortCriterion;
49import org.onosproject.net.flow.criteria.UdpPortCriterion;
50import org.onosproject.net.host.HostService;
51import org.onosproject.openstacktelemetry.api.FlowInfo;
Jian Li0bbbb1c2018-06-22 22:01:17 +090052import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090053import org.onosproject.openstacktelemetry.api.StatsFlowRule;
54import org.onosproject.openstacktelemetry.api.StatsFlowRuleAdminService;
55import org.onosproject.openstacktelemetry.api.StatsInfo;
56import org.slf4j.Logger;
57import org.slf4j.LoggerFactory;
58
Jian Li85573f42018-06-27 22:29:14 +090059import java.util.Optional;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090060import java.util.Set;
61import java.util.Timer;
62import java.util.TimerTask;
63
Jian Li0bbbb1c2018-06-22 22:01:17 +090064import static org.onlab.packet.Ethernet.TYPE_IPV4;
65import static org.onlab.packet.IPv4.PROTOCOL_TCP;
66import static org.onlab.packet.IPv4.PROTOCOL_UDP;
67import static org.onosproject.net.flow.criteria.Criterion.Type.IPV4_DST;
68import static org.onosproject.net.flow.criteria.Criterion.Type.IPV4_SRC;
69import static org.onosproject.net.flow.criteria.Criterion.Type.IP_PROTO;
70import static org.onosproject.net.flow.criteria.Criterion.Type.TCP_DST;
71import static org.onosproject.net.flow.criteria.Criterion.Type.TCP_SRC;
72import static org.onosproject.net.flow.criteria.Criterion.Type.UDP_DST;
73import static org.onosproject.net.flow.criteria.Criterion.Type.UDP_SRC;
74import static org.onosproject.openstacknetworking.api.Constants.DHCP_ARP_TABLE;
75import static org.onosproject.openstacknetworking.api.Constants.FORWARDING_TABLE;
76import static org.onosproject.openstacknetworking.api.Constants.STAT_INBOUND_TABLE;
77import static org.onosproject.openstacknetworking.api.Constants.STAT_OUTBOUND_TABLE;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090078import static org.onosproject.openstacktelemetry.api.Constants.OPENSTACK_TELEMETRY_APP_ID;
79
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090080/**
81 * Flow rule manager for network statistics of a VM.
82 */
83@Component(immediate = true)
84@Service
85public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
86
87 private final Logger log = LoggerFactory.getLogger(getClass());
88
89 private static final byte FLOW_TYPE_SONA = 1; // VLAN
90
Jian Li0bbbb1c2018-06-22 22:01:17 +090091 private static final int MILLISECONDS = 1000;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +090092 private static final int REFRESH_INTERVAL = 5;
93
94 private ApplicationId appId;
95
96 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
97 protected CoreService coreService;
98
99 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
100 protected FlowRuleService flowRuleService;
101
102 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
103 protected HostService hostService;
104
Jian Li0bbbb1c2018-06-22 22:01:17 +0900105 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
106 protected DeviceService deviceService;
107
108 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jian Li85573f42018-06-27 22:29:14 +0900109 protected MastershipService mastershipService;
110
111 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jian Li0bbbb1c2018-06-22 22:01:17 +0900112 protected OpenstackTelemetryService telemetryService;
113
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900114 private Timer timer;
115 private TimerTask task;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900116
Jian Li0bbbb1c2018-06-22 22:01:17 +0900117 private final Set<FlowInfo> gFlowInfoSet = Sets.newHashSet();
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900118 private int loopCount = 0;
119
120 private static final int SOURCE_ID = 1;
121 private static final int TARGET_ID = 2;
122 private static final int PRIORITY_BASE = 10000;
123 private static final int METRIC_PRIORITY_SOURCE = SOURCE_ID * PRIORITY_BASE;
124 private static final int METRIC_PRIORITY_TARGET = TARGET_ID * PRIORITY_BASE;
125
Jian Li0bbbb1c2018-06-22 22:01:17 +0900126 private static final MacAddress NO_HOST_MAC = MacAddress.valueOf("00:00:00:00:00:00");
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900127
128 public StatsFlowRuleManager() {
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900129 this.timer = new Timer("openstack-telemetry-sender");
130 }
131
132 @Activate
133 protected void activate() {
134 appId = coreService.registerApplication(OPENSTACK_TELEMETRY_APP_ID);
Jian Li0bbbb1c2018-06-22 22:01:17 +0900135
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900136 this.start();
Jian Li0bbbb1c2018-06-22 22:01:17 +0900137
138 log.info("Started");
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900139 }
140
141 @Deactivate
142 protected void deactivate() {
Jian Li0bbbb1c2018-06-22 22:01:17 +0900143 log.info("Stopped");
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900144 }
145
146 @Override
147 public void start() {
148 log.info("Start publishing thread");
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900149 task = new InternalTimerTask();
150 timer.scheduleAtFixedRate(task, MILLISECONDS * REFRESH_INTERVAL,
151 MILLISECONDS * REFRESH_INTERVAL);
152 }
153
154 @Override
155 public void stop() {
156 log.info("Stop data publishing thread");
157 task.cancel();
158 task = null;
159 }
160
Jian Li0bbbb1c2018-06-22 22:01:17 +0900161 @Override
162 public void createStatFlowRule(StatsFlowRule statsFlowRule) {
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900163
Jian Li0bbbb1c2018-06-22 22:01:17 +0900164 setStatFlowRule(statsFlowRule, true);
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900165
Jian Li0bbbb1c2018-06-22 22:01:17 +0900166 log.info("Install stat flow rule for SrcIp:{} DstIp:{}",
167 statsFlowRule.srcIpPrefix().toString(),
168 statsFlowRule.dstIpPrefix().toString());
169 }
170
171 @Override
172 public void deleteStatFlowRule(StatsFlowRule statsFlowRule) {
173 // FIXME: following code might not be necessary
174 flowRuleService.removeFlowRulesById(appId);
175
176 setStatFlowRule(statsFlowRule, false);
177
178 log.info("Remove stat flow rule for SrcIp:{} DstIp:{}",
179 statsFlowRule.srcIpPrefix().toString(),
180 statsFlowRule.dstIpPrefix().toString());
181 }
182
183 private void connectTables(DeviceId deviceId, int fromTable, int toTable,
184 StatsFlowRule statsFlowRule, int rulePriority,
185 boolean install) {
186
187 log.debug("Table Transition: {} -> {}", fromTable, toTable);
188 int srcPrefixLength = statsFlowRule.srcIpPrefix().prefixLength();
189 int dstPrefixLength = statsFlowRule.dstIpPrefix().prefixLength();
190 int prefixLength = rulePriority + srcPrefixLength + dstPrefixLength;
191 byte protocol = statsFlowRule.ipProtocol();
192
193 TrafficSelector.Builder selectorBuilder =
194 DefaultTrafficSelector.builder()
195 .matchEthType(TYPE_IPV4)
196 .matchIPSrc(statsFlowRule.srcIpPrefix())
197 .matchIPDst(statsFlowRule.dstIpPrefix());
198
199 if (protocol == PROTOCOL_TCP) {
200 selectorBuilder = selectorBuilder
201 .matchIPProtocol(statsFlowRule.ipProtocol())
202 .matchTcpSrc(statsFlowRule.srcTpPort())
203 .matchTcpDst(statsFlowRule.dstTpPort());
204
205 } else if (protocol == PROTOCOL_UDP) {
206 selectorBuilder = selectorBuilder
207 .matchIPProtocol(statsFlowRule.ipProtocol())
208 .matchUdpSrc(statsFlowRule.srcTpPort())
209 .matchUdpDst(statsFlowRule.dstTpPort());
210 } else {
211 log.warn("Unsupported protocol {}", statsFlowRule.ipProtocol());
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900212 }
Jian Li0bbbb1c2018-06-22 22:01:17 +0900213
214 TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
215
216 treatmentBuilder.transition(toTable);
217
218 FlowRule flowRule = DefaultFlowRule.builder()
219 .forDevice(deviceId)
220 .withSelector(selectorBuilder.build())
221 .withTreatment(treatmentBuilder.build())
222 .withPriority(prefixLength)
223 .fromApp(appId)
224 .makePermanent()
225 .forTable(fromTable)
226 .build();
227
228 applyRule(flowRule, install);
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900229 }
230
231 /**
Jian Li0bbbb1c2018-06-22 22:01:17 +0900232 * Installs stats related flow rule to switch.
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900233 *
Jian Li0bbbb1c2018-06-22 22:01:17 +0900234 * @param flowRule flow rule
235 * @param install flag to install or not
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900236 */
237 private void applyRule(FlowRule flowRule, boolean install) {
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900238 FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
Jian Li0bbbb1c2018-06-22 22:01:17 +0900239 flowOpsBuilder = install ?
240 flowOpsBuilder.add(flowRule) : flowOpsBuilder.remove(flowRule);
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900241
242 flowRuleService.apply(flowOpsBuilder.build(new FlowRuleOperationsContext() {
243 @Override
244 public void onSuccess(FlowRuleOperations ops) {
245 log.debug("Provisioned vni or forwarding table: \n {}", ops.toString());
246 }
247
248 @Override
249 public void onError(FlowRuleOperations ops) {
250 log.debug("Failed to provision vni or forwarding table: \n {}", ops.toString());
251 }
252 }));
253 }
254
255 /**
Jian Li0bbbb1c2018-06-22 22:01:17 +0900256 * Gets a set of the flow infos.
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900257 *
Jian Li0bbbb1c2018-06-22 22:01:17 +0900258 * @return a set of flow infos
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900259 */
Jian Li0bbbb1c2018-06-22 22:01:17 +0900260 public Set<FlowInfo> getFlowInfo() {
261 Set<FlowInfo> flowInfos = Sets.newConcurrentHashSet();
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900262
Jian Li0bbbb1c2018-06-22 22:01:17 +0900263 // obtain all flow rule entries installed by telemetry app
264 for (FlowEntry entry : flowRuleService.getFlowEntriesById(appId)) {
265 FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
266 TrafficSelector selector = entry.selector();
267
268 IPCriterion srcIp = (IPCriterion) selector.getCriterion(IPV4_SRC);
269 IPCriterion dstIp = (IPCriterion) selector.getCriterion(IPV4_DST);
270 IPProtocolCriterion ipProtocol =
271 (IPProtocolCriterion) selector.getCriterion(IP_PROTO);
272
273 log.debug("[FlowInfo] TableID:{} SRC_IP:{} DST_IP:{} Pkt:{} Byte:{}",
274 ((IndexTableId) entry.table()).id(),
275 srcIp.ip().toString(),
276 dstIp.ip().toString(),
277 entry.packets(),
278 entry.bytes());
279
280 fBuilder.withFlowType(FLOW_TYPE_SONA)
281 .withSrcIp(srcIp.ip())
Jian Li85573f42018-06-27 22:29:14 +0900282 .withDstIp(dstIp.ip());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900283
Jian Li85573f42018-06-27 22:29:14 +0900284 if (ipProtocol != null) {
285 fBuilder.withProtocol((byte) ipProtocol.protocol());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900286
Jian Li85573f42018-06-27 22:29:14 +0900287 if (ipProtocol.protocol() == PROTOCOL_TCP) {
288 TcpPortCriterion tcpSrc =
289 (TcpPortCriterion) selector.getCriterion(TCP_SRC);
290 TcpPortCriterion tcpDst =
291 (TcpPortCriterion) selector.getCriterion(TCP_DST);
Jian Li0bbbb1c2018-06-22 22:01:17 +0900292
Jian Li85573f42018-06-27 22:29:14 +0900293 log.debug("TCP SRC Port: {}, DST Port: {}",
294 tcpSrc.tcpPort().toInt(),
295 tcpDst.tcpPort().toInt());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900296
Jian Li85573f42018-06-27 22:29:14 +0900297 fBuilder.withSrcPort(tcpSrc.tcpPort());
298 fBuilder.withDstPort(tcpDst.tcpPort());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900299
Jian Li85573f42018-06-27 22:29:14 +0900300 } else if (ipProtocol.protocol() == PROTOCOL_UDP) {
Jian Li0bbbb1c2018-06-22 22:01:17 +0900301
Jian Li85573f42018-06-27 22:29:14 +0900302 UdpPortCriterion udpSrc =
303 (UdpPortCriterion) selector.getCriterion(UDP_SRC);
304 UdpPortCriterion udpDst =
305 (UdpPortCriterion) selector.getCriterion(UDP_DST);
Jian Li0bbbb1c2018-06-22 22:01:17 +0900306
Jian Li85573f42018-06-27 22:29:14 +0900307 log.debug("UDP SRC Port: {}, DST Port: {}",
308 udpSrc.udpPort().toInt(),
309 udpDst.udpPort().toInt());
310
311 fBuilder.withSrcPort(udpSrc.udpPort());
312 fBuilder.withDstPort(udpDst.udpPort());
313 } else {
314 log.debug("Other protocol: {}", ipProtocol.protocol());
315 }
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900316 }
Jian Li0bbbb1c2018-06-22 22:01:17 +0900317
318 fBuilder.withSrcMac(getMacAddress(srcIp.ip().address()))
319 .withDstMac(getMacAddress(dstIp.ip().address()))
320 .withInputInterfaceId(getInterfaceId(srcIp.ip().address()))
321 .withOutputInterfaceId(getInterfaceId(dstIp.ip().address()))
322 .withVlanId(getVlanId(srcIp.ip().address()))
323 .withDeviceId(entry.deviceId());
324
325 StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
326
327 // TODO: need to collect error and drop packets stats
328 // TODO: need to make the refresh interval configurable
Jian Lide4ef402018-06-27 19:21:14 +0900329 sBuilder.withStartupTime(System.currentTimeMillis())
330 .withFstPktArrTime(System.currentTimeMillis())
331 .withLstPktOffset(REFRESH_INTERVAL * MILLISECONDS)
Jian Li0bbbb1c2018-06-22 22:01:17 +0900332 .withCurrAccPkts((int) entry.packets())
333 .withCurrAccBytes(entry.bytes())
334 .withErrorPkts((short) 0)
Jian Lide4ef402018-06-27 19:21:14 +0900335 .withDropPkts((short) 0);
Jian Li0bbbb1c2018-06-22 22:01:17 +0900336
337 fBuilder.withStatsInfo(sBuilder.build());
338
339 FlowInfo flowInfo = mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
340
341 flowInfos.add(flowInfo);
342
343 log.debug("FlowInfo: \n{}", flowInfo.toString());
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900344 }
Jian Li0bbbb1c2018-06-22 22:01:17 +0900345
346 return flowInfos;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900347 }
348
349 /**
Jian Li0bbbb1c2018-06-22 22:01:17 +0900350 * Merges old FlowInfo.StatsInfo and current FlowInfo.StatsInfo.
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900351 *
352 * @param flowInfo current FlowInfo object
353 * @param fBuilder Builder for FlowInfo
354 * @param sBuilder Builder for StatsInfo
355 * @return Merged FlowInfo object
356 */
357 private FlowInfo mergeFlowInfo(FlowInfo flowInfo,
358 FlowInfo.Builder fBuilder,
359 StatsInfo.Builder sBuilder) {
Jian Li0bbbb1c2018-06-22 22:01:17 +0900360 for (FlowInfo gFlowInfo : gFlowInfoSet) {
361 log.debug("Old FlowInfo:\n{}", gFlowInfo.toString());
362 if (gFlowInfo.roughEquals(flowInfo)) {
363
364 // Get old StatsInfo object and merge the value to current object.
365 StatsInfo oldStatsInfo = gFlowInfo.statsInfo();
366 sBuilder.withPrevAccPkts(oldStatsInfo.currAccPkts());
367 sBuilder.withPrevAccBytes(oldStatsInfo.currAccBytes());
368 FlowInfo newFlowInfo = fBuilder.withStatsInfo(sBuilder.build())
369 .build();
370
371 gFlowInfoSet.remove(gFlowInfo);
372 gFlowInfoSet.add(newFlowInfo);
Jian Li85573f42018-06-27 22:29:14 +0900373 log.debug("Old FlowInfo found, Merge this {}", newFlowInfo.toString());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900374 return newFlowInfo;
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900375 }
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900376 }
Jian Li0bbbb1c2018-06-22 22:01:17 +0900377
378 // No such record, then build the FlowInfo object and return this object.
Jian Li85573f42018-06-27 22:29:14 +0900379 log.debug("No FlowInfo found, add new FlowInfo {}", flowInfo.toString());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900380 FlowInfo newFlowInfo = fBuilder.withStatsInfo(sBuilder.build()).build();
381 gFlowInfoSet.add(newFlowInfo);
382 return newFlowInfo;
383 }
384
385 private void setStatFlowRule(StatsFlowRule statsFlowRule, boolean install) {
386 StatsFlowRule inverseFlowRule = DefaultStatsFlowRule.builder()
387 .srcIpPrefix(statsFlowRule.dstIpPrefix())
388 .dstIpPrefix(statsFlowRule.srcIpPrefix())
389 .ipProtocol(statsFlowRule.ipProtocol())
390 .srcTpPort(statsFlowRule.dstTpPort())
391 .dstTpPort(statsFlowRule.srcTpPort())
392 .build();
393
Jian Li85573f42018-06-27 22:29:14 +0900394 DeviceId srcDeviceId = getDeviceId(statsFlowRule.srcIpPrefix().address());
395 DeviceId dstDeviceId = getDeviceId(statsFlowRule.dstIpPrefix().address());
Jian Li0bbbb1c2018-06-22 22:01:17 +0900396
Jian Li998ec7b2018-06-29 15:15:49 +0900397 if (srcDeviceId == null && dstDeviceId == null) {
Jian Li85573f42018-06-27 22:29:14 +0900398 return;
399 }
400
Jian Li998ec7b2018-06-29 15:15:49 +0900401 if (srcDeviceId != null) {
402 connectTables(srcDeviceId, STAT_INBOUND_TABLE, DHCP_ARP_TABLE,
403 statsFlowRule, METRIC_PRIORITY_SOURCE, install);
404 }
405
406 if (dstDeviceId != null) {
407 connectTables(dstDeviceId, STAT_OUTBOUND_TABLE, FORWARDING_TABLE,
408 inverseFlowRule, METRIC_PRIORITY_TARGET, install);
409 }
Jian Li85573f42018-06-27 22:29:14 +0900410 }
411
412 /**
413 * Get Device ID which the VM is located.
414 *
415 * @param ipAddress IP Address of host
416 * @return Device ID
417 */
418 private DeviceId getDeviceId(IpAddress ipAddress) {
419 if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
420 Optional<Host> host = hostService.getHostsByIp(ipAddress).stream().findAny();
421 return host.map(host1 -> host1.location().deviceId()).orElse(null);
422 } else {
Jian Li998ec7b2018-06-29 15:15:49 +0900423 log.warn("Failed to get DeviceID which is connected to {}. " +
424 "The destination is either a bare-metal or located out of DC",
Jian Li85573f42018-06-27 22:29:14 +0900425 ipAddress.toString());
426 return null;
Jian Li0bbbb1c2018-06-22 22:01:17 +0900427 }
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900428 }
429
430 /**
431 * Get VLAN ID with respect to IP Address.
432 *
433 * @param ipAddress IP Address of host
434 * @return VLAN ID
435 */
Jian Li0bbbb1c2018-06-22 22:01:17 +0900436 private VlanId getVlanId(IpAddress ipAddress) {
437 if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
438 Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
439 return host.vlan();
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900440 }
441 return VlanId.vlanId();
442 }
443
444 /**
445 * Get Interface ID of Switch which is connected to a host.
446 *
447 * @param ipAddress IP Address of host
448 * @return Interface ID of Switch
449 */
Jian Li0bbbb1c2018-06-22 22:01:17 +0900450 private int getInterfaceId(IpAddress ipAddress) {
451 if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
452 Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
453 return (int) host.location().port().toLong();
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900454 }
455 return -1;
456 }
457
458 /**
459 * Get MAC Address of host.
460 *
461 * @param ipAddress IP Address of host
462 * @return MAC Address of host
463 */
Jian Li0bbbb1c2018-06-22 22:01:17 +0900464 private MacAddress getMacAddress(IpAddress ipAddress) {
465 if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
466 Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
467 return host.mac();
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900468 }
Jian Li0bbbb1c2018-06-22 22:01:17 +0900469
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900470 return NO_HOST_MAC;
471 }
Jian Li0bbbb1c2018-06-22 22:01:17 +0900472
473 private class InternalTimerTask extends TimerTask {
474 @Override
475 public void run() {
476 log.debug("Timer Task Thread Starts ({})", loopCount++);
Jian Li85573f42018-06-27 22:29:14 +0900477
478 Set<FlowInfo> filteredFlowInfos = Sets.newConcurrentHashSet();
479
480 // we only let the master controller of the device where the
481 // stats flow rules are installed send kafka message
482 getFlowInfo().forEach(f -> {
483 DeviceId deviceId = getDeviceId(f.srcIp().address());
484 if (mastershipService.isLocalMaster(deviceId)) {
485 filteredFlowInfos.add(f);
486 }
487 });
488
Jian Li0bbbb1c2018-06-22 22:01:17 +0900489 try {
Jian Li85573f42018-06-27 22:29:14 +0900490 telemetryService.publish(filteredFlowInfos);
Jian Li0bbbb1c2018-06-22 22:01:17 +0900491 } catch (Exception ex) {
492 log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
493 }
494 }
495 }
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900496}