Support collecting outbound vFlow stats for FLAT typed network

Change-Id: I359e2a39867126d76e4b7daae84aecf85e0f8c11
diff --git a/apps/openstacknetworking/api/BUCK b/apps/openstacknetworking/api/BUCK
index b502204..cbc9b22 100644
--- a/apps/openstacknetworking/api/BUCK
+++ b/apps/openstacknetworking/api/BUCK
@@ -1,4 +1,4 @@
-include_defs('//apps/openstacknetworking/openstack4j.bucklet')
+include_defs('//apps/openstacknode/openstack4j.bucklet')
 
 COMPILE_DEPS = [
     '//lib:CORE_DEPS',
diff --git a/apps/openstacknetworking/app/BUCK b/apps/openstacknetworking/app/BUCK
index 7167236..63b9477 100644
--- a/apps/openstacknetworking/app/BUCK
+++ b/apps/openstacknetworking/app/BUCK
@@ -1,4 +1,4 @@
-include_defs('//apps/openstacknetworking/openstack4j.bucklet')
+include_defs('//apps/openstacknode/openstack4j.bucklet')
 
 COMPILE_DEPS = [
     '//lib:CORE_DEPS',
diff --git a/apps/openstacknetworking/openstack4j.bucklet b/apps/openstacknetworking/openstack4j.bucklet
deleted file mode 100644
index 6de5d52..0000000
--- a/apps/openstacknetworking/openstack4j.bucklet
+++ /dev/null
@@ -1,41 +0,0 @@
-INCLUDE_PACKAGES = 'com.google.common.net,com.google.common.io,com.fasterxml.jackson.annotation'
-EXCLUDE_PACKAGES = '!org.openstack4j,!org.openstack4j.*'
-ALL_PACKAGES = '*'
-
-def get_openstack4j_deps_path():
-
-    WEB_INF_PATH = 'WEB-INF/classes/deps/'
-    OPENSTACK4J_DEPS = [
-        'openstack4j-core',
-        'openstack4j-http-connector',
-        'openstack4j-httpclient',
-    ]
-    OPENSTACK4J_VER = '3.1.0'
-
-    openstack_deps_path = ''
-
-    for dep in OPENSTACK4J_DEPS:
-        name = dep + '-' + OPENSTACK4J_VER + '.jar'
-        path = WEB_INF_PATH + name
-        openstack_deps_path = openstack_deps_path + path + ','
-
-    return openstack_deps_path
-
-def get_jackson_deps_path():
-
-    WEB_INF_PATH = 'WEB-INF/classes/deps/'
-    JACKSON_DEPS_WITH_VER = [
-        'json-patch-1.9.jar',
-        'jackson-coreutils-1.6.jar',
-        'msg-simple-1.1.jar',
-        'btf-1.2.jar',
-        'snakeyaml-1.15.jar'
-    ]
-
-    jackson_deps_path = ''
-
-    for dep in JACKSON_DEPS_WITH_VER:
-        path = WEB_INF_PATH + dep
-        jackson_deps_path = jackson_deps_path + path + ','
-
-    return jackson_deps_path
\ No newline at end of file
diff --git a/apps/openstacknode/api/BUCK b/apps/openstacknode/api/BUCK
index 4f335f3..02cd049 100644
--- a/apps/openstacknode/api/BUCK
+++ b/apps/openstacknode/api/BUCK
@@ -1,4 +1,4 @@
-include_defs('//apps/openstacknetworking/openstack4j.bucklet')
+include_defs('//apps/openstacknode/openstack4j.bucklet')
 
 COMPILE_DEPS = [
     '//lib:CORE_DEPS',
diff --git a/apps/openstacknode/openstack4j.bucklet b/apps/openstacknode/openstack4j.bucklet
index 6de5d52..d9e16f4 100644
--- a/apps/openstacknode/openstack4j.bucklet
+++ b/apps/openstacknode/openstack4j.bucklet
@@ -1,5 +1,5 @@
 INCLUDE_PACKAGES = 'com.google.common.net,com.google.common.io,com.fasterxml.jackson.annotation'
-EXCLUDE_PACKAGES = '!org.openstack4j,!org.openstack4j.*'
+EXCLUDE_PACKAGES = '!org.openstack4j,!org.openstack4j.*,!org.openstack4j.model.network'
 ALL_PACKAGES = '*'
 
 def get_openstack4j_deps_path():
diff --git a/apps/openstacktelemetry/BUCK b/apps/openstacktelemetry/BUCK
index 8f90347..fd9d942 100644
--- a/apps/openstacktelemetry/BUCK
+++ b/apps/openstacktelemetry/BUCK
@@ -13,7 +13,7 @@
     '//lib:converter-moshi',
     '//lib:protobuf-java-3.2.0',
     '//incubator/grpc-dependencies:grpc-core-repkg-' + GRPC_VER,
-    "//lib:google-instrumentation-0.3.0",
+    '//lib:google-instrumentation-0.3.0',
     '//apps/openstacktelemetry/api:onos-apps-openstacktelemetry-api',
     '//apps/openstacktelemetry/app:onos-apps-openstacktelemetry-app',
 ]
diff --git a/apps/openstacktelemetry/api/BUCK b/apps/openstacktelemetry/api/BUCK
index e09f293..40a119b 100644
--- a/apps/openstacktelemetry/api/BUCK
+++ b/apps/openstacktelemetry/api/BUCK
@@ -1,9 +1,12 @@
+include_defs('//apps/openstacknode/openstack4j.bucklet')
+
 COMPILE_DEPS = [
     '//lib:CORE_DEPS',
     '//lib:kafka-clients',
     '//lib:jersey-client',
     '//lib:javax.ws.rs-api',
     '//lib:influxdb-java',
+    '//lib:openstack4j-core',
 ]
 
 TEST_DEPS = [
@@ -14,5 +17,7 @@
 
 osgi_jar_with_tests (
     deps = COMPILE_DEPS,
-    test_deps = TEST_DEPS
+    test_deps = TEST_DEPS,
+
+    import_packages = INCLUDE_PACKAGES + ',' + EXCLUDE_PACKAGES  + ',' + ALL_PACKAGES,
 )
diff --git a/apps/openstacktelemetry/api/pom.xml b/apps/openstacktelemetry/api/pom.xml
index e6b046a..4536492 100644
--- a/apps/openstacktelemetry/api/pom.xml
+++ b/apps/openstacktelemetry/api/pom.xml
@@ -38,12 +38,6 @@
         </dependency>
 
         <dependency>
-            <groupId>org.apache.servicemix.bundles</groupId>
-            <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
-            <version>0.8.2.2_1</version>
-        </dependency>
-
-        <dependency>
             <groupId>org.glassfish.jersey.core</groupId>
             <artifactId>jersey-client</artifactId>
         </dependency>
@@ -56,8 +50,41 @@
         <dependency>
             <groupId>org.influxdb</groupId>
             <artifactId>influxdb-java</artifactId>
-            <version>2.2</version>
+            <version>${influxdb-java.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.pacesys</groupId>
+            <artifactId>openstack4j-core</artifactId>
+            <version>${openstack4j.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.servicemix.bundles</groupId>
+            <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
+            <version>${kafka-client.version}</version>
         </dependency>
 
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <extensions>true</extensions>
+                <configuration>
+                    <instructions>
+                        <Bundle-SymbolicName>
+                            ${project.groupId}.${project.artifactId}
+                        </Bundle-SymbolicName>
+                        <Import-Package>
+                            !org.openstack4j.*,
+                            *
+                        </Import-Package>
+                    </instructions>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git a/apps/openstacktelemetry/app/BUCK b/apps/openstacktelemetry/app/BUCK
index 97bc0f9..69c2c1e 100644
--- a/apps/openstacktelemetry/app/BUCK
+++ b/apps/openstacktelemetry/app/BUCK
@@ -1,3 +1,5 @@
+include_defs('//apps/openstacknode/openstack4j.bucklet')
+
 GRPC_VER = '1.3.1'
 
 COMPILE_DEPS = [
@@ -18,6 +20,14 @@
     '//lib:GRPC_1.3',
     '//incubator/grpc-dependencies:grpc-core-repkg-' + GRPC_VER,
     '//lib:grpc-protobuf-lite-' + GRPC_VER,
+    '//lib:openstack4j-core',
+    '//lib:openstack4j-http-connector',
+    '//lib:openstack4j-httpclient',
+    '//lib:json-patch',
+    '//lib:jackson-coreutils',
+    '//lib:btf',
+    '//lib:msg-simple',
+    '//lib:snakeyaml',
 ]
 
 TEST_DEPS = [
@@ -36,4 +46,8 @@
     api_version = '1.0',
     api_description = 'OpenStack Network Telemetry REST API',
     api_package = 'org.onosproject.openstacktelemetry.web',
+
+    # dependency embedding
+    import_packages = INCLUDE_PACKAGES + ',' + EXCLUDE_PACKAGES  + ',' + ALL_PACKAGES,
+    bundle_classpath = get_openstack4j_deps_path() + get_jackson_deps_path()
 )
diff --git a/apps/openstacktelemetry/app/pom.xml b/apps/openstacktelemetry/app/pom.xml
index 7d81b2a..4573d45 100644
--- a/apps/openstacktelemetry/app/pom.xml
+++ b/apps/openstacktelemetry/app/pom.xml
@@ -145,6 +145,47 @@
         </dependency>
 
         <dependency>
+            <groupId>org.pacesys</groupId>
+            <artifactId>openstack4j-core</artifactId>
+            <version>${openstack4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.pacesys.openstack4j.connectors</groupId>
+            <artifactId>openstack4j-http-connector</artifactId>
+            <version>${openstack4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.pacesys.openstack4j.connectors</groupId>
+            <artifactId>openstack4j-httpclient</artifactId>
+            <version>${openstack4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.github.fge</groupId>
+            <artifactId>json-patch</artifactId>
+            <version>${json-patch.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.github.fge</groupId>
+            <artifactId>jackson-coreutils</artifactId>
+            <version>${jackson-coreutils.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.github.fge</groupId>
+            <artifactId>btf</artifactId>
+            <version>${btf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.github.fge</groupId>
+            <artifactId>msg-simple</artifactId>
+            <version>${msg-simple.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.yaml</groupId>
+            <artifactId>snakeyaml</artifactId>
+            <version>${snakeyaml.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>org.glassfish.jersey.core</groupId>
             <artifactId>jersey-client</artifactId>
             <scope>test</scope>
@@ -236,8 +277,21 @@
                             ${project.groupId}.${project.artifactId}
                         </Bundle-SymbolicName>
                         <Import-Package>
+                            !org.apache.http.*,
+                            !com.fasterxml.jackson.dataformat.*,
+                            !javax.annotation,
                             *,org.glassfish.jersey.servlet
                         </Import-Package>
+                        <Embed-Dependency>
+                            openstack4j-core,
+                            openstack4j-http-connector,
+                            openstack4j-httpclient,
+                            json-patch,
+                            jackson-coreutils,
+                            btf,
+                            msg-simple,
+                            snakeyaml
+                        </Embed-Dependency>
                         <Web-ContextPath>${web.context}</Web-ContextPath>
                     </instructions>
                 </configuration>
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
index e3a20cb..5453a27 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
@@ -20,10 +20,13 @@
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.packet.IpAddress;
+import org.onlab.packet.IpPrefix;
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.VlanId;
 import org.onosproject.core.ApplicationId;
@@ -48,19 +51,27 @@
 import org.onosproject.net.flow.criteria.TcpPortCriterion;
 import org.onosproject.net.flow.criteria.UdpPortCriterion;
 import org.onosproject.net.host.HostService;
+import org.onosproject.openstacknetworking.api.InstancePort;
+import org.onosproject.openstacknetworking.api.InstancePortService;
+import org.onosproject.openstacknetworking.api.OpenstackNetworkService;
 import org.onosproject.openstacktelemetry.api.FlowInfo;
 import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
 import org.onosproject.openstacktelemetry.api.StatsFlowRule;
 import org.onosproject.openstacktelemetry.api.StatsFlowRuleAdminService;
 import org.onosproject.openstacktelemetry.api.StatsInfo;
+import org.openstack4j.model.network.Network;
+import org.openstack4j.model.network.NetworkType;
+import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Dictionary;
 import java.util.Optional;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static org.onlab.packet.Ethernet.TYPE_IPV4;
 import static org.onlab.packet.IPv4.PROTOCOL_TCP;
 import static org.onlab.packet.IPv4.PROTOCOL_UDP;
@@ -71,11 +82,17 @@
 import static org.onosproject.net.flow.criteria.Criterion.Type.TCP_SRC;
 import static org.onosproject.net.flow.criteria.Criterion.Type.UDP_DST;
 import static org.onosproject.net.flow.criteria.Criterion.Type.UDP_SRC;
+import static org.onosproject.openstacknetworking.api.Constants.STAT_FLAT_OUTBOUND_TABLE;
 import static org.onosproject.openstacknetworking.api.Constants.STAT_INBOUND_TABLE;
 import static org.onosproject.openstacknetworking.api.Constants.STAT_OUTBOUND_TABLE;
+import static org.onosproject.openstacknetworking.api.Constants.VTAP_FLAT_OUTBOUND_TABLE;
 import static org.onosproject.openstacknetworking.api.Constants.VTAP_INBOUND_TABLE;
 import static org.onosproject.openstacknetworking.api.Constants.VTAP_OUTBOUND_TABLE;
 import static org.onosproject.openstacktelemetry.api.Constants.OPENSTACK_TELEMETRY_APP_ID;
+import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getBooleanProperty;
+import static org.openstack4j.model.network.NetworkType.FLAT;
+import static org.openstack4j.model.network.NetworkType.VLAN;
+import static org.openstack4j.model.network.NetworkType.VXLAN;
 
 /**
  * Flow rule manager for network statistics of a VM.
@@ -91,7 +108,13 @@
     private static final long MILLISECONDS = 1000L;
     private static final long REFRESH_INTERVAL = 5L;
 
-    private ApplicationId appId;
+    private static final String REVERSE_PATH_STATS = "reversePathStats";
+    private static final String EGRESS_STATS = "egressStats";
+
+    private static final boolean DEFAULT_REVERSE_PATH_STATS = false;
+    private static final boolean DEFAULT_EGRESS_STATS = false;
+
+    private static final String MAC_NOT_NULL = "MAC should not be null";
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected CoreService coreService;
@@ -109,8 +132,25 @@
     protected MastershipService mastershipService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected OpenstackNetworkService osNetworkService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected InstancePortService instancePortService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected OpenstackTelemetryService telemetryService;
 
+    @Property(name = REVERSE_PATH_STATS, boolValue = DEFAULT_REVERSE_PATH_STATS,
+            label = "A flag which indicates whether to install the rules for " +
+                    "collecting the flow-based stats for reversed path.")
+    private boolean reversePathStats = DEFAULT_REVERSE_PATH_STATS;
+
+    @Property(name = EGRESS_STATS, boolValue = DEFAULT_EGRESS_STATS,
+            label = "A flag which indicates whether to install the rules for " +
+                    "collecting the flow-based stats for egress port.")
+    private boolean egressStats = DEFAULT_EGRESS_STATS;
+
+    private ApplicationId appId;
     private Timer timer;
     private TimerTask task;
 
@@ -143,12 +183,19 @@
         log.info("Stopped");
     }
 
+    @Modified
+    protected void modified(ComponentContext context) {
+        readComponentConfiguration(context);
+
+        log.info("Modified");
+    }
+
     @Override
     public void start() {
         log.info("Start publishing thread");
         task = new InternalTimerTask();
         timer.scheduleAtFixedRate(task, MILLISECONDS * REFRESH_INTERVAL,
-                                  MILLISECONDS * REFRESH_INTERVAL);
+                MILLISECONDS * REFRESH_INTERVAL);
     }
 
     @Override
@@ -164,8 +211,8 @@
         setStatFlowRule(statsFlowRule, true);
 
         log.info("Install stat flow rule for SrcIp:{} DstIp:{}",
-                                        statsFlowRule.srcIpPrefix().toString(),
-                                        statsFlowRule.dstIpPrefix().toString());
+                statsFlowRule.srcIpPrefix().toString(),
+                statsFlowRule.dstIpPrefix().toString());
     }
 
     @Override
@@ -176,8 +223,8 @@
         setStatFlowRule(statsFlowRule, false);
 
         log.info("Remove stat flow rule for SrcIp:{} DstIp:{}",
-                                        statsFlowRule.srcIpPrefix().toString(),
-                                        statsFlowRule.dstIpPrefix().toString());
+                statsFlowRule.srcIpPrefix().toString(),
+                statsFlowRule.dstIpPrefix().toString());
     }
 
     private void connectTables(DeviceId deviceId, int fromTable, int toTable,
@@ -191,22 +238,22 @@
         byte protocol = statsFlowRule.ipProtocol();
 
         TrafficSelector.Builder selectorBuilder =
-                                        DefaultTrafficSelector.builder()
-                                        .matchEthType(TYPE_IPV4)
-                                        .matchIPSrc(statsFlowRule.srcIpPrefix())
-                                        .matchIPDst(statsFlowRule.dstIpPrefix());
+                DefaultTrafficSelector.builder()
+                        .matchEthType(TYPE_IPV4)
+                        .matchIPSrc(statsFlowRule.srcIpPrefix())
+                        .matchIPDst(statsFlowRule.dstIpPrefix());
 
         if (protocol == PROTOCOL_TCP) {
             selectorBuilder = selectorBuilder
-                                        .matchIPProtocol(statsFlowRule.ipProtocol())
-                                        .matchTcpSrc(statsFlowRule.srcTpPort())
-                                        .matchTcpDst(statsFlowRule.dstTpPort());
+                    .matchIPProtocol(statsFlowRule.ipProtocol())
+                    .matchTcpSrc(statsFlowRule.srcTpPort())
+                    .matchTcpDst(statsFlowRule.dstTpPort());
 
         } else if (protocol == PROTOCOL_UDP) {
             selectorBuilder = selectorBuilder
-                                        .matchIPProtocol(statsFlowRule.ipProtocol())
-                                        .matchUdpSrc(statsFlowRule.srcTpPort())
-                                        .matchUdpDst(statsFlowRule.dstTpPort());
+                    .matchIPProtocol(statsFlowRule.ipProtocol())
+                    .matchUdpSrc(statsFlowRule.srcTpPort())
+                    .matchUdpDst(statsFlowRule.dstTpPort());
         } else {
             log.warn("Unsupported protocol {}", statsFlowRule.ipProtocol());
         }
@@ -216,14 +263,14 @@
         treatmentBuilder.transition(toTable);
 
         FlowRule flowRule = DefaultFlowRule.builder()
-                                        .forDevice(deviceId)
-                                        .withSelector(selectorBuilder.build())
-                                        .withTreatment(treatmentBuilder.build())
-                                        .withPriority(prefixLength)
-                                        .fromApp(appId)
-                                        .makePermanent()
-                                        .forTable(fromTable)
-                                        .build();
+                .forDevice(deviceId)
+                .withSelector(selectorBuilder.build())
+                .withTreatment(treatmentBuilder.build())
+                .withPriority(prefixLength)
+                .fromApp(appId)
+                .makePermanent()
+                .forTable(fromTable)
+                .build();
 
         applyRule(flowRule, install);
     }
@@ -268,14 +315,14 @@
             IPCriterion srcIp = (IPCriterion) selector.getCriterion(IPV4_SRC);
             IPCriterion dstIp = (IPCriterion) selector.getCriterion(IPV4_DST);
             IPProtocolCriterion ipProtocol =
-                                (IPProtocolCriterion) selector.getCriterion(IP_PROTO);
+                    (IPProtocolCriterion) selector.getCriterion(IP_PROTO);
 
             log.debug("[FlowInfo]  TableID:{}  SRC_IP:{}  DST_IP:{}  Pkt:{}  Byte:{}",
-                                                ((IndexTableId) entry.table()).id(),
-                                                srcIp.ip().toString(),
-                                                dstIp.ip().toString(),
-                                                entry.packets(),
-                                                entry.bytes());
+                    ((IndexTableId) entry.table()).id(),
+                    srcIp.ip().toString(),
+                    dstIp.ip().toString(),
+                    entry.packets(),
+                    entry.bytes());
 
             fBuilder.withFlowType(FLOW_TYPE_SONA)
                     .withSrcIp(srcIp.ip())
@@ -382,15 +429,36 @@
         return newFlowInfo;
     }
 
+    /**
+     * Installs flow rules for collecting both normal and reverse path flow stats.
+     *
+     * @param statsFlowRule flow rule used for collecting stats
+     * @param install flow rule installation flag
+     */
     private void setStatFlowRule(StatsFlowRule statsFlowRule, boolean install) {
-        StatsFlowRule inverseFlowRule = DefaultStatsFlowRule.builder()
-                                        .srcIpPrefix(statsFlowRule.dstIpPrefix())
-                                        .dstIpPrefix(statsFlowRule.srcIpPrefix())
-                                        .ipProtocol(statsFlowRule.ipProtocol())
-                                        .srcTpPort(statsFlowRule.dstTpPort())
-                                        .dstTpPort(statsFlowRule.srcTpPort())
-                                        .build();
+        setStatFlowRuleBase(statsFlowRule, install);
 
+        // if reverse path stats is enabled, we will install flow rules for
+        // collecting reverse path vFlow stats
+        if (reversePathStats) {
+            StatsFlowRule reverseFlowRule = DefaultStatsFlowRule.builder()
+                                            .srcIpPrefix(statsFlowRule.dstIpPrefix())
+                                            .dstIpPrefix(statsFlowRule.srcIpPrefix())
+                                            .ipProtocol(statsFlowRule.ipProtocol())
+                                            .srcTpPort(statsFlowRule.dstTpPort())
+                                            .dstTpPort(statsFlowRule.srcTpPort())
+                                            .build();
+            setStatFlowRuleBase(reverseFlowRule, install);
+        }
+    }
+
+    /**
+     * A base method which is for installing flow rules for collecting stats.
+     *
+     * @param statsFlowRule flow rule used for collecting stats
+     * @param install flow rule installation flag
+     */
+    private void setStatFlowRuleBase(StatsFlowRule statsFlowRule, boolean install) {
         DeviceId srcDeviceId = getDeviceId(statsFlowRule.srcIpPrefix().address());
         DeviceId dstDeviceId = getDeviceId(statsFlowRule.dstIpPrefix().address());
 
@@ -403,13 +471,43 @@
                     statsFlowRule, METRIC_PRIORITY_SOURCE, install);
         }
 
-        if (dstDeviceId != null) {
-            connectTables(dstDeviceId, STAT_OUTBOUND_TABLE, VTAP_OUTBOUND_TABLE,
-                    inverseFlowRule, METRIC_PRIORITY_TARGET, install);
+        if (dstDeviceId != null && egressStats) {
+            NetworkType type = getNetworkType(statsFlowRule.dstIpPrefix());
+            if (type == VXLAN || type == VLAN) {
+                connectTables(dstDeviceId, STAT_OUTBOUND_TABLE, VTAP_OUTBOUND_TABLE,
+                        statsFlowRule, METRIC_PRIORITY_TARGET, install);
+            } else if (type == FLAT) {
+                connectTables(dstDeviceId, STAT_FLAT_OUTBOUND_TABLE, VTAP_FLAT_OUTBOUND_TABLE,
+                        statsFlowRule, METRIC_PRIORITY_TARGET, install);
+            }
         }
     }
 
     /**
+     * Obtains the network type that is used by the VM which has the given IP.
+     *
+     * @param ipPrefix IP prefix
+     * @return network type
+     */
+    private NetworkType getNetworkType(IpPrefix ipPrefix) {
+
+        MacAddress mac = checkNotNull(getMacAddress(ipPrefix.address()), MAC_NOT_NULL);
+        InstancePort instPort = instancePortService.instancePort(mac);
+
+        if (instPort == null) {
+            return null;
+        }
+
+        Network network = osNetworkService.network(instPort.networkId());
+
+        if (network != null) {
+            return network.getNetworkType();
+        }
+
+        return null;
+    }
+
+    /**
      * Get Device ID which the VM is located.
      *
      * @param ipAddress IP Address of host
@@ -470,10 +568,40 @@
         return NO_HOST_MAC;
     }
 
+    /**
+     * Extracts properties from the component configuration context.
+     *
+     * @param context the component context
+     */
+    private void readComponentConfiguration(ComponentContext context) {
+        Dictionary<?, ?> properties = context.getProperties();
+
+        Boolean reversePathStatsConfigured =
+                            getBooleanProperty(properties, REVERSE_PATH_STATS);
+        if (reversePathStatsConfigured == null) {
+            reversePathStats = DEFAULT_REVERSE_PATH_STATS;
+            log.info("Reversed path stats flag is NOT " +
+                     "configured, default value is {}", reversePathStats);
+        } else {
+            reversePathStats = reversePathStatsConfigured;
+            log.info("Configured. Reversed path stats flag is {}", reversePathStats);
+        }
+
+        Boolean egressStatsConfigured = getBooleanProperty(properties, EGRESS_STATS);
+        if (egressStatsConfigured == null) {
+            egressStats = DEFAULT_EGRESS_STATS;
+            log.info("Egress stats flag is NOT " +
+                     "configured, default value is {}", egressStats);
+        } else {
+            egressStats = egressStatsConfigured;
+            log.info("Configured. Egress stats flag is {}", egressStats);
+        }
+    }
+
     private class InternalTimerTask extends TimerTask {
         @Override
         public void run() {
-            log.debug("Timer Task Thread Starts ({})", loopCount++);
+            log.debug("Timer task thread starts ({})", loopCount++);
 
             Set<FlowInfo> filteredFlowInfos = Sets.newConcurrentHashSet();
 
diff --git a/apps/openstacktelemetry/pom.xml b/apps/openstacktelemetry/pom.xml
index 3cae090..f6372ad 100644
--- a/apps/openstacktelemetry/pom.xml
+++ b/apps/openstacktelemetry/pom.xml
@@ -30,6 +30,17 @@
 
     <description>SONA Openstack Telemetry Application</description>
 
+    <properties>
+        <openstack4j.version>3.1.0</openstack4j.version>
+        <influxdb-java.version>2.2</influxdb-java.version>
+        <kafka-client.version>0.8.2.2_1</kafka-client.version>
+        <json-patch.version>1.9</json-patch.version>
+        <jackson-coreutils.version>1.6</jackson-coreutils.version>
+        <btf.version>1.2</btf.version>
+        <msg-simple.version>1.1</msg-simple.version>
+        <snakeyaml.version>1.15</snakeyaml.version>
+    </properties>
+
     <modules>
         <module>api</module>
         <module>app</module>