Merge pull request #474 from bocon13/master

Adding FlowSynchronizer performance test
diff --git a/perf-scripts/flow-sync-perf.py b/perf-scripts/flow-sync-perf.py
new file mode 100755
index 0000000..d552404
--- /dev/null
+++ b/perf-scripts/flow-sync-perf.py
@@ -0,0 +1,194 @@
+#!/usr/bin/python
+'''
+ Script that tests Flow Synchronizer performance
+ Author: Brian O'Connor <bocon@onlab.us>
+
+ Usage: 
+   1. Ensure that ONOS is running
+   2. sudo ./flow-sync-perf.py <list of tests>
+      e.g. sudo ./flow-sync-perf.py 1 10 100 1000
+      or to run the default tests:
+      sudo ./flow-sync-perf.py
+   3. Results are CSV files in a date stamped directory
+'''
+
+import csv
+import os
+import sys
+from time import sleep, strftime
+from subprocess import Popen, call, check_output, PIPE
+from mininet.net import Mininet
+from mininet.topo import SingleSwitchTopo
+from mininet.node import RemoteController
+from mininet.cli import CLI
+from mininet.log import setLogLevel
+try:
+  import pexpect
+except ImportError:
+  # pexpect is missing: install the Debian package, then retry the import
+  print '* Installing Pexpect'
+  call( 'apt-get install -y python-pexpect', stdout=PIPE, shell=True )
+  import pexpect
+
+ONOS_HOME = '..'
+
+# Verify that tcpkill is installed ('which' prints nothing when it is absent)
+if not Popen( 'which tcpkill', stdout=PIPE, shell=True).communicate()[0].strip():
+  print '* Installing tcpkill'
+  call( 'apt-get install -y dsniff', stdout=PIPE, shell=True )
+
+# ----------------- Test scenarios -------------------------
+def doNothing(n):  # no-op scenario: only announces the flow count n
+  print "Doing nothing with %d flows..." % n
+
+def addFakeFlows(n):  # install n flows on s1, one ovs-ofctl call per flow
+  print "Adding %d random flows..." % n
+  for cookie in range( 1, n + 1 ):
+    # spread the counter over the low three octets of a 10.x.y.z source address
+    octets = ( (cookie >> 16) & 0xff, (cookie >> 8) & 0xff, cookie & 0xff )
+    ip = '10.%d.%d.%d' % octets
+    cmd = 'ovs-ofctl add-flow s1 "ip, nw_src=%s/32, idle_timeout=0, hard_timeout=0, cookie=%d, actions=output:2"' % ( ip, cookie )
+    call( cmd, shell=True )
+
+def delFlowsFromSwitch(n):  # n is only used in the message; every flow on s1 is deleted
+  print "Removing all %d flows from switch..." % n
+  call( 'ovs-ofctl del-flows s1', shell=True )
+
+
+# ----------------- Utility Functions -------------------------
+def disconnect():
+  # Cut the controller connection for one second; return the sync timings ONOS then logs
+  tail = Popen( "exec tail -0f %s/onos-logs/onos.onosdev1.log" % ONOS_HOME, stdout=PIPE, shell=True )
+  tcp  = Popen( 'exec tcpkill -i lo -9 port 6633 > /tmp/tcp 2>&1', shell=True )  # single instance, killed below
+  sleep(1)
+  tcp.kill()
+  results = waitForResult(tail)
+  tail.kill()
+  return results
+
+def startNet(net):  # start the Mininet network and wait for ONOS to log a sync
+  tail = pexpect.spawn( 'tail -0f %s/onos-logs/onos.onosdev1.log' % ONOS_HOME )  # follow new log lines only
+  net.start()
+  index = tail.expect(['Sync time \(ms\)', pexpect.EOF, pexpect.TIMEOUT])
+  if index >= 1:  # EOF or TIMEOUT matched instead of the sync message
+    print '* ONOS not started'
+    net.stop()
+    exit(1)
+  tail.terminate()
+
+def dumpFlows():  # returns the raw text printed by 'ovs-ofctl dump-flows s1'
+  return check_output( 'ovs-ofctl dump-flows s1', shell=True )
+
+def addFlowsToONOS(n):
+  # Add flows 1..n through ONOS and block until the switch and ONOS both report them.
+  call( './generate_flows.py 1 %d > /tmp/flows.txt' % n, shell=True )
+  call( '%s/web/add_flow.py -m onos -f /tmp/flows.txt' % ONOS_HOME, shell=True )
+  # Wait for the switch: n flows show up as at least n+2 newline-separated pieces
+  # (presumably one header line plus the empty piece after the final newline).
+  while len( dumpFlows().split('\n') ) < (n+2):
+    sleep(1)
+  # Wait for ONOS: re-query until a single listing contains n FlowEntry records.
+  while True:
+    output = pexpect.spawn( '%s/web/get_flow.py all' % ONOS_HOME )
+    for _ in range(n):
+      if output.expect(['FlowEntry', pexpect.EOF], timeout=2000) == 1:
+        break  # listing ended early; retry after a pause
+    else:
+      return   # counted n records without hitting EOF
+    output.close()  # release the finished child before spawning the next query
+    sleep(5)
+
+def removeFlowsFromONOS():  # delete every flow via ONOS, then wait until switch and ONOS are empty
+  call( '%s/web/delete_flow.py all' % ONOS_HOME, shell=True )
+  while True:
+    output = check_output( 'ovs-ofctl dump-flows s1', shell=True )
+    lines = len(output.split('\n'))
+    if lines == 2:  # presumably header line + empty piece after the final newline, i.e. no flows
+      break
+    sleep(1)
+  while True:
+    output = pexpect.spawn( '%s/web/get_flow.py all' % ONOS_HOME )
+    if output.expect(['FlowEntry', pexpect.EOF], timeout=2000) == 1:  # EOF first: nothing listed
+      break
+    sleep(5)
+
+
+# ----------------- Running the test and output  -------------------------
+def test(i, fn):  # run scenario fn(i) while the switch is disconnected; return the logged sync timings
+  # Start tailing the onos log
+  tail = pexpect.spawn( "tail -0f %s/onos-logs/onos.onosdev1.log" % ONOS_HOME )
+  # disconnect the switch from the controller using tcpkill
+  tcp  = Popen( 'exec tcpkill -i lo -9 port 6633 > /dev/null 2>&1', shell=True )
+  # wait until the switch has been disconnected
+  tail.expect( 'Switch removed' )
+  # call the test function
+  fn(i) 
+  # dump the flows to ensure they have all made it to ovs
+  dumpFlows() 
+  # end tcpkill process to reconnect the switch to the controller
+  tcp.terminate()
+  tail.expect('Sync time \(ms\):', timeout=6000)  # wait for the synchronizer's timing line
+  tail.expect('([\d.]+,?)+\s')  # capture the comma-separated numbers that follow it
+  print tail.match.group(0)
+  tail.terminate()
+  sleep(3)
+  return tail.match.group(0).strip().split(',')  # list of strings, one per timing column
+
+def initResults(files):  # (re)create every results file with just the CSV header row
+  # column titles: flow-entry count first, then one column per timing
+  columns = ['# of FEs', 'Flow IDs from Graph', 'FEs from Switch', 'Compare',
+             'Read FE from graph', 'Extract FE', 'Push', 'Total' ]
+  for path in files.values():
+    with open(path, 'w') as out:
+      csv.writer(out).writerow(columns)
+
+def outputResults(filename, n, results):  # print one result row and append it to filename
+  results[:0] = [n]  # prepend the flow count in place; the caller's list is modified
+  print results
+  with open(filename, 'a') as out:
+    # appended as a single CSV row
+    csv.writer(out).writerow(results)
+
+def runPerf( resultDir, tests):  # run the sync/delete/add scenarios for every flow count in tests
+  fileMap = { 'add':    os.path.join(resultDir, 'add.csv'),
+              'delete': os.path.join(resultDir, 'delete.csv'),
+              'sync':   os.path.join(resultDir, 'sync.csv') }
+  initResults(fileMap)
+  # start Mininet
+  topo = SingleSwitchTopo()
+  net = Mininet(topo=topo, controller=RemoteController)
+  startNet(net)
+  removeFlowsFromONOS() # clear ONOS before starting
+  sleep(30) # let ONOS "warm-up"
+  for i in tests:
+    addFlowsToONOS(i)
+    outputResults(fileMap['sync'],   i, test(i, doNothing))  # switch left untouched while disconnected
+    outputResults(fileMap['delete'], i, test(i, delFlowsFromSwitch))  # switch emptied while disconnected
+    removeFlowsFromONOS()
+    outputResults(fileMap['add'],    i, test(i, addFakeFlows)) # test needs empty DB
+  net.stop()
+
+def waitForResult(tail):
+  # Scan tail.stdout for the 'Sync time (ms):' line; return its whitespace-separated fields
+  marker = 'Sync time (ms):'
+  for line in iter(tail.stdout.readline, ''):  # '' is EOF: stop instead of spinning forever
+    if 'n.o.o.o.f.FlowSynchronizer' in line:
+      print line,
+    index = line.find(marker)
+    if index >= 0:  # find() returns -1 when absent, so column 0 is a valid hit
+      fields = line[index + len(marker):].replace('-->', '')
+      return fields.split() # graph, switch, compare, total
+  return []  # stream ended before a timing line appeared
+
+if __name__ == '__main__':
+  setLogLevel( 'output' )
+  resultDir = strftime( '%Y%m%d-%H%M%S' )
+  os.mkdir( resultDir )
+  # argv entries are strings; the scenarios format and count with integers
+  tests = [ int(arg) for arg in sys.argv[1:] ]
+  if not tests:
+    tests = [1, 10, 100, 1000, 10000]
+  runPerf( resultDir, tests )
+  sys.exit()  # kept inside the guard so importing this module does not exit the interpreter
+
+# ---------------------------
diff --git a/perf-scripts/generate_flows.py b/perf-scripts/generate_flows.py
new file mode 100755
index 0000000..11d9c19
--- /dev/null
+++ b/perf-scripts/generate_flows.py
@@ -0,0 +1,90 @@
+#! /usr/bin/env python
+# -*- Mode: python; py-indent-offset: 4; tab-width: 8; indent-tabs-mode: t; -*-
+
+#
+# A script for generating a number of flows.
+#
+# The output of the script should be saved to a file, and the flows from
+# that file should be added by the following command:
+#
+#   web/add_flow.py -f filename
+# 
+# NOTE: Currently, some of the parameters of the flows are hard-coded,
+# and all flows are between the same source and destination DPID and ports
+# (differentiated by different matchSrcMac and matchDstMac).
+#
+
+import copy
+import pprint
+import os
+import sys
+import subprocess
+import json
+import argparse
+import io
+import time
+
+## Global Var ##
+
+DEBUG=0
+pp = pprint.PrettyPrinter(indent=4)
+
+## Worker Functions ##
+def log_error(txt):  # prints txt with a plain print statement (no separate error stream)
+  print '%s' % (txt)
+
+def debug(txt):  # prints txt only when the module-level DEBUG flag is truthy
+  if DEBUG:
+    print '%s' % (txt)
+
+
+if __name__ == "__main__":
+  usage_msg = "Generate a number of flows by using a pre-defined template.\n"
+  usage_msg = usage_msg + "\n"
+  usage_msg = usage_msg + "NOTE: This script is work-in-progress. Currently all flows are within same\n"
+  usage_msg = usage_msg + "pair of switch ports and contain auto-generated MAC-based matching conditions.\n"
+  usage_msg = usage_msg + "\n"
+  usage_msg = usage_msg + "Usage: %s <begin-flow-id> <end-flow-id>\n" % (sys.argv[0])
+  usage_msg = usage_msg + "\n"
+  usage_msg = usage_msg + "    The output should be saved to a file, and the flows should be installed\n"
+  usage_msg = usage_msg + "    by using the command './add_flow.py -f filename'\n"
+
+
+  # Parse <begin-flow-id> <end-flow-id> and print one flow description per ID.
+
+  # Usage info
+  if len(sys.argv) > 1 and (sys.argv[1] == "-h" or sys.argv[1] == "--help"):
+    print(usage_msg)
+    exit(0)
+
+  # Check arguments
+  if len(sys.argv) < 3:
+    log_error(usage_msg)
+    exit(1)
+
+  # Extract the arguments
+  begin_flow_id = int(sys.argv[1], 0)  # base 0: prefixed literals such as 0x10 are accepted
+  end_flow_id = int(sys.argv[2], 0)
+  if begin_flow_id > end_flow_id:
+    log_error(usage_msg)
+    exit(1)
+
+  #
+  # Do the work
+  #
+  # NOTE: Currently, up to 65536 flows are supported.
+  # More flows can be supported by iterating over some of
+  # the other bytes of the autogenerated source/destination MAC addresses.
+  #
+  flow_id = begin_flow_id
+  idx = 0
+  while flow_id <= end_flow_id:
+    mac3 = idx // 256  # high byte of the flow index; stays within one octet for idx < 65536
+    mac4 = idx % 256   # low byte; base 256 (not 255) so 0xff is used and both fit two hex digits
+    str_mac3 = "%0.2x" % mac3
+    str_mac4 = "%0.2x" % mac4
+    src_mac = "00:00:" + str_mac3 + ":" + str_mac4 + ":00:00";
+    dst_mac = "00:01:" + str_mac3 + ":" + str_mac4 + ":00:00";
+    print "%s FOOBAR 00:00:00:00:00:00:00:01 1 00:00:00:00:00:00:00:01 2 matchSrcMac %s matchDstMac %s" % (flow_id, src_mac, dst_mac)
+    flow_id = flow_id + 1
+    idx = idx + 1
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
index 461d231..f7d7b0e 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
@@ -44,7 +44,7 @@
 				       IOFSwitchListener {
     @SuppressWarnings("unused")
     // flag to enable FlowSynchronizer
-    private static final boolean enableFlowSync = false;
+    private static final boolean enableFlowSync = true;
     protected static Logger log = LoggerFactory.getLogger(FlowProgrammer.class);
     protected volatile IFloodlightProviderService floodlightProvider;
     protected volatile IControllerRegistryService registryService;
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
index 415b1b1..64f6cac 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
@@ -93,17 +93,45 @@
 	    this.swObj = dbHandler.searchSwitch(dpid.toString());
 	}
 
+	double graphIDTime, switchTime, compareTime, graphEntryTime, extractTime, pushTime, totalTime;
 	@Override
 	public SyncResult call() {
 	    // TODO: stop adding other flow entries while synchronizing
 	    //pusher.suspend(sw);
+	    long start = System.nanoTime(); // timestamps bracket each phase below
 	    Set<FlowEntryWrapper> graphEntries = getFlowEntriesFromGraph();
+	    long step1 = System.nanoTime();
 	    Set<FlowEntryWrapper> switchEntries = getFlowEntriesFromSwitch();
+	    long step2 = System.nanoTime();
 	    SyncResult result = compare(graphEntries, switchEntries);
+	    long step3 = System.nanoTime();
+	    graphIDTime = (step1 - start); // phase durations, still in nanoseconds here
+	    switchTime = (step2 - step1);
+	    compareTime = (step3 - step2);
+	    totalTime = (step3 - start);
+	    outputTime(); // logs the collected timings
 	    //pusher.resume(sw);
 	    
 	    return result;
 	}
+	
+	private void outputTime() { // logs one "Sync time (ms):" line of comma-separated timings
+	    double div = Math.pow(10, 6); //convert nanoseconds to ms
+	    graphIDTime /= div;
+	    switchTime /= div;
+	    compareTime = (compareTime - graphEntryTime - extractTime - pushTime) / div; // net of per-entry costs
+	    graphEntryTime /= div;
+	    extractTime /= div;
+	    pushTime /= div;
+	    totalTime /= div; // previously left in nanoseconds while every other column was ms
+	    log.debug("Sync time (ms):" +
+	    		  graphIDTime + "," +
+	    		  switchTime + "," +
+	    		  compareTime + "," +
+	    		  graphEntryTime + "," +
+	    		  extractTime + "," +
+	    		  pushTime + "," + totalTime);
+	}
 
 	/**
 	 * Compare flows entries in GraphDB and switch to pick up necessary
@@ -128,6 +156,9 @@
 	    for(FlowEntryWrapper entry : graphEntries) {
 		// add flow entry to switch
 		entry.addToSwitch(sw);
+		graphEntryTime += entry.dbTime;
+		extractTime += entry.extractTime;
+		pushTime += entry.pushTime;
 		added++;
 	    }	  
 	    log.debug("Flow entries added "+ added + ", " +
@@ -223,6 +254,7 @@
 	 * Install this FlowEntry to a switch via FlowPusher.
 	 * @param sw Switch to which flow will be installed.
 	 */
+	double dbTime, extractTime, pushTime;
 	public void addToSwitch(IOFSwitch sw) {
 	    if (statisticsReply != null) {
 		log.error("Error adding existing flow entry {} to sw {}", 
@@ -230,6 +262,7 @@
 		return;
 	    }
 
+	    double startDB = System.nanoTime();
 	    // Get the Flow Entry state from the Network Graph
 	    IFlowEntry iFlowEntry = null;
 	    try {
@@ -244,7 +277,9 @@
 			  flowEntryId, sw.getId());
 		return;
 	    }
+	    dbTime = System.nanoTime() - startDB;
 
+	    double startExtract = System.nanoTime();
 	    FlowEntry flowEntry =
 		FlowDatabaseOperation.extractFlowEntry(iFlowEntry);
 	    if (flowEntry == null) {
@@ -252,8 +287,11 @@
 			  flowEntryId, sw.getId());
 		return;
 	    }
-
+	    extractTime = System.nanoTime() - startExtract;
+	    
+	    double startPush = System.nanoTime();
 	    pusher.pushFlowEntry(sw, flowEntry);
+	    pushTime = System.nanoTime() - startPush;
 	}
 	
 	/**