ONOS-1315 Adding configurability to intent performance test app.

Change-Id: I2782bdc0f78cc49aa60af1fa03eae91a2fd6bce0
diff --git a/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
index 029d70a..ff675c1 100644
--- a/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
+++ b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
@@ -24,8 +24,11 @@
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
 import org.onlab.packet.MacAddress;
 import org.onlab.util.Counter;
 import org.onosproject.cfg.ComponentConfigService;
@@ -50,10 +53,16 @@
 import org.onosproject.net.intent.Key;
 import org.onosproject.net.intent.PartitionService;
 import org.onosproject.net.intent.PointToPointIntent;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Dictionary;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -65,11 +74,11 @@
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Strings.isNullOrEmpty;
 import static java.lang.String.format;
 import static java.lang.System.currentTimeMillis;
 import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
-import static org.onlab.util.Tools.delay;
-import static org.onlab.util.Tools.groupedThreads;
+import static org.onlab.util.Tools.*;
 import static org.onosproject.net.intent.IntentEvent.Type.*;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -77,20 +86,25 @@
  * Application to test sustained intent throughput.
  */
 @Component(immediate = true)
+@Service(value = IntentPerfInstaller.class)
 public class IntentPerfInstaller {
 
     private final Logger log = getLogger(getClass());
 
     private static final int DEFAULT_NUM_WORKERS = 1;
 
-    private static final int DEFAULT_NUM_KEYS = 40_000;
-    private static final int DEFAULT_GOAL_CYCLE_PERIOD = 1_000; //ms
+    private static final int DEFAULT_NUM_KEYS = 40000;
+    private static final int DEFAULT_GOAL_CYCLE_PERIOD = 1000; //ms
 
     private static final int DEFAULT_NUM_NEIGHBORS = 0;
 
     private static final int START_DELAY = 5_000; // ms
     private static final int REPORT_PERIOD = 5_000; //ms
 
+    private static final String START = "start";
+    private static final String STOP = "stop";
+    private static final MessageSubject CONTROL = new MessageSubject("intent-perf-ctl");
+
     //FIXME add path length
 
     @Property(name = "numKeys", intValue = DEFAULT_NUM_KEYS,
@@ -134,6 +148,11 @@
     @Reference(cardinality = MANDATORY_UNARY)
     protected IntentPerfCollector sampleCollector;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterCommunicationService communicationService;
+
+    private ExecutorService messageHandlingExecutor;
+
     private ExecutorService workers;
     private ApplicationId appId;
     private Listener listener;
@@ -145,86 +164,182 @@
     private int lastKey = 0;
 
     private IntentPerfUi perfUi;
+    private NodeId nodeId;
+    private TimerTask reporterTask;
 
+    /**
+     * Registers the configurable properties, creates the report timer and
+     * worker pool, subscribes to cluster control messages and attaches the
+     * intent listener. No test run is started here.
+     *
+     * @param context component context supplying the initial configuration
+     */
     @Activate
-    public void activate() {
-//        configService.registerProperties(getClass());
+    public void activate(ComponentContext context) {
+        configService.registerProperties(getClass());
 
-        String nodeId = clusterService.getLocalNode().ip().toString();
-        appId = coreService.registerApplication("org.onosproject.intentperf." + nodeId);
+        nodeId = clusterService.getLocalNode().id();
+        appId = coreService.registerApplication("org.onosproject.intentperf." + nodeId.toString());
 
+        // TODO: replace with shared timer
         reportTimer = new Timer("onos-intent-perf-reporter");
         workers = Executors.newFixedThreadPool(DEFAULT_NUM_WORKERS, groupedThreads("onos/intent-perf", "worker-%d"));
 
-
         // disable flow backups for testing
         log.info("flow props: {}",
                  configService.getProperties("org.onosproject.store.flow.impl.DistributedFlowRuleStore"));
         configService.setProperty("org.onosproject.store.flow.impl.DistributedFlowRuleStore",
                                   "backupEnabled", "false");
 
-        // Schedule delayed start
-        reportTimer.schedule(new TimerTask() {
-            @Override
-            public void run() {
-                start();
-            }
-        }, START_DELAY);
+        // TODO: replace with shared executor
+        messageHandlingExecutor = Executors.newSingleThreadExecutor(
+                groupedThreads("onos/perf", "command-handler"));
+
+        communicationService.addSubscriber(CONTROL, new InternalControl(),
+                                           messageHandlingExecutor);
+
+        listener = new Listener();
+        intentService.addListener(listener);
+
+        // TODO: investigate why this seems to be necessary for configs to get picked up on initial activation
+        modify(context);
     }
 
+    /**
+     * Stops any test run, releases the worker pool, unregisters the
+     * configurable properties and the control subscriber, and detaches the
+     * intent listener.
+     */
     @Deactivate
     public void deactivate() {
-//        configService.unregisterProperties(getClass(), false);
-        stop();
-    }
+        stopTestRun();
+        // The pool is created once in activate(); release its threads here.
+        workers.shutdown();
 
-    //FIXME add modified
+        configService.unregisterProperties(getClass(), false);
+        // Unsubscribe before shutting the executor down so that no command
+        // is handed to an executor that no longer accepts work.
+        communicationService.removeSubscriber(CONTROL);
+        messageHandlingExecutor.shutdown();
 
-    private void logConfig(String prefix) {
-        log.info("{} with appId {}; numKeys = {}; cyclePeriod = {} ms; numNeighbors={}",
-                 prefix, appId.id(), numKeys, cyclePeriod, numNeighbors);
-    }
-
-    public void start() {
-        // adjust numNeighbors and generate list of neighbors
-        numNeighbors = Math.min(clusterService.getNodes().size() - 1, numNeighbors);
-
-        // perhaps we want to prime before listening...
-        // we will need to discard the first few results for priming and warmup
-        listener = new Listener();
-        intentService.addListener(listener);
-
-        // Schedule reporter task on report period boundary
-        reportTimer.scheduleAtFixedRate(new TimerTask() {
-            @Override
-            public void run() {
-                //adjustRates(); // FIXME we currently adjust rates in the cycle thread
-                listener.report();
-            }
-        }, REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD, REPORT_PERIOD);
-
-        // Submit workers
-        stopped = false;
-        for (int i = 0; i < DEFAULT_NUM_WORKERS; i++) {
-            workers.submit(new Submitter(createIntents(numKeys, /*FIXME*/ 2, lastKey)));
-        }
-        logConfig("Started");
-    }
-
-    public void stop() {
-        stopped = true;
         if (listener != null) {
             reportTimer.cancel();
             intentService.removeListener(listener);
             listener = null;
             reportTimer = null;
         }
+    }
+
+    /**
+     * Applies new values of numKeys, cyclePeriod and numNeighbors.
+     * A missing or empty property keeps its current value; if any value is
+     * malformed, all three settings revert to their defaults.
+     *
+     * @param context component context carrying the properties; when null
+     *                the current configuration is only logged
+     */
+    @Modified
+    public void modify(ComponentContext context) {
+        if (context == null) {
+            logConfig("Reconfigured");
+            return;
+        }
+
+        Dictionary<?, ?> properties = context.getProperties();
+        int newNumKeys, newCyclePeriod, newNumNeighbors;
         try {
-            workers.awaitTermination(5, TimeUnit.SECONDS);
+            String s = get(properties, "numKeys");
+            newNumKeys = isNullOrEmpty(s) ? numKeys : Integer.parseInt(s.trim());
+
+            s = get(properties, "cyclePeriod");
+            newCyclePeriod = isNullOrEmpty(s) ? cyclePeriod : Integer.parseInt(s.trim());
+
+            s = get(properties, "numNeighbors");
+            newNumNeighbors = isNullOrEmpty(s) ? numNeighbors : Integer.parseInt(s.trim());
+
+        } catch (NumberFormatException | ClassCastException e) {
+            log.warn("Malformed configuration detected; using defaults", e);
+            newNumKeys = DEFAULT_NUM_KEYS;
+            newCyclePeriod = DEFAULT_GOAL_CYCLE_PERIOD;
+            newNumNeighbors = DEFAULT_NUM_NEIGHBORS;
+        }
+
+        if (newNumKeys != numKeys || newCyclePeriod != cyclePeriod || newNumNeighbors != numNeighbors) {
+            numKeys = newNumKeys;
+            cyclePeriod = newCyclePeriod;
+            numNeighbors = newNumNeighbors;
+            logConfig("Reconfigured");
+        }
+    }
+
+    /**
+     * Broadcasts the start command on the control subject and starts the
+     * test run on this node.
+     */
+    public void start() {
+        communicationService.broadcast(new ClusterMessage(nodeId, CONTROL, START.getBytes()));
+        startTestRun();
+    }
+
+    /**
+     * Broadcasts the stop command on the control subject and stops the
+     * test run on this node.
+     */
+    public void stop() {
+        communicationService.broadcast(new ClusterMessage(nodeId, CONTROL, STOP.getBytes()));
+        stopTestRun();
+    }
+
+    // Logs the application id and the current configuration values.
+    private void logConfig(String prefix) {
+        log.info("{} with appId {}; numKeys = {}; cyclePeriod = {} ms; numNeighbors={}",
+                 prefix, appId.id(), numKeys, cyclePeriod, numNeighbors);
+    }
+
+    // Clears collected samples, schedules the reporter and submits the workers.
+    private void startTestRun() {
+        if (reporterTask != null) {
+            // Starting twice would orphan the scheduled reporter and add submitters.
+            log.warn("Test run already in progress; ignoring start request");
+            return;
+        }
+        sampleCollector.clearSamples();
+
+        // adjust numNeighbors and generate list of neighbors
+        numNeighbors = Math.min(clusterService.getNodes().size() - 1, numNeighbors);
+
+        // Schedule reporter task on report period boundary
+        reporterTask = new ReporterTask();
+        reportTimer.scheduleAtFixedRate(reporterTask,
+                                        REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD,
+                                        REPORT_PERIOD);
+
+        // Submit workers
+        stopped = false;
+        for (int i = 0; i < DEFAULT_NUM_WORKERS; i++) {
+            workers.submit(new Submitter(createIntents(numKeys, /*FIXME*/ 2, lastKey)));
+        }
+        log.info("Started test run");
+    }
+
+    // Sets the stopped flag, cancels the reporter and waits on the worker pool.
+    private void stopTestRun() {
+        stopped = true;
+        if (reporterTask != null) {
+            reporterTask.cancel();
+            reporterTask = null;
+        }
+
+        try {
+            // NOTE(review): no workers.shutdown() precedes this call in a test run,
+            // so this presumably waits out the whole timeout - confirm intent.
+            workers.awaitTermination(5 * cyclePeriod, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
             log.warn("Failed to stop worker", e);
+            // Keep the interrupt visible to the thread that called us.
+            Thread.currentThread().interrupt();
         }
-        log.info("Stopped");
+        log.info("Stopped test run");
     }
 
     private List<NodeId> getNeighbors() {
@@ -491,4 +562,34 @@
         }
     }
 
+    /**
+     * Handles test-run control commands received on the control subject.
+     */
+    private class InternalControl implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            String cmd = new String(message.payload());
+            log.info("Received command {}", cmd);
+            if (START.equals(cmd)) {
+                startTestRun();
+            } else if (STOP.equals(cmd)) {
+                stopTestRun();
+            } else {
+                // Do not treat an unrecognized payload as a stop request.
+                log.warn("Ignoring unknown command {}", cmd);
+            }
+        }
+    }
+
+    /**
+     * Timer task, scheduled by startTestRun(), that invokes the listener's report().
+     */
+    private class ReporterTask extends TimerTask {
+        @Override
+        public void run() {
+            //adjustRates(); // FIXME we currently adjust rates in the cycle thread
+            listener.report();
+        }
+    }
+
 }