blob: 457023bb8c0950bddecce6a262f18ce4f4906760 [file] [log] [blame]
toma7083182014-09-25 21:38:03 -07001package org.onlab.nio;
2
tom1ae3d162014-09-26 09:38:16 -07003import com.google.common.collect.Lists;
tom2d6d3972014-09-25 22:38:57 -07004import org.onlab.util.Counter;
toma7083182014-09-25 21:38:03 -07005import org.slf4j.Logger;
6import org.slf4j.LoggerFactory;
7
8import java.io.IOException;
9import java.net.InetAddress;
10import java.net.InetSocketAddress;
11import java.net.Socket;
12import java.net.SocketAddress;
13import java.nio.channels.ByteChannel;
14import java.nio.channels.ServerSocketChannel;
15import java.nio.channels.SocketChannel;
16import java.text.DecimalFormat;
17import java.util.ArrayList;
18import java.util.List;
19import java.util.concurrent.ExecutorService;
20import java.util.concurrent.Executors;
21
tom74d49652014-09-25 23:48:46 -070022import static java.lang.String.format;
tom1ae3d162014-09-26 09:38:16 -070023import static java.lang.System.currentTimeMillis;
tom74d49652014-09-25 23:48:46 -070024import static java.lang.System.out;
tom1ae3d162014-09-26 09:38:16 -070025import static org.onlab.util.Tools.delay;
toma7083182014-09-25 21:38:03 -070026import static org.onlab.util.Tools.namedThreads;
27
28/**
29 * Auxiliary test fixture to measure speed of NIO-based channels.
30 */
tom74d49652014-09-25 23:48:46 -070031public class IOLoopTestServer {
toma7083182014-09-25 21:38:03 -070032
tom74d49652014-09-25 23:48:46 -070033 private static Logger log = LoggerFactory.getLogger(IOLoopTestServer.class);
toma7083182014-09-25 21:38:03 -070034
35 private static final int PRUNE_FREQUENCY = 1000;
36
37 static final int PORT = 9876;
38 static final long TIMEOUT = 1000;
39
40 static final boolean SO_NO_DELAY = false;
tom74d49652014-09-25 23:48:46 -070041 static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
42 static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
toma7083182014-09-25 21:38:03 -070043
44 static final DecimalFormat FORMAT = new DecimalFormat("#,##0");
45
46 private final AcceptorLoop aloop;
47 private final ExecutorService apool = Executors.newSingleThreadExecutor(namedThreads("accept"));
48
49 private final List<CustomIOLoop> iloops = new ArrayList<>();
50 private final ExecutorService ipool;
51
52 private final int workerCount;
53 private final int msgLength;
54 private int lastWorker = -1;
55
tom2d6d3972014-09-25 22:38:57 -070056 Counter messages;
57 Counter bytes;
toma7083182014-09-25 21:38:03 -070058
59 /**
60 * Main entry point to launch the server.
61 *
62 * @param args command-line arguments
tom1ae3d162014-09-26 09:38:16 -070063 * @throws java.io.IOException if unable to crate IO loops
toma7083182014-09-25 21:38:03 -070064 */
65 public static void main(String[] args) throws IOException {
tom74d49652014-09-25 23:48:46 -070066 startStandalone(args);
67 System.exit(0);
68 }
69
70 /**
71 * Starts a standalone IO loop test server.
72 *
73 * @param args command-line arguments
74 */
tomf110fff2014-09-26 00:38:18 -070075 public static void startStandalone(String[] args) throws IOException {
toma7083182014-09-25 21:38:03 -070076 InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
77 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
78 int ml = args.length > 2 ? Integer.parseInt(args[2]) : 128;
79
80 log.info("Setting up the server with {} workers, {} byte messages on {}... ",
81 wc, ml, ip);
tom74d49652014-09-25 23:48:46 -070082 IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT);
83 server.start();
toma7083182014-09-25 21:38:03 -070084
tom5f4df2d2014-09-26 12:19:51 -070085 // Start pruning clients and keep going until their number goes to 0.
86 int remaining = -1;
87 while (remaining == -1 || remaining > 0) {
toma7083182014-09-25 21:38:03 -070088 delay(PRUNE_FREQUENCY);
tom5f4df2d2014-09-26 12:19:51 -070089 int r = server.prune();
90 remaining = remaining == -1 && r == 0 ? remaining : r;
toma7083182014-09-25 21:38:03 -070091 }
tom5f4df2d2014-09-26 12:19:51 -070092 server.stop();
toma7083182014-09-25 21:38:03 -070093 }
94
95 /**
96 * Creates a speed server.
97 *
98 * @param ip optional ip of the adapter where to bind
99 * @param wc worker count
100 * @param ml message length in bytes
101 * @param port listen port
tom1ae3d162014-09-26 09:38:16 -0700102 * @throws java.io.IOException if unable to create IO loops
toma7083182014-09-25 21:38:03 -0700103 */
tom74d49652014-09-25 23:48:46 -0700104 public IOLoopTestServer(InetAddress ip, int wc, int ml, int port) throws IOException {
toma7083182014-09-25 21:38:03 -0700105 this.workerCount = wc;
106 this.msgLength = ml;
107 this.ipool = Executors.newFixedThreadPool(workerCount, namedThreads("io-loop"));
108
109 this.aloop = new CustomAcceptLoop(new InetSocketAddress(ip, port));
110 for (int i = 0; i < workerCount; i++) {
111 iloops.add(new CustomIOLoop());
112 }
113 }
114
115 /**
116 * Start the server IO loops and kicks off throughput tracking.
117 */
118 public void start() {
tom2d6d3972014-09-25 22:38:57 -0700119 messages = new Counter();
120 bytes = new Counter();
toma7083182014-09-25 21:38:03 -0700121
122 for (CustomIOLoop l : iloops) {
123 ipool.execute(l);
124 }
125 apool.execute(aloop);
tom2d6d3972014-09-25 22:38:57 -0700126
tom74d49652014-09-25 23:48:46 -0700127 for (CustomIOLoop l : iloops) {
128 l.awaitStart(TIMEOUT);
129 }
130 aloop.awaitStart(TIMEOUT);
toma7083182014-09-25 21:38:03 -0700131 }
132
133 /**
134 * Stop the server IO loops and freezes throughput tracking.
135 */
136 public void stop() {
137 aloop.shutdown();
138 for (CustomIOLoop l : iloops) {
139 l.shutdown();
140 }
141
tom74d49652014-09-25 23:48:46 -0700142 for (CustomIOLoop l : iloops) {
143 l.awaitStop(TIMEOUT);
144 }
145 aloop.awaitStop(TIMEOUT);
146
tom2d6d3972014-09-25 22:38:57 -0700147 messages.freeze();
148 bytes.freeze();
toma7083182014-09-25 21:38:03 -0700149 }
150
151 /**
152 * Reports on the accumulated throughput trackers.
153 */
154 public void report() {
tom2d6d3972014-09-25 22:38:57 -0700155 DecimalFormat f = new DecimalFormat("#,##0");
tom74d49652014-09-25 23:48:46 -0700156 out.println(format("Server: %s messages; %s bytes; %s mps; %s Mbs",
157 f.format(messages.total()), f.format(bytes.total()),
158 f.format(messages.throughput()),
159 f.format(bytes.throughput() / (1024 * msgLength))));
toma7083182014-09-25 21:38:03 -0700160 }
161
162 /**
163 * Prunes the IO loops of stale message buffers.
tom5f4df2d2014-09-26 12:19:51 -0700164 *
165 * @return number of remaining IO loops among all workers.
toma7083182014-09-25 21:38:03 -0700166 */
tom5f4df2d2014-09-26 12:19:51 -0700167 public int prune() {
168 int count = 0;
toma7083182014-09-25 21:38:03 -0700169 for (CustomIOLoop l : iloops) {
tom5f4df2d2014-09-26 12:19:51 -0700170 count += l.pruneStaleStreams();
toma7083182014-09-25 21:38:03 -0700171 }
tom5f4df2d2014-09-26 12:19:51 -0700172 return count;
toma7083182014-09-25 21:38:03 -0700173 }
174
175 // Get the next worker to which a client should be assigned
176 private synchronized CustomIOLoop nextWorker() {
177 lastWorker = (lastWorker + 1) % workerCount;
178 return iloops.get(lastWorker);
179 }
180
181 // Loop for transfer of fixed-length messages
182 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
183
184 public CustomIOLoop() throws IOException {
185 super(500);
186 }
187
188 @Override
189 protected TestMessageStream createStream(ByteChannel channel) {
190 return new TestMessageStream(msgLength, channel, this);
191 }
192
193 @Override
194 protected void removeStream(MessageStream<TestMessage> stream) {
195 super.removeStream(stream);
tom2d6d3972014-09-25 22:38:57 -0700196
197 messages.add(stream.messagesIn().total());
198 bytes.add(stream.bytesIn().total());
199
tom74d49652014-09-25 23:48:46 -0700200// out.println(format("Disconnected server; inbound %s mps, %s Mbps; outbound %s mps, %s Mbps",
201// FORMAT.format(stream.messagesIn().throughput()),
202// FORMAT.format(stream.bytesIn().throughput() / (1024 * msgLength)),
203// FORMAT.format(stream.messagesOut().throughput()),
204// FORMAT.format(stream.bytesOut().throughput() / (1024 * msgLength))));
toma7083182014-09-25 21:38:03 -0700205 }
206
207 @Override
208 protected void processMessages(List<TestMessage> messages,
209 MessageStream<TestMessage> stream) {
210 try {
tom1ae3d162014-09-26 09:38:16 -0700211 stream.write(createResponses(messages));
toma7083182014-09-25 21:38:03 -0700212 } catch (IOException e) {
213 log.error("Unable to echo messages", e);
214 }
215 }
tom1ae3d162014-09-26 09:38:16 -0700216
217 private List<TestMessage> createResponses(List<TestMessage> messages) {
218 List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size());
219 for (TestMessage message : messages) {
220 responses.add(new TestMessage(message.length(), message.requestorTime(),
tom5f4df2d2014-09-26 12:19:51 -0700221 System.nanoTime(), message.padding()));
tom1ae3d162014-09-26 09:38:16 -0700222 }
223 return responses;
224 }
toma7083182014-09-25 21:38:03 -0700225 }
226
227 // Loop for accepting client connections
228 private class CustomAcceptLoop extends AcceptorLoop {
229
230 public CustomAcceptLoop(SocketAddress address) throws IOException {
231 super(500, address);
232 }
233
234 @Override
235 protected void acceptConnection(ServerSocketChannel channel) throws IOException {
236 SocketChannel sc = channel.accept();
237 sc.configureBlocking(false);
238
239 Socket so = sc.socket();
240 so.setTcpNoDelay(SO_NO_DELAY);
241 so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
242 so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
243
244 nextWorker().acceptStream(sc);
245 log.info("Connected client");
246 }
247 }
248
249}