blob: 7ea0d26b1973c8d1bc40f010c9d7b56097f74f94 [file] [log] [blame]
/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
tomf110fff2014-09-26 00:38:18 -070016package org.onlab.onos.foo;
17
tom0e0863f2014-09-26 09:02:33 -070018import com.google.common.collect.Lists;
tomf110fff2014-09-26 00:38:18 -070019import org.onlab.nio.IOLoop;
20import org.onlab.nio.MessageStream;
21import org.onlab.util.Counter;
22import org.slf4j.Logger;
23import org.slf4j.LoggerFactory;
24
25import java.io.IOException;
26import java.net.InetAddress;
27import java.net.InetSocketAddress;
28import java.net.SocketAddress;
29import java.nio.channels.ByteChannel;
30import java.nio.channels.SelectionKey;
31import java.nio.channels.SocketChannel;
32import java.text.DecimalFormat;
33import java.util.ArrayList;
34import java.util.List;
35import java.util.concurrent.ExecutionException;
36import java.util.concurrent.ExecutorService;
37import java.util.concurrent.Executors;
38import java.util.concurrent.FutureTask;
39import java.util.concurrent.Semaphore;
40import java.util.concurrent.TimeUnit;
41import java.util.concurrent.TimeoutException;
42
43import static java.lang.String.format;
tom5f4df2d2014-09-26 12:19:51 -070044import static java.lang.System.nanoTime;
tomf110fff2014-09-26 00:38:18 -070045import static java.lang.System.out;
46import static org.onlab.onos.foo.IOLoopTestServer.PORT;
47import static org.onlab.util.Tools.delay;
48import static org.onlab.util.Tools.namedThreads;
49
50/**
51 * Auxiliary test fixture to measure speed of NIO-based channels.
52 */
53public class IOLoopTestClient {
54
55 private static Logger log = LoggerFactory.getLogger(IOLoopTestClient.class);
56
57 private final InetAddress ip;
58 private final int port;
59 private final int msgCount;
60 private final int msgLength;
61
62 private final List<CustomIOLoop> iloops = new ArrayList<>();
63 private final ExecutorService ipool;
64 private final ExecutorService wpool;
65
66 Counter messages;
67 Counter bytes;
tom0e0863f2014-09-26 09:02:33 -070068 long latencyTotal = 0;
69 long latencyCount = 0;
70
tomf110fff2014-09-26 00:38:18 -070071
72 /**
73 * Main entry point to launch the client.
74 *
75 * @param args command-line arguments
tom0e0863f2014-09-26 09:02:33 -070076 * @throws java.io.IOException if unable to connect to server
77 * @throws InterruptedException if latch wait gets interrupted
78 * @throws java.util.concurrent.ExecutionException if wait gets interrupted
79 * @throws java.util.concurrent.TimeoutException if timeout occurred while waiting for completion
tomf110fff2014-09-26 00:38:18 -070080 */
81 public static void main(String[] args)
82 throws IOException, InterruptedException, ExecutionException, TimeoutException {
83 startStandalone(args);
84
85 System.exit(0);
86 }
87
Yuta HIGUCHI5c947272014-11-03 21:39:21 -080088 /*
tomf110fff2014-09-26 00:38:18 -070089 * Starts a standalone IO loop test client.
90 *
91 * @param args command-line arguments
92 */
93 public static void startStandalone(String[] args)
94 throws IOException, InterruptedException, ExecutionException, TimeoutException {
95 InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
96 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
97 int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000;
98 int ml = args.length > 3 ? Integer.parseInt(args[3]) : 128;
tom14dc4d02014-09-26 12:43:14 -070099 int to = args.length > 4 ? Integer.parseInt(args[4]) : 60;
tomf110fff2014-09-26 00:38:18 -0700100
101 log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ",
102 wc, mc, ml, ip);
103 IOLoopTestClient client = new IOLoopTestClient(ip, wc, mc, ml, PORT);
104
105 client.start();
106 delay(500);
107
108 client.await(to);
109 client.report();
110 }
111
112 /**
113 * Creates a speed client.
114 *
115 * @param ip ip address of server
116 * @param wc worker count
117 * @param mc message count to send per client
118 * @param ml message length in bytes
119 * @param port socket port
120 * @throws java.io.IOException if unable to create IO loops
121 */
122 public IOLoopTestClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
123 this.ip = ip;
124 this.port = port;
125 this.msgCount = mc;
126 this.msgLength = ml;
127 this.wpool = Executors.newFixedThreadPool(wc, namedThreads("worker"));
128 this.ipool = Executors.newFixedThreadPool(wc, namedThreads("io-loop"));
129
130 for (int i = 0; i < wc; i++) {
131 iloops.add(new CustomIOLoop());
132 }
133 }
134
135 /**
136 * Starts the client workers.
137 *
138 * @throws java.io.IOException if unable to open connection
139 */
140 public void start() throws IOException {
141 messages = new Counter();
142 bytes = new Counter();
143
144 // First start up all the IO loops
145 for (CustomIOLoop l : iloops) {
146 ipool.execute(l);
147 }
148
149 // Wait for all of them to get going
150 for (CustomIOLoop l : iloops) {
151 l.awaitStart(1000);
152 }
153
154 // ... and Next open all connections; one-per-loop
155 for (CustomIOLoop l : iloops) {
156 openConnection(l);
157 }
158 }
159
160
161 /**
162 * Initiates open connection request and registers the pending socket
163 * channel with the given IO loop.
164 *
165 * @param loop loop with which the channel should be registered
166 * @throws java.io.IOException if the socket could not be open or connected
167 */
168 private void openConnection(CustomIOLoop loop) throws IOException {
169 SocketAddress sa = new InetSocketAddress(ip, port);
170 SocketChannel ch = SocketChannel.open();
171 ch.configureBlocking(false);
172 loop.connectStream(ch);
173 ch.connect(sa);
174 }
175
176
177 /**
178 * Waits for the client workers to complete.
179 *
180 * @param secs timeout in seconds
tom0e0863f2014-09-26 09:02:33 -0700181 * @throws java.util.concurrent.ExecutionException if execution failed
182 * @throws InterruptedException if interrupt occurred while waiting
183 * @throws java.util.concurrent.TimeoutException if timeout occurred
tomf110fff2014-09-26 00:38:18 -0700184 */
185 public void await(int secs) throws InterruptedException,
186 ExecutionException, TimeoutException {
187 for (CustomIOLoop l : iloops) {
188 if (l.worker.task != null) {
189 l.worker.task.get(secs, TimeUnit.SECONDS);
tom0e0863f2014-09-26 09:02:33 -0700190 latencyTotal += l.latencyTotal;
191 latencyCount += l.latencyCount;
tomf110fff2014-09-26 00:38:18 -0700192 }
193 }
194 messages.freeze();
195 bytes.freeze();
196 }
197
198 /**
tombf474382014-10-02 07:36:50 -0700199 * Reports on the accumulated throughput and latency.
tomf110fff2014-09-26 00:38:18 -0700200 */
201 public void report() {
202 DecimalFormat f = new DecimalFormat("#,##0");
tom14dc4d02014-09-26 12:43:14 -0700203 out.println(format("Client: %s messages; %s bytes; %s mps; %s MBs; %s ns latency",
tomf110fff2014-09-26 00:38:18 -0700204 f.format(messages.total()), f.format(bytes.total()),
205 f.format(messages.throughput()),
tom0e0863f2014-09-26 09:02:33 -0700206 f.format(bytes.throughput() / (1024 * msgLength)),
207 f.format(latencyTotal / latencyCount)));
tomf110fff2014-09-26 00:38:18 -0700208 }
209
210
211 // Loop for transfer of fixed-length messages
212 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
213
214 Worker worker = new Worker();
tom0e0863f2014-09-26 09:02:33 -0700215 long latencyTotal = 0;
216 long latencyCount = 0;
217
tomf110fff2014-09-26 00:38:18 -0700218
219 public CustomIOLoop() throws IOException {
220 super(500);
221 }
222
223
224 @Override
225 protected TestMessageStream createStream(ByteChannel channel) {
226 return new TestMessageStream(msgLength, channel, this);
227 }
228
229 @Override
230 protected synchronized void removeStream(MessageStream<TestMessage> stream) {
231 super.removeStream(stream);
232
233 messages.add(stream.messagesIn().total());
234 bytes.add(stream.bytesIn().total());
tomf110fff2014-09-26 00:38:18 -0700235 stream.messagesOut().reset();
236 stream.bytesOut().reset();
237 }
238
239 @Override
240 protected void processMessages(List<TestMessage> messages,
tom0e0863f2014-09-26 09:02:33 -0700241 MessageStream<TestMessage> stream) {
242 for (TestMessage message : messages) {
243 // TODO: summarize latency data better
tom5f4df2d2014-09-26 12:19:51 -0700244 latencyTotal += nanoTime() - message.requestorTime();
tom0e0863f2014-09-26 09:02:33 -0700245 latencyCount++;
246 }
tomf110fff2014-09-26 00:38:18 -0700247 worker.release(messages.size());
248 }
249
250 @Override
tom5a8779c2014-09-29 14:48:43 -0700251 protected void connect(SelectionKey key) throws IOException {
tomf110fff2014-09-26 00:38:18 -0700252 super.connect(key);
253 TestMessageStream b = (TestMessageStream) key.attachment();
254 Worker w = ((CustomIOLoop) b.loop()).worker;
255 w.pump(b);
256 }
257
258 }
259
260 /**
261 * Auxiliary worker to connect and pump batched messages using blocking I/O.
262 */
263 private class Worker implements Runnable {
264
tom5f4df2d2014-09-26 12:19:51 -0700265 private static final int BATCH_SIZE = 10;
tomf110fff2014-09-26 00:38:18 -0700266 private static final int PERMITS = 2 * BATCH_SIZE;
267
tom0e0863f2014-09-26 09:02:33 -0700268 private TestMessageStream stream;
tomf110fff2014-09-26 00:38:18 -0700269 private FutureTask<Worker> task;
270
271 // Stuff to throttle pump
272 private final Semaphore semaphore = new Semaphore(PERMITS);
273 private int msgWritten;
274
tom0e0863f2014-09-26 09:02:33 -0700275 void pump(TestMessageStream stream) {
276 this.stream = stream;
tomf110fff2014-09-26 00:38:18 -0700277 task = new FutureTask<>(this, this);
278 wpool.execute(task);
279 }
280
281 @Override
282 public void run() {
283 try {
284 log.info("Worker started...");
285
tomf110fff2014-09-26 00:38:18 -0700286 while (msgWritten < msgCount) {
tom0e0863f2014-09-26 09:02:33 -0700287 int size = Math.min(BATCH_SIZE, msgCount - msgWritten);
288 writeBatch(size);
289 msgWritten += size;
tomf110fff2014-09-26 00:38:18 -0700290 }
291
292 // Now try to get all the permits back before sending poison pill
293 semaphore.acquireUninterruptibly(PERMITS);
tom0e0863f2014-09-26 09:02:33 -0700294 stream.close();
tomf110fff2014-09-26 00:38:18 -0700295
296 log.info("Worker done...");
297
298 } catch (IOException e) {
299 log.error("Worker unable to perform I/O", e);
300 }
301 }
302
303
tom0e0863f2014-09-26 09:02:33 -0700304 private void writeBatch(int size) throws IOException {
305 // Build a batch of messages
306 List<TestMessage> batch = Lists.newArrayListWithCapacity(size);
307 for (int i = 0; i < size; i++) {
tom5f4df2d2014-09-26 12:19:51 -0700308 batch.add(new TestMessage(msgLength, nanoTime(), 0,
tomf7e13b02014-09-26 11:12:25 -0700309 stream.padding()));
tomf110fff2014-09-26 00:38:18 -0700310 }
tom0e0863f2014-09-26 09:02:33 -0700311 acquire(size);
312 stream.write(batch);
tomf110fff2014-09-26 00:38:18 -0700313 }
314
315
316 // Release permits based on the specified number of message credits
317 private void release(int permits) {
318 semaphore.release(permits);
319 }
320
321 // Acquire permit for a single batch
322 private void acquire(int permits) {
323 semaphore.acquireUninterruptibly(permits);
324 }
325
326 }
327
328}