blob: a783512955b0e1e5e45ff5e24a63339bc8c37e9f [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
toma7083182014-09-25 21:38:03 -070019package org.onlab.nio;
20
tom1ae3d162014-09-26 09:38:16 -070021import com.google.common.collect.Lists;
tom2d6d3972014-09-25 22:38:57 -070022import org.onlab.util.Counter;
toma7083182014-09-25 21:38:03 -070023import org.slf4j.Logger;
24import org.slf4j.LoggerFactory;
25
26import java.io.IOException;
27import java.net.InetAddress;
28import java.net.InetSocketAddress;
29import java.net.Socket;
30import java.net.SocketAddress;
31import java.nio.channels.ByteChannel;
32import java.nio.channels.ServerSocketChannel;
33import java.nio.channels.SocketChannel;
34import java.text.DecimalFormat;
35import java.util.ArrayList;
36import java.util.List;
37import java.util.concurrent.ExecutorService;
38import java.util.concurrent.Executors;
39
tom74d49652014-09-25 23:48:46 -070040import static java.lang.String.format;
41import static java.lang.System.out;
tom1ae3d162014-09-26 09:38:16 -070042import static org.onlab.util.Tools.delay;
toma7083182014-09-25 21:38:03 -070043import static org.onlab.util.Tools.namedThreads;
44
45/**
46 * Auxiliary test fixture to measure speed of NIO-based channels.
47 */
tom74d49652014-09-25 23:48:46 -070048public class IOLoopTestServer {
toma7083182014-09-25 21:38:03 -070049
tom74d49652014-09-25 23:48:46 -070050 private static Logger log = LoggerFactory.getLogger(IOLoopTestServer.class);
toma7083182014-09-25 21:38:03 -070051
52 private static final int PRUNE_FREQUENCY = 1000;
53
54 static final int PORT = 9876;
55 static final long TIMEOUT = 1000;
56
57 static final boolean SO_NO_DELAY = false;
tom74d49652014-09-25 23:48:46 -070058 static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
59 static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
toma7083182014-09-25 21:38:03 -070060
61 static final DecimalFormat FORMAT = new DecimalFormat("#,##0");
62
63 private final AcceptorLoop aloop;
64 private final ExecutorService apool = Executors.newSingleThreadExecutor(namedThreads("accept"));
65
66 private final List<CustomIOLoop> iloops = new ArrayList<>();
67 private final ExecutorService ipool;
68
69 private final int workerCount;
70 private final int msgLength;
71 private int lastWorker = -1;
72
tom2d6d3972014-09-25 22:38:57 -070073 Counter messages;
74 Counter bytes;
toma7083182014-09-25 21:38:03 -070075
76 /**
77 * Main entry point to launch the server.
78 *
79 * @param args command-line arguments
tom1ae3d162014-09-26 09:38:16 -070080 * @throws java.io.IOException if unable to crate IO loops
toma7083182014-09-25 21:38:03 -070081 */
82 public static void main(String[] args) throws IOException {
tom74d49652014-09-25 23:48:46 -070083 startStandalone(args);
84 System.exit(0);
85 }
86
87 /**
88 * Starts a standalone IO loop test server.
89 *
90 * @param args command-line arguments
91 */
tomf110fff2014-09-26 00:38:18 -070092 public static void startStandalone(String[] args) throws IOException {
toma7083182014-09-25 21:38:03 -070093 InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
94 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
95 int ml = args.length > 2 ? Integer.parseInt(args[2]) : 128;
96
97 log.info("Setting up the server with {} workers, {} byte messages on {}... ",
98 wc, ml, ip);
tom74d49652014-09-25 23:48:46 -070099 IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT);
100 server.start();
toma7083182014-09-25 21:38:03 -0700101
tom5f4df2d2014-09-26 12:19:51 -0700102 // Start pruning clients and keep going until their number goes to 0.
103 int remaining = -1;
104 while (remaining == -1 || remaining > 0) {
toma7083182014-09-25 21:38:03 -0700105 delay(PRUNE_FREQUENCY);
tom5f4df2d2014-09-26 12:19:51 -0700106 int r = server.prune();
107 remaining = remaining == -1 && r == 0 ? remaining : r;
toma7083182014-09-25 21:38:03 -0700108 }
tom5f4df2d2014-09-26 12:19:51 -0700109 server.stop();
toma7083182014-09-25 21:38:03 -0700110 }
111
112 /**
113 * Creates a speed server.
114 *
115 * @param ip optional ip of the adapter where to bind
116 * @param wc worker count
117 * @param ml message length in bytes
118 * @param port listen port
tom1ae3d162014-09-26 09:38:16 -0700119 * @throws java.io.IOException if unable to create IO loops
toma7083182014-09-25 21:38:03 -0700120 */
tom74d49652014-09-25 23:48:46 -0700121 public IOLoopTestServer(InetAddress ip, int wc, int ml, int port) throws IOException {
toma7083182014-09-25 21:38:03 -0700122 this.workerCount = wc;
123 this.msgLength = ml;
124 this.ipool = Executors.newFixedThreadPool(workerCount, namedThreads("io-loop"));
125
126 this.aloop = new CustomAcceptLoop(new InetSocketAddress(ip, port));
127 for (int i = 0; i < workerCount; i++) {
128 iloops.add(new CustomIOLoop());
129 }
130 }
131
132 /**
133 * Start the server IO loops and kicks off throughput tracking.
134 */
135 public void start() {
tom2d6d3972014-09-25 22:38:57 -0700136 messages = new Counter();
137 bytes = new Counter();
toma7083182014-09-25 21:38:03 -0700138
139 for (CustomIOLoop l : iloops) {
140 ipool.execute(l);
141 }
142 apool.execute(aloop);
tom2d6d3972014-09-25 22:38:57 -0700143
tom74d49652014-09-25 23:48:46 -0700144 for (CustomIOLoop l : iloops) {
145 l.awaitStart(TIMEOUT);
146 }
147 aloop.awaitStart(TIMEOUT);
toma7083182014-09-25 21:38:03 -0700148 }
149
150 /**
151 * Stop the server IO loops and freezes throughput tracking.
152 */
153 public void stop() {
154 aloop.shutdown();
155 for (CustomIOLoop l : iloops) {
156 l.shutdown();
157 }
158
tom74d49652014-09-25 23:48:46 -0700159 for (CustomIOLoop l : iloops) {
160 l.awaitStop(TIMEOUT);
161 }
162 aloop.awaitStop(TIMEOUT);
163
tom2d6d3972014-09-25 22:38:57 -0700164 messages.freeze();
165 bytes.freeze();
toma7083182014-09-25 21:38:03 -0700166 }
167
168 /**
tombf474382014-10-02 07:36:50 -0700169 * Reports on the accumulated throughput and latency.
toma7083182014-09-25 21:38:03 -0700170 */
171 public void report() {
tom2d6d3972014-09-25 22:38:57 -0700172 DecimalFormat f = new DecimalFormat("#,##0");
tom14dc4d02014-09-26 12:43:14 -0700173 out.println(format("Server: %s messages; %s bytes; %s mps; %s MBs",
tom74d49652014-09-25 23:48:46 -0700174 f.format(messages.total()), f.format(bytes.total()),
175 f.format(messages.throughput()),
176 f.format(bytes.throughput() / (1024 * msgLength))));
toma7083182014-09-25 21:38:03 -0700177 }
178
179 /**
180 * Prunes the IO loops of stale message buffers.
tom5f4df2d2014-09-26 12:19:51 -0700181 *
182 * @return number of remaining IO loops among all workers.
toma7083182014-09-25 21:38:03 -0700183 */
tom5f4df2d2014-09-26 12:19:51 -0700184 public int prune() {
185 int count = 0;
toma7083182014-09-25 21:38:03 -0700186 for (CustomIOLoop l : iloops) {
tom5f4df2d2014-09-26 12:19:51 -0700187 count += l.pruneStaleStreams();
toma7083182014-09-25 21:38:03 -0700188 }
tom5f4df2d2014-09-26 12:19:51 -0700189 return count;
toma7083182014-09-25 21:38:03 -0700190 }
191
192 // Get the next worker to which a client should be assigned
193 private synchronized CustomIOLoop nextWorker() {
194 lastWorker = (lastWorker + 1) % workerCount;
195 return iloops.get(lastWorker);
196 }
197
198 // Loop for transfer of fixed-length messages
199 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
200
201 public CustomIOLoop() throws IOException {
202 super(500);
203 }
204
205 @Override
206 protected TestMessageStream createStream(ByteChannel channel) {
207 return new TestMessageStream(msgLength, channel, this);
208 }
209
210 @Override
211 protected void removeStream(MessageStream<TestMessage> stream) {
212 super.removeStream(stream);
tom2d6d3972014-09-25 22:38:57 -0700213 messages.add(stream.messagesIn().total());
214 bytes.add(stream.bytesIn().total());
toma7083182014-09-25 21:38:03 -0700215 }
216
217 @Override
218 protected void processMessages(List<TestMessage> messages,
219 MessageStream<TestMessage> stream) {
220 try {
tom1ae3d162014-09-26 09:38:16 -0700221 stream.write(createResponses(messages));
toma7083182014-09-25 21:38:03 -0700222 } catch (IOException e) {
223 log.error("Unable to echo messages", e);
224 }
225 }
tom1ae3d162014-09-26 09:38:16 -0700226
227 private List<TestMessage> createResponses(List<TestMessage> messages) {
228 List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size());
229 for (TestMessage message : messages) {
230 responses.add(new TestMessage(message.length(), message.requestorTime(),
tom5f4df2d2014-09-26 12:19:51 -0700231 System.nanoTime(), message.padding()));
tom1ae3d162014-09-26 09:38:16 -0700232 }
233 return responses;
234 }
toma7083182014-09-25 21:38:03 -0700235 }
236
237 // Loop for accepting client connections
238 private class CustomAcceptLoop extends AcceptorLoop {
239
240 public CustomAcceptLoop(SocketAddress address) throws IOException {
241 super(500, address);
242 }
243
244 @Override
245 protected void acceptConnection(ServerSocketChannel channel) throws IOException {
246 SocketChannel sc = channel.accept();
247 sc.configureBlocking(false);
248
249 Socket so = sc.socket();
250 so.setTcpNoDelay(SO_NO_DELAY);
251 so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
252 so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
253
254 nextWorker().acceptStream(sc);
255 log.info("Connected client");
256 }
257 }
258
259}