Support collecting outbound vFlow stats for FLAT typed network
Change-Id: I359e2a39867126d76e4b7daae84aecf85e0f8c11
diff --git a/apps/openstacktelemetry/BUCK b/apps/openstacktelemetry/BUCK
index 8f90347..fd9d942 100644
--- a/apps/openstacktelemetry/BUCK
+++ b/apps/openstacktelemetry/BUCK
@@ -13,7 +13,7 @@
'//lib:converter-moshi',
'//lib:protobuf-java-3.2.0',
'//incubator/grpc-dependencies:grpc-core-repkg-' + GRPC_VER,
- "//lib:google-instrumentation-0.3.0",
+ '//lib:google-instrumentation-0.3.0',
'//apps/openstacktelemetry/api:onos-apps-openstacktelemetry-api',
'//apps/openstacktelemetry/app:onos-apps-openstacktelemetry-app',
]
diff --git a/apps/openstacktelemetry/api/BUCK b/apps/openstacktelemetry/api/BUCK
index e09f293..40a119b 100644
--- a/apps/openstacktelemetry/api/BUCK
+++ b/apps/openstacktelemetry/api/BUCK
@@ -1,9 +1,12 @@
+include_defs('//apps/openstacknode/openstack4j.bucklet')
+
COMPILE_DEPS = [
'//lib:CORE_DEPS',
'//lib:kafka-clients',
'//lib:jersey-client',
'//lib:javax.ws.rs-api',
'//lib:influxdb-java',
+ '//lib:openstack4j-core',
]
TEST_DEPS = [
@@ -14,5 +17,7 @@
osgi_jar_with_tests (
deps = COMPILE_DEPS,
- test_deps = TEST_DEPS
+ test_deps = TEST_DEPS,
+
+ import_packages = INCLUDE_PACKAGES + ',' + EXCLUDE_PACKAGES + ',' + ALL_PACKAGES,
)
diff --git a/apps/openstacktelemetry/api/pom.xml b/apps/openstacktelemetry/api/pom.xml
index e6b046a..4536492 100644
--- a/apps/openstacktelemetry/api/pom.xml
+++ b/apps/openstacktelemetry/api/pom.xml
@@ -38,12 +38,6 @@
</dependency>
<dependency>
- <groupId>org.apache.servicemix.bundles</groupId>
- <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
- <version>0.8.2.2_1</version>
- </dependency>
-
- <dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-client</artifactId>
</dependency>
@@ -56,8 +50,41 @@
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
- <version>2.2</version>
+ <version>${influxdb-java.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.pacesys</groupId>
+ <artifactId>openstack4j-core</artifactId>
+ <version>${openstack4j.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.servicemix.bundles</groupId>
+ <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
+ <version>${kafka-client.version}</version>
</dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <extensions>true</extensions>
+ <configuration>
+ <instructions>
+ <Bundle-SymbolicName>
+ ${project.groupId}.${project.artifactId}
+ </Bundle-SymbolicName>
+ <Import-Package>
+ !org.openstack4j.*,
+ *
+ </Import-Package>
+ </instructions>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/apps/openstacktelemetry/app/BUCK b/apps/openstacktelemetry/app/BUCK
index 97bc0f9..69c2c1e 100644
--- a/apps/openstacktelemetry/app/BUCK
+++ b/apps/openstacktelemetry/app/BUCK
@@ -1,3 +1,5 @@
+include_defs('//apps/openstacknode/openstack4j.bucklet')
+
GRPC_VER = '1.3.1'
COMPILE_DEPS = [
@@ -18,6 +20,14 @@
'//lib:GRPC_1.3',
'//incubator/grpc-dependencies:grpc-core-repkg-' + GRPC_VER,
'//lib:grpc-protobuf-lite-' + GRPC_VER,
+ '//lib:openstack4j-core',
+ '//lib:openstack4j-http-connector',
+ '//lib:openstack4j-httpclient',
+ '//lib:json-patch',
+ '//lib:jackson-coreutils',
+ '//lib:btf',
+ '//lib:msg-simple',
+ '//lib:snakeyaml',
]
TEST_DEPS = [
@@ -36,4 +46,8 @@
api_version = '1.0',
api_description = 'OpenStack Network Telemetry REST API',
api_package = 'org.onosproject.openstacktelemetry.web',
+
+ # dependency embedding
+ import_packages = INCLUDE_PACKAGES + ',' + EXCLUDE_PACKAGES + ',' + ALL_PACKAGES,
+ bundle_classpath = get_openstack4j_deps_path() + get_jackson_deps_path()
)
diff --git a/apps/openstacktelemetry/app/pom.xml b/apps/openstacktelemetry/app/pom.xml
index 7d81b2a..4573d45 100644
--- a/apps/openstacktelemetry/app/pom.xml
+++ b/apps/openstacktelemetry/app/pom.xml
@@ -145,6 +145,47 @@
</dependency>
<dependency>
+ <groupId>org.pacesys</groupId>
+ <artifactId>openstack4j-core</artifactId>
+ <version>${openstack4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.pacesys.openstack4j.connectors</groupId>
+ <artifactId>openstack4j-http-connector</artifactId>
+ <version>${openstack4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.pacesys.openstack4j.connectors</groupId>
+ <artifactId>openstack4j-httpclient</artifactId>
+ <version>${openstack4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.github.fge</groupId>
+ <artifactId>json-patch</artifactId>
+ <version>${json-patch.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.github.fge</groupId>
+ <artifactId>jackson-coreutils</artifactId>
+ <version>${jackson-coreutils.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.github.fge</groupId>
+ <artifactId>btf</artifactId>
+ <version>${btf.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.github.fge</groupId>
+ <artifactId>msg-simple</artifactId>
+ <version>${msg-simple.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.yaml</groupId>
+ <artifactId>snakeyaml</artifactId>
+ <version>${snakeyaml.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-client</artifactId>
<scope>test</scope>
@@ -236,8 +277,21 @@
${project.groupId}.${project.artifactId}
</Bundle-SymbolicName>
<Import-Package>
+ !org.apache.http.*,
+ !com.fasterxml.jackson.dataformat.*,
+ !javax.annotation,
*,org.glassfish.jersey.servlet
</Import-Package>
+ <Embed-Dependency>
+ openstack4j-core,
+ openstack4j-http-connector,
+ openstack4j-httpclient,
+ json-patch,
+ jackson-coreutils,
+ btf,
+ msg-simple,
+ snakeyaml
+ </Embed-Dependency>
<Web-ContextPath>${web.context}</Web-ContextPath>
</instructions>
</configuration>
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
index e3a20cb..5453a27 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
@@ -20,10 +20,13 @@
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.IpAddress;
+import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.onosproject.core.ApplicationId;
@@ -48,19 +51,27 @@
import org.onosproject.net.flow.criteria.TcpPortCriterion;
import org.onosproject.net.flow.criteria.UdpPortCriterion;
import org.onosproject.net.host.HostService;
+import org.onosproject.openstacknetworking.api.InstancePort;
+import org.onosproject.openstacknetworking.api.InstancePortService;
+import org.onosproject.openstacknetworking.api.OpenstackNetworkService;
import org.onosproject.openstacktelemetry.api.FlowInfo;
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
import org.onosproject.openstacktelemetry.api.StatsFlowRule;
import org.onosproject.openstacktelemetry.api.StatsFlowRuleAdminService;
import org.onosproject.openstacktelemetry.api.StatsInfo;
+import org.openstack4j.model.network.Network;
+import org.openstack4j.model.network.NetworkType;
+import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Dictionary;
import java.util.Optional;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import static com.google.common.base.Preconditions.checkNotNull;
import static org.onlab.packet.Ethernet.TYPE_IPV4;
import static org.onlab.packet.IPv4.PROTOCOL_TCP;
import static org.onlab.packet.IPv4.PROTOCOL_UDP;
@@ -71,11 +82,17 @@
import static org.onosproject.net.flow.criteria.Criterion.Type.TCP_SRC;
import static org.onosproject.net.flow.criteria.Criterion.Type.UDP_DST;
import static org.onosproject.net.flow.criteria.Criterion.Type.UDP_SRC;
+import static org.onosproject.openstacknetworking.api.Constants.STAT_FLAT_OUTBOUND_TABLE;
import static org.onosproject.openstacknetworking.api.Constants.STAT_INBOUND_TABLE;
import static org.onosproject.openstacknetworking.api.Constants.STAT_OUTBOUND_TABLE;
+import static org.onosproject.openstacknetworking.api.Constants.VTAP_FLAT_OUTBOUND_TABLE;
import static org.onosproject.openstacknetworking.api.Constants.VTAP_INBOUND_TABLE;
import static org.onosproject.openstacknetworking.api.Constants.VTAP_OUTBOUND_TABLE;
import static org.onosproject.openstacktelemetry.api.Constants.OPENSTACK_TELEMETRY_APP_ID;
+import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getBooleanProperty;
+import static org.openstack4j.model.network.NetworkType.FLAT;
+import static org.openstack4j.model.network.NetworkType.VLAN;
+import static org.openstack4j.model.network.NetworkType.VXLAN;
/**
* Flow rule manager for network statistics of a VM.
@@ -91,7 +108,13 @@
private static final long MILLISECONDS = 1000L;
private static final long REFRESH_INTERVAL = 5L;
- private ApplicationId appId;
+ private static final String REVERSE_PATH_STATS = "reversePathStats";
+ private static final String EGRESS_STATS = "egressStats";
+
+ private static final boolean DEFAULT_REVERSE_PATH_STATS = false;
+ private static final boolean DEFAULT_EGRESS_STATS = false;
+
+ private static final String MAC_NOT_NULL = "MAC should not be null";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
@@ -109,8 +132,25 @@
protected MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected OpenstackNetworkService osNetworkService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected InstancePortService instancePortService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected OpenstackTelemetryService telemetryService;
+ @Property(name = REVERSE_PATH_STATS, boolValue = DEFAULT_REVERSE_PATH_STATS,
+ label = "A flag which indicates whether to install the rules for " +
+ "collecting the flow-based stats for reversed path.")
+ private boolean reversePathStats = DEFAULT_REVERSE_PATH_STATS;
+
+ @Property(name = EGRESS_STATS, boolValue = DEFAULT_EGRESS_STATS,
+ label = "A flag which indicates whether to install the rules for " +
+ "collecting the flow-based stats for egress port.")
+ private boolean egressStats = DEFAULT_EGRESS_STATS;
+
+ private ApplicationId appId;
private Timer timer;
private TimerTask task;
@@ -143,12 +183,19 @@
log.info("Stopped");
}
+ @Modified
+ protected void modified(ComponentContext context) {
+ readComponentConfiguration(context);
+
+ log.info("Modified");
+ }
+
@Override
public void start() {
log.info("Start publishing thread");
task = new InternalTimerTask();
timer.scheduleAtFixedRate(task, MILLISECONDS * REFRESH_INTERVAL,
- MILLISECONDS * REFRESH_INTERVAL);
+ MILLISECONDS * REFRESH_INTERVAL);
}
@Override
@@ -164,8 +211,8 @@
setStatFlowRule(statsFlowRule, true);
log.info("Install stat flow rule for SrcIp:{} DstIp:{}",
- statsFlowRule.srcIpPrefix().toString(),
- statsFlowRule.dstIpPrefix().toString());
+ statsFlowRule.srcIpPrefix().toString(),
+ statsFlowRule.dstIpPrefix().toString());
}
@Override
@@ -176,8 +223,8 @@
setStatFlowRule(statsFlowRule, false);
log.info("Remove stat flow rule for SrcIp:{} DstIp:{}",
- statsFlowRule.srcIpPrefix().toString(),
- statsFlowRule.dstIpPrefix().toString());
+ statsFlowRule.srcIpPrefix().toString(),
+ statsFlowRule.dstIpPrefix().toString());
}
private void connectTables(DeviceId deviceId, int fromTable, int toTable,
@@ -191,22 +238,22 @@
byte protocol = statsFlowRule.ipProtocol();
TrafficSelector.Builder selectorBuilder =
- DefaultTrafficSelector.builder()
- .matchEthType(TYPE_IPV4)
- .matchIPSrc(statsFlowRule.srcIpPrefix())
- .matchIPDst(statsFlowRule.dstIpPrefix());
+ DefaultTrafficSelector.builder()
+ .matchEthType(TYPE_IPV4)
+ .matchIPSrc(statsFlowRule.srcIpPrefix())
+ .matchIPDst(statsFlowRule.dstIpPrefix());
if (protocol == PROTOCOL_TCP) {
selectorBuilder = selectorBuilder
- .matchIPProtocol(statsFlowRule.ipProtocol())
- .matchTcpSrc(statsFlowRule.srcTpPort())
- .matchTcpDst(statsFlowRule.dstTpPort());
+ .matchIPProtocol(statsFlowRule.ipProtocol())
+ .matchTcpSrc(statsFlowRule.srcTpPort())
+ .matchTcpDst(statsFlowRule.dstTpPort());
} else if (protocol == PROTOCOL_UDP) {
selectorBuilder = selectorBuilder
- .matchIPProtocol(statsFlowRule.ipProtocol())
- .matchUdpSrc(statsFlowRule.srcTpPort())
- .matchUdpDst(statsFlowRule.dstTpPort());
+ .matchIPProtocol(statsFlowRule.ipProtocol())
+ .matchUdpSrc(statsFlowRule.srcTpPort())
+ .matchUdpDst(statsFlowRule.dstTpPort());
} else {
log.warn("Unsupported protocol {}", statsFlowRule.ipProtocol());
}
@@ -216,14 +263,14 @@
treatmentBuilder.transition(toTable);
FlowRule flowRule = DefaultFlowRule.builder()
- .forDevice(deviceId)
- .withSelector(selectorBuilder.build())
- .withTreatment(treatmentBuilder.build())
- .withPriority(prefixLength)
- .fromApp(appId)
- .makePermanent()
- .forTable(fromTable)
- .build();
+ .forDevice(deviceId)
+ .withSelector(selectorBuilder.build())
+ .withTreatment(treatmentBuilder.build())
+ .withPriority(prefixLength)
+ .fromApp(appId)
+ .makePermanent()
+ .forTable(fromTable)
+ .build();
applyRule(flowRule, install);
}
@@ -268,14 +315,14 @@
IPCriterion srcIp = (IPCriterion) selector.getCriterion(IPV4_SRC);
IPCriterion dstIp = (IPCriterion) selector.getCriterion(IPV4_DST);
IPProtocolCriterion ipProtocol =
- (IPProtocolCriterion) selector.getCriterion(IP_PROTO);
+ (IPProtocolCriterion) selector.getCriterion(IP_PROTO);
log.debug("[FlowInfo] TableID:{} SRC_IP:{} DST_IP:{} Pkt:{} Byte:{}",
- ((IndexTableId) entry.table()).id(),
- srcIp.ip().toString(),
- dstIp.ip().toString(),
- entry.packets(),
- entry.bytes());
+ ((IndexTableId) entry.table()).id(),
+ srcIp.ip().toString(),
+ dstIp.ip().toString(),
+ entry.packets(),
+ entry.bytes());
fBuilder.withFlowType(FLOW_TYPE_SONA)
.withSrcIp(srcIp.ip())
@@ -382,15 +429,36 @@
return newFlowInfo;
}
+ /**
+ * Installs or removes stat flow rules for the given flow and, if enabled, for its reverse path.
+ *
+ * @param statsFlowRule flow rule used for collecting stats
+ * @param install true to install the flow rules, false to remove them
+ */
private void setStatFlowRule(StatsFlowRule statsFlowRule, boolean install) {
- StatsFlowRule inverseFlowRule = DefaultStatsFlowRule.builder()
- .srcIpPrefix(statsFlowRule.dstIpPrefix())
- .dstIpPrefix(statsFlowRule.srcIpPrefix())
- .ipProtocol(statsFlowRule.ipProtocol())
- .srcTpPort(statsFlowRule.dstTpPort())
- .dstTpPort(statsFlowRule.srcTpPort())
- .build();
+ setStatFlowRuleBase(statsFlowRule, install);
+ // when reverse path stats is enabled, apply the same operation to a rule
+ // whose source and destination IP prefixes and ports are swapped
+ if (reversePathStats) {
+ StatsFlowRule reverseFlowRule = DefaultStatsFlowRule.builder()
+ .srcIpPrefix(statsFlowRule.dstIpPrefix())
+ .dstIpPrefix(statsFlowRule.srcIpPrefix())
+ .ipProtocol(statsFlowRule.ipProtocol())
+ .srcTpPort(statsFlowRule.dstTpPort())
+ .dstTpPort(statsFlowRule.srcTpPort())
+ .build();
+ setStatFlowRuleBase(reverseFlowRule, install);
+ }
+ }
+
+ /**
+ * Base method that installs or removes the flow rules used for collecting stats.
+ *
+ * @param statsFlowRule flow rule used for collecting stats
+ * @param install flow rule installation flag
+ */
+ private void setStatFlowRuleBase(StatsFlowRule statsFlowRule, boolean install) {
DeviceId srcDeviceId = getDeviceId(statsFlowRule.srcIpPrefix().address());
DeviceId dstDeviceId = getDeviceId(statsFlowRule.dstIpPrefix().address());
@@ -403,13 +471,43 @@
statsFlowRule, METRIC_PRIORITY_SOURCE, install);
}
- if (dstDeviceId != null) {
- connectTables(dstDeviceId, STAT_OUTBOUND_TABLE, VTAP_OUTBOUND_TABLE,
- inverseFlowRule, METRIC_PRIORITY_TARGET, install);
+ if (dstDeviceId != null && egressStats) {
+ NetworkType type = getNetworkType(statsFlowRule.dstIpPrefix());
+ if (type == VXLAN || type == VLAN) {
+ connectTables(dstDeviceId, STAT_OUTBOUND_TABLE, VTAP_OUTBOUND_TABLE,
+ statsFlowRule, METRIC_PRIORITY_TARGET, install);
+ } else if (type == FLAT) {
+ connectTables(dstDeviceId, STAT_FLAT_OUTBOUND_TABLE, VTAP_FLAT_OUTBOUND_TABLE,
+ statsFlowRule, METRIC_PRIORITY_TARGET, install);
+ }
}
}
/**
+ * Obtains the type of the network used by the VM that has the given IP.
+ *
+ * @param ipPrefix IP prefix whose address identifies the VM
+ * @return network type, or null if no instance port or network is found
+ */
+ private NetworkType getNetworkType(IpPrefix ipPrefix) {
+ // checkNotNull throws NullPointerException if the MAC lookup yields null
+ MacAddress mac = checkNotNull(getMacAddress(ipPrefix.address()), MAC_NOT_NULL);
+ InstancePort instPort = instancePortService.instancePort(mac);
+
+ if (instPort == null) {
+ return null;
+ }
+
+ Network network = osNetworkService.network(instPort.networkId());
+
+ if (network != null) {
+ return network.getNetworkType();
+ }
+
+ return null;
+ }
+
+ /**
* Get Device ID which the VM is located.
*
* @param ipAddress IP Address of host
@@ -470,10 +568,40 @@
return NO_HOST_MAC;
}
+ /**
+ * Reads the reverse path stats and egress stats flags from the component configuration.
+ *
+ * @param context the component context
+ */
+ private void readComponentConfiguration(ComponentContext context) {
+ Dictionary<?, ?> properties = context.getProperties();
+ // a null lookup result is treated as "not configured" and the default value is used
+ Boolean reversePathStatsConfigured =
+ getBooleanProperty(properties, REVERSE_PATH_STATS);
+ if (reversePathStatsConfigured == null) {
+ reversePathStats = DEFAULT_REVERSE_PATH_STATS;
+ log.info("Reversed path stats flag is NOT " +
+ "configured, default value is {}", reversePathStats);
+ } else {
+ reversePathStats = reversePathStatsConfigured;
+ log.info("Configured. Reversed path stats flag is {}", reversePathStats);
+ }
+
+ Boolean egressStatsConfigured = getBooleanProperty(properties, EGRESS_STATS);
+ if (egressStatsConfigured == null) {
+ egressStats = DEFAULT_EGRESS_STATS;
+ log.info("Egress stats flag is NOT " +
+ "configured, default value is {}", egressStats);
+ } else {
+ egressStats = egressStatsConfigured;
+ log.info("Configured. Egress stats flag is {}", egressStats);
+ }
+ }
+
private class InternalTimerTask extends TimerTask {
@Override
public void run() {
- log.debug("Timer Task Thread Starts ({})", loopCount++);
+ log.debug("Timer task thread starts ({})", loopCount++);
Set<FlowInfo> filteredFlowInfos = Sets.newConcurrentHashSet();
diff --git a/apps/openstacktelemetry/pom.xml b/apps/openstacktelemetry/pom.xml
index 3cae090..f6372ad 100644
--- a/apps/openstacktelemetry/pom.xml
+++ b/apps/openstacktelemetry/pom.xml
@@ -30,6 +30,17 @@
<description>SONA Openstack Telemetry Application</description>
+ <properties>
+ <openstack4j.version>3.1.0</openstack4j.version>
+ <influxdb-java.version>2.2</influxdb-java.version>
+ <kafka-client.version>0.8.2.2_1</kafka-client.version>
+ <json-patch.version>1.9</json-patch.version>
+ <jackson-coreutils.version>1.6</jackson-coreutils.version>
+ <btf.version>1.2</btf.version>
+ <msg-simple.version>1.1</msg-simple.version>
+ <snakeyaml.version>1.15</snakeyaml.version>
+ </properties>
+
<modules>
<module>api</module>
<module>app</module>