blob: ca8d92ec8ca01d7ba03b67fbc506871c745448d3 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
tomf110fff2014-09-26 00:38:18 -070019package org.onlab.onos.foo;
20
tom0e0863f2014-09-26 09:02:33 -070021import com.google.common.collect.Lists;
tomf110fff2014-09-26 00:38:18 -070022import org.onlab.nio.IOLoop;
23import org.onlab.nio.MessageStream;
24import org.onlab.util.Counter;
25import org.slf4j.Logger;
26import org.slf4j.LoggerFactory;
27
28import java.io.IOException;
29import java.net.InetAddress;
30import java.net.InetSocketAddress;
31import java.net.SocketAddress;
32import java.nio.channels.ByteChannel;
33import java.nio.channels.SelectionKey;
34import java.nio.channels.SocketChannel;
35import java.text.DecimalFormat;
36import java.util.ArrayList;
37import java.util.List;
38import java.util.concurrent.ExecutionException;
39import java.util.concurrent.ExecutorService;
40import java.util.concurrent.Executors;
41import java.util.concurrent.FutureTask;
42import java.util.concurrent.Semaphore;
43import java.util.concurrent.TimeUnit;
44import java.util.concurrent.TimeoutException;
45
46import static java.lang.String.format;
tom5f4df2d2014-09-26 12:19:51 -070047import static java.lang.System.nanoTime;
tomf110fff2014-09-26 00:38:18 -070048import static java.lang.System.out;
49import static org.onlab.onos.foo.IOLoopTestServer.PORT;
50import static org.onlab.util.Tools.delay;
51import static org.onlab.util.Tools.namedThreads;
52
/**
 * Auxiliary test fixture to measure speed of NIO-based channels.
 */
56public class IOLoopTestClient {
57
58 private static Logger log = LoggerFactory.getLogger(IOLoopTestClient.class);
59
60 private final InetAddress ip;
61 private final int port;
62 private final int msgCount;
63 private final int msgLength;
64
65 private final List<CustomIOLoop> iloops = new ArrayList<>();
66 private final ExecutorService ipool;
67 private final ExecutorService wpool;
68
69 Counter messages;
70 Counter bytes;
tom0e0863f2014-09-26 09:02:33 -070071 long latencyTotal = 0;
72 long latencyCount = 0;
73
tomf110fff2014-09-26 00:38:18 -070074
75 /**
76 * Main entry point to launch the client.
77 *
78 * @param args command-line arguments
tom0e0863f2014-09-26 09:02:33 -070079 * @throws java.io.IOException if unable to connect to server
80 * @throws InterruptedException if latch wait gets interrupted
81 * @throws java.util.concurrent.ExecutionException if wait gets interrupted
82 * @throws java.util.concurrent.TimeoutException if timeout occurred while waiting for completion
tomf110fff2014-09-26 00:38:18 -070083 */
84 public static void main(String[] args)
85 throws IOException, InterruptedException, ExecutionException, TimeoutException {
86 startStandalone(args);
87
88 System.exit(0);
89 }
90
91 /**
92 * Starts a standalone IO loop test client.
93 *
94 * @param args command-line arguments
95 */
96 public static void startStandalone(String[] args)
97 throws IOException, InterruptedException, ExecutionException, TimeoutException {
98 InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
99 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
100 int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000;
101 int ml = args.length > 3 ? Integer.parseInt(args[3]) : 128;
tom14dc4d02014-09-26 12:43:14 -0700102 int to = args.length > 4 ? Integer.parseInt(args[4]) : 60;
tomf110fff2014-09-26 00:38:18 -0700103
104 log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ",
105 wc, mc, ml, ip);
106 IOLoopTestClient client = new IOLoopTestClient(ip, wc, mc, ml, PORT);
107
108 client.start();
109 delay(500);
110
111 client.await(to);
112 client.report();
113 }
114
/**
 * Creates a speed client with one IO loop per worker and two fixed-size
 * thread pools (one for the IO loops, one for the message pumps).
 *
 * @param ip   ip address of server
 * @param wc   worker count
 * @param mc   message count to send per client
 * @param ml   message length in bytes
 * @param port socket port
 * @throws java.io.IOException if unable to create IO loops
 */
public IOLoopTestClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
    this.ip = ip;
    this.port = port;
    msgCount = mc;
    msgLength = ml;

    wpool = Executors.newFixedThreadPool(wc, namedThreads("worker"));
    ipool = Executors.newFixedThreadPool(wc, namedThreads("io-loop"));

    // The list starts out empty, so growing it to wc entries yields one loop per worker.
    while (iloops.size() < wc) {
        iloops.add(new CustomIOLoop());
    }
}
137
138 /**
139 * Starts the client workers.
140 *
141 * @throws java.io.IOException if unable to open connection
142 */
143 public void start() throws IOException {
144 messages = new Counter();
145 bytes = new Counter();
146
147 // First start up all the IO loops
148 for (CustomIOLoop l : iloops) {
149 ipool.execute(l);
150 }
151
152 // Wait for all of them to get going
153 for (CustomIOLoop l : iloops) {
154 l.awaitStart(1000);
155 }
156
157 // ... and Next open all connections; one-per-loop
158 for (CustomIOLoop l : iloops) {
159 openConnection(l);
160 }
161 }
162
163
164 /**
165 * Initiates open connection request and registers the pending socket
166 * channel with the given IO loop.
167 *
168 * @param loop loop with which the channel should be registered
169 * @throws java.io.IOException if the socket could not be open or connected
170 */
171 private void openConnection(CustomIOLoop loop) throws IOException {
172 SocketAddress sa = new InetSocketAddress(ip, port);
173 SocketChannel ch = SocketChannel.open();
174 ch.configureBlocking(false);
175 loop.connectStream(ch);
176 ch.connect(sa);
177 }
178
179
180 /**
181 * Waits for the client workers to complete.
182 *
183 * @param secs timeout in seconds
tom0e0863f2014-09-26 09:02:33 -0700184 * @throws java.util.concurrent.ExecutionException if execution failed
185 * @throws InterruptedException if interrupt occurred while waiting
186 * @throws java.util.concurrent.TimeoutException if timeout occurred
tomf110fff2014-09-26 00:38:18 -0700187 */
188 public void await(int secs) throws InterruptedException,
189 ExecutionException, TimeoutException {
190 for (CustomIOLoop l : iloops) {
191 if (l.worker.task != null) {
192 l.worker.task.get(secs, TimeUnit.SECONDS);
tom0e0863f2014-09-26 09:02:33 -0700193 latencyTotal += l.latencyTotal;
194 latencyCount += l.latencyCount;
tomf110fff2014-09-26 00:38:18 -0700195 }
196 }
197 messages.freeze();
198 bytes.freeze();
199 }
200
201 /**
tombf474382014-10-02 07:36:50 -0700202 * Reports on the accumulated throughput and latency.
tomf110fff2014-09-26 00:38:18 -0700203 */
204 public void report() {
205 DecimalFormat f = new DecimalFormat("#,##0");
tom14dc4d02014-09-26 12:43:14 -0700206 out.println(format("Client: %s messages; %s bytes; %s mps; %s MBs; %s ns latency",
tomf110fff2014-09-26 00:38:18 -0700207 f.format(messages.total()), f.format(bytes.total()),
208 f.format(messages.throughput()),
tom0e0863f2014-09-26 09:02:33 -0700209 f.format(bytes.throughput() / (1024 * msgLength)),
210 f.format(latencyTotal / latencyCount)));
tomf110fff2014-09-26 00:38:18 -0700211 }
212
213
214 // Loop for transfer of fixed-length messages
215 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
216
217 Worker worker = new Worker();
tom0e0863f2014-09-26 09:02:33 -0700218 long latencyTotal = 0;
219 long latencyCount = 0;
220
tomf110fff2014-09-26 00:38:18 -0700221
222 public CustomIOLoop() throws IOException {
223 super(500);
224 }
225
226
227 @Override
228 protected TestMessageStream createStream(ByteChannel channel) {
229 return new TestMessageStream(msgLength, channel, this);
230 }
231
232 @Override
233 protected synchronized void removeStream(MessageStream<TestMessage> stream) {
234 super.removeStream(stream);
235
236 messages.add(stream.messagesIn().total());
237 bytes.add(stream.bytesIn().total());
tomf110fff2014-09-26 00:38:18 -0700238 stream.messagesOut().reset();
239 stream.bytesOut().reset();
240 }
241
242 @Override
243 protected void processMessages(List<TestMessage> messages,
tom0e0863f2014-09-26 09:02:33 -0700244 MessageStream<TestMessage> stream) {
245 for (TestMessage message : messages) {
246 // TODO: summarize latency data better
tom5f4df2d2014-09-26 12:19:51 -0700247 latencyTotal += nanoTime() - message.requestorTime();
tom0e0863f2014-09-26 09:02:33 -0700248 latencyCount++;
249 }
tomf110fff2014-09-26 00:38:18 -0700250 worker.release(messages.size());
251 }
252
253 @Override
tom5a8779c2014-09-29 14:48:43 -0700254 protected void connect(SelectionKey key) throws IOException {
tomf110fff2014-09-26 00:38:18 -0700255 super.connect(key);
256 TestMessageStream b = (TestMessageStream) key.attachment();
257 Worker w = ((CustomIOLoop) b.loop()).worker;
258 w.pump(b);
259 }
260
261 }
262
263 /**
264 * Auxiliary worker to connect and pump batched messages using blocking I/O.
265 */
266 private class Worker implements Runnable {
267
tom5f4df2d2014-09-26 12:19:51 -0700268 private static final int BATCH_SIZE = 10;
tomf110fff2014-09-26 00:38:18 -0700269 private static final int PERMITS = 2 * BATCH_SIZE;
270
tom0e0863f2014-09-26 09:02:33 -0700271 private TestMessageStream stream;
tomf110fff2014-09-26 00:38:18 -0700272 private FutureTask<Worker> task;
273
274 // Stuff to throttle pump
275 private final Semaphore semaphore = new Semaphore(PERMITS);
276 private int msgWritten;
277
tom0e0863f2014-09-26 09:02:33 -0700278 void pump(TestMessageStream stream) {
279 this.stream = stream;
tomf110fff2014-09-26 00:38:18 -0700280 task = new FutureTask<>(this, this);
281 wpool.execute(task);
282 }
283
284 @Override
285 public void run() {
286 try {
287 log.info("Worker started...");
288
tomf110fff2014-09-26 00:38:18 -0700289 while (msgWritten < msgCount) {
tom0e0863f2014-09-26 09:02:33 -0700290 int size = Math.min(BATCH_SIZE, msgCount - msgWritten);
291 writeBatch(size);
292 msgWritten += size;
tomf110fff2014-09-26 00:38:18 -0700293 }
294
295 // Now try to get all the permits back before sending poison pill
296 semaphore.acquireUninterruptibly(PERMITS);
tom0e0863f2014-09-26 09:02:33 -0700297 stream.close();
tomf110fff2014-09-26 00:38:18 -0700298
299 log.info("Worker done...");
300
301 } catch (IOException e) {
302 log.error("Worker unable to perform I/O", e);
303 }
304 }
305
306
tom0e0863f2014-09-26 09:02:33 -0700307 private void writeBatch(int size) throws IOException {
308 // Build a batch of messages
309 List<TestMessage> batch = Lists.newArrayListWithCapacity(size);
310 for (int i = 0; i < size; i++) {
tom5f4df2d2014-09-26 12:19:51 -0700311 batch.add(new TestMessage(msgLength, nanoTime(), 0,
tomf7e13b02014-09-26 11:12:25 -0700312 stream.padding()));
tomf110fff2014-09-26 00:38:18 -0700313 }
tom0e0863f2014-09-26 09:02:33 -0700314 acquire(size);
315 stream.write(batch);
tomf110fff2014-09-26 00:38:18 -0700316 }
317
318
319 // Release permits based on the specified number of message credits
320 private void release(int permits) {
321 semaphore.release(permits);
322 }
323
324 // Acquire permit for a single batch
325 private void acquire(int permits) {
326 semaphore.acquireUninterruptibly(permits);
327 }
328
329 }
330
331}