blob: 2497e0e40f380e5e43c02e471828a21d0540639f [file] [log] [blame]
toma7083182014-09-25 21:38:03 -07001package org.onlab.nio;
2
tom2d6d3972014-09-25 22:38:57 -07003import org.onlab.util.Counter;
toma7083182014-09-25 21:38:03 -07004import org.slf4j.Logger;
5import org.slf4j.LoggerFactory;
6
7import java.io.IOException;
8import java.net.InetAddress;
9import java.net.InetSocketAddress;
10import java.net.SocketAddress;
11import java.nio.channels.ByteChannel;
12import java.nio.channels.SelectionKey;
13import java.nio.channels.SocketChannel;
tom2d6d3972014-09-25 22:38:57 -070014import java.text.DecimalFormat;
toma7083182014-09-25 21:38:03 -070015import java.util.ArrayList;
16import java.util.List;
17import java.util.concurrent.ExecutionException;
18import java.util.concurrent.ExecutorService;
19import java.util.concurrent.Executors;
20import java.util.concurrent.FutureTask;
21import java.util.concurrent.Semaphore;
22import java.util.concurrent.TimeUnit;
23import java.util.concurrent.TimeoutException;
24
25import static org.onlab.junit.TestTools.delay;
26import static org.onlab.util.Tools.namedThreads;
27
28/**
29 * Auxiliary test fixture to measure speed of NIO-based channels.
30 */
tom2d6d3972014-09-25 22:38:57 -070031public class IOLoopClient {
toma7083182014-09-25 21:38:03 -070032
tom2d6d3972014-09-25 22:38:57 -070033 private static Logger log = LoggerFactory.getLogger(IOLoopClient.class);
toma7083182014-09-25 21:38:03 -070034
35 private final InetAddress ip;
36 private final int port;
37 private final int msgCount;
38 private final int msgLength;
39
40 private final List<CustomIOLoop> iloops = new ArrayList<>();
41 private final ExecutorService ipool;
42 private final ExecutorService wpool;
43
tom2d6d3972014-09-25 22:38:57 -070044 Counter messages;
45 Counter bytes;
toma7083182014-09-25 21:38:03 -070046
47 /**
48 * Main entry point to launch the client.
49 *
50 * @param args command-line arguments
51 * @throws IOException if unable to connect to server
52 * @throws InterruptedException if latch wait gets interrupted
53 * @throws ExecutionException if wait gets interrupted
54 * @throws TimeoutException if timeout occurred while waiting for completion
55 */
56 public static void main(String[] args)
57 throws IOException, InterruptedException, ExecutionException, TimeoutException {
58 InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
59 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
60 int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000;
61 int ml = args.length > 3 ? Integer.parseInt(args[3]) : 128;
62 int to = args.length > 4 ? Integer.parseInt(args[4]) : 30;
63
64 log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ",
65 wc, mc, ml, ip);
tom2d6d3972014-09-25 22:38:57 -070066 IOLoopClient sc = new IOLoopClient(ip, wc, mc, ml, IOLoopServer.PORT);
toma7083182014-09-25 21:38:03 -070067
68 sc.start();
69 delay(2000);
70
71 sc.await(to);
72 sc.report();
73
74 System.exit(0);
75 }
76
77 /**
78 * Creates a speed client.
79 *
80 * @param ip ip address of server
81 * @param wc worker count
82 * @param mc message count to send per client
83 * @param ml message length in bytes
84 * @param port socket port
85 * @throws IOException if unable to create IO loops
86 */
tom2d6d3972014-09-25 22:38:57 -070087 public IOLoopClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
toma7083182014-09-25 21:38:03 -070088 this.ip = ip;
89 this.port = port;
90 this.msgCount = mc;
91 this.msgLength = ml;
92 this.wpool = Executors.newFixedThreadPool(wc, namedThreads("worker"));
93 this.ipool = Executors.newFixedThreadPool(wc, namedThreads("io-loop"));
94
95 for (int i = 0; i < wc; i++) {
96 iloops.add(new CustomIOLoop());
97 }
98 }
99
100 /**
101 * Starts the client workers.
102 *
103 * @throws IOException if unable to open connection
104 */
105 public void start() throws IOException {
tom2d6d3972014-09-25 22:38:57 -0700106 messages = new Counter();
107 bytes = new Counter();
toma7083182014-09-25 21:38:03 -0700108
109 // First start up all the IO loops
110 for (CustomIOLoop l : iloops) {
111 ipool.execute(l);
112 }
113
tom2d6d3972014-09-25 22:38:57 -0700114 // Wait for all of them to get going
toma7083182014-09-25 21:38:03 -0700115// for (CustomIOLoop l : iloops)
116// l.waitForStart(TIMEOUT);
117
118 // ... and Next open all connections; one-per-loop
119 for (CustomIOLoop l : iloops) {
120 openConnection(l);
121 }
122 }
123
124
125 /**
126 * Initiates open connection request and registers the pending socket
127 * channel with the given IO loop.
128 *
129 * @param loop loop with which the channel should be registered
130 * @throws IOException if the socket could not be open or connected
131 */
132 private void openConnection(CustomIOLoop loop) throws IOException {
133 SocketAddress sa = new InetSocketAddress(ip, port);
134 SocketChannel ch = SocketChannel.open();
135 ch.configureBlocking(false);
136 loop.connectStream(ch);
137 ch.connect(sa);
138 }
139
140
141 /**
142 * Waits for the client workers to complete.
143 *
144 * @param secs timeout in seconds
145 * @throws ExecutionException if execution failed
146 * @throws InterruptedException if interrupt occurred while waiting
147 * @throws TimeoutException if timeout occurred
148 */
149 public void await(int secs) throws InterruptedException,
150 ExecutionException, TimeoutException {
151 for (CustomIOLoop l : iloops) {
152 if (l.worker.task != null) {
153 l.worker.task.get(secs, TimeUnit.SECONDS);
154 }
155 }
tom2d6d3972014-09-25 22:38:57 -0700156 messages.freeze();
157 bytes.freeze();
toma7083182014-09-25 21:38:03 -0700158 }
159
160 /**
161 * Reports on the accumulated throughput trackers.
162 */
163 public void report() {
tom2d6d3972014-09-25 22:38:57 -0700164 DecimalFormat f = new DecimalFormat("#,##0");
165 log.info("{} messages; {} bytes; {} mps; {} Mbs",
166 f.format(messages.total()),
167 f.format(bytes.total()),
168 f.format(messages.throughput()),
169 f.format(bytes.throughput() / (1024 * 128)));
toma7083182014-09-25 21:38:03 -0700170 }
171
172
173 // Loop for transfer of fixed-length messages
174 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
175
176 Worker worker = new Worker();
177
178 public CustomIOLoop() throws IOException {
179 super(500);
180 }
181
182
183 @Override
184 protected TestMessageStream createStream(ByteChannel channel) {
185 return new TestMessageStream(msgLength, channel, this);
186 }
187
188 @Override
189 protected synchronized void removeStream(MessageStream<TestMessage> b) {
190 super.removeStream(b);
191
tom2d6d3972014-09-25 22:38:57 -0700192 messages.add(b.messagesIn().total());
193 bytes.add(b.bytesIn().total());
194 b.messagesOut().reset();
195 b.bytesOut().reset();
196//
197 log.info("Disconnected client; inbound {} mps, {} Mbps; outbound {} mps, {} Mbps",
198 IOLoopServer.FORMAT.format(b.messagesIn().throughput()),
199 IOLoopServer.FORMAT.format(b.bytesIn().throughput() / (1024 * 128)),
200 IOLoopServer.FORMAT.format(b.messagesOut().throughput()),
201 IOLoopServer.FORMAT.format(b.bytesOut().throughput() / (1024 * 128)));
toma7083182014-09-25 21:38:03 -0700202 }
203
204 @Override
205 protected void processMessages(List<TestMessage> messages,
206 MessageStream<TestMessage> b) {
207 worker.release(messages.size());
208 }
209
210 @Override
211 protected void connect(SelectionKey key) {
212 super.connect(key);
213 TestMessageStream b = (TestMessageStream) key.attachment();
214 Worker w = ((CustomIOLoop) b.loop()).worker;
215 w.pump(b);
216 }
217
218 }
219
220 /**
221 * Auxiliary worker to connect and pump batched messages using blocking I/O.
222 */
223 private class Worker implements Runnable {
224
225 private static final int BATCH_SIZE = 1000;
226 private static final int PERMITS = 2 * BATCH_SIZE;
227
228 private TestMessageStream b;
229 private FutureTask<Worker> task;
230
231 // Stuff to throttle pump
232 private final Semaphore semaphore = new Semaphore(PERMITS);
233 private int msgWritten;
234
235 void pump(TestMessageStream b) {
236 this.b = b;
237 task = new FutureTask<>(this, this);
238 wpool.execute(task);
239 }
240
241 @Override
242 public void run() {
243 try {
244 log.info("Worker started...");
245
246 List<TestMessage> batch = new ArrayList<>();
247 for (int i = 0; i < BATCH_SIZE; i++) {
248 batch.add(new TestMessage(msgLength));
249 }
250
251 while (msgWritten < msgCount) {
252 msgWritten += writeBatch(b, batch);
253 }
254
255 // Now try to get all the permits back before sending poison pill
256 semaphore.acquireUninterruptibly(PERMITS);
257 b.close();
258
259 log.info("Worker done...");
260
261 } catch (IOException e) {
262 log.error("Worker unable to perform I/O", e);
263 }
264 }
265
266
267 private int writeBatch(TestMessageStream b, List<TestMessage> batch)
268 throws IOException {
269 int count = Math.min(BATCH_SIZE, msgCount - msgWritten);
270 acquire(count);
271 if (count == BATCH_SIZE) {
272 b.write(batch);
273 } else {
274 for (int i = 0; i < count; i++) {
275 b.write(batch.get(i));
276 }
277 }
278 return count;
279 }
280
281
282 // Release permits based on the specified number of message credits
283 private void release(int permits) {
284 semaphore.release(permits);
285 }
286
287 // Acquire permit for a single batch
288 private void acquire(int permits) {
289 semaphore.acquireUninterruptibly(permits);
290 }
291
292 }
293
294}