IntentPerfInstaller: using feedback to determine submit size

Change-Id: Iaa4eb657ee0e22d008597c40561ea89105a09a15
diff --git a/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
index f7c48c9..ccd29ab 100644
--- a/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
+++ b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
@@ -61,22 +61,22 @@
 import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
 import static org.onlab.util.Tools.delay;
 import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.net.intent.IntentEvent.Type.INSTALLED;
-import static org.onosproject.net.intent.IntentEvent.Type.WITHDRAWN;
+import static org.onosproject.net.intent.IntentEvent.Type.*;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
- * Application to set up demos.
+ * Application to test sustained intent throughput.
  */
 @Component(immediate = true)
 public class IntentPerfInstaller {
 
     //FIXME make this configurable
     private static final int NUM_WORKERS = 1;
-    private static final int NUM_KEYS = 10_000;
+    private static final int NUM_KEYS = 20_000;
 
     public static final int START_DELAY = 5_000; // ms
     private static final int REPORT_PERIOD = 5_000; //ms
+    private static final int GOAL_CYCLE_PERIOD = 1_000; //ms
 
     private final Logger log = getLogger(getClass());
 
@@ -99,6 +99,7 @@
 
     private Timer reportTimer;
 
+    // FIXME this variable isn't shared properly between multiple worker threads
     private int lastKey = 0;
 
     @Activate
@@ -135,6 +136,7 @@
         reportTimer.scheduleAtFixedRate(new TimerTask() {
             @Override
             public void run() {
+                //adjustRates(); // FIXME we currently adjust rates in the cycle thread
                 listener.report();
             }
         }, REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD, REPORT_PERIOD);
@@ -158,16 +160,10 @@
         try {
             workers.awaitTermination(5, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
-            log.warn("Failed to stop worker.");
+            log.warn("Failed to stop worker", e);
         }
     }
 
-    private Iterable<Intent> subset(Set<Intent> intents) {
-        List<Intent> subset = Lists.newArrayList(intents);
-        Collections.shuffle(subset);
-        return subset.subList(0, subset.size() / 2);
-    }
-
     /**
      * Creates a specified number of intents for testing purposes.
      *
@@ -200,7 +196,7 @@
                 continue;
             }
 
-            //FIXME
+            //FIXME we currently ignore the path length and always use the same device
             TrafficSelector selector = DefaultTrafficSelector.builder().build();
             TrafficTreatment treatment = DefaultTrafficTreatment.builder().build();
             ConnectPoint ingress = new ConnectPoint(ingressDevice.id(), PortNumber.portNumber(1));
@@ -216,7 +212,7 @@
             count++;
             lastKey = k;
             if (lastKey % 1000 == 0) {
-                log.info("Building intents... {} ({})", count, lastKey);
+                log.info("Building intents... {} (attempt: {})", lastKey, count);
             }
         }
         log.info("Created {} intents", numberOfKeys);
@@ -226,24 +222,33 @@
     // Submits intent operations.
     final class Submitter implements Runnable {
 
+        private long lastDuration;
+        private int lastCount;
+
         private Set<Intent> intents = Sets.newHashSet();
         private Set<Intent> submitted = Sets.newHashSet();
         private Set<Intent> withdrawn = Sets.newHashSet();
 
         private Submitter(Set<Intent> intents) {
             this.intents = intents;
+            lastCount = NUM_KEYS / 4;
+            lastDuration = 1000; // 1 second
         }
 
         @Override
         public void run() {
-            delay(2000); // take a breath to start
             prime();
             while (!stopped) {
                 cycle();
-                delay(800); // take a breath
             }
         }
 
+        private Iterable<Intent> subset(Set<Intent> intents) {
+            List<Intent> subset = Lists.newArrayList(intents);
+            Collections.shuffle(subset);
+            return subset.subList(0, lastCount);
+        }
+
         // Submits the specified intent.
         private void submit(Intent intent) {
             intentService.submit(intent);
@@ -273,23 +278,52 @@
 
         // Runs a single operation cycle.
         private void cycle() {
+            //TODO consider running without rate adjustment
+            adjustRates();
+
             long start = currentTimeMillis();
             subset(submitted).forEach(this::withdraw);
             subset(withdrawn).forEach(this::submit);
             long delta = currentTimeMillis() - start;
-            if (delta > 5000 || delta < 0) {
+
+            if (delta > GOAL_CYCLE_PERIOD * 3 || delta < 0) {
                 log.warn("Cycle took {} ms", delta);
             }
+
+            int difference = GOAL_CYCLE_PERIOD - (int) delta;
+            if (difference > 0) {
+                delay(difference);
+            }
+
+            lastDuration = delta;
+        }
+
+        int cycleCount = 0;
+        private void adjustRates() {
+            //FIXME need to iron out the rate adjustment
+            if (++cycleCount % 5 == 0) { //TODO: maybe use a timer (we should do this every 5-10 sec)
+                if (listener.requestThroughput() - listener.processedThroughput() <= 500 &&
+                        lastDuration <= GOAL_CYCLE_PERIOD) {
+                    lastCount = Math.min(lastCount + 100, intents.size() / 2);
+                } else {
+                    lastCount *= 0.8;
+                }
+                log.info("last count: {}, last duration: {} ms (sub: {} vs inst: {})",
+                         lastCount, lastDuration, listener.requestThroughput(), listener.processedThroughput());
+            }
+
         }
     }
 
-
     // Event listener to monitor throughput.
     final class Listener implements IntentListener {
 
-        private final Map<IntentEvent.Type, Counter> counters;
+        private Map<IntentEvent.Type, Counter> counters;
         private final Counter runningTotal = new Counter();
 
+        private volatile double processedThroughput = 0;
+        private volatile double requestThroughput = 0;
+
         public Listener() {
             counters = initCounters();
         }
@@ -302,6 +336,14 @@
             return map;
         }
 
+        public double processedThroughput() {
+            return processedThroughput;
+        }
+
+        public double requestThroughput() {
+            return requestThroughput;
+        }
+
         @Override
         public void event(IntentEvent event) {
             if (event.subject().appId().equals(appId)) {
@@ -310,25 +352,29 @@
         }
 
         public void report() {
-            StringBuilder stringBuilder = new StringBuilder();
-            Counter installed = counters.get(INSTALLED);
-            Counter withdrawn = counters.get(WITHDRAWN);
-            double current = installed.throughput() + withdrawn.throughput();
+            Map<IntentEvent.Type, Counter> reportCounters = counters;
+            counters = initCounters();
+
+            // update running total and latest throughput
+            Counter installed = reportCounters.get(INSTALLED);
+            Counter withdrawn = reportCounters.get(WITHDRAWN);
+            processedThroughput = installed.throughput() + withdrawn.throughput();
             runningTotal.add(installed.total() + withdrawn.total());
+
+            Counter installReq = reportCounters.get(INSTALL_REQ);
+            Counter withdrawReq = reportCounters.get(WITHDRAW_REQ);
+            requestThroughput = installReq.throughput() + withdrawReq.throughput();
+
+            // build the string to report
+            StringBuilder stringBuilder = new StringBuilder();
             for (IntentEvent.Type type : IntentEvent.Type.values()) {
-                stringBuilder.append(printCounter(type)).append("; ");
+                Counter counter = reportCounters.get(type);
+                stringBuilder.append(format("%s=%.2f;", type, counter.throughput()));
             }
             log.info("Throughput: OVERALL={}; CURRENT={}; {}",
                      format("%.2f", runningTotal.throughput()),
-                     format("%.2f", current), stringBuilder);
-        }
-
-        private String printCounter(IntentEvent.Type event) {
-            Counter counter = counters.get(event);
-            String result = format("%s=%.2f", event, counter.throughput());
-            counter.reset();
-            return result;
+                     format("%.2f", processedThroughput),
+                     stringBuilder);
         }
     }
-
 }