blob: 7af34afa1217ecdd8615c502159da188995e88bc [file] [log] [blame]
/*
* Copyright 2017-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.primitiveperf;
import java.util.Dictionary;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import com.google.common.collect.Lists;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.lang.System.currentTimeMillis;
import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Application to test sustained primitive throughput.
*/
@Component(immediate = true)
@Service(value = PrimitivePerfApp.class)
public class PrimitivePerfApp {
private final Logger log = getLogger(getClass());
private static final int DEFAULT_NUM_CLIENTS = 64;
private static final int DEFAULT_WRITE_PERCENTAGE = 100;
private static final int REPORT_PERIOD = 1_000; //ms
private static final String START = "start";
private static final String STOP = "stop";
private static final MessageSubject CONTROL = new MessageSubject("primitive-perf-ctl");
@Property(name = "numClients", intValue = DEFAULT_NUM_CLIENTS,
label = "Number of clients to use to submit writes")
private int numClients = DEFAULT_NUM_CLIENTS;
@Property(name = "writePercentage", intValue = DEFAULT_WRITE_PERCENTAGE,
label = "Percentage of operations to perform as writes")
private int writePercentage = DEFAULT_WRITE_PERCENTAGE;
@Reference(cardinality = MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = MANDATORY_UNARY)
protected StorageService storageService;
@Reference(cardinality = MANDATORY_UNARY)
protected ComponentConfigService configService;
@Reference(cardinality = MANDATORY_UNARY)
protected PrimitivePerfCollector sampleCollector;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService communicationService;
private ExecutorService messageHandlingExecutor;
private ExecutorService workers;
private boolean stopped = true;
private Timer reportTimer;
private NodeId nodeId;
private TimerTask reporterTask;
private long startTime;
private long currentStartTime;
private AtomicLong overallCounter;
private AtomicLong currentCounter;
@Activate
public void activate(ComponentContext context) {
configService.registerProperties(getClass());
nodeId = clusterService.getLocalNode().id();
reportTimer = new Timer("onos-primitive-perf-reporter");
messageHandlingExecutor = Executors.newSingleThreadExecutor(
groupedThreads("onos/perf", "command-handler"));
communicationService.addSubscriber(CONTROL, String::new, new InternalControl(),
messageHandlingExecutor);
// TODO: investigate why this seems to be necessary for configs to get picked up on initial activation
modify(context);
}
@Deactivate
public void deactivate() {
stopTestRun();
configService.unregisterProperties(getClass(), false);
messageHandlingExecutor.shutdown();
communicationService.removeSubscriber(CONTROL);
if (reportTimer != null) {
reportTimer.cancel();
reportTimer = null;
}
}
@Modified
public void modify(ComponentContext context) {
if (context == null) {
logConfig("Reconfigured");
return;
}
Dictionary<?, ?> properties = context.getProperties();
int newNumClients;
try {
String s = get(properties, "numClients");
newNumClients = isNullOrEmpty(s) ? numClients : Integer.parseInt(s.trim());
} catch (NumberFormatException | ClassCastException e) {
log.warn("Malformed configuration detected; using defaults", e);
newNumClients = DEFAULT_NUM_CLIENTS;
}
int newWritePercentage;
try {
String s = get(properties, "writePercentage");
newWritePercentage = isNullOrEmpty(s) ? writePercentage : Integer.parseInt(s.trim());
} catch (NumberFormatException | ClassCastException e) {
log.warn("Malformed configuration detected; using defaults", e);
newWritePercentage = DEFAULT_WRITE_PERCENTAGE;
}
if (newNumClients != numClients || newWritePercentage != writePercentage) {
numClients = newNumClients;
writePercentage = newWritePercentage;
logConfig("Reconfigured");
if (!stopped) {
stop();
start();
}
}
}
public void start() {
if (stopped) {
stopped = false;
communicationService.broadcast(START, CONTROL, str -> str.getBytes());
startTestRun();
}
}
public void stop() {
if (!stopped) {
communicationService.broadcast(STOP, CONTROL, str -> str.getBytes());
stopTestRun();
}
}
private void logConfig(String prefix) {
log.info("{} with numClients = {}; writePercentage = {}", prefix, numClients, writePercentage);
}
private void startTestRun() {
sampleCollector.clearSamples();
startTime = System.currentTimeMillis();
currentStartTime = startTime;
currentCounter = new AtomicLong();
overallCounter = new AtomicLong();
reporterTask = new ReporterTask();
reportTimer.scheduleAtFixedRate(reporterTask,
REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD,
REPORT_PERIOD);
stopped = false;
Map<String, ControllerNode> nodes = new TreeMap<>();
for (ControllerNode node : clusterService.getNodes()) {
nodes.put(node.id().id(), node);
}
// Compute the index of the local node in a sorted list of nodes.
List<String> sortedNodes = Lists.newArrayList(nodes.keySet());
int nodeCount = nodes.size();
int index = sortedNodes.indexOf(nodeId.id());
// Count the number of workers assigned to this node.
int workerCount = 0;
for (int i = 1; i <= numClients; i++) {
if (i % nodeCount == index) {
workerCount++;
}
}
// Create a worker pool and start the workers for this node.
workers = Executors.newFixedThreadPool(workerCount, groupedThreads("onos/primitive-perf", "worker-%d"));
for (int i = 0; i < workerCount; i++) {
workers.submit(new Runner(UUID.randomUUID().toString(), UUID.randomUUID().toString()));
}
log.info("Started test run");
}
private void stopTestRun() {
if (reporterTask != null) {
reporterTask.cancel();
reporterTask = null;
}
try {
workers.awaitTermination(10, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.warn("Failed to stop worker", e);
}
sampleCollector.recordSample(0, 0);
sampleCollector.recordSample(0, 0);
stopped = true;
log.info("Stopped test run");
}
// Submits primitive operations.
final class Runner implements Runnable {
private final String key;
private final String value;
private ConsistentMap<String, String> map;
private Runner(String key, String value) {
this.key = key;
this.value = value;
}
@Override
public void run() {
setup();
while (!stopped) {
try {
submit();
} catch (Exception e) {
log.warn("Exception during cycle", e);
}
}
teardown();
}
private void setup() {
map = storageService.<String, String>consistentMapBuilder()
.withName("perf-test")
.withSerializer(Serializer.using(KryoNamespaces.BASIC))
.build();
}
private void submit() {
if (currentCounter.incrementAndGet() % 100 < writePercentage) {
map.put(key, value);
} else {
map.get(key);
}
}
private void teardown() {
map.destroy();
}
}
private class InternalControl implements Consumer<String> {
@Override
public void accept(String cmd) {
log.info("Received command {}", cmd);
if (cmd.equals(START)) {
startTestRun();
} else {
stopTestRun();
}
}
}
private class ReporterTask extends TimerTask {
@Override
public void run() {
long endTime = System.currentTimeMillis();
long overallTime = endTime - startTime;
long currentTime = endTime - currentStartTime;
long currentCount = currentCounter.getAndSet(0);
long overallCount = overallCounter.addAndGet(currentCount);
sampleCollector.recordSample(overallTime > 0 ? overallCount / (overallTime / 1000d) : 0,
currentTime > 0 ? currentCount / (currentTime / 1000d) : 0);
currentStartTime = System.currentTimeMillis();
}
}
}