blob: 076ad28b2bb050b977b4b7d7a4df5eeac95bbc4b [file] [log] [blame]
tomf110fff2014-09-26 00:38:18 -07001package org.onlab.onos.foo;
2
tom0e0863f2014-09-26 09:02:33 -07003import com.google.common.collect.Lists;
tomf110fff2014-09-26 00:38:18 -07004import org.onlab.nio.AcceptorLoop;
5import org.onlab.nio.IOLoop;
6import org.onlab.nio.MessageStream;
7import org.onlab.util.Counter;
8import org.slf4j.Logger;
9import org.slf4j.LoggerFactory;
10
11import java.io.IOException;
12import java.net.InetAddress;
13import java.net.InetSocketAddress;
14import java.net.Socket;
15import java.net.SocketAddress;
16import java.nio.channels.ByteChannel;
17import java.nio.channels.ServerSocketChannel;
18import java.nio.channels.SocketChannel;
19import java.text.DecimalFormat;
20import java.util.ArrayList;
21import java.util.List;
22import java.util.concurrent.ExecutorService;
23import java.util.concurrent.Executors;
24
25import static java.lang.String.format;
tom5f4df2d2014-09-26 12:19:51 -070026import static java.lang.System.nanoTime;
tomf110fff2014-09-26 00:38:18 -070027import static java.lang.System.out;
28import static org.onlab.util.Tools.delay;
29import static org.onlab.util.Tools.namedThreads;
30
31/**
32 * Auxiliary test fixture to measure speed of NIO-based channels.
33 */
34public class IOLoopTestServer {
35
36 private static Logger log = LoggerFactory.getLogger(IOLoopTestServer.class);
37
38 private static final int PRUNE_FREQUENCY = 1000;
39
40 static final int PORT = 9876;
41 static final long TIMEOUT = 1000;
42
43 static final boolean SO_NO_DELAY = false;
44 static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
45 static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
46
47 static final DecimalFormat FORMAT = new DecimalFormat("#,##0");
48
49 private final AcceptorLoop aloop;
50 private final ExecutorService apool = Executors.newSingleThreadExecutor(namedThreads("accept"));
51
52 private final List<CustomIOLoop> iloops = new ArrayList<>();
53 private final ExecutorService ipool;
54
55 private final int workerCount;
56 private final int msgLength;
57 private int lastWorker = -1;
58
59 Counter messages;
60 Counter bytes;
61
62 /**
63 * Main entry point to launch the server.
64 *
65 * @param args command-line arguments
66 * @throws java.io.IOException if unable to crate IO loops
67 */
68 public static void main(String[] args) throws IOException {
69 startStandalone(args);
70 System.exit(0);
71 }
72
73 /**
74 * Starts a standalone IO loop test server.
75 *
76 * @param args command-line arguments
77 */
78 public static void startStandalone(String[] args) throws IOException {
79 InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
80 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
81 int ml = args.length > 2 ? Integer.parseInt(args[2]) : 128;
82
83 log.info("Setting up the server with {} workers, {} byte messages on {}... ",
84 wc, ml, ip);
85 IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT);
86 server.start();
87
tomf7e13b02014-09-26 11:12:25 -070088 // Start pruning clients and keep going until their number goes to 0.
89 int remaining = -1;
90 while (remaining == -1 || remaining > 0) {
tomf110fff2014-09-26 00:38:18 -070091 delay(PRUNE_FREQUENCY);
tomf7e13b02014-09-26 11:12:25 -070092 int r = server.prune();
93 remaining = remaining == -1 && r == 0 ? remaining : r;
tomf110fff2014-09-26 00:38:18 -070094 }
tom5f4df2d2014-09-26 12:19:51 -070095 server.stop();
tomf110fff2014-09-26 00:38:18 -070096 }
97
98 /**
99 * Creates a speed server.
100 *
101 * @param ip optional ip of the adapter where to bind
102 * @param wc worker count
103 * @param ml message length in bytes
104 * @param port listen port
105 * @throws java.io.IOException if unable to create IO loops
106 */
107 public IOLoopTestServer(InetAddress ip, int wc, int ml, int port) throws IOException {
108 this.workerCount = wc;
109 this.msgLength = ml;
110 this.ipool = Executors.newFixedThreadPool(workerCount, namedThreads("io-loop"));
111
112 this.aloop = new CustomAcceptLoop(new InetSocketAddress(ip, port));
113 for (int i = 0; i < workerCount; i++) {
114 iloops.add(new CustomIOLoop());
115 }
116 }
117
118 /**
119 * Start the server IO loops and kicks off throughput tracking.
120 */
121 public void start() {
122 messages = new Counter();
123 bytes = new Counter();
124
125 for (CustomIOLoop l : iloops) {
126 ipool.execute(l);
127 }
128 apool.execute(aloop);
129
130 for (CustomIOLoop l : iloops) {
131 l.awaitStart(TIMEOUT);
132 }
133 aloop.awaitStart(TIMEOUT);
134 }
135
136 /**
137 * Stop the server IO loops and freezes throughput tracking.
138 */
139 public void stop() {
140 aloop.shutdown();
141 for (CustomIOLoop l : iloops) {
142 l.shutdown();
143 }
144
145 for (CustomIOLoop l : iloops) {
146 l.awaitStop(TIMEOUT);
147 }
148 aloop.awaitStop(TIMEOUT);
149
150 messages.freeze();
151 bytes.freeze();
152 }
153
154 /**
tombf474382014-10-02 07:36:50 -0700155 * Reports on the accumulated throughput and latency.
tomf110fff2014-09-26 00:38:18 -0700156 */
157 public void report() {
158 DecimalFormat f = new DecimalFormat("#,##0");
tom14dc4d02014-09-26 12:43:14 -0700159 out.println(format("Server: %s messages; %s bytes; %s mps; %s MBs",
tomf110fff2014-09-26 00:38:18 -0700160 f.format(messages.total()), f.format(bytes.total()),
161 f.format(messages.throughput()),
162 f.format(bytes.throughput() / (1024 * msgLength))));
163 }
164
165 /**
166 * Prunes the IO loops of stale message buffers.
tomf7e13b02014-09-26 11:12:25 -0700167 *
168 * @return number of remaining IO loops among all workers.
tomf110fff2014-09-26 00:38:18 -0700169 */
tomf7e13b02014-09-26 11:12:25 -0700170 public int prune() {
171 int count = 0;
tomf110fff2014-09-26 00:38:18 -0700172 for (CustomIOLoop l : iloops) {
tomf7e13b02014-09-26 11:12:25 -0700173 count += l.pruneStaleStreams();
tomf110fff2014-09-26 00:38:18 -0700174 }
tomf7e13b02014-09-26 11:12:25 -0700175 return count;
tomf110fff2014-09-26 00:38:18 -0700176 }
177
178 // Get the next worker to which a client should be assigned
179 private synchronized CustomIOLoop nextWorker() {
180 lastWorker = (lastWorker + 1) % workerCount;
181 return iloops.get(lastWorker);
182 }
183
184 // Loop for transfer of fixed-length messages
185 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
186
187 public CustomIOLoop() throws IOException {
188 super(500);
189 }
190
191 @Override
192 protected TestMessageStream createStream(ByteChannel channel) {
193 return new TestMessageStream(msgLength, channel, this);
194 }
195
196 @Override
197 protected void removeStream(MessageStream<TestMessage> stream) {
198 super.removeStream(stream);
tomf110fff2014-09-26 00:38:18 -0700199 messages.add(stream.messagesIn().total());
200 bytes.add(stream.bytesIn().total());
tomf110fff2014-09-26 00:38:18 -0700201 }
202
203 @Override
204 protected void processMessages(List<TestMessage> messages,
205 MessageStream<TestMessage> stream) {
206 try {
tom0e0863f2014-09-26 09:02:33 -0700207 stream.write(createResponses(messages));
tomf110fff2014-09-26 00:38:18 -0700208 } catch (IOException e) {
209 log.error("Unable to echo messages", e);
210 }
211 }
tom0e0863f2014-09-26 09:02:33 -0700212
213 private List<TestMessage> createResponses(List<TestMessage> messages) {
214 List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size());
215 for (TestMessage message : messages) {
216 responses.add(new TestMessage(message.length(), message.requestorTime(),
tom5f4df2d2014-09-26 12:19:51 -0700217 nanoTime(), message.padding()));
tom0e0863f2014-09-26 09:02:33 -0700218 }
219 return responses;
220 }
tomf110fff2014-09-26 00:38:18 -0700221 }
222
223 // Loop for accepting client connections
224 private class CustomAcceptLoop extends AcceptorLoop {
225
226 public CustomAcceptLoop(SocketAddress address) throws IOException {
227 super(500, address);
228 }
229
230 @Override
231 protected void acceptConnection(ServerSocketChannel channel) throws IOException {
232 SocketChannel sc = channel.accept();
233 sc.configureBlocking(false);
234
235 Socket so = sc.socket();
236 so.setTcpNoDelay(SO_NO_DELAY);
237 so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
238 so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
239
240 nextWorker().acceptStream(sc);
241 log.info("Connected client");
242 }
243 }
244
245}