blob: 72e7a59222c0e67f691d7ab7087a2513ce86ac78 [file] [log] [blame]
tomf110fff2014-09-26 00:38:18 -07001package org.onlab.onos.foo;
2
3import org.onlab.nio.IOLoop;
4import org.onlab.nio.MessageStream;
5import org.onlab.util.Counter;
6import org.slf4j.Logger;
7import org.slf4j.LoggerFactory;
8
9import java.io.IOException;
10import java.net.InetAddress;
11import java.net.InetSocketAddress;
12import java.net.SocketAddress;
13import java.nio.channels.ByteChannel;
14import java.nio.channels.SelectionKey;
15import java.nio.channels.SocketChannel;
16import java.text.DecimalFormat;
17import java.util.ArrayList;
18import java.util.List;
19import java.util.concurrent.ExecutionException;
20import java.util.concurrent.ExecutorService;
21import java.util.concurrent.Executors;
22import java.util.concurrent.FutureTask;
23import java.util.concurrent.Semaphore;
24import java.util.concurrent.TimeUnit;
25import java.util.concurrent.TimeoutException;
26
27import static java.lang.String.format;
28import static java.lang.System.out;
29import static org.onlab.onos.foo.IOLoopTestServer.PORT;
30import static org.onlab.util.Tools.delay;
31import static org.onlab.util.Tools.namedThreads;
32
33/**
34 * Auxiliary test fixture to measure speed of NIO-based channels.
35 */
36public class IOLoopTestClient {
37
38 private static Logger log = LoggerFactory.getLogger(IOLoopTestClient.class);
39
40 private final InetAddress ip;
41 private final int port;
42 private final int msgCount;
43 private final int msgLength;
44
45 private final List<CustomIOLoop> iloops = new ArrayList<>();
46 private final ExecutorService ipool;
47 private final ExecutorService wpool;
48
49 Counter messages;
50 Counter bytes;
51
52 /**
53 * Main entry point to launch the client.
54 *
55 * @param args command-line arguments
56 * @throws java.io.IOException if unable to connect to server
57 * @throws InterruptedException if latch wait gets interrupted
58 * @throws java.util.concurrent.ExecutionException if wait gets interrupted
59 * @throws java.util.concurrent.TimeoutException if timeout occurred while waiting for completion
60 */
61 public static void main(String[] args)
62 throws IOException, InterruptedException, ExecutionException, TimeoutException {
63 startStandalone(args);
64
65 System.exit(0);
66 }
67
68 /**
69 * Starts a standalone IO loop test client.
70 *
71 * @param args command-line arguments
72 */
73 public static void startStandalone(String[] args)
74 throws IOException, InterruptedException, ExecutionException, TimeoutException {
75 InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
76 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
77 int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000;
78 int ml = args.length > 3 ? Integer.parseInt(args[3]) : 128;
79 int to = args.length > 4 ? Integer.parseInt(args[4]) : 30;
80
81 log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ",
82 wc, mc, ml, ip);
83 IOLoopTestClient client = new IOLoopTestClient(ip, wc, mc, ml, PORT);
84
85 client.start();
86 delay(500);
87
88 client.await(to);
89 client.report();
90 }
91
92 /**
93 * Creates a speed client.
94 *
95 * @param ip ip address of server
96 * @param wc worker count
97 * @param mc message count to send per client
98 * @param ml message length in bytes
99 * @param port socket port
100 * @throws java.io.IOException if unable to create IO loops
101 */
102 public IOLoopTestClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
103 this.ip = ip;
104 this.port = port;
105 this.msgCount = mc;
106 this.msgLength = ml;
107 this.wpool = Executors.newFixedThreadPool(wc, namedThreads("worker"));
108 this.ipool = Executors.newFixedThreadPool(wc, namedThreads("io-loop"));
109
110 for (int i = 0; i < wc; i++) {
111 iloops.add(new CustomIOLoop());
112 }
113 }
114
115 /**
116 * Starts the client workers.
117 *
118 * @throws java.io.IOException if unable to open connection
119 */
120 public void start() throws IOException {
121 messages = new Counter();
122 bytes = new Counter();
123
124 // First start up all the IO loops
125 for (CustomIOLoop l : iloops) {
126 ipool.execute(l);
127 }
128
129 // Wait for all of them to get going
130 for (CustomIOLoop l : iloops) {
131 l.awaitStart(1000);
132 }
133
134 // ... and Next open all connections; one-per-loop
135 for (CustomIOLoop l : iloops) {
136 openConnection(l);
137 }
138 }
139
140
141 /**
142 * Initiates open connection request and registers the pending socket
143 * channel with the given IO loop.
144 *
145 * @param loop loop with which the channel should be registered
146 * @throws java.io.IOException if the socket could not be open or connected
147 */
148 private void openConnection(CustomIOLoop loop) throws IOException {
149 SocketAddress sa = new InetSocketAddress(ip, port);
150 SocketChannel ch = SocketChannel.open();
151 ch.configureBlocking(false);
152 loop.connectStream(ch);
153 ch.connect(sa);
154 }
155
156
157 /**
158 * Waits for the client workers to complete.
159 *
160 * @param secs timeout in seconds
161 * @throws java.util.concurrent.ExecutionException if execution failed
162 * @throws InterruptedException if interrupt occurred while waiting
163 * @throws java.util.concurrent.TimeoutException if timeout occurred
164 */
165 public void await(int secs) throws InterruptedException,
166 ExecutionException, TimeoutException {
167 for (CustomIOLoop l : iloops) {
168 if (l.worker.task != null) {
169 l.worker.task.get(secs, TimeUnit.SECONDS);
170 }
171 }
172 messages.freeze();
173 bytes.freeze();
174 }
175
176 /**
177 * Reports on the accumulated throughput trackers.
178 */
179 public void report() {
180 DecimalFormat f = new DecimalFormat("#,##0");
181 out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs",
182 f.format(messages.total()), f.format(bytes.total()),
183 f.format(messages.throughput()),
184 f.format(bytes.throughput() / (1024 * msgLength))));
185 }
186
187
188 // Loop for transfer of fixed-length messages
189 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
190
191 Worker worker = new Worker();
192
193 public CustomIOLoop() throws IOException {
194 super(500);
195 }
196
197
198 @Override
199 protected TestMessageStream createStream(ByteChannel channel) {
200 return new TestMessageStream(msgLength, channel, this);
201 }
202
203 @Override
204 protected synchronized void removeStream(MessageStream<TestMessage> stream) {
205 super.removeStream(stream);
206
207 messages.add(stream.messagesIn().total());
208 bytes.add(stream.bytesIn().total());
209
210// out.println(format("Disconnected client; inbound %s mps, %s Mbps; outbound %s mps, %s Mbps",
211// FORMAT.format(stream.messagesIn().throughput()),
212// FORMAT.format(stream.bytesIn().throughput() / (1024 * msgLength)),
213// FORMAT.format(stream.messagesOut().throughput()),
214// FORMAT.format(stream.bytesOut().throughput() / (1024 * msgLength))));
215
216 stream.messagesOut().reset();
217 stream.bytesOut().reset();
218 }
219
220 @Override
221 protected void processMessages(List<TestMessage> messages,
222 MessageStream<TestMessage> b) {
223 worker.release(messages.size());
224 }
225
226 @Override
227 protected void connect(SelectionKey key) {
228 super.connect(key);
229 TestMessageStream b = (TestMessageStream) key.attachment();
230 Worker w = ((CustomIOLoop) b.loop()).worker;
231 w.pump(b);
232 }
233
234 }
235
236 /**
237 * Auxiliary worker to connect and pump batched messages using blocking I/O.
238 */
239 private class Worker implements Runnable {
240
241 private static final int BATCH_SIZE = 1000;
242 private static final int PERMITS = 2 * BATCH_SIZE;
243
244 private TestMessageStream b;
245 private FutureTask<Worker> task;
246
247 // Stuff to throttle pump
248 private final Semaphore semaphore = new Semaphore(PERMITS);
249 private int msgWritten;
250
251 void pump(TestMessageStream b) {
252 this.b = b;
253 task = new FutureTask<>(this, this);
254 wpool.execute(task);
255 }
256
257 @Override
258 public void run() {
259 try {
260 log.info("Worker started...");
261
262 List<TestMessage> batch = new ArrayList<>();
263 for (int i = 0; i < BATCH_SIZE; i++) {
264 batch.add(new TestMessage(msgLength));
265 }
266
267 while (msgWritten < msgCount) {
268 msgWritten += writeBatch(b, batch);
269 }
270
271 // Now try to get all the permits back before sending poison pill
272 semaphore.acquireUninterruptibly(PERMITS);
273 b.close();
274
275 log.info("Worker done...");
276
277 } catch (IOException e) {
278 log.error("Worker unable to perform I/O", e);
279 }
280 }
281
282
283 private int writeBatch(TestMessageStream b, List<TestMessage> batch)
284 throws IOException {
285 int count = Math.min(BATCH_SIZE, msgCount - msgWritten);
286 acquire(count);
287 if (count == BATCH_SIZE) {
288 b.write(batch);
289 } else {
290 for (int i = 0; i < count; i++) {
291 b.write(batch.get(i));
292 }
293 }
294 return count;
295 }
296
297
298 // Release permits based on the specified number of message credits
299 private void release(int permits) {
300 semaphore.release(permits);
301 }
302
303 // Acquire permit for a single batch
304 private void acquire(int permits) {
305 semaphore.acquireUninterruptibly(permits);
306 }
307
308 }
309
310}