blob: 5c96d497f72f414b4711c1c459ae95fcb56ef116 [file] [log] [blame]
tomf110fff2014-09-26 00:38:18 -07001package org.onlab.onos.foo;
2
tom0e0863f2014-09-26 09:02:33 -07003import com.google.common.collect.Lists;
tomf110fff2014-09-26 00:38:18 -07004import org.onlab.nio.IOLoop;
5import org.onlab.nio.MessageStream;
6import org.onlab.util.Counter;
7import org.slf4j.Logger;
8import org.slf4j.LoggerFactory;
9
10import java.io.IOException;
11import java.net.InetAddress;
12import java.net.InetSocketAddress;
13import java.net.SocketAddress;
14import java.nio.channels.ByteChannel;
15import java.nio.channels.SelectionKey;
16import java.nio.channels.SocketChannel;
17import java.text.DecimalFormat;
18import java.util.ArrayList;
19import java.util.List;
20import java.util.concurrent.ExecutionException;
21import java.util.concurrent.ExecutorService;
22import java.util.concurrent.Executors;
23import java.util.concurrent.FutureTask;
24import java.util.concurrent.Semaphore;
25import java.util.concurrent.TimeUnit;
26import java.util.concurrent.TimeoutException;
27
28import static java.lang.String.format;
tom0e0863f2014-09-26 09:02:33 -070029import static java.lang.System.currentTimeMillis;
tom5f4df2d2014-09-26 12:19:51 -070030import static java.lang.System.nanoTime;
tomf110fff2014-09-26 00:38:18 -070031import static java.lang.System.out;
32import static org.onlab.onos.foo.IOLoopTestServer.PORT;
33import static org.onlab.util.Tools.delay;
34import static org.onlab.util.Tools.namedThreads;
35
36/**
37 * Auxiliary test fixture to measure speed of NIO-based channels.
38 */
39public class IOLoopTestClient {
40
41 private static Logger log = LoggerFactory.getLogger(IOLoopTestClient.class);
42
43 private final InetAddress ip;
44 private final int port;
45 private final int msgCount;
46 private final int msgLength;
47
48 private final List<CustomIOLoop> iloops = new ArrayList<>();
49 private final ExecutorService ipool;
50 private final ExecutorService wpool;
51
52 Counter messages;
53 Counter bytes;
tom0e0863f2014-09-26 09:02:33 -070054 long latencyTotal = 0;
55 long latencyCount = 0;
56
tomf110fff2014-09-26 00:38:18 -070057
58 /**
59 * Main entry point to launch the client.
60 *
61 * @param args command-line arguments
tom0e0863f2014-09-26 09:02:33 -070062 * @throws java.io.IOException if unable to connect to server
63 * @throws InterruptedException if latch wait gets interrupted
64 * @throws java.util.concurrent.ExecutionException if wait gets interrupted
65 * @throws java.util.concurrent.TimeoutException if timeout occurred while waiting for completion
tomf110fff2014-09-26 00:38:18 -070066 */
67 public static void main(String[] args)
68 throws IOException, InterruptedException, ExecutionException, TimeoutException {
69 startStandalone(args);
70
71 System.exit(0);
72 }
73
74 /**
75 * Starts a standalone IO loop test client.
76 *
77 * @param args command-line arguments
78 */
79 public static void startStandalone(String[] args)
80 throws IOException, InterruptedException, ExecutionException, TimeoutException {
81 InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
82 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
83 int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000;
84 int ml = args.length > 3 ? Integer.parseInt(args[3]) : 128;
85 int to = args.length > 4 ? Integer.parseInt(args[4]) : 30;
86
87 log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ",
88 wc, mc, ml, ip);
89 IOLoopTestClient client = new IOLoopTestClient(ip, wc, mc, ml, PORT);
90
91 client.start();
92 delay(500);
93
94 client.await(to);
95 client.report();
96 }
97
/**
 * Creates a speed client with one IO loop per worker and two thread pools
 * of {@code wc} threads each: one for the IO loops, one for the workers.
 *
 * @param ip   ip address of server
 * @param wc   worker count
 * @param mc   message count to send per client
 * @param ml   message length in bytes
 * @param port socket port
 * @throws IOException if unable to create IO loops
 */
public IOLoopTestClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
    this.ip = ip;
    this.port = port;

    this.msgCount = mc;
    this.msgLength = ml;

    this.wpool = Executors.newFixedThreadPool(wc, namedThreads("worker"));
    this.ipool = Executors.newFixedThreadPool(wc, namedThreads("io-loop"));

    // The list starts out empty, so this creates exactly wc loops.
    while (iloops.size() < wc) {
        iloops.add(new CustomIOLoop());
    }
}
120
121 /**
122 * Starts the client workers.
123 *
124 * @throws java.io.IOException if unable to open connection
125 */
126 public void start() throws IOException {
127 messages = new Counter();
128 bytes = new Counter();
129
130 // First start up all the IO loops
131 for (CustomIOLoop l : iloops) {
132 ipool.execute(l);
133 }
134
135 // Wait for all of them to get going
136 for (CustomIOLoop l : iloops) {
137 l.awaitStart(1000);
138 }
139
140 // ... and Next open all connections; one-per-loop
141 for (CustomIOLoop l : iloops) {
142 openConnection(l);
143 }
144 }
145
146
147 /**
148 * Initiates open connection request and registers the pending socket
149 * channel with the given IO loop.
150 *
151 * @param loop loop with which the channel should be registered
152 * @throws java.io.IOException if the socket could not be open or connected
153 */
154 private void openConnection(CustomIOLoop loop) throws IOException {
155 SocketAddress sa = new InetSocketAddress(ip, port);
156 SocketChannel ch = SocketChannel.open();
157 ch.configureBlocking(false);
158 loop.connectStream(ch);
159 ch.connect(sa);
160 }
161
162
163 /**
164 * Waits for the client workers to complete.
165 *
166 * @param secs timeout in seconds
tom0e0863f2014-09-26 09:02:33 -0700167 * @throws java.util.concurrent.ExecutionException if execution failed
168 * @throws InterruptedException if interrupt occurred while waiting
169 * @throws java.util.concurrent.TimeoutException if timeout occurred
tomf110fff2014-09-26 00:38:18 -0700170 */
171 public void await(int secs) throws InterruptedException,
172 ExecutionException, TimeoutException {
173 for (CustomIOLoop l : iloops) {
174 if (l.worker.task != null) {
175 l.worker.task.get(secs, TimeUnit.SECONDS);
tom0e0863f2014-09-26 09:02:33 -0700176 latencyTotal += l.latencyTotal;
177 latencyCount += l.latencyCount;
tomf110fff2014-09-26 00:38:18 -0700178 }
179 }
180 messages.freeze();
181 bytes.freeze();
182 }
183
184 /**
185 * Reports on the accumulated throughput trackers.
186 */
187 public void report() {
188 DecimalFormat f = new DecimalFormat("#,##0");
tom5f4df2d2014-09-26 12:19:51 -0700189 out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs; %s ns latency",
tomf110fff2014-09-26 00:38:18 -0700190 f.format(messages.total()), f.format(bytes.total()),
191 f.format(messages.throughput()),
tom0e0863f2014-09-26 09:02:33 -0700192 f.format(bytes.throughput() / (1024 * msgLength)),
193 f.format(latencyTotal / latencyCount)));
tomf110fff2014-09-26 00:38:18 -0700194 }
195
196
197 // Loop for transfer of fixed-length messages
198 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
199
200 Worker worker = new Worker();
tom0e0863f2014-09-26 09:02:33 -0700201 long latencyTotal = 0;
202 long latencyCount = 0;
203
tomf110fff2014-09-26 00:38:18 -0700204
205 public CustomIOLoop() throws IOException {
206 super(500);
207 }
208
209
210 @Override
211 protected TestMessageStream createStream(ByteChannel channel) {
212 return new TestMessageStream(msgLength, channel, this);
213 }
214
215 @Override
216 protected synchronized void removeStream(MessageStream<TestMessage> stream) {
217 super.removeStream(stream);
218
219 messages.add(stream.messagesIn().total());
220 bytes.add(stream.bytesIn().total());
221
222// out.println(format("Disconnected client; inbound %s mps, %s Mbps; outbound %s mps, %s Mbps",
223// FORMAT.format(stream.messagesIn().throughput()),
224// FORMAT.format(stream.bytesIn().throughput() / (1024 * msgLength)),
225// FORMAT.format(stream.messagesOut().throughput()),
226// FORMAT.format(stream.bytesOut().throughput() / (1024 * msgLength))));
227
228 stream.messagesOut().reset();
229 stream.bytesOut().reset();
230 }
231
232 @Override
233 protected void processMessages(List<TestMessage> messages,
tom0e0863f2014-09-26 09:02:33 -0700234 MessageStream<TestMessage> stream) {
235 for (TestMessage message : messages) {
236 // TODO: summarize latency data better
tom5f4df2d2014-09-26 12:19:51 -0700237 latencyTotal += nanoTime() - message.requestorTime();
tom0e0863f2014-09-26 09:02:33 -0700238 latencyCount++;
239 }
tomf110fff2014-09-26 00:38:18 -0700240 worker.release(messages.size());
241 }
242
243 @Override
244 protected void connect(SelectionKey key) {
245 super.connect(key);
246 TestMessageStream b = (TestMessageStream) key.attachment();
247 Worker w = ((CustomIOLoop) b.loop()).worker;
248 w.pump(b);
249 }
250
251 }
252
253 /**
254 * Auxiliary worker to connect and pump batched messages using blocking I/O.
255 */
256 private class Worker implements Runnable {
257
tom5f4df2d2014-09-26 12:19:51 -0700258 private static final int BATCH_SIZE = 10;
tomf110fff2014-09-26 00:38:18 -0700259 private static final int PERMITS = 2 * BATCH_SIZE;
260
tom0e0863f2014-09-26 09:02:33 -0700261 private TestMessageStream stream;
tomf110fff2014-09-26 00:38:18 -0700262 private FutureTask<Worker> task;
263
264 // Stuff to throttle pump
265 private final Semaphore semaphore = new Semaphore(PERMITS);
266 private int msgWritten;
267
tom0e0863f2014-09-26 09:02:33 -0700268 void pump(TestMessageStream stream) {
269 this.stream = stream;
tomf110fff2014-09-26 00:38:18 -0700270 task = new FutureTask<>(this, this);
271 wpool.execute(task);
272 }
273
274 @Override
275 public void run() {
276 try {
277 log.info("Worker started...");
278
tomf110fff2014-09-26 00:38:18 -0700279 while (msgWritten < msgCount) {
tom0e0863f2014-09-26 09:02:33 -0700280 int size = Math.min(BATCH_SIZE, msgCount - msgWritten);
281 writeBatch(size);
282 msgWritten += size;
tomf110fff2014-09-26 00:38:18 -0700283 }
284
285 // Now try to get all the permits back before sending poison pill
286 semaphore.acquireUninterruptibly(PERMITS);
tom0e0863f2014-09-26 09:02:33 -0700287 stream.close();
tomf110fff2014-09-26 00:38:18 -0700288
289 log.info("Worker done...");
290
291 } catch (IOException e) {
292 log.error("Worker unable to perform I/O", e);
293 }
294 }
295
296
tom0e0863f2014-09-26 09:02:33 -0700297 private void writeBatch(int size) throws IOException {
298 // Build a batch of messages
299 List<TestMessage> batch = Lists.newArrayListWithCapacity(size);
300 for (int i = 0; i < size; i++) {
tom5f4df2d2014-09-26 12:19:51 -0700301 batch.add(new TestMessage(msgLength, nanoTime(), 0,
tomf7e13b02014-09-26 11:12:25 -0700302 stream.padding()));
tomf110fff2014-09-26 00:38:18 -0700303 }
tom0e0863f2014-09-26 09:02:33 -0700304 acquire(size);
305 stream.write(batch);
tomf110fff2014-09-26 00:38:18 -0700306 }
307
308
309 // Release permits based on the specified number of message credits
310 private void release(int permits) {
311 semaphore.release(permits);
312 }
313
314 // Acquire permit for a single batch
315 private void acquire(int permits) {
316 semaphore.acquireUninterruptibly(permits);
317 }
318
319 }
320
321}