# blob: 7ddd53237f335f6fec5c0a1b8888ac9c6fa0281f [file] [log] [blame]
#Testing the basic functionality of ONOS Next
#For sanity and driver functionality exercises only.
import time
import sys
import os
import re
import time
import json
# Module-level side effect: block for one second whenever this file is loaded.
# NOTE(review): the reason for the delay is not evident from this file -
# confirm it is still needed.
time.sleep(1)
class OpticalFunc13:
    '''
    Packet-optical test cases for ONOS (sanity and driver exercises).

    Every CASE method receives the framework handle `main` and drives the
    components attached to it (ONOSbench, ONOS2, LincOE1, LincOE2), reading
    its settings from main.params.

    NOTE(review): `utilities` is used by several cases but is not imported in
    this file; presumably the test framework injects it - confirm.
    '''

    def __init__(self):
        self.default = ''

    def CASE1(self, main):
        '''
        Startup sequence:
        git pull
        mvn clean install
        onos-package
        cell <name>
        onos-verify-cell
        onos-install -f
        onos-wait-for-start
        '''
        cell_name = main.params['ENV']['cellName']
        ONOS1_ip = main.params['CTRL']['ip1']
        # Read from the params but not used anywhere in this case.
        ONOS1_port = main.params['CTRL']['port1']

        main.case("Setting up test environment")

        main.step("Git checkout and pull master and get version")
        main.ONOSbench.git_checkout("master")
        git_pull_result = main.ONOSbench.git_pull()
        # Single-argument print(...) emits the same text on Python 2 and 3.
        print("git_pull_result =  %s" % (git_pull_result,))
        # NOTE(review): get_version() is invoked twice and the first result is
        # never used; kept as-is because its side effects are not visible here.
        version_result = main.ONOSbench.get_version()
        main.log.report(main.ONOSbench.get_version())

        # The build only runs when git_pull() returned 1.
        # NOTE(review): presumably 1 means new changes were pulled - confirm.
        # NOTE(review): the build result is not part of the case verdict below.
        if git_pull_result == 1:
            main.step("Using mvn clean & install")
            clean_install_result = main.ONOSbench.clean_install()

        main.step("Applying cell variable to environment")
        cell_result1 = main.ONOSbench.set_cell(cell_name)
        verify_result = main.ONOSbench.verify_cell()
        cell_result2 = main.ONOS2.set_cell(cell_name)
        main.ONOS2.start_onos_cli(ONOS_ip=ONOS1_ip)
        cell_result = cell_result1 and cell_result2

        main.step("Creating ONOS package")
        package_result = main.ONOSbench.onos_package()

        main.step("Installing ONOS package")
        onos_install_result = main.ONOSbench.onos_install()
        onos1_isup = main.ONOSbench.isup()

        main.step("Starting ONOS service")
        start_result = main.ONOSbench.onos_start(ONOS1_ip)

        # The case passes only if every collected step result is truthy.
        case1_result = (package_result and cell_result and verify_result and
                        onos_install_result and onos1_isup and start_result)
        utilities.assert_equals(
            expect=main.TRUE, actual=case1_result,
            onpass="Test startup successful",
            onfail="Test startup NOT successful")

    def CASE11(self, main):
        '''
        Cleanup sequence:
        onos-service <node_ip> stop
        onos-uninstall
        TODO: Define rest of cleanup
        '''
        ONOS1_ip = main.params['CTRL']['ip1']

        main.case("Cleaning up test environment")

        # The three results below are collected but not asserted on.
        main.step("Testing ONOS kill function")
        kill_result = main.ONOSbench.onos_kill(ONOS1_ip)

        main.step("Stopping ONOS service")
        stop_result = main.ONOSbench.onos_stop(ONOS1_ip)

        main.step("Uninstalling ONOS service")
        uninstall_result = main.ONOSbench.onos_uninstall()

    def CASE21(self, main):
        '''
        On ONOS bench, run this command: ./~/ONOS/tools/test/bin/onos-topo-cfg
        which starts the rest and copies the links json file to the onos
        instance.
        Note that in case of Packet Optical, the links are not learnt from the
        topology, instead the links are learnt from the json config file.
        '''
        main.log.report("This testcase starts the packet layer topology and REST")
        main.log.report("_____________________________________________")

        # The three results below are collected but not asserted on.
        start_console_result = main.LincOE1.start_console()
        optical_mn_script = main.LincOE2.run_optical_mn_script()
        onos_topo_cfg_result = main.ONOSbench.run_onos_topo_cfg(
            instance_name=main.params['CTRL']['ip1'],
            json_file=main.params['OPTICAL']['jsonfile'])

    def CASE22(self, main):
        '''
        Currently we use a 4 linear switch optical topology and 2 packet layer
        mininet switches, each with one host.
        Therefore the expected counts are 4 ROADMs, 2 packet layer switches and
        2 hosts, and these are hardcoded in the testcase. If the topology
        changes, these hardcoded values need to be changed.
        '''
        main.log.report("This testcase compares the optical+packet topology against what is expected")
        main.ONOS2.start_onos_cli(ONOS_ip=main.params['CTRL']['ip1'])

        devices_result = main.ONOS2.devices(json_format=False)
        print("devices_result =  %s" % (devices_result,))
        # Drop the first and last line of the CLI output.
        # NOTE(review): presumably the echoed command and the trailing prompt.
        devices_linewise = devices_result.split("\n")[1:-1]

        roadmCount = 0
        packetLayerSWCount = 0
        for line in devices_linewise:
            components = line.split(",")
            try:
                # Positional parse: field 1 is availability, field 3 is type.
                availability = components[1].split("=")[1]
                device_type = components[3].split("=")[1]
            except IndexError:
                # Skip lines without the expected key=value fields instead of
                # aborting the whole case.
                main.log.info("Skipping unparsable device line: " + line)
                continue
            if availability != 'true':
                continue
            if device_type == 'ROADM':
                roadmCount += 1
            elif device_type == 'SWITCH':
                packetLayerSWCount += 1

        if roadmCount == 4:
            print("Number of Optical Switches = %d and is correctly detected" % roadmCount)
            main.log.info("Number of Optical Switches = " + str(roadmCount) + " and is correctly detected")
            opticalSW_result = main.TRUE
        else:
            # Fixed: this branch used the undefined name `roadCount`.
            print("Number of Optical Switches = %d and is wrong" % roadmCount)
            main.log.info("Number of Optical Switches = " + str(roadmCount) + " and is wrong")
            opticalSW_result = main.FALSE

        if packetLayerSWCount == 2:
            print("Number of Packet layer or mininet Switches = %d and is correctly detected" % packetLayerSWCount)
            main.log.info("Number of Packet layer or mininet Switches = " + str(packetLayerSWCount) + " and is correctly detected")
            packetSW_result = main.TRUE
        else:
            print("Number of Packet layer or mininet Switches = %d and is wrong" % packetLayerSWCount)
            main.log.info("Number of Packet layer or mininet Switches = " + str(packetLayerSWCount) + " and is wrong")
            packetSW_result = main.FALSE
        print("_________________________________")

        # Links are only printed, not verified.
        links_result = main.ONOS2.links(json_format=False)
        print("links_result =  %s" % (links_result,))
        print("_________________________________")

        #Discover hosts using pingall
        pingall_result = main.LincOE2.pingall()
        hosts_result = main.ONOS2.hosts(json_format=False)
        print("hosts_result =  %s" % (hosts_result,))
        print("_________________________________")
        hosts_linewise = hosts_result.split("\n")[1:-1]

        hostCount = 0
        for line in hosts_linewise:
            # A host line must start with a key=value field; skip anything
            # else instead of raising IndexError.
            if "=" not in line.split(",")[0]:
                main.log.info("Skipping unparsable host line: " + line)
                continue
            hostCount += 1

        if hostCount == 2:
            print("Number of hosts = %d and is correctly detected" % hostCount)
            main.log.info("Number of hosts = " + str(hostCount) + " and is correctly detected")
            hostDiscovery = main.TRUE
        else:
            print("Number of hosts = %d and is wrong" % hostCount)
            main.log.info("Number of hosts = " + str(hostCount) + " and is wrong")
            hostDiscovery = main.FALSE

        case22_result = opticalSW_result and packetSW_result and hostDiscovery
        utilities.assert_equals(
            expect=main.TRUE, actual=case22_result,
            onpass="Packet optical topology discovery successful",
            onfail="Packet optical topology discovery failed")

    def CASE23(self, main):
        '''
        Add bidirectional point intents between 2 packet layer(mininet) devices
        and ping mininet hosts
        '''
        main.log.report("This testcase adds bidirectional point intents between 2 packet layer(mininet) devices and ping mininet hosts")

        # One point intent per direction between the two packet layer devices.
        intent_endpoints = (
            ("of:0000ffffffff0001", "of:0000ffffffff0002"),
            ("of:0000ffffffff0002", "of:0000ffffffff0001"),
        )
        for ingress, egress in intent_endpoints:
            ptp_intent_result = main.ONOS2.add_point_intent(ingress, 1, egress, 1)
            if ptp_intent_result == main.TRUE:
                get_intent_result = main.ONOS2.intents()
                main.log.info("Point to point intent install successful")
                main.log.info(get_intent_result)

        # Fixed 10 second wait before dumping flows and intents.
        # NOTE(review): presumably lets the intents get installed - confirm.
        time.sleep(10)
        flowHandle = main.ONOS2.flows()
        main.log.info("flows :" + flowHandle)
        intentHandle = main.ONOS2.intents()
        main.log.info("intents :" + intentHandle)

        # Ping h1 -> h2, retrying failed pings up to max_attempts in total.
        # The previous if/elif chain retried once and ignored the outcome of
        # that retry, so a successful second ping still failed the case.
        max_attempts = 5
        Ping_Result = main.FALSE
        main.log.info("\n\nh1 is Pinging h2")
        for attempt in range(1, max_attempts + 1):
            ping = main.LincOE2.pingHostOptical(src="h1", target="h2")
            if ping == main.TRUE:
                main.log.info("Ping test between h1 and h2 passed!")
                Ping_Result = main.TRUE
                break
            if ping != main.FALSE:
                # Neither TRUE nor FALSE: report an error and stop retrying.
                main.log.info("Unknown error")
                Ping_Result = main.ERROR
                break
            Ping_Result = main.FALSE
            if attempt < max_attempts:
                main.log.report("Ping between h1 and h2 failed. Making attempt number " + str(attempt + 1) + " in 2 seconds")
                time.sleep(2)
            else:
                main.log.report("All ping attempts between h1 and h2 have failed")

        if Ping_Result == main.FALSE:
            main.log.report("Point intents for packet optical have not ben installed correctly. Cleaning up")
        if Ping_Result == main.TRUE:
            main.log.report("Point Intents for packet optical have been installed correctly")

        case23_result = Ping_Result
        utilities.assert_equals(
            expect=main.TRUE, actual=case23_result,
            onpass="Point intents addition for packet optical and Pingall Test successful",
            onfail="Point intents addition for packet optical and Pingall Test NOT successful")