blob: 8121cc3ba1d56e7c86e5e5cb98a05bd54c170640 [file] [log] [blame]
toma7083182014-09-25 21:38:03 -07001package org.onlab.nio;
2
tom2d6d3972014-09-25 22:38:57 -07003import org.onlab.util.Counter;
toma7083182014-09-25 21:38:03 -07004import org.slf4j.Logger;
5import org.slf4j.LoggerFactory;
6
7import java.io.IOException;
8import java.net.InetAddress;
9import java.net.InetSocketAddress;
10import java.net.SocketAddress;
11import java.nio.channels.ByteChannel;
12import java.nio.channels.SelectionKey;
13import java.nio.channels.SocketChannel;
tom2d6d3972014-09-25 22:38:57 -070014import java.text.DecimalFormat;
toma7083182014-09-25 21:38:03 -070015import java.util.ArrayList;
16import java.util.List;
17import java.util.concurrent.ExecutionException;
18import java.util.concurrent.ExecutorService;
19import java.util.concurrent.Executors;
20import java.util.concurrent.FutureTask;
21import java.util.concurrent.Semaphore;
22import java.util.concurrent.TimeUnit;
23import java.util.concurrent.TimeoutException;
24
tom74d49652014-09-25 23:48:46 -070025import static java.lang.String.format;
26import static java.lang.System.out;
toma7083182014-09-25 21:38:03 -070027import static org.onlab.junit.TestTools.delay;
tom74d49652014-09-25 23:48:46 -070028import static org.onlab.nio.IOLoopTestServer.PORT;
toma7083182014-09-25 21:38:03 -070029import static org.onlab.util.Tools.namedThreads;
30
31/**
32 * Auxiliary test fixture to measure speed of NIO-based channels.
33 */
tom74d49652014-09-25 23:48:46 -070034public class IOLoopTestClient {
toma7083182014-09-25 21:38:03 -070035
tom74d49652014-09-25 23:48:46 -070036 private static Logger log = LoggerFactory.getLogger(IOLoopTestClient.class);
toma7083182014-09-25 21:38:03 -070037
38 private final InetAddress ip;
39 private final int port;
40 private final int msgCount;
41 private final int msgLength;
42
43 private final List<CustomIOLoop> iloops = new ArrayList<>();
44 private final ExecutorService ipool;
45 private final ExecutorService wpool;
46
tom2d6d3972014-09-25 22:38:57 -070047 Counter messages;
48 Counter bytes;
toma7083182014-09-25 21:38:03 -070049
50 /**
51 * Main entry point to launch the client.
52 *
53 * @param args command-line arguments
54 * @throws IOException if unable to connect to server
55 * @throws InterruptedException if latch wait gets interrupted
56 * @throws ExecutionException if wait gets interrupted
57 * @throws TimeoutException if timeout occurred while waiting for completion
58 */
59 public static void main(String[] args)
60 throws IOException, InterruptedException, ExecutionException, TimeoutException {
tom74d49652014-09-25 23:48:46 -070061 startStandalone(args);
62
63 System.exit(0);
64 }
65
66 /**
67 * Starts a standalone IO loop test client.
68 *
69 * @param args command-line arguments
70 */
71 public static void startStandalone(String[] args)
72 throws IOException, InterruptedException, ExecutionException, TimeoutException {
toma7083182014-09-25 21:38:03 -070073 InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
74 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
75 int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000;
76 int ml = args.length > 3 ? Integer.parseInt(args[3]) : 128;
77 int to = args.length > 4 ? Integer.parseInt(args[4]) : 30;
78
79 log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ",
80 wc, mc, ml, ip);
tom74d49652014-09-25 23:48:46 -070081 IOLoopTestClient client = new IOLoopTestClient(ip, wc, mc, ml, PORT);
toma7083182014-09-25 21:38:03 -070082
tom74d49652014-09-25 23:48:46 -070083 client.start();
toma7083182014-09-25 21:38:03 -070084 delay(2000);
85
tom74d49652014-09-25 23:48:46 -070086 client.await(to);
87 client.report();
toma7083182014-09-25 21:38:03 -070088 }
89
90 /**
91 * Creates a speed client.
92 *
93 * @param ip ip address of server
94 * @param wc worker count
95 * @param mc message count to send per client
96 * @param ml message length in bytes
97 * @param port socket port
98 * @throws IOException if unable to create IO loops
99 */
tom74d49652014-09-25 23:48:46 -0700100 public IOLoopTestClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
toma7083182014-09-25 21:38:03 -0700101 this.ip = ip;
102 this.port = port;
103 this.msgCount = mc;
104 this.msgLength = ml;
105 this.wpool = Executors.newFixedThreadPool(wc, namedThreads("worker"));
106 this.ipool = Executors.newFixedThreadPool(wc, namedThreads("io-loop"));
107
108 for (int i = 0; i < wc; i++) {
109 iloops.add(new CustomIOLoop());
110 }
111 }
112
113 /**
114 * Starts the client workers.
115 *
116 * @throws IOException if unable to open connection
117 */
118 public void start() throws IOException {
tom2d6d3972014-09-25 22:38:57 -0700119 messages = new Counter();
120 bytes = new Counter();
toma7083182014-09-25 21:38:03 -0700121
122 // First start up all the IO loops
123 for (CustomIOLoop l : iloops) {
124 ipool.execute(l);
125 }
126
tom2d6d3972014-09-25 22:38:57 -0700127 // Wait for all of them to get going
tom74d49652014-09-25 23:48:46 -0700128 for (CustomIOLoop l : iloops) {
129 l.awaitStart(1000);
130 }
toma7083182014-09-25 21:38:03 -0700131
132 // ... and Next open all connections; one-per-loop
133 for (CustomIOLoop l : iloops) {
134 openConnection(l);
135 }
136 }
137
138
139 /**
140 * Initiates open connection request and registers the pending socket
141 * channel with the given IO loop.
142 *
143 * @param loop loop with which the channel should be registered
144 * @throws IOException if the socket could not be open or connected
145 */
146 private void openConnection(CustomIOLoop loop) throws IOException {
147 SocketAddress sa = new InetSocketAddress(ip, port);
148 SocketChannel ch = SocketChannel.open();
149 ch.configureBlocking(false);
150 loop.connectStream(ch);
151 ch.connect(sa);
152 }
153
154
155 /**
156 * Waits for the client workers to complete.
157 *
158 * @param secs timeout in seconds
159 * @throws ExecutionException if execution failed
160 * @throws InterruptedException if interrupt occurred while waiting
161 * @throws TimeoutException if timeout occurred
162 */
163 public void await(int secs) throws InterruptedException,
164 ExecutionException, TimeoutException {
165 for (CustomIOLoop l : iloops) {
166 if (l.worker.task != null) {
167 l.worker.task.get(secs, TimeUnit.SECONDS);
168 }
169 }
tom2d6d3972014-09-25 22:38:57 -0700170 messages.freeze();
171 bytes.freeze();
toma7083182014-09-25 21:38:03 -0700172 }
173
174 /**
175 * Reports on the accumulated throughput trackers.
176 */
177 public void report() {
tom2d6d3972014-09-25 22:38:57 -0700178 DecimalFormat f = new DecimalFormat("#,##0");
tom74d49652014-09-25 23:48:46 -0700179 out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs",
180 f.format(messages.total()), f.format(bytes.total()),
181 f.format(messages.throughput()),
182 f.format(bytes.throughput() / (1024 * msgLength))));
toma7083182014-09-25 21:38:03 -0700183 }
184
185
186 // Loop for transfer of fixed-length messages
187 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
188
189 Worker worker = new Worker();
190
191 public CustomIOLoop() throws IOException {
192 super(500);
193 }
194
195
196 @Override
197 protected TestMessageStream createStream(ByteChannel channel) {
198 return new TestMessageStream(msgLength, channel, this);
199 }
200
201 @Override
tom74d49652014-09-25 23:48:46 -0700202 protected synchronized void removeStream(MessageStream<TestMessage> stream) {
203 super.removeStream(stream);
toma7083182014-09-25 21:38:03 -0700204
tom74d49652014-09-25 23:48:46 -0700205 messages.add(stream.messagesIn().total());
206 bytes.add(stream.bytesIn().total());
207
208// out.println(format("Disconnected client; inbound %s mps, %s Mbps; outbound %s mps, %s Mbps",
209// FORMAT.format(stream.messagesIn().throughput()),
210// FORMAT.format(stream.bytesIn().throughput() / (1024 * msgLength)),
211// FORMAT.format(stream.messagesOut().throughput()),
212// FORMAT.format(stream.bytesOut().throughput() / (1024 * msgLength))));
213
214 stream.messagesOut().reset();
215 stream.bytesOut().reset();
toma7083182014-09-25 21:38:03 -0700216 }
217
218 @Override
219 protected void processMessages(List<TestMessage> messages,
220 MessageStream<TestMessage> b) {
221 worker.release(messages.size());
222 }
223
224 @Override
225 protected void connect(SelectionKey key) {
226 super.connect(key);
227 TestMessageStream b = (TestMessageStream) key.attachment();
228 Worker w = ((CustomIOLoop) b.loop()).worker;
229 w.pump(b);
230 }
231
232 }
233
234 /**
235 * Auxiliary worker to connect and pump batched messages using blocking I/O.
236 */
237 private class Worker implements Runnable {
238
239 private static final int BATCH_SIZE = 1000;
240 private static final int PERMITS = 2 * BATCH_SIZE;
241
242 private TestMessageStream b;
243 private FutureTask<Worker> task;
244
245 // Stuff to throttle pump
246 private final Semaphore semaphore = new Semaphore(PERMITS);
247 private int msgWritten;
248
249 void pump(TestMessageStream b) {
250 this.b = b;
251 task = new FutureTask<>(this, this);
252 wpool.execute(task);
253 }
254
255 @Override
256 public void run() {
257 try {
258 log.info("Worker started...");
259
260 List<TestMessage> batch = new ArrayList<>();
261 for (int i = 0; i < BATCH_SIZE; i++) {
262 batch.add(new TestMessage(msgLength));
263 }
264
265 while (msgWritten < msgCount) {
266 msgWritten += writeBatch(b, batch);
267 }
268
269 // Now try to get all the permits back before sending poison pill
270 semaphore.acquireUninterruptibly(PERMITS);
271 b.close();
272
273 log.info("Worker done...");
274
275 } catch (IOException e) {
276 log.error("Worker unable to perform I/O", e);
277 }
278 }
279
280
281 private int writeBatch(TestMessageStream b, List<TestMessage> batch)
282 throws IOException {
283 int count = Math.min(BATCH_SIZE, msgCount - msgWritten);
284 acquire(count);
285 if (count == BATCH_SIZE) {
286 b.write(batch);
287 } else {
288 for (int i = 0; i < count; i++) {
289 b.write(batch.get(i));
290 }
291 }
292 return count;
293 }
294
295
296 // Release permits based on the specified number of message credits
297 private void release(int permits) {
298 semaphore.release(permits);
299 }
300
301 // Acquire permit for a single batch
302 private void acquire(int permits) {
303 semaphore.acquireUninterruptibly(permits);
304 }
305
306 }
307
308}