blob: 778f217dee4a16abf1d89ffa0bca4a05586985bb [file] [log] [blame]
tomf110fff2014-09-26 00:38:18 -07001package org.onlab.onos.foo;
2
tom0e0863f2014-09-26 09:02:33 -07003import com.google.common.collect.Lists;
tomf110fff2014-09-26 00:38:18 -07004import org.onlab.nio.AcceptorLoop;
5import org.onlab.nio.IOLoop;
6import org.onlab.nio.MessageStream;
7import org.onlab.util.Counter;
8import org.slf4j.Logger;
9import org.slf4j.LoggerFactory;
10
11import java.io.IOException;
12import java.net.InetAddress;
13import java.net.InetSocketAddress;
14import java.net.Socket;
15import java.net.SocketAddress;
16import java.nio.channels.ByteChannel;
17import java.nio.channels.ServerSocketChannel;
18import java.nio.channels.SocketChannel;
19import java.text.DecimalFormat;
20import java.util.ArrayList;
21import java.util.List;
22import java.util.concurrent.ExecutorService;
23import java.util.concurrent.Executors;
24
25import static java.lang.String.format;
tom0e0863f2014-09-26 09:02:33 -070026import static java.lang.System.currentTimeMillis;
tomf110fff2014-09-26 00:38:18 -070027import static java.lang.System.out;
28import static org.onlab.util.Tools.delay;
29import static org.onlab.util.Tools.namedThreads;
30
31/**
32 * Auxiliary test fixture to measure speed of NIO-based channels.
33 */
34public class IOLoopTestServer {
35
36 private static Logger log = LoggerFactory.getLogger(IOLoopTestServer.class);
37
38 private static final int PRUNE_FREQUENCY = 1000;
39
40 static final int PORT = 9876;
41 static final long TIMEOUT = 1000;
42
43 static final boolean SO_NO_DELAY = false;
44 static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
45 static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
46
47 static final DecimalFormat FORMAT = new DecimalFormat("#,##0");
48
49 private final AcceptorLoop aloop;
50 private final ExecutorService apool = Executors.newSingleThreadExecutor(namedThreads("accept"));
51
52 private final List<CustomIOLoop> iloops = new ArrayList<>();
53 private final ExecutorService ipool;
54
55 private final int workerCount;
56 private final int msgLength;
57 private int lastWorker = -1;
58
59 Counter messages;
60 Counter bytes;
61
62 /**
63 * Main entry point to launch the server.
64 *
65 * @param args command-line arguments
66 * @throws java.io.IOException if unable to crate IO loops
67 */
68 public static void main(String[] args) throws IOException {
69 startStandalone(args);
70 System.exit(0);
71 }
72
73 /**
74 * Starts a standalone IO loop test server.
75 *
76 * @param args command-line arguments
77 */
78 public static void startStandalone(String[] args) throws IOException {
79 InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
80 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
81 int ml = args.length > 2 ? Integer.parseInt(args[2]) : 128;
82
83 log.info("Setting up the server with {} workers, {} byte messages on {}... ",
84 wc, ml, ip);
85 IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT);
86 server.start();
87
tomf7e13b02014-09-26 11:12:25 -070088 // Start pruning clients and keep going until their number goes to 0.
89 int remaining = -1;
90 while (remaining == -1 || remaining > 0) {
tomf110fff2014-09-26 00:38:18 -070091 delay(PRUNE_FREQUENCY);
tomf7e13b02014-09-26 11:12:25 -070092 int r = server.prune();
93 remaining = remaining == -1 && r == 0 ? remaining : r;
tomf110fff2014-09-26 00:38:18 -070094 }
95 }
96
97 /**
98 * Creates a speed server.
99 *
100 * @param ip optional ip of the adapter where to bind
101 * @param wc worker count
102 * @param ml message length in bytes
103 * @param port listen port
104 * @throws java.io.IOException if unable to create IO loops
105 */
106 public IOLoopTestServer(InetAddress ip, int wc, int ml, int port) throws IOException {
107 this.workerCount = wc;
108 this.msgLength = ml;
109 this.ipool = Executors.newFixedThreadPool(workerCount, namedThreads("io-loop"));
110
111 this.aloop = new CustomAcceptLoop(new InetSocketAddress(ip, port));
112 for (int i = 0; i < workerCount; i++) {
113 iloops.add(new CustomIOLoop());
114 }
115 }
116
117 /**
118 * Start the server IO loops and kicks off throughput tracking.
119 */
120 public void start() {
121 messages = new Counter();
122 bytes = new Counter();
123
124 for (CustomIOLoop l : iloops) {
125 ipool.execute(l);
126 }
127 apool.execute(aloop);
128
129 for (CustomIOLoop l : iloops) {
130 l.awaitStart(TIMEOUT);
131 }
132 aloop.awaitStart(TIMEOUT);
133 }
134
135 /**
136 * Stop the server IO loops and freezes throughput tracking.
137 */
138 public void stop() {
139 aloop.shutdown();
140 for (CustomIOLoop l : iloops) {
141 l.shutdown();
142 }
143
144 for (CustomIOLoop l : iloops) {
145 l.awaitStop(TIMEOUT);
146 }
147 aloop.awaitStop(TIMEOUT);
148
149 messages.freeze();
150 bytes.freeze();
151 }
152
153 /**
154 * Reports on the accumulated throughput trackers.
155 */
156 public void report() {
157 DecimalFormat f = new DecimalFormat("#,##0");
158 out.println(format("Server: %s messages; %s bytes; %s mps; %s Mbs",
159 f.format(messages.total()), f.format(bytes.total()),
160 f.format(messages.throughput()),
161 f.format(bytes.throughput() / (1024 * msgLength))));
162 }
163
164 /**
165 * Prunes the IO loops of stale message buffers.
tomf7e13b02014-09-26 11:12:25 -0700166 *
167 * @return number of remaining IO loops among all workers.
tomf110fff2014-09-26 00:38:18 -0700168 */
tomf7e13b02014-09-26 11:12:25 -0700169 public int prune() {
170 int count = 0;
tomf110fff2014-09-26 00:38:18 -0700171 for (CustomIOLoop l : iloops) {
tomf7e13b02014-09-26 11:12:25 -0700172 count += l.pruneStaleStreams();
tomf110fff2014-09-26 00:38:18 -0700173 }
tomf7e13b02014-09-26 11:12:25 -0700174 return count;
tomf110fff2014-09-26 00:38:18 -0700175 }
176
177 // Get the next worker to which a client should be assigned
178 private synchronized CustomIOLoop nextWorker() {
179 lastWorker = (lastWorker + 1) % workerCount;
180 return iloops.get(lastWorker);
181 }
182
183 // Loop for transfer of fixed-length messages
184 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
185
186 public CustomIOLoop() throws IOException {
187 super(500);
188 }
189
190 @Override
191 protected TestMessageStream createStream(ByteChannel channel) {
192 return new TestMessageStream(msgLength, channel, this);
193 }
194
195 @Override
196 protected void removeStream(MessageStream<TestMessage> stream) {
197 super.removeStream(stream);
198
199 messages.add(stream.messagesIn().total());
200 bytes.add(stream.bytesIn().total());
201
202// out.println(format("Disconnected server; inbound %s mps, %s Mbps; outbound %s mps, %s Mbps",
203// FORMAT.format(stream.messagesIn().throughput()),
204// FORMAT.format(stream.bytesIn().throughput() / (1024 * msgLength)),
205// FORMAT.format(stream.messagesOut().throughput()),
206// FORMAT.format(stream.bytesOut().throughput() / (1024 * msgLength))));
207 }
208
209 @Override
210 protected void processMessages(List<TestMessage> messages,
211 MessageStream<TestMessage> stream) {
212 try {
tom0e0863f2014-09-26 09:02:33 -0700213 stream.write(createResponses(messages));
tomf110fff2014-09-26 00:38:18 -0700214 } catch (IOException e) {
215 log.error("Unable to echo messages", e);
216 }
217 }
tom0e0863f2014-09-26 09:02:33 -0700218
219 private List<TestMessage> createResponses(List<TestMessage> messages) {
220 List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size());
221 for (TestMessage message : messages) {
222 responses.add(new TestMessage(message.length(), message.requestorTime(),
223 currentTimeMillis(), message.padding()));
224 }
225 return responses;
226 }
tomf110fff2014-09-26 00:38:18 -0700227 }
228
229 // Loop for accepting client connections
230 private class CustomAcceptLoop extends AcceptorLoop {
231
232 public CustomAcceptLoop(SocketAddress address) throws IOException {
233 super(500, address);
234 }
235
236 @Override
237 protected void acceptConnection(ServerSocketChannel channel) throws IOException {
238 SocketChannel sc = channel.accept();
239 sc.configureBlocking(false);
240
241 Socket so = sc.socket();
242 so.setTcpNoDelay(SO_NO_DELAY);
243 so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
244 so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
245
246 nextWorker().acceptStream(sc);
247 log.info("Connected client");
248 }
249 }
250
251}