IntentPerfInstaller: using feedback to determine submit size
Change-Id: Iaa4eb657ee0e22d008597c40561ea89105a09a15
diff --git a/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
index f7c48c9..ccd29ab 100644
--- a/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
+++ b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
@@ -61,22 +61,22 @@
import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
import static org.onlab.util.Tools.delay;
import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.net.intent.IntentEvent.Type.INSTALLED;
-import static org.onosproject.net.intent.IntentEvent.Type.WITHDRAWN;
+import static org.onosproject.net.intent.IntentEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger;
/**
- * Application to set up demos.
+ * Application to test sustained intent throughput.
*/
@Component(immediate = true)
public class IntentPerfInstaller {
//FIXME make this configurable
private static final int NUM_WORKERS = 1;
- private static final int NUM_KEYS = 10_000;
+ private static final int NUM_KEYS = 20_000;
public static final int START_DELAY = 5_000; // ms
private static final int REPORT_PERIOD = 5_000; //ms
+ private static final int GOAL_CYCLE_PERIOD = 1_000; //ms
private final Logger log = getLogger(getClass());
@@ -99,6 +99,7 @@
private Timer reportTimer;
+ // FIXME this variable isn't shared properly between multiple worker threads
private int lastKey = 0;
@Activate
@@ -135,6 +136,7 @@
reportTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
+ //adjustRates(); // FIXME we currently adjust rates in the cycle thread
listener.report();
}
}, REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD, REPORT_PERIOD);
@@ -158,16 +160,10 @@
try {
workers.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
- log.warn("Failed to stop worker.");
+ log.warn("Failed to stop worker", e);
}
}
- private Iterable<Intent> subset(Set<Intent> intents) {
- List<Intent> subset = Lists.newArrayList(intents);
- Collections.shuffle(subset);
- return subset.subList(0, subset.size() / 2);
- }
-
/**
* Creates a specified number of intents for testing purposes.
*
@@ -200,7 +196,7 @@
continue;
}
- //FIXME
+ //FIXME we currently ignore the path length and always use the same device
TrafficSelector selector = DefaultTrafficSelector.builder().build();
TrafficTreatment treatment = DefaultTrafficTreatment.builder().build();
ConnectPoint ingress = new ConnectPoint(ingressDevice.id(), PortNumber.portNumber(1));
@@ -216,7 +212,7 @@
count++;
lastKey = k;
if (lastKey % 1000 == 0) {
- log.info("Building intents... {} ({})", count, lastKey);
+ log.info("Building intents... {} (attempt: {})", lastKey, count);
}
}
log.info("Created {} intents", numberOfKeys);
@@ -226,24 +222,33 @@
// Submits intent operations.
final class Submitter implements Runnable {
+ private long lastDuration;
+ private int lastCount;
+
private Set<Intent> intents = Sets.newHashSet();
private Set<Intent> submitted = Sets.newHashSet();
private Set<Intent> withdrawn = Sets.newHashSet();
private Submitter(Set<Intent> intents) {
this.intents = intents;
+ lastCount = NUM_KEYS / 4;
+ lastDuration = 1000; // 1 second
}
@Override
public void run() {
- delay(2000); // take a breath to start
prime();
while (!stopped) {
cycle();
- delay(800); // take a breath
}
}
+ private Iterable<Intent> subset(Set<Intent> intents) {
+ List<Intent> subset = Lists.newArrayList(intents);
+ Collections.shuffle(subset);
+ return subset.subList(0, lastCount);
+ }
+
// Submits the specified intent.
private void submit(Intent intent) {
intentService.submit(intent);
@@ -273,23 +278,52 @@
// Runs a single operation cycle.
private void cycle() {
+ //TODO consider running without rate adjustment
+ adjustRates();
+
long start = currentTimeMillis();
subset(submitted).forEach(this::withdraw);
subset(withdrawn).forEach(this::submit);
long delta = currentTimeMillis() - start;
- if (delta > 5000 || delta < 0) {
+
+ if (delta > GOAL_CYCLE_PERIOD * 3 || delta < 0) {
log.warn("Cycle took {} ms", delta);
}
+
+ int difference = GOAL_CYCLE_PERIOD - (int) delta;
+ if (difference > 0) {
+ delay(difference);
+ }
+
+ lastDuration = delta;
+ }
+
+ int cycleCount = 0;
+ private void adjustRates() {
+ //FIXME need to iron out the rate adjustment
+ if (++cycleCount % 5 == 0) { //TODO: maybe use a timer (we should do this every 5-10 sec)
+ if (listener.requestThroughput() - listener.processedThroughput() <= 500 &&
+ lastDuration <= GOAL_CYCLE_PERIOD) {
+ lastCount = Math.min(lastCount + 100, intents.size() / 2);
+ } else {
+ lastCount *= 0.8;
+ }
+ log.info("last count: {}, last duration: {} ms (sub: {} vs inst: {})",
+ lastCount, lastDuration, listener.requestThroughput(), listener.processedThroughput());
+ }
+
}
}
-
// Event listener to monitor throughput.
final class Listener implements IntentListener {
- private final Map<IntentEvent.Type, Counter> counters;
+ private Map<IntentEvent.Type, Counter> counters;
private final Counter runningTotal = new Counter();
+ private volatile double processedThroughput = 0;
+ private volatile double requestThroughput = 0;
+
public Listener() {
counters = initCounters();
}
@@ -302,6 +336,14 @@
return map;
}
+ public double processedThroughput() {
+ return processedThroughput;
+ }
+
+ public double requestThroughput() {
+ return requestThroughput;
+ }
+
@Override
public void event(IntentEvent event) {
if (event.subject().appId().equals(appId)) {
@@ -310,25 +352,29 @@
}
public void report() {
- StringBuilder stringBuilder = new StringBuilder();
- Counter installed = counters.get(INSTALLED);
- Counter withdrawn = counters.get(WITHDRAWN);
- double current = installed.throughput() + withdrawn.throughput();
+ Map<IntentEvent.Type, Counter> reportCounters = counters;
+ counters = initCounters();
+
+ // update running total and latest throughput
+ Counter installed = reportCounters.get(INSTALLED);
+ Counter withdrawn = reportCounters.get(WITHDRAWN);
+ processedThroughput = installed.throughput() + withdrawn.throughput();
runningTotal.add(installed.total() + withdrawn.total());
+
+ Counter installReq = reportCounters.get(INSTALL_REQ);
+ Counter withdrawReq = reportCounters.get(WITHDRAW_REQ);
+ requestThroughput = installReq.throughput() + withdrawReq.throughput();
+
+ // build the string to report
+ StringBuilder stringBuilder = new StringBuilder();
for (IntentEvent.Type type : IntentEvent.Type.values()) {
- stringBuilder.append(printCounter(type)).append("; ");
+ Counter counter = reportCounters.get(type);
+ stringBuilder.append(format("%s=%.2f;", type, counter.throughput()));
}
log.info("Throughput: OVERALL={}; CURRENT={}; {}",
format("%.2f", runningTotal.throughput()),
- format("%.2f", current), stringBuilder);
- }
-
- private String printCounter(IntentEvent.Type event) {
- Counter counter = counters.get(event);
- String result = format("%s=%.2f", event, counter.throughput());
- counter.reset();
- return result;
+ format("%.2f", processedThroughput),
+ stringBuilder);
}
}
-
}