Support collecting outbound vFlow stats for FLAT typed network

Change-Id: I359e2a39867126d76e4b7daae84aecf85e0f8c11
diff --git a/apps/openstacktelemetry/app/BUCK b/apps/openstacktelemetry/app/BUCK
index 97bc0f9..69c2c1e 100644
--- a/apps/openstacktelemetry/app/BUCK
+++ b/apps/openstacktelemetry/app/BUCK
@@ -1,3 +1,5 @@
+include_defs('//apps/openstacknode/openstack4j.bucklet')
+
 GRPC_VER = '1.3.1'
 
 COMPILE_DEPS = [
@@ -18,6 +20,14 @@
     '//lib:GRPC_1.3',
     '//incubator/grpc-dependencies:grpc-core-repkg-' + GRPC_VER,
     '//lib:grpc-protobuf-lite-' + GRPC_VER,
+    '//lib:openstack4j-core',
+    '//lib:openstack4j-http-connector',
+    '//lib:openstack4j-httpclient',
+    '//lib:json-patch',
+    '//lib:jackson-coreutils',
+    '//lib:btf',
+    '//lib:msg-simple',
+    '//lib:snakeyaml',
 ]
 
 TEST_DEPS = [
@@ -36,4 +46,8 @@
     api_version = '1.0',
     api_description = 'OpenStack Network Telemetry REST API',
     api_package = 'org.onosproject.openstacktelemetry.web',
+
+    # dependency embedding
+    import_packages = INCLUDE_PACKAGES + ',' + EXCLUDE_PACKAGES  + ',' + ALL_PACKAGES,
+    bundle_classpath = get_openstack4j_deps_path() + get_jackson_deps_path()
 )
diff --git a/apps/openstacktelemetry/app/pom.xml b/apps/openstacktelemetry/app/pom.xml
index 7d81b2a..4573d45 100644
--- a/apps/openstacktelemetry/app/pom.xml
+++ b/apps/openstacktelemetry/app/pom.xml
@@ -145,6 +145,47 @@
         </dependency>
 
         <dependency>
+            <groupId>org.pacesys</groupId>
+            <artifactId>openstack4j-core</artifactId>
+            <version>${openstack4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.pacesys.openstack4j.connectors</groupId>
+            <artifactId>openstack4j-http-connector</artifactId>
+            <version>${openstack4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.pacesys.openstack4j.connectors</groupId>
+            <artifactId>openstack4j-httpclient</artifactId>
+            <version>${openstack4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.github.fge</groupId>
+            <artifactId>json-patch</artifactId>
+            <version>${json-patch.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.github.fge</groupId>
+            <artifactId>jackson-coreutils</artifactId>
+            <version>${jackson-coreutils.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.github.fge</groupId>
+            <artifactId>btf</artifactId>
+            <version>${btf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.github.fge</groupId>
+            <artifactId>msg-simple</artifactId>
+            <version>${msg-simple.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.yaml</groupId>
+            <artifactId>snakeyaml</artifactId>
+            <version>${snakeyaml.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>org.glassfish.jersey.core</groupId>
             <artifactId>jersey-client</artifactId>
             <scope>test</scope>
@@ -236,8 +277,21 @@
                             ${project.groupId}.${project.artifactId}
                         </Bundle-SymbolicName>
                         <Import-Package>
+                            !org.apache.http.*,
+                            !com.fasterxml.jackson.dataformat.*,
+                            !javax.annotation,
                             *,org.glassfish.jersey.servlet
                         </Import-Package>
+                        <Embed-Dependency>
+                            openstack4j-core,
+                            openstack4j-http-connector,
+                            openstack4j-httpclient,
+                            json-patch,
+                            jackson-coreutils,
+                            btf,
+                            msg-simple,
+                            snakeyaml
+                        </Embed-Dependency>
                         <Web-ContextPath>${web.context}</Web-ContextPath>
                     </instructions>
                 </configuration>
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
index e3a20cb..5453a27 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
@@ -20,10 +20,13 @@
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.packet.IpAddress;
+import org.onlab.packet.IpPrefix;
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.VlanId;
 import org.onosproject.core.ApplicationId;
@@ -48,19 +51,27 @@
 import org.onosproject.net.flow.criteria.TcpPortCriterion;
 import org.onosproject.net.flow.criteria.UdpPortCriterion;
 import org.onosproject.net.host.HostService;
+import org.onosproject.openstacknetworking.api.InstancePort;
+import org.onosproject.openstacknetworking.api.InstancePortService;
+import org.onosproject.openstacknetworking.api.OpenstackNetworkService;
 import org.onosproject.openstacktelemetry.api.FlowInfo;
 import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
 import org.onosproject.openstacktelemetry.api.StatsFlowRule;
 import org.onosproject.openstacktelemetry.api.StatsFlowRuleAdminService;
 import org.onosproject.openstacktelemetry.api.StatsInfo;
+import org.openstack4j.model.network.Network;
+import org.openstack4j.model.network.NetworkType;
+import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Dictionary;
 import java.util.Optional;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static org.onlab.packet.Ethernet.TYPE_IPV4;
 import static org.onlab.packet.IPv4.PROTOCOL_TCP;
 import static org.onlab.packet.IPv4.PROTOCOL_UDP;
@@ -71,11 +82,17 @@
 import static org.onosproject.net.flow.criteria.Criterion.Type.TCP_SRC;
 import static org.onosproject.net.flow.criteria.Criterion.Type.UDP_DST;
 import static org.onosproject.net.flow.criteria.Criterion.Type.UDP_SRC;
+import static org.onosproject.openstacknetworking.api.Constants.STAT_FLAT_OUTBOUND_TABLE;
 import static org.onosproject.openstacknetworking.api.Constants.STAT_INBOUND_TABLE;
 import static org.onosproject.openstacknetworking.api.Constants.STAT_OUTBOUND_TABLE;
+import static org.onosproject.openstacknetworking.api.Constants.VTAP_FLAT_OUTBOUND_TABLE;
 import static org.onosproject.openstacknetworking.api.Constants.VTAP_INBOUND_TABLE;
 import static org.onosproject.openstacknetworking.api.Constants.VTAP_OUTBOUND_TABLE;
 import static org.onosproject.openstacktelemetry.api.Constants.OPENSTACK_TELEMETRY_APP_ID;
+import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getBooleanProperty;
+import static org.openstack4j.model.network.NetworkType.FLAT;
+import static org.openstack4j.model.network.NetworkType.VLAN;
+import static org.openstack4j.model.network.NetworkType.VXLAN;
 
 /**
  * Flow rule manager for network statistics of a VM.
@@ -91,7 +108,13 @@
     private static final long MILLISECONDS = 1000L;
     private static final long REFRESH_INTERVAL = 5L;
 
-    private ApplicationId appId;
+    private static final String REVERSE_PATH_STATS = "reversePathStats";
+    private static final String EGRESS_STATS = "egressStats";
+
+    private static final boolean DEFAULT_REVERSE_PATH_STATS = false;
+    private static final boolean DEFAULT_EGRESS_STATS = false;
+
+    private static final String MAC_NOT_NULL = "MAC should not be null";
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected CoreService coreService;
@@ -109,8 +132,25 @@
     protected MastershipService mastershipService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected OpenstackNetworkService osNetworkService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected InstancePortService instancePortService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected OpenstackTelemetryService telemetryService;
 
+    @Property(name = REVERSE_PATH_STATS, boolValue = DEFAULT_REVERSE_PATH_STATS,
+            label = "A flag which indicates whether to install the rules for " +
+                    "collecting the flow-based stats for reversed path.")
+    private boolean reversePathStats = DEFAULT_REVERSE_PATH_STATS;
+
+    @Property(name = EGRESS_STATS, boolValue = DEFAULT_EGRESS_STATS,
+            label = "A flag which indicates whether to install the rules for " +
+                    "collecting the flow-based stats for egress port.")
+    private boolean egressStats = DEFAULT_EGRESS_STATS;
+
+    private ApplicationId appId;
     private Timer timer;
     private TimerTask task;
 
@@ -143,12 +183,19 @@
         log.info("Stopped");
     }
 
+    @Modified
+    protected void modified(ComponentContext context) {
+        readComponentConfiguration(context);
+
+        log.info("Modified");
+    }
+
     @Override
     public void start() {
         log.info("Start publishing thread");
         task = new InternalTimerTask();
         timer.scheduleAtFixedRate(task, MILLISECONDS * REFRESH_INTERVAL,
-                                  MILLISECONDS * REFRESH_INTERVAL);
+                MILLISECONDS * REFRESH_INTERVAL);
     }
 
     @Override
@@ -164,8 +211,8 @@
         setStatFlowRule(statsFlowRule, true);
 
         log.info("Install stat flow rule for SrcIp:{} DstIp:{}",
-                                        statsFlowRule.srcIpPrefix().toString(),
-                                        statsFlowRule.dstIpPrefix().toString());
+                statsFlowRule.srcIpPrefix().toString(),
+                statsFlowRule.dstIpPrefix().toString());
     }
 
     @Override
@@ -176,8 +223,8 @@
         setStatFlowRule(statsFlowRule, false);
 
         log.info("Remove stat flow rule for SrcIp:{} DstIp:{}",
-                                        statsFlowRule.srcIpPrefix().toString(),
-                                        statsFlowRule.dstIpPrefix().toString());
+                statsFlowRule.srcIpPrefix().toString(),
+                statsFlowRule.dstIpPrefix().toString());
     }
 
     private void connectTables(DeviceId deviceId, int fromTable, int toTable,
@@ -191,22 +238,22 @@
         byte protocol = statsFlowRule.ipProtocol();
 
         TrafficSelector.Builder selectorBuilder =
-                                        DefaultTrafficSelector.builder()
-                                        .matchEthType(TYPE_IPV4)
-                                        .matchIPSrc(statsFlowRule.srcIpPrefix())
-                                        .matchIPDst(statsFlowRule.dstIpPrefix());
+                DefaultTrafficSelector.builder()
+                        .matchEthType(TYPE_IPV4)
+                        .matchIPSrc(statsFlowRule.srcIpPrefix())
+                        .matchIPDst(statsFlowRule.dstIpPrefix());
 
         if (protocol == PROTOCOL_TCP) {
             selectorBuilder = selectorBuilder
-                                        .matchIPProtocol(statsFlowRule.ipProtocol())
-                                        .matchTcpSrc(statsFlowRule.srcTpPort())
-                                        .matchTcpDst(statsFlowRule.dstTpPort());
+                    .matchIPProtocol(statsFlowRule.ipProtocol())
+                    .matchTcpSrc(statsFlowRule.srcTpPort())
+                    .matchTcpDst(statsFlowRule.dstTpPort());
 
         } else if (protocol == PROTOCOL_UDP) {
             selectorBuilder = selectorBuilder
-                                        .matchIPProtocol(statsFlowRule.ipProtocol())
-                                        .matchUdpSrc(statsFlowRule.srcTpPort())
-                                        .matchUdpDst(statsFlowRule.dstTpPort());
+                    .matchIPProtocol(statsFlowRule.ipProtocol())
+                    .matchUdpSrc(statsFlowRule.srcTpPort())
+                    .matchUdpDst(statsFlowRule.dstTpPort());
         } else {
             log.warn("Unsupported protocol {}", statsFlowRule.ipProtocol());
         }
@@ -216,14 +263,14 @@
         treatmentBuilder.transition(toTable);
 
         FlowRule flowRule = DefaultFlowRule.builder()
-                                        .forDevice(deviceId)
-                                        .withSelector(selectorBuilder.build())
-                                        .withTreatment(treatmentBuilder.build())
-                                        .withPriority(prefixLength)
-                                        .fromApp(appId)
-                                        .makePermanent()
-                                        .forTable(fromTable)
-                                        .build();
+                .forDevice(deviceId)
+                .withSelector(selectorBuilder.build())
+                .withTreatment(treatmentBuilder.build())
+                .withPriority(prefixLength)
+                .fromApp(appId)
+                .makePermanent()
+                .forTable(fromTable)
+                .build();
 
         applyRule(flowRule, install);
     }
@@ -268,14 +315,14 @@
             IPCriterion srcIp = (IPCriterion) selector.getCriterion(IPV4_SRC);
             IPCriterion dstIp = (IPCriterion) selector.getCriterion(IPV4_DST);
             IPProtocolCriterion ipProtocol =
-                                (IPProtocolCriterion) selector.getCriterion(IP_PROTO);
+                    (IPProtocolCriterion) selector.getCriterion(IP_PROTO);
 
             log.debug("[FlowInfo]  TableID:{}  SRC_IP:{}  DST_IP:{}  Pkt:{}  Byte:{}",
-                                                ((IndexTableId) entry.table()).id(),
-                                                srcIp.ip().toString(),
-                                                dstIp.ip().toString(),
-                                                entry.packets(),
-                                                entry.bytes());
+                    ((IndexTableId) entry.table()).id(),
+                    srcIp.ip().toString(),
+                    dstIp.ip().toString(),
+                    entry.packets(),
+                    entry.bytes());
 
             fBuilder.withFlowType(FLOW_TYPE_SONA)
                     .withSrcIp(srcIp.ip())
@@ -382,15 +429,36 @@
         return newFlowInfo;
     }
 
+    /**
+     * Installs flow rules for collecting both normal and reverse path flow stats.
+     *
+     * @param statsFlowRule flow rule used for collecting stats
+     * @param install flow rule installation flag
+     */
     private void setStatFlowRule(StatsFlowRule statsFlowRule, boolean install) {
-        StatsFlowRule inverseFlowRule = DefaultStatsFlowRule.builder()
-                                        .srcIpPrefix(statsFlowRule.dstIpPrefix())
-                                        .dstIpPrefix(statsFlowRule.srcIpPrefix())
-                                        .ipProtocol(statsFlowRule.ipProtocol())
-                                        .srcTpPort(statsFlowRule.dstTpPort())
-                                        .dstTpPort(statsFlowRule.srcTpPort())
-                                        .build();
+        setStatFlowRuleBase(statsFlowRule, install);
 
+        // if reverse path stats is enabled, we will install flow rules for
+        // collecting reverse path vFlow stats
+        if (reversePathStats) {
+            StatsFlowRule reverseFlowRule = DefaultStatsFlowRule.builder()
+                                            .srcIpPrefix(statsFlowRule.dstIpPrefix())
+                                            .dstIpPrefix(statsFlowRule.srcIpPrefix())
+                                            .ipProtocol(statsFlowRule.ipProtocol())
+                                            .srcTpPort(statsFlowRule.dstTpPort())
+                                            .dstTpPort(statsFlowRule.srcTpPort())
+                                            .build();
+            setStatFlowRuleBase(reverseFlowRule, install);
+        }
+    }
+
+    /**
+     * A base method which is for installing flow rules for collecting stats.
+     *
+     * @param statsFlowRule flow rule used for collecting stats
+     * @param install flow rule installation flag
+     */
+    private void setStatFlowRuleBase(StatsFlowRule statsFlowRule, boolean install) {
         DeviceId srcDeviceId = getDeviceId(statsFlowRule.srcIpPrefix().address());
         DeviceId dstDeviceId = getDeviceId(statsFlowRule.dstIpPrefix().address());
 
@@ -403,13 +471,43 @@
                     statsFlowRule, METRIC_PRIORITY_SOURCE, install);
         }
 
-        if (dstDeviceId != null) {
-            connectTables(dstDeviceId, STAT_OUTBOUND_TABLE, VTAP_OUTBOUND_TABLE,
-                    inverseFlowRule, METRIC_PRIORITY_TARGET, install);
+        if (dstDeviceId != null && egressStats) {
+            NetworkType type = getNetworkType(statsFlowRule.dstIpPrefix());
+            if (type == VXLAN || type == VLAN) {
+                connectTables(dstDeviceId, STAT_OUTBOUND_TABLE, VTAP_OUTBOUND_TABLE,
+                        statsFlowRule, METRIC_PRIORITY_TARGET, install);
+            } else if (type == FLAT) {
+                connectTables(dstDeviceId, STAT_FLAT_OUTBOUND_TABLE, VTAP_FLAT_OUTBOUND_TABLE,
+                        statsFlowRule, METRIC_PRIORITY_TARGET, install);
+            }
         }
     }
 
     /**
+     * Obtains the network type that is used by the VM which has the given IP.
+     *
+     * @param ipPrefix IP prefix
+     * @return network type
+     */
+    private NetworkType getNetworkType(IpPrefix ipPrefix) {
+
+        MacAddress mac = checkNotNull(getMacAddress(ipPrefix.address()), MAC_NOT_NULL);
+        InstancePort instPort = instancePortService.instancePort(mac);
+
+        if (instPort == null) {
+            return null;
+        }
+
+        Network network = osNetworkService.network(instPort.networkId());
+
+        if (network != null) {
+            return network.getNetworkType();
+        }
+
+        return null;
+    }
+
+    /**
      * Get Device ID which the VM is located.
      *
      * @param ipAddress IP Address of host
@@ -470,10 +568,40 @@
         return NO_HOST_MAC;
     }
 
+    /**
+     * Extracts properties from the component configuration context.
+     *
+     * @param context the component context
+     */
+    private void readComponentConfiguration(ComponentContext context) {
+        Dictionary<?, ?> properties = context.getProperties();
+
+        Boolean reversePathStatsConfigured =
+                            getBooleanProperty(properties, REVERSE_PATH_STATS);
+        if (reversePathStatsConfigured == null) {
+            reversePathStats = DEFAULT_REVERSE_PATH_STATS;
+            log.info("Reversed path stats flag is NOT " +
+                     "configured, default value is {}", reversePathStats);
+        } else {
+            reversePathStats = reversePathStatsConfigured;
+            log.info("Configured. Reversed path stats flag is {}", reversePathStats);
+        }
+
+        Boolean egressStatsConfigured = getBooleanProperty(properties, EGRESS_STATS);
+        if (egressStatsConfigured == null) {
+            egressStats = DEFAULT_EGRESS_STATS;
+            log.info("Egress stats flag is NOT " +
+                     "configured, default value is {}", egressStats);
+        } else {
+            egressStats = egressStatsConfigured;
+            log.info("Configured. Egress stats flag is {}", egressStats);
+        }
+    }
+
     private class InternalTimerTask extends TimerTask {
         @Override
         public void run() {
-            log.debug("Timer Task Thread Starts ({})", loopCount++);
+            log.debug("Timer task thread starts ({})", loopCount++);
 
             Set<FlowInfo> filteredFlowInfos = Sets.newConcurrentHashSet();