Refactor OpenstackTelemetry App for better readability
Change-Id: I93353de31fb9671d8670ee44fc248fe7f36ac12b
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java
index 74a5016..3f56362 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java
@@ -141,6 +141,16 @@
}
@Override
+ public boolean roughEquals(FlowInfo flowInfo) {
+ return deviceId.equals(flowInfo.deviceId()) &&
+ srcIp.equals(flowInfo.srcIp()) &&
+ dstIp.equals(flowInfo.dstIp()) &&
+ srcPort.equals(flowInfo.srcPort()) &&
+ dstPort.equals(flowInfo.dstPort()) &&
+ (protocol == flowInfo.protocol());
+ }
+
+ @Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
index 4629a4e..da492a9 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
@@ -21,7 +21,6 @@
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.onosproject.openstacktelemetry.api.ByteBufferCodec;
import org.onosproject.openstacktelemetry.api.FlowInfo;
import org.onosproject.openstacktelemetry.api.GrpcTelemetryService;
import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryService;
@@ -29,12 +28,13 @@
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
import org.onosproject.openstacktelemetry.api.RestTelemetryService;
import org.onosproject.openstacktelemetry.api.TelemetryService;
-import org.onosproject.openstacktelemetry.codec.TinaFlowInfoByteBufferCodec;
+import org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Set;
/**
* Openstack telemetry manager.
@@ -45,6 +45,9 @@
private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final String KAFKA_TOPIC = "sona.flow";
+ private static final String KAFKA_KEY = "flowdata";
+
private List<TelemetryService> telemetryServices = Lists.newArrayList();
@Activate
@@ -68,45 +71,42 @@
}
@Override
- public void publish(FlowInfo flowInfo) {
+ public void publish(Set<FlowInfo> flowInfos) {
telemetryServices.forEach(service -> {
-
if (service instanceof GrpcTelemetryManager) {
- invokeGrpcPublisher((GrpcTelemetryService) service, flowInfo);
+ invokeGrpcPublisher((GrpcTelemetryService) service, flowInfos);
}
if (service instanceof InfluxDbTelemetryManager) {
- invokeInfluxDbPublisher((InfluxDbTelemetryService) service, flowInfo);
+ invokeInfluxDbPublisher((InfluxDbTelemetryService) service, flowInfos);
}
if (service instanceof KafkaTelemetryManager) {
- invokeKafkaPublisher((KafkaTelemetryService) service, flowInfo);
+ invokeKafkaPublisher((KafkaTelemetryService) service, flowInfos);
}
if (service instanceof RestTelemetryManager) {
- invokeRestPublisher((RestTelemetryService) service, flowInfo);
+ invokeRestPublisher((RestTelemetryService) service, flowInfos);
}
-
});
}
- private void invokeGrpcPublisher(GrpcTelemetryService service, FlowInfo flowInfo) {
+ private void invokeGrpcPublisher(GrpcTelemetryService service, Set<FlowInfo> flowInfos) {
// TODO: need provide implementation
}
- private void invokeInfluxDbPublisher(InfluxDbTelemetryService service, FlowInfo flowInfo) {
+ private void invokeInfluxDbPublisher(InfluxDbTelemetryService service, Set<FlowInfo> flowInfos) {
// TODO: need provide implementation
}
- private void invokeKafkaPublisher(KafkaTelemetryService service, FlowInfo flowInfo) {
- ByteBufferCodec codec = new TinaFlowInfoByteBufferCodec();
- ByteBuffer buffer = codec.encode(flowInfo);
- service.publish(new ProducerRecord<>("sona.flow", "flowdata", buffer.array()));
+ private void invokeKafkaPublisher(KafkaTelemetryService service, Set<FlowInfo> flowInfos) {
+ TinaMessageByteBufferCodec codec = new TinaMessageByteBufferCodec();
+ ByteBuffer buffer = codec.encode(flowInfos);
+ service.publish(new ProducerRecord<>(KAFKA_TOPIC, KAFKA_KEY, buffer.array()));
}
- private void invokeRestPublisher(RestTelemetryService service, FlowInfo flowInfo) {
+ private void invokeRestPublisher(RestTelemetryService service, Set<FlowInfo> flowInfos) {
// TODO: need provide implementation
}
-
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
index c0646ad..e2f505c 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.openstacktelemetry.impl;
+import com.google.common.collect.Sets;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -22,9 +23,6 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.onlab.osgi.DefaultServiceDirectory;
-import org.onlab.packet.Ethernet;
-import org.onlab.packet.IPv4;
import org.onlab.packet.IpAddress;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
@@ -45,27 +43,39 @@
import org.onosproject.net.flow.IndexTableId;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
-import org.onosproject.net.flow.criteria.Criterion;
import org.onosproject.net.flow.criteria.IPCriterion;
import org.onosproject.net.flow.criteria.IPProtocolCriterion;
import org.onosproject.net.flow.criteria.TcpPortCriterion;
import org.onosproject.net.flow.criteria.UdpPortCriterion;
import org.onosproject.net.host.HostService;
import org.onosproject.openstacktelemetry.api.FlowInfo;
+import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
import org.onosproject.openstacktelemetry.api.StatsFlowRule;
import org.onosproject.openstacktelemetry.api.StatsFlowRuleAdminService;
import org.onosproject.openstacktelemetry.api.StatsInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashSet;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import static org.onlab.packet.Ethernet.TYPE_IPV4;
+import static org.onlab.packet.IPv4.PROTOCOL_TCP;
+import static org.onlab.packet.IPv4.PROTOCOL_UDP;
+import static org.onosproject.net.flow.criteria.Criterion.Type.IPV4_DST;
+import static org.onosproject.net.flow.criteria.Criterion.Type.IPV4_SRC;
+import static org.onosproject.net.flow.criteria.Criterion.Type.IP_PROTO;
+import static org.onosproject.net.flow.criteria.Criterion.Type.TCP_DST;
+import static org.onosproject.net.flow.criteria.Criterion.Type.TCP_SRC;
+import static org.onosproject.net.flow.criteria.Criterion.Type.UDP_DST;
+import static org.onosproject.net.flow.criteria.Criterion.Type.UDP_SRC;
+import static org.onosproject.openstacknetworking.api.Constants.DHCP_ARP_TABLE;
+import static org.onosproject.openstacknetworking.api.Constants.FORWARDING_TABLE;
+import static org.onosproject.openstacknetworking.api.Constants.STAT_INBOUND_TABLE;
+import static org.onosproject.openstacknetworking.api.Constants.STAT_OUTBOUND_TABLE;
import static org.onosproject.openstacktelemetry.api.Constants.OPENSTACK_TELEMETRY_APP_ID;
-
/**
* Flow rule manager for network statistics of a VM.
*/
@@ -77,7 +87,7 @@
private static final byte FLOW_TYPE_SONA = 1; // VLAN
- public static final int MILLISECONDS = 1000;
+ private static final int MILLISECONDS = 1000;
private static final int REFRESH_INTERVAL = 5;
private ApplicationId appId;
@@ -91,11 +101,16 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected HostService hostService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected OpenstackTelemetryService telemetryService;
+
private Timer timer;
private TimerTask task;
- private OpenstackTelemetryManager osTelemetryManager;
- Set<FlowInfo> gFlowInfoSet = new HashSet<>();
+ private final Set<FlowInfo> gFlowInfoSet = Sets.newHashSet();
private int loopCount = 0;
private static final int SOURCE_ID = 1;
@@ -104,51 +119,29 @@
private static final int METRIC_PRIORITY_SOURCE = SOURCE_ID * PRIORITY_BASE;
private static final int METRIC_PRIORITY_TARGET = TARGET_ID * PRIORITY_BASE;
- public static final int FLOW_TABLE_VM_SOURCE = 0; // STAT_INBOUND_TABLE
- public static final int FLOW_TABLE_DHCP_ARP = 1; // DHCP_ARP_TABLE
- public static final int FLOW_TABLE_VM_TARGET = 49; // STAT_OUTBOUND_TABLE
- public static final int FLOW_TABLE_FORWARDING = 50; // FORWARDING_TABLE
-
- static final MacAddress NO_HOST_MAC = MacAddress.valueOf("00:00:00:00:00:00");
+ private static final MacAddress NO_HOST_MAC = MacAddress.valueOf("00:00:00:00:00:00");
public StatsFlowRuleManager() {
- log.info("Object is instantiated");
this.timer = new Timer("openstack-telemetry-sender");
}
@Activate
protected void activate() {
appId = coreService.registerApplication(OPENSTACK_TELEMETRY_APP_ID);
- log.info("Application is activated");
- osTelemetryManager = new OpenstackTelemetryManager();
+
this.start();
+
+ log.info("Started");
}
@Deactivate
protected void deactivate() {
- log.info("Application is deactivated");
- }
-
- private class InternalTimerTask extends TimerTask {
- @Override
- public void run() {
- log.debug("Timger Task Thread Starts ({})", loopCount++);
- try {
- Set<FlowInfo> flowInfoSet = getFlowRule();
- for (FlowInfo flowInfo: flowInfoSet) {
- log.info("Publish FlowInfo to NMS: {}", flowInfo.toString());
- osTelemetryManager.publish(flowInfo);
- }
- } catch (Exception ex) {
- log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
- }
- }
+ log.info("Stopped");
}
@Override
public void start() {
log.info("Start publishing thread");
- Set<FlowInfo> gFlowInfoSet = getFlowRule();
task = new InternalTimerTask();
timer.scheduleAtFixedRate(task, MILLISECONDS * REFRESH_INTERVAL,
MILLISECONDS * REFRESH_INTERVAL);
@@ -161,65 +154,86 @@
task = null;
}
- public void connectTables(
- DeviceId deviceId,
- int fromTable,
- int toTable,
- StatsFlowRule statsFlowRule,
- int rulePriority,
- boolean installFlag) {
- try {
- log.debug("Table Transition: {} -> {}", fromTable, toTable);
- int srcPrefixLength = statsFlowRule.srcIpPrefix().prefixLength();
- int dstPrefixLength = statsFlowRule.dstIpPrefix().prefixLength();
- int prefixLength = rulePriority + srcPrefixLength + dstPrefixLength;
+ @Override
+ public void createStatFlowRule(StatsFlowRule statsFlowRule) {
- TrafficSelector.Builder selector;
- if (statsFlowRule == null) {
- selector = DefaultTrafficSelector.builder();
- } else {
- selector = DefaultTrafficSelector.builder()
- .matchEthType(Ethernet.TYPE_IPV4)
- .matchIPSrc(statsFlowRule.srcIpPrefix())
- .matchIPDst(statsFlowRule.dstIpPrefix());
- if (statsFlowRule.ipProtocol() == IPv4.PROTOCOL_TCP) {
- selector = selector.matchIPProtocol(statsFlowRule.ipProtocol())
- .matchTcpSrc(statsFlowRule.srcTpPort())
- .matchTcpDst(statsFlowRule.dstTpPort());
- } else if (statsFlowRule.ipProtocol() == IPv4.PROTOCOL_UDP) {
- selector = selector.matchIPProtocol(statsFlowRule.ipProtocol())
- .matchUdpSrc(statsFlowRule.srcTpPort())
- .matchUdpDst(statsFlowRule.dstTpPort());
- }
- }
+ setStatFlowRule(statsFlowRule, true);
- TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder();
- treatment.transition(toTable);
- FlowRule flowRule = DefaultFlowRule.builder()
- .forDevice(deviceId)
- .withSelector(selector.build())
- .withTreatment(treatment.build())
- .withPriority(prefixLength)
- .fromApp(appId)
- .makePermanent()
- .forTable(fromTable)
- .build();
- applyRule(flowRule, installFlag);
- } catch (Exception ex) {
- log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+ log.info("Install stat flow rule for SrcIp:{} DstIp:{}",
+ statsFlowRule.srcIpPrefix().toString(),
+ statsFlowRule.dstIpPrefix().toString());
+ }
+
+ @Override
+ public void deleteStatFlowRule(StatsFlowRule statsFlowRule) {
+ // FIXME: following code might not be necessary
+ flowRuleService.removeFlowRulesById(appId);
+
+ setStatFlowRule(statsFlowRule, false);
+
+ log.info("Remove stat flow rule for SrcIp:{} DstIp:{}",
+ statsFlowRule.srcIpPrefix().toString(),
+ statsFlowRule.dstIpPrefix().toString());
+ }
+
+ private void connectTables(DeviceId deviceId, int fromTable, int toTable,
+ StatsFlowRule statsFlowRule, int rulePriority,
+ boolean install) {
+
+ log.debug("Table Transition: {} -> {}", fromTable, toTable);
+ int srcPrefixLength = statsFlowRule.srcIpPrefix().prefixLength();
+ int dstPrefixLength = statsFlowRule.dstIpPrefix().prefixLength();
+ int prefixLength = rulePriority + srcPrefixLength + dstPrefixLength;
+ byte protocol = statsFlowRule.ipProtocol();
+
+ TrafficSelector.Builder selectorBuilder =
+ DefaultTrafficSelector.builder()
+ .matchEthType(TYPE_IPV4)
+ .matchIPSrc(statsFlowRule.srcIpPrefix())
+ .matchIPDst(statsFlowRule.dstIpPrefix());
+
+ if (protocol == PROTOCOL_TCP) {
+ selectorBuilder = selectorBuilder
+ .matchIPProtocol(statsFlowRule.ipProtocol())
+ .matchTcpSrc(statsFlowRule.srcTpPort())
+ .matchTcpDst(statsFlowRule.dstTpPort());
+
+ } else if (protocol == PROTOCOL_UDP) {
+ selectorBuilder = selectorBuilder
+ .matchIPProtocol(statsFlowRule.ipProtocol())
+ .matchUdpSrc(statsFlowRule.srcTpPort())
+ .matchUdpDst(statsFlowRule.dstTpPort());
+ } else {
+ log.warn("Unsupported protocol {}", statsFlowRule.ipProtocol());
}
+
+ TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
+
+ treatmentBuilder.transition(toTable);
+
+ FlowRule flowRule = DefaultFlowRule.builder()
+ .forDevice(deviceId)
+ .withSelector(selectorBuilder.build())
+ .withTreatment(treatmentBuilder.build())
+ .withPriority(prefixLength)
+ .fromApp(appId)
+ .makePermanent()
+ .forTable(fromTable)
+ .build();
+
+ applyRule(flowRule, install);
}
/**
- * Apply FlowRule to switch.
+ * Installs stats related flow rule to switch.
*
- * @param flowRule FlowRule
- * @param install Flag to install or not
+ * @param flowRule flow rule
+ * @param install flag to install or not
*/
private void applyRule(FlowRule flowRule, boolean install) {
- log.debug("Apply flow rule to bridge device");
FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
- flowOpsBuilder = install ? flowOpsBuilder.add(flowRule) : flowOpsBuilder.remove(flowRule);
+ flowOpsBuilder = install ?
+ flowOpsBuilder.add(flowRule) : flowOpsBuilder.remove(flowRule);
flowRuleService.apply(flowOpsBuilder.build(new FlowRuleOperationsContext() {
@Override
@@ -235,184 +249,97 @@
}
/**
- * Craete a flow rule.
+ * Gets a set of the flow infos.
*
- * @param flowRule flow rule for Openstack VMs
+ * @return a set of flow infos
*/
- @Override
- public void createFlowRule(StatsFlowRule flowRule) {
- try {
- log.debug("Create Flow Rule. SrcIp:{} DstIp:{}",
- flowRule.srcIpPrefix().toString(),
- flowRule.dstIpPrefix().toString());
+ public Set<FlowInfo> getFlowInfo() {
+ Set<FlowInfo> flowInfos = Sets.newConcurrentHashSet();
- // To make a inversed flow rule.
- DefaultStatsFlowRule.Builder inverseFlowRuleBuilder
- = DefaultStatsFlowRule
- .builder()
- .srcIpPrefix(flowRule.dstIpPrefix())
- .dstIpPrefix(flowRule.srcIpPrefix())
- .ipProtocol(flowRule.ipProtocol())
- .srcTpPort(flowRule.dstTpPort())
- .dstTpPort(flowRule.srcTpPort());
- StatsFlowRule inverseFlowRule = inverseFlowRuleBuilder.build();
- DeviceService deviceService = DefaultServiceDirectory.getService(DeviceService.class);
- Iterable<Device> devices = deviceService.getDevices();
- for (Device d : devices) {
- log.debug("Device: {}", d.toString());
- if (d.type() == Device.Type.CONTROLLER) {
- log.info("Don't create flow rule for 'DeviceType=CONTROLLER' ({})",
- d.id().toString());
- continue;
- }
- connectTables(d.id(), FLOW_TABLE_VM_SOURCE, FLOW_TABLE_DHCP_ARP,
- flowRule, METRIC_PRIORITY_SOURCE, true);
- connectTables(d.id(), FLOW_TABLE_VM_TARGET, FLOW_TABLE_FORWARDING,
- inverseFlowRule, METRIC_PRIORITY_TARGET, true);
+ // obtain all flow rule entries installed by telemetry app
+ for (FlowEntry entry : flowRuleService.getFlowEntriesById(appId)) {
+ FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
+ TrafficSelector selector = entry.selector();
+
+ IPCriterion srcIp = (IPCriterion) selector.getCriterion(IPV4_SRC);
+ IPCriterion dstIp = (IPCriterion) selector.getCriterion(IPV4_DST);
+ IPProtocolCriterion ipProtocol =
+ (IPProtocolCriterion) selector.getCriterion(IP_PROTO);
+
+ log.debug("[FlowInfo] TableID:{} SRC_IP:{} DST_IP:{} Pkt:{} Byte:{}",
+ ((IndexTableId) entry.table()).id(),
+ srcIp.ip().toString(),
+ dstIp.ip().toString(),
+ entry.packets(),
+ entry.bytes());
+
+ fBuilder.withFlowType(FLOW_TYPE_SONA)
+ .withSrcIp(srcIp.ip())
+ .withDstIp(dstIp.ip())
+ .withProtocol((byte) ipProtocol.protocol());
+
+ if (ipProtocol.protocol() == PROTOCOL_TCP) {
+ TcpPortCriterion tcpSrc =
+ (TcpPortCriterion) selector.getCriterion(TCP_SRC);
+ TcpPortCriterion tcpDst =
+ (TcpPortCriterion) selector.getCriterion(TCP_DST);
+
+ log.debug("TCP SRC Port: {}, DST Port: {}",
+ tcpSrc.tcpPort().toInt(),
+ tcpDst.tcpPort().toInt());
+
+ fBuilder.withSrcPort(tcpSrc.tcpPort());
+ fBuilder.withDstPort(tcpDst.tcpPort());
+
+ } else if (ipProtocol.protocol() == PROTOCOL_UDP) {
+
+ UdpPortCriterion udpSrc =
+ (UdpPortCriterion) selector.getCriterion(UDP_SRC);
+ UdpPortCriterion udpDst =
+ (UdpPortCriterion) selector.getCriterion(UDP_DST);
+
+ log.debug("UDP SRC Port: {}, DST Port: {}",
+ udpSrc.udpPort().toInt(),
+ udpDst.udpPort().toInt());
+
+ fBuilder.withSrcPort(udpSrc.udpPort());
+ fBuilder.withDstPort(udpDst.udpPort());
+ } else {
+ log.debug("Other protocol: {}", ipProtocol.protocol());
}
- } catch (Exception ex) {
- log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+
+ fBuilder.withSrcMac(getMacAddress(srcIp.ip().address()))
+ .withDstMac(getMacAddress(dstIp.ip().address()))
+ .withInputInterfaceId(getInterfaceId(srcIp.ip().address()))
+ .withOutputInterfaceId(getInterfaceId(dstIp.ip().address()))
+ .withVlanId(getVlanId(srcIp.ip().address()))
+ .withDeviceId(entry.deviceId());
+
+ StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
+
+ // TODO: need to collect error and drop packets stats
+ // TODO: need to make the refresh interval configurable
+ sBuilder.withStartupTime(0)
+ .withCurrAccPkts((int) entry.packets())
+ .withCurrAccBytes(entry.bytes())
+ .withErrorPkts((short) 0)
+ .withDropPkts((short) 0)
+ .withLstPktOffset(REFRESH_INTERVAL * MILLISECONDS);
+
+ fBuilder.withStatsInfo(sBuilder.build());
+
+ FlowInfo flowInfo = mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
+
+ flowInfos.add(flowInfo);
+
+ log.debug("FlowInfo: \n{}", flowInfo.toString());
}
+
+ return flowInfos;
}
/**
- * Get FlowRule.
- *
- * @param flowRule Flow rule for a VM
- * @return Set of FlowInfo
- */
- public Set<FlowInfo> getFlowRule(StatsFlowRule flowRule) {
- Set<FlowInfo> flowInfoSet = new HashSet<>();
- log.info("Get flow rule: {}", flowRule.toString());
- // TODO Make a implementation here.
- return flowInfoSet;
- }
-
- /**
- * Delete FlowRule for StatsInfo.
- *
- * @param flowRule Flow rule for Openstack VM
- */
- @Override
- public void deleteFlowRule(StatsFlowRule flowRule) {
- log.debug("Delete Flow Rule: {}", flowRule.toString());
- flowRuleService = DefaultServiceDirectory.getService(FlowRuleService.class);
- flowRuleService.removeFlowRulesById(appId);
- // TODO Write a implementation code here
-
- try {
- log.debug("Delete Flow Rule. SrcIp:{} DstIp:{}",
- flowRule.srcIpPrefix().toString(),
- flowRule.dstIpPrefix().toString());
-
- // To make a inversed flow rule.
- DefaultStatsFlowRule.Builder inverseFlowRuleBuilder
- = DefaultStatsFlowRule
- .builder()
- .srcIpPrefix(flowRule.dstIpPrefix())
- .dstIpPrefix(flowRule.srcIpPrefix())
- .ipProtocol(flowRule.ipProtocol())
- .srcTpPort(flowRule.dstTpPort())
- .dstTpPort(flowRule.srcTpPort());
- StatsFlowRule inverseFlowRule = inverseFlowRuleBuilder.build();
- DeviceService deviceService = DefaultServiceDirectory.getService(DeviceService.class);
- Iterable<Device> devices = deviceService.getDevices();
- for (Device d : devices) {
- log.debug("Device: {}", d.toString());
- if (d.type() == Device.Type.CONTROLLER) {
- log.info("Don't care for 'DeviceType=CONTROLLER' ({})",
- d.id().toString());
- continue;
- }
- connectTables(d.id(), FLOW_TABLE_VM_SOURCE, FLOW_TABLE_DHCP_ARP,
- flowRule, METRIC_PRIORITY_SOURCE, false);
- connectTables(d.id(), FLOW_TABLE_VM_TARGET, FLOW_TABLE_FORWARDING,
- inverseFlowRule, METRIC_PRIORITY_TARGET, false);
- }
- } catch (Exception ex) {
- log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
- }
- }
-
- /**
- * Get a list of the FlowRule Store.
- *
- * @return list of Flow Rule
- */
- public Set<FlowInfo> getFlowRule() {
- log.debug("Get Flow Information List");
- Set<FlowInfo> flowInfoSet = new HashSet<>();
- try {
- flowRuleService = DefaultServiceDirectory.getService(FlowRuleService.class);
- Iterable<FlowEntry> flowEntries = flowRuleService.getFlowEntriesById(appId);
-
- for (FlowEntry entry : flowEntries) {
- FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
- IPCriterion srcIpCriterion =
- (IPCriterion) entry.selector().getCriterion(Criterion.Type.IPV4_SRC);
- IPCriterion dstIpCriterion =
- (IPCriterion) entry.selector().getCriterion(Criterion.Type.IPV4_DST);
- IPProtocolCriterion ipProtocolCriterion =
- (IPProtocolCriterion) entry.selector().getCriterion(Criterion.Type.IP_PROTO);
-
- log.debug("[FlowInfo] TableID:{} SRC_IP:{} DST_IP:{} Pkt:{} Byte:{}",
- ((IndexTableId) entry.table()).id(),
- srcIpCriterion.ip().toString(), dstIpCriterion.ip().toString(),
- entry.packets(), entry.bytes());
-
- fBuilder.withFlowType(FLOW_TYPE_SONA).withSrcIp(srcIpCriterion.ip())
- .withDstIp(dstIpCriterion.ip())
- .withProtocol((byte) ipProtocolCriterion.protocol());
-
- if (ipProtocolCriterion.protocol() == IPv4.PROTOCOL_TCP) {
- TcpPortCriterion tcpSrcCriterion =
- (TcpPortCriterion) entry.selector().getCriterion(Criterion.Type.TCP_SRC);
- TcpPortCriterion tcpDstCriterion =
- (TcpPortCriterion) entry.selector().getCriterion(Criterion.Type.TCP_DST);
- log.debug("TCP SRC Port: {} Dst Port: {}",
- tcpSrcCriterion.tcpPort().toInt(), tcpDstCriterion.tcpPort().toInt());
- fBuilder.withSrcPort(tcpSrcCriterion.tcpPort());
- fBuilder.withDstPort(tcpDstCriterion.tcpPort());
- } else if (ipProtocolCriterion.protocol() == IPv4.PROTOCOL_UDP) {
- UdpPortCriterion udpSrcCriterion =
- (UdpPortCriterion) entry.selector().getCriterion(Criterion.Type.UDP_SRC);
- UdpPortCriterion udpDstCriterion =
- (UdpPortCriterion) entry.selector().getCriterion(Criterion.Type.UDP_DST);
- log.debug("UDP SRC Port: {} Dst Port: {}",
- udpSrcCriterion.udpPort().toInt(), udpDstCriterion.udpPort().toInt());
- fBuilder.withSrcPort(udpSrcCriterion.udpPort());
- fBuilder.withDstPort(udpDstCriterion.udpPort());
- } else {
- log.debug("Other protocol: {}", ipProtocolCriterion.protocol());
- }
-
- fBuilder.withSrcMac(getMacAddress(srcIpCriterion.ip().address()))
- .withDstMac(getMacAddress(dstIpCriterion.ip().address()))
- .withInputInterfaceId(getInterfaceId(srcIpCriterion.ip().address()))
- .withOutputInterfaceId(getInterfaceId(dstIpCriterion.ip().address()))
- .withVlanId(getVlanId(srcIpCriterion.ip().address()))
- .withDeviceId(entry.deviceId());
-
- StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
- sBuilder.withStartupTime(0)
- .withCurrAccPkts((int) entry.packets()).withCurrAccBytes(entry.bytes())
- .withErrorPkts((short) 0).withDropPkts((short) 0)
- .withLstPktOffset(REFRESH_INTERVAL * MILLISECONDS);
-
- fBuilder.withStatsInfo(sBuilder.build());
-
- FlowInfo flowInfo = mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
- flowInfoSet.add(flowInfo);
- log.debug("FlowInfo: \n{}", flowInfo.toString());
- }
- } catch (Exception ex) {
- log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
- }
- return flowInfoSet;
- }
-
- /**
- * Merge old FlowInfo.StatsInfo and current FlowInfo.StatsInfo.
+ * Merges old FlowInfo.StatsInfo and current FlowInfo.StatsInfo.
*
* @param flowInfo current FlowInfo object
* @param fBuilder Builder for FlowInfo
@@ -422,39 +349,53 @@
private FlowInfo mergeFlowInfo(FlowInfo flowInfo,
FlowInfo.Builder fBuilder,
StatsInfo.Builder sBuilder) {
- try {
- log.debug("Current FlowInfo:\n{}", flowInfo.toString());
- for (FlowInfo gFlowInfo: gFlowInfoSet) {
- log.debug("Old FlowInfo:\n{}", gFlowInfo.toString());
- if (gFlowInfo.deviceId().equals(flowInfo.deviceId()) &&
- gFlowInfo.srcIp().equals(flowInfo.srcIp()) &&
- gFlowInfo.dstIp().equals(flowInfo.dstIp()) &&
- gFlowInfo.srcPort().equals(flowInfo.srcPort()) &&
- gFlowInfo.dstPort().equals(flowInfo.dstPort()) &&
- (gFlowInfo.protocol() == flowInfo.protocol())
- ) {
- // Get old StatsInfo object and merge the value to current object.
- StatsInfo oldStatsInfo = gFlowInfo.statsInfo();
- sBuilder.withPrevAccPkts(oldStatsInfo.currAccPkts());
- sBuilder.withPrevAccBytes(oldStatsInfo.currAccBytes());
- FlowInfo newFlowInfo = fBuilder.withStatsInfo(sBuilder.build()).build();
- gFlowInfoSet.remove(gFlowInfo);
- gFlowInfoSet.add(newFlowInfo);
- log.info("Old FlowInfo found, Merge this {}", newFlowInfo.toString());
- return newFlowInfo;
- }
+ for (FlowInfo gFlowInfo : gFlowInfoSet) {
+ log.debug("Old FlowInfo:\n{}", gFlowInfo.toString());
+ if (gFlowInfo.roughEquals(flowInfo)) {
+
+ // Get old StatsInfo object and merge the value to current object.
+ StatsInfo oldStatsInfo = gFlowInfo.statsInfo();
+ sBuilder.withPrevAccPkts(oldStatsInfo.currAccPkts());
+ sBuilder.withPrevAccBytes(oldStatsInfo.currAccBytes());
+ FlowInfo newFlowInfo = fBuilder.withStatsInfo(sBuilder.build())
+ .build();
+
+ gFlowInfoSet.remove(gFlowInfo);
+ gFlowInfoSet.add(newFlowInfo);
+ log.info("Old FlowInfo found, Merge this {}", newFlowInfo.toString());
+ return newFlowInfo;
}
- // No such record, then build the FlowInfo object and return this object.
- log.info("No FlowInfo found, add new FlowInfo {}", flowInfo.toString());
- FlowInfo newFlowInfo = fBuilder.withStatsInfo(sBuilder.build()).build();
- gFlowInfoSet.add(newFlowInfo);
- return newFlowInfo;
- } catch (Exception ex) {
- log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
}
- log.debug("Add this FlowInfo {}", flowInfo.toString());
- gFlowInfoSet.add(flowInfo);
- return flowInfo;
+
+ // No such record, then build the FlowInfo object and return this object.
+ log.info("No FlowInfo found, add new FlowInfo {}", flowInfo.toString());
+ FlowInfo newFlowInfo = fBuilder.withStatsInfo(sBuilder.build()).build();
+ gFlowInfoSet.add(newFlowInfo);
+ return newFlowInfo;
+ }
+
+ private void setStatFlowRule(StatsFlowRule statsFlowRule, boolean install) {
+ StatsFlowRule inverseFlowRule = DefaultStatsFlowRule.builder()
+ .srcIpPrefix(statsFlowRule.dstIpPrefix())
+ .dstIpPrefix(statsFlowRule.srcIpPrefix())
+ .ipProtocol(statsFlowRule.ipProtocol())
+ .srcTpPort(statsFlowRule.dstTpPort())
+ .dstTpPort(statsFlowRule.srcTpPort())
+ .build();
+
+ // FIXME: install stat flow rules for all devices for now
+ // need to query the device where the host with the given IP located
+ for (Device d : deviceService.getDevices()) {
+ if (d.type() == Device.Type.CONTROLLER) {
+ log.info("Not provide stats for 'CONTROLLER' ({})", d.id().toString());
+ continue;
+ }
+
+ connectTables(d.id(), STAT_INBOUND_TABLE, DHCP_ARP_TABLE,
+ statsFlowRule, METRIC_PRIORITY_SOURCE, install);
+ connectTables(d.id(), STAT_OUTBOUND_TABLE, FORWARDING_TABLE,
+ inverseFlowRule, METRIC_PRIORITY_TARGET, install);
+ }
}
/**
@@ -463,14 +404,10 @@
* @param ipAddress IP Address of host
* @return VLAN ID
*/
- public VlanId getVlanId(IpAddress ipAddress) {
- try {
- if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
- Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
- return host.vlan();
- }
- } catch (Exception ex) {
- log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+ private VlanId getVlanId(IpAddress ipAddress) {
+ if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
+ Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
+ return host.vlan();
}
return VlanId.vlanId();
}
@@ -481,14 +418,10 @@
* @param ipAddress IP Address of host
* @return Interface ID of Switch
*/
- public int getInterfaceId(IpAddress ipAddress) {
- try {
- if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
- Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
- return (int) host.location().port().toLong();
- }
- } catch (Exception ex) {
- log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+ private int getInterfaceId(IpAddress ipAddress) {
+ if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
+ Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
+ return (int) host.location().port().toLong();
}
return -1;
}
@@ -499,15 +432,24 @@
* @param ipAddress IP Address of host
* @return MAC Address of host
*/
- public MacAddress getMacAddress(IpAddress ipAddress) {
- try {
- if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
- Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
- return host.mac();
- }
- } catch (Exception ex) {
- log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+ private MacAddress getMacAddress(IpAddress ipAddress) {
+ if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
+ Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
+ return host.mac();
}
+
return NO_HOST_MAC;
}
+
+ private class InternalTimerTask extends TimerTask {
+ @Override
+ public void run() {
+ log.debug("Timer Task Thread Starts ({})", loopCount++);
+ try {
+ telemetryService.publish(getFlowInfo());
+ } catch (Exception ex) {
+ log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+ }
+ }
+ }
}