blob: acc9a08f23b077a3e998309d818e56c8b2028a83 [file] [log] [blame]
Thomas Vachuska24c849c2014-10-27 09:53:05 -07001/*
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07002 * Copyright 2014 Open Networking Laboratory
Thomas Vachuska24c849c2014-10-27 09:53:05 -07003 *
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07004 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
Thomas Vachuska24c849c2014-10-27 09:53:05 -07007 *
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07008 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
Thomas Vachuska24c849c2014-10-27 09:53:05 -070015 */
toma7083182014-09-25 21:38:03 -070016package org.onlab.nio;
17
tom1ae3d162014-09-26 09:38:16 -070018import com.google.common.collect.Lists;
tom2d6d3972014-09-25 22:38:57 -070019import org.onlab.util.Counter;
toma7083182014-09-25 21:38:03 -070020import org.slf4j.Logger;
21import org.slf4j.LoggerFactory;
22
23import java.io.IOException;
24import java.net.InetAddress;
25import java.net.InetSocketAddress;
26import java.net.SocketAddress;
27import java.nio.channels.ByteChannel;
28import java.nio.channels.SelectionKey;
29import java.nio.channels.SocketChannel;
tom2d6d3972014-09-25 22:38:57 -070030import java.text.DecimalFormat;
toma7083182014-09-25 21:38:03 -070031import java.util.ArrayList;
32import java.util.List;
33import java.util.concurrent.ExecutionException;
34import java.util.concurrent.ExecutorService;
35import java.util.concurrent.Executors;
36import java.util.concurrent.FutureTask;
37import java.util.concurrent.Semaphore;
38import java.util.concurrent.TimeUnit;
39import java.util.concurrent.TimeoutException;
40
tom74d49652014-09-25 23:48:46 -070041import static java.lang.String.format;
tom5f4df2d2014-09-26 12:19:51 -070042import static java.lang.System.nanoTime;
tom74d49652014-09-25 23:48:46 -070043import static java.lang.System.out;
tom74d49652014-09-25 23:48:46 -070044import static org.onlab.nio.IOLoopTestServer.PORT;
tomf110fff2014-09-26 00:38:18 -070045import static org.onlab.util.Tools.delay;
toma7083182014-09-25 21:38:03 -070046import static org.onlab.util.Tools.namedThreads;
47
48/**
49 * Auxiliary test fixture to measure speed of NIO-based channels.
50 */
tom74d49652014-09-25 23:48:46 -070051public class IOLoopTestClient {
toma7083182014-09-25 21:38:03 -070052
tom74d49652014-09-25 23:48:46 -070053 private static Logger log = LoggerFactory.getLogger(IOLoopTestClient.class);
toma7083182014-09-25 21:38:03 -070054
55 private final InetAddress ip;
56 private final int port;
57 private final int msgCount;
58 private final int msgLength;
59
60 private final List<CustomIOLoop> iloops = new ArrayList<>();
61 private final ExecutorService ipool;
62 private final ExecutorService wpool;
63
tom2d6d3972014-09-25 22:38:57 -070064 Counter messages;
65 Counter bytes;
tom1ae3d162014-09-26 09:38:16 -070066 long latencyTotal = 0;
67 long latencyCount = 0;
68
toma7083182014-09-25 21:38:03 -070069
70 /**
71 * Main entry point to launch the client.
72 *
73 * @param args command-line arguments
tom1ae3d162014-09-26 09:38:16 -070074 * @throws java.io.IOException if unable to connect to server
75 * @throws InterruptedException if latch wait gets interrupted
76 * @throws java.util.concurrent.ExecutionException if wait gets interrupted
77 * @throws java.util.concurrent.TimeoutException if timeout occurred while waiting for completion
toma7083182014-09-25 21:38:03 -070078 */
79 public static void main(String[] args)
80 throws IOException, InterruptedException, ExecutionException, TimeoutException {
tom74d49652014-09-25 23:48:46 -070081 startStandalone(args);
82
83 System.exit(0);
84 }
85
86 /**
87 * Starts a standalone IO loop test client.
88 *
89 * @param args command-line arguments
90 */
91 public static void startStandalone(String[] args)
92 throws IOException, InterruptedException, ExecutionException, TimeoutException {
toma7083182014-09-25 21:38:03 -070093 InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
94 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
95 int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000;
96 int ml = args.length > 3 ? Integer.parseInt(args[3]) : 128;
tom14dc4d02014-09-26 12:43:14 -070097 int to = args.length > 4 ? Integer.parseInt(args[4]) : 60;
toma7083182014-09-25 21:38:03 -070098
99 log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ",
100 wc, mc, ml, ip);
tom74d49652014-09-25 23:48:46 -0700101 IOLoopTestClient client = new IOLoopTestClient(ip, wc, mc, ml, PORT);
toma7083182014-09-25 21:38:03 -0700102
tom74d49652014-09-25 23:48:46 -0700103 client.start();
tomf110fff2014-09-26 00:38:18 -0700104 delay(500);
toma7083182014-09-25 21:38:03 -0700105
tom74d49652014-09-25 23:48:46 -0700106 client.await(to);
107 client.report();
toma7083182014-09-25 21:38:03 -0700108 }
109
110 /**
111 * Creates a speed client.
112 *
113 * @param ip ip address of server
114 * @param wc worker count
115 * @param mc message count to send per client
116 * @param ml message length in bytes
117 * @param port socket port
tom1ae3d162014-09-26 09:38:16 -0700118 * @throws java.io.IOException if unable to create IO loops
toma7083182014-09-25 21:38:03 -0700119 */
tom74d49652014-09-25 23:48:46 -0700120 public IOLoopTestClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
toma7083182014-09-25 21:38:03 -0700121 this.ip = ip;
122 this.port = port;
123 this.msgCount = mc;
124 this.msgLength = ml;
125 this.wpool = Executors.newFixedThreadPool(wc, namedThreads("worker"));
126 this.ipool = Executors.newFixedThreadPool(wc, namedThreads("io-loop"));
127
128 for (int i = 0; i < wc; i++) {
129 iloops.add(new CustomIOLoop());
130 }
131 }
132
133 /**
134 * Starts the client workers.
135 *
tom1ae3d162014-09-26 09:38:16 -0700136 * @throws java.io.IOException if unable to open connection
toma7083182014-09-25 21:38:03 -0700137 */
138 public void start() throws IOException {
tom2d6d3972014-09-25 22:38:57 -0700139 messages = new Counter();
140 bytes = new Counter();
toma7083182014-09-25 21:38:03 -0700141
142 // First start up all the IO loops
143 for (CustomIOLoop l : iloops) {
144 ipool.execute(l);
145 }
146
tom2d6d3972014-09-25 22:38:57 -0700147 // Wait for all of them to get going
tom74d49652014-09-25 23:48:46 -0700148 for (CustomIOLoop l : iloops) {
149 l.awaitStart(1000);
150 }
toma7083182014-09-25 21:38:03 -0700151
152 // ... and Next open all connections; one-per-loop
153 for (CustomIOLoop l : iloops) {
154 openConnection(l);
155 }
156 }
157
158
159 /**
160 * Initiates open connection request and registers the pending socket
161 * channel with the given IO loop.
162 *
163 * @param loop loop with which the channel should be registered
tom1ae3d162014-09-26 09:38:16 -0700164 * @throws java.io.IOException if the socket could not be open or connected
toma7083182014-09-25 21:38:03 -0700165 */
166 private void openConnection(CustomIOLoop loop) throws IOException {
167 SocketAddress sa = new InetSocketAddress(ip, port);
168 SocketChannel ch = SocketChannel.open();
169 ch.configureBlocking(false);
170 loop.connectStream(ch);
171 ch.connect(sa);
172 }
173
174
175 /**
176 * Waits for the client workers to complete.
177 *
178 * @param secs timeout in seconds
tom1ae3d162014-09-26 09:38:16 -0700179 * @throws java.util.concurrent.ExecutionException if execution failed
180 * @throws InterruptedException if interrupt occurred while waiting
181 * @throws java.util.concurrent.TimeoutException if timeout occurred
toma7083182014-09-25 21:38:03 -0700182 */
183 public void await(int secs) throws InterruptedException,
184 ExecutionException, TimeoutException {
185 for (CustomIOLoop l : iloops) {
186 if (l.worker.task != null) {
187 l.worker.task.get(secs, TimeUnit.SECONDS);
tom1ae3d162014-09-26 09:38:16 -0700188 latencyTotal += l.latencyTotal;
189 latencyCount += l.latencyCount;
toma7083182014-09-25 21:38:03 -0700190 }
191 }
tom2d6d3972014-09-25 22:38:57 -0700192 messages.freeze();
193 bytes.freeze();
toma7083182014-09-25 21:38:03 -0700194 }
195
196 /**
tombf474382014-10-02 07:36:50 -0700197 * Reports on the accumulated throughput and latency.
toma7083182014-09-25 21:38:03 -0700198 */
199 public void report() {
tom2d6d3972014-09-25 22:38:57 -0700200 DecimalFormat f = new DecimalFormat("#,##0");
tom14dc4d02014-09-26 12:43:14 -0700201 out.println(format("Client: %s messages; %s bytes; %s mps; %s MBs; %s ns latency",
tom74d49652014-09-25 23:48:46 -0700202 f.format(messages.total()), f.format(bytes.total()),
203 f.format(messages.throughput()),
tom1ae3d162014-09-26 09:38:16 -0700204 f.format(bytes.throughput() / (1024 * msgLength)),
205 f.format(latencyTotal / latencyCount)));
toma7083182014-09-25 21:38:03 -0700206 }
207
208
209 // Loop for transfer of fixed-length messages
210 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
211
212 Worker worker = new Worker();
tom1ae3d162014-09-26 09:38:16 -0700213 long latencyTotal = 0;
214 long latencyCount = 0;
215
toma7083182014-09-25 21:38:03 -0700216
217 public CustomIOLoop() throws IOException {
218 super(500);
219 }
220
221
222 @Override
223 protected TestMessageStream createStream(ByteChannel channel) {
224 return new TestMessageStream(msgLength, channel, this);
225 }
226
227 @Override
tom74d49652014-09-25 23:48:46 -0700228 protected synchronized void removeStream(MessageStream<TestMessage> stream) {
229 super.removeStream(stream);
tom74d49652014-09-25 23:48:46 -0700230 messages.add(stream.messagesIn().total());
231 bytes.add(stream.bytesIn().total());
tom74d49652014-09-25 23:48:46 -0700232 stream.messagesOut().reset();
233 stream.bytesOut().reset();
toma7083182014-09-25 21:38:03 -0700234 }
235
236 @Override
237 protected void processMessages(List<TestMessage> messages,
tom1ae3d162014-09-26 09:38:16 -0700238 MessageStream<TestMessage> stream) {
239 for (TestMessage message : messages) {
240 // TODO: summarize latency data better
tom5f4df2d2014-09-26 12:19:51 -0700241 latencyTotal += nanoTime() - message.requestorTime();
tom1ae3d162014-09-26 09:38:16 -0700242 latencyCount++;
243 }
toma7083182014-09-25 21:38:03 -0700244 worker.release(messages.size());
245 }
246
247 @Override
tom5a8779c2014-09-29 14:48:43 -0700248 protected void connect(SelectionKey key) throws IOException {
toma7083182014-09-25 21:38:03 -0700249 super.connect(key);
250 TestMessageStream b = (TestMessageStream) key.attachment();
251 Worker w = ((CustomIOLoop) b.loop()).worker;
252 w.pump(b);
253 }
254
255 }
256
257 /**
258 * Auxiliary worker to connect and pump batched messages using blocking I/O.
259 */
260 private class Worker implements Runnable {
261
tom5f4df2d2014-09-26 12:19:51 -0700262 private static final int BATCH_SIZE = 50;
toma7083182014-09-25 21:38:03 -0700263 private static final int PERMITS = 2 * BATCH_SIZE;
264
tom1ae3d162014-09-26 09:38:16 -0700265 private TestMessageStream stream;
toma7083182014-09-25 21:38:03 -0700266 private FutureTask<Worker> task;
267
268 // Stuff to throttle pump
269 private final Semaphore semaphore = new Semaphore(PERMITS);
270 private int msgWritten;
271
tom1ae3d162014-09-26 09:38:16 -0700272 void pump(TestMessageStream stream) {
273 this.stream = stream;
toma7083182014-09-25 21:38:03 -0700274 task = new FutureTask<>(this, this);
275 wpool.execute(task);
276 }
277
278 @Override
279 public void run() {
280 try {
281 log.info("Worker started...");
282
toma7083182014-09-25 21:38:03 -0700283 while (msgWritten < msgCount) {
tom1ae3d162014-09-26 09:38:16 -0700284 int size = Math.min(BATCH_SIZE, msgCount - msgWritten);
285 writeBatch(size);
286 msgWritten += size;
toma7083182014-09-25 21:38:03 -0700287 }
288
289 // Now try to get all the permits back before sending poison pill
290 semaphore.acquireUninterruptibly(PERMITS);
tom1ae3d162014-09-26 09:38:16 -0700291 stream.close();
toma7083182014-09-25 21:38:03 -0700292
293 log.info("Worker done...");
294
295 } catch (IOException e) {
296 log.error("Worker unable to perform I/O", e);
297 }
298 }
299
300
tom1ae3d162014-09-26 09:38:16 -0700301 private void writeBatch(int size) throws IOException {
302 // Build a batch of messages
303 List<TestMessage> batch = Lists.newArrayListWithCapacity(size);
304 for (int i = 0; i < size; i++) {
tom5f4df2d2014-09-26 12:19:51 -0700305 batch.add(new TestMessage(msgLength, nanoTime(), 0, stream.padding()));
toma7083182014-09-25 21:38:03 -0700306 }
tom1ae3d162014-09-26 09:38:16 -0700307 acquire(size);
308 stream.write(batch);
toma7083182014-09-25 21:38:03 -0700309 }
310
311
312 // Release permits based on the specified number of message credits
313 private void release(int permits) {
314 semaphore.release(permits);
315 }
316
317 // Acquire permit for a single batch
318 private void acquire(int permits) {
319 semaphore.acquireUninterruptibly(permits);
320 }
321
322 }
323
324}