[SDFAB-505] Add TRex driver to generate traffic

New driver to interact with TRex server to push configuration and
generate traffic at line rate.

Change-Id: Ief926ad9740e8e477cbb81fb4e119b1ebbab17d5
diff --git a/TestON/drivers/common/api/controller/trexclientdriver.py b/TestON/drivers/common/api/controller/trexclientdriver.py
new file mode 100644
index 0000000..29c8a1a
--- /dev/null
+++ b/TestON/drivers/common/api/controller/trexclientdriver.py
@@ -0,0 +1,581 @@
+"""
+Copyright 2021 Open Networking Foundation (ONF)
+
+Please refer questions to either the onos test mailing list at <onos-test@onosproject.org>,
+the System Testing Plans and Results wiki page at <https://wiki.onosproject.org/x/voMg>,
+or the System Testing Guide page at <https://wiki.onosproject.org/x/WYQg>
+
+"""
+import time
+import os
+import collections
+import numpy as np
+
+from drivers.common.api.controllerdriver import Controller
+from trex.stl.api import STLClient, STLStreamDstMAC_PKT
+from trex_stf_lib.trex_client import CTRexClient
+from trex_stl_lib.api import STLFlowLatencyStats, STLPktBuilder, STLStream, \
+    STLTXCont
+
+from socket import error as ConnectionRefusedError
+from distutils.util import strtobool
+
+TREX_FILES_DIR = "/tmp/trex_files/"
+
+LatencyStats = collections.namedtuple(
+    "LatencyStats",
+    [
+        "pg_id",
+        "jitter",
+        "average",
+        "total_max",
+        "total_min",
+        "last_max",
+        "histogram",
+        "dropped",
+        "out_of_order",
+        "duplicate",
+        "seq_too_high",
+        "seq_too_low",
+        "percentile_50",
+        "percentile_75",
+        "percentile_90",
+        "percentile_99",
+        "percentile_99_9",
+        "percentile_99_99",
+        "percentile_99_999",
+    ],
+)
+
+PortStats = collections.namedtuple(
+    "PortStats",
+    [
+        "tx_packets",
+        "rx_packets",
+        "tx_bytes",
+        "rx_bytes",
+        "tx_errors",
+        "rx_errors",
+        "tx_bps",
+        "tx_pps",
+        "tx_bps_L1",
+        "tx_util",
+        "rx_bps",
+        "rx_pps",
+        "rx_bps_L1",
+        "rx_util",
+    ],
+)
+
+FlowStats = collections.namedtuple(
+    "FlowStats",
+    [
+        "pg_id",
+        "tx_packets",
+        "rx_packets",
+        "tx_bytes",
+        "rx_bytes",
+    ],
+)
+
+
+class TrexClientDriver(Controller):
+    """
+    Implements a Trex Client Driver
+    """
+
+    def __init__(self):
+        self.trex_address = "localhost"
+        self.trex_config = None  # Relative path in dependencies of the test using this driver
+        self.force_restart = True
+        self.sofware_mode = False
+        self.setup_successful = False
+        self.stats = None
+        self.trex_client = None
+        self.trex_daemon_client = None
+        super(TrexClientDriver, self).__init__()
+
+    def connect(self, **connectargs):
+        try:
+            for key in connectargs:
+                vars(self)[key] = connectargs[key]
+            for key in self.options:
+                if key == "trex_address":
+                    self.trex_address = self.options[key]
+                elif key == "trex_config":
+                    self.trex_config = self.options[key]
+                elif key == "force_restart":
+                    self.force_restart = bool(strtobool(self.options[key]))
+                elif key == "software_mode":
+                    self.software_mode = bool(strtobool(self.options[key]))
+            self.name = self.options["name"]
+        except Exception as inst:
+            main.log.error("Uncaught exception: " + str(inst))
+            main.cleanAndExit()
+        return super(TrexClientDriver, self).connect()
+
+    def disconnect(self):
+        """
+        Called when Test is complete
+        """
+        self.disconnectTrexClient()
+        self.stopTrexServer()
+        return main.TRUE
+
+    def setupTrex(self, pathToTrexConfig):
+        """
+        Setup TRex server passing the TRex configuration.
+        :return: True if setup successful, False otherwise
+        """
+        main.log.debug(self.name + ": Setting up TRex server")
+        if self.software_mode:
+            trex_args = "--software --no-hw-flow-stat"
+        else:
+            trex_args = None
+        self.trex_daemon_client = CTRexClient(self.trex_address,
+                                              trex_args=trex_args)
+        success = self.__set_up_trex_server(
+            self.trex_daemon_client, self.trex_address,
+            pathToTrexConfig + self.trex_config,
+            self.force_restart
+        )
+        if not success:
+            main.log.error("Failed to set up TRex daemon!")
+            return False
+        self.setup_successful = True
+        return True
+
+    def connectTrexClient(self):
+        if not self.setup_successful:
+            main.log.error("Cannot connect TRex Client, first setup TRex")
+            return False
+        main.log.info("Connecting TRex Client")
+        self.trex_client = STLClient(server=self.trex_address)
+        self.trex_client.connect()
+        self.trex_client.acquire()
+        self.trex_client.reset()  # Resets configs from all ports
+        self.trex_client.clear_stats()  # Clear status from all ports
+        # Put all ports to promiscuous mode, otherwise they will drop all
+        # incoming packets if the destination mac is not the port mac address.
+        self.trex_client.set_port_attr(self.trex_client.get_all_ports(),
+                                       promiscuous=True)
+        # Reset the used sender ports
+        self.all_sender_port = set()
+        self.stats = None
+        return True
+
+    def disconnectTrexClient(self):
+        # Teardown TREX Client
+        if self.trex_client is not None:
+            main.log.info("Tearing down STLClient...")
+            self.trex_client.stop()
+            self.trex_client.release()
+            self.trex_client.disconnect()
+            self.trex_client = None
+            # Do not reset stats
+
+    def stopTrexServer(self):
+        if self.trex_daemon_client is not None:
+            self.trex_daemon_client.stop_trex()
+            self.trex_daemon_client = None
+
+    def addStream(self, pkt, trex_port, l1_bps=None, percentage=None,
+                  delay=0, flow_id=None, flow_stats=False):
+        """
+        :param pkt: Scapy packet, TRex will send copy of this packet
+        :param trex_port: Port number to send packet from, must match a port in the TRex config file
+        :param l1_bps: L1 Throughput generated by TRex (mutually exclusive with percentage)
+        :param percentage: Percentage usage of the selected port bandwidth (mutually exlusive with l1_bps)
+        :param flow_id: Flow ID, required when saving latency statistics
+        :param flow_stats: True to measure flow statistics (latency and packet), False otherwise, might require software mode
+        :return: True if the stream is create, false otherwise
+        """
+        if (percentage is None and l1_bps is None) or (
+                percentage is not None and l1_bps is not None):
+            main.log.error(
+                "Either percentage or l1_bps must be provided when creating a stream")
+            return False
+        main.log.debug("Creating flow stream")
+        main.log.debug(
+            "port: %d, l1_bps: %s, percentage: %s, delay: %d, flow_id:%s, flow_stats: %s" % (
+                trex_port, str(l1_bps), str(percentage), delay, str(flow_id),
+                str(flow_stats)))
+        main.log.debug(pkt.summary())
+        if flow_stats:
+            traffic_stream = self.__create_latency_stats_stream(
+                pkt,
+                pg_id=flow_id,
+                isg=delay,
+                percentage=percentage,
+                l1_bps=l1_bps)
+        else:
+            traffic_stream = self.__create_background_stream(
+                pkt,
+                percentage=percentage,
+                l1_bps=l1_bps)
+        self.trex_client.add_streams(traffic_stream, ports=trex_port)
+        self.all_sender_port.add(trex_port)
+        return True
+
+    def startAndWaitTraffic(self, duration=10):
+        """
+        Start generating traffic and wait traffic to be send
+        :param duration: Traffic generation duration
+        :return:
+        """
+        if not self.trex_client:
+            main.log.error(
+                "Cannot start traffic, first connect the TRex client")
+            return False
+        main.log.info("Start sending traffic for %d seconds" % duration)
+        self.trex_client.start(list(self.all_sender_port), mult="1",
+                               duration=duration)
+        main.log.info("Waiting until all traffic is sent..")
+        self.trex_client.wait_on_traffic(ports=list(self.all_sender_port),
+                                         rx_delay_ms=100)
+        main.log.info("...traffic sent!")
+        # Reset sender port so we can run other tests with the same TRex client
+        self.all_sender_port = set()
+        main.log.info("Getting stats")
+        self.stats = self.trex_client.get_stats()
+        main.log.info("GOT stats")
+
+    def getFlowStats(self, flow_id):
+        if self.stats is None:
+            main.log.error("No stats saved!")
+            return None
+        return TrexClientDriver.__get_flow_stats(flow_id, self.stats)
+
+    def logFlowStats(self, flow_id):
+        main.log.info("Statistics for flow {}: {}".format(
+            flow_id,
+            TrexClientDriver.__get_readable_flow_stats(
+                self.getFlowStats(flow_id))))
+
+    def getLatencyStats(self, flow_id):
+        if self.stats is None:
+            main.log.error("No stats saved!")
+            return None
+        return TrexClientDriver.__get_latency_stats(flow_id, self.stats)
+
+    def logLatencyStats(self, flow_id):
+        main.log.info("Latency statistics for flow {}: {}".format(
+            flow_id,
+            TrexClientDriver.__get_readable_latency_stats(
+                self.getLatencyStats(flow_id))))
+
+    def getPortStats(self, port_id):
+        if self.stats is None:
+            main.log.error("No stats saved!")
+            return None
+        return TrexClientDriver.__get_port_stats(port_id, self.stats)
+
+    def logPortStats(self, port_id):
+        if self.stats is None:
+            main.log.error("No stats saved!")
+            return None
+        main.log.info("Statistics for port {}: {}".format(
+            port_id, TrexClientDriver.__get_readable_port_stats(
+                self.stats.get(port_id))))
+
+    # From ptf/test/common/ptf_runner.py
+    def __set_up_trex_server(self, trex_daemon_client, trex_address,
+                             trex_config,
+                             force_restart):
+        try:
+            main.log.info("Pushing Trex config %s to the server" % trex_config)
+            if not trex_daemon_client.push_files(trex_config):
+                main.log.error("Unable to push %s to Trex server" % trex_config)
+                return False
+
+            if force_restart:
+                main.log.info("Restarting TRex")
+                trex_daemon_client.kill_all_trexes()
+                time.sleep(1)
+
+            if not trex_daemon_client.is_idle():
+                main.log.info("The Trex server process is running")
+                main.log.warn(
+                    "A Trex server process is still running, "
+                    + "use --force-restart to kill it if necessary."
+                )
+                return False
+
+            trex_config_file_on_server = TREX_FILES_DIR + os.path.basename(
+                trex_config)
+            trex_daemon_client.start_stateless(cfg=trex_config_file_on_server)
+        except ConnectionRefusedError:
+            main.log.error(
+                "Unable to connect to server %s.\n" + "Did you start the Trex daemon?" % trex_address)
+            return False
+
+        return True
+
+    def __create_latency_stats_stream(self, pkt, pg_id,
+                                      name=None,
+                                      l1_bps=None,
+                                      percentage=None,
+                                      isg=0):
+        assert (percentage is None and l1_bps is not None) or (
+                percentage is not None and l1_bps is None)
+        return STLStream(
+            name=name,
+            packet=STLPktBuilder(pkt=pkt),
+            mode=STLTXCont(bps_L1=l1_bps, percentage=percentage),
+            isg=isg,
+            flow_stats=STLFlowLatencyStats(pg_id=pg_id)
+        )
+
+    def __create_background_stream(self, pkt, name=None, percentage=None,
+                                   l1_bps=None):
+        assert (percentage is None and l1_bps is not None) or (
+                percentage is not None and l1_bps is None)
+        return STLStream(
+            name=name,
+            packet=STLPktBuilder(pkt=pkt),
+            mode=STLTXCont(bps_L1=l1_bps, percentage=percentage)
+        )
+
+    # Multiplier for data rates
+    K = 1000
+    M = 1000 * K
+    G = 1000 * M
+
+    @staticmethod
+    def __to_readable(src, unit="bps"):
+        """
+        Convert number to human readable string.
+        For example: 1,000,000 bps to 1Mbps. 1,000 bytes to 1KB
+
+        :parameters:
+            src : int
+                the original data
+            unit : str
+                the unit ('bps', 'pps', or 'bytes')
+        :returns:
+            A human readable string
+        """
+        if src < 1000:
+            return "{:.1f} {}".format(src, unit)
+        elif src < 1000000:
+            return "{:.1f} K{}".format(src / 1000, unit)
+        elif src < 1000000000:
+            return "{:.1f} M{}".format(src / 1000000, unit)
+        else:
+            return "{:.1f} G{}".format(src / 1000000000, unit)
+
+    @staticmethod
+    def __get_readable_port_stats(port_stats):
+        opackets = port_stats.get("opackets", 0)
+        ipackets = port_stats.get("ipackets", 0)
+        obytes = port_stats.get("obytes", 0)
+        ibytes = port_stats.get("ibytes", 0)
+        oerrors = port_stats.get("oerrors", 0)
+        ierrors = port_stats.get("ierrors", 0)
+        tx_bps = port_stats.get("tx_bps", 0)
+        tx_pps = port_stats.get("tx_pps", 0)
+        tx_bps_L1 = port_stats.get("tx_bps_L1", 0)
+        tx_util = port_stats.get("tx_util", 0)
+        rx_bps = port_stats.get("rx_bps", 0)
+        rx_pps = port_stats.get("rx_pps", 0)
+        rx_bps_L1 = port_stats.get("rx_bps_L1", 0)
+        rx_util = port_stats.get("rx_util", 0)
+        return """
+        Output packets: {}
+        Input packets: {}
+        Output bytes: {} ({})
+        Input bytes: {} ({})
+        Output errors: {}
+        Input errors: {}
+        TX bps: {} ({})
+        TX pps: {} ({})
+        L1 TX bps: {} ({})
+        TX util: {}
+        RX bps: {} ({})
+        RX pps: {} ({})
+        L1 RX bps: {} ({})
+        RX util: {}""".format(
+            opackets,
+            ipackets,
+            obytes,
+            TrexClientDriver.__to_readable(obytes, "Bytes"),
+            ibytes,
+            TrexClientDriver.__to_readable(ibytes, "Bytes"),
+            oerrors,
+            ierrors,
+            tx_bps,
+            TrexClientDriver.__to_readable(tx_bps),
+            tx_pps,
+            TrexClientDriver.__to_readable(tx_pps, "pps"),
+            tx_bps_L1,
+            TrexClientDriver.__to_readable(tx_bps_L1),
+            tx_util,
+            rx_bps,
+            TrexClientDriver.__to_readable(rx_bps),
+            rx_pps,
+            TrexClientDriver.__to_readable(rx_pps, "pps"),
+            rx_bps_L1,
+            TrexClientDriver.__to_readable(rx_bps_L1),
+            rx_util,
+        )
+
+    @staticmethod
+    def __get_port_stats(port, stats):
+        """
+        :param port: int
+        :param stats:
+        :return:
+        """
+        port_stats = stats.get(port)
+        return PortStats(
+            tx_packets=port_stats.get("opackets", 0),
+            rx_packets=port_stats.get("ipackets", 0),
+            tx_bytes=port_stats.get("obytes", 0),
+            rx_bytes=port_stats.get("ibytes", 0),
+            tx_errors=port_stats.get("oerrors", 0),
+            rx_errors=port_stats.get("ierrors", 0),
+            tx_bps=port_stats.get("tx_bps", 0),
+            tx_pps=port_stats.get("tx_pps", 0),
+            tx_bps_L1=port_stats.get("tx_bps_L1", 0),
+            tx_util=port_stats.get("tx_util", 0),
+            rx_bps=port_stats.get("rx_bps", 0),
+            rx_pps=port_stats.get("rx_pps", 0),
+            rx_bps_L1=port_stats.get("rx_bps_L1", 0),
+            rx_util=port_stats.get("rx_util", 0),
+        )
+
+    @staticmethod
+    def __get_latency_stats(pg_id, stats):
+        """
+        :param pg_id: int
+        :param stats:
+        :return:
+        """
+
+        lat_stats = stats["latency"].get(pg_id)
+        lat = lat_stats["latency"]
+        # Estimate latency percentiles from the histogram.
+        l = list(lat["histogram"].keys())
+        l.sort()
+        all_latencies = []
+        for sample in l:
+            range_start = sample
+            if range_start == 0:
+                range_end = 10
+            else:
+                range_end = range_start + pow(10, (len(str(range_start)) - 1))
+            val = lat["histogram"][sample]
+            # Assume whole the bucket experienced the range_end latency.
+            all_latencies += [range_end] * val
+        q = [50, 75, 90, 99, 99.9, 99.99, 99.999]
+        percentiles = np.percentile(all_latencies, q)
+
+        ret = LatencyStats(
+            pg_id=pg_id,
+            jitter=lat["jitter"],
+            average=lat["average"],
+            total_max=lat["total_max"],
+            total_min=lat["total_min"],
+            last_max=lat["last_max"],
+            histogram=lat["histogram"],
+            dropped=lat_stats["err_cntrs"]["dropped"],
+            out_of_order=lat_stats["err_cntrs"]["out_of_order"],
+            duplicate=lat_stats["err_cntrs"]["dup"],
+            seq_too_high=lat_stats["err_cntrs"]["seq_too_high"],
+            seq_too_low=lat_stats["err_cntrs"]["seq_too_low"],
+            percentile_50=percentiles[0],
+            percentile_75=percentiles[1],
+            percentile_90=percentiles[2],
+            percentile_99=percentiles[3],
+            percentile_99_9=percentiles[4],
+            percentile_99_99=percentiles[5],
+            percentile_99_999=percentiles[6],
+        )
+        return ret
+
+    @staticmethod
+    def __get_readable_latency_stats(stats):
+        """
+        :param stats: LatencyStats
+        :return:
+        """
+        histogram = ""
+        # need to listify in order to be able to sort them.
+        l = list(stats.histogram.keys())
+        l.sort()
+        for sample in l:
+            range_start = sample
+            if range_start == 0:
+                range_end = 10
+            else:
+                range_end = range_start + pow(10, (len(str(range_start)) - 1))
+            val = stats.histogram[sample]
+            histogram = (
+                    histogram
+                    + "\n        Packets with latency between {0:>5} us and {1:>5} us: {2:>10}".format(
+                range_start, range_end, val
+            )
+            )
+
+        return """
+        Latency info for pg_id {}
+        Dropped packets: {}
+        Out-of-order packets: {}
+        Sequence too high packets: {}
+        Sequence too low packets: {}
+        Maximum latency: {} us
+        Minimum latency: {} us
+        Maximum latency in last sampling period: {} us
+        Average latency: {} us
+        50th percentile latency: {} us
+        75th percentile latency: {} us
+        90th percentile latency: {} us
+        99th percentile latency: {} us
+        99.9th percentile latency: {} us
+        99.99th percentile latency: {} us
+        99.999th percentile latency: {} us
+        Jitter: {} us
+        Latency distribution histogram: {}
+        """.format(stats.pg_id, stats.dropped, stats.out_of_order,
+                   stats.seq_too_high, stats.seq_too_low, stats.total_max,
+                   stats.total_min, stats.last_max, stats.average,
+                   stats.percentile_50, stats.percentile_75,
+                   stats.percentile_90,
+                   stats.percentile_99, stats.percentile_99_9,
+                   stats.percentile_99_99,
+                   stats.percentile_99_999, stats.jitter, histogram)
+
+    @staticmethod
+    def __get_flow_stats(pg_id, stats):
+        """
+        :param pg_id: int
+        :param stats:
+        :return:
+        """
+        FlowStats = collections.namedtuple(
+            "FlowStats",
+            ["pg_id", "tx_packets", "rx_packets", "tx_bytes", "rx_bytes", ],
+        )
+        flow_stats = stats["flow_stats"].get(pg_id)
+        ret = FlowStats(
+            pg_id=pg_id,
+            tx_packets=flow_stats["tx_pkts"]["total"],
+            rx_packets=flow_stats["rx_pkts"]["total"],
+            tx_bytes=flow_stats["tx_bytes"]["total"],
+            rx_bytes=flow_stats["rx_bytes"]["total"],
+        )
+        return ret
+
+    @staticmethod
+    def __get_readable_flow_stats(stats):
+        """
+        :param stats: FlowStats
+        :return:
+        """
+        return """Flow info for pg_id {}
+        TX packets: {}
+        RX packets: {}
+        TX bytes: {}
+        RX bytes: {}""".format(stats.pg_id, stats.tx_packets,
+                               stats.rx_packets, stats.tx_bytes,
+                               stats.rx_bytes)