blob: dba0b180070e9577fe6c7f6abc8061bc57aff7c6 [file] [log] [blame]
toma7083182014-09-25 21:38:03 -07001package org.onlab.nio;
2
tom1ae3d162014-09-26 09:38:16 -07003import com.google.common.collect.Lists;
tom2d6d3972014-09-25 22:38:57 -07004import org.onlab.util.Counter;
toma7083182014-09-25 21:38:03 -07005import org.slf4j.Logger;
6import org.slf4j.LoggerFactory;
7
8import java.io.IOException;
9import java.net.InetAddress;
10import java.net.InetSocketAddress;
11import java.net.SocketAddress;
12import java.nio.channels.ByteChannel;
13import java.nio.channels.SelectionKey;
14import java.nio.channels.SocketChannel;
tom2d6d3972014-09-25 22:38:57 -070015import java.text.DecimalFormat;
toma7083182014-09-25 21:38:03 -070016import java.util.ArrayList;
17import java.util.List;
18import java.util.concurrent.ExecutionException;
19import java.util.concurrent.ExecutorService;
20import java.util.concurrent.Executors;
21import java.util.concurrent.FutureTask;
22import java.util.concurrent.Semaphore;
23import java.util.concurrent.TimeUnit;
24import java.util.concurrent.TimeoutException;
25
tom74d49652014-09-25 23:48:46 -070026import static java.lang.String.format;
tom1ae3d162014-09-26 09:38:16 -070027import static java.lang.System.currentTimeMillis;
tom5f4df2d2014-09-26 12:19:51 -070028import static java.lang.System.nanoTime;
tom74d49652014-09-25 23:48:46 -070029import static java.lang.System.out;
tom74d49652014-09-25 23:48:46 -070030import static org.onlab.nio.IOLoopTestServer.PORT;
tomf110fff2014-09-26 00:38:18 -070031import static org.onlab.util.Tools.delay;
toma7083182014-09-25 21:38:03 -070032import static org.onlab.util.Tools.namedThreads;
33
34/**
35 * Auxiliary test fixture to measure speed of NIO-based channels.
36 */
tom74d49652014-09-25 23:48:46 -070037public class IOLoopTestClient {
toma7083182014-09-25 21:38:03 -070038
tom74d49652014-09-25 23:48:46 -070039 private static Logger log = LoggerFactory.getLogger(IOLoopTestClient.class);
toma7083182014-09-25 21:38:03 -070040
41 private final InetAddress ip;
42 private final int port;
43 private final int msgCount;
44 private final int msgLength;
45
46 private final List<CustomIOLoop> iloops = new ArrayList<>();
47 private final ExecutorService ipool;
48 private final ExecutorService wpool;
49
tom2d6d3972014-09-25 22:38:57 -070050 Counter messages;
51 Counter bytes;
tom1ae3d162014-09-26 09:38:16 -070052 long latencyTotal = 0;
53 long latencyCount = 0;
54
toma7083182014-09-25 21:38:03 -070055
56 /**
57 * Main entry point to launch the client.
58 *
59 * @param args command-line arguments
tom1ae3d162014-09-26 09:38:16 -070060 * @throws java.io.IOException if unable to connect to server
61 * @throws InterruptedException if latch wait gets interrupted
62 * @throws java.util.concurrent.ExecutionException if wait gets interrupted
63 * @throws java.util.concurrent.TimeoutException if timeout occurred while waiting for completion
toma7083182014-09-25 21:38:03 -070064 */
65 public static void main(String[] args)
66 throws IOException, InterruptedException, ExecutionException, TimeoutException {
tom74d49652014-09-25 23:48:46 -070067 startStandalone(args);
68
69 System.exit(0);
70 }
71
72 /**
73 * Starts a standalone IO loop test client.
74 *
75 * @param args command-line arguments
76 */
77 public static void startStandalone(String[] args)
78 throws IOException, InterruptedException, ExecutionException, TimeoutException {
toma7083182014-09-25 21:38:03 -070079 InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
80 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
81 int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000;
82 int ml = args.length > 3 ? Integer.parseInt(args[3]) : 128;
83 int to = args.length > 4 ? Integer.parseInt(args[4]) : 30;
84
85 log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ",
86 wc, mc, ml, ip);
tom74d49652014-09-25 23:48:46 -070087 IOLoopTestClient client = new IOLoopTestClient(ip, wc, mc, ml, PORT);
toma7083182014-09-25 21:38:03 -070088
tom74d49652014-09-25 23:48:46 -070089 client.start();
tomf110fff2014-09-26 00:38:18 -070090 delay(500);
toma7083182014-09-25 21:38:03 -070091
tom74d49652014-09-25 23:48:46 -070092 client.await(to);
93 client.report();
toma7083182014-09-25 21:38:03 -070094 }
95
/**
 * Creates a speed client with one IO loop per worker and two fixed-size
 * thread pools, each sized to the worker count.
 *
 * @param ip   ip address of server
 * @param wc   worker count
 * @param mc   message count to send per client
 * @param ml   message length in bytes
 * @param port socket port
 * @throws java.io.IOException if unable to create IO loops
 */
public IOLoopTestClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
    this.ip = ip;
    this.port = port;
    this.msgCount = mc;
    this.msgLength = ml;

    // Worker pool first, then the pool that runs the IO loops.
    this.wpool = Executors.newFixedThreadPool(wc, namedThreads("worker"));
    this.ipool = Executors.newFixedThreadPool(wc, namedThreads("io-loop"));

    // Keep adding loops until there is exactly one per worker.
    while (iloops.size() < wc) {
        iloops.add(new CustomIOLoop());
    }
}
118
119 /**
120 * Starts the client workers.
121 *
tom1ae3d162014-09-26 09:38:16 -0700122 * @throws java.io.IOException if unable to open connection
toma7083182014-09-25 21:38:03 -0700123 */
124 public void start() throws IOException {
tom2d6d3972014-09-25 22:38:57 -0700125 messages = new Counter();
126 bytes = new Counter();
toma7083182014-09-25 21:38:03 -0700127
128 // First start up all the IO loops
129 for (CustomIOLoop l : iloops) {
130 ipool.execute(l);
131 }
132
tom2d6d3972014-09-25 22:38:57 -0700133 // Wait for all of them to get going
tom74d49652014-09-25 23:48:46 -0700134 for (CustomIOLoop l : iloops) {
135 l.awaitStart(1000);
136 }
toma7083182014-09-25 21:38:03 -0700137
138 // ... and Next open all connections; one-per-loop
139 for (CustomIOLoop l : iloops) {
140 openConnection(l);
141 }
142 }
143
144
145 /**
146 * Initiates open connection request and registers the pending socket
147 * channel with the given IO loop.
148 *
149 * @param loop loop with which the channel should be registered
tom1ae3d162014-09-26 09:38:16 -0700150 * @throws java.io.IOException if the socket could not be open or connected
toma7083182014-09-25 21:38:03 -0700151 */
152 private void openConnection(CustomIOLoop loop) throws IOException {
153 SocketAddress sa = new InetSocketAddress(ip, port);
154 SocketChannel ch = SocketChannel.open();
155 ch.configureBlocking(false);
156 loop.connectStream(ch);
157 ch.connect(sa);
158 }
159
160
161 /**
162 * Waits for the client workers to complete.
163 *
164 * @param secs timeout in seconds
tom1ae3d162014-09-26 09:38:16 -0700165 * @throws java.util.concurrent.ExecutionException if execution failed
166 * @throws InterruptedException if interrupt occurred while waiting
167 * @throws java.util.concurrent.TimeoutException if timeout occurred
toma7083182014-09-25 21:38:03 -0700168 */
169 public void await(int secs) throws InterruptedException,
170 ExecutionException, TimeoutException {
171 for (CustomIOLoop l : iloops) {
172 if (l.worker.task != null) {
173 l.worker.task.get(secs, TimeUnit.SECONDS);
tom1ae3d162014-09-26 09:38:16 -0700174 latencyTotal += l.latencyTotal;
175 latencyCount += l.latencyCount;
toma7083182014-09-25 21:38:03 -0700176 }
177 }
tom2d6d3972014-09-25 22:38:57 -0700178 messages.freeze();
179 bytes.freeze();
toma7083182014-09-25 21:38:03 -0700180 }
181
182 /**
183 * Reports on the accumulated throughput trackers.
184 */
185 public void report() {
tom2d6d3972014-09-25 22:38:57 -0700186 DecimalFormat f = new DecimalFormat("#,##0");
tom5f4df2d2014-09-26 12:19:51 -0700187 out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs; %s ns latency",
tom74d49652014-09-25 23:48:46 -0700188 f.format(messages.total()), f.format(bytes.total()),
189 f.format(messages.throughput()),
tom1ae3d162014-09-26 09:38:16 -0700190 f.format(bytes.throughput() / (1024 * msgLength)),
191 f.format(latencyTotal / latencyCount)));
toma7083182014-09-25 21:38:03 -0700192 }
193
194
195 // Loop for transfer of fixed-length messages
196 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
197
198 Worker worker = new Worker();
tom1ae3d162014-09-26 09:38:16 -0700199 long latencyTotal = 0;
200 long latencyCount = 0;
201
toma7083182014-09-25 21:38:03 -0700202
203 public CustomIOLoop() throws IOException {
204 super(500);
205 }
206
207
208 @Override
209 protected TestMessageStream createStream(ByteChannel channel) {
210 return new TestMessageStream(msgLength, channel, this);
211 }
212
213 @Override
tom74d49652014-09-25 23:48:46 -0700214 protected synchronized void removeStream(MessageStream<TestMessage> stream) {
215 super.removeStream(stream);
toma7083182014-09-25 21:38:03 -0700216
tom74d49652014-09-25 23:48:46 -0700217 messages.add(stream.messagesIn().total());
218 bytes.add(stream.bytesIn().total());
219
220// out.println(format("Disconnected client; inbound %s mps, %s Mbps; outbound %s mps, %s Mbps",
221// FORMAT.format(stream.messagesIn().throughput()),
222// FORMAT.format(stream.bytesIn().throughput() / (1024 * msgLength)),
223// FORMAT.format(stream.messagesOut().throughput()),
224// FORMAT.format(stream.bytesOut().throughput() / (1024 * msgLength))));
225
226 stream.messagesOut().reset();
227 stream.bytesOut().reset();
toma7083182014-09-25 21:38:03 -0700228 }
229
230 @Override
231 protected void processMessages(List<TestMessage> messages,
tom1ae3d162014-09-26 09:38:16 -0700232 MessageStream<TestMessage> stream) {
233 for (TestMessage message : messages) {
234 // TODO: summarize latency data better
tom5f4df2d2014-09-26 12:19:51 -0700235 latencyTotal += nanoTime() - message.requestorTime();
tom1ae3d162014-09-26 09:38:16 -0700236 latencyCount++;
237 }
toma7083182014-09-25 21:38:03 -0700238 worker.release(messages.size());
239 }
240
241 @Override
242 protected void connect(SelectionKey key) {
243 super.connect(key);
244 TestMessageStream b = (TestMessageStream) key.attachment();
245 Worker w = ((CustomIOLoop) b.loop()).worker;
246 w.pump(b);
247 }
248
249 }
250
251 /**
252 * Auxiliary worker to connect and pump batched messages using blocking I/O.
253 */
254 private class Worker implements Runnable {
255
tom5f4df2d2014-09-26 12:19:51 -0700256 private static final int BATCH_SIZE = 50;
toma7083182014-09-25 21:38:03 -0700257 private static final int PERMITS = 2 * BATCH_SIZE;
258
tom1ae3d162014-09-26 09:38:16 -0700259 private TestMessageStream stream;
toma7083182014-09-25 21:38:03 -0700260 private FutureTask<Worker> task;
261
262 // Stuff to throttle pump
263 private final Semaphore semaphore = new Semaphore(PERMITS);
264 private int msgWritten;
265
tom1ae3d162014-09-26 09:38:16 -0700266 void pump(TestMessageStream stream) {
267 this.stream = stream;
toma7083182014-09-25 21:38:03 -0700268 task = new FutureTask<>(this, this);
269 wpool.execute(task);
270 }
271
272 @Override
273 public void run() {
274 try {
275 log.info("Worker started...");
276
toma7083182014-09-25 21:38:03 -0700277 while (msgWritten < msgCount) {
tom1ae3d162014-09-26 09:38:16 -0700278 int size = Math.min(BATCH_SIZE, msgCount - msgWritten);
279 writeBatch(size);
280 msgWritten += size;
toma7083182014-09-25 21:38:03 -0700281 }
282
283 // Now try to get all the permits back before sending poison pill
284 semaphore.acquireUninterruptibly(PERMITS);
tom1ae3d162014-09-26 09:38:16 -0700285 stream.close();
toma7083182014-09-25 21:38:03 -0700286
287 log.info("Worker done...");
288
289 } catch (IOException e) {
290 log.error("Worker unable to perform I/O", e);
291 }
292 }
293
294
tom1ae3d162014-09-26 09:38:16 -0700295 private void writeBatch(int size) throws IOException {
296 // Build a batch of messages
297 List<TestMessage> batch = Lists.newArrayListWithCapacity(size);
298 for (int i = 0; i < size; i++) {
tom5f4df2d2014-09-26 12:19:51 -0700299 batch.add(new TestMessage(msgLength, nanoTime(), 0, stream.padding()));
toma7083182014-09-25 21:38:03 -0700300 }
tom1ae3d162014-09-26 09:38:16 -0700301 acquire(size);
302 stream.write(batch);
toma7083182014-09-25 21:38:03 -0700303 }
304
305
306 // Release permits based on the specified number of message credits
307 private void release(int permits) {
308 semaphore.release(permits);
309 }
310
311 // Acquire permit for a single batch
312 private void acquire(int permits) {
313 semaphore.acquireUninterruptibly(permits);
314 }
315
316 }
317
318}