blob: 6f6bd6d14779355e43394ecf343d7e6d2b500e40 [file] [log] [blame]
toma7083182014-09-25 21:38:03 -07001package org.onlab.nio;
2
tom1ae3d162014-09-26 09:38:16 -07003import com.google.common.collect.Lists;
tom2d6d3972014-09-25 22:38:57 -07004import org.onlab.util.Counter;
toma7083182014-09-25 21:38:03 -07005import org.slf4j.Logger;
6import org.slf4j.LoggerFactory;
7
8import java.io.IOException;
9import java.net.InetAddress;
10import java.net.InetSocketAddress;
11import java.net.SocketAddress;
12import java.nio.channels.ByteChannel;
13import java.nio.channels.SelectionKey;
14import java.nio.channels.SocketChannel;
tom2d6d3972014-09-25 22:38:57 -070015import java.text.DecimalFormat;
toma7083182014-09-25 21:38:03 -070016import java.util.ArrayList;
17import java.util.List;
18import java.util.concurrent.ExecutionException;
19import java.util.concurrent.ExecutorService;
20import java.util.concurrent.Executors;
21import java.util.concurrent.FutureTask;
22import java.util.concurrent.Semaphore;
23import java.util.concurrent.TimeUnit;
24import java.util.concurrent.TimeoutException;
25
tom74d49652014-09-25 23:48:46 -070026import static java.lang.String.format;
tom1ae3d162014-09-26 09:38:16 -070027import static java.lang.System.currentTimeMillis;
tom74d49652014-09-25 23:48:46 -070028import static java.lang.System.out;
tom74d49652014-09-25 23:48:46 -070029import static org.onlab.nio.IOLoopTestServer.PORT;
tomf110fff2014-09-26 00:38:18 -070030import static org.onlab.util.Tools.delay;
toma7083182014-09-25 21:38:03 -070031import static org.onlab.util.Tools.namedThreads;
32
33/**
34 * Auxiliary test fixture to measure speed of NIO-based channels.
35 */
tom74d49652014-09-25 23:48:46 -070036public class IOLoopTestClient {
toma7083182014-09-25 21:38:03 -070037
tom74d49652014-09-25 23:48:46 -070038 private static Logger log = LoggerFactory.getLogger(IOLoopTestClient.class);
toma7083182014-09-25 21:38:03 -070039
40 private final InetAddress ip;
41 private final int port;
42 private final int msgCount;
43 private final int msgLength;
44
45 private final List<CustomIOLoop> iloops = new ArrayList<>();
46 private final ExecutorService ipool;
47 private final ExecutorService wpool;
48
tom2d6d3972014-09-25 22:38:57 -070049 Counter messages;
50 Counter bytes;
tom1ae3d162014-09-26 09:38:16 -070051 long latencyTotal = 0;
52 long latencyCount = 0;
53
toma7083182014-09-25 21:38:03 -070054
55 /**
56 * Main entry point to launch the client.
57 *
58 * @param args command-line arguments
tom1ae3d162014-09-26 09:38:16 -070059 * @throws java.io.IOException if unable to connect to server
60 * @throws InterruptedException if latch wait gets interrupted
61 * @throws java.util.concurrent.ExecutionException if wait gets interrupted
62 * @throws java.util.concurrent.TimeoutException if timeout occurred while waiting for completion
toma7083182014-09-25 21:38:03 -070063 */
64 public static void main(String[] args)
65 throws IOException, InterruptedException, ExecutionException, TimeoutException {
tom74d49652014-09-25 23:48:46 -070066 startStandalone(args);
67
68 System.exit(0);
69 }
70
71 /**
72 * Starts a standalone IO loop test client.
73 *
74 * @param args command-line arguments
75 */
76 public static void startStandalone(String[] args)
77 throws IOException, InterruptedException, ExecutionException, TimeoutException {
toma7083182014-09-25 21:38:03 -070078 InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
79 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
80 int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000;
81 int ml = args.length > 3 ? Integer.parseInt(args[3]) : 128;
82 int to = args.length > 4 ? Integer.parseInt(args[4]) : 30;
83
84 log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ",
85 wc, mc, ml, ip);
tom74d49652014-09-25 23:48:46 -070086 IOLoopTestClient client = new IOLoopTestClient(ip, wc, mc, ml, PORT);
toma7083182014-09-25 21:38:03 -070087
tom74d49652014-09-25 23:48:46 -070088 client.start();
tomf110fff2014-09-26 00:38:18 -070089 delay(500);
toma7083182014-09-25 21:38:03 -070090
tom74d49652014-09-25 23:48:46 -070091 client.await(to);
92 client.report();
toma7083182014-09-25 21:38:03 -070093 }
94
95 /**
96 * Creates a speed client.
97 *
98 * @param ip ip address of server
99 * @param wc worker count
100 * @param mc message count to send per client
101 * @param ml message length in bytes
102 * @param port socket port
tom1ae3d162014-09-26 09:38:16 -0700103 * @throws java.io.IOException if unable to create IO loops
toma7083182014-09-25 21:38:03 -0700104 */
tom74d49652014-09-25 23:48:46 -0700105 public IOLoopTestClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
toma7083182014-09-25 21:38:03 -0700106 this.ip = ip;
107 this.port = port;
108 this.msgCount = mc;
109 this.msgLength = ml;
110 this.wpool = Executors.newFixedThreadPool(wc, namedThreads("worker"));
111 this.ipool = Executors.newFixedThreadPool(wc, namedThreads("io-loop"));
112
113 for (int i = 0; i < wc; i++) {
114 iloops.add(new CustomIOLoop());
115 }
116 }
117
118 /**
119 * Starts the client workers.
120 *
tom1ae3d162014-09-26 09:38:16 -0700121 * @throws java.io.IOException if unable to open connection
toma7083182014-09-25 21:38:03 -0700122 */
123 public void start() throws IOException {
tom2d6d3972014-09-25 22:38:57 -0700124 messages = new Counter();
125 bytes = new Counter();
toma7083182014-09-25 21:38:03 -0700126
127 // First start up all the IO loops
128 for (CustomIOLoop l : iloops) {
129 ipool.execute(l);
130 }
131
tom2d6d3972014-09-25 22:38:57 -0700132 // Wait for all of them to get going
tom74d49652014-09-25 23:48:46 -0700133 for (CustomIOLoop l : iloops) {
134 l.awaitStart(1000);
135 }
toma7083182014-09-25 21:38:03 -0700136
137 // ... and Next open all connections; one-per-loop
138 for (CustomIOLoop l : iloops) {
139 openConnection(l);
140 }
141 }
142
143
144 /**
145 * Initiates open connection request and registers the pending socket
146 * channel with the given IO loop.
147 *
148 * @param loop loop with which the channel should be registered
tom1ae3d162014-09-26 09:38:16 -0700149 * @throws java.io.IOException if the socket could not be open or connected
toma7083182014-09-25 21:38:03 -0700150 */
151 private void openConnection(CustomIOLoop loop) throws IOException {
152 SocketAddress sa = new InetSocketAddress(ip, port);
153 SocketChannel ch = SocketChannel.open();
154 ch.configureBlocking(false);
155 loop.connectStream(ch);
156 ch.connect(sa);
157 }
158
159
160 /**
161 * Waits for the client workers to complete.
162 *
163 * @param secs timeout in seconds
tom1ae3d162014-09-26 09:38:16 -0700164 * @throws java.util.concurrent.ExecutionException if execution failed
165 * @throws InterruptedException if interrupt occurred while waiting
166 * @throws java.util.concurrent.TimeoutException if timeout occurred
toma7083182014-09-25 21:38:03 -0700167 */
168 public void await(int secs) throws InterruptedException,
169 ExecutionException, TimeoutException {
170 for (CustomIOLoop l : iloops) {
171 if (l.worker.task != null) {
172 l.worker.task.get(secs, TimeUnit.SECONDS);
tom1ae3d162014-09-26 09:38:16 -0700173 latencyTotal += l.latencyTotal;
174 latencyCount += l.latencyCount;
toma7083182014-09-25 21:38:03 -0700175 }
176 }
tom2d6d3972014-09-25 22:38:57 -0700177 messages.freeze();
178 bytes.freeze();
toma7083182014-09-25 21:38:03 -0700179 }
180
181 /**
182 * Reports on the accumulated throughput trackers.
183 */
184 public void report() {
tom2d6d3972014-09-25 22:38:57 -0700185 DecimalFormat f = new DecimalFormat("#,##0");
tom1ae3d162014-09-26 09:38:16 -0700186 out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs; %s ms latency",
tom74d49652014-09-25 23:48:46 -0700187 f.format(messages.total()), f.format(bytes.total()),
188 f.format(messages.throughput()),
tom1ae3d162014-09-26 09:38:16 -0700189 f.format(bytes.throughput() / (1024 * msgLength)),
190 f.format(latencyTotal / latencyCount)));
toma7083182014-09-25 21:38:03 -0700191 }
192
193
194 // Loop for transfer of fixed-length messages
195 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
196
197 Worker worker = new Worker();
tom1ae3d162014-09-26 09:38:16 -0700198 long latencyTotal = 0;
199 long latencyCount = 0;
200
toma7083182014-09-25 21:38:03 -0700201
202 public CustomIOLoop() throws IOException {
203 super(500);
204 }
205
206
207 @Override
208 protected TestMessageStream createStream(ByteChannel channel) {
209 return new TestMessageStream(msgLength, channel, this);
210 }
211
212 @Override
tom74d49652014-09-25 23:48:46 -0700213 protected synchronized void removeStream(MessageStream<TestMessage> stream) {
214 super.removeStream(stream);
toma7083182014-09-25 21:38:03 -0700215
tom74d49652014-09-25 23:48:46 -0700216 messages.add(stream.messagesIn().total());
217 bytes.add(stream.bytesIn().total());
218
219// out.println(format("Disconnected client; inbound %s mps, %s Mbps; outbound %s mps, %s Mbps",
220// FORMAT.format(stream.messagesIn().throughput()),
221// FORMAT.format(stream.bytesIn().throughput() / (1024 * msgLength)),
222// FORMAT.format(stream.messagesOut().throughput()),
223// FORMAT.format(stream.bytesOut().throughput() / (1024 * msgLength))));
224
225 stream.messagesOut().reset();
226 stream.bytesOut().reset();
toma7083182014-09-25 21:38:03 -0700227 }
228
229 @Override
230 protected void processMessages(List<TestMessage> messages,
tom1ae3d162014-09-26 09:38:16 -0700231 MessageStream<TestMessage> stream) {
232 for (TestMessage message : messages) {
233 // TODO: summarize latency data better
234 latencyTotal += currentTimeMillis() - message.requestorTime();
235 latencyCount++;
236 }
toma7083182014-09-25 21:38:03 -0700237 worker.release(messages.size());
238 }
239
240 @Override
241 protected void connect(SelectionKey key) {
242 super.connect(key);
243 TestMessageStream b = (TestMessageStream) key.attachment();
244 Worker w = ((CustomIOLoop) b.loop()).worker;
245 w.pump(b);
246 }
247
248 }
249
250 /**
251 * Auxiliary worker to connect and pump batched messages using blocking I/O.
252 */
253 private class Worker implements Runnable {
254
255 private static final int BATCH_SIZE = 1000;
256 private static final int PERMITS = 2 * BATCH_SIZE;
257
tom1ae3d162014-09-26 09:38:16 -0700258 private TestMessageStream stream;
toma7083182014-09-25 21:38:03 -0700259 private FutureTask<Worker> task;
260
261 // Stuff to throttle pump
262 private final Semaphore semaphore = new Semaphore(PERMITS);
263 private int msgWritten;
264
tom1ae3d162014-09-26 09:38:16 -0700265 void pump(TestMessageStream stream) {
266 this.stream = stream;
toma7083182014-09-25 21:38:03 -0700267 task = new FutureTask<>(this, this);
268 wpool.execute(task);
269 }
270
271 @Override
272 public void run() {
273 try {
274 log.info("Worker started...");
275
toma7083182014-09-25 21:38:03 -0700276 while (msgWritten < msgCount) {
tom1ae3d162014-09-26 09:38:16 -0700277 int size = Math.min(BATCH_SIZE, msgCount - msgWritten);
278 writeBatch(size);
279 msgWritten += size;
toma7083182014-09-25 21:38:03 -0700280 }
281
282 // Now try to get all the permits back before sending poison pill
283 semaphore.acquireUninterruptibly(PERMITS);
tom1ae3d162014-09-26 09:38:16 -0700284 stream.close();
toma7083182014-09-25 21:38:03 -0700285
286 log.info("Worker done...");
287
288 } catch (IOException e) {
289 log.error("Worker unable to perform I/O", e);
290 }
291 }
292
293
tom1ae3d162014-09-26 09:38:16 -0700294 private void writeBatch(int size) throws IOException {
295 // Build a batch of messages
296 List<TestMessage> batch = Lists.newArrayListWithCapacity(size);
297 for (int i = 0; i < size; i++) {
298 batch.add(new TestMessage(msgLength, currentTimeMillis(), 0,
299 stream.padding()));
toma7083182014-09-25 21:38:03 -0700300 }
tom1ae3d162014-09-26 09:38:16 -0700301 acquire(size);
302 stream.write(batch);
toma7083182014-09-25 21:38:03 -0700303 }
304
305
306 // Release permits based on the specified number of message credits
307 private void release(int permits) {
308 semaphore.release(permits);
309 }
310
311 // Acquire permit for a single batch
312 private void acquire(int permits) {
313 semaphore.acquireUninterruptibly(permits);
314 }
315
316 }
317
318}