ONOS-1315 Adding configurability to intent performance test app.
Change-Id: I2782bdc0f78cc49aa60af1fa03eae91a2fd6bce0
diff --git a/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
index 029d70a..ff675c1 100644
--- a/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
+++ b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
@@ -24,8 +24,11 @@
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.MacAddress;
import org.onlab.util.Counter;
import org.onosproject.cfg.ComponentConfigService;
@@ -50,10 +53,16 @@
import org.onosproject.net.intent.Key;
import org.onosproject.net.intent.PartitionService;
import org.onosproject.net.intent.PointToPointIntent;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Dictionary;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -65,11 +74,11 @@
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Strings.isNullOrEmpty;
import static java.lang.String.format;
import static java.lang.System.currentTimeMillis;
import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
-import static org.onlab.util.Tools.delay;
-import static org.onlab.util.Tools.groupedThreads;
+import static org.onlab.util.Tools.*;
import static org.onosproject.net.intent.IntentEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger;
@@ -77,20 +86,25 @@
* Application to test sustained intent throughput.
*/
@Component(immediate = true)
+@Service(value = IntentPerfInstaller.class)
public class IntentPerfInstaller {
private final Logger log = getLogger(getClass());
private static final int DEFAULT_NUM_WORKERS = 1;
- private static final int DEFAULT_NUM_KEYS = 40_000;
- private static final int DEFAULT_GOAL_CYCLE_PERIOD = 1_000; //ms
+ private static final int DEFAULT_NUM_KEYS = 40000;
+ private static final int DEFAULT_GOAL_CYCLE_PERIOD = 1000; //ms
private static final int DEFAULT_NUM_NEIGHBORS = 0;
private static final int START_DELAY = 5_000; // ms
private static final int REPORT_PERIOD = 5_000; //ms
+ private static final String START = "start";
+ private static final String STOP = "stop";
+ private static final MessageSubject CONTROL = new MessageSubject("intent-perf-ctl");
+
//FIXME add path length
@Property(name = "numKeys", intValue = DEFAULT_NUM_KEYS,
@@ -134,6 +148,11 @@
@Reference(cardinality = MANDATORY_UNARY)
protected IntentPerfCollector sampleCollector;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterCommunicationService communicationService;
+
+ private ExecutorService messageHandlingExecutor;
+
private ExecutorService workers;
private ApplicationId appId;
private Listener listener;
@@ -145,86 +164,138 @@
private int lastKey = 0;
private IntentPerfUi perfUi;
+ private NodeId nodeId;
+ private TimerTask reporterTask;
@Activate
- public void activate() {
-// configService.registerProperties(getClass());
+ public void activate(ComponentContext context) { // context carries the component properties that modify() parses
+ configService.registerProperties(getClass());
- String nodeId = clusterService.getLocalNode().ip().toString();
- appId = coreService.registerApplication("org.onosproject.intentperf." + nodeId);
+ nodeId = clusterService.getLocalNode().id(); // kept in a field: start()/stop() use it as the sender of control messages
+ appId = coreService.registerApplication("org.onosproject.intentperf." + nodeId.toString());
+ // TODO: replace with shared timer
reportTimer = new Timer("onos-intent-perf-reporter");
workers = Executors.newFixedThreadPool(DEFAULT_NUM_WORKERS, groupedThreads("onos/intent-perf", "worker-%d"));
-
// disable flow backups for testing
log.info("flow props: {}",
configService.getProperties("org.onosproject.store.flow.impl.DistributedFlowRuleStore"));
configService.setProperty("org.onosproject.store.flow.impl.DistributedFlowRuleStore",
"backupEnabled", "false");
- // Schedule delayed start
- reportTimer.schedule(new TimerTask() {
- @Override
- public void run() {
- start();
- }
- }, START_DELAY);
+ // TODO: replace with shared executor
+ messageHandlingExecutor = Executors.newSingleThreadExecutor(
+ groupedThreads("onos/perf", "command-handler"));
+
+ communicationService.addSubscriber(CONTROL, new InternalControl(),
+ messageHandlingExecutor); // CONTROL messages are handled by InternalControl on messageHandlingExecutor
+
+ listener = new Listener();
+ intentService.addListener(listener); // listener is registered here; starting a test run is left to start() or a received START command
+
+ // TODO: investigate why this seems to be necessary for configs to get picked up on initial activation
+ modify(context);
}
@Deactivate
public void deactivate() {
-// configService.unregisterProperties(getClass(), false);
- stop();
- }
+ stopTestRun(); // local stop only; no STOP command is broadcast from here
- //FIXME add modified
+ configService.unregisterProperties(getClass(), false);
+ messageHandlingExecutor.shutdown(); // NOTE(review): executor is shut down before the CONTROL subscriber is removed - confirm this order is intended
+ communicationService.removeSubscriber(CONTROL);
- private void logConfig(String prefix) {
- log.info("{} with appId {}; numKeys = {}; cyclePeriod = {} ms; numNeighbors={}",
- prefix, appId.id(), numKeys, cyclePeriod, numNeighbors);
- }
-
- public void start() {
- // adjust numNeighbors and generate list of neighbors
- numNeighbors = Math.min(clusterService.getNodes().size() - 1, numNeighbors);
-
- // perhaps we want to prime before listening...
- // we will need to discard the first few results for priming and warmup
- listener = new Listener();
- intentService.addListener(listener);
-
- // Schedule reporter task on report period boundary
- reportTimer.scheduleAtFixedRate(new TimerTask() {
- @Override
- public void run() {
- //adjustRates(); // FIXME we currently adjust rates in the cycle thread
- listener.report();
- }
- }, REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD, REPORT_PERIOD);
-
- // Submit workers
- stopped = false;
- for (int i = 0; i < DEFAULT_NUM_WORKERS; i++) {
- workers.submit(new Submitter(createIntents(numKeys, /*FIXME*/ 2, lastKey)));
- }
- logConfig("Started");
- }
-
- public void stop() {
- stopped = true;
if (listener != null) {
reportTimer.cancel();
intentService.removeListener(listener);
listener = null;
reportTimer = null;
}
+ } // NOTE(review): the "workers" pool created in activate() is not shut down in this method - confirm it is released elsewhere
+
+ @Modified
+ public void modify(ComponentContext context) { // re-reads numKeys, cyclePeriod and numNeighbors from the component properties
+ if (context == null) { // no context: nothing to parse, only log the current settings
+ logConfig("Reconfigured");
+ return;
+ }
+
+ Dictionary<?, ?> properties = context.getProperties();
+ int newNumKeys, newCyclePeriod, newNumNeighbors;
try {
- workers.awaitTermination(5, TimeUnit.SECONDS);
+ String s = get(properties, "numKeys");
+ newNumKeys = isNullOrEmpty(s) ? numKeys : Integer.parseInt(s.trim()); // a missing or empty property keeps the current value
+
+ s = get(properties, "cyclePeriod");
+ newCyclePeriod = isNullOrEmpty(s) ? cyclePeriod : Integer.parseInt(s.trim());
+
+ s = get(properties, "numNeighbors");
+ newNumNeighbors = isNullOrEmpty(s) ? numNeighbors : Integer.parseInt(s.trim());
+
+ } catch (NumberFormatException | ClassCastException e) { // one malformed value resets all three settings to their defaults
+ log.warn("Malformed configuration detected; using defaults", e);
+ newNumKeys = DEFAULT_NUM_KEYS;
+ newCyclePeriod = DEFAULT_GOAL_CYCLE_PERIOD;
+ newNumNeighbors = DEFAULT_NUM_NEIGHBORS;
+ }
+
+ if (newNumKeys != numKeys || newCyclePeriod != cyclePeriod || newNumNeighbors != numNeighbors) { // fields are updated and logged only when at least one value changed
+ numKeys = newNumKeys;
+ cyclePeriod = newCyclePeriod;
+ numNeighbors = newNumNeighbors;
+ logConfig("Reconfigured");
+ }
+ }
+
+    /**
+     * Broadcasts the START command on the CONTROL subject and then starts
+     * the test run on the local node.
+     */
+    public void start() {
+        // Encode with an explicit charset so the payload does not depend on the JVM default.
+        communicationService.broadcast(
+                new ClusterMessage(nodeId, CONTROL, START.getBytes(StandardCharsets.UTF_8)));
+        startTestRun();
+    }
+
+    /**
+     * Broadcasts the STOP command on the CONTROL subject and then stops
+     * the test run on the local node.
+     */
+    public void stop() {
+        // Encode with an explicit charset so the payload does not depend on the JVM default.
+        communicationService.broadcast(
+                new ClusterMessage(nodeId, CONTROL, STOP.getBytes(StandardCharsets.UTF_8)));
+        stopTestRun();
+    }
+
+    /**
+     * Logs the current settings (application id, numKeys, cyclePeriod and
+     * numNeighbors) at info level.
+     *
+     * @param prefix text placed at the start of the log message
+     */
+    private void logConfig(String prefix) {
+        final String template =
+                "{} with appId {}; numKeys = {}; cyclePeriod = {} ms; numNeighbors={}";
+        log.info(template, prefix, appId.id(), numKeys, cyclePeriod, numNeighbors);
+    }
+
+    /**
+     * Starts a test run on the local node: clears the collected samples,
+     * caps numNeighbors at the number of other cluster nodes, schedules the
+     * periodic reporter task and submits the intent submitters to the
+     * worker pool. Does nothing when a run is already in progress.
+     */
+    private void startTestRun() {
+        // A non-null reporterTask marks a run in progress (stopTestRun() clears it).
+        // Starting again would overwrite the reference to the still-scheduled
+        // reporter, so it could never be cancelled, and would queue extra submitters.
+        if (reporterTask != null) {
+            log.info("Test run already in progress; ignoring start request");
+            return;
+        }
+
+        sampleCollector.clearSamples();
+
+        // adjust numNeighbors and generate list of neighbors
+        numNeighbors = Math.min(clusterService.getNodes().size() - 1, numNeighbors);
+
+        // Schedule reporter task on report period boundary
+        reporterTask = new ReporterTask();
+        reportTimer.scheduleAtFixedRate(reporterTask,
+                                        REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD,
+                                        REPORT_PERIOD);
+
+        // Submit workers
+        stopped = false;
+        for (int i = 0; i < DEFAULT_NUM_WORKERS; i++) {
+            workers.submit(new Submitter(createIntents(numKeys, /*FIXME*/ 2, lastKey)));
+        }
+        log.info("Started test run");
+    }
+
+    /**
+     * Stops the test run on the local node: raises the stopped flag, cancels
+     * the reporter task if one is scheduled, and then waits up to five cycle
+     * periods on the worker pool.
+     */
+    private void stopTestRun() {
+        stopped = true;
+        if (reporterTask != null) {
+            reporterTask.cancel();
+            reporterTask = null;
+        }
+
+        try {
+            // NOTE(review): workers is not shut down before this wait, so it looks like
+            // a bounded pause of up to 5 cycle periods rather than a real join - confirm.
+            workers.awaitTermination(5 * cyclePeriod, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
             log.warn("Failed to stop worker", e);
+            // Restore the interrupt flag so the calling thread can still observe it.
+            Thread.currentThread().interrupt();
         }
-        log.info("Stopped");
+        log.info("Stopped test run");
     }
private List<NodeId> getNeighbors() {
@@ -491,4 +562,25 @@
}
}
+    /**
+     * Handles test-control commands received on the CONTROL subject by
+     * starting or stopping the test run on the local node.
+     */
+    private class InternalControl implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            // Decode with an explicit charset rather than the JVM default.
+            String cmd = new String(message.payload(), StandardCharsets.UTF_8);
+            log.info("Received command {}", cmd);
+            if (START.equals(cmd)) {
+                startTestRun();
+            } else if (STOP.equals(cmd)) {
+                stopTestRun();
+            } else {
+                // Only START and STOP are defined; an unrecognised payload is
+                // reported instead of being silently treated as a stop request.
+                log.warn("Ignoring unknown command {}", cmd);
+            }
+        }
+    }
+
+    /**
+     * Timer task that asks the intent listener to produce its report each
+     * time the timer fires.
+     */
+    private class ReporterTask extends TimerTask {
+        @Override
+        public void run() {
+            //adjustRates(); // FIXME we currently adjust rates in the cycle thread
+            final Listener reporter = listener;
+            reporter.report();
+        }
+    }
+
}