Merge branch 'ONOS-ONRC2014-Measurements' of github.com:OPENNETWORKINGLAB/ONOS into RAMCloud-merge
diff --git a/perf-scripts/flow-manager-perf.py b/perf-scripts/flow-manager-perf.py
index 3168f63..4ee6beb 100644
--- a/perf-scripts/flow-manager-perf.py
+++ b/perf-scripts/flow-manager-perf.py
@@ -8,6 +8,7 @@
import csv
import os
import sys
+import re
from time import sleep, strftime
from subprocess import Popen, call, check_output, PIPE
from datetime import datetime
@@ -22,37 +23,88 @@
ONOS_HOME = '..'
ONOS_LOG = '%s/onos-logs/onos.%s.log' % ( ONOS_HOME, check_output( 'hostname').strip() )
-ONOS_LOG = '/tmp/onos-1.logs/onos.onos-vm.log'
+ONOS_URL = 'http://127.0.0.1:8080/wm/onos/flows/get/%d/json'
+ONOS_LOG = '/tmp/onos-0.logs/onos.onos-vm.log'
print "ONOS Log File:", ONOS_LOG
PORT = 's1-eth2'
-N = 100
+N = 5
# ----------------- Running the test and output -------------------------
-# 17:43:37.206 [main] ERROR n.o.o.o.f.PerformanceMonitor - Performance Results: {a=0.001ms, b=0.0ms, c=0.0ms} with measurement overhead: 0.022 ms
+class Result(object):  # one test iteration: tshark-side timing combined with the ONOS-side report
+ def __init__(self, tsharkTime, flowmods, onosTime, overhead, details):
+ self.tsharkTime = tsharkTime  # span between first and last captured packet, as computed by test() (presumably ms)
+ self.flowmods = flowmods  # number of 'Flow Mod' packets counted by test()
+ self.onosTime = onosTime  # 'TotalTime' value parsed from the ONOS log
+ self.overhead = overhead  # 'Overhead' value parsed from the ONOS log
+ # sorted by start time
+ self.details = sorted(details, key=lambda x: float(x[2]) )  # each entry is a (tag, avg, start, stop, count) tuple of strings
+
+ def __repr__(self):
+ return '%f %f %f %d %s' % (self.tsharkTime, self.onosTime, self.overhead, self.flowmods, self.details)  # order: tshark time, ONOS total, overhead, flow-mod count, details
+
+
+def clearResults():  # ask ONOS to discard any accumulated PerformanceMonitor data
+ cmd = 'curl %s' % ONOS_URL % -200  # flow id -200 is the magic "clear" value checked in FlowManager.getFlow
+ call( cmd, shell=True )  # output of curl goes straight to the console; the exit status is ignored
+ pass
+
+def reportResults():  # ask ONOS to write its PerformanceMonitor results to the log
+ cmd = 'curl %s' % ONOS_URL % -100  # flow id -100 is the magic "report, then clear" value checked in FlowManager.getFlow
+ call( cmd, shell=True )  # the results themselves are read from the ONOS log, not from curl's output
def test():
# Start tailing the onos log
tail = pexpect.spawn( "tail -0f %s" % ONOS_LOG )
+ tshark = pexpect.spawn( 'tshark -i lo -R "of.type == 12 || of.type == 14"' )  # capture only the two OpenFlow message types matched below ('Port Status' / 'Flow Mod')
+ tshark.expect('Capturing on lo')  # block until tshark announces the capture
+ sleep(1) # wait for tshark to start
+ clearResults() # REST call to ONOS
# Take link down
call( 'ifconfig %s down' % PORT, shell=True )
- # Wait for performance results in the log
- tail.expect('Performance Results: \{([^\}]*)\} with measurement overhead: ([\d.]+) ms', timeout=6000)
- s = tail.match.group(1)
- overhead = float( tail.match.group(2) )
- results = dict( re.findall(r'(\w[\w\s]*)=([\d.]+)', s) )
+ # collect openflow packets using tshark
+ count = 0  # number of 'Flow Mod' packets seen
+ timeout = 6000  # generous wait for the first packet; shortened once packets arrive
+ start = -1  # -1 means "no packet seen yet"
+ end = -1  # largest timestamp seen so far
+ while True:
+ i = tshark.expect( ['(\d+\.\d+)', pexpect.TIMEOUT], timeout=timeout )  # first decimal number on the line, presumably tshark's timestamp column
+ if i == 1:  # index 1 is pexpect.TIMEOUT: no further packets, stop collecting
+ break
+ time = float(tshark.match.group(1))
+ if start == -1:
+ start = time
+ if time > end:
+ end = time
+ i = tshark.expect( ['Port Status', 'Flow Mod'] )  # classify the packet whose timestamp was just read
+ if i == 1:  # index 1 is 'Flow Mod'
+ count += 1
+ timeout = 3 #sec; nesting of this line is not recoverable from the flattened diff - confirm against the real file
+ elapsed = (end - start) * 1000  # presumably seconds -> ms; evaluates to 0 when no packet was captured (both still -1)
+ # read the performance results from ONOS
+ reportResults() # REST call
+
+ # Wait for performance results in the log
+ tail.expect('Performance Results: \(avg/start/stop/count\)', timeout=10)  # header line emitted by PerformanceMonitor.report()
+ i = tail.expect('TotalTime:([\d\.]+)/Overhead:([\d\.]+)')  # trailer line of the same report
+ totalTime = float(tail.match.group(1))
+ overhead = float(tail.match.group(2))
+ tags = re.findall( r'([\w\.]+)=([\d\.]+)/([\d\.]+)/([\d\.]+)/(\d+)', tail.before )  # per-tag lines between header and trailer; NOTE(review): [\d\.]+ cannot match Java scientific notation (1.0E-4) or negative values - confirm
+ result = Result(elapsed, count, totalTime, overhead, tags)
# Output results
- print "* Results:", results, "Overhead:", overhead
-
+ print result  # Python 2 print statement; uses Result.__repr__
+
+
# Bring port back up
call( 'ifconfig %s up' % PORT, shell=True )
tail.terminate()
- return results
+ tshark.terminate()
+ return []  # NOTE(review): the Result is only printed; callers receive an empty list
def outputResults(filename, results):
with open(filename, 'a') as csvfile:
@@ -65,10 +117,12 @@
start = datetime.now()
for i in range(n):
results = test()
- outputResults(filename, results)
+ #outputResults(filename, results)
+ sys.stdout.write('$')
+ sys.stdout.flush()
+ sleep(5) # sleep for 5 seconds between tests
sys.stdout.write('.')
sys.stdout.flush()
- sleep(5000) # sleep for 5 seconds between tests
totalTime = datetime.now() - start
print '\nExperiments complete in %s (h:m:s.s)' % totalTime
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index b76aa58..45995bf 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -18,6 +18,7 @@
import net.floodlightcontroller.restserver.IRestApiService;
import net.onrc.onos.datagrid.web.DatagridWebRoutable;
import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
+import net.onrc.onos.ofcontroller.flowmanager.PerformanceMonitor.Measurement;
import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
import net.onrc.onos.ofcontroller.topology.TopologyElement;
@@ -450,8 +451,9 @@
*/
@Override
public void entryRemoved(EntryEvent<String, byte[]> event) {
- String tag = "TopologyEntryRemoved.NotificationReceived." + event.getKey();
- PerformanceMonitor.start(tag);
+// String tag = "TopologyEntryRemoved.NotificationReceived." + event.getKey();
+ String tag = "TopologyEntryRemoved.NotificationReceived";
+ PerformanceMonitor.Measurement m = PerformanceMonitor.start(tag);
byte[] valueBytes = event.getValue();
//
@@ -463,8 +465,9 @@
kryo.readObject(input, TopologyElement.class);
kryoFactory.deleteKryo(kryo);
flowEventHandlerService.notificationRecvTopologyElementRemoved(topologyElement);
- PerformanceMonitor.stop(tag);
- PerformanceMonitor.report(tag);
+// PerformanceMonitor.stop(tag);
+ m.stop();
+// PerformanceMonitor.report(tag);
}
/**
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index 1471c49..ba1e448 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -399,13 +399,15 @@
PerformanceMonitor.stop("EventHandler.ProcessAllEvents");
- PerformanceMonitor.report("EventHandler.SwitchDpidEvents");
- PerformanceMonitor.report("EventHandler.FlowIdEvents");
- PerformanceMonitor.report("EventHandler.ReadTopology");
- PerformanceMonitor.report("EventHandler.RecomputeFlows");
- PerformanceMonitor.report("EventHandler.WriteFlowsToDb");
- PerformanceMonitor.report("EventHandler.NotificationSend.FlowEntryRemoved");
- PerformanceMonitor.report("EventHandler.ProcessAllEvents");
+// PerformanceMonitor.report("EventHandler.SwitchDpidEvents");
+// PerformanceMonitor.report("EventHandler.FlowIdEvents");
+// PerformanceMonitor.report("EventHandler.ReadTopology");
+// PerformanceMonitor.report("EventHandler.RecomputeFlows");
+// PerformanceMonitor.report("EventHandler.WriteFlowsToDb");
+// PerformanceMonitor.report("EventHandler.NotificationSend.FlowEntryRemoved");
+// PerformanceMonitor.report("EventHandler.ProcessAllEvents");
+// PerformanceMonitor.report();
+// PerformanceMonitor.clear();
return;
}
@@ -1421,13 +1423,15 @@
@Override
public void notificationRecvFlowEntryAdded(FlowEntry flowEntry) {
if (enableOnrc2014MeasurementsFlows) {
- String tag = "EventHandler.AddFlowEntryToSwitch." + flowEntry.flowEntryId();
- PerformanceMonitor.start(tag);
+// String tag = "EventHandler.AddFlowEntryToSwitch." + flowEntry.flowEntryId();
+ String tag = "EventHandler.AddFlowEntryToSwitch";
+ PerformanceMonitor.Measurement m = PerformanceMonitor.start(tag);
Collection entries = new ArrayList();
entries.add(flowEntry);
flowManager.pushModifiedFlowEntriesToSwitches(entries);
- PerformanceMonitor.stop(tag);
- PerformanceMonitor.report(tag);
+// PerformanceMonitor.stop(tag);
+ m.stop();
+// PerformanceMonitor.report(tag);
return;
}
@@ -1444,8 +1448,9 @@
@Override
public void notificationRecvFlowEntryRemoved(FlowEntry flowEntry) {
if (enableOnrc2014MeasurementsFlows) {
- String tag = "EventHandler.RemoveFlowEntryFromSwitch." + flowEntry.flowEntryId();
- PerformanceMonitor.start(tag);
+// String tag = "EventHandler.RemoveFlowEntryFromSwitch." + flowEntry.flowEntryId();
+ String tag = "EventHandler.RemoveFlowEntryFromSwitch";
+ PerformanceMonitor.Measurement m = PerformanceMonitor.start(tag);
//
// NOTE: Must update the state to DELETE, because
// the notification contains the original state.
@@ -1455,8 +1460,9 @@
Collection entries = new ArrayList();
entries.add(flowEntry);
flowManager.pushModifiedFlowEntriesToSwitches(entries);
- PerformanceMonitor.stop(tag);
- PerformanceMonitor.report(tag);
+// PerformanceMonitor.stop(tag);
+ m.stop();
+// PerformanceMonitor.report(tag);
return;
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index fc5ae34..791008d 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -41,7 +41,6 @@
import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
import com.thinkaurelius.titan.core.TitanException;
-
import com.esotericsoftware.kryo2.Kryo;
import org.slf4j.Logger;
@@ -334,6 +333,16 @@
*/
@Override
public FlowPath getFlow(FlowId flowId) {
+ log.debug("FlowID: {}", flowId);
+ if(flowId.value() == -100) { // measurement hook: magic flow id -100 (sent by reportResults() in perf-scripts/flow-manager-perf.py) dumps the results, then resets them
+ log.debug("Printing results...");
+ PerformanceMonitor.report();
+ PerformanceMonitor.clear();
+ }
+ else if(flowId.value() == -200) { // measurement hook: magic flow id -200 (sent by clearResults() in the same script) only resets the results
+ log.debug("Clearing results...");
+ PerformanceMonitor.clear();
+ }
return FlowDatabaseOperation.getFlow(dbHandlerApi, flowId);
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/ParallelFlowDatabaseOperation.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/ParallelFlowDatabaseOperation.java
index fff89dc..5d9af5a 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/ParallelFlowDatabaseOperation.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/ParallelFlowDatabaseOperation.java
@@ -261,12 +261,16 @@
@Override
public Boolean call() throws Exception {
- String tag1 = "FlowDatabaseOperation.AddFlow." + flowPath.flowId();
- String tag2 = "FlowDatabaseOperation.NotificationSend.FlowEntry." + flowPath.flowId();
- PerformanceMonitor.start(tag1);
+// String tag1 = "FlowDatabaseOperation.AddFlow." + flowPath.flowId();
+ String tag1 = "FlowDatabaseOperation.AddFlow";
+// String tag2 = "FlowDatabaseOperation.NotificationSend.FlowEntry." + flowPath.flowId();
+ String tag2 = "FlowDatabaseOperation.NotificationSend.FlowEntry";
+ PerformanceMonitor.Measurement m;
+ m = PerformanceMonitor.start(tag1);
boolean success = FlowDatabaseOperation.addFlow(dbHandler, flowPath);
PerformanceMonitor.stop(tag1);
- PerformanceMonitor.start(tag2);
+ m.stop();
+ m = PerformanceMonitor.start(tag2);
if(success) {
if(datagridService != null) {
// Send notifications for each Flow Entry
@@ -298,9 +302,9 @@
else {
log.error("Error adding flow path {} to database", flowPath);
}
- PerformanceMonitor.stop(tag2);
- PerformanceMonitor.report(tag1);
- PerformanceMonitor.report(tag2);
+ m.stop();
+// PerformanceMonitor.report(tag1);
+// PerformanceMonitor.report(tag2);
return success;
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PerformanceMonitor.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PerformanceMonitor.java
index 293b426..7c38a3c 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PerformanceMonitor.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PerformanceMonitor.java
@@ -1,6 +1,9 @@
package net.onrc.onos.ofcontroller.flowmanager;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
@@ -10,26 +13,35 @@
* Class for collecting performance measurements
*/
public class PerformanceMonitor {
- private final static Map<String, Measurement> map = new ConcurrentHashMap<String, Measurement>();;
+ private final static Map<String, List<Measurement>> map = new ConcurrentHashMap<>();;
private final static Logger log = LoggerFactory.getLogger(PerformanceMonitor.class);
- private static long overhead;
-
+ private static long overhead;
+ private static long experimentStart = Long.MAX_VALUE;
+ private final static double normalization = Math.pow(10, 6);
+
/**
* Start a performance measurement, identified by a tag
*
* Note: Only a single measurement can use the same tag at a time.
+ * (No longer true: each tag now maps to a list of measurements; stop the returned Measurement directly.)
*
* @param tag for performance measurement
*/
- public static void start(String tag) {
+ public static Measurement start(String tag) {
long start = System.nanoTime();
- Measurement m = new Measurement();
- if(map.put(tag, m) != null) {
- // if there was a previous entry, we have just overwritten it
- log.error("Tag {} already exists", tag);
+ if(start < experimentStart) { // keep the earliest start seen since the last clear(); report() uses it as time zero
+ experimentStart = start;
}
+ List<Measurement> list = map.get(tag);
+ if(list == null) { // NOTE(review): get-then-put is not atomic and ArrayList is unsynchronized - confirm start() is never called concurrently
+ list = new ArrayList<Measurement>();
+ map.put(tag, list);
+ }
+ Measurement m = new Measurement();
+ list.add(m);
+ m.start();
+ overhead += System.nanoTime() - start; // time spent inside the monitor itself, reported as "Overhead"
+ return m;
}
/**
@@ -41,46 +53,75 @@
*/
public static void stop(String tag) {
long time = System.nanoTime();
- Measurement m = map.get(tag);
- if(m == null) {
+ List<Measurement> list = map.get(tag);
+ if(list == null) { // no start(tag) has been recorded since the last clear()
log.error("Tag {} does not exist", tag);
}
+ else if(list.size() == 1) { // stopping by tag is only unambiguous when the tag has exactly one measurement
+ list.get(0).stop(time);
+ }
else {
- map.get(tag).stop(time);
+ log.error("Tag {} has multiple measurements", tag); // nothing is stopped here; callers must use Measurement.stop() instead
}
overhead += System.nanoTime() - time;
}
-
- /**
- * Find a measurement, identified by tag, and return the result
- *
- * @param tag for performance measurement
- * @return the time in nanoseconds
- */
- public static long result(String tag) {
- Measurement m = map.get(tag);
- if(m != null) {
- return m.elapsed();
- }
- else {
- return -1;
- }
- }
-
+
/**
* Clear all performance measurements.
*/
public static void clear() {
map.clear();
overhead = 0;
+ experimentStart = Long.MAX_VALUE; // reset so the next start() becomes the new time zero (start() keeps the minimum)
}
/**
* Write all performance measurements to the log
*/
public static void report() {
- double overheadMilli = overhead / Math.pow(10, 6);
- log.error("Performance Results: {} with measurement overhead: {} ms", map, overheadMilli);
+ String result = "Performance Results: (avg/start/stop/count)\n"; // header line; one "tag=avg/start/stop/count" line per tag follows
+ if(map.size() == 0) {
+ result += "No Measurements";
+ log.error(result);
+ return;
+ }
+ long experimentEnd = -1; // latest stop timestamp over all tags (raw nanoTime value)
+ for(Entry<String, List<Measurement>> e : map.entrySet()) {
+ String key = e.getKey();
+ List<Measurement> list = e.getValue();
+ long total = 0; int count = 0; // total must be long: "int += long" silently narrows and wraps after ~2.1 s of summed nanoseconds
+ long start = Long.MAX_VALUE, stop = -1;
+ for(Measurement m : list) {
+ // Collect overall start and end times
+ if(m.start < start) {
+ start = m.start;
+ }
+ if(m.stop > stop) {
+ stop = m.stop;
+ if(stop > experimentEnd) {
+ experimentEnd = stop;
+ }
+ }
+
+ // Collect statistics for average
+ total += m.elapsed(); // elapsed() is -1 for a measurement that was never stopped
+ count++;
+ }
+ double avg = (double) total / count;
+ // Normalize start/stop
+ start -= experimentStart; // make timestamps relative to the first start() since the last clear()
+ stop -= experimentStart;
+ result += key + '=' +
+ (avg / normalization) + '/' +
+ (start / normalization) + '/' +
+ (stop / normalization) + '/' +
+ count + '\n';
+ }
+ double overheadMs = overhead / normalization; // normalization is 10^6: nanoseconds -> milliseconds
+ double experimentElapsed = (experimentEnd - experimentStart) / normalization;
+ result += "TotalTime:" + experimentElapsed + "/Overhead:" + overheadMs; // trailer line
+ log.error(result);
+// log.error("Performance Results: {} with measurement overhead: {} ms", map, overheadMilli);
}
/**
@@ -89,21 +130,26 @@
* @param tag the tag name.
*/
public static void report(String tag) {
- Measurement m = map.get(tag);
+ List<Measurement> list = map.get(tag);
+ if(list == null) {
+ return; //TODO: an unknown tag is silently ignored here, so the "unknown tag" branch below is effectively unreachable
+ }
+ //TODO: fix this; only the first measurement recorded for the tag is reported
+ Measurement m = list.get(0);
if (m != null) {
- log.error("Performance Results: tag = {} start = {} stop = {} elapsed = {}",
+ log.error("Performance Result: tag = {} start = {} stop = {} elapsed = {}",
tag, m.start, m.stop, m.toString());
} else {
- log.error("Performance Results: unknown tag {}", tag);
+ log.error("Performance Result: unknown tag {}", tag);
}
}
/**
* A single performance measurement
*/
- static class Measurement {
+ public static class Measurement {
long start;
- long stop;
+ long stop = -1;
/**
* Start the measurement
@@ -116,7 +162,8 @@
* Stop the measurement
*/
public void stop() {
- stop = System.nanoTime();
+ long now = System.nanoTime(); // take the timestamp first, then hand it to the stop(long) overload
+ stop(now);
}
/**
@@ -133,29 +180,36 @@
* @return the measurement time in nanoseconds, or -1 if the measurement is stil running.
*/
public long elapsed() {
- if(stop == 0) {
+ if(stop <= 0) { // stop is initialised to -1, so this means "not stopped yet"; NOTE(review): System.nanoTime() may legally be negative - confirm this guard is safe
return -1;
}
else {
return stop - start;
}
}
-
+
/**
* Returns the number of milliseconds for the measurement as a String.
*/
public String toString() {
- double milli = elapsed() / Math.pow(10, 6);
- return Double.toString(milli) + " ms";
+ double milli = elapsed() / normalization; // normalization is 10^6: nanoseconds -> milliseconds
+ double startMs = start / normalization; // raw nanoTime value, not relative to experimentStart
+ double stopMs = stop / normalization;
+
+ return milli + "ms/" + startMs + '/' + stopMs; // format: "<elapsed>ms/<start>/<stop>"
}
}
public static void main(String args[]){
// test the measurement overhead
String tag;
- for(int i = 0; i < 100; i++){
+ for(int i = 0; i < 2; i++){
tag = "foo foo foo";
- start(tag); stop(tag);
+ Measurement m;
+ m = start(tag); System.out.println(tag); m.stop();
+ m = start(tag); System.out.println(tag); m.stop();
+ m = start(tag); System.out.println(tag); m.stop();
+ m = start(tag); System.out.println(tag); m.stop();
tag = "bar";
start(tag); stop(tag);
tag = "baz";
@@ -163,15 +217,15 @@
report();
clear();
}
- for(int i = 0; i < 100; i++){
- tag = "a";
- start(tag); stop(tag);
- tag = "b";
- start(tag); stop(tag);
- tag = "c";
- start(tag); stop(tag);
- report();
- clear();
- }
+// for(int i = 0; i < 100; i++){
+// tag = "a";
+// start(tag); stop(tag);
+// tag = "b";
+// start(tag); stop(tag);
+// tag = "c";
+// start(tag); stop(tag);
+// report();
+// clear();
+// }
}
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetFlowByIdResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetFlowByIdResource.java
index 1cbeece..de625d7 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetFlowByIdResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetFlowByIdResource.java
@@ -1,6 +1,7 @@
package net.onrc.onos.ofcontroller.flowmanager.web;
import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
+import net.onrc.onos.ofcontroller.flowmanager.PerformanceMonitor;
import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
diff --git a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
index 65bc40b..85d122d 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
@@ -477,6 +477,11 @@
MACAddress srcMacAddress = installedFlowPath.flowEntryMatch().srcMac();
MACAddress dstMacAddress = installedFlowPath.flowEntryMatch().dstMac();
+ if (srcMacAddress == null || dstMacAddress == null) {
+ // Not our flow path, ignore
+ return;
+ }
+
Collection<PacketToPush> packets;
synchronized (lock) {
log.debug("Flow {} has been installed, sending queued packets",