blob: c147f11a6e66cd442af72f44129fd08dab29a8f3 [file] [log] [blame]
tomf110fff2014-09-26 00:38:18 -07001package org.onlab.onos.foo;
2
tom0e0863f2014-09-26 09:02:33 -07003import com.google.common.collect.Lists;
tomf110fff2014-09-26 00:38:18 -07004import org.onlab.nio.IOLoop;
5import org.onlab.nio.MessageStream;
6import org.onlab.util.Counter;
7import org.slf4j.Logger;
8import org.slf4j.LoggerFactory;
9
10import java.io.IOException;
11import java.net.InetAddress;
12import java.net.InetSocketAddress;
13import java.net.SocketAddress;
14import java.nio.channels.ByteChannel;
15import java.nio.channels.SelectionKey;
16import java.nio.channels.SocketChannel;
17import java.text.DecimalFormat;
18import java.util.ArrayList;
19import java.util.List;
20import java.util.concurrent.ExecutionException;
21import java.util.concurrent.ExecutorService;
22import java.util.concurrent.Executors;
23import java.util.concurrent.FutureTask;
24import java.util.concurrent.Semaphore;
25import java.util.concurrent.TimeUnit;
26import java.util.concurrent.TimeoutException;
27
28import static java.lang.String.format;
tom5f4df2d2014-09-26 12:19:51 -070029import static java.lang.System.nanoTime;
tomf110fff2014-09-26 00:38:18 -070030import static java.lang.System.out;
31import static org.onlab.onos.foo.IOLoopTestServer.PORT;
32import static org.onlab.util.Tools.delay;
33import static org.onlab.util.Tools.namedThreads;
34
35/**
36 * Auxiliary test fixture to measure speed of NIO-based channels.
37 */
38public class IOLoopTestClient {
39
40 private static Logger log = LoggerFactory.getLogger(IOLoopTestClient.class);
41
42 private final InetAddress ip;
43 private final int port;
44 private final int msgCount;
45 private final int msgLength;
46
47 private final List<CustomIOLoop> iloops = new ArrayList<>();
48 private final ExecutorService ipool;
49 private final ExecutorService wpool;
50
51 Counter messages;
52 Counter bytes;
tom0e0863f2014-09-26 09:02:33 -070053 long latencyTotal = 0;
54 long latencyCount = 0;
55
tomf110fff2014-09-26 00:38:18 -070056
57 /**
58 * Main entry point to launch the client.
59 *
60 * @param args command-line arguments
tom0e0863f2014-09-26 09:02:33 -070061 * @throws java.io.IOException if unable to connect to server
62 * @throws InterruptedException if latch wait gets interrupted
63 * @throws java.util.concurrent.ExecutionException if wait gets interrupted
64 * @throws java.util.concurrent.TimeoutException if timeout occurred while waiting for completion
tomf110fff2014-09-26 00:38:18 -070065 */
66 public static void main(String[] args)
67 throws IOException, InterruptedException, ExecutionException, TimeoutException {
68 startStandalone(args);
69
70 System.exit(0);
71 }
72
73 /**
74 * Starts a standalone IO loop test client.
75 *
76 * @param args command-line arguments
77 */
78 public static void startStandalone(String[] args)
79 throws IOException, InterruptedException, ExecutionException, TimeoutException {
80 InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
81 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
82 int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000;
83 int ml = args.length > 3 ? Integer.parseInt(args[3]) : 128;
tom14dc4d02014-09-26 12:43:14 -070084 int to = args.length > 4 ? Integer.parseInt(args[4]) : 60;
tomf110fff2014-09-26 00:38:18 -070085
86 log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ",
87 wc, mc, ml, ip);
88 IOLoopTestClient client = new IOLoopTestClient(ip, wc, mc, ml, PORT);
89
90 client.start();
91 delay(500);
92
93 client.await(to);
94 client.report();
95 }
96
97 /**
98 * Creates a speed client.
99 *
100 * @param ip ip address of server
101 * @param wc worker count
102 * @param mc message count to send per client
103 * @param ml message length in bytes
104 * @param port socket port
105 * @throws java.io.IOException if unable to create IO loops
106 */
107 public IOLoopTestClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
108 this.ip = ip;
109 this.port = port;
110 this.msgCount = mc;
111 this.msgLength = ml;
112 this.wpool = Executors.newFixedThreadPool(wc, namedThreads("worker"));
113 this.ipool = Executors.newFixedThreadPool(wc, namedThreads("io-loop"));
114
115 for (int i = 0; i < wc; i++) {
116 iloops.add(new CustomIOLoop());
117 }
118 }
119
120 /**
121 * Starts the client workers.
122 *
123 * @throws java.io.IOException if unable to open connection
124 */
125 public void start() throws IOException {
126 messages = new Counter();
127 bytes = new Counter();
128
129 // First start up all the IO loops
130 for (CustomIOLoop l : iloops) {
131 ipool.execute(l);
132 }
133
134 // Wait for all of them to get going
135 for (CustomIOLoop l : iloops) {
136 l.awaitStart(1000);
137 }
138
139 // ... and Next open all connections; one-per-loop
140 for (CustomIOLoop l : iloops) {
141 openConnection(l);
142 }
143 }
144
145
146 /**
147 * Initiates open connection request and registers the pending socket
148 * channel with the given IO loop.
149 *
150 * @param loop loop with which the channel should be registered
151 * @throws java.io.IOException if the socket could not be open or connected
152 */
153 private void openConnection(CustomIOLoop loop) throws IOException {
154 SocketAddress sa = new InetSocketAddress(ip, port);
155 SocketChannel ch = SocketChannel.open();
156 ch.configureBlocking(false);
157 loop.connectStream(ch);
158 ch.connect(sa);
159 }
160
161
162 /**
163 * Waits for the client workers to complete.
164 *
165 * @param secs timeout in seconds
tom0e0863f2014-09-26 09:02:33 -0700166 * @throws java.util.concurrent.ExecutionException if execution failed
167 * @throws InterruptedException if interrupt occurred while waiting
168 * @throws java.util.concurrent.TimeoutException if timeout occurred
tomf110fff2014-09-26 00:38:18 -0700169 */
170 public void await(int secs) throws InterruptedException,
171 ExecutionException, TimeoutException {
172 for (CustomIOLoop l : iloops) {
173 if (l.worker.task != null) {
174 l.worker.task.get(secs, TimeUnit.SECONDS);
tom0e0863f2014-09-26 09:02:33 -0700175 latencyTotal += l.latencyTotal;
176 latencyCount += l.latencyCount;
tomf110fff2014-09-26 00:38:18 -0700177 }
178 }
179 messages.freeze();
180 bytes.freeze();
181 }
182
183 /**
tombf474382014-10-02 07:36:50 -0700184 * Reports on the accumulated throughput and latency.
tomf110fff2014-09-26 00:38:18 -0700185 */
186 public void report() {
187 DecimalFormat f = new DecimalFormat("#,##0");
tom14dc4d02014-09-26 12:43:14 -0700188 out.println(format("Client: %s messages; %s bytes; %s mps; %s MBs; %s ns latency",
tomf110fff2014-09-26 00:38:18 -0700189 f.format(messages.total()), f.format(bytes.total()),
190 f.format(messages.throughput()),
tom0e0863f2014-09-26 09:02:33 -0700191 f.format(bytes.throughput() / (1024 * msgLength)),
192 f.format(latencyTotal / latencyCount)));
tomf110fff2014-09-26 00:38:18 -0700193 }
194
195
196 // Loop for transfer of fixed-length messages
197 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
198
199 Worker worker = new Worker();
tom0e0863f2014-09-26 09:02:33 -0700200 long latencyTotal = 0;
201 long latencyCount = 0;
202
tomf110fff2014-09-26 00:38:18 -0700203
204 public CustomIOLoop() throws IOException {
205 super(500);
206 }
207
208
209 @Override
210 protected TestMessageStream createStream(ByteChannel channel) {
211 return new TestMessageStream(msgLength, channel, this);
212 }
213
214 @Override
215 protected synchronized void removeStream(MessageStream<TestMessage> stream) {
216 super.removeStream(stream);
217
218 messages.add(stream.messagesIn().total());
219 bytes.add(stream.bytesIn().total());
tomf110fff2014-09-26 00:38:18 -0700220 stream.messagesOut().reset();
221 stream.bytesOut().reset();
222 }
223
224 @Override
225 protected void processMessages(List<TestMessage> messages,
tom0e0863f2014-09-26 09:02:33 -0700226 MessageStream<TestMessage> stream) {
227 for (TestMessage message : messages) {
228 // TODO: summarize latency data better
tom5f4df2d2014-09-26 12:19:51 -0700229 latencyTotal += nanoTime() - message.requestorTime();
tom0e0863f2014-09-26 09:02:33 -0700230 latencyCount++;
231 }
tomf110fff2014-09-26 00:38:18 -0700232 worker.release(messages.size());
233 }
234
235 @Override
tom5a8779c2014-09-29 14:48:43 -0700236 protected void connect(SelectionKey key) throws IOException {
tomf110fff2014-09-26 00:38:18 -0700237 super.connect(key);
238 TestMessageStream b = (TestMessageStream) key.attachment();
239 Worker w = ((CustomIOLoop) b.loop()).worker;
240 w.pump(b);
241 }
242
243 }
244
245 /**
246 * Auxiliary worker to connect and pump batched messages using blocking I/O.
247 */
248 private class Worker implements Runnable {
249
tom5f4df2d2014-09-26 12:19:51 -0700250 private static final int BATCH_SIZE = 10;
tomf110fff2014-09-26 00:38:18 -0700251 private static final int PERMITS = 2 * BATCH_SIZE;
252
tom0e0863f2014-09-26 09:02:33 -0700253 private TestMessageStream stream;
tomf110fff2014-09-26 00:38:18 -0700254 private FutureTask<Worker> task;
255
256 // Stuff to throttle pump
257 private final Semaphore semaphore = new Semaphore(PERMITS);
258 private int msgWritten;
259
tom0e0863f2014-09-26 09:02:33 -0700260 void pump(TestMessageStream stream) {
261 this.stream = stream;
tomf110fff2014-09-26 00:38:18 -0700262 task = new FutureTask<>(this, this);
263 wpool.execute(task);
264 }
265
266 @Override
267 public void run() {
268 try {
269 log.info("Worker started...");
270
tomf110fff2014-09-26 00:38:18 -0700271 while (msgWritten < msgCount) {
tom0e0863f2014-09-26 09:02:33 -0700272 int size = Math.min(BATCH_SIZE, msgCount - msgWritten);
273 writeBatch(size);
274 msgWritten += size;
tomf110fff2014-09-26 00:38:18 -0700275 }
276
277 // Now try to get all the permits back before sending poison pill
278 semaphore.acquireUninterruptibly(PERMITS);
tom0e0863f2014-09-26 09:02:33 -0700279 stream.close();
tomf110fff2014-09-26 00:38:18 -0700280
281 log.info("Worker done...");
282
283 } catch (IOException e) {
284 log.error("Worker unable to perform I/O", e);
285 }
286 }
287
288
tom0e0863f2014-09-26 09:02:33 -0700289 private void writeBatch(int size) throws IOException {
290 // Build a batch of messages
291 List<TestMessage> batch = Lists.newArrayListWithCapacity(size);
292 for (int i = 0; i < size; i++) {
tom5f4df2d2014-09-26 12:19:51 -0700293 batch.add(new TestMessage(msgLength, nanoTime(), 0,
tomf7e13b02014-09-26 11:12:25 -0700294 stream.padding()));
tomf110fff2014-09-26 00:38:18 -0700295 }
tom0e0863f2014-09-26 09:02:33 -0700296 acquire(size);
297 stream.write(batch);
tomf110fff2014-09-26 00:38:18 -0700298 }
299
300
301 // Release permits based on the specified number of message credits
302 private void release(int permits) {
303 semaphore.release(permits);
304 }
305
306 // Acquire permit for a single batch
307 private void acquire(int permits) {
308 semaphore.acquireUninterruptibly(permits);
309 }
310
311 }
312
313}