ONOS-1315 Adding configurability to intent performance test app.

Change-Id: I2782bdc0f78cc49aa60af1fa03eae91a2fd6bce0
diff --git a/apps/intent-perf/pom.xml b/apps/intent-perf/pom.xml
index 2d4ddf1..6015fb4 100644
--- a/apps/intent-perf/pom.xml
+++ b/apps/intent-perf/pom.xml
@@ -42,6 +42,10 @@
             <artifactId>onos-cli</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.compendium</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java
index 333fa4e..90eb9d7 100644
--- a/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java
+++ b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java
@@ -50,7 +50,7 @@
 @Service(value = IntentPerfCollector.class)
 public class IntentPerfCollector {
 
-    private static final long SAMPLE_WINDOW = 5_000;
+    private static final long SAMPLE_TIME_WINDOW_MS = 5_000;
     private final Logger log = getLogger(getClass());
 
     private static final int MAX_SAMPLES = 1_000;
@@ -82,9 +82,9 @@
 
     @Activate
     public void activate() {
-        this.nodeId = clusterService.getLocalNode().id();
-        this.newestTime = 0;
+        nodeId = clusterService.getLocalNode().id();
 
+        // TODO: replace with shared executor
         messageHandlingExecutor = Executors.newSingleThreadExecutor(
                 groupedThreads("onos/perf", "message-handler"));
 
@@ -98,9 +98,8 @@
         for (int i = 0; i < nodes.length; i++) {
             nodeToIndex.put(nodes[i].id(), i);
         }
-        overall = new Sample(0, nodes.length);
-        current = new Sample(0, nodes.length);
 
+        clearSamples();
         log.info("Started");
     }
 
@@ -112,19 +111,26 @@
     }
 
     /**
+     * Clears all previously accumulated data.
+     */
+    public void clearSamples() {
+        newestTime = 0;
+        overall = new Sample(0, nodes.length);
+        current = new Sample(0, nodes.length);
+        samples.clear();
+    }
+
+
+    /**
      * Records a sample point of data about intent operation rate.
      *
      * @param overallRate overall rate
      * @param currentRate current rate
      */
     public void recordSample(double overallRate, double currentRate) {
-        try {
-            long now = System.currentTimeMillis();
-            addSample(now, nodeId, overallRate, currentRate);
-            broadcastSample(now, nodeId, overallRate, currentRate);
-        } catch (Exception e) {
-            log.error("Boom!", e);
-        }
+        long now = System.currentTimeMillis();
+        addSample(now, nodeId, overallRate, currentRate);
+        broadcastSample(now, nodeId, overallRate, currentRate);
     }
 
     /**
@@ -173,7 +179,7 @@
     }
 
     private Sample createCurrentSampleIfNeeded(long time) {
-        Sample oldSample = time - newestTime > SAMPLE_WINDOW || current.isComplete() ? current : null;
+        Sample oldSample = time - newestTime > SAMPLE_TIME_WINDOW_MS || current.isComplete() ? current : null;
         if (oldSample != null) {
             newestTime = time;
             current = new Sample(time, nodes.length);
@@ -227,9 +233,9 @@
         @Override
         public void handle(ClusterMessage message) {
             String[] fields = new String(message.payload()).split("\\|");
-            log.info("Received sample from {}: {}", message.sender(), fields);
+            log.debug("Received sample from {}: {}", message.sender(), fields);
             addSample(Long.parseLong(fields[0]), message.sender(),
-                      Double.parseDouble(fields[1]), Double.parseDouble(fields[1]));
+                      Double.parseDouble(fields[1]), Double.parseDouble(fields[2]));
         }
     }
 }
diff --git a/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
index 029d70a..ff675c1 100644
--- a/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
+++ b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
@@ -24,8 +24,11 @@
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
 import org.onlab.packet.MacAddress;
 import org.onlab.util.Counter;
 import org.onosproject.cfg.ComponentConfigService;
@@ -50,10 +53,16 @@
 import org.onosproject.net.intent.Key;
 import org.onosproject.net.intent.PartitionService;
 import org.onosproject.net.intent.PointToPointIntent;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Dictionary;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -65,11 +74,11 @@
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Strings.isNullOrEmpty;
 import static java.lang.String.format;
 import static java.lang.System.currentTimeMillis;
 import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
-import static org.onlab.util.Tools.delay;
-import static org.onlab.util.Tools.groupedThreads;
+import static org.onlab.util.Tools.*;
 import static org.onosproject.net.intent.IntentEvent.Type.*;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -77,20 +86,25 @@
  * Application to test sustained intent throughput.
  */
 @Component(immediate = true)
+@Service(value = IntentPerfInstaller.class)
 public class IntentPerfInstaller {
 
     private final Logger log = getLogger(getClass());
 
     private static final int DEFAULT_NUM_WORKERS = 1;
 
-    private static final int DEFAULT_NUM_KEYS = 40_000;
-    private static final int DEFAULT_GOAL_CYCLE_PERIOD = 1_000; //ms
+    private static final int DEFAULT_NUM_KEYS = 40000;
+    private static final int DEFAULT_GOAL_CYCLE_PERIOD = 1000; //ms
 
     private static final int DEFAULT_NUM_NEIGHBORS = 0;
 
     private static final int START_DELAY = 5_000; // ms
     private static final int REPORT_PERIOD = 5_000; //ms
 
+    private static final String START = "start";
+    private static final String STOP = "stop";
+    private static final MessageSubject CONTROL = new MessageSubject("intent-perf-ctl");
+
     //FIXME add path length
 
     @Property(name = "numKeys", intValue = DEFAULT_NUM_KEYS,
@@ -134,6 +148,11 @@
     @Reference(cardinality = MANDATORY_UNARY)
     protected IntentPerfCollector sampleCollector;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterCommunicationService communicationService;
+
+    private ExecutorService messageHandlingExecutor;
+
     private ExecutorService workers;
     private ApplicationId appId;
     private Listener listener;
@@ -145,86 +164,138 @@
     private int lastKey = 0;
 
     private IntentPerfUi perfUi;
+    private NodeId nodeId;
+    private TimerTask reporterTask;
 
     @Activate
-    public void activate() {
-//        configService.registerProperties(getClass());
+    public void activate(ComponentContext context) {
+        configService.registerProperties(getClass());
 
-        String nodeId = clusterService.getLocalNode().ip().toString();
-        appId = coreService.registerApplication("org.onosproject.intentperf." + nodeId);
+        nodeId = clusterService.getLocalNode().id();
+        appId = coreService.registerApplication("org.onosproject.intentperf." + nodeId.toString());
 
+        // TODO: replace with shared timer
         reportTimer = new Timer("onos-intent-perf-reporter");
         workers = Executors.newFixedThreadPool(DEFAULT_NUM_WORKERS, groupedThreads("onos/intent-perf", "worker-%d"));
 
-
         // disable flow backups for testing
         log.info("flow props: {}",
                  configService.getProperties("org.onosproject.store.flow.impl.DistributedFlowRuleStore"));
         configService.setProperty("org.onosproject.store.flow.impl.DistributedFlowRuleStore",
                                   "backupEnabled", "false");
 
-        // Schedule delayed start
-        reportTimer.schedule(new TimerTask() {
-            @Override
-            public void run() {
-                start();
-            }
-        }, START_DELAY);
+        // TODO: replace with shared executor
+        messageHandlingExecutor = Executors.newSingleThreadExecutor(
+                groupedThreads("onos/perf", "command-handler"));
+
+        communicationService.addSubscriber(CONTROL, new InternalControl(),
+                                           messageHandlingExecutor);
+
+        listener = new Listener();
+        intentService.addListener(listener);
+
+        // TODO: investigate why this seems to be necessary for configs to get picked up on initial activation
+        modify(context);
     }
 
     @Deactivate
     public void deactivate() {
-//        configService.unregisterProperties(getClass(), false);
-        stop();
-    }
+        stopTestRun();
 
-    //FIXME add modified
+        configService.unregisterProperties(getClass(), false);
+        messageHandlingExecutor.shutdown();
+        communicationService.removeSubscriber(CONTROL);
 
-    private void logConfig(String prefix) {
-        log.info("{} with appId {}; numKeys = {}; cyclePeriod = {} ms; numNeighbors={}",
-                 prefix, appId.id(), numKeys, cyclePeriod, numNeighbors);
-    }
-
-    public void start() {
-        // adjust numNeighbors and generate list of neighbors
-        numNeighbors = Math.min(clusterService.getNodes().size() - 1, numNeighbors);
-
-        // perhaps we want to prime before listening...
-        // we will need to discard the first few results for priming and warmup
-        listener = new Listener();
-        intentService.addListener(listener);
-
-        // Schedule reporter task on report period boundary
-        reportTimer.scheduleAtFixedRate(new TimerTask() {
-            @Override
-            public void run() {
-                //adjustRates(); // FIXME we currently adjust rates in the cycle thread
-                listener.report();
-            }
-        }, REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD, REPORT_PERIOD);
-
-        // Submit workers
-        stopped = false;
-        for (int i = 0; i < DEFAULT_NUM_WORKERS; i++) {
-            workers.submit(new Submitter(createIntents(numKeys, /*FIXME*/ 2, lastKey)));
-        }
-        logConfig("Started");
-    }
-
-    public void stop() {
-        stopped = true;
         if (listener != null) {
             reportTimer.cancel();
             intentService.removeListener(listener);
             listener = null;
             reportTimer = null;
         }
+    }
+
+    @Modified
+    public void modify(ComponentContext context) {
+        if (context == null) {
+            logConfig("Reconfigured");
+            return;
+        }
+
+        Dictionary<?, ?> properties = context.getProperties();
+        int newNumKeys, newCyclePeriod, newNumNeighbors;
         try {
-            workers.awaitTermination(5, TimeUnit.SECONDS);
+            String s = get(properties, "numKeys");
+            newNumKeys = isNullOrEmpty(s) ? numKeys : Integer.parseInt(s.trim());
+
+            s = get(properties, "cyclePeriod");
+            newCyclePeriod = isNullOrEmpty(s) ? cyclePeriod : Integer.parseInt(s.trim());
+
+            s = get(properties, "numNeighbors");
+            newNumNeighbors = isNullOrEmpty(s) ? numNeighbors : Integer.parseInt(s.trim());
+
+        } catch (NumberFormatException | ClassCastException e) {
+            log.warn("Malformed configuration detected; using defaults", e);
+            newNumKeys = DEFAULT_NUM_KEYS;
+            newCyclePeriod = DEFAULT_GOAL_CYCLE_PERIOD;
+            newNumNeighbors = DEFAULT_NUM_NEIGHBORS;
+        }
+
+        if (newNumKeys != numKeys || newCyclePeriod != cyclePeriod || newNumNeighbors != numNeighbors) {
+            numKeys = newNumKeys;
+            cyclePeriod = newCyclePeriod;
+            numNeighbors = newNumNeighbors;
+            logConfig("Reconfigured");
+        }
+    }
+
+    public void start() {
+        communicationService.broadcast(new ClusterMessage(nodeId, CONTROL, START.getBytes()));
+        startTestRun();
+    }
+
+    public void stop() {
+        communicationService.broadcast(new ClusterMessage(nodeId, CONTROL, STOP.getBytes()));
+        stopTestRun();
+    }
+
+    private void logConfig(String prefix) {
+        log.info("{} with appId {}; numKeys = {}; cyclePeriod = {} ms; numNeighbors={}",
+                 prefix, appId.id(), numKeys, cyclePeriod, numNeighbors);
+    }
+
+    private void startTestRun() {
+        sampleCollector.clearSamples();
+
+        // adjust numNeighbors and generate list of neighbors
+        numNeighbors = Math.min(clusterService.getNodes().size() - 1, numNeighbors);
+
+        // Schedule reporter task on report period boundary
+        reporterTask = new ReporterTask();
+        reportTimer.scheduleAtFixedRate(reporterTask,
+                                        REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD,
+                                        REPORT_PERIOD);
+
+        // Submit workers
+        stopped = false;
+        for (int i = 0; i < DEFAULT_NUM_WORKERS; i++) {
+            workers.submit(new Submitter(createIntents(numKeys, /*FIXME*/ 2, lastKey)));
+        }
+        log.info("Started test run");
+    }
+
+    private void stopTestRun() {
+        stopped = true;
+        if (reporterTask != null) {
+            reporterTask.cancel();
+            reporterTask = null;
+        }
+
+        try {
+            workers.awaitTermination(5 * cyclePeriod, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
             log.warn("Failed to stop worker", e);
         }
-        log.info("Stopped");
+        log.info("Stopped test run");
     }
 
     private List<NodeId> getNeighbors() {
@@ -491,4 +562,25 @@
         }
     }
 
+    private class InternalControl implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            String cmd = new String(message.payload());
+            log.info("Received command {}", cmd);
+            if (cmd.equals(START)) {
+                startTestRun();
+            } else {
+                stopTestRun();
+            }
+        }
+    }
+
+    private class ReporterTask extends TimerTask {
+        @Override
+        public void run() {
+            //adjustRates(); // FIXME we currently adjust rates in the cycle thread
+            listener.report();
+        }
+    }
+
 }
diff --git a/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCommand.java b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfListCommand.java
similarity index 87%
rename from apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCommand.java
rename to apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfListCommand.java
index 928a8f8..a556712 100644
--- a/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCommand.java
+++ b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfListCommand.java
@@ -29,7 +29,7 @@
  */
 @Command(scope = "onos", name = "intent-perf",
         description = "Displays accumulated performance metrics")
-public class IntentPerfCommand extends AbstractShellCommand {
+public class IntentPerfListCommand extends AbstractShellCommand {
 
     @Option(name = "-s", aliases = "--summary", description = "Output just summary",
             required = false, multiValued = false)
@@ -49,11 +49,16 @@
         List<String> headers = collector.getSampleHeaders();
         Sample overall = collector.getOverall();
         double total = 0;
+        print("%12s: %14s", "Node ID", "Overall Rate");
         for (int i = 0; i < overall.data.length; i++) {
-            print("%12s: %12.2f", headers.get(i), overall.data[i]);
-            total += overall.data[i];
+            if (overall.data[i] >= 0) {
+                print("%12s: %14.2f", headers.get(i), overall.data[i]);
+                total += overall.data[i];
+            } else {
+                print("%12s: %14s", headers.get(i), " ");
+            }
         }
-        print("%12s: %12.2f", "total", total);
+        print("%12s: %14.2f", "total", total);
     }
 
     private void printSamples() {
diff --git a/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfStartCommand.java b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfStartCommand.java
new file mode 100644
index 0000000..3549153
--- /dev/null
+++ b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfStartCommand.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.intentperf;
+
+import org.apache.karaf.shell.commands.Command;
+import org.onosproject.cli.AbstractShellCommand;
+
+/**
+ * Starts intent performance test run.
+ */
+@Command(scope = "onos", name = "intent-perf-start",
+        description = "Starts intent performance test run")
+public class IntentPerfStartCommand extends AbstractShellCommand {
+
+    @Override
+    protected void execute() {
+        get(IntentPerfInstaller.class).start();
+    }
+
+}
diff --git a/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfStopCommand.java b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfStopCommand.java
new file mode 100644
index 0000000..ac45cd8
--- /dev/null
+++ b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfStopCommand.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.intentperf;
+
+import org.apache.karaf.shell.commands.Command;
+import org.onosproject.cli.AbstractShellCommand;
+
+/**
+ * Stops intent performance test run.
+ */
+@Command(scope = "onos", name = "intent-perf-stop",
+        description = "Stops intent performance test run")
+public class IntentPerfStopCommand extends AbstractShellCommand {
+
+    @Override
+    protected void execute() {
+        get(IntentPerfInstaller.class).stop();
+    }
+
+}
diff --git a/apps/intent-perf/src/main/resources/OSGI-INF/blueprint/shell-config.xml b/apps/intent-perf/src/main/resources/OSGI-INF/blueprint/shell-config.xml
index 1f1871d..fc46d1d 100644
--- a/apps/intent-perf/src/main/resources/OSGI-INF/blueprint/shell-config.xml
+++ b/apps/intent-perf/src/main/resources/OSGI-INF/blueprint/shell-config.xml
@@ -16,7 +16,13 @@
 <blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">
     <command-bundle xmlns="http://karaf.apache.org/xmlns/shell/v1.1.0">
         <command>
-            <action class="org.onosproject.intentperf.IntentPerfCommand"/>
+            <action class="org.onosproject.intentperf.IntentPerfListCommand"/>
+        </command>
+        <command>
+            <action class="org.onosproject.intentperf.IntentPerfStartCommand"/>
+        </command>
+        <command>
+            <action class="org.onosproject.intentperf.IntentPerfStopCommand"/>
         </command>
     </command-bundle>
 </blueprint>