ONOS-1315 Adding configurability to intent performance test app.
Change-Id: I2782bdc0f78cc49aa60af1fa03eae91a2fd6bce0
diff --git a/apps/intent-perf/pom.xml b/apps/intent-perf/pom.xml
index 2d4ddf1..6015fb4 100644
--- a/apps/intent-perf/pom.xml
+++ b/apps/intent-perf/pom.xml
@@ -42,6 +42,10 @@
<artifactId>onos-cli</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.compendium</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java
index 333fa4e..90eb9d7 100644
--- a/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java
+++ b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java
@@ -50,7 +50,7 @@
@Service(value = IntentPerfCollector.class)
public class IntentPerfCollector {
- private static final long SAMPLE_WINDOW = 5_000;
+ private static final long SAMPLE_TIME_WINDOW_MS = 5_000;
private final Logger log = getLogger(getClass());
private static final int MAX_SAMPLES = 1_000;
@@ -82,9 +82,9 @@
@Activate
public void activate() {
- this.nodeId = clusterService.getLocalNode().id();
- this.newestTime = 0;
+ nodeId = clusterService.getLocalNode().id();
+ // TODO: replace with shared executor
messageHandlingExecutor = Executors.newSingleThreadExecutor(
groupedThreads("onos/perf", "message-handler"));
@@ -98,9 +98,8 @@
for (int i = 0; i < nodes.length; i++) {
nodeToIndex.put(nodes[i].id(), i);
}
- overall = new Sample(0, nodes.length);
- current = new Sample(0, nodes.length);
+ clearSamples();
log.info("Started");
}
@@ -112,19 +111,26 @@
}
/**
+ * Clears all previously accumulated data.
+ */
+ public void clearSamples() {
+ newestTime = 0;
+ overall = new Sample(0, nodes.length);
+ current = new Sample(0, nodes.length);
+ samples.clear();
+ }
+
+
+ /**
* Records a sample point of data about intent operation rate.
*
* @param overallRate overall rate
* @param currentRate current rate
*/
public void recordSample(double overallRate, double currentRate) {
- try {
- long now = System.currentTimeMillis();
- addSample(now, nodeId, overallRate, currentRate);
- broadcastSample(now, nodeId, overallRate, currentRate);
- } catch (Exception e) {
- log.error("Boom!", e);
- }
+ long now = System.currentTimeMillis();
+ addSample(now, nodeId, overallRate, currentRate);
+ broadcastSample(now, nodeId, overallRate, currentRate);
}
/**
@@ -173,7 +179,7 @@
}
private Sample createCurrentSampleIfNeeded(long time) {
- Sample oldSample = time - newestTime > SAMPLE_WINDOW || current.isComplete() ? current : null;
+ Sample oldSample = time - newestTime > SAMPLE_TIME_WINDOW_MS || current.isComplete() ? current : null;
if (oldSample != null) {
newestTime = time;
current = new Sample(time, nodes.length);
@@ -227,9 +233,9 @@
@Override
public void handle(ClusterMessage message) {
String[] fields = new String(message.payload()).split("\\|");
- log.info("Received sample from {}: {}", message.sender(), fields);
+ log.debug("Received sample from {}: {}", message.sender(), fields);
addSample(Long.parseLong(fields[0]), message.sender(),
- Double.parseDouble(fields[1]), Double.parseDouble(fields[1]));
+ Double.parseDouble(fields[1]), Double.parseDouble(fields[2]));
}
}
}
diff --git a/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
index 029d70a..ff675c1 100644
--- a/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
+++ b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
@@ -24,8 +24,11 @@
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.MacAddress;
import org.onlab.util.Counter;
import org.onosproject.cfg.ComponentConfigService;
@@ -50,10 +53,16 @@
import org.onosproject.net.intent.Key;
import org.onosproject.net.intent.PartitionService;
import org.onosproject.net.intent.PointToPointIntent;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Dictionary;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -65,11 +74,11 @@
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Strings.isNullOrEmpty;
import static java.lang.String.format;
import static java.lang.System.currentTimeMillis;
import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
-import static org.onlab.util.Tools.delay;
-import static org.onlab.util.Tools.groupedThreads;
+import static org.onlab.util.Tools.*;
import static org.onosproject.net.intent.IntentEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger;
@@ -77,20 +86,25 @@
* Application to test sustained intent throughput.
*/
@Component(immediate = true)
+@Service(value = IntentPerfInstaller.class)
public class IntentPerfInstaller {
private final Logger log = getLogger(getClass());
private static final int DEFAULT_NUM_WORKERS = 1;
- private static final int DEFAULT_NUM_KEYS = 40_000;
- private static final int DEFAULT_GOAL_CYCLE_PERIOD = 1_000; //ms
+ private static final int DEFAULT_NUM_KEYS = 40000;
+ private static final int DEFAULT_GOAL_CYCLE_PERIOD = 1000; //ms
private static final int DEFAULT_NUM_NEIGHBORS = 0;
private static final int START_DELAY = 5_000; // ms
private static final int REPORT_PERIOD = 5_000; //ms
+ private static final String START = "start";
+ private static final String STOP = "stop";
+ private static final MessageSubject CONTROL = new MessageSubject("intent-perf-ctl");
+
//FIXME add path length
@Property(name = "numKeys", intValue = DEFAULT_NUM_KEYS,
@@ -134,6 +148,11 @@
@Reference(cardinality = MANDATORY_UNARY)
protected IntentPerfCollector sampleCollector;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterCommunicationService communicationService;
+
+ private ExecutorService messageHandlingExecutor;
+
private ExecutorService workers;
private ApplicationId appId;
private Listener listener;
@@ -145,86 +164,138 @@
private int lastKey = 0;
private IntentPerfUi perfUi;
+ private NodeId nodeId;
+ private TimerTask reporterTask;
@Activate
- public void activate() {
-// configService.registerProperties(getClass());
+ public void activate(ComponentContext context) {
+ configService.registerProperties(getClass());
- String nodeId = clusterService.getLocalNode().ip().toString();
- appId = coreService.registerApplication("org.onosproject.intentperf." + nodeId);
+ nodeId = clusterService.getLocalNode().id();
+ appId = coreService.registerApplication("org.onosproject.intentperf." + nodeId.toString());
+ // TODO: replace with shared timer
reportTimer = new Timer("onos-intent-perf-reporter");
workers = Executors.newFixedThreadPool(DEFAULT_NUM_WORKERS, groupedThreads("onos/intent-perf", "worker-%d"));
-
// disable flow backups for testing
log.info("flow props: {}",
configService.getProperties("org.onosproject.store.flow.impl.DistributedFlowRuleStore"));
configService.setProperty("org.onosproject.store.flow.impl.DistributedFlowRuleStore",
"backupEnabled", "false");
- // Schedule delayed start
- reportTimer.schedule(new TimerTask() {
- @Override
- public void run() {
- start();
- }
- }, START_DELAY);
+ // TODO: replace with shared executor
+ messageHandlingExecutor = Executors.newSingleThreadExecutor(
+ groupedThreads("onos/perf", "command-handler"));
+
+ communicationService.addSubscriber(CONTROL, new InternalControl(),
+ messageHandlingExecutor);
+
+ listener = new Listener();
+ intentService.addListener(listener);
+
+ // TODO: investigate why this seems to be necessary for configs to get picked up on initial activation
+ modify(context);
}
@Deactivate
public void deactivate() {
-// configService.unregisterProperties(getClass(), false);
- stop();
- }
+ stopTestRun();
- //FIXME add modified
+ configService.unregisterProperties(getClass(), false);
+ messageHandlingExecutor.shutdown();
+ communicationService.removeSubscriber(CONTROL);
- private void logConfig(String prefix) {
- log.info("{} with appId {}; numKeys = {}; cyclePeriod = {} ms; numNeighbors={}",
- prefix, appId.id(), numKeys, cyclePeriod, numNeighbors);
- }
-
- public void start() {
- // adjust numNeighbors and generate list of neighbors
- numNeighbors = Math.min(clusterService.getNodes().size() - 1, numNeighbors);
-
- // perhaps we want to prime before listening...
- // we will need to discard the first few results for priming and warmup
- listener = new Listener();
- intentService.addListener(listener);
-
- // Schedule reporter task on report period boundary
- reportTimer.scheduleAtFixedRate(new TimerTask() {
- @Override
- public void run() {
- //adjustRates(); // FIXME we currently adjust rates in the cycle thread
- listener.report();
- }
- }, REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD, REPORT_PERIOD);
-
- // Submit workers
- stopped = false;
- for (int i = 0; i < DEFAULT_NUM_WORKERS; i++) {
- workers.submit(new Submitter(createIntents(numKeys, /*FIXME*/ 2, lastKey)));
- }
- logConfig("Started");
- }
-
- public void stop() {
- stopped = true;
if (listener != null) {
reportTimer.cancel();
intentService.removeListener(listener);
listener = null;
reportTimer = null;
}
+ }
+
+ @Modified
+ public void modify(ComponentContext context) {
+ if (context == null) {
+ logConfig("Reconfigured");
+ return;
+ }
+
+ Dictionary<?, ?> properties = context.getProperties();
+ int newNumKeys, newCyclePeriod, newNumNeighbors;
try {
- workers.awaitTermination(5, TimeUnit.SECONDS);
+ String s = get(properties, "numKeys");
+ newNumKeys = isNullOrEmpty(s) ? numKeys : Integer.parseInt(s.trim());
+
+ s = get(properties, "cyclePeriod");
+ newCyclePeriod = isNullOrEmpty(s) ? cyclePeriod : Integer.parseInt(s.trim());
+
+ s = get(properties, "numNeighbors");
+ newNumNeighbors = isNullOrEmpty(s) ? numNeighbors : Integer.parseInt(s.trim());
+
+ } catch (NumberFormatException | ClassCastException e) {
+ log.warn("Malformed configuration detected; using defaults", e);
+ newNumKeys = DEFAULT_NUM_KEYS;
+ newCyclePeriod = DEFAULT_GOAL_CYCLE_PERIOD;
+ newNumNeighbors = DEFAULT_NUM_NEIGHBORS;
+ }
+
+ if (newNumKeys != numKeys || newCyclePeriod != cyclePeriod || newNumNeighbors != numNeighbors) {
+ numKeys = newNumKeys;
+ cyclePeriod = newCyclePeriod;
+ numNeighbors = newNumNeighbors;
+ logConfig("Reconfigured");
+ }
+ }
+
+ public void start() {
+ communicationService.broadcast(new ClusterMessage(nodeId, CONTROL, START.getBytes()));
+ startTestRun();
+ }
+
+ public void stop() {
+ communicationService.broadcast(new ClusterMessage(nodeId, CONTROL, STOP.getBytes()));
+ stopTestRun();
+ }
+
+ private void logConfig(String prefix) {
+ log.info("{} with appId {}; numKeys = {}; cyclePeriod = {} ms; numNeighbors={}",
+ prefix, appId.id(), numKeys, cyclePeriod, numNeighbors);
+ }
+
+ private void startTestRun() {
+ sampleCollector.clearSamples();
+
+ // adjust numNeighbors and generate list of neighbors
+ numNeighbors = Math.min(clusterService.getNodes().size() - 1, numNeighbors);
+
+ // Schedule reporter task on report period boundary
+ reporterTask = new ReporterTask();
+ reportTimer.scheduleAtFixedRate(reporterTask,
+ REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD,
+ REPORT_PERIOD);
+
+ // Submit workers
+ stopped = false;
+ for (int i = 0; i < DEFAULT_NUM_WORKERS; i++) {
+ workers.submit(new Submitter(createIntents(numKeys, /*FIXME*/ 2, lastKey)));
+ }
+ log.info("Started test run");
+ }
+
+ private void stopTestRun() {
+ stopped = true;
+ if (reporterTask != null) {
+ reporterTask.cancel();
+ reporterTask = null;
+ }
+
+ try {
+ workers.awaitTermination(5 * cyclePeriod, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.warn("Failed to stop worker", e);
}
- log.info("Stopped");
+ log.info("Stopped test run");
}
private List<NodeId> getNeighbors() {
@@ -491,4 +562,25 @@
}
}
+ private class InternalControl implements ClusterMessageHandler {
+ @Override
+ public void handle(ClusterMessage message) {
+ String cmd = new String(message.payload());
+ log.info("Received command {}", cmd);
+ if (cmd.equals(START)) {
+ startTestRun();
+ } else {
+ stopTestRun();
+ }
+ }
+ }
+
+ private class ReporterTask extends TimerTask {
+ @Override
+ public void run() {
+ //adjustRates(); // FIXME we currently adjust rates in the cycle thread
+ listener.report();
+ }
+ }
+
}
diff --git a/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCommand.java b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfListCommand.java
similarity index 87%
rename from apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCommand.java
rename to apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfListCommand.java
index 928a8f8..a556712 100644
--- a/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCommand.java
+++ b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfListCommand.java
@@ -29,7 +29,7 @@
*/
@Command(scope = "onos", name = "intent-perf",
description = "Displays accumulated performance metrics")
-public class IntentPerfCommand extends AbstractShellCommand {
+public class IntentPerfListCommand extends AbstractShellCommand {
@Option(name = "-s", aliases = "--summary", description = "Output just summary",
required = false, multiValued = false)
@@ -49,11 +49,16 @@
List<String> headers = collector.getSampleHeaders();
Sample overall = collector.getOverall();
double total = 0;
+ print("%12s: %14s", "Node ID", "Overall Rate");
for (int i = 0; i < overall.data.length; i++) {
- print("%12s: %12.2f", headers.get(i), overall.data[i]);
- total += overall.data[i];
+ if (overall.data[i] >= 0) {
+ print("%12s: %14.2f", headers.get(i), overall.data[i]);
+ total += overall.data[i];
+ } else {
+ print("%12s: %14s", headers.get(i), " ");
+ }
}
- print("%12s: %12.2f", "total", total);
+ print("%12s: %14.2f", "total", total);
}
private void printSamples() {
diff --git a/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfStartCommand.java b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfStartCommand.java
new file mode 100644
index 0000000..3549153
--- /dev/null
+++ b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfStartCommand.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.intentperf;
+
+import org.apache.karaf.shell.commands.Command;
+import org.onosproject.cli.AbstractShellCommand;
+
+/**
+ * Starts intent performance test run.
+ */
+@Command(scope = "onos", name = "intent-perf-start",
+ description = "Starts intent performance test run")
+public class IntentPerfStartCommand extends AbstractShellCommand {
+
+ @Override
+ protected void execute() {
+ get(IntentPerfInstaller.class).start();
+ }
+
+}
diff --git a/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfStopCommand.java b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfStopCommand.java
new file mode 100644
index 0000000..ac45cd8
--- /dev/null
+++ b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfStopCommand.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.intentperf;
+
+import org.apache.karaf.shell.commands.Command;
+import org.onosproject.cli.AbstractShellCommand;
+
+/**
+ * Stops intent performance test run.
+ */
+@Command(scope = "onos", name = "intent-perf-stop",
+ description = "Stops intent performance test run")
+public class IntentPerfStopCommand extends AbstractShellCommand {
+
+ @Override
+ protected void execute() {
+ get(IntentPerfInstaller.class).stop();
+ }
+
+}
diff --git a/apps/intent-perf/src/main/resources/OSGI-INF/blueprint/shell-config.xml b/apps/intent-perf/src/main/resources/OSGI-INF/blueprint/shell-config.xml
index 1f1871d..fc46d1d 100644
--- a/apps/intent-perf/src/main/resources/OSGI-INF/blueprint/shell-config.xml
+++ b/apps/intent-perf/src/main/resources/OSGI-INF/blueprint/shell-config.xml
@@ -16,7 +16,13 @@
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">
<command-bundle xmlns="http://karaf.apache.org/xmlns/shell/v1.1.0">
<command>
- <action class="org.onosproject.intentperf.IntentPerfCommand"/>
+ <action class="org.onosproject.intentperf.IntentPerfListCommand"/>
+ </command>
+ <command>
+ <action class="org.onosproject.intentperf.IntentPerfStartCommand"/>
+ </command>
+ <command>
+ <action class="org.onosproject.intentperf.IntentPerfStopCommand"/>
</command>
</command-bundle>
</blueprint>