Merge branch 'ONOS-ONRC2014-Measurements' of github.com:OPENNETWORKINGLAB/ONOS into RAMCloud-merge
diff --git a/perf-scripts/flow-manager-perf.py b/perf-scripts/flow-manager-perf.py
index 3168f63..4ee6beb 100644
--- a/perf-scripts/flow-manager-perf.py
+++ b/perf-scripts/flow-manager-perf.py
@@ -8,6 +8,7 @@
 import csv
 import os
 import sys
+import re
 from time import sleep, strftime
 from subprocess import Popen, call, check_output, PIPE
 from datetime import datetime
@@ -22,37 +23,88 @@
 
 ONOS_HOME = '..'
 ONOS_LOG = '%s/onos-logs/onos.%s.log' % ( ONOS_HOME, check_output( 'hostname').strip() )
-ONOS_LOG = '/tmp/onos-1.logs/onos.onos-vm.log'
+ONOS_URL = 'http://127.0.0.1:8080/wm/onos/flows/get/%d/json'
+ONOS_LOG = '/tmp/onos-0.logs/onos.onos-vm.log'
 print "ONOS Log File:", ONOS_LOG
 
 PORT = 's1-eth2'
-N = 100
+N = 5
 
 # ----------------- Running the test and output  -------------------------
 
-# 17:43:37.206 [main] ERROR n.o.o.o.f.PerformanceMonitor - Performance Results: {a=0.001ms, b=0.0ms, c=0.0ms} with measurement overhead: 0.022 ms
+class Result(object):
+  def __init__(self, tsharkTime, flowmods, onosTime, overhead, details):
+    self.tsharkTime = tsharkTime
+    self.flowmods = flowmods
+    self.onosTime = onosTime
+    self.overhead = overhead
+    # sorted by start time
+    self.details = sorted(details, key=lambda x: float(x[2]) )
+
+  def __repr__(self):
+    return '%f %f %f %d %s' % (self.tsharkTime, self.onosTime, self.overhead, self.flowmods, self.details)
+
+
+def clearResults():
+  cmd = 'curl %s' % ONOS_URL % -200
+  call( cmd, shell=True )
+  pass
+
+def reportResults():
+  cmd = 'curl %s' % ONOS_URL % -100
+  call( cmd, shell=True )
 
 def test():
   # Start tailing the onos log
   tail = pexpect.spawn( "tail -0f %s" % ONOS_LOG )
+  tshark = pexpect.spawn( 'tshark -i lo -R "of.type == 12 || of.type == 14"' )
+  tshark.expect('Capturing on lo')
+  sleep(1) # wait for tshark to start
 
+  clearResults() # REST call to ONOS
   # Take link down
   call( 'ifconfig %s down' % PORT, shell=True )
 
-  # Wait for performance results in the log
-  tail.expect('Performance Results: \{([^\}]*)\} with measurement overhead: ([\d.]+) ms', timeout=6000)
-  s = tail.match.group(1)
-  overhead = float( tail.match.group(2) )
-  results = dict( re.findall(r'(\w[\w\s]*)=([\d.]+)', s) )
+  # collect openflow packets using tshark
+  count = 0 
+  timeout = 6000
+  start = -1
+  end = -1
+  while True:
+    i = tshark.expect( ['(\d+\.\d+)', pexpect.TIMEOUT], timeout=timeout )
+    if i == 1:
+      break
+    time = float(tshark.match.group(1))
+    if start == -1:
+      start = time
+    if time > end:
+      end = time
+    i = tshark.expect( ['Port Status', 'Flow Mod'] )
+    if i == 1:
+      count += 1
+      timeout = 3 #sec
+  elapsed = (end - start) * 1000
 
+  # read the performance results from ONOS
+  reportResults() # REST call
+
+  # Wait for performance results in the log
+  tail.expect('Performance Results: \(avg/start/stop/count\)', timeout=10)
+  i = tail.expect('TotalTime:([\d\.]+)/Overhead:([\d\.]+)')
+  totalTime = float(tail.match.group(1))
+  overhead = float(tail.match.group(2))
+  tags = re.findall( r'([\w\.]+)=([\d\.]+)/([\d\.]+)/([\d\.]+)/(\d+)', tail.before )
+  result = Result(elapsed, count, totalTime, overhead, tags)
   # Output results
-  print "* Results:", results, "Overhead:", overhead
- 
+  print result
+
+
   # Bring port back up
   call( 'ifconfig %s up' % PORT, shell=True )
 
   tail.terminate()
-  return results
+  tshark.terminate()
+  return []
 
 def outputResults(filename, results):
   with open(filename, 'a') as csvfile:
@@ -65,10 +117,12 @@
   start = datetime.now()
   for i in range(n):
     results = test()
-    outputResults(filename, results)
+    #outputResults(filename, results)
+    sys.stdout.write('$')
+    sys.stdout.flush()
+    sleep(5) # sleep for 5 seconds between tests
     sys.stdout.write('.')
     sys.stdout.flush()
-    sleep(5000) # sleep for 5 seconds between tests
   totalTime = datetime.now() - start
   print '\nExperiments complete in %s (h:m:s.s)' % totalTime
 
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index b76aa58..45995bf 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -18,6 +18,7 @@
 import net.floodlightcontroller.restserver.IRestApiService;
 import net.onrc.onos.datagrid.web.DatagridWebRoutable;
 import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
+import net.onrc.onos.ofcontroller.flowmanager.PerformanceMonitor.Measurement;
 import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
 import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
 import net.onrc.onos.ofcontroller.topology.TopologyElement;
@@ -450,8 +451,9 @@
 	 */
 	@Override
 	public void entryRemoved(EntryEvent<String, byte[]> event) {
-	    String tag = "TopologyEntryRemoved.NotificationReceived." + event.getKey();
-	    PerformanceMonitor.start(tag);
+//	    String tag = "TopologyEntryRemoved.NotificationReceived." + event.getKey();
+	    String tag = "TopologyEntryRemoved.NotificationReceived";
+	    PerformanceMonitor.Measurement m = PerformanceMonitor.start(tag);
 	    byte[] valueBytes = event.getValue();
 
 	    //
@@ -463,8 +465,9 @@
 		kryo.readObject(input, TopologyElement.class);
 	    kryoFactory.deleteKryo(kryo);
 	    flowEventHandlerService.notificationRecvTopologyElementRemoved(topologyElement);
-	    PerformanceMonitor.stop(tag);
-	    PerformanceMonitor.report(tag);
+//	    PerformanceMonitor.stop(tag);
+	    m.stop();
+//	    PerformanceMonitor.report(tag);
 	}
 
 	/**
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index 1471c49..ba1e448 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -399,13 +399,15 @@
 	    PerformanceMonitor.stop("EventHandler.ProcessAllEvents");
 
 
-	    PerformanceMonitor.report("EventHandler.SwitchDpidEvents");
-	    PerformanceMonitor.report("EventHandler.FlowIdEvents");
-	    PerformanceMonitor.report("EventHandler.ReadTopology");
-	    PerformanceMonitor.report("EventHandler.RecomputeFlows");
-	    PerformanceMonitor.report("EventHandler.WriteFlowsToDb");
-	    PerformanceMonitor.report("EventHandler.NotificationSend.FlowEntryRemoved");
-	    PerformanceMonitor.report("EventHandler.ProcessAllEvents");
+//	    PerformanceMonitor.report("EventHandler.SwitchDpidEvents");
+//	    PerformanceMonitor.report("EventHandler.FlowIdEvents");
+//	    PerformanceMonitor.report("EventHandler.ReadTopology");
+//	    PerformanceMonitor.report("EventHandler.RecomputeFlows");
+//	    PerformanceMonitor.report("EventHandler.WriteFlowsToDb");
+//	    PerformanceMonitor.report("EventHandler.NotificationSend.FlowEntryRemoved");
+//	    PerformanceMonitor.report("EventHandler.ProcessAllEvents");
+//	    PerformanceMonitor.report();
+//	    PerformanceMonitor.clear();
 
 	    return;
 	}
@@ -1421,13 +1423,15 @@
     @Override
     public void notificationRecvFlowEntryAdded(FlowEntry flowEntry) {
 	if (enableOnrc2014MeasurementsFlows) {
-	    String tag = "EventHandler.AddFlowEntryToSwitch." + flowEntry.flowEntryId();
-	    PerformanceMonitor.start(tag);
+//	    String tag = "EventHandler.AddFlowEntryToSwitch." + flowEntry.flowEntryId();
+	    String tag = "EventHandler.AddFlowEntryToSwitch";
+	    PerformanceMonitor.Measurement m = PerformanceMonitor.start(tag);
 	    Collection entries = new ArrayList();
 	    entries.add(flowEntry);
 	    flowManager.pushModifiedFlowEntriesToSwitches(entries);
-	    PerformanceMonitor.stop(tag);
-	    PerformanceMonitor.report(tag);
+//	    PerformanceMonitor.stop(tag);
+	    m.stop();
+//	    PerformanceMonitor.report(tag);
 	    return;
 	}
 
@@ -1444,8 +1448,9 @@
     @Override
     public void notificationRecvFlowEntryRemoved(FlowEntry flowEntry) {
 	if (enableOnrc2014MeasurementsFlows) {
-	    String tag = "EventHandler.RemoveFlowEntryFromSwitch." + flowEntry.flowEntryId();
-	    PerformanceMonitor.start(tag);
+//	    String tag = "EventHandler.RemoveFlowEntryFromSwitch." + flowEntry.flowEntryId();
+	    String tag = "EventHandler.RemoveFlowEntryFromSwitch";
+	    PerformanceMonitor.Measurement m = PerformanceMonitor.start(tag);
 	    //
 	    // NOTE: Must update the state to DELETE, because
 	    // the notification contains the original state.
@@ -1455,8 +1460,9 @@
 	    Collection entries = new ArrayList();
 	    entries.add(flowEntry);
 	    flowManager.pushModifiedFlowEntriesToSwitches(entries);
-	    PerformanceMonitor.stop(tag);
-	    PerformanceMonitor.report(tag);
+//	    PerformanceMonitor.stop(tag);
+	    m.stop();
+//	    PerformanceMonitor.report(tag);
 	    return;
 	}
 
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index fc5ae34..791008d 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -41,7 +41,6 @@
 import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
 
 import com.thinkaurelius.titan.core.TitanException;
-
 import com.esotericsoftware.kryo2.Kryo;
 
 import org.slf4j.Logger;
@@ -334,6 +333,16 @@
      */
     @Override
     public FlowPath getFlow(FlowId flowId) {
+	log.debug("FlowID: {}", flowId);
+	if(flowId.value() == -100) {
+	    log.debug("Printing results...");
+	    PerformanceMonitor.report();
+	    PerformanceMonitor.clear();
+	}
+	else if(flowId.value() == -200) {
+	    log.debug("Clearing results...");
+	    PerformanceMonitor.clear();
+	}
 	return FlowDatabaseOperation.getFlow(dbHandlerApi, flowId);
     }
 
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/ParallelFlowDatabaseOperation.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/ParallelFlowDatabaseOperation.java
index fff89dc..5d9af5a 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/ParallelFlowDatabaseOperation.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/ParallelFlowDatabaseOperation.java
@@ -261,12 +261,16 @@
 	
 	@Override
 	public Boolean call() throws Exception {
-	    String tag1 = "FlowDatabaseOperation.AddFlow." + flowPath.flowId();
-	    String tag2 = "FlowDatabaseOperation.NotificationSend.FlowEntry." + flowPath.flowId();
-	    PerformanceMonitor.start(tag1);
+//	    String tag1 = "FlowDatabaseOperation.AddFlow." + flowPath.flowId();
+	    String tag1 = "FlowDatabaseOperation.AddFlow";
+//	    String tag2 = "FlowDatabaseOperation.NotificationSend.FlowEntry." + flowPath.flowId();
+	    String tag2 = "FlowDatabaseOperation.NotificationSend.FlowEntry";
+	    PerformanceMonitor.Measurement m;
+	    m = PerformanceMonitor.start(tag1);
 	    boolean success = FlowDatabaseOperation.addFlow(dbHandler, flowPath);
 	    PerformanceMonitor.stop(tag1);
-	    PerformanceMonitor.start(tag2);
+	    m.stop();
+	    m = PerformanceMonitor.start(tag2);
 	    if(success) {
 		if(datagridService != null) {
 		    // Send notifications for each Flow Entry
@@ -298,9 +302,9 @@
 	    else {
 		log.error("Error adding flow path {} to database", flowPath);
 	    }
-	    PerformanceMonitor.stop(tag2);
-	    PerformanceMonitor.report(tag1);
-	    PerformanceMonitor.report(tag2);
+	    m.stop();
+//	    PerformanceMonitor.report(tag1);
+//	    PerformanceMonitor.report(tag2);
 	    return success;
 
 	}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PerformanceMonitor.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PerformanceMonitor.java
index 293b426..7c38a3c 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PerformanceMonitor.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PerformanceMonitor.java
@@ -1,6 +1,9 @@
 package net.onrc.onos.ofcontroller.flowmanager;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.slf4j.Logger;
@@ -10,26 +13,35 @@
  * Class for collecting performance measurements
  */
 public class PerformanceMonitor {
-    private final static Map<String, Measurement> map = new ConcurrentHashMap<String, Measurement>();;
+    private final static Map<String, List<Measurement>> map = new ConcurrentHashMap<>();;
     private final static Logger log = LoggerFactory.getLogger(PerformanceMonitor.class);
-    private static long overhead;    
-    
+    private static long overhead;
+    private static long experimentStart = Long.MAX_VALUE;
+    private final static double normalization = Math.pow(10, 6);
+
     /**
      * Start a performance measurement, identified by a tag
      * 
      * Note: Only a single measurement can use the same tag at a time.
+     * ..... not true anymore.
      * 
      * @param tag for performance measurement
      */
-    public static void start(String tag) {
+    public static Measurement start(String tag) {
 	long start = System.nanoTime();
-	Measurement m = new Measurement();
-	if(map.put(tag, m) != null) {
-	    // if there was a previous entry, we have just overwritten it
-	    log.error("Tag {} already exists", tag);
+	if(start < experimentStart) {
+	    experimentStart = start;
 	}
+	List<Measurement> list = map.get(tag);
+	if(list == null) {
+	    list = new ArrayList<Measurement>();
+	    map.put(tag, list);
+	}
+	Measurement m = new Measurement();
+	list.add(m);
 	m.start();
 	overhead += System.nanoTime() - start;
+	return m;
     }
     
     /**
@@ -41,46 +53,75 @@
      */
     public static void stop(String tag) {
 	long time = System.nanoTime();
-	Measurement m = map.get(tag);
-	if(m == null) {
+	List<Measurement> list = map.get(tag);
+	if(list == null) {
 	    log.error("Tag {} does not exist", tag);
 	}
+	else if(list.size() == 1) {
+	    list.get(0).stop(time);
+	}
 	else {
-	    map.get(tag).stop(time);
+	    log.error("Tag {} has multiple measurements", tag);
 	}
 	overhead += System.nanoTime() - time;
     }
-    
-    /**
-     * Find a measurement, identified by tag, and return the result
-     * 
-     * @param tag for performance measurement
-     * @return the time in nanoseconds
-     */
-    public static long result(String tag) {
-	Measurement m = map.get(tag);
-	if(m != null) {
-	    return m.elapsed();
-	}
-	else {
-	    return -1;
-	}
-    }
-    
+        
     /**
      * Clear all performance measurements.
      */
     public static void clear() {
 	map.clear();
 	overhead = 0;
+	experimentStart = Long.MAX_VALUE;
     }
     
     /**
      * Write all performance measurements to the log
      */
     public static void report() {
-	double overheadMilli = overhead / Math.pow(10, 6);
-	log.error("Performance Results: {} with measurement overhead: {} ms", map, overheadMilli);
+	String result = "Performance Results: (avg/start/stop/count)\n";
+	if(map.size() == 0) {
+	    result += "No Measurements";
+	    log.error(result);
+	    return;
+	}
+	long experimentEnd = -1;
+	for(Entry<String, List<Measurement>> e : map.entrySet()) {
+	    String key = e.getKey();
+	    List<Measurement> list = e.getValue();
+	    int total = 0, count = 0;
+	    long start = Long.MAX_VALUE, stop = -1;
+	    for(Measurement m : list) {
+		// Collect overall start and end times
+		if(m.start < start) {
+		    start = m.start;
+		}
+		if(m.stop > stop) {
+		    stop = m.stop;
+		    if(stop > experimentEnd) {
+			experimentEnd = stop;
+		    }
+		}
+		
+		// Collect statistics for average
+		total += m.elapsed();
+		count++;
+	    }
+	    double avg = (double) total / count;
+	    // Normalize start/stop
+	    start -= experimentStart;
+	    stop -= experimentStart;
+	    result += key + '=' + 
+		    (avg / normalization) + '/' + 
+		    (start / normalization) + '/' + 
+		    (stop / normalization) + '/' + 
+		    count + '\n';
+	}
+	double overheadMs = overhead / normalization;
+	double experimentElapsed = (experimentEnd - experimentStart) / normalization;
+	result += "TotalTime:" + experimentElapsed + "/Overhead:" + overheadMs;
+	log.error(result);
+//	log.error("Performance Results: {} with measurement overhead: {} ms", map, overheadMilli);
     }
 
     /**
@@ -89,21 +130,26 @@
      * @param tag the tag name.
      */
     public static void report(String tag) {
-	Measurement m = map.get(tag);
+	List<Measurement> list = map.get(tag);
+	if(list == null) {
+	    return; //TODO
+	}
+	//TODO: fix this;
+	Measurement m = list.get(0);
 	if (m != null) {
-	    log.error("Performance Results: tag = {} start = {} stop = {} elapsed = {}",
+	    log.error("Performance Result: tag = {} start = {} stop = {} elapsed = {}",
 		      tag, m.start, m.stop, m.toString());
 	} else {
-	    log.error("Performance Results: unknown tag {}", tag);
+	    log.error("Performance Result: unknown tag {}", tag);
 	}
     }
 
     /**
      * A single performance measurement
      */
-    static class Measurement {
+    public static class Measurement {
 	long start;
-	long stop;
+	long stop = -1;
 	
 	/** 
 	 * Start the measurement
@@ -116,7 +162,8 @@
 	 * Stop the measurement
 	 */
 	public void stop() {
-	    stop = System.nanoTime();
+	    long now = System.nanoTime();
+	    stop(now);
 	}
 	
 	/**
@@ -133,29 +180,36 @@
 	 * @return the measurement time in nanoseconds, or -1 if the measurement is stil running.
 	 */
 	public long elapsed() {
-	    if(stop == 0) {
+	    if(stop <= 0) {
 		return -1;
 	    }
 	    else {
 		return stop - start;
 	    }
 	}
-	
+		
 	/**
 	 * Returns the number of milliseconds for the measurement as a String.
 	 */
 	public String toString() {
-	    double milli = elapsed() / Math.pow(10, 6);
-	    return Double.toString(milli) + " ms";
+	    double milli = elapsed() / normalization;
+	    double startMs = start / normalization;
+	    double stopMs = stop / normalization;
+	    
+	    return milli + "ms/" + startMs + '/' + stopMs;
 	}
     }
     
     public static void main(String args[]){
 	// test the measurement overhead
 	String tag;
-	for(int i = 0; i < 100; i++){
+	for(int i = 0; i < 2; i++){
 	    tag = "foo foo foo";
-	    start(tag); stop(tag);
+	    Measurement m;
+	    m = start(tag); System.out.println(tag); m.stop();
+	    m = start(tag); System.out.println(tag); m.stop();
+	    m = start(tag); System.out.println(tag); m.stop();
+	    m = start(tag); System.out.println(tag); m.stop();
 	    tag = "bar";
 	    start(tag); stop(tag);
 	    tag = "baz";
@@ -163,15 +217,15 @@
 	    report();
 	    clear();
 	}
-	for(int i = 0; i < 100; i++){
-	    tag = "a";
-	    start(tag); stop(tag);
-	    tag = "b";
-	    start(tag); stop(tag);
-	    tag = "c";
-	    start(tag); stop(tag);
-	    report();
-	    clear();
-	}
+//	for(int i = 0; i < 100; i++){
+//	    tag = "a";
+//	    start(tag); stop(tag);
+//	    tag = "b";
+//	    start(tag); stop(tag);
+//	    tag = "c";
+//	    start(tag); stop(tag);
+//	    report();
+//	    clear();
+//	}
     }
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetFlowByIdResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetFlowByIdResource.java
index 1cbeece..de625d7 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetFlowByIdResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetFlowByIdResource.java
@@ -1,6 +1,7 @@
 package net.onrc.onos.ofcontroller.flowmanager.web;
 
 import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
+import net.onrc.onos.ofcontroller.flowmanager.PerformanceMonitor;
 import net.onrc.onos.ofcontroller.util.FlowId;
 import net.onrc.onos.ofcontroller.util.FlowPath;
 
diff --git a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
index 65bc40b..85d122d 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
@@ -477,6 +477,11 @@
 		MACAddress srcMacAddress = installedFlowPath.flowEntryMatch().srcMac();
 		MACAddress dstMacAddress = installedFlowPath.flowEntryMatch().dstMac();
 		
+		if (srcMacAddress == null || dstMacAddress == null) {
+			// Not our flow path, ignore
+			return;
+		}
+		
 		Collection<PacketToPush> packets;
 		synchronized (lock) {
 			log.debug("Flow {} has been installed, sending queued packets",