/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.openstacktelemetry.impl;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.onlab.util.SharedScheduledExecutors;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Host;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.device.PortStatistics;
import org.onosproject.net.driver.Driver;
import org.onosproject.net.driver.DriverService;
import org.onosproject.net.flow.DefaultFlowRule;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleOperations;
import org.onosproject.net.flow.FlowRuleOperationsContext;
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.flow.criteria.Criterion;
import org.onosproject.net.flow.criteria.IPCriterion;
import org.onosproject.net.flow.criteria.IPProtocolCriterion;
import org.onosproject.net.flow.criteria.PortCriterion;
import org.onosproject.net.flow.criteria.TcpPortCriterion;
import org.onosproject.net.flow.criteria.UdpPortCriterion;
import org.onosproject.net.host.HostService;
import org.onosproject.openstacknetworking.api.InstancePort;
import org.onosproject.openstacknetworking.api.InstancePortService;
import org.onosproject.openstacknetworking.api.OpenstackNetworkService;
import org.onosproject.openstacknode.api.OpenstackNode;
import org.onosproject.openstacknode.api.OpenstackNodeService;
import org.onosproject.openstacktelemetry.api.FlowInfo;
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
import org.onosproject.openstacktelemetry.api.StatsFlowRule;
import org.onosproject.openstacktelemetry.api.StatsFlowRuleAdminService;
import org.onosproject.openstacktelemetry.api.StatsInfo;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Dictionary;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.onlab.packet.Ethernet.TYPE_IPV4;
import static org.onlab.packet.IPv4.PROTOCOL_TCP;
import static org.onlab.packet.IPv4.PROTOCOL_UDP;
import static org.onosproject.net.Device.Type.SWITCH;
import static org.onosproject.net.flow.criteria.Criterion.Type.IPV4_DST;
import static org.onosproject.net.flow.criteria.Criterion.Type.IPV4_SRC;
import static org.onosproject.net.flow.criteria.Criterion.Type.IP_PROTO;
import static org.onosproject.net.flow.criteria.Criterion.Type.TCP_DST;
import static org.onosproject.net.flow.criteria.Criterion.Type.TCP_SRC;
import static org.onosproject.net.flow.criteria.Criterion.Type.UDP_DST;
import static org.onosproject.net.flow.criteria.Criterion.Type.UDP_SRC;
import static org.onosproject.openstacknetworking.api.Constants.STAT_FLAT_OUTBOUND_TABLE;
import static org.onosproject.openstacknetworking.api.Constants.STAT_INBOUND_TABLE;
import static org.onosproject.openstacknetworking.api.Constants.STAT_OUTBOUND_TABLE;
import static org.onosproject.openstacknetworking.api.Constants.VTAP_FLAT_OUTBOUND_TABLE;
import static org.onosproject.openstacknetworking.api.Constants.VTAP_INBOUND_TABLE;
import static org.onosproject.openstacknetworking.api.Constants.VTAP_OUTBOUND_TABLE;
import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.COMPUTE;
import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.CONTROLLER;
import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_DATA_POINT_SIZE;
import static org.onosproject.openstacktelemetry.api.Constants.FLAT;
import static org.onosproject.openstacktelemetry.api.Constants.OPENSTACK_TELEMETRY_APP_ID;
import static org.onosproject.openstacktelemetry.api.Constants.VLAN;
import static org.onosproject.openstacktelemetry.api.Constants.VXLAN;
import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getBooleanProperty;

/**
 * Flow rule manager for network statistics of a VM.
 */
@Component(immediate = true)
@Service
public class StatsFlowRuleManager implements StatsFlowRuleAdminService {

    private final Logger log = LoggerFactory.getLogger(getClass());

    private static final byte FLOW_TYPE_SONA = 1; // VLAN

    private static final long MILLISECONDS = 1000L;
    private static final long INITIAL_DELAY = 5L;
    private static final long REFRESH_INTERVAL = 5L;
    private static final TimeUnit TIME_UNIT_SECOND = TimeUnit.SECONDS;

    private static final String REVERSE_PATH_STATS = "reversePathStats";
    private static final String EGRESS_STATS = "egressStats";
    private static final String PORT_STATS = "portStats";

    private static final String MONITOR_OVERLAY = "monitorOverlay";
    private static final String MONITOR_UNDERLAY = "monitorUnderlay";

    private static final String OVS_DRIVER_NAME = "ovs";

    private static final boolean DEFAULT_REVERSE_PATH_STATS = false;
    private static final boolean DEFAULT_EGRESS_STATS = false;
    private static final boolean DEFAULT_PORT_STATS = true;

    private static final boolean DEFAULT_MONITOR_OVERLAY = true;
    private static final boolean DEFAULT_MONITOR_UNDERLAY = true;

    private static final String ARBITRARY_IP = "0.0.0.0/32";
    private static final int ARBITRARY_LENGTH = 32;
    private static final String ARBITRARY_MAC = "00:00:00:00:00:00";
    private static final IpAddress NO_HOST_IP = IpAddress.valueOf("255.255.255.255");
    private static final MacAddress NO_HOST_MAC = MacAddress.valueOf(ARBITRARY_MAC);
    private static final int ARBITRARY_IN_INTF = 0;
    private static final int ARBITRARY_OUT_INTF = 0;

    private static final boolean RECOVER_FROM_FAILURE = true;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected CoreService coreService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected FlowRuleService flowRuleService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected HostService hostService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected DeviceService deviceService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected DriverService driverService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ComponentConfigService componentConfigService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected MastershipService mastershipService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected OpenstackNetworkService osNetworkService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected InstancePortService instPortService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected OpenstackNodeService osNodeService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected OpenstackTelemetryService telemetryService;

    @Property(name = REVERSE_PATH_STATS, boolValue = DEFAULT_REVERSE_PATH_STATS,
            label = "A flag which indicates whether to install the rules for " +
                    "collecting the flow-based stats for reversed path.")
    private boolean reversePathStats = DEFAULT_REVERSE_PATH_STATS;

    @Property(name = EGRESS_STATS, boolValue = DEFAULT_EGRESS_STATS,
            label = "A flag which indicates whether to install the rules for " +
                    "collecting the flow-based stats for egress port.")
    private boolean egressStats = DEFAULT_EGRESS_STATS;

    @Property(name = PORT_STATS, boolValue = DEFAULT_PORT_STATS,
            label = "A flag which indicates whether to collect port TX & RX stats.")
    private boolean portStats = DEFAULT_PORT_STATS;

    @Property(name = MONITOR_OVERLAY, boolValue = DEFAULT_MONITOR_OVERLAY,
            label = "A flag which indicates whether to monitor overlay network port stats.")
    private boolean monitorOverlay = DEFAULT_MONITOR_OVERLAY;

    @Property(name = MONITOR_UNDERLAY, boolValue = DEFAULT_MONITOR_UNDERLAY,
            label = "A flag which indicates whether to monitor underlay network port stats.")
    private boolean monitorUnderlay = DEFAULT_MONITOR_UNDERLAY;

    private ApplicationId telemetryAppId;
    private TelemetryCollector collector;
    private ScheduledFuture result;

    private final Set<FlowInfo> gFlowInfoSet = Sets.newHashSet();
    private final Map<String, Queue<FlowInfo>> flowInfoMap = Maps.newConcurrentMap();

    private static final int SOURCE_ID = 1;
    private static final int TARGET_ID = 2;
    private static final int PRIORITY_BASE = 10000;
    private static final int METRIC_PRIORITY_SOURCE  = SOURCE_ID * PRIORITY_BASE;
    private static final int METRIC_PRIORITY_TARGET  = TARGET_ID * PRIORITY_BASE;

    @Activate
    protected void activate() {
        telemetryAppId = coreService.registerApplication(OPENSTACK_TELEMETRY_APP_ID);

        componentConfigService.registerProperties(getClass());
        start();

        log.info("Started");
    }

    @Deactivate
    protected void deactivate() {
        componentConfigService.unregisterProperties(getClass(), false);
        flowRuleService.removeFlowRulesById(telemetryAppId);
        stop();

        log.info("Stopped");
    }

    @Modified
    protected void modified(ComponentContext context) {
        readComponentConfiguration(context);

        log.info("Modified");
    }

    @Override
    public void start() {
        log.info("Start publishing thread");
        collector = new TelemetryCollector();

        result = SharedScheduledExecutors.getSingleThreadExecutor()
                    .scheduleAtFixedRate(collector, INITIAL_DELAY,
                        REFRESH_INTERVAL, TIME_UNIT_SECOND, RECOVER_FROM_FAILURE);
    }

    @Override
    public void stop() {
        log.info("Stop data publishing thread");
        result.cancel(true);
        collector = null;
    }

    @Override
    public void createStatFlowRule(StatsFlowRule statsFlowRule) {

        setStatFlowRule(statsFlowRule, true);
    }

    @Override
    public void deleteStatFlowRule(StatsFlowRule statsFlowRule) {

        setStatFlowRule(statsFlowRule, false);
    }


    @Override
    public Map<String, Queue<FlowInfo>> getFlowInfoMap() {
        return flowInfoMap;
    }


    @Override
    public Set<FlowInfo> getUnderlayFlowInfos() {

        Set<FlowInfo> flowInfos = Sets.newConcurrentHashSet();

        for (Device device : getUnderlayDevices()) {

            if (!isEdgeSwitch(device.id())) {
                continue;
            }

            for (FlowEntry entry : flowRuleService.getFlowEntries(device.id())) {
                FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
                TrafficSelector selector = entry.selector();
                Criterion inPort = selector.getCriterion(Criterion.Type.IN_PORT);
                Criterion dstIpCriterion = selector.getCriterion(Criterion.Type.IPV4_DST);
                if (inPort != null && dstIpCriterion != null) {
                    IpAddress srcIp = getIpAddress(device, (PortCriterion) inPort);
                    IpAddress dstIp = ((IPCriterion) dstIpCriterion).ip().address();

                    if (srcIp == null) {
                        continue;
                    }

                    fBuilder.withFlowType(FLOW_TYPE_SONA)
                            .withSrcIp(IpPrefix.valueOf(srcIp, ARBITRARY_LENGTH))
                            .withDstIp(IpPrefix.valueOf(dstIp, ARBITRARY_LENGTH))
                            .withSrcMac(getMacAddress(srcIp))
                            .withDstMac(getMacAddress(dstIp))
                            .withInputInterfaceId(getInterfaceId(srcIp))
                            .withOutputInterfaceId(getInterfaceId(dstIp))
                            .withDeviceId(entry.deviceId());

                    StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();

                    sBuilder.withStartupTime(System.currentTimeMillis())
                            .withFstPktArrTime(System.currentTimeMillis())
                            .withLstPktOffset((int) (REFRESH_INTERVAL * MILLISECONDS))
                            .withCurrAccPkts((int) entry.packets())
                            .withCurrAccBytes(entry.bytes())
                            .withErrorPkts((short) 0)
                            .withDropPkts((short) 0);

                    fBuilder.withStatsInfo(sBuilder.build());

                    FlowInfo flowInfo = mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);

                    flowInfos.add(flowInfo);
                }
            }
        }

        return flowInfos;
    }

    @Override
    public Set<FlowInfo> getOverlayFlowInfos() {

        Set<FlowInfo> flowInfos = Sets.newConcurrentHashSet();

        // obtain all flow rule entries installed by telemetry app
        for (FlowEntry entry : flowRuleService.getFlowEntriesById(telemetryAppId)) {
            FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
            TrafficSelector selector = entry.selector();
            IPCriterion srcIp = (IPCriterion) selector.getCriterion(IPV4_SRC);
            IPCriterion dstIp = (IPCriterion) selector.getCriterion(IPV4_DST);
            IPProtocolCriterion ipProtocol =
                    (IPProtocolCriterion) selector.getCriterion(IP_PROTO);

            fBuilder.withFlowType(FLOW_TYPE_SONA)
                    .withSrcIp(srcIp.ip())
                    .withDstIp(dstIp.ip());

            if (ipProtocol != null) {
                fBuilder.withProtocol((byte) ipProtocol.protocol());

                if (ipProtocol.protocol() == PROTOCOL_TCP) {
                    TcpPortCriterion tcpSrc =
                            (TcpPortCriterion) selector.getCriterion(TCP_SRC);
                    TcpPortCriterion tcpDst =
                            (TcpPortCriterion) selector.getCriterion(TCP_DST);
                    fBuilder.withSrcPort(tcpSrc.tcpPort());
                    fBuilder.withDstPort(tcpDst.tcpPort());
                } else if (ipProtocol.protocol() == PROTOCOL_UDP) {
                    UdpPortCriterion udpSrc =
                            (UdpPortCriterion) selector.getCriterion(UDP_SRC);
                    UdpPortCriterion udpDst =
                            (UdpPortCriterion) selector.getCriterion(UDP_DST);
                    fBuilder.withSrcPort(udpSrc.udpPort());
                    fBuilder.withDstPort(udpDst.udpPort());
                } else {
                    log.debug("Other protocol: {}", ipProtocol.protocol());
                }
            }

            fBuilder.withSrcMac(getMacAddress(srcIp.ip().address()))
                    .withDstMac(getMacAddress(dstIp.ip().address()))
                    .withInputInterfaceId(getInterfaceId(srcIp.ip().address()))
                    .withOutputInterfaceId(getInterfaceId(dstIp.ip().address()))
                    .withVlanId(getVlanId(srcIp.ip().address()))
                    .withDeviceId(entry.deviceId());

            StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();

            sBuilder.withStartupTime(System.currentTimeMillis())
                    .withFstPktArrTime(System.currentTimeMillis())
                    .withLstPktOffset((int) (REFRESH_INTERVAL * MILLISECONDS))
                    .withCurrAccPkts((int) entry.packets())
                    .withCurrAccBytes(entry.bytes())
                    .withErrorPkts((short) 0)
                    .withDropPkts((short) 0);

            fBuilder.withStatsInfo(sBuilder.build());

            FlowInfo flowInfo = mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);

            flowInfos.add(flowInfo);

            log.debug("FlowInfo: \n{}", flowInfo.toString());
        }

        return flowInfos;
    }

    /**
     * Gets a set of flow infos by referring to overlay destination VM port.
     *
     * @return flow infos
     */
    private Set<FlowInfo> getOverlayDstPortBasedFlowInfos() {
        Set<FlowInfo> flowInfos = Sets.newConcurrentHashSet();
        Set<PortNumber> instPortNums = instPortService.instancePorts()
                                                .stream()
                                                .map(InstancePort::portNumber)
                                                .collect(Collectors.toSet());
        Set<DeviceId> deviceIds = osNodeService.completeNodes(COMPUTE)
                                                .stream()
                                                .map(OpenstackNode::intgBridge)
                                                .collect(Collectors.toSet());

        deviceIds.forEach(d -> {
            List<PortStatistics> stats =
                                deviceService.getPortStatistics(d)
                                .stream()
                                .filter(s -> instPortNums.contains(s.portNumber()))
                                .collect(Collectors.toList());

            stats.forEach(s -> {
                InstancePort instPort = getInstancePort(d, s.portNumber());
                flowInfos.add(buildTxFlowInfoFromInstancePort(instPort, s));
                flowInfos.add(buildRxFlowInfoFromInstancePort(instPort, s));
            });
        });

        return flowInfos;
    }

    /**
     * Gets a set of flow infos by referring to underlay destination port.
     *
     * @return flow infos
     */
    private Set<FlowInfo> getUnderlayDstPortBasedFlowInfos() {
        Set<FlowInfo> flowInfos = Sets.newConcurrentHashSet();

        for (Device d : getUnderlayDevices()) {
            List<PortStatistics> stats =
                    new ArrayList<>(deviceService.getPortStatistics(d.id()));
            stats.forEach(s -> {
                Host host = hostService.getConnectedHosts(new ConnectPoint(d.id(), s.portNumber()))
                        .stream().findFirst().orElse(null);
                if (host != null) {
                    flowInfos.add(buildTxFlowInfoFromHost(host, s));
                    flowInfos.add(buildRxFlowInfoFromHost(host, s));
                }
            });
        }

        return flowInfos;
    }

    /**
     * Obtains a set of device instances which construct underlay network.
     *
     * @return a set of device instances
     */
    private Set<Device> getUnderlayDevices() {

        Set<Device> underlayDevices = Sets.newConcurrentHashSet();

        Set<DeviceId> overlayDeviceIds = osNodeService.completeNodes()
                .stream()
                .filter(n -> n.type() != CONTROLLER)
                .map(OpenstackNode::intgBridge)
                .collect(Collectors.toSet());

        for (Device d : deviceService.getAvailableDevices(SWITCH)) {
            if (overlayDeviceIds.contains(d.id())) {
                continue;
            }

            underlayDevices.add(d);
        }

        return underlayDevices;
    }

    /**
     * Checks whether the given drivers contains OVS driver.
     *
     * @param drivers a set of drivers
     * @return true if the given drivers contain any OVS driver, false otherwise
     */
    private boolean hasOvsDriver(List<Driver> drivers) {

        for (Driver driver : drivers) {
            if (OVS_DRIVER_NAME.equals(driver.name())) {
                return true;
            }
        }

        return false;
    }

    /**
     * Obtains the flow info generated by TX port from instance port.
     *
     * @param instPort instance port
     * @param stat port statistics
     * @return flow info
     */
    private FlowInfo buildTxFlowInfoFromInstancePort(InstancePort instPort,
                                                     PortStatistics stat) {
        return buildTxFlowInfo(instPort.ipAddress(), instPort.macAddress(),
                                                     instPort.deviceId(), stat);
    }

    /**
     * Obtains the flow info generated from RX port from instance port.
     *
     * @param instPort instance port
     * @param stat port statistics
     * @return flow info
     */
    private FlowInfo buildRxFlowInfoFromInstancePort(InstancePort instPort,
                                                     PortStatistics stat) {
        return buildRxFlowInfo(instPort.ipAddress(), instPort.macAddress(),
                instPort.deviceId(), stat);
    }

    /**
     * Obtains the flow info generated by TX port from host.
     *
     * @param host host
     * @param stat port statistics
     * @return flow info
     */
    private FlowInfo buildTxFlowInfoFromHost(Host host, PortStatistics stat) {
        IpAddress ip = host.ipAddresses().stream().findFirst().orElse(null);

        if (ip != null) {
            return buildTxFlowInfo(ip, host.mac(), host.location().deviceId(), stat);
        }
        return null;
    }

    /**
     * Obtains the flow info generated by RX @param host host.
     *
     * @param host host
     * @param stat port statistics
     * @return flow info
     */
    private FlowInfo buildRxFlowInfoFromHost(Host host, PortStatistics stat) {
        IpAddress ip = host.ipAddresses().stream().findFirst().orElse(null);

        if (ip != null) {
            return buildRxFlowInfo(ip, host.mac(), host.location().deviceId(), stat);
        }
        return null;
    }

    /**
     * Obtains the flow info generated from TX port.
     *
     * @param ipAddress         IP address
     * @param macAddress        MAC address
     * @param deviceId          device identifier
     * @param stat              port statistics
     * @return flow info
     */
    private FlowInfo buildTxFlowInfo(IpAddress ipAddress,
                                     MacAddress macAddress,
                                     DeviceId deviceId,
                                     PortStatistics stat) {
        FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();

        fBuilder.withFlowType(FLOW_TYPE_SONA)
                .withSrcIp(IpPrefix.valueOf(ipAddress, ARBITRARY_LENGTH))
                .withDstIp(IpPrefix.valueOf(ARBITRARY_IP))
                .withSrcMac(macAddress)
                .withDstMac(NO_HOST_MAC)
                .withDeviceId(deviceId)
                .withInputInterfaceId(ARBITRARY_IN_INTF)
                .withOutputInterfaceId(ARBITRARY_OUT_INTF)
                .withVlanId(VlanId.vlanId());

        StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
        sBuilder.withStartupTime(System.currentTimeMillis())
                .withFstPktArrTime(System.currentTimeMillis())
                .withLstPktOffset((int) (REFRESH_INTERVAL * MILLISECONDS))
                .withCurrAccPkts((int) stat.packetsSent())
                .withCurrAccBytes(stat.bytesSent())
                .withErrorPkts((short) stat.packetsTxErrors())
                .withDropPkts((short) stat.packetsTxDropped());

        fBuilder.withStatsInfo(sBuilder.build());

        return mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
    }

    /**
     * Obtains the flow info generated from RX port.
     *
     * @param ipAddress         IP address
     * @param macAddress        MAC address
     * @param deviceId          Device identifier
     * @param stat port statistics
     * @return flow info
     */
    private FlowInfo buildRxFlowInfo(IpAddress ipAddress,
                                     MacAddress macAddress,
                                     DeviceId deviceId,
                                     PortStatistics stat) {
        FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();

        fBuilder.withFlowType(FLOW_TYPE_SONA)
                .withSrcIp(IpPrefix.valueOf(ARBITRARY_IP))
                .withDstIp(IpPrefix.valueOf(ipAddress, ARBITRARY_LENGTH))
                .withSrcMac(NO_HOST_MAC)
                .withDstMac(macAddress)
                .withDeviceId(deviceId)
                .withInputInterfaceId(ARBITRARY_IN_INTF)
                .withOutputInterfaceId(ARBITRARY_OUT_INTF)
                .withVlanId(VlanId.vlanId());

        StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
        sBuilder.withStartupTime(System.currentTimeMillis())
                .withFstPktArrTime(System.currentTimeMillis())
                .withLstPktOffset((int) (REFRESH_INTERVAL * MILLISECONDS))
                .withCurrAccPkts((int) stat.packetsReceived())
                .withCurrAccBytes(stat.bytesReceived())
                .withErrorPkts((short) stat.packetsRxErrors())
                .withDropPkts((short) stat.packetsRxDropped());

        fBuilder.withStatsInfo(sBuilder.build());

        return mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
    }

    /**
     * Obtains instance port which associated with the given device identifier
     * and port number.
     *
     * @param deviceId      device identifier
     * @param portNumber    port number
     * @return instance port
     */
    private InstancePort getInstancePort(DeviceId deviceId, PortNumber portNumber) {
        return instPortService.instancePorts().stream()
                                .filter(p -> p.deviceId().equals(deviceId))
                                .filter(p -> p.portNumber().equals(portNumber))
                                .findFirst().orElse(null);
    }

    /**
     * Installs a flow rule where the source table is fromTable, while destination
     * table is toTable.
     *
     * @param deviceId          device identifier
     * @param fromTable         source table
     * @param toTable           destination table
     * @param statsFlowRule     stats flow rule
     * @param rulePriority      rule priority
     * @param install           installation flag
     */
    private void connectTables(DeviceId deviceId, int fromTable, int toTable,
                               StatsFlowRule statsFlowRule, int rulePriority,
                               boolean install) {

        int srcPrefixLength = statsFlowRule.srcIpPrefix().prefixLength();
        int dstPrefixLength = statsFlowRule.dstIpPrefix().prefixLength();
        int prefixLength = rulePriority + srcPrefixLength + dstPrefixLength;
        byte protocol = statsFlowRule.ipProtocol();

        TrafficSelector.Builder selectorBuilder =
                DefaultTrafficSelector.builder()
                        .matchEthType(TYPE_IPV4)
                        .matchIPSrc(statsFlowRule.srcIpPrefix())
                        .matchIPDst(statsFlowRule.dstIpPrefix());

        if (protocol == PROTOCOL_TCP) {
            selectorBuilder = selectorBuilder
                    .matchIPProtocol(statsFlowRule.ipProtocol())
                    .matchTcpSrc(statsFlowRule.srcTpPort())
                    .matchTcpDst(statsFlowRule.dstTpPort());

        } else if (protocol == PROTOCOL_UDP) {
            selectorBuilder = selectorBuilder
                    .matchIPProtocol(statsFlowRule.ipProtocol())
                    .matchUdpSrc(statsFlowRule.srcTpPort())
                    .matchUdpDst(statsFlowRule.dstTpPort());
        } else {
            log.warn("Unsupported protocol {}", statsFlowRule.ipProtocol());
        }

        TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();

        treatmentBuilder.transition(toTable);

        FlowRule flowRule = DefaultFlowRule.builder()
                .forDevice(deviceId)
                .withSelector(selectorBuilder.build())
                .withTreatment(treatmentBuilder.build())
                .withPriority(prefixLength)
                .fromApp(telemetryAppId)
                .makePermanent()
                .forTable(fromTable)
                .build();

        applyRule(flowRule, install);
    }

    /**
     * Installs stats related flow rule to switch.
     *
     * @param flowRule flow rule
     * @param install flag to install or not
     */
    private void applyRule(FlowRule flowRule, boolean install) {
        FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
        flowOpsBuilder = install ?
                flowOpsBuilder.add(flowRule) : flowOpsBuilder.remove(flowRule);

        flowRuleService.apply(flowOpsBuilder.build(new FlowRuleOperationsContext() {
            @Override
            public void onSuccess(FlowRuleOperations ops) {
                log.debug("Install rules for telemetry stats: \n {}",
                                                                ops.toString());
            }

            @Override
            public void onError(FlowRuleOperations ops) {
                log.debug("Failed to install rules for telemetry stats: \n {}",
                                                                ops.toString());
            }
        }));
    }

    /**
     * Merges old FlowInfo.StatsInfo and current FlowInfo.StatsInfo.
     *
     * @param flowInfo current FlowInfo object
     * @param fBuilder Builder for FlowInfo
     * @param sBuilder Builder for StatsInfo
     * @return Merged FlowInfo object
     */
    private FlowInfo mergeFlowInfo(FlowInfo flowInfo,
                                   FlowInfo.Builder fBuilder,
                                   StatsInfo.Builder sBuilder) {
        for (FlowInfo gFlowInfo : gFlowInfoSet) {
            log.debug("Old FlowInfo:\n{}", gFlowInfo.toString());
            if (gFlowInfo.roughEquals(flowInfo)) {

                // Get old StatsInfo object and merge the value to current object.
                StatsInfo oldStatsInfo = gFlowInfo.statsInfo();
                sBuilder.withPrevAccPkts(oldStatsInfo.currAccPkts());
                sBuilder.withPrevAccBytes(oldStatsInfo.currAccBytes());
                FlowInfo newFlowInfo = fBuilder.withStatsInfo(sBuilder.build())
                        .build();

                gFlowInfoSet.remove(gFlowInfo);
                gFlowInfoSet.add(newFlowInfo);
                log.debug("Old FlowInfo found, Merge this {}", newFlowInfo.toString());
                return newFlowInfo;
            }
        }

        // No such record, then build the FlowInfo object and return this object.
        log.debug("No FlowInfo found, add new FlowInfo {}", flowInfo.toString());
        FlowInfo newFlowInfo = fBuilder.withStatsInfo(sBuilder.build()).build();
        gFlowInfoSet.add(newFlowInfo);
        return newFlowInfo;
    }

    /**
     * Installs flow rules for collecting both normal and reverse path flow stats.
     *
     * @param statsFlowRule flow rule used for collecting stats
     * @param install flow rule installation flag
     */
    private void setStatFlowRule(StatsFlowRule statsFlowRule, boolean install) {
        setStatFlowRuleBase(statsFlowRule, install);

        // if reverse path stats is enabled, we will install flow rules for
        // collecting reverse path vFlow stats
        if (reversePathStats) {
            StatsFlowRule reverseFlowRule = DefaultStatsFlowRule.builder()
                                            .srcIpPrefix(statsFlowRule.dstIpPrefix())
                                            .dstIpPrefix(statsFlowRule.srcIpPrefix())
                                            .ipProtocol(statsFlowRule.ipProtocol())
                                            .srcTpPort(statsFlowRule.dstTpPort())
                                            .dstTpPort(statsFlowRule.srcTpPort())
                                            .build();
            setStatFlowRuleBase(reverseFlowRule, install);
        }
    }

    /**
     * A base method which is for installing flow rules for collecting stats.
     *
     * @param statsFlowRule flow rule used for collecting stats
     * @param install flow rule installation flag
     */
    private void setStatFlowRuleBase(StatsFlowRule statsFlowRule, boolean install) {

        IpPrefix srcIp = statsFlowRule.srcIpPrefix();
        IpPrefix dstIp = statsFlowRule.dstIpPrefix();
        DeviceId srcDeviceId = getDeviceId(srcIp.address());
        DeviceId dstDeviceId = getDeviceId(dstIp.address());

        if (srcDeviceId == null && dstDeviceId == null) {
            return;
        }

        if (srcDeviceId != null) {
            connectTables(srcDeviceId, STAT_INBOUND_TABLE, VTAP_INBOUND_TABLE,
                    statsFlowRule, METRIC_PRIORITY_SOURCE, install);

            if (install) {
                log.info("Install ingress stat flow rule for SrcIp:{} DstIp:{}",
                                            srcIp.toString(), dstIp.toString());
            } else {
                log.info("Remove ingress stat flow rule for SrcIp:{} DstIp:{}",
                                            srcIp.toString(), dstIp.toString());
            }
        }

        Set<IpPrefix> vxlanIps = osNetworkService.getFixedIpsByNetworkType(VXLAN);
        Set<IpPrefix> vlanIps = osNetworkService.getFixedIpsByNetworkType(VLAN);
        Set<IpPrefix> flatIps = osNetworkService.getFixedIpsByNetworkType(FLAT);

        int fromTable, toTable;

        if (dstDeviceId != null && egressStats) {

            IpPrefix dstIpPrefix = statsFlowRule.dstIpPrefix();

            if (vxlanIps.contains(dstIpPrefix) || vlanIps.contains(dstIpPrefix)) {
                fromTable = STAT_OUTBOUND_TABLE;
                toTable = VTAP_OUTBOUND_TABLE;
            } else if (flatIps.contains(dstIpPrefix)) {
                fromTable = STAT_FLAT_OUTBOUND_TABLE;
                toTable = VTAP_FLAT_OUTBOUND_TABLE;
            } else {
                return;
            }

            connectTables(dstDeviceId, fromTable, toTable,
                    statsFlowRule, METRIC_PRIORITY_TARGET, install);

            if (install) {
                log.info("Install egress stat flow rule for SrcIp:{} DstIp:{}",
                                            srcIp.toString(), dstIp.toString());
            } else {
                log.info("Remove egress stat flow rule for SrcIp:{} DstIp:{}",
                                            srcIp.toString(), dstIp.toString());
            }
        }
    }

    /**
     * Gets Device ID which the VM is located.
     *
     * @param ipAddress IP Address of host
     * @return Device ID
     */
    private DeviceId getDeviceId(IpAddress ipAddress) {
        if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
            Optional<Host> host = hostService.getHostsByIp(ipAddress).stream().findAny();
            return host.map(host1 -> host1.location().deviceId()).orElse(null);
        } else {
            log.debug("No DeviceID is associated to {}", ipAddress.toString());
            return null;
        }
    }

    /**
     * Gets VLAN ID with respect to IP Address.
     *
     * @param ipAddress IP Address of host
     * @return VLAN ID
     */
    private VlanId getVlanId(IpAddress ipAddress) {
        if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
            Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
            return host.vlan();
        }
        return VlanId.vlanId();
    }

    /**
     * Gets Interface ID of Switch which is connected to a host.
     *
     * @param ipAddress IP Address of host
     * @return Interface ID of Switch
     */
    private int getInterfaceId(IpAddress ipAddress) {
        if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
            Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
            return (int) host.location().port().toLong();
        }
        return -1;
    }

    /**
     * Gets MAC Address of host.
     *
     * @param ipAddress IP Address of host
     * @return MAC Address of host
     */
    private MacAddress getMacAddress(IpAddress ipAddress) {
        if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
            Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
            return host.mac();
        }

        return NO_HOST_MAC;
    }

    /**
     * Gets IP address of the host which is attached to the given device and port.
     *
     * @param device    device
     * @param inPort    IN port number
     * @return IP address
     */
    private IpAddress getIpAddress(Device device, PortCriterion inPort) {

        Host host = hostService.getConnectedHosts(device.id()).stream()
                .filter(h -> h.location().port().equals(inPort.port()))
                .findAny().orElse(null);

        if (host != null) {
            return host.ipAddresses().stream().findAny().get();
        }

        return NO_HOST_IP;
    }

    private void enqFlowInfo(FlowInfo flowInfo) {
        String key = flowInfo.uniqueFlowInfoKey();
        Queue<FlowInfo> queue = flowInfoMap.get(key);
        if (queue == null) {
            Queue<FlowInfo> newQueue = new LinkedList<FlowInfo>();
            newQueue.offer(flowInfo);
            flowInfoMap.put(key, newQueue);
            return;
        }
        queue.offer(flowInfo);

        while (queue.size() > DEFAULT_DATA_POINT_SIZE) {
            queue.remove(); // Removes a garbage data in the queue.
        }
    }

    /**
     * Checks whether the given device is edge switch or not.
     *
     * @param id device identifier
     * @return true if the given device is edge switch, false otherwise
     */
    private boolean isEdgeSwitch(DeviceId id) {

        return !hostService.getConnectedHosts(id).isEmpty();
    }

    /**
     * Extracts properties from the component configuration context.
     *
     * @param context the component context
     */
    private void readComponentConfiguration(ComponentContext context) {
        Dictionary<?, ?> properties = context.getProperties();

        Boolean reversePathStatsConfigured =
                            getBooleanProperty(properties, REVERSE_PATH_STATS);
        if (reversePathStatsConfigured == null) {
            reversePathStats = DEFAULT_REVERSE_PATH_STATS;
            log.info("Reversed path stats flag is NOT " +
                     "configured, default value is {}", reversePathStats);
        } else {
            reversePathStats = reversePathStatsConfigured;
            log.info("Configured. Reversed path stats flag is {}", reversePathStats);
        }

        Boolean egressStatsConfigured = getBooleanProperty(properties, EGRESS_STATS);
        if (egressStatsConfigured == null) {
            egressStats = DEFAULT_EGRESS_STATS;
            log.info("Egress stats flag is NOT " +
                     "configured, default value is {}", egressStats);
        } else {
            egressStats = egressStatsConfigured;
            log.info("Configured. Egress stats flag is {}", egressStats);
        }

        Boolean portStatsConfigured = getBooleanProperty(properties, PORT_STATS);
        if (portStatsConfigured == null) {
            portStats = DEFAULT_PORT_STATS;
            log.info("Port stats flag is NOT " +
                    "configured, default value is {}", portStats);
        } else {
            portStats = portStatsConfigured;
            log.info("Configured. Port stats flag is {}", portStats);
        }

        Boolean monitorOverlayConfigured = getBooleanProperty(properties, MONITOR_OVERLAY);
        if (monitorOverlayConfigured == null) {
            monitorOverlay = DEFAULT_MONITOR_OVERLAY;
            log.info("Monitor overlay flag is NOT " +
                    "configured, default value is {}", monitorOverlay);
        } else {
            monitorOverlay = monitorOverlayConfigured;
            log.info("Configured. Monitor overlay flag is {}", monitorOverlay);
        }

        Boolean monitorUnderlayConfigured = getBooleanProperty(properties, MONITOR_UNDERLAY);
        if (monitorUnderlayConfigured == null) {
            monitorUnderlay = DEFAULT_MONITOR_UNDERLAY;
            log.info("Monitor underlay flag is NOT " +
                    "configured, default value is {}", monitorUnderlay);
        } else {
            monitorUnderlay = monitorUnderlayConfigured;
            log.info("Configured. Monitor underlay flag is {}", monitorUnderlay);
        }
    }

    private class TelemetryCollector implements Runnable {
        @Override
        public void run() {
            Set<FlowInfo> filteredOverlayFlowInfos = Sets.newConcurrentHashSet();
            Set<FlowInfo> filteredUnderlayFlowInfos = Sets.newConcurrentHashSet();

            // we only let the master controller of the device where the
            // stats flow rules are installed send stats message
            if (monitorOverlay) {
                getOverlayFlowInfos().forEach(f -> {
                    if (checkSrcDstLocalMaster(f)) {
                        filteredOverlayFlowInfos.add(f);
                    }
                });
            }
            if (monitorUnderlay) {
                getUnderlayFlowInfos().forEach(f -> {
                    if (checkSrcDstLocalMaster(f)) {
                        filteredUnderlayFlowInfos.add(f);
                    }
                });
            }

            // we only let the master controller of the device where the port
            // is located to send stats message
            if (portStats) {
                if (monitorOverlay) {
                    getOverlayDstPortBasedFlowInfos().forEach(f -> {
                        if (checkSrcDstLocalMaster(f)) {
                            filteredOverlayFlowInfos.add(f);
                        }
                    });
                }

                if (monitorUnderlay) {
                    getUnderlayDstPortBasedFlowInfos().forEach(f -> {
                        if (checkSrcDstLocalMaster(f)) {
                            filteredUnderlayFlowInfos.add(f);
                        }
                    });
                }
            }


            if (monitorOverlay) {
                telemetryService.publish(filteredOverlayFlowInfos);

                // TODO: Refactor the following code to "TelemetryService" style.
                filteredOverlayFlowInfos.forEach(StatsFlowRuleManager.this::enqFlowInfo);
            }

            if (monitorUnderlay) {
                telemetryService.publish(filteredUnderlayFlowInfos);
            }
        }

        private boolean checkSrcDstLocalMaster(FlowInfo info) {
            DeviceId srcDeviceId = getDeviceId(info.srcIp().address());
            DeviceId dstDeviceId = getDeviceId(info.dstIp().address());

            boolean isSrcLocalMaster = srcDeviceId != null &&
                    mastershipService.isLocalMaster(srcDeviceId);
            boolean isDstLocalMaster = dstDeviceId != null &&
                    mastershipService.isLocalMaster(dstDeviceId);

            return isSrcLocalMaster || isDstLocalMaster;
        }
    }
}
