[SDFAB-861] Add a testcase to UP4 with PFCP agent
- Because pfcpsim can only add one application filter per UE
session, created a new testcase that individually
tests the application filters from the params file plus the default
application filter
- Using a Mock SMF to send messages to the pfcp agent
- Add a parameter to topo file to use mock_smf or p4rt cli to add
flows
- Modify ue ips to work around smf limitations
- Modify teids for UL and DL to be different. This reduces
differences between smf and p4rtcli generated flows
Change-Id: I1ba3ef43919dd375f5e5bf54e97f61c09c7323d9
diff --git a/TestON/drivers/common/cli/mocksmfdriver.py b/TestON/drivers/common/cli/mocksmfdriver.py
new file mode 100644
index 0000000..6ff9390
--- /dev/null
+++ b/TestON/drivers/common/cli/mocksmfdriver.py
@@ -0,0 +1,277 @@
+"""
+Copyright 2022 Open Networking Foundation (ONF)
+
+Please refer questions to either the onos test mailing list at <onos-test@onosproject.org>,
+the System Testing Plans and Results wiki page at <https://wiki.onosproject.org/x/voMg>,
+or the System Testing Guide page at <https://wiki.onosproject.org/x/WYQg>
+
+"""
+
+import pexpect
+import os
+from drivers.common.clidriver import CLI
+
+
+class MockSMFDriver( CLI ):
+ """
+ Runs commands using the mock smf program to send messages to a UPF.
+
+ """
+
+ def __init__( self ):
+ """
+ Initialize client
+ """
+ super( MockSMFDriver, self ).__init__()
+ self.name = None
+ self.handle = None
+ self.prompt = "\$"
+ self.mock_smf_path = None
+
+ def connect( self, **connectargs ):
+ """
+ Creates the ssh handle for the mock smf
+ """
+ try:
+ for key in connectargs:
+ vars( self )[ key ] = connectargs[ key ]
+ self.name = self.options.get( "name", "" )
+ self.mock_smf_path = self.options.get( "mock_smf_path", None )
+ try:
+ if os.getenv( str( self.ip_address ) ) is not None:
+ self.ip_address = os.getenv( str( self.ip_address ) )
+ else:
+ main.log.info( self.name + ": ip set to " + self.ip_address )
+ except KeyError:
+ main.log.info( self.name + ": Invalid host name," +
+ "defaulting to 'localhost' instead" )
+ self.ip_address = 'localhost'
+ except Exception as e:
+ main.log.error( "Uncaught exception: " + str( e ) )
+
+ self.handle = super( MockSMFDriver, self ).connect(
+ user_name=self.user_name,
+ ip_address=self.ip_address,
+ port=None,
+ pwd=self.pwd )
+ if self.handle:
+ main.log.info( "Connection successful to the host " +
+ self.user_name +
+ "@" +
+ self.ip_address )
+ self.handle.sendline( "" )
+ self.handle.expect( self.prompt )
+ return main.TRUE
+ else:
+ main.log.error( "Connection failed to " +
+ self.user_name +
+ "@" +
+ self.ip_address )
+ return main.FALSE
+ except pexpect.EOF:
+ main.log.error( self.name + ": EOF exception found" )
+ main.log.error( self.name + ": " + self.handle.before )
+ main.cleanAndExit()
+ except Exception:
+ main.log.exception( self.name + ": Uncaught exception!" )
+ main.cleanAndExit()
+
+ def startSMF( self, pcap_file="mock_smf.pcap", timeout=10 ):
+ """
+ Start Mock SMF and connect to the given UPF address.
+ """
+ #TODO: add interface option?
+ try:
+ main.log.debug( self.name + ": Starting Mock SMF CLI" )
+ # TODO: Add pcap logging folder and other options
+ cmd = "pfcpsim &"
+ #if pcap_file:
+ # TODO: Start pcap separately for go based mock smf
+ if self.mock_smf_path:
+ self.handle.sendline( "cd " + self.mock_smf_path )
+ self.handle.expect( self.prompt )
+ main.log.debug( self.handle.before )
+ self.handle.sendline( cmd )
+ i = self.handle.expect( [ "command not found",
+ "unknown",
+ "password for",
+ self.prompt,
+ pexpect.TIMEOUT ], timeout )
+ #TODO refactor this
+ if i == 2:
+ main.log.debug( "%s: Sudo asking for password" % self.name )
+ self.handle.sendline( self.pwd if self.pwd else "jenkins" )
+ j = self.handle.expect( [ "not found", self.prompt ] )
+ if j == 0:
+ main.log.error( "%s: Error starting mock smf" % self.name )
+ main.log.debug( self.handle.before + str( self.handle.after ) )
+ main.cleanAndExit()
+ elif i == 3:
+ # Exit backgrounded pfcpsim, even if test aborts early
+ self.preDisconnect = self.stop
+ else:
+ main.log.error( "%s: Error starting mock smf" % self.name )
+ main.log.debug( self.handle.before + str( self.handle.after ) )
+ main.cleanAndExit()
+ return main.TRUE
+ except pexpect.EOF:
+ main.log.error( self.name + ": EOF exception found" )
+ main.log.error( self.name + ": " + self.handle.before )
+ main.cleanAndExit()
+ except Exception:
+ main.log.exception( self.name + ": Uncaught exception!" )
+ main.cleanAndExit()
+
+ def sendline( self, cmd, timeout=10 ):
+ """
+ Handles the cli output from the mock smf. Returns main.TRUE if no error, else main.FALSE
+ """
+ try:
+ main.log.debug( "%s: Sending %s" % ( self.name, cmd ) )
+ self.handle.sendline( "pfcpctl %s" % cmd )
+ i = self.handle.expect( [ "command not found",
+ "unknown",
+ "ERRO",
+ "FATA",
+ self.prompt,
+ pexpect.TIMEOUT ], timeout )
+ if i == 4:
+ return main.TRUE
+ else:
+ main.log.error( "%s: Error with mock smf cmd: %s" % ( self.name, cmd ) )
+ output = self.handle.before + str( self.handle.after )
+ if i < 3:
+ # If not timeout, make sure we get rest of prompt from buffer
+ self.handle.expect( [ self.prompt, pexpect.TIMEOUT ], timeout )
+ output += self.handle.before
+ main.log.debug( "%s:%s" % ( self.name, output ) )
+ return main.FALSE
+ except pexpect.EOF:
+ main.log.error( self.name + ": EOF exception found" )
+ main.log.error( self.name + ": " + self.handle.before )
+ main.cleanAndExit()
+ except Exception:
+ main.log.exception( self.name + ": Uncaught exception!" )
+ main.cleanAndExit()
+
+ def configure( self, n3_addr, upf_addr, upf_port=None ):
+ """
+ Configure pfcpsim to connect to upf
+ """
+ try:
+ cmd = "service configure --n3-addr %s --remote-peer-addr %s%s" % (n3_addr,
+ upf_addr,
+ "" if not upf_port else ":%s" % upf_port)
+ return self.sendline( cmd )
+ except Exception:
+ main.log.exception( self.name + ": Uncaught exception!" )
+ main.cleanAndExit()
+
+ def associate( self ):
+ """
+ Setup PFCP Association
+ """
+ try:
+ cmd = "service associate"
+ return self.sendline( cmd )
+ except Exception:
+ main.log.exception( self.name + ": Uncaught exception!" )
+ main.cleanAndExit()
+
+ def disassociate( self ):
+ """
+ Teardown PFCP Association
+ """
+ try:
+ cmd = "service disassociate"
+ return self.sendline( cmd )
+ except Exception:
+ main.log.exception( self.name + ": Uncaught exception!" )
+ main.cleanAndExit()
+
+ def create( self, ue_pool, gnb_addr, session_count=1,
+ base_id=1, sdf=[] ):
+ """
+ Create PFCP Session(s)
+
+ Arguments:
+ - ue_pool: The IPv4 prefix from which UE addresses will be drawn
+ - gnb_addr: The IPv4 address of the eNodeB
+ Optional Arguments:
+ - session_count: The number of sessions for which UE flows should be
+ created. Defaults to 1
+ - base_id: The first id to use for the IDs. Further IDs will be
+ generated by incrementing. Defaults to 1
+ - sdf: The sdf filter string to pass to pfcpctl
+ """
+ try:
+ cmd = "session create --count %s --ue-pool %s --gnb-addr %s --baseID %s %s" % (
+ session_count, ue_pool, gnb_addr, base_id,
+ " ".join( sdf ) )
+ return self.sendline( cmd )
+ except Exception:
+ main.log.exception( self.name + ": Uncaught exception!" )
+ main.cleanAndExit()
+
+ def modify( self, ue_pool, gnb_addr, session_count=1,
+ base_id=1, buffering=False, notifycp=False ):
+ """
+ Modify PFCP Session(s)
+
+ Arguments:
+ - ue_pool: The IPv4 prefix from which UE addresses will be drawn
+ - gnb_addr: The IPv4 address of the eNodeB
+ Optional Arguments:
+ - session_count: The number of sessions for which UE flows should be
+ created. Defaults to 1
+ - base_id: The first id to use for the IDs. Further IDs will be
+ generated by incrementing. Defaults to 1
+ - buffering: If this argument is present, downlink FARs will have the
+ buffering flag set to true. Defaults to False
+ - notifycp: If this argument is present, downlink FARs will have the notify
+ CP flag set to true. Defaults to False
+ """
+ try:
+ cmd = "session modify --count %s --ue-pool %s --gnb-addr %s --baseID %s %s %s" % (
+ session_count, ue_pool, gnb_addr, base_id,
+ "--buffer" if buffering else "",
+ "--notifycp" if notifycp else "" )
+ return self.sendline( cmd )
+ except Exception:
+ main.log.exception( self.name + ": Uncaught exception!" )
+ main.cleanAndExit()
+
+ def delete( self, session_count=1, base_id=1 ):
+ """
+ Delete PFCP Session(s)
+
+ Arguments:
+ - session_count: The number of sessions for which UE flows should be
+ deleted. Defaults to 1
+ - base_id: The first id to use for the IDs. Further IDs will be
+ generated by incrementing. Defaults to 1
+ """
+ try:
+ cmd = "session delete --count %s --baseID %s" % (
+ session_count, base_id )
+ return self.sendline( cmd )
+ except Exception:
+ main.log.exception( self.name + ": Uncaught exception!" )
+ main.cleanAndExit()
+
+ def stop( self ):
+ """
+ Exits Mock SMF
+ """
+ try:
+ self.handle.sendline( "fg" )
+ self.handle.send( "\x03" )
+ self.clearBuffer()
+ except pexpect.EOF:
+ main.log.error( self.name + ": EOF exception found" )
+ main.log.error( self.name + ": " + self.handle.before )
+ main.cleanAndExit()
+ except Exception:
+ main.log.exception( self.name + ": Uncaught exception!" )
+ main.cleanAndExit()
+
diff --git a/TestON/drivers/common/cli/onosclusterdriver.py b/TestON/drivers/common/cli/onosclusterdriver.py
index d096d1b..e2fd28d 100755
--- a/TestON/drivers/common/cli/onosclusterdriver.py
+++ b/TestON/drivers/common/cli/onosclusterdriver.py
@@ -28,6 +28,7 @@
or the System Testing Guide page at <https://wiki.onosproject.org/x/WYQg>
"""
+from distutils.util import strtobool
import pexpect
import os
from drivers.common.clidriver import CLI
@@ -68,10 +69,13 @@
if hasattr( self.p4rtUp4, name ):
main.log.debug( "%s: Using UP4 driver's attribute for '%s'" % ( self.name, name ) )
return getattr( self.p4rtUp4, name )
+ if hasattr( self.mock_smf, name ):
+ main.log.debug( "%s: Using mock smf driver's attribute for '%s'" % ( self.name, name ) )
+ return getattr( self.mock_smf, name )
raise AttributeError( "Could not find the attribute %s in %r or it's component handles" % ( name, self ) )
def __init__( self, name, ipAddress, CLI=None, REST=None, Bench=None, pos=None,
- userName=None, server=None, k8s=None, p4rtUp4=None, dockerPrompt=None ):
+ userName=None, server=None, k8s=None, p4rtUp4=None, mock_smf=None, dockerPrompt=None ):
# TODO: validate these arguments
self.name = str( name )
self.ipAddress = ipAddress
@@ -85,6 +89,7 @@
self.server = server
self.k8s = k8s
self.p4rtUp4 = p4rtUp4
+ self.mock_smf = mock_smf
self.dockerPrompt = dockerPrompt
class OnosClusterDriver( CLI ):
@@ -106,6 +111,7 @@
self.nodePass = None
self.nodes = []
self.up4Port = None
+ self.mock_smf = None
super( OnosClusterDriver, self ).__init__()
def connect( self, **connectargs ):
@@ -153,6 +159,8 @@
elif key == "up4_port":
# Defining up4_port triggers the creation of the P4RuntimeCliDriver component
self.up4Port = self.options[ key ]
+ elif key == "mock_smf":
+ self.mock_smf = strtobool( self.options[ key ] )
self.home = self.checkOptions( self.home, "~/onos" )
self.karafUser = self.checkOptions( self.karafUser, self.user_name )
@@ -169,6 +177,7 @@
self.maxNodes = int( self.checkOptions( self.maxNodes, 100 ) )
self.kubeConfig = self.checkOptions( self.kubeConfig, None )
self.up4Port = self.checkOptions( self.up4Port, None )
+ self.mock_smf = self.checkOptions( self.mock_smf, None )
self.name = self.options[ 'name' ]
@@ -266,7 +275,7 @@
elif self.up4Port and port == int( self.up4Port ):
node.p4rtUp4.p4rtPort = localPort
# Set kubeconfig for all components
- for shell in [ node.CLI, node.Bench, node.k8s, node.p4rtUp4 ]:
+ for shell in [ node.CLI, node.Bench, node.k8s, node.p4rtUp4, node.mock_smf ]:
if shell:
shell.setEnv( "KUBECONFIG", value=kubectl.kubeConfig )
main.log.info( "Setting up port forward for pod %s: [ %s ]" % ( self.podNames[ index ], portsList ) )
@@ -529,6 +538,49 @@
main.log.error( name + " component already exists!" )
main.cleanAndExit()
+ def setMockSMFOptions( self, name, ipAddress ):
+ """
+ Parse the cluster options to create a mock smf component with the given name
+
+ Arguments:
+ name - The name of the mock smf component
+ ipAddress - The ip address of the ONOS instance
+ """
+ main.componentDictionary[name] = main.componentDictionary[self.name].copy()
+ main.componentDictionary[name]['type'] = "MockSMFDriver"
+ main.componentDictionary[name]['host'] = ipAddress
+ main.componentDictionary[name]['connect_order'] = str( int( main.componentDictionary[name]['connect_order'] ) + 1 )
+
+ def createMockSMFComponent( self, name, ipAddress ):
+ """
+ Creates a new mock smf component. This will be connected to the node
+ ONOS is running on.
+
+ Arguments:
+ name - The string of the name of this component. The new component
+ will be assigned to main.<name> .
+ In addition, main.<name>.name = str( name )
+ ipAddress - The ip address of the server
+ """
+ try:
+ # look to see if this component already exists
+ getattr( main, name )
+ except AttributeError:
+ # namespace is clear, creating component
+ self.setMockSMFOptions( name, ipAddress )
+ return main.componentInit( name )
+ except pexpect.EOF:
+ main.log.error( self.name + ": EOF exception found" )
+ main.log.error( self.name + ": " + self.handle.before )
+ main.cleanAndExit()
+ except Exception:
+ main.log.exception( self.name + ": Uncaught exception!" )
+ main.cleanAndExit()
+ else:
+ # namespace is not clear!
+ main.log.error( name + " component already exists!" )
+ main.cleanAndExit()
+
def createComponents( self, prefix='', createServer=True ):
"""
Creates a CLI and REST component for each nodes in the cluster
@@ -540,6 +592,7 @@
serverPrefix = prefix + "server"
k8sPrefix = prefix + "k8s"
up4Prefix = prefix + "up4cl"
+ smfPrefix = prefix + "smf"
for i in xrange( 1, self.maxNodes + 1 ):
cliName = cliPrefix + str( i )
restName = restPrefix + str( i )
@@ -549,6 +602,8 @@
k8sName = k8sPrefix + str( i )
if self.up4Port:
up4Name = up4Prefix + str( i )
+ if self.mock_smf:
+ smfName = smfPrefix + str( i )
# Unfortunately this means we need to have a cell set beofre running TestON,
# Even if it is just the entire possible cluster size
@@ -560,9 +615,11 @@
server = self.createServerComponent( serverName, ip ) if createServer else None
k8s = self.createServerComponent( k8sName, ip ) if self.kubeConfig else None
p4rtUp4 = self.createP4rtCLIComponent( up4Name, ip ) if self.up4Port else None
+ smf = self.createMockSMFComponent( smfName, ip ) if self.mock_smf else None
+ self.mock_smf = smf
if self.kubeConfig:
k8s.kubeConfig = self.kubeConfig
k8s.podName = None
self.nodes.append( Controller( prefix + str( i ), ip, cli, rest, bench, i - 1,
self.user_name, server=server, k8s=k8s,
- p4rtUp4=p4rtUp4, dockerPrompt=self.dockerPrompt ) )
+ p4rtUp4=p4rtUp4, mock_smf=smf, dockerPrompt=self.dockerPrompt ) )
diff --git a/TestON/drivers/common/cli/p4runtimeclidriver.py b/TestON/drivers/common/cli/p4runtimeclidriver.py
index b44c2cb..9f371dc 100644
--- a/TestON/drivers/common/cli/p4runtimeclidriver.py
+++ b/TestON/drivers/common/cli/p4runtimeclidriver.py
@@ -125,7 +125,7 @@
self.preDisconnect = self.stopP4RtClient
except pexpect.TIMEOUT:
main.log.exception(self.name + ": Command timed out")
- return main.FALSE
+ main.cleanAndExit()
except pexpect.EOF:
main.log.exception(self.name + ": connection closed.")
main.cleanAndExit()
diff --git a/TestON/drivers/common/clidriver.py b/TestON/drivers/common/clidriver.py
index 1e4d233..4f081ea 100644
--- a/TestON/drivers/common/clidriver.py
+++ b/TestON/drivers/common/clidriver.py
@@ -156,7 +156,7 @@
def disconnect( self ):
result = self.preDisconnect()
- result = super( CLI, self ).disconnect( self )
+ result = super( CLI, self ).disconnect( )
result = main.TRUE
def Prompt( self ):
@@ -1220,6 +1220,49 @@
main.log.exception( self.name + ": Uncaught exception!" )
return None
+ def kubectlCmd( self, cmd, kubeconfig=None, namespace=None ):
+ """
+ Run an arbitrary command using kubectl
+ Arguments:
+ - cmd: Command string to send to kubectl
+ Optional Arguments:
+ - kubeconfig: The path to a kubeconfig file
+ - namespace: The namespace to search in
+ Returns the second line of the command output when the output is exactly three lines long, otherwise None
+ """
+ try:
+ self.handle.sendline( "" )
+ self.handle.expect( self.prompt )
+ main.log.debug( self.handle.before + self.handle.after )
+ cmdStr = "kubectl %s %s %s" % (
+ "--kubeconfig %s" % kubeconfig if kubeconfig else "",
+ "-n %s" % namespace if namespace else "",
+ cmd )
+ main.log.info( self.name + ": sending: " + repr( cmdStr ) )
+ self.handle.sendline( cmdStr )
+ i = self.handle.expect( [ "not found", "error", "The connection to the server", self.prompt ] )
+ if i == 3:
+ output = self.handle.before
+ main.log.debug( self.name + ": " + output )
+ output = output.splitlines()
+ main.log.warn( output )
+ return output[1] if len( output ) == 3 else None
+ else:
+ main.log.error( self.name + ": Error executing command" )
+ main.log.debug( self.name + ": " + self.handle.before + str( self.handle.after ) )
+ return None
+ except pexpect.EOF:
+ main.log.error( self.name + ": EOF exception found" )
+ main.log.error( self.name + ": " + self.handle.before )
+ return None
+ except pexpect.TIMEOUT:
+ main.log.exception( self.name + ": TIMEOUT exception found" )
+ main.log.error( self.name + ": " + self.handle.before )
+ return None
+ except Exception:
+ main.log.exception( self.name + ": Uncaught exception!" )
+ return None
+
def sternLogs( self, podString, dstPath, kubeconfig=None, namespace=None, since='1h', wait=60 ):
"""
Use stern to get the logs from a pod
@@ -1425,7 +1468,7 @@
except pexpect.TIMEOUT:
main.log.exception( self.name + ": TIMEOUT exception found" )
main.log.error( self.name + ": " + self.handle.before )
- return main.FALSE
+ return self.checkPortForward( podName, portsList, kubeconfig, namespace )
except Exception:
main.log.exception( self.name + ": Uncaught exception!" )
return main.FALSE
@@ -1459,6 +1502,7 @@
main.log.warn( "%s: port-forwarding session to %s closed, attempting to reestablish." % ( self.name, podName ) )
return self.kubectlPortForward( podName, portsList, kubeconfig, namespace )
elif i == 1:
+ main.log.debug( "%s: We seem to still be in port-forwarding session" % self.name )
# Still in a command, port-forward is probably still active
return main.TRUE
except pexpect.EOF:
@@ -1665,6 +1709,58 @@
main.log.exception( self.name + ": Uncaught exception!" )
return main.FALSE
+ def kubectlGetServiceIP( self, serviceName, kubeconfig=None, namespace=None, timeout=240 ):
+ try:
+ cmdStr = "kubectl %s %s get service %s " \
+ "--output=jsonpath='{.spec.clusterIP}{\"\\n\"}'" % (
+ "--kubeconfig %s" % kubeconfig if kubeconfig else "",
+ "-n %s" % namespace if namespace else "",
+ serviceName )
+ main.log.info( self.name + ": sending: " + repr( cmdStr ) )
+ self.handle.sendline( cmdStr )
+ self.handle.expect( self.prompt, timeout=timeout )
+ output = self.handle.before
+ clusterIP = output.splitlines()
+ main.log.debug( repr( clusterIP ) )
+ return clusterIP[-2]
+ except pexpect.EOF:
+ main.log.error( self.name + ": EOF exception found" )
+ main.log.error( self.name + ": " + self.handle.before )
+ return None
+ except pexpect.TIMEOUT:
+ main.log.exception( self.name + ": TIMEOUT exception found" )
+ main.log.error( self.name + ": " + self.handle.before )
+ return None
+ except Exception:
+ main.log.exception( self.name + ": Uncaught exception!" )
+ return None
+
+ def kubectlGetNodePort( self, serviceName, kubeconfig=None, namespace=None, timeout=240 ):
+ try:
+ cmdStr = "kubectl %s %s get service %s " \
+ "--output=jsonpath='{.spec.ports[*].nodePort}{\"\\n\"}'" % (
+ "--kubeconfig %s" % kubeconfig if kubeconfig else "",
+ "-n %s" % namespace if namespace else "",
+ serviceName )
+ main.log.info( self.name + ": sending: " + repr( cmdStr ) )
+ self.handle.sendline( cmdStr )
+ self.handle.expect( self.prompt, timeout=timeout )
+ output = self.handle.before
+ clusterIP = output.splitlines()
+ main.log.debug( repr( clusterIP ) )
+ return clusterIP[-2]
+ except pexpect.EOF:
+ main.log.error( self.name + ": EOF exception found" )
+ main.log.error( self.name + ": " + self.handle.before )
+ return None
+ except pexpect.TIMEOUT:
+ main.log.exception( self.name + ": TIMEOUT exception found" )
+ main.log.error( self.name + ": " + self.handle.before )
+ return None
+ except Exception:
+ main.log.exception( self.name + ": Uncaught exception!" )
+ return None
+
def clearBuffer(self):
i = 0
response = ''