blob: 3f25826a3b7c00b7d953aef647a0bbd5982ebb0c [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
toma7083182014-09-25 21:38:03 -070019package org.onlab.nio;
20
tom1ae3d162014-09-26 09:38:16 -070021import com.google.common.collect.Lists;
tom2d6d3972014-09-25 22:38:57 -070022import org.onlab.util.Counter;
toma7083182014-09-25 21:38:03 -070023import org.slf4j.Logger;
24import org.slf4j.LoggerFactory;
25
26import java.io.IOException;
27import java.net.InetAddress;
28import java.net.InetSocketAddress;
29import java.net.SocketAddress;
30import java.nio.channels.ByteChannel;
31import java.nio.channels.SelectionKey;
32import java.nio.channels.SocketChannel;
tom2d6d3972014-09-25 22:38:57 -070033import java.text.DecimalFormat;
toma7083182014-09-25 21:38:03 -070034import java.util.ArrayList;
35import java.util.List;
36import java.util.concurrent.ExecutionException;
37import java.util.concurrent.ExecutorService;
38import java.util.concurrent.Executors;
39import java.util.concurrent.FutureTask;
40import java.util.concurrent.Semaphore;
41import java.util.concurrent.TimeUnit;
42import java.util.concurrent.TimeoutException;
43
tom74d49652014-09-25 23:48:46 -070044import static java.lang.String.format;
tom5f4df2d2014-09-26 12:19:51 -070045import static java.lang.System.nanoTime;
tom74d49652014-09-25 23:48:46 -070046import static java.lang.System.out;
tom74d49652014-09-25 23:48:46 -070047import static org.onlab.nio.IOLoopTestServer.PORT;
tomf110fff2014-09-26 00:38:18 -070048import static org.onlab.util.Tools.delay;
toma7083182014-09-25 21:38:03 -070049import static org.onlab.util.Tools.namedThreads;
50
51/**
52 * Auxiliary test fixture to measure speed of NIO-based channels.
53 */
tom74d49652014-09-25 23:48:46 -070054public class IOLoopTestClient {
toma7083182014-09-25 21:38:03 -070055
tom74d49652014-09-25 23:48:46 -070056 private static Logger log = LoggerFactory.getLogger(IOLoopTestClient.class);
toma7083182014-09-25 21:38:03 -070057
58 private final InetAddress ip;
59 private final int port;
60 private final int msgCount;
61 private final int msgLength;
62
63 private final List<CustomIOLoop> iloops = new ArrayList<>();
64 private final ExecutorService ipool;
65 private final ExecutorService wpool;
66
tom2d6d3972014-09-25 22:38:57 -070067 Counter messages;
68 Counter bytes;
tom1ae3d162014-09-26 09:38:16 -070069 long latencyTotal = 0;
70 long latencyCount = 0;
71
toma7083182014-09-25 21:38:03 -070072
73 /**
74 * Main entry point to launch the client.
75 *
76 * @param args command-line arguments
tom1ae3d162014-09-26 09:38:16 -070077 * @throws java.io.IOException if unable to connect to server
78 * @throws InterruptedException if latch wait gets interrupted
79 * @throws java.util.concurrent.ExecutionException if wait gets interrupted
80 * @throws java.util.concurrent.TimeoutException if timeout occurred while waiting for completion
toma7083182014-09-25 21:38:03 -070081 */
82 public static void main(String[] args)
83 throws IOException, InterruptedException, ExecutionException, TimeoutException {
tom74d49652014-09-25 23:48:46 -070084 startStandalone(args);
85
86 System.exit(0);
87 }
88
89 /**
90 * Starts a standalone IO loop test client.
91 *
92 * @param args command-line arguments
93 */
94 public static void startStandalone(String[] args)
95 throws IOException, InterruptedException, ExecutionException, TimeoutException {
toma7083182014-09-25 21:38:03 -070096 InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
97 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
98 int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000;
99 int ml = args.length > 3 ? Integer.parseInt(args[3]) : 128;
tom14dc4d02014-09-26 12:43:14 -0700100 int to = args.length > 4 ? Integer.parseInt(args[4]) : 60;
toma7083182014-09-25 21:38:03 -0700101
102 log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ",
103 wc, mc, ml, ip);
tom74d49652014-09-25 23:48:46 -0700104 IOLoopTestClient client = new IOLoopTestClient(ip, wc, mc, ml, PORT);
toma7083182014-09-25 21:38:03 -0700105
tom74d49652014-09-25 23:48:46 -0700106 client.start();
tomf110fff2014-09-26 00:38:18 -0700107 delay(500);
toma7083182014-09-25 21:38:03 -0700108
tom74d49652014-09-25 23:48:46 -0700109 client.await(to);
110 client.report();
toma7083182014-09-25 21:38:03 -0700111 }
112
113 /**
114 * Creates a speed client.
115 *
116 * @param ip ip address of server
117 * @param wc worker count
118 * @param mc message count to send per client
119 * @param ml message length in bytes
120 * @param port socket port
tom1ae3d162014-09-26 09:38:16 -0700121 * @throws java.io.IOException if unable to create IO loops
toma7083182014-09-25 21:38:03 -0700122 */
tom74d49652014-09-25 23:48:46 -0700123 public IOLoopTestClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
toma7083182014-09-25 21:38:03 -0700124 this.ip = ip;
125 this.port = port;
126 this.msgCount = mc;
127 this.msgLength = ml;
128 this.wpool = Executors.newFixedThreadPool(wc, namedThreads("worker"));
129 this.ipool = Executors.newFixedThreadPool(wc, namedThreads("io-loop"));
130
131 for (int i = 0; i < wc; i++) {
132 iloops.add(new CustomIOLoop());
133 }
134 }
135
136 /**
137 * Starts the client workers.
138 *
tom1ae3d162014-09-26 09:38:16 -0700139 * @throws java.io.IOException if unable to open connection
toma7083182014-09-25 21:38:03 -0700140 */
141 public void start() throws IOException {
tom2d6d3972014-09-25 22:38:57 -0700142 messages = new Counter();
143 bytes = new Counter();
toma7083182014-09-25 21:38:03 -0700144
145 // First start up all the IO loops
146 for (CustomIOLoop l : iloops) {
147 ipool.execute(l);
148 }
149
tom2d6d3972014-09-25 22:38:57 -0700150 // Wait for all of them to get going
tom74d49652014-09-25 23:48:46 -0700151 for (CustomIOLoop l : iloops) {
152 l.awaitStart(1000);
153 }
toma7083182014-09-25 21:38:03 -0700154
155 // ... and Next open all connections; one-per-loop
156 for (CustomIOLoop l : iloops) {
157 openConnection(l);
158 }
159 }
160
161
162 /**
163 * Initiates open connection request and registers the pending socket
164 * channel with the given IO loop.
165 *
166 * @param loop loop with which the channel should be registered
tom1ae3d162014-09-26 09:38:16 -0700167 * @throws java.io.IOException if the socket could not be open or connected
toma7083182014-09-25 21:38:03 -0700168 */
169 private void openConnection(CustomIOLoop loop) throws IOException {
170 SocketAddress sa = new InetSocketAddress(ip, port);
171 SocketChannel ch = SocketChannel.open();
172 ch.configureBlocking(false);
173 loop.connectStream(ch);
174 ch.connect(sa);
175 }
176
177
178 /**
179 * Waits for the client workers to complete.
180 *
181 * @param secs timeout in seconds
tom1ae3d162014-09-26 09:38:16 -0700182 * @throws java.util.concurrent.ExecutionException if execution failed
183 * @throws InterruptedException if interrupt occurred while waiting
184 * @throws java.util.concurrent.TimeoutException if timeout occurred
toma7083182014-09-25 21:38:03 -0700185 */
186 public void await(int secs) throws InterruptedException,
187 ExecutionException, TimeoutException {
188 for (CustomIOLoop l : iloops) {
189 if (l.worker.task != null) {
190 l.worker.task.get(secs, TimeUnit.SECONDS);
tom1ae3d162014-09-26 09:38:16 -0700191 latencyTotal += l.latencyTotal;
192 latencyCount += l.latencyCount;
toma7083182014-09-25 21:38:03 -0700193 }
194 }
tom2d6d3972014-09-25 22:38:57 -0700195 messages.freeze();
196 bytes.freeze();
toma7083182014-09-25 21:38:03 -0700197 }
198
199 /**
tombf474382014-10-02 07:36:50 -0700200 * Reports on the accumulated throughput and latency.
toma7083182014-09-25 21:38:03 -0700201 */
202 public void report() {
tom2d6d3972014-09-25 22:38:57 -0700203 DecimalFormat f = new DecimalFormat("#,##0");
tom14dc4d02014-09-26 12:43:14 -0700204 out.println(format("Client: %s messages; %s bytes; %s mps; %s MBs; %s ns latency",
tom74d49652014-09-25 23:48:46 -0700205 f.format(messages.total()), f.format(bytes.total()),
206 f.format(messages.throughput()),
tom1ae3d162014-09-26 09:38:16 -0700207 f.format(bytes.throughput() / (1024 * msgLength)),
208 f.format(latencyTotal / latencyCount)));
toma7083182014-09-25 21:38:03 -0700209 }
210
211
212 // Loop for transfer of fixed-length messages
213 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
214
215 Worker worker = new Worker();
tom1ae3d162014-09-26 09:38:16 -0700216 long latencyTotal = 0;
217 long latencyCount = 0;
218
toma7083182014-09-25 21:38:03 -0700219
220 public CustomIOLoop() throws IOException {
221 super(500);
222 }
223
224
225 @Override
226 protected TestMessageStream createStream(ByteChannel channel) {
227 return new TestMessageStream(msgLength, channel, this);
228 }
229
230 @Override
tom74d49652014-09-25 23:48:46 -0700231 protected synchronized void removeStream(MessageStream<TestMessage> stream) {
232 super.removeStream(stream);
tom74d49652014-09-25 23:48:46 -0700233 messages.add(stream.messagesIn().total());
234 bytes.add(stream.bytesIn().total());
tom74d49652014-09-25 23:48:46 -0700235 stream.messagesOut().reset();
236 stream.bytesOut().reset();
toma7083182014-09-25 21:38:03 -0700237 }
238
239 @Override
240 protected void processMessages(List<TestMessage> messages,
tom1ae3d162014-09-26 09:38:16 -0700241 MessageStream<TestMessage> stream) {
242 for (TestMessage message : messages) {
243 // TODO: summarize latency data better
tom5f4df2d2014-09-26 12:19:51 -0700244 latencyTotal += nanoTime() - message.requestorTime();
tom1ae3d162014-09-26 09:38:16 -0700245 latencyCount++;
246 }
toma7083182014-09-25 21:38:03 -0700247 worker.release(messages.size());
248 }
249
250 @Override
tom5a8779c2014-09-29 14:48:43 -0700251 protected void connect(SelectionKey key) throws IOException {
toma7083182014-09-25 21:38:03 -0700252 super.connect(key);
253 TestMessageStream b = (TestMessageStream) key.attachment();
254 Worker w = ((CustomIOLoop) b.loop()).worker;
255 w.pump(b);
256 }
257
258 }
259
260 /**
261 * Auxiliary worker to connect and pump batched messages using blocking I/O.
262 */
263 private class Worker implements Runnable {
264
tom5f4df2d2014-09-26 12:19:51 -0700265 private static final int BATCH_SIZE = 50;
toma7083182014-09-25 21:38:03 -0700266 private static final int PERMITS = 2 * BATCH_SIZE;
267
tom1ae3d162014-09-26 09:38:16 -0700268 private TestMessageStream stream;
toma7083182014-09-25 21:38:03 -0700269 private FutureTask<Worker> task;
270
271 // Stuff to throttle pump
272 private final Semaphore semaphore = new Semaphore(PERMITS);
273 private int msgWritten;
274
tom1ae3d162014-09-26 09:38:16 -0700275 void pump(TestMessageStream stream) {
276 this.stream = stream;
toma7083182014-09-25 21:38:03 -0700277 task = new FutureTask<>(this, this);
278 wpool.execute(task);
279 }
280
281 @Override
282 public void run() {
283 try {
284 log.info("Worker started...");
285
toma7083182014-09-25 21:38:03 -0700286 while (msgWritten < msgCount) {
tom1ae3d162014-09-26 09:38:16 -0700287 int size = Math.min(BATCH_SIZE, msgCount - msgWritten);
288 writeBatch(size);
289 msgWritten += size;
toma7083182014-09-25 21:38:03 -0700290 }
291
292 // Now try to get all the permits back before sending poison pill
293 semaphore.acquireUninterruptibly(PERMITS);
tom1ae3d162014-09-26 09:38:16 -0700294 stream.close();
toma7083182014-09-25 21:38:03 -0700295
296 log.info("Worker done...");
297
298 } catch (IOException e) {
299 log.error("Worker unable to perform I/O", e);
300 }
301 }
302
303
tom1ae3d162014-09-26 09:38:16 -0700304 private void writeBatch(int size) throws IOException {
305 // Build a batch of messages
306 List<TestMessage> batch = Lists.newArrayListWithCapacity(size);
307 for (int i = 0; i < size; i++) {
tom5f4df2d2014-09-26 12:19:51 -0700308 batch.add(new TestMessage(msgLength, nanoTime(), 0, stream.padding()));
toma7083182014-09-25 21:38:03 -0700309 }
tom1ae3d162014-09-26 09:38:16 -0700310 acquire(size);
311 stream.write(batch);
toma7083182014-09-25 21:38:03 -0700312 }
313
314
315 // Release permits based on the specified number of message credits
316 private void release(int permits) {
317 semaphore.release(permits);
318 }
319
320 // Acquire permit for a single batch
321 private void acquire(int permits) {
322 semaphore.acquireUninterruptibly(permits);
323 }
324
325 }
326
327}