blob: bbeedd00c176053d8a602e49f3e3f796241eb780 [file] [log] [blame]
toma7083182014-09-25 21:38:03 -07001package org.onlab.nio;
2
tom1ae3d162014-09-26 09:38:16 -07003import com.google.common.collect.Lists;
tom2d6d3972014-09-25 22:38:57 -07004import org.onlab.util.Counter;
toma7083182014-09-25 21:38:03 -07005import org.slf4j.Logger;
6import org.slf4j.LoggerFactory;
7
8import java.io.IOException;
9import java.net.InetAddress;
10import java.net.InetSocketAddress;
11import java.net.SocketAddress;
12import java.nio.channels.ByteChannel;
13import java.nio.channels.SelectionKey;
14import java.nio.channels.SocketChannel;
tom2d6d3972014-09-25 22:38:57 -070015import java.text.DecimalFormat;
toma7083182014-09-25 21:38:03 -070016import java.util.ArrayList;
17import java.util.List;
18import java.util.concurrent.ExecutionException;
19import java.util.concurrent.ExecutorService;
20import java.util.concurrent.Executors;
21import java.util.concurrent.FutureTask;
22import java.util.concurrent.Semaphore;
23import java.util.concurrent.TimeUnit;
24import java.util.concurrent.TimeoutException;
25
tom74d49652014-09-25 23:48:46 -070026import static java.lang.String.format;
tom5f4df2d2014-09-26 12:19:51 -070027import static java.lang.System.nanoTime;
tom74d49652014-09-25 23:48:46 -070028import static java.lang.System.out;
tom74d49652014-09-25 23:48:46 -070029import static org.onlab.nio.IOLoopTestServer.PORT;
tomf110fff2014-09-26 00:38:18 -070030import static org.onlab.util.Tools.delay;
toma7083182014-09-25 21:38:03 -070031import static org.onlab.util.Tools.namedThreads;
32
33/**
34 * Auxiliary test fixture to measure speed of NIO-based channels.
35 */
tom74d49652014-09-25 23:48:46 -070036public class IOLoopTestClient {
toma7083182014-09-25 21:38:03 -070037
tom74d49652014-09-25 23:48:46 -070038 private static Logger log = LoggerFactory.getLogger(IOLoopTestClient.class);
toma7083182014-09-25 21:38:03 -070039
40 private final InetAddress ip;
41 private final int port;
42 private final int msgCount;
43 private final int msgLength;
44
45 private final List<CustomIOLoop> iloops = new ArrayList<>();
46 private final ExecutorService ipool;
47 private final ExecutorService wpool;
48
tom2d6d3972014-09-25 22:38:57 -070049 Counter messages;
50 Counter bytes;
tom1ae3d162014-09-26 09:38:16 -070051 long latencyTotal = 0;
52 long latencyCount = 0;
53
toma7083182014-09-25 21:38:03 -070054
55 /**
56 * Main entry point to launch the client.
57 *
58 * @param args command-line arguments
tom1ae3d162014-09-26 09:38:16 -070059 * @throws java.io.IOException if unable to connect to server
60 * @throws InterruptedException if latch wait gets interrupted
61 * @throws java.util.concurrent.ExecutionException if wait gets interrupted
62 * @throws java.util.concurrent.TimeoutException if timeout occurred while waiting for completion
toma7083182014-09-25 21:38:03 -070063 */
64 public static void main(String[] args)
65 throws IOException, InterruptedException, ExecutionException, TimeoutException {
tom74d49652014-09-25 23:48:46 -070066 startStandalone(args);
67
68 System.exit(0);
69 }
70
71 /**
72 * Starts a standalone IO loop test client.
73 *
74 * @param args command-line arguments
75 */
76 public static void startStandalone(String[] args)
77 throws IOException, InterruptedException, ExecutionException, TimeoutException {
toma7083182014-09-25 21:38:03 -070078 InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
79 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
80 int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000;
81 int ml = args.length > 3 ? Integer.parseInt(args[3]) : 128;
tom14dc4d02014-09-26 12:43:14 -070082 int to = args.length > 4 ? Integer.parseInt(args[4]) : 60;
toma7083182014-09-25 21:38:03 -070083
84 log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ",
85 wc, mc, ml, ip);
tom74d49652014-09-25 23:48:46 -070086 IOLoopTestClient client = new IOLoopTestClient(ip, wc, mc, ml, PORT);
toma7083182014-09-25 21:38:03 -070087
tom74d49652014-09-25 23:48:46 -070088 client.start();
tomf110fff2014-09-26 00:38:18 -070089 delay(500);
toma7083182014-09-25 21:38:03 -070090
tom74d49652014-09-25 23:48:46 -070091 client.await(to);
92 client.report();
toma7083182014-09-25 21:38:03 -070093 }
94
/**
 * Creates a speed client with one IO loop per worker.
 *
 * @param ip   ip address of server
 * @param wc   worker count; also the size of both thread pools
 * @param mc   message count to send per client
 * @param ml   message length in bytes
 * @param port socket port
 * @throws java.io.IOException if unable to create IO loops
 */
public IOLoopTestClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
    this.ip = ip;
    this.port = port;
    this.msgCount = mc;
    this.msgLength = ml;

    // Separate pools for the workers pumping messages and for the IO loops
    this.wpool = Executors.newFixedThreadPool(wc, namedThreads("worker"));
    this.ipool = Executors.newFixedThreadPool(wc, namedThreads("io-loop"));

    // The loop list starts out empty, so fill it until it holds one loop per worker
    while (iloops.size() < wc) {
        iloops.add(new CustomIOLoop());
    }
}
117
118 /**
119 * Starts the client workers.
120 *
tom1ae3d162014-09-26 09:38:16 -0700121 * @throws java.io.IOException if unable to open connection
toma7083182014-09-25 21:38:03 -0700122 */
123 public void start() throws IOException {
tom2d6d3972014-09-25 22:38:57 -0700124 messages = new Counter();
125 bytes = new Counter();
toma7083182014-09-25 21:38:03 -0700126
127 // First start up all the IO loops
128 for (CustomIOLoop l : iloops) {
129 ipool.execute(l);
130 }
131
tom2d6d3972014-09-25 22:38:57 -0700132 // Wait for all of them to get going
tom74d49652014-09-25 23:48:46 -0700133 for (CustomIOLoop l : iloops) {
134 l.awaitStart(1000);
135 }
toma7083182014-09-25 21:38:03 -0700136
137 // ... and Next open all connections; one-per-loop
138 for (CustomIOLoop l : iloops) {
139 openConnection(l);
140 }
141 }
142
143
144 /**
145 * Initiates open connection request and registers the pending socket
146 * channel with the given IO loop.
147 *
148 * @param loop loop with which the channel should be registered
tom1ae3d162014-09-26 09:38:16 -0700149 * @throws java.io.IOException if the socket could not be open or connected
toma7083182014-09-25 21:38:03 -0700150 */
151 private void openConnection(CustomIOLoop loop) throws IOException {
152 SocketAddress sa = new InetSocketAddress(ip, port);
153 SocketChannel ch = SocketChannel.open();
154 ch.configureBlocking(false);
155 loop.connectStream(ch);
156 ch.connect(sa);
157 }
158
159
160 /**
161 * Waits for the client workers to complete.
162 *
163 * @param secs timeout in seconds
tom1ae3d162014-09-26 09:38:16 -0700164 * @throws java.util.concurrent.ExecutionException if execution failed
165 * @throws InterruptedException if interrupt occurred while waiting
166 * @throws java.util.concurrent.TimeoutException if timeout occurred
toma7083182014-09-25 21:38:03 -0700167 */
168 public void await(int secs) throws InterruptedException,
169 ExecutionException, TimeoutException {
170 for (CustomIOLoop l : iloops) {
171 if (l.worker.task != null) {
172 l.worker.task.get(secs, TimeUnit.SECONDS);
tom1ae3d162014-09-26 09:38:16 -0700173 latencyTotal += l.latencyTotal;
174 latencyCount += l.latencyCount;
toma7083182014-09-25 21:38:03 -0700175 }
176 }
tom2d6d3972014-09-25 22:38:57 -0700177 messages.freeze();
178 bytes.freeze();
toma7083182014-09-25 21:38:03 -0700179 }
180
181 /**
182 * Reports on the accumulated throughput trackers.
183 */
184 public void report() {
tom2d6d3972014-09-25 22:38:57 -0700185 DecimalFormat f = new DecimalFormat("#,##0");
tom14dc4d02014-09-26 12:43:14 -0700186 out.println(format("Client: %s messages; %s bytes; %s mps; %s MBs; %s ns latency",
tom74d49652014-09-25 23:48:46 -0700187 f.format(messages.total()), f.format(bytes.total()),
188 f.format(messages.throughput()),
tom1ae3d162014-09-26 09:38:16 -0700189 f.format(bytes.throughput() / (1024 * msgLength)),
190 f.format(latencyTotal / latencyCount)));
toma7083182014-09-25 21:38:03 -0700191 }
192
193
194 // Loop for transfer of fixed-length messages
195 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
196
197 Worker worker = new Worker();
tom1ae3d162014-09-26 09:38:16 -0700198 long latencyTotal = 0;
199 long latencyCount = 0;
200
toma7083182014-09-25 21:38:03 -0700201
202 public CustomIOLoop() throws IOException {
203 super(500);
204 }
205
206
207 @Override
208 protected TestMessageStream createStream(ByteChannel channel) {
209 return new TestMessageStream(msgLength, channel, this);
210 }
211
212 @Override
tom74d49652014-09-25 23:48:46 -0700213 protected synchronized void removeStream(MessageStream<TestMessage> stream) {
214 super.removeStream(stream);
tom74d49652014-09-25 23:48:46 -0700215 messages.add(stream.messagesIn().total());
216 bytes.add(stream.bytesIn().total());
tom74d49652014-09-25 23:48:46 -0700217 stream.messagesOut().reset();
218 stream.bytesOut().reset();
toma7083182014-09-25 21:38:03 -0700219 }
220
221 @Override
222 protected void processMessages(List<TestMessage> messages,
tom1ae3d162014-09-26 09:38:16 -0700223 MessageStream<TestMessage> stream) {
224 for (TestMessage message : messages) {
225 // TODO: summarize latency data better
tom5f4df2d2014-09-26 12:19:51 -0700226 latencyTotal += nanoTime() - message.requestorTime();
tom1ae3d162014-09-26 09:38:16 -0700227 latencyCount++;
228 }
toma7083182014-09-25 21:38:03 -0700229 worker.release(messages.size());
230 }
231
232 @Override
tom5a8779c2014-09-29 14:48:43 -0700233 protected void connect(SelectionKey key) throws IOException {
toma7083182014-09-25 21:38:03 -0700234 super.connect(key);
235 TestMessageStream b = (TestMessageStream) key.attachment();
236 Worker w = ((CustomIOLoop) b.loop()).worker;
237 w.pump(b);
238 }
239
240 }
241
242 /**
243 * Auxiliary worker to connect and pump batched messages using blocking I/O.
244 */
245 private class Worker implements Runnable {
246
tom5f4df2d2014-09-26 12:19:51 -0700247 private static final int BATCH_SIZE = 50;
toma7083182014-09-25 21:38:03 -0700248 private static final int PERMITS = 2 * BATCH_SIZE;
249
tom1ae3d162014-09-26 09:38:16 -0700250 private TestMessageStream stream;
toma7083182014-09-25 21:38:03 -0700251 private FutureTask<Worker> task;
252
253 // Stuff to throttle pump
254 private final Semaphore semaphore = new Semaphore(PERMITS);
255 private int msgWritten;
256
tom1ae3d162014-09-26 09:38:16 -0700257 void pump(TestMessageStream stream) {
258 this.stream = stream;
toma7083182014-09-25 21:38:03 -0700259 task = new FutureTask<>(this, this);
260 wpool.execute(task);
261 }
262
263 @Override
264 public void run() {
265 try {
266 log.info("Worker started...");
267
toma7083182014-09-25 21:38:03 -0700268 while (msgWritten < msgCount) {
tom1ae3d162014-09-26 09:38:16 -0700269 int size = Math.min(BATCH_SIZE, msgCount - msgWritten);
270 writeBatch(size);
271 msgWritten += size;
toma7083182014-09-25 21:38:03 -0700272 }
273
274 // Now try to get all the permits back before sending poison pill
275 semaphore.acquireUninterruptibly(PERMITS);
tom1ae3d162014-09-26 09:38:16 -0700276 stream.close();
toma7083182014-09-25 21:38:03 -0700277
278 log.info("Worker done...");
279
280 } catch (IOException e) {
281 log.error("Worker unable to perform I/O", e);
282 }
283 }
284
285
tom1ae3d162014-09-26 09:38:16 -0700286 private void writeBatch(int size) throws IOException {
287 // Build a batch of messages
288 List<TestMessage> batch = Lists.newArrayListWithCapacity(size);
289 for (int i = 0; i < size; i++) {
tom5f4df2d2014-09-26 12:19:51 -0700290 batch.add(new TestMessage(msgLength, nanoTime(), 0, stream.padding()));
toma7083182014-09-25 21:38:03 -0700291 }
tom1ae3d162014-09-26 09:38:16 -0700292 acquire(size);
293 stream.write(batch);
toma7083182014-09-25 21:38:03 -0700294 }
295
296
297 // Release permits based on the specified number of message credits
298 private void release(int permits) {
299 semaphore.release(permits);
300 }
301
302 // Acquire permit for a single batch
303 private void acquire(int permits) {
304 semaphore.acquireUninterruptibly(permits);
305 }
306
307 }
308
309}