add multi-threaded REST load generator
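
The new test cases (CASE2100/CASE3100) drive the REST API with a
producer/consumer pattern: pre-built flow batches are pushed onto a
Queue and a pool of daemon worker threads drains it, so q.join()
marks the point at which every batch has been sent. A minimal sketch
of the pattern (post_batch, batches and num_threads are illustrative
stand-ins, not names from this change):

    from Queue import Queue
    from threading import Thread

    def post_batch( batch ):
        pass  # stand-in for the REST POST of one flow batch

    q = Queue()
    batches = [ {} for _ in range( 10 ) ]  # placeholder flow batches
    num_threads = 4

    def worker():
        while True:
            item = q.get()      # block until a batch is available
            post_batch( item )
            q.task_done()

    for _ in range( num_threads ):
        t = Thread( target=worker )
        t.daemon = True         # workers exit with the main thread
        t.start()

    for b in batches:           # producer: enqueue all batches
        q.put( b )
    q.join()                    # returns once every batch is processed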

Change-Id: Ia73fe2ba3eaf50846041f73038460270c9802f4e
diff --git a/TestON/drivers/common/api/controller/onosrestdriver.py b/TestON/drivers/common/api/controller/onosrestdriver.py
index ed7786c..2a62276 100644
--- a/TestON/drivers/common/api/controller/onosrestdriver.py
+++ b/TestON/drivers/common/api/controller/onosrestdriver.py
@@ -21,6 +21,7 @@
 import requests
 import types
 import sys
+from Queue import Queue
 
 from drivers.common.api.controllerdriver import Controller
 
@@ -1715,6 +1716,12 @@
             main.cleanup()
             main.exit()
 
+    def sendFlowBatchQueue( self, q=None, ip="DEFAULT", port="DEFAULT", debug=False ):
+        """Consume flow batches from the Queue and POST each via sendFlowBatch; run as a daemon thread target."""
+        if q is None:
+            q = Queue()
+        while True:
+            item = q.get()
+            self.sendFlowBatch( batch = item )
+            q.task_done()
+
     def removeFlowBatch( self, batch={},
                        ip="DEFAULT", port="DEFAULT" ):
         """
diff --git a/TestON/tests/COMPflow/COMPflow.params b/TestON/tests/COMPflow/COMPflow.params
index c31cd64..efb6c60 100755
--- a/TestON/tests/COMPflow/COMPflow.params
+++ b/TestON/tests/COMPflow/COMPflow.params
@@ -10,7 +10,7 @@
 
 
     # <!-- <testcases>1,10,100,1000,100,2000,100,110</testcases> -->
-    <testcases>1,2,10,100,1000,100,2000,100,110</testcases>
+    <testcases>1,2,11,1000,2100,100,3100,100,110</testcases>
 
     <SCALE>
         <max>1</max>
@@ -18,7 +18,6 @@
 
     <DEBUG>on</DEBUG>
 
-
     <ENV>
         <cellName>temp</cellName>
         <cellApps>drivers</cellApps>
@@ -39,7 +38,7 @@
     </CASE10>
 
     <CASE11>
-        <numSw>63</numSw>
+        <numSw>15</numSw>
         <nullTopo>linear</nullTopo>
         <nullStart>true</nullStart>
     </CASE11>
@@ -49,6 +48,15 @@
         <batches>500</batches>
     </CASE1000>
 
+    <CASE2100>
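+        <!-- numThreads: size of the POST worker pool; chkFlowTO: flow-state check timeout in seconds -->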
+        <numThreads>16</numThreads>
+        <chkFlowTO>200</chkFlowTO>
+    </CASE2100>
+
+    <CASE3100>
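+        <!-- Reuses numThreads from CASE2100; chkFlowTO: flow-state check timeout in seconds -->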
+        <chkFlowTO>200</chkFlowTO>
+    </CASE3100>
+
     <SLEEP>
         <startup>15</startup>
         <startMN>15</startMN>
diff --git a/TestON/tests/COMPflow/COMPflow.py b/TestON/tests/COMPflow/COMPflow.py
index 39246d6..adc86c9 100644
--- a/TestON/tests/COMPflow/COMPflow.py
+++ b/TestON/tests/COMPflow/COMPflow.py
@@ -256,12 +256,11 @@
 
         main.case( "Create a json object for the batched flows" )
 
-        main.step( "Parse batch information" )
+        main.step( "Parse batch creation information" )
         main.batchSize = int(main.params['CASE1000']['batchSize'])
         main.log.info("Number of flows in a batch is:" + str(main.batchSize))
 
         main.flowJsonBatchList = []
-        main.addedBatchList = []
         postTimes = []
         startSw = 1
 
@@ -284,10 +283,21 @@
             main.flowJsonBatchList.append(flowJsonBatch)
 
             startSw += 1
+        main.log.info( "Number of items created in the batch list is: " + str(len(main.flowJsonBatchList)))
 
 
+    def CASE2000(self, main):
+        '''
+            Posting flow batches sequentially using REST API /flows/{}
 
+        '''
+        main.case("Using REST API /flows/{} to post flow batch")
         main.step("Using REST API /flows/{} to post flow batch")
+
+        import time
+        import numpy
+
+        main.addedBatchList = []
+        postTimes = []
         tStartPost = time.time()
         for item in main.flowJsonBatchList:
             ts = time.time()
@@ -319,7 +329,131 @@
                       str(duration))
         main.log.info("Rate of Confirmed Batch Flow ADD is (flows/sec): " + str( main.numFlows / duration))
 
-    def CASE2000(self, main):
+    def CASE2100(self, main):
+        '''
+            Posting flow batches using threads
+        '''
+        main.case("Using REST API /flows/{} to post flow batch - multi-threads")
+        main.step("Using REST API /flows/{} to post flow batch - multi-threads")
+
+        from Queue import Queue
+        from threading import Thread
+        import time
+        import numpy
+
+        main.threadID = 0
+        main.addedBatchList = []
+        postTimes = []
+        q = Queue()
+        tAllAdded = 0
+
+        def postWorker(id):
+            # Pull a batch off the queue, POST it, and record the elapsed time.
+            while True:
+                item = q.get()
+                ts = time.time()
+                status, response = main.ONOSrest.sendFlowBatch( batch = item )
+                postTimes.append( time.time() - ts )
+                main.log.info("Thread {} is working on posting.".format(id))
+                main.addedBatchList.append(response[1])
+                q.task_done()
+
+        for i in range( int( main.params['CASE2100']['numThreads'])):
+            threadID = "ThreadID-" + str(i)
+            t = Thread(target = postWorker, name = threadID, args=(threadID,) )
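+            # Daemon threads exit with the test run; completion is gated by q.join() below.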
+            t.daemon = True
+            t.start()
+
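+        # Producer: enqueue every pre-built flow batch; the worker pool drains the queue concurrently.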
+        tStartPost = time.time()
+        for item in main.flowJsonBatchList:
+            q.put(item)
+
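+        # q.join() returns once every enqueued batch has been posted and marked task_done().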
+        q.join()
+        tLastPostEnd = time.time()
+
+        main.step("Check to ensure all flows are in added state.")
+        #pprint(main.addedBatchList)
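+        # Poll flow state until all flows reach ADDED or the CASE2100 chkFlowTO timeout expires.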
+        resp = main.FALSE
+        while resp != main.TRUE and ( tAllAdded - tLastPostEnd < int (main.params['CASE2100']['chkFlowTO']) ):
+            resp = main.ONOSrest.checkFlowsState()
+            time.sleep( float(main.params['SLEEP']['chkFlow']) )
+            tAllAdded = time.time()
+
+        if tAllAdded - tLastPostEnd >= int (main.params['CASE2100']['chkFlowTO']):
+            main.log.warn("ONOS Flows still in pending state after: {} seconds.".format(tAllAdded - tLastPostEnd))
+
+        main.numFlows = int(main.params['CASE1000']['batches']) *\
+                                                    int(main.params['CASE1000']['batchSize'])
+        main.log.info("Total number of flows: " + str (main.numFlows) )
+        main.log.info("Sum of each POST elapse time: " + str(numpy.sum(postTimes)) )
+        main.log.info("Total POST elapse time: " + str(tLastPostEnd-tStartPost))
+        main.log.info("Rate of ADD Controller response: " + str(main.numFlows / (tLastPostEnd - tStartPost)))
+
+        duration = tAllAdded - tLastPostEnd
+        main.log.info("Elapse time from end of last REST POST to Flows in ADDED state: " +\
+                      str(duration))
+        main.log.info("Rate of Confirmed Batch Flow ADD is (flows/sec): " + str( main.numFlows / duration))
+        main.log.info("Number of flow Batches in the addedBatchList is: " + str( len(main.addedBatchList)))
+
+
+    def CASE3100(self, main):
+        '''
+            DELETE flow batches using threads
+        '''
+        main.case("Using REST API /flows/{} to delete flow batch - multi-threads")
+        main.step("Using REST API /flows/{} to delete flow batch - multi-threads")
+
+        from Queue import Queue
+        from threading import Thread
+        import time
+        import json
+        import numpy
+
+        main.threadID = 0
+        q = Queue()
+        tAllRemoved = 0
+        deleteTimes = []
+
+        main.log.info("Number of flow batches at start of remove: " + str( len( main.addedBatchList)))
+        def removeWorker(id):
+            # Pull a posted batch off the queue, DELETE it, and record the elapsed time.
+            while True:
+                item = q.get()
+                ts = time.time()
+                response = main.ONOSrest.removeFlowBatch( batch = json.loads(item) )
+                deleteTimes.append( time.time() - ts )
+                main.log.info("Thread {} is working on deleting.".format(id))
+                q.task_done()
+
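+        # Reuse the thread count configured for CASE2100; CASE3100 defines only its check-flow timeout.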
+        for i in range( int( main.params['CASE2100']['numThreads'])):
+            threadID = "ThreadID-" + str(i)
+            t = Thread(target = removeWorker, name = threadID, args=(threadID,) )
+            t.daemon = True
+            t.start()
+
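+        # Producer: enqueue the batches recorded in main.addedBatchList; workers issue the DELETEs concurrently.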
+        tStartDelete = time.time()
+        for item in main.addedBatchList:
+            q.put(item)
+
+        q.join()
+        tLastDeleteEnd = time.time()
+        main.log.info("Number of flow batches at end of remove: " + str( len( main.addedBatchList)))
+
+        main.step("Check to ensure all flows are in added state.")
+        #pprint(main.addedBatchList)
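+        # Poll flow state until the removals are confirmed or the CASE3100 chkFlowTO timeout expires.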
+        resp = main.FALSE
+        while resp != main.TRUE and ( tAllRemoved - tLastDeleteEnd < int (main.params['CASE3100']['chkFlowTO']) ):
+            resp = main.ONOSrest.checkFlowsState()
+            time.sleep( float(main.params['SLEEP']['chkFlow']) )
+            tAllRemoved = time.time()
+
+        if tAllRemoved - tLastDeleteEnd >= int (main.params['CASE3100']['chkFlowTO']):
+            main.log.warn("ONOS Flows still in pending state after: {} seconds.".format(tAllRemoved - tLastDeleteEnd))
+
+        main.numFlows = int(main.params['CASE1000']['batches']) *\
+                                                    int(main.params['CASE1000']['batchSize'])
+        main.log.info("Total number of flows: " + str (main.numFlows) )
+        main.log.info("Sum of each DELETE elapse time: " + str(numpy.sum(postTimes)) )
+        main.log.info("Total DELETE elapse time: " + str(tLastDeleteEnd-tStartDelete))
+        main.log.info("Rate of DELETE Controller response: " + str(main.numFlows / (tLastDeleteEnd-tStartDelete)))
+
+        duration = tAllRemoved - tLastDeleteEnd
+        main.log.info("Elapse time from end of last REST DELETE to Flows in REMOVED state: " +\
+                      str(duration))
+        main.log.info("Rate of Confirmed Batch Flow REMOVED is (flows/sec): " + str( main.numFlows / duration))
+
+
+    def CASE3000(self, main):
         import time
         import numpy
         import json