blob: 7fe743cf06506a867d4980394b87173676892787 [file] [log] [blame]
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.onos.foo;
import com.google.common.collect.Lists;
import org.onlab.nio.AcceptorLoop;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
import org.onlab.util.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.ByteChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static java.lang.String.format;
import static java.lang.System.nanoTime;
import static java.lang.System.out;
import static org.onlab.util.Tools.delay;
import static org.onlab.util.Tools.namedThreads;
/**
 * Auxiliary test fixture to measure speed of NIO-based channels.
 * <p>
 * The server runs one acceptor loop on a dedicated thread and a fixed number
 * of worker IO loops on a thread pool. Each accepted connection is handed to
 * the next worker in round-robin order, and every message received on a
 * stream is answered with a response message of the same length.
 */
public class IOLoopTestServer {

    private static final Logger log = LoggerFactory.getLogger(IOLoopTestServer.class);

    // Interval in millis between prune passes in standalone mode.
    private static final int PRUNE_FREQUENCY = 1000;

    // Divisor converting a bytes-per-second figure into megabytes-per-second.
    private static final int BYTES_PER_MB = 1024 * 1024;

    static final int PORT = 9876;
    static final long TIMEOUT = 1000;

    static final boolean SO_NO_DELAY = false;
    static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
    static final int SO_RCV_BUFFER_SIZE = 128 * 1024;

    static final DecimalFormat FORMAT = new DecimalFormat("#,##0");

    private final AcceptorLoop aloop;
    private final ExecutorService apool = Executors.newSingleThreadExecutor(namedThreads("accept"));

    private final List<CustomIOLoop> iloops = new ArrayList<>();
    private final ExecutorService ipool;

    private final int workerCount;
    private final int msgLength;
    private int lastWorker = -1;

    // Created in start(); accumulated as streams are removed from the IO loops.
    Counter messages;
    Counter bytes;

    /**
     * Main entry point to launch the server.
     *
     * @param args command-line arguments
     * @throws java.io.IOException if unable to create IO loops
     */
    public static void main(String[] args) throws IOException {
        startStandalone(args);
        System.exit(0);
    }

    /**
     * Starts a standalone IO loop test server and blocks until all of its
     * clients have gone away.
     * <p>
     * Optional arguments, in order: bind address (default 127.0.0.1),
     * worker count (default 6) and message length in bytes (default 128).
     *
     * @param args command-line arguments
     * @throws java.io.IOException if unable to create IO loops
     */
    public static void startStandalone(String[] args) throws IOException {
        InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
        int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
        int ml = args.length > 2 ? Integer.parseInt(args[2]) : 128;

        log.info("Setting up the server with {} workers, {} byte messages on {}... ",
                 wc, ml, ip);
        IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT);
        server.start();

        // Start pruning clients and keep going until their number goes to 0.
        // The -1 sentinel is kept until the first non-zero prune result, so
        // the loop does not exit before any client has been seen.
        int remaining = -1;
        while (remaining == -1 || remaining > 0) {
            delay(PRUNE_FREQUENCY);
            int r = server.prune();
            remaining = remaining == -1 && r == 0 ? remaining : r;
        }
        server.stop();
    }

    /**
     * Creates a speed server.
     *
     * @param ip   optional ip of the adapter where to bind
     * @param wc   worker count
     * @param ml   message length in bytes
     * @param port listen port
     * @throws java.io.IOException if unable to create IO loops
     */
    public IOLoopTestServer(InetAddress ip, int wc, int ml, int port) throws IOException {
        this.workerCount = wc;
        this.msgLength = ml;
        this.ipool = Executors.newFixedThreadPool(workerCount, namedThreads("io-loop"));

        this.aloop = new CustomAcceptLoop(new InetSocketAddress(ip, port));
        for (int i = 0; i < workerCount; i++) {
            iloops.add(new CustomIOLoop());
        }
    }

    /**
     * Start the server IO loops and kicks off throughput tracking.
     * <p>
     * Fresh counters are created, all worker loops and the acceptor loop are
     * submitted to their executors, and the call then waits up to
     * {@link #TIMEOUT} on each loop for it to start.
     */
    public void start() {
        messages = new Counter();
        bytes = new Counter();

        for (CustomIOLoop l : iloops) {
            ipool.execute(l);
        }
        apool.execute(aloop);

        for (CustomIOLoop l : iloops) {
            l.awaitStart(TIMEOUT);
        }
        aloop.awaitStart(TIMEOUT);
    }

    /**
     * Stop the server IO loops and freezes throughput tracking.
     * <p>
     * The backing executors are shut down as well so that their threads can
     * terminate; consequently the server cannot be started again afterwards.
     */
    public void stop() {
        aloop.shutdown();
        for (CustomIOLoop l : iloops) {
            l.shutdown();
        }

        for (CustomIOLoop l : iloops) {
            l.awaitStop(TIMEOUT);
        }
        aloop.awaitStop(TIMEOUT);

        // Release the pool threads; shutdown() does not interrupt a loop that
        // is still winding down, it only stops new tasks from being accepted.
        apool.shutdown();
        ipool.shutdown();

        messages.freeze();
        bytes.freeze();
    }

    /**
     * Reports on the accumulated message and byte totals and throughput.
     */
    public void report() {
        // DecimalFormat is not thread-safe, hence a local instance.
        DecimalFormat f = new DecimalFormat("#,##0");
        out.println(format("Server: %s messages; %s bytes; %s mps; %s MBs",
                           f.format(messages.total()), f.format(bytes.total()),
                           f.format(messages.throughput()),
                           f.format(bytes.throughput() / BYTES_PER_MB)));
    }

    /**
     * Prunes the IO loops of stale message streams.
     *
     * @return sum of the values returned by each worker loop's
     *         {@code pruneStaleStreams()} (presumably the number of streams
     *         still remaining across all workers)
     */
    public int prune() {
        int count = 0;
        for (CustomIOLoop l : iloops) {
            count += l.pruneStaleStreams();
        }
        return count;
    }

    // Get the next worker to which a client should be assigned (round-robin)
    private synchronized CustomIOLoop nextWorker() {
        lastWorker = (lastWorker + 1) % workerCount;
        return iloops.get(lastWorker);
    }

    // Loop for transfer of fixed-length messages
    private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {

        public CustomIOLoop() throws IOException {
            // NOTE(review): 500 is presumably the select timeout in millis - confirm against IOLoop
            super(500);
        }

        @Override
        protected TestMessageStream createStream(ByteChannel channel) {
            return new TestMessageStream(msgLength, channel, this);
        }

        @Override
        protected void removeStream(MessageStream<TestMessage> stream) {
            super.removeStream(stream);
            // Fold the departing stream's inbound totals into the server-wide counters.
            messages.add(stream.messagesIn().total());
            bytes.add(stream.bytesIn().total());
        }

        // Note: the 'messages' parameter shadows the enclosing server's counter field.
        @Override
        protected void processMessages(List<TestMessage> messages,
                                       MessageStream<TestMessage> stream) {
            try {
                stream.write(createResponses(messages));
            } catch (IOException e) {
                log.error("Unable to echo messages", e);
            }
        }

        // Builds one response per request, carrying over the request's length,
        // requestor time and padding and stamping the current nano time.
        private List<TestMessage> createResponses(List<TestMessage> messages) {
            List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size());
            for (TestMessage message : messages) {
                responses.add(new TestMessage(message.length(), message.requestorTime(),
                                              nanoTime(), message.padding()));
            }
            return responses;
        }
    }

    // Loop for accepting client connections
    private class CustomAcceptLoop extends AcceptorLoop {

        public CustomAcceptLoop(SocketAddress address) throws IOException {
            // NOTE(review): 500 is presumably the select timeout in millis - confirm against AcceptorLoop
            super(500, address);
        }

        @Override
        protected void acceptConnection(ServerSocketChannel channel) throws IOException {
            SocketChannel sc = channel.accept();
            if (sc == null) {
                // accept() yields null on a non-blocking channel when no
                // connection is pending; nothing to hand off in that case.
                return;
            }

            try {
                sc.configureBlocking(false);

                Socket so = sc.socket();
                so.setTcpNoDelay(SO_NO_DELAY);
                so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
                so.setSendBufferSize(SO_SEND_BUFFER_SIZE);

                nextWorker().acceptStream(sc);
            } catch (IOException e) {
                // Do not leak the accepted channel if it could not be set up.
                try {
                    sc.close();
                } catch (IOException ce) {
                    e.addSuppressed(ce);
                }
                throw e;
            }
            log.info("Connected client");
        }
    }
}