blob: 64452214379d3fb446476e9a0aab10c81b19d21b [file] [log] [blame]
toma7083182014-09-25 21:38:03 -07001package org.onlab.nio;
2
3import org.slf4j.Logger;
4import org.slf4j.LoggerFactory;
5
6import java.io.IOException;
7import java.net.InetAddress;
8import java.net.InetSocketAddress;
9import java.net.SocketAddress;
10import java.nio.channels.ByteChannel;
11import java.nio.channels.SelectionKey;
12import java.nio.channels.SocketChannel;
13import java.util.ArrayList;
14import java.util.List;
15import java.util.concurrent.ExecutionException;
16import java.util.concurrent.ExecutorService;
17import java.util.concurrent.Executors;
18import java.util.concurrent.FutureTask;
19import java.util.concurrent.Semaphore;
20import java.util.concurrent.TimeUnit;
21import java.util.concurrent.TimeoutException;
22
23import static org.onlab.junit.TestTools.delay;
24import static org.onlab.util.Tools.namedThreads;
25
26/**
27 * Auxiliary test fixture to measure speed of NIO-based channels.
28 */
29public class StandaloneSpeedClient {
30
31 private static Logger log = LoggerFactory.getLogger(StandaloneSpeedClient.class);
32
33 private final InetAddress ip;
34 private final int port;
35 private final int msgCount;
36 private final int msgLength;
37
38 private final List<CustomIOLoop> iloops = new ArrayList<>();
39 private final ExecutorService ipool;
40 private final ExecutorService wpool;
41
42// ThroughputTracker messages;
43// ThroughputTracker bytes;
44
45 /**
46 * Main entry point to launch the client.
47 *
48 * @param args command-line arguments
49 * @throws IOException if unable to connect to server
50 * @throws InterruptedException if latch wait gets interrupted
51 * @throws ExecutionException if wait gets interrupted
52 * @throws TimeoutException if timeout occurred while waiting for completion
53 */
54 public static void main(String[] args)
55 throws IOException, InterruptedException, ExecutionException, TimeoutException {
56 InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
57 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
58 int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000;
59 int ml = args.length > 3 ? Integer.parseInt(args[3]) : 128;
60 int to = args.length > 4 ? Integer.parseInt(args[4]) : 30;
61
62 log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ",
63 wc, mc, ml, ip);
64 StandaloneSpeedClient sc = new StandaloneSpeedClient(ip, wc, mc, ml, StandaloneSpeedServer.PORT);
65
66 sc.start();
67 delay(2000);
68
69 sc.await(to);
70 sc.report();
71
72 System.exit(0);
73 }
74
75 /**
76 * Creates a speed client.
77 *
78 * @param ip ip address of server
79 * @param wc worker count
80 * @param mc message count to send per client
81 * @param ml message length in bytes
82 * @param port socket port
83 * @throws IOException if unable to create IO loops
84 */
85 public StandaloneSpeedClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
86 this.ip = ip;
87 this.port = port;
88 this.msgCount = mc;
89 this.msgLength = ml;
90 this.wpool = Executors.newFixedThreadPool(wc, namedThreads("worker"));
91 this.ipool = Executors.newFixedThreadPool(wc, namedThreads("io-loop"));
92
93 for (int i = 0; i < wc; i++) {
94 iloops.add(new CustomIOLoop());
95 }
96 }
97
98 /**
99 * Starts the client workers.
100 *
101 * @throws IOException if unable to open connection
102 */
103 public void start() throws IOException {
104// messages = new ThroughputTracker();
105// bytes = new ThroughputTracker();
106
107 // First start up all the IO loops
108 for (CustomIOLoop l : iloops) {
109 ipool.execute(l);
110 }
111
112// // Wait for all of them to get going
113// for (CustomIOLoop l : iloops)
114// l.waitForStart(TIMEOUT);
115
116 // ... and Next open all connections; one-per-loop
117 for (CustomIOLoop l : iloops) {
118 openConnection(l);
119 }
120 }
121
122
123 /**
124 * Initiates open connection request and registers the pending socket
125 * channel with the given IO loop.
126 *
127 * @param loop loop with which the channel should be registered
128 * @throws IOException if the socket could not be open or connected
129 */
130 private void openConnection(CustomIOLoop loop) throws IOException {
131 SocketAddress sa = new InetSocketAddress(ip, port);
132 SocketChannel ch = SocketChannel.open();
133 ch.configureBlocking(false);
134 loop.connectStream(ch);
135 ch.connect(sa);
136 }
137
138
139 /**
140 * Waits for the client workers to complete.
141 *
142 * @param secs timeout in seconds
143 * @throws ExecutionException if execution failed
144 * @throws InterruptedException if interrupt occurred while waiting
145 * @throws TimeoutException if timeout occurred
146 */
147 public void await(int secs) throws InterruptedException,
148 ExecutionException, TimeoutException {
149 for (CustomIOLoop l : iloops) {
150 if (l.worker.task != null) {
151 l.worker.task.get(secs, TimeUnit.SECONDS);
152 }
153 }
154// messages.freeze();
155// bytes.freeze();
156 }
157
158 /**
159 * Reports on the accumulated throughput trackers.
160 */
161 public void report() {
162// DecimalFormat f = new DecimalFormat("#,##0");
163// log.info("{} messages; {} bytes; {} mps; {} Mbs",
164// f.format(messages.total()),
165// f.format(bytes.total()),
166// f.format(messages.throughput()),
167// f.format(bytes.throughput() / (1024 * 128)));
168 }
169
170
171 // Loop for transfer of fixed-length messages
172 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
173
174 Worker worker = new Worker();
175
176 public CustomIOLoop() throws IOException {
177 super(500);
178 }
179
180
181 @Override
182 protected TestMessageStream createStream(ByteChannel channel) {
183 return new TestMessageStream(msgLength, channel, this);
184 }
185
186 @Override
187 protected synchronized void removeStream(MessageStream<TestMessage> b) {
188 super.removeStream(b);
189
190// messages.add(b.inMessages().total());
191// bytes.add(b.inBytes().total());
192// b.inMessages().reset();
193// b.inBytes().reset();
194
195// log.info("Disconnected client; inbound {} mps, {} Mbps; outbound {} mps, {} Mbps",
196// StandaloneSpeedServer.format.format(b.inMessages().throughput()),
197// StandaloneSpeedServer.format.format(b.inBytes().throughput() / (1024 * 128)),
198// StandaloneSpeedServer.format.format(b.outMessages().throughput()),
199// StandaloneSpeedServer.format.format(b.outBytes().throughput() / (1024 * 128)));
200 }
201
202 @Override
203 protected void processMessages(List<TestMessage> messages,
204 MessageStream<TestMessage> b) {
205 worker.release(messages.size());
206 }
207
208 @Override
209 protected void connect(SelectionKey key) {
210 super.connect(key);
211 TestMessageStream b = (TestMessageStream) key.attachment();
212 Worker w = ((CustomIOLoop) b.loop()).worker;
213 w.pump(b);
214 }
215
216 }
217
218 /**
219 * Auxiliary worker to connect and pump batched messages using blocking I/O.
220 */
221 private class Worker implements Runnable {
222
223 private static final int BATCH_SIZE = 1000;
224 private static final int PERMITS = 2 * BATCH_SIZE;
225
226 private TestMessageStream b;
227 private FutureTask<Worker> task;
228
229 // Stuff to throttle pump
230 private final Semaphore semaphore = new Semaphore(PERMITS);
231 private int msgWritten;
232
233 void pump(TestMessageStream b) {
234 this.b = b;
235 task = new FutureTask<>(this, this);
236 wpool.execute(task);
237 }
238
239 @Override
240 public void run() {
241 try {
242 log.info("Worker started...");
243
244 List<TestMessage> batch = new ArrayList<>();
245 for (int i = 0; i < BATCH_SIZE; i++) {
246 batch.add(new TestMessage(msgLength));
247 }
248
249 while (msgWritten < msgCount) {
250 msgWritten += writeBatch(b, batch);
251 }
252
253 // Now try to get all the permits back before sending poison pill
254 semaphore.acquireUninterruptibly(PERMITS);
255 b.close();
256
257 log.info("Worker done...");
258
259 } catch (IOException e) {
260 log.error("Worker unable to perform I/O", e);
261 }
262 }
263
264
265 private int writeBatch(TestMessageStream b, List<TestMessage> batch)
266 throws IOException {
267 int count = Math.min(BATCH_SIZE, msgCount - msgWritten);
268 acquire(count);
269 if (count == BATCH_SIZE) {
270 b.write(batch);
271 } else {
272 for (int i = 0; i < count; i++) {
273 b.write(batch.get(i));
274 }
275 }
276 return count;
277 }
278
279
280 // Release permits based on the specified number of message credits
281 private void release(int permits) {
282 semaphore.release(permits);
283 }
284
285 // Acquire permit for a single batch
286 private void acquire(int permits) {
287 semaphore.acquireUninterruptibly(permits);
288 }
289
290 }
291
292}