package org.onlab.nio;

import com.google.common.collect.Lists;
import org.onlab.util.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.ByteChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static java.lang.String.format;
import static java.lang.System.out;
import static org.onlab.util.Tools.delay;
import static org.onlab.util.Tools.namedThreads;

27/**
28 * Auxiliary test fixture to measure speed of NIO-based channels.
29 */
tom74d49652014-09-25 23:48:46 -070030public class IOLoopTestServer {
toma7083182014-09-25 21:38:03 -070031
tom74d49652014-09-25 23:48:46 -070032 private static Logger log = LoggerFactory.getLogger(IOLoopTestServer.class);
toma7083182014-09-25 21:38:03 -070033
34 private static final int PRUNE_FREQUENCY = 1000;
35
36 static final int PORT = 9876;
37 static final long TIMEOUT = 1000;
38
39 static final boolean SO_NO_DELAY = false;
tom74d49652014-09-25 23:48:46 -070040 static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
41 static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
toma7083182014-09-25 21:38:03 -070042
43 static final DecimalFormat FORMAT = new DecimalFormat("#,##0");
44
45 private final AcceptorLoop aloop;
46 private final ExecutorService apool = Executors.newSingleThreadExecutor(namedThreads("accept"));
47
48 private final List<CustomIOLoop> iloops = new ArrayList<>();
49 private final ExecutorService ipool;
50
51 private final int workerCount;
52 private final int msgLength;
53 private int lastWorker = -1;
54
tom2d6d3972014-09-25 22:38:57 -070055 Counter messages;
56 Counter bytes;
toma7083182014-09-25 21:38:03 -070057
58 /**
59 * Main entry point to launch the server.
60 *
61 * @param args command-line arguments
tom1ae3d162014-09-26 09:38:16 -070062 * @throws java.io.IOException if unable to crate IO loops
toma7083182014-09-25 21:38:03 -070063 */
64 public static void main(String[] args) throws IOException {
tom74d49652014-09-25 23:48:46 -070065 startStandalone(args);
66 System.exit(0);
67 }
68
69 /**
70 * Starts a standalone IO loop test server.
71 *
72 * @param args command-line arguments
73 */
tomf110fff2014-09-26 00:38:18 -070074 public static void startStandalone(String[] args) throws IOException {
toma7083182014-09-25 21:38:03 -070075 InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
76 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
77 int ml = args.length > 2 ? Integer.parseInt(args[2]) : 128;
78
79 log.info("Setting up the server with {} workers, {} byte messages on {}... ",
80 wc, ml, ip);
tom74d49652014-09-25 23:48:46 -070081 IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT);
82 server.start();
toma7083182014-09-25 21:38:03 -070083
tom5f4df2d2014-09-26 12:19:51 -070084 // Start pruning clients and keep going until their number goes to 0.
85 int remaining = -1;
86 while (remaining == -1 || remaining > 0) {
toma7083182014-09-25 21:38:03 -070087 delay(PRUNE_FREQUENCY);
tom5f4df2d2014-09-26 12:19:51 -070088 int r = server.prune();
89 remaining = remaining == -1 && r == 0 ? remaining : r;
toma7083182014-09-25 21:38:03 -070090 }
tom5f4df2d2014-09-26 12:19:51 -070091 server.stop();
toma7083182014-09-25 21:38:03 -070092 }
93
94 /**
95 * Creates a speed server.
96 *
97 * @param ip optional ip of the adapter where to bind
98 * @param wc worker count
99 * @param ml message length in bytes
100 * @param port listen port
tom1ae3d162014-09-26 09:38:16 -0700101 * @throws java.io.IOException if unable to create IO loops
toma7083182014-09-25 21:38:03 -0700102 */
tom74d49652014-09-25 23:48:46 -0700103 public IOLoopTestServer(InetAddress ip, int wc, int ml, int port) throws IOException {
toma7083182014-09-25 21:38:03 -0700104 this.workerCount = wc;
105 this.msgLength = ml;
106 this.ipool = Executors.newFixedThreadPool(workerCount, namedThreads("io-loop"));
107
108 this.aloop = new CustomAcceptLoop(new InetSocketAddress(ip, port));
109 for (int i = 0; i < workerCount; i++) {
110 iloops.add(new CustomIOLoop());
111 }
112 }
113
114 /**
115 * Start the server IO loops and kicks off throughput tracking.
116 */
117 public void start() {
tom2d6d3972014-09-25 22:38:57 -0700118 messages = new Counter();
119 bytes = new Counter();
toma7083182014-09-25 21:38:03 -0700120
121 for (CustomIOLoop l : iloops) {
122 ipool.execute(l);
123 }
124 apool.execute(aloop);
tom2d6d3972014-09-25 22:38:57 -0700125
tom74d49652014-09-25 23:48:46 -0700126 for (CustomIOLoop l : iloops) {
127 l.awaitStart(TIMEOUT);
128 }
129 aloop.awaitStart(TIMEOUT);
toma7083182014-09-25 21:38:03 -0700130 }
131
132 /**
133 * Stop the server IO loops and freezes throughput tracking.
134 */
135 public void stop() {
136 aloop.shutdown();
137 for (CustomIOLoop l : iloops) {
138 l.shutdown();
139 }
140
tom74d49652014-09-25 23:48:46 -0700141 for (CustomIOLoop l : iloops) {
142 l.awaitStop(TIMEOUT);
143 }
144 aloop.awaitStop(TIMEOUT);
145
tom2d6d3972014-09-25 22:38:57 -0700146 messages.freeze();
147 bytes.freeze();
toma7083182014-09-25 21:38:03 -0700148 }
149
150 /**
tombf474382014-10-02 07:36:50 -0700151 * Reports on the accumulated throughput and latency.
toma7083182014-09-25 21:38:03 -0700152 */
153 public void report() {
tom2d6d3972014-09-25 22:38:57 -0700154 DecimalFormat f = new DecimalFormat("#,##0");
tom14dc4d02014-09-26 12:43:14 -0700155 out.println(format("Server: %s messages; %s bytes; %s mps; %s MBs",
tom74d49652014-09-25 23:48:46 -0700156 f.format(messages.total()), f.format(bytes.total()),
157 f.format(messages.throughput()),
158 f.format(bytes.throughput() / (1024 * msgLength))));
toma7083182014-09-25 21:38:03 -0700159 }
160
161 /**
162 * Prunes the IO loops of stale message buffers.
tom5f4df2d2014-09-26 12:19:51 -0700163 *
164 * @return number of remaining IO loops among all workers.
toma7083182014-09-25 21:38:03 -0700165 */
tom5f4df2d2014-09-26 12:19:51 -0700166 public int prune() {
167 int count = 0;
toma7083182014-09-25 21:38:03 -0700168 for (CustomIOLoop l : iloops) {
tom5f4df2d2014-09-26 12:19:51 -0700169 count += l.pruneStaleStreams();
toma7083182014-09-25 21:38:03 -0700170 }
tom5f4df2d2014-09-26 12:19:51 -0700171 return count;
toma7083182014-09-25 21:38:03 -0700172 }
173
174 // Get the next worker to which a client should be assigned
175 private synchronized CustomIOLoop nextWorker() {
176 lastWorker = (lastWorker + 1) % workerCount;
177 return iloops.get(lastWorker);
178 }
179
180 // Loop for transfer of fixed-length messages
181 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
182
183 public CustomIOLoop() throws IOException {
184 super(500);
185 }
186
187 @Override
188 protected TestMessageStream createStream(ByteChannel channel) {
189 return new TestMessageStream(msgLength, channel, this);
190 }
191
192 @Override
193 protected void removeStream(MessageStream<TestMessage> stream) {
194 super.removeStream(stream);
tom2d6d3972014-09-25 22:38:57 -0700195 messages.add(stream.messagesIn().total());
196 bytes.add(stream.bytesIn().total());
toma7083182014-09-25 21:38:03 -0700197 }
198
199 @Override
200 protected void processMessages(List<TestMessage> messages,
201 MessageStream<TestMessage> stream) {
202 try {
tom1ae3d162014-09-26 09:38:16 -0700203 stream.write(createResponses(messages));
toma7083182014-09-25 21:38:03 -0700204 } catch (IOException e) {
205 log.error("Unable to echo messages", e);
206 }
207 }
tom1ae3d162014-09-26 09:38:16 -0700208
209 private List<TestMessage> createResponses(List<TestMessage> messages) {
210 List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size());
211 for (TestMessage message : messages) {
212 responses.add(new TestMessage(message.length(), message.requestorTime(),
tom5f4df2d2014-09-26 12:19:51 -0700213 System.nanoTime(), message.padding()));
tom1ae3d162014-09-26 09:38:16 -0700214 }
215 return responses;
216 }
toma7083182014-09-25 21:38:03 -0700217 }
218
219 // Loop for accepting client connections
220 private class CustomAcceptLoop extends AcceptorLoop {
221
222 public CustomAcceptLoop(SocketAddress address) throws IOException {
223 super(500, address);
224 }
225
226 @Override
227 protected void acceptConnection(ServerSocketChannel channel) throws IOException {
228 SocketChannel sc = channel.accept();
229 sc.configureBlocking(false);
230
231 Socket so = sc.socket();
232 so.setTcpNoDelay(SO_NO_DELAY);
233 so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
234 so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
235
236 nextWorker().acceptStream(sc);
237 log.info("Connected client");
238 }
239 }
240
241}