blob: 18566d7a430abae3aa14edf7219d07d9e7c7c4a5 [file] [log] [blame]
toma7083182014-09-25 21:38:03 -07001package org.onlab.nio;
2
tom1ae3d162014-09-26 09:38:16 -07003import com.google.common.collect.Lists;
tom2d6d3972014-09-25 22:38:57 -07004import org.onlab.util.Counter;
toma7083182014-09-25 21:38:03 -07005import org.slf4j.Logger;
6import org.slf4j.LoggerFactory;
7
8import java.io.IOException;
9import java.net.InetAddress;
10import java.net.InetSocketAddress;
11import java.net.Socket;
12import java.net.SocketAddress;
13import java.nio.channels.ByteChannel;
14import java.nio.channels.ServerSocketChannel;
15import java.nio.channels.SocketChannel;
16import java.text.DecimalFormat;
17import java.util.ArrayList;
18import java.util.List;
19import java.util.concurrent.ExecutorService;
20import java.util.concurrent.Executors;
21
tom74d49652014-09-25 23:48:46 -070022import static java.lang.String.format;
tom1ae3d162014-09-26 09:38:16 -070023import static java.lang.System.currentTimeMillis;
tom74d49652014-09-25 23:48:46 -070024import static java.lang.System.out;
tom1ae3d162014-09-26 09:38:16 -070025import static org.onlab.util.Tools.delay;
toma7083182014-09-25 21:38:03 -070026import static org.onlab.util.Tools.namedThreads;
27
28/**
29 * Auxiliary test fixture to measure speed of NIO-based channels.
30 */
tom74d49652014-09-25 23:48:46 -070031public class IOLoopTestServer {
toma7083182014-09-25 21:38:03 -070032
tom74d49652014-09-25 23:48:46 -070033 private static Logger log = LoggerFactory.getLogger(IOLoopTestServer.class);
toma7083182014-09-25 21:38:03 -070034
35 private static final int PRUNE_FREQUENCY = 1000;
36
37 static final int PORT = 9876;
38 static final long TIMEOUT = 1000;
39
40 static final boolean SO_NO_DELAY = false;
tom74d49652014-09-25 23:48:46 -070041 static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
42 static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
toma7083182014-09-25 21:38:03 -070043
44 static final DecimalFormat FORMAT = new DecimalFormat("#,##0");
45
46 private final AcceptorLoop aloop;
47 private final ExecutorService apool = Executors.newSingleThreadExecutor(namedThreads("accept"));
48
49 private final List<CustomIOLoop> iloops = new ArrayList<>();
50 private final ExecutorService ipool;
51
52 private final int workerCount;
53 private final int msgLength;
54 private int lastWorker = -1;
55
tom2d6d3972014-09-25 22:38:57 -070056 Counter messages;
57 Counter bytes;
toma7083182014-09-25 21:38:03 -070058
59 /**
60 * Main entry point to launch the server.
61 *
62 * @param args command-line arguments
tom1ae3d162014-09-26 09:38:16 -070063 * @throws java.io.IOException if unable to crate IO loops
toma7083182014-09-25 21:38:03 -070064 */
65 public static void main(String[] args) throws IOException {
tom74d49652014-09-25 23:48:46 -070066 startStandalone(args);
67 System.exit(0);
68 }
69
70 /**
71 * Starts a standalone IO loop test server.
72 *
73 * @param args command-line arguments
74 */
tomf110fff2014-09-26 00:38:18 -070075 public static void startStandalone(String[] args) throws IOException {
toma7083182014-09-25 21:38:03 -070076 InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
77 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
78 int ml = args.length > 2 ? Integer.parseInt(args[2]) : 128;
79
80 log.info("Setting up the server with {} workers, {} byte messages on {}... ",
81 wc, ml, ip);
tom74d49652014-09-25 23:48:46 -070082 IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT);
83 server.start();
toma7083182014-09-25 21:38:03 -070084
85 // Start pruning clients.
86 while (true) {
87 delay(PRUNE_FREQUENCY);
tom74d49652014-09-25 23:48:46 -070088 server.prune();
toma7083182014-09-25 21:38:03 -070089 }
90 }
91
92 /**
93 * Creates a speed server.
94 *
95 * @param ip optional ip of the adapter where to bind
96 * @param wc worker count
97 * @param ml message length in bytes
98 * @param port listen port
tom1ae3d162014-09-26 09:38:16 -070099 * @throws java.io.IOException if unable to create IO loops
toma7083182014-09-25 21:38:03 -0700100 */
tom74d49652014-09-25 23:48:46 -0700101 public IOLoopTestServer(InetAddress ip, int wc, int ml, int port) throws IOException {
toma7083182014-09-25 21:38:03 -0700102 this.workerCount = wc;
103 this.msgLength = ml;
104 this.ipool = Executors.newFixedThreadPool(workerCount, namedThreads("io-loop"));
105
106 this.aloop = new CustomAcceptLoop(new InetSocketAddress(ip, port));
107 for (int i = 0; i < workerCount; i++) {
108 iloops.add(new CustomIOLoop());
109 }
110 }
111
112 /**
113 * Start the server IO loops and kicks off throughput tracking.
114 */
115 public void start() {
tom2d6d3972014-09-25 22:38:57 -0700116 messages = new Counter();
117 bytes = new Counter();
toma7083182014-09-25 21:38:03 -0700118
119 for (CustomIOLoop l : iloops) {
120 ipool.execute(l);
121 }
122 apool.execute(aloop);
tom2d6d3972014-09-25 22:38:57 -0700123
tom74d49652014-09-25 23:48:46 -0700124 for (CustomIOLoop l : iloops) {
125 l.awaitStart(TIMEOUT);
126 }
127 aloop.awaitStart(TIMEOUT);
toma7083182014-09-25 21:38:03 -0700128 }
129
130 /**
131 * Stop the server IO loops and freezes throughput tracking.
132 */
133 public void stop() {
134 aloop.shutdown();
135 for (CustomIOLoop l : iloops) {
136 l.shutdown();
137 }
138
tom74d49652014-09-25 23:48:46 -0700139 for (CustomIOLoop l : iloops) {
140 l.awaitStop(TIMEOUT);
141 }
142 aloop.awaitStop(TIMEOUT);
143
tom2d6d3972014-09-25 22:38:57 -0700144 messages.freeze();
145 bytes.freeze();
toma7083182014-09-25 21:38:03 -0700146 }
147
148 /**
149 * Reports on the accumulated throughput trackers.
150 */
151 public void report() {
tom2d6d3972014-09-25 22:38:57 -0700152 DecimalFormat f = new DecimalFormat("#,##0");
tom74d49652014-09-25 23:48:46 -0700153 out.println(format("Server: %s messages; %s bytes; %s mps; %s Mbs",
154 f.format(messages.total()), f.format(bytes.total()),
155 f.format(messages.throughput()),
156 f.format(bytes.throughput() / (1024 * msgLength))));
toma7083182014-09-25 21:38:03 -0700157 }
158
159 /**
160 * Prunes the IO loops of stale message buffers.
161 */
162 public void prune() {
163 for (CustomIOLoop l : iloops) {
164 l.pruneStaleStreams();
165 }
166 }
167
168 // Get the next worker to which a client should be assigned
169 private synchronized CustomIOLoop nextWorker() {
170 lastWorker = (lastWorker + 1) % workerCount;
171 return iloops.get(lastWorker);
172 }
173
174 // Loop for transfer of fixed-length messages
175 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
176
177 public CustomIOLoop() throws IOException {
178 super(500);
179 }
180
181 @Override
182 protected TestMessageStream createStream(ByteChannel channel) {
183 return new TestMessageStream(msgLength, channel, this);
184 }
185
186 @Override
187 protected void removeStream(MessageStream<TestMessage> stream) {
188 super.removeStream(stream);
tom2d6d3972014-09-25 22:38:57 -0700189
190 messages.add(stream.messagesIn().total());
191 bytes.add(stream.bytesIn().total());
192
tom74d49652014-09-25 23:48:46 -0700193// out.println(format("Disconnected server; inbound %s mps, %s Mbps; outbound %s mps, %s Mbps",
194// FORMAT.format(stream.messagesIn().throughput()),
195// FORMAT.format(stream.bytesIn().throughput() / (1024 * msgLength)),
196// FORMAT.format(stream.messagesOut().throughput()),
197// FORMAT.format(stream.bytesOut().throughput() / (1024 * msgLength))));
toma7083182014-09-25 21:38:03 -0700198 }
199
200 @Override
201 protected void processMessages(List<TestMessage> messages,
202 MessageStream<TestMessage> stream) {
203 try {
tom1ae3d162014-09-26 09:38:16 -0700204 stream.write(createResponses(messages));
toma7083182014-09-25 21:38:03 -0700205 } catch (IOException e) {
206 log.error("Unable to echo messages", e);
207 }
208 }
tom1ae3d162014-09-26 09:38:16 -0700209
210 private List<TestMessage> createResponses(List<TestMessage> messages) {
211 List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size());
212 for (TestMessage message : messages) {
213 responses.add(new TestMessage(message.length(), message.requestorTime(),
214 currentTimeMillis(), message.padding()));
215 }
216 return responses;
217 }
toma7083182014-09-25 21:38:03 -0700218 }
219
220 // Loop for accepting client connections
221 private class CustomAcceptLoop extends AcceptorLoop {
222
223 public CustomAcceptLoop(SocketAddress address) throws IOException {
224 super(500, address);
225 }
226
227 @Override
228 protected void acceptConnection(ServerSocketChannel channel) throws IOException {
229 SocketChannel sc = channel.accept();
230 sc.configureBlocking(false);
231
232 Socket so = sc.socket();
233 so.setTcpNoDelay(SO_NO_DELAY);
234 so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
235 so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
236
237 nextWorker().acceptStream(sc);
238 log.info("Connected client");
239 }
240 }
241
242}