blob: 50c8c39df28986c8cd31641002497782348b7e5b [file] [log] [blame]
"""
Copyright 2021 Open Networking Foundation (ONF)
Please refer questions to either the onos test mailing list at <onos-test@onosproject.org>,
the System Testing Plans and Results wiki page at <https://wiki.onosproject.org/x/voMg>,
or the System Testing Guide page at <https://wiki.onosproject.org/x/WYQg>
"""
import time
import os
import sys
import importlib
import collections
import numpy as np
from drivers.common.api.controllerdriver import Controller
from socket import error as ConnectionRefusedError
from distutils.util import strtobool
# Directory prefix prepended to the config file's basename to build the
# server-side path handed to start_stateless() after push_files().
# NOTE(review): presumably the directory where the TRex daemon stores pushed
# files -- confirm against the TRex daemon configuration.
TREX_FILES_DIR = "/tmp/trex_files/"
# Immutable record of the latency figures of one flow (pg_id): summary values,
# the raw latency histogram, error counters and the estimated percentiles.
LatencyStats = collections.namedtuple(
    "LatencyStats",
    # namedtuple also accepts the field names as a whitespace-separated string
    "pg_id jitter average total_max total_min last_max histogram "
    "dropped out_of_order duplicate seq_too_high seq_too_low "
    "percentile_50 percentile_75 percentile_90 percentile_99 "
    "percentile_99_9 percentile_99_99 percentile_99_999",
)
# Immutable record of the counters and rates of one TRex port.
PortStats = collections.namedtuple(
    "PortStats",
    # namedtuple also accepts the field names as a whitespace-separated string
    "tx_packets rx_packets tx_bytes rx_bytes tx_errors rx_errors "
    "tx_bps tx_pps tx_bps_L1 tx_util "
    "rx_bps rx_pps rx_bps_L1 rx_util",
)
# Immutable record of the TX/RX packet and byte counters of one flow (pg_id).
FlowStats = collections.namedtuple(
    "FlowStats",
    # namedtuple also accepts the field names as a whitespace-separated string
    "pg_id tx_packets rx_packets tx_bytes rx_bytes",
)
class TrexClientDriver(Controller):
    """
    Implements a Trex Client Driver

    The driver wraps two TRex clients: the daemon client (CTRexClient), used
    to push the configuration and start/stop the TRex server, and the
    stateless client (STLClient), used to add streams, generate traffic and
    read statistics.

    Expected call order:
        connect() -> setupTrex() -> connectTrexClient() -> addStream() ...
        -> startAndWaitTraffic() -> get*Stats() / log*Stats() -> disconnect()
    """

    # Multiplier for data rates
    K = 1000
    M = 1000 * K
    G = 1000 * M

    def __init__(self):
        self.trex_address = "localhost"
        self.trex_config = None  # Relative path in dependencies of the test using this driver
        self.force_restart = True
        self.software_mode = False
        self.setup_successful = False
        self.stats = None
        self.trex_client = None
        self.trex_daemon_client = None
        self.trex_library_python_path = None
        # Ports that got at least one stream since the last traffic run
        self.all_sender_port = set()
        super(TrexClientDriver, self).__init__()

    def connect(self, **connectargs):
        """
        Store the connection arguments, parse the driver options and import
        the TRex Python API.

        Every keyword argument becomes an attribute of the driver. The
        recognized keys of self.options are trex_address, trex_config,
        force_restart, software_mode, trex_library_python_path and name.
        The TRex classes are published as module globals because they can
        only be imported once the TRex library path is known.

        On any failure the error is logged and main.cleanAndExit() is called.
        :return: the result of the parent class connect()
        """
        global STLClient, STLStreamDstMAC_PKT, CTRexClient, STLPktBuilder, \
            STLFlowLatencyStats, STLStream, STLTXCont
        try:
            for key in connectargs:
                vars(self)[key] = connectargs[key]
            for key in self.options:
                if key == "trex_address":
                    self.trex_address = self.options[key]
                elif key == "trex_config":
                    self.trex_config = self.options[key]
                elif key == "force_restart":
                    self.force_restart = bool(strtobool(self.options[key]))
                elif key == "software_mode":
                    self.software_mode = bool(strtobool(self.options[key]))
                elif key == "trex_library_python_path":
                    self.trex_library_python_path = self.options[key]
            self.name = self.options["name"]
            if self.trex_library_python_path is not None:
                sys.path.append(self.trex_library_python_path)
            # Import after appending the TRex library Python path
            stl_api = importlib.import_module("trex.stl.api")
            STLClient = getattr(stl_api, "STLClient")
            STLStreamDstMAC_PKT = getattr(stl_api, "STLStreamDstMAC_PKT")
            CTRexClient = getattr(
                importlib.import_module("trex_stf_lib.trex_client"),
                "CTRexClient")
            stl_lib_api = importlib.import_module("trex_stl_lib.api")
            STLFlowLatencyStats = getattr(stl_lib_api, "STLFlowLatencyStats")
            STLPktBuilder = getattr(stl_lib_api, "STLPktBuilder")
            STLStream = getattr(stl_lib_api, "STLStream")
            STLTXCont = getattr(stl_lib_api, "STLTXCont")
        except Exception as inst:
            main.log.error("Uncaught exception: " + str(inst))
            main.cleanAndExit()
        return super(TrexClientDriver, self).connect()

    def disconnect(self):
        """
        Called when Test is complete
        :return: main.TRUE
        """
        try:
            self.disconnectTrexClient()
        finally:
            # Always try to stop the server, even if the client teardown
            # failed, so that no TRex process is left running.
            self.stopTrexServer()
        return main.TRUE

    def setupTrex(self, pathToTrexConfig):
        """
        Setup TRex server passing the TRex configuration.
        :param pathToTrexConfig: Directory that contains the file named by
                                 the trex_config option
        :return: True if setup successful, False otherwise
        """
        main.log.debug(self.name + ": Setting up TRex server")
        if self.trex_config is None:
            main.log.error("Cannot set up TRex, no trex_config provided")
            return False
        if self.software_mode:
            trex_args = "--software --no-hw-flow-stat"
        else:
            trex_args = None
        self.trex_daemon_client = CTRexClient(self.trex_address,
                                              trex_args=trex_args)
        success = self.__set_up_trex_server(
            self.trex_daemon_client, self.trex_address,
            os.path.join(pathToTrexConfig, self.trex_config),
            self.force_restart
        )
        if not success:
            main.log.error("Failed to set up TRex daemon!")
            return False
        self.setup_successful = True
        return True

    def connectTrexClient(self):
        """
        Create the stateless TRex client, acquire and reset all the ports.
        Requires a successful setupTrex().
        :return: True if the client is connected, False if TRex is not set up
        """
        if not self.setup_successful:
            main.log.error("Cannot connect TRex Client, first setup TRex")
            return False
        main.log.info("Connecting TRex Client")
        self.trex_client = STLClient(server=self.trex_address)
        self.trex_client.connect()
        self.trex_client.acquire()
        self.trex_client.reset()  # Resets configs from all ports
        self.trex_client.clear_stats()  # Clear status from all ports
        # Put all ports to promiscuous mode, otherwise they will drop all
        # incoming packets if the destination mac is not the port mac address.
        self.trex_client.set_port_attr(self.trex_client.get_all_ports(),
                                       promiscuous=True)
        # Reset the used sender ports
        self.all_sender_port = set()
        self.stats = None
        return True

    def disconnectTrexClient(self):
        """
        Stop the traffic, release the ports and disconnect the stateless
        client, if one is connected. The saved statistics are kept.
        """
        # Teardown TREX Client
        if self.trex_client is not None:
            main.log.info("Tearing down STLClient...")
            self.trex_client.stop()
            self.trex_client.release()
            self.trex_client.disconnect()
            self.trex_client = None
            # Do not reset stats

    def stopTrexServer(self):
        """
        Stop the TRex server through the daemon client, if one was created.
        """
        if self.trex_daemon_client is not None:
            self.trex_daemon_client.stop_trex()
            self.trex_daemon_client = None

    def addStream(self, pkt, trex_port, l1_bps=None, percentage=None,
                  delay=0, flow_id=None, flow_stats=False):
        """
        :param pkt: Scapy packet, TRex will send copy of this packet
        :param trex_port: Port number to send packet from, must match a port in the TRex config file
        :param l1_bps: L1 Throughput generated by TRex (mutually exclusive with percentage)
        :param percentage: Percentage usage of the selected port bandwidth (mutually exclusive with l1_bps)
        :param delay: Inter-stream gap (isg) of the stream, only applied when flow_stats is True
        :param flow_id: Flow ID, required when saving latency statistics
        :param flow_stats: True to measure flow statistics (latency and packet), False otherwise, might require software mode
        :return: True if the stream is created, False otherwise
        """
        if (percentage is None) == (l1_bps is None):
            # Both missing or both provided
            main.log.error(
                "Either percentage or l1_bps must be provided when creating a stream")
            return False
        if self.trex_client is None:
            main.log.error(
                "Cannot add stream, first connect the TRex client")
            return False
        if flow_stats and flow_id is None:
            main.log.error(
                "flow_id must be provided when flow_stats is enabled")
            return False
        main.log.debug("Creating flow stream")
        main.log.debug(
            "port: %d, l1_bps: %s, percentage: %s, delay: %d, flow_id:%s, flow_stats: %s" % (
                trex_port, str(l1_bps), str(percentage), delay, str(flow_id),
                str(flow_stats)))
        main.log.debug(pkt.summary())
        if flow_stats:
            traffic_stream = self.__create_latency_stats_stream(
                pkt,
                pg_id=flow_id,
                isg=delay,
                percentage=percentage,
                l1_bps=l1_bps)
        else:
            traffic_stream = self.__create_background_stream(
                pkt,
                percentage=percentage,
                l1_bps=l1_bps)
        self.trex_client.add_streams(traffic_stream, ports=trex_port)
        self.all_sender_port.add(trex_port)
        return True

    def startAndWaitTraffic(self, duration=10, ports=None):
        """
        Start generating traffic and wait traffic to be send
        :param duration: Traffic generation duration
        :param ports: Ports IDs to monitor while traffic is active
        :return: port statistics collected while traffic is active, False if
                 the TRex client is not connected
        """
        if not self.trex_client:
            main.log.error(
                "Cannot start traffic, first connect the TRex client")
            return False
        if ports is None:
            ports = []
        main.log.info("Start sending traffic for %d seconds" % duration)
        self.trex_client.start(list(self.all_sender_port), mult="1",
                               duration=duration)
        main.log.info("Waiting until all traffic is sent..")
        result = self.__monitor_port_stats(ports)
        self.trex_client.wait_on_traffic(ports=list(self.all_sender_port),
                                         rx_delay_ms=100)
        main.log.info("...traffic sent!")
        # Reset sender port so we can run other tests with the same TRex client
        self.all_sender_port = set()
        main.log.info("Getting stats")
        self.stats = self.trex_client.get_stats()
        return result

    def getFlowStats(self, flow_id):
        """
        :param flow_id: Flow ID (pg_id) of the stream
        :return: FlowStats of the flow, None if not available
        """
        if self.stats is None:
            main.log.error("No stats saved!")
            return None
        return TrexClientDriver.__get_flow_stats(flow_id, self.stats)

    def logFlowStats(self, flow_id):
        """
        Log the flow statistics of the given flow, if available.
        """
        flow_stats = self.getFlowStats(flow_id)
        if flow_stats is None:
            # The reason has already been logged
            return
        main.log.info("Statistics for flow {}: {}".format(
            flow_id,
            TrexClientDriver.__get_readable_flow_stats(flow_stats)))

    def getLatencyStats(self, flow_id):
        """
        :param flow_id: Flow ID (pg_id) of the stream
        :return: LatencyStats of the flow, None if not available
        """
        if self.stats is None:
            main.log.error("No stats saved!")
            return None
        return TrexClientDriver.__get_latency_stats(flow_id, self.stats)

    def logLatencyStats(self, flow_id):
        """
        Log the latency statistics of the given flow, if available.
        """
        latency_stats = self.getLatencyStats(flow_id)
        if latency_stats is None:
            # The reason has already been logged
            return
        main.log.info("Latency statistics for flow {}: {}".format(
            flow_id,
            TrexClientDriver.__get_readable_latency_stats(latency_stats)))

    def getPortStats(self, port_id):
        """
        :param port_id: TRex port ID
        :return: PortStats of the port, None if not available
        """
        if self.stats is None:
            main.log.error("No stats saved!")
            return None
        return TrexClientDriver.__get_port_stats(port_id, self.stats)

    def logPortStats(self, port_id):
        """
        Log the statistics of the given port, if available.
        """
        if self.stats is None:
            main.log.error("No stats saved!")
            return None
        port_stats = self.stats.get(port_id)
        if port_stats is None:
            main.log.error("No stats saved for port {}".format(port_id))
            return None
        main.log.info("Statistics for port {}: {}".format(
            port_id, TrexClientDriver.__get_readable_port_stats(port_stats)))

    # From ptf/test/common/ptf_runner.py
    def __set_up_trex_server(self, trex_daemon_client, trex_address,
                             trex_config,
                             force_restart):
        """
        Push the configuration to the TRex daemon and start TRex in
        stateless mode.
        :param trex_daemon_client: Client of the TRex daemon
        :param trex_address: Address of the TRex server, only used for logging
        :param trex_config: Local path of the TRex configuration file
        :param force_restart: True to kill the running TRex instances first
        :return: True if TRex has been started, False otherwise
        """
        try:
            main.log.info("Pushing Trex config %s to the server" % trex_config)
            if not trex_daemon_client.push_files(trex_config):
                main.log.error("Unable to push %s to Trex server" % trex_config)
                return False
            if force_restart:
                main.log.info("Restarting TRex")
                trex_daemon_client.kill_all_trexes()
                time.sleep(1)
            if not trex_daemon_client.is_idle():
                main.log.info("The Trex server process is running")
                main.log.warn(
                    "A Trex server process is still running, "
                    + "use --force-restart to kill it if necessary."
                )
                return False
            trex_config_file_on_server = TREX_FILES_DIR + os.path.basename(
                trex_config)
            trex_daemon_client.start_stateless(cfg=trex_config_file_on_server)
        except ConnectionRefusedError:
            # Adjacent literals are merged before "%" is applied; with "+"
            # the operator would bind to the second literal only and raise
            # a TypeError.
            main.log.error(
                "Unable to connect to server %s.\n"
                "Did you start the Trex daemon?" % trex_address)
            return False
        return True

    def __create_latency_stats_stream(self, pkt, pg_id,
                                      name=None,
                                      l1_bps=None,
                                      percentage=None,
                                      isg=0):
        """
        Build a continuous stream that also collects flow and latency
        statistics under the given pg_id.
        Exactly one of l1_bps and percentage must be provided.
        """
        assert (percentage is None and l1_bps is not None) or (
            percentage is not None and l1_bps is None)
        return STLStream(
            name=name,
            packet=STLPktBuilder(pkt=pkt),
            mode=STLTXCont(bps_L1=l1_bps, percentage=percentage),
            isg=isg,
            flow_stats=STLFlowLatencyStats(pg_id=pg_id)
        )

    def __create_background_stream(self, pkt, name=None, percentage=None,
                                   l1_bps=None):
        """
        Build a continuous stream without flow statistics.
        Exactly one of l1_bps and percentage must be provided.
        """
        assert (percentage is None and l1_bps is not None) or (
            percentage is not None and l1_bps is None)
        return STLStream(
            name=name,
            packet=STLPktBuilder(pkt=pkt),
            mode=STLTXCont(bps_L1=l1_bps, percentage=percentage)
        )

    def __monitor_port_stats(self, ports, time_interval=1):
        """
        List some port stats continuously while traffic is active
        :param ports: List of ports ids to monitor
        :param time_interval: Interval between read
        :return: Statistics read while traffic is active, a dictionary with
                 one entry per port (lists of rx_bps, tx_bps, rx_pps, tx_pps)
                 plus the "duration" list
        """
        results = {
            port_id: {"rx_bps": [], "tx_bps": [], "rx_pps": [], "tx_pps": []}
            for port_id in ports
        }
        results["duration"] = []
        prev = {
            port_id: {
                "opackets": 0,
                "ipackets": 0,
                "obytes": 0,
                "ibytes": 0,
                "time": time.time(),
            }
            for port_id in ports
        }
        s_time = time.time()
        while self.trex_client.is_traffic_active():
            stats = self.trex_client.get_stats(ports=ports)
            if not stats:
                break
            main.log.debug(
                "\nTRAFFIC RUNNING {:.2f} SEC".format(time.time() - s_time))
            main.log.debug(
                "{:^4} | {:<10} | {:<10} | {:<10} | {:<10} |".format(
                    "Port", "RX bps", "TX bps", "RX pps", "TX pps"
                )
            )
            main.log.debug(
                "----------------------------------------------------------")
            for port in ports:
                opackets = stats[port]["opackets"]
                ipackets = stats[port]["ipackets"]
                obytes = stats[port]["obytes"]
                ibytes = stats[port]["ibytes"]
                time_diff = time.time() - prev[port]["time"]
                # Rates are the counter deltas since the previous read
                # divided by the elapsed time.
                rx_bps = 8 * (ibytes - prev[port]["ibytes"]) / time_diff
                tx_bps = 8 * (obytes - prev[port]["obytes"]) / time_diff
                rx_pps = (ipackets - prev[port]["ipackets"]) / time_diff
                tx_pps = (opackets - prev[port]["opackets"]) / time_diff
                main.log.debug(
                    "{:^4} | {:<10} | {:<10} | {:<10} | {:<10} |".format(
                        port,
                        TrexClientDriver.__to_readable(rx_bps, "bps"),
                        TrexClientDriver.__to_readable(tx_bps, "bps"),
                        TrexClientDriver.__to_readable(rx_pps, "pps"),
                        TrexClientDriver.__to_readable(tx_pps, "pps"),
                    )
                )
                # NOTE(review): "duration" gets one entry per port and per
                # read, so with several ports it is longer than the per-port
                # lists. Kept as is because callers may rely on it.
                results["duration"].append(time.time() - s_time)
                results[port]["rx_bps"].append(rx_bps)
                results[port]["tx_bps"].append(tx_bps)
                results[port]["rx_pps"].append(rx_pps)
                results[port]["tx_pps"].append(tx_pps)
                prev[port]["opackets"] = opackets
                prev[port]["ipackets"] = ipackets
                prev[port]["obytes"] = obytes
                prev[port]["ibytes"] = ibytes
                prev[port]["time"] = time.time()
            time.sleep(time_interval)
            main.log.debug("")
        return results

    @staticmethod
    def __to_readable(src, unit="bps"):
        """
        Convert number to human readable string.
        For example: 1,000,000 bps to 1Mbps. 1,000 bytes to 1KB
        :parameters:
            src : int
                the original data
            unit : str
                the unit ('bps', 'pps', or 'bytes')
        :returns:
            A human readable string
        """
        # float() avoids the integer division of Python 2
        value = float(src)
        if src < TrexClientDriver.K:
            return "{:.1f} {}".format(src, unit)
        elif src < TrexClientDriver.M:
            return "{:.1f} K{}".format(value / TrexClientDriver.K, unit)
        elif src < TrexClientDriver.G:
            return "{:.1f} M{}".format(value / TrexClientDriver.M, unit)
        else:
            return "{:.1f} G{}".format(value / TrexClientDriver.G, unit)

    @staticmethod
    def __get_readable_port_stats(port_stats):
        """
        :param port_stats: Dictionary with the raw statistics of one port,
                           missing counters are reported as 0
        :return: Multi-line human readable string
        """
        opackets = port_stats.get("opackets", 0)
        ipackets = port_stats.get("ipackets", 0)
        obytes = port_stats.get("obytes", 0)
        ibytes = port_stats.get("ibytes", 0)
        oerrors = port_stats.get("oerrors", 0)
        ierrors = port_stats.get("ierrors", 0)
        tx_bps = port_stats.get("tx_bps", 0)
        tx_pps = port_stats.get("tx_pps", 0)
        tx_bps_L1 = port_stats.get("tx_bps_L1", 0)
        tx_util = port_stats.get("tx_util", 0)
        rx_bps = port_stats.get("rx_bps", 0)
        rx_pps = port_stats.get("rx_pps", 0)
        rx_bps_L1 = port_stats.get("rx_bps_L1", 0)
        rx_util = port_stats.get("rx_util", 0)
        # The leading empty item makes the report start on a new line
        template = "\n".join([
            "",
            "Output packets: {}",
            "Input packets: {}",
            "Output bytes: {} ({})",
            "Input bytes: {} ({})",
            "Output errors: {}",
            "Input errors: {}",
            "TX bps: {} ({})",
            "TX pps: {} ({})",
            "L1 TX bps: {} ({})",
            "TX util: {}",
            "RX bps: {} ({})",
            "RX pps: {} ({})",
            "L1 RX bps: {} ({})",
            "RX util: {}",
        ])
        return template.format(
            opackets,
            ipackets,
            obytes,
            TrexClientDriver.__to_readable(obytes, "Bytes"),
            ibytes,
            TrexClientDriver.__to_readable(ibytes, "Bytes"),
            oerrors,
            ierrors,
            tx_bps,
            TrexClientDriver.__to_readable(tx_bps),
            tx_pps,
            TrexClientDriver.__to_readable(tx_pps, "pps"),
            tx_bps_L1,
            TrexClientDriver.__to_readable(tx_bps_L1),
            tx_util,
            rx_bps,
            TrexClientDriver.__to_readable(rx_bps),
            rx_pps,
            TrexClientDriver.__to_readable(rx_pps, "pps"),
            rx_bps_L1,
            TrexClientDriver.__to_readable(rx_bps_L1),
            rx_util,
        )

    @staticmethod
    def __get_port_stats(port, stats):
        """
        :param port: int
        :param stats: Statistics dictionary indexed by port ID
        :return: PortStats of the port, None if the port has no statistics
        """
        port_stats = stats.get(port)
        if port_stats is None:
            main.log.error("No stats saved for port {}".format(port))
            return None
        return PortStats(
            tx_packets=port_stats.get("opackets", 0),
            rx_packets=port_stats.get("ipackets", 0),
            tx_bytes=port_stats.get("obytes", 0),
            rx_bytes=port_stats.get("ibytes", 0),
            tx_errors=port_stats.get("oerrors", 0),
            rx_errors=port_stats.get("ierrors", 0),
            tx_bps=port_stats.get("tx_bps", 0),
            tx_pps=port_stats.get("tx_pps", 0),
            tx_bps_L1=port_stats.get("tx_bps_L1", 0),
            tx_util=port_stats.get("tx_util", 0),
            rx_bps=port_stats.get("rx_bps", 0),
            rx_pps=port_stats.get("rx_pps", 0),
            rx_bps_L1=port_stats.get("rx_bps_L1", 0),
            rx_util=port_stats.get("rx_util", 0),
        )

    @staticmethod
    def __bucket_upper_bound(range_start):
        """
        Upper bound of the latency histogram bucket that starts at
        range_start. The bucket width is the power of ten with the same
        number of digits as range_start (10 for the bucket starting at 0).
        For example: 0 -> 10, 20 -> 30, 300 -> 400
        :param range_start: int
        :return: int
        """
        if range_start == 0:
            return 10
        return range_start + pow(10, (len(str(range_start)) - 1))

    @staticmethod
    def __get_latency_stats(pg_id, stats):
        """
        :param pg_id: int
        :param stats: Statistics dictionary with the "latency" entry
        :return: LatencyStats of the flow, None if the flow has no latency
                 statistics. The percentiles are NaN when the histogram is
                 empty.
        """
        lat_stats = stats["latency"].get(pg_id)
        if lat_stats is None:
            main.log.error("No latency stats saved for flow {}".format(pg_id))
            return None
        lat = lat_stats["latency"]
        histogram = lat["histogram"]
        # Estimate latency percentiles from the histogram.
        all_latencies = []
        for range_start in sorted(histogram):
            range_end = TrexClientDriver.__bucket_upper_bound(range_start)
            # Assume whole the bucket experienced the range_end latency.
            all_latencies += [range_end] * histogram[range_start]
        q = [50, 75, 90, 99, 99.9, 99.99, 99.999]
        if all_latencies:
            percentiles = np.percentile(all_latencies, q)
        else:
            # No sample at all, there is no percentile to estimate
            percentiles = [float("nan")] * len(q)
        err_cntrs = lat_stats["err_cntrs"]
        return LatencyStats(
            pg_id=pg_id,
            jitter=lat["jitter"],
            average=lat["average"],
            total_max=lat["total_max"],
            total_min=lat["total_min"],
            last_max=lat["last_max"],
            histogram=histogram,
            dropped=err_cntrs["dropped"],
            out_of_order=err_cntrs["out_of_order"],
            duplicate=err_cntrs["dup"],
            seq_too_high=err_cntrs["seq_too_high"],
            seq_too_low=err_cntrs["seq_too_low"],
            percentile_50=percentiles[0],
            percentile_75=percentiles[1],
            percentile_90=percentiles[2],
            percentile_99=percentiles[3],
            percentile_99_9=percentiles[4],
            percentile_99_99=percentiles[5],
            percentile_99_999=percentiles[6],
        )

    @staticmethod
    def __get_readable_latency_stats(stats):
        """
        :param stats: LatencyStats
        :return: Multi-line human readable string
        """
        histogram_lines = []
        # Buckets are reported in increasing latency order
        for range_start in sorted(stats.histogram):
            histogram_lines.append(
                "\n Packets with latency between {0:>5} us and {1:>5} us: {2:>10}".format(
                    range_start,
                    TrexClientDriver.__bucket_upper_bound(range_start),
                    stats.histogram[range_start]
                )
            )
        histogram = "".join(histogram_lines)
        # The empty items make the report start and end with a new line
        template = "\n".join([
            "",
            "Latency info for pg_id {}",
            "Dropped packets: {}",
            "Out-of-order packets: {}",
            "Sequence too high packets: {}",
            "Sequence too low packets: {}",
            "Maximum latency: {} us",
            "Minimum latency: {} us",
            "Maximum latency in last sampling period: {} us",
            "Average latency: {} us",
            "50th percentile latency: {} us",
            "75th percentile latency: {} us",
            "90th percentile latency: {} us",
            "99th percentile latency: {} us",
            "99.9th percentile latency: {} us",
            "99.99th percentile latency: {} us",
            "99.999th percentile latency: {} us",
            "Jitter: {} us",
            "Latency distribution histogram: {}",
            "",
        ])
        return template.format(
            stats.pg_id, stats.dropped, stats.out_of_order,
            stats.seq_too_high, stats.seq_too_low, stats.total_max,
            stats.total_min, stats.last_max, stats.average,
            stats.percentile_50, stats.percentile_75,
            stats.percentile_90,
            stats.percentile_99, stats.percentile_99_9,
            stats.percentile_99_99,
            stats.percentile_99_999, stats.jitter, histogram)

    @staticmethod
    def __get_flow_stats(pg_id, stats):
        """
        :param pg_id: int
        :param stats: Statistics dictionary with the "flow_stats" entry
        :return: FlowStats of the flow, None if the flow has no statistics
        """
        flow_stats = stats["flow_stats"].get(pg_id)
        if flow_stats is None:
            main.log.error("No flow stats saved for flow {}".format(pg_id))
            return None
        # Use the module level FlowStats, so that every call returns the
        # same type.
        return FlowStats(
            pg_id=pg_id,
            tx_packets=flow_stats["tx_pkts"]["total"],
            rx_packets=flow_stats["rx_pkts"]["total"],
            tx_bytes=flow_stats["tx_bytes"]["total"],
            rx_bytes=flow_stats["rx_bytes"]["total"],
        )

    @staticmethod
    def __get_readable_flow_stats(stats):
        """
        :param stats: FlowStats
        :return: Multi-line human readable string
        """
        template = "\n".join([
            "Flow info for pg_id {}",
            "TX packets: {}",
            "RX packets: {}",
            "TX bytes: {}",
            "RX bytes: {}",
        ])
        return template.format(stats.pg_id, stats.tx_packets,
                               stats.rx_packets, stats.tx_bytes,
                               stats.rx_bytes)