add multi-threaded REST load generator
Change-Id: Ia73fe2ba3eaf50846041f73038460270c9802f4e
diff --git a/TestON/drivers/common/api/controller/onosrestdriver.py b/TestON/drivers/common/api/controller/onosrestdriver.py
index ed7786c..2a62276 100644
--- a/TestON/drivers/common/api/controller/onosrestdriver.py
+++ b/TestON/drivers/common/api/controller/onosrestdriver.py
@@ -21,6 +21,7 @@
import requests
import types
import sys
+from Queue import Queue
from drivers.common.api.controllerdriver import Controller
@@ -1715,6 +1716,12 @@
main.cleanup()
main.exit()
+    def sendFlowBatchQueue( self, q=None, ip="DEFAULT", port="DEFAULT", debug=False ):
+        """
+        Description:
+            Worker loop that posts flow batches taken from a queue. It
+            blocks on q.get(), hands each item to self.sendFlowBatch and
+            then marks the item as done so that a producer waiting on
+            q.join() can return. The loop never exits on its own;
+            presumably it is meant to be the target of a daemon thread
+            ( confirm with the caller ).
+        Optional:
+            * q: Queue.Queue holding the batches to post. When omitted, a
+                 new empty queue is created for this call.
+            * ip, port, debug: accepted for signature compatibility; this
+                 method does not forward them to sendFlowBatch.
+        """
+        # A Queue() default in the signature is built once at definition
+        # time and silently shared by every caller; create it per call.
+        if q is None:
+            q = Queue()
+        while True:
+            item = q.get()
+            try:
+                self.sendFlowBatch( batch=item )
+            finally:
+                # Always acknowledge the item, otherwise a failed POST
+                # leaves q.join() blocked forever.
+                q.task_done()
+
def removeFlowBatch( self, batch={},
ip="DEFAULT", port="DEFAULT" ):
"""
diff --git a/TestON/tests/COMPflow/COMPflow.params b/TestON/tests/COMPflow/COMPflow.params
index c31cd64..efb6c60 100755
--- a/TestON/tests/COMPflow/COMPflow.params
+++ b/TestON/tests/COMPflow/COMPflow.params
@@ -10,7 +10,7 @@
# <!-- <testcases>1,10,100,1000,100,2000,100,110</testcases> -->
- <testcases>1,2,10,100,1000,100,2000,100,110</testcases>
+ <testcases>1,2,11,1000,2100,100,3100,100,110</testcases>
<SCALE>
<max>1</max>
@@ -18,7 +18,6 @@
<DEBUG>on</DEBUG>
-
<ENV>
<cellName>temp</cellName>
<cellApps>drivers</cellApps>
@@ -39,7 +38,7 @@
</CASE10>
<CASE11>
- <numSw>63</numSw>
+ <numSw>15</numSw>
<nullTopo>linear</nullTopo>
<nullStart>true</nullStart>
</CASE11>
@@ -49,6 +48,15 @@
<batches>500</batches>
</CASE1000>
+ <CASE2100>
+ <numThreads>16</numThreads>
+ <chkFlowTO>200</chkFlowTO>
+ </CASE2100>
+
+ <CASE3100>
+ <chkFlowTO>200</chkFlowTO>
+ </CASE3100>
+
<SLEEP>
<startup>15</startup>
<startMN>15</startMN>
diff --git a/TestON/tests/COMPflow/COMPflow.py b/TestON/tests/COMPflow/COMPflow.py
index 39246d6..adc86c9 100644
--- a/TestON/tests/COMPflow/COMPflow.py
+++ b/TestON/tests/COMPflow/COMPflow.py
@@ -256,12 +256,11 @@
main.case( "Create a json object for the batched flows" )
- main.step( "Parse batch information" )
+ main.step( "Parse batch creation information" )
main.batchSize = int(main.params['CASE1000']['batchSize'])
main.log.info("Number of flows in a batch is:" + str(main.batchSize))
main.flowJsonBatchList = []
- main.addedBatchList = []
postTimes = []
startSw = 1
@@ -284,10 +283,21 @@
main.flowJsonBatchList.append(flowJsonBatch)
startSw += 1
+ main.log.info( "Number of items created in the batch list is: " + str(len(main.flowJsonBatchList)))
+ def CASE2000(self, main):
+ '''
+ Args:
+ main:
+ Returns:
+
+ '''
+ main.case("Using REST API /flows/{} to post flow batch")
main.step("Using REST API /flows/{} to post flow batch")
+
+ main.addedBatchList = []
tStartPost = time.time()
for item in main.flowJsonBatchList:
ts = time.time()
@@ -319,7 +329,131 @@
str(duration))
main.log.info("Rate of Confirmed Batch Flow ADD is (flows/sec): " + str( main.numFlows / duration))
- def CASE2000(self, main):
+    def CASE2100(self, main):
+        '''
+        Posting flow batches using threads
+
+        Starts main.params['CASE2100']['numThreads'] daemon worker threads
+        that take batches of main.flowJsonBatchList from a queue and POST
+        them with main.ONOSrest.sendFlowBatch. The second element of each
+        response is collected in main.addedBatchList. After all batches
+        are posted, main.ONOSrest.checkFlowsState() is polled until it
+        returns main.TRUE or main.params['CASE2100']['chkFlowTO'] seconds
+        have elapsed, and the POST / ADD rates are logged.
+        '''
+        main.case("Using REST API /flows/{} to post flow batch - multi-threads")
+        main.step("Using REST API /flows/{} to post flow batch - multi-threads")
+
+        from Queue import Queue
+        from threading import Thread
+        import time
+
+        main.threadID = 0
+        main.addedBatchList = []
+        # Per-request POST durations, filled in by the worker threads.
+        postTimes = []
+        q = Queue()
+        tAllAdded = 0
+
+        def postWorker(workerName):
+            while True:
+                item = q.get()
+                try:
+                    ts = time.time()
+                    status, response = main.ONOSrest.sendFlowBatch(batch = item)
+                    postTimes.append(time.time() - ts)
+                    main.log.info("Thread {} is working on posting. ".format(workerName))
+                    main.addedBatchList.append(response[1])
+                except Exception as e:
+                    # Keep the worker alive; a dead worker would strand the
+                    # remaining queue items.
+                    main.log.warn("Thread {} failed to post a flow batch: {}".format(workerName, e))
+                finally:
+                    # Must run even on failure or q.join() never returns.
+                    q.task_done()
+
+        for i in range( int( main.params['CASE2100']['numThreads'])):
+            threadID = "ThreadID-" + str(i)
+            t = Thread(target = postWorker, name = threadID, args=(threadID,) )
+            # Daemon threads: the workers loop forever and must not keep
+            # the process alive after the test finishes.
+            t.daemon = True
+            t.start()
+
+        tStartPost = time.time()
+        for item in main.flowJsonBatchList:
+            q.put(item)
+
+        q.join()
+        tLastPostEnd = time.time()
+
+        main.step("Check to ensure all flows are in added state.")
+        chkFlowTO = int(main.params['CASE2100']['chkFlowTO'])
+        resp = main.FALSE
+        # tAllAdded starts at 0, so the first timeout comparison is always
+        # true and the state is checked at least once.
+        while resp != main.TRUE and ( tAllAdded - tLastPostEnd < chkFlowTO ):
+            resp = main.ONOSrest.checkFlowsState()
+            time.sleep( float(main.params['SLEEP']['chkFlow']) )
+            tAllAdded = time.time()
+
+        # The loop ends on success or on timeout; warn only for the latter.
+        if resp != main.TRUE:
+            main.log.warn("ONOS Flows still in pending state after: {} seconds.".format(tAllAdded - tLastPostEnd))
+
+        main.numFlows = int(main.params['CASE1000']['batches']) *\
+                        int(main.params['CASE1000']['batchSize'])
+        main.log.info("Total number of flows: " + str (main.numFlows) )
+        main.log.info("Sum of each POST elapse time: " + str(sum(postTimes)) )
+        main.log.info("Total POST elapse time: " + str(tLastPostEnd-tStartPost))
+        main.log.info("Rate of ADD Controller response: " + str(main.numFlows / (tLastPostEnd - tStartPost)))
+
+        duration = tAllAdded - tLastPostEnd
+        main.log.info("Elapse time from end of last REST POST to Flows in ADDED state: " +\
+                      str(duration))
+        main.log.info("Rate of Confirmed Batch Flow ADD is (flows/sec): " + str( main.numFlows / duration))
+        main.log.info("Number of flow Batches in the addedBatchList is: " + str( len(main.addedBatchList)))
+
+
+    def CASE3100(self, main):
+        '''
+        DELETE flow batches using threads
+
+        Starts daemon worker threads that take the entries of
+        main.addedBatchList from a queue, decode each one with json.loads
+        and pass it to main.ONOSrest.removeFlowBatch. After all deletes
+        return, main.ONOSrest.checkFlowsState() is polled until it returns
+        main.TRUE or main.params['CASE3100']['chkFlowTO'] seconds have
+        elapsed, and the DELETE / REMOVED rates are logged.
+        '''
+        main.case("Using REST API /flows/{} to delete flow batch - multi-threads")
+        main.step("Using REST API /flows/{} to delete flow batch - multi-threads")
+
+        from Queue import Queue
+        from threading import Thread
+        import time
+        import json
+
+        main.threadID = 0
+        # Per-request DELETE durations, filled in by the worker threads.
+        deleteTimes = []
+        q = Queue()
+        tAllRemoved = 0
+
+        main.log.info("Number of flow batches at start of remove: " + str( len( main.addedBatchList)))
+        def removeWorker(workerName):
+            while True:
+                item = q.get()
+                try:
+                    ts = time.time()
+                    main.ONOSrest.removeFlowBatch(batch = json.loads(item) )
+                    deleteTimes.append(time.time() - ts)
+                    main.log.info("Thread {} is working on deleting. ".format(workerName))
+                except Exception as e:
+                    # Keep the worker alive; a dead worker would strand the
+                    # remaining queue items.
+                    main.log.warn("Thread {} failed to delete a flow batch: {}".format(workerName, e))
+                finally:
+                    # Must run even on failure or q.join() never returns.
+                    q.task_done()
+
+        # The thread count is read from the CASE2100 section of the params.
+        for i in range( int( main.params['CASE2100']['numThreads'])):
+            threadID = "ThreadID-" + str(i)
+            t = Thread(target = removeWorker, name = threadID, args=(threadID,) )
+            # Daemon threads: the workers loop forever and must not keep
+            # the process alive after the test finishes.
+            t.daemon = True
+            t.start()
+
+        tStartDelete = time.time()
+        for item in main.addedBatchList:
+            q.put(item)
+
+        q.join()
+        tLastDeleteEnd = time.time()
+        # main.addedBatchList is not modified by this case, so this count
+        # equals the one logged before the removal.
+        main.log.info("Number of flow batches at end of remove: " + str( len( main.addedBatchList)))
+
+        main.step("Check to ensure all flows are in removed state.")
+        # NOTE(review): checkFlowsState is the same check the add cases
+        # use; confirm it returns main.TRUE once the flows are gone.
+        chkFlowTO = int(main.params['CASE3100']['chkFlowTO'])
+        resp = main.FALSE
+        # tAllRemoved starts at 0, so the first timeout comparison is
+        # always true and the state is checked at least once.
+        while resp != main.TRUE and ( tAllRemoved - tLastDeleteEnd < chkFlowTO ):
+            resp = main.ONOSrest.checkFlowsState()
+            time.sleep( float(main.params['SLEEP']['chkFlow']) )
+            tAllRemoved = time.time()
+
+        # The loop ends on success or on timeout; warn only for the latter.
+        if resp != main.TRUE:
+            main.log.warn("ONOS Flows still in pending state after: {} seconds.".format(tAllRemoved - tLastDeleteEnd))
+
+        main.numFlows = int(main.params['CASE1000']['batches']) *\
+                        int(main.params['CASE1000']['batchSize'])
+        main.log.info("Total number of flows: " + str (main.numFlows) )
+        main.log.info("Sum of each DELETE elapse time: " + str(sum(deleteTimes)) )
+        main.log.info("Total DELETE elapse time: " + str(tLastDeleteEnd-tStartDelete))
+        main.log.info("Rate of DELETE Controller response: " + str(main.numFlows / (tLastDeleteEnd-tStartDelete)))
+
+        duration = tAllRemoved - tLastDeleteEnd
+        main.log.info("Elapse time from end of last REST DELETE to Flows in REMOVED state: " +\
+                      str(duration))
+        main.log.info("Rate of Confirmed Batch Flow REMOVED is (flows/sec): " + str( main.numFlows / duration))
+
+
+ def CASE3000(self, main):
import time
import numpy
import json