blob: 430211f5bc86194b7da136a9486f08408e646c64 [file] [log] [blame]
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.databaseperf;
import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
import static org.onlab.util.Tools.delay;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
/**
* Application to measure partitioned database performance.
*/
@Component(immediate = true)
public class DatabasePerfInstaller {
private final Logger log = getLogger(getClass());
@Reference(cardinality = MANDATORY_UNARY)
protected CoreService coreService;
@Reference(cardinality = MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = MANDATORY_UNARY)
protected StorageService storageService;
private boolean stopped;
private ApplicationId appId;
private static final long REPORT_PERIOD = 5000L; //ms
private Timer reportTimer;
private AtomicInteger successCount = new AtomicInteger(0);
private AtomicInteger failureCount = new AtomicInteger(0);
private ConsistentMap<String, String> cmap;
private ControllerNode localNode;
private long reportStartTime = System.currentTimeMillis();
private static final int NUM_TASK_THREADS = 2;
private ExecutorService taskExecutor;
private static final Serializer SERIALIZER = new Serializer() {
KryoNamespace kryo = new KryoNamespace.Builder().build();
@Override
public <T> byte[] encode(T object) {
return kryo.serialize(object);
}
@Override
public <T> T decode(byte[] bytes) {
return kryo.deserialize(bytes);
}
};
@Activate
public void activate() {
localNode = clusterService.getLocalNode();
String nodeId = localNode.ip().toString();
appId = coreService.registerApplication("org.onosproject.nettyperf."
+ nodeId);
cmap = storageService.createConsistentMap("onos-app-database-perf-test-map", SERIALIZER);
taskExecutor = Executors.newFixedThreadPool(NUM_TASK_THREADS, groupedThreads("onos/database-perf", "worker"));
log.info("Started with Application ID {}", appId.id());
start();
}
@Deactivate
public void deactivate() {
stop();
log.info("Stopped");
}
public void start() {
long delay = System.currentTimeMillis() % REPORT_PERIOD;
reportTimer = new Timer("onos-netty-perf-reporter");
reportTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
report();
}
}, delay, REPORT_PERIOD);
stopped = false;
IntStream.range(0, NUM_TASK_THREADS).forEach(i -> {
taskExecutor.submit(() -> {
delay(2000); // take a breath to start
while (!stopped) {
performDBOperation();
delay(2); // take a breather
}
});
});
}
private void performDBOperation() {
String key = String.format("test%d", RandomUtils.nextInt(1000));
try {
if (RandomUtils.nextBoolean()) {
cmap.put(key, UUID.randomUUID().toString());
} else {
cmap.get(key);
}
successCount.incrementAndGet();
} catch (Exception e) {
failureCount.incrementAndGet();
}
}
private void report() {
long delta = System.currentTimeMillis() - reportStartTime;
if (delta > 0) {
int rate = (int) Math.round(((successCount.get() * 1000.0) / delta));
log.info("Passed: {}, Failed: {}, Rate: {}",
successCount.getAndSet(0), failureCount.getAndSet(0), rate);
reportStartTime = System.currentTimeMillis();
}
}
public void stop() {
reportTimer.cancel();
reportTimer = null;
stopped = true;
try {
taskExecutor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.warn("Failed to stop worker.");
}
}
}