blob: ad7e6b8108820d5f6b8b3f6f7ca267abdf4e9898 [file] [log] [blame]
tomf110fff2014-09-26 00:38:18 -07001package org.onlab.onos.foo;
2
tom0e0863f2014-09-26 09:02:33 -07003import com.google.common.collect.Lists;
tomf110fff2014-09-26 00:38:18 -07004import org.onlab.nio.AcceptorLoop;
5import org.onlab.nio.IOLoop;
6import org.onlab.nio.MessageStream;
7import org.onlab.util.Counter;
8import org.slf4j.Logger;
9import org.slf4j.LoggerFactory;
10
11import java.io.IOException;
12import java.net.InetAddress;
13import java.net.InetSocketAddress;
14import java.net.Socket;
15import java.net.SocketAddress;
16import java.nio.channels.ByteChannel;
17import java.nio.channels.ServerSocketChannel;
18import java.nio.channels.SocketChannel;
19import java.text.DecimalFormat;
20import java.util.ArrayList;
21import java.util.List;
22import java.util.concurrent.ExecutorService;
23import java.util.concurrent.Executors;
24
25import static java.lang.String.format;
tom0e0863f2014-09-26 09:02:33 -070026import static java.lang.System.currentTimeMillis;
tom5f4df2d2014-09-26 12:19:51 -070027import static java.lang.System.nanoTime;
tomf110fff2014-09-26 00:38:18 -070028import static java.lang.System.out;
29import static org.onlab.util.Tools.delay;
30import static org.onlab.util.Tools.namedThreads;
31
32/**
33 * Auxiliary test fixture to measure speed of NIO-based channels.
34 */
35public class IOLoopTestServer {
36
37 private static Logger log = LoggerFactory.getLogger(IOLoopTestServer.class);
38
39 private static final int PRUNE_FREQUENCY = 1000;
40
41 static final int PORT = 9876;
42 static final long TIMEOUT = 1000;
43
44 static final boolean SO_NO_DELAY = false;
45 static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
46 static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
47
48 static final DecimalFormat FORMAT = new DecimalFormat("#,##0");
49
50 private final AcceptorLoop aloop;
51 private final ExecutorService apool = Executors.newSingleThreadExecutor(namedThreads("accept"));
52
53 private final List<CustomIOLoop> iloops = new ArrayList<>();
54 private final ExecutorService ipool;
55
56 private final int workerCount;
57 private final int msgLength;
58 private int lastWorker = -1;
59
60 Counter messages;
61 Counter bytes;
62
63 /**
64 * Main entry point to launch the server.
65 *
66 * @param args command-line arguments
67 * @throws java.io.IOException if unable to crate IO loops
68 */
69 public static void main(String[] args) throws IOException {
70 startStandalone(args);
71 System.exit(0);
72 }
73
74 /**
75 * Starts a standalone IO loop test server.
76 *
77 * @param args command-line arguments
78 */
79 public static void startStandalone(String[] args) throws IOException {
80 InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
81 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
82 int ml = args.length > 2 ? Integer.parseInt(args[2]) : 128;
83
84 log.info("Setting up the server with {} workers, {} byte messages on {}... ",
85 wc, ml, ip);
86 IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT);
87 server.start();
88
tomf7e13b02014-09-26 11:12:25 -070089 // Start pruning clients and keep going until their number goes to 0.
90 int remaining = -1;
91 while (remaining == -1 || remaining > 0) {
tomf110fff2014-09-26 00:38:18 -070092 delay(PRUNE_FREQUENCY);
tomf7e13b02014-09-26 11:12:25 -070093 int r = server.prune();
94 remaining = remaining == -1 && r == 0 ? remaining : r;
tomf110fff2014-09-26 00:38:18 -070095 }
tom5f4df2d2014-09-26 12:19:51 -070096 server.stop();
tomf110fff2014-09-26 00:38:18 -070097 }
98
99 /**
100 * Creates a speed server.
101 *
102 * @param ip optional ip of the adapter where to bind
103 * @param wc worker count
104 * @param ml message length in bytes
105 * @param port listen port
106 * @throws java.io.IOException if unable to create IO loops
107 */
108 public IOLoopTestServer(InetAddress ip, int wc, int ml, int port) throws IOException {
109 this.workerCount = wc;
110 this.msgLength = ml;
111 this.ipool = Executors.newFixedThreadPool(workerCount, namedThreads("io-loop"));
112
113 this.aloop = new CustomAcceptLoop(new InetSocketAddress(ip, port));
114 for (int i = 0; i < workerCount; i++) {
115 iloops.add(new CustomIOLoop());
116 }
117 }
118
119 /**
120 * Start the server IO loops and kicks off throughput tracking.
121 */
122 public void start() {
123 messages = new Counter();
124 bytes = new Counter();
125
126 for (CustomIOLoop l : iloops) {
127 ipool.execute(l);
128 }
129 apool.execute(aloop);
130
131 for (CustomIOLoop l : iloops) {
132 l.awaitStart(TIMEOUT);
133 }
134 aloop.awaitStart(TIMEOUT);
135 }
136
137 /**
138 * Stop the server IO loops and freezes throughput tracking.
139 */
140 public void stop() {
141 aloop.shutdown();
142 for (CustomIOLoop l : iloops) {
143 l.shutdown();
144 }
145
146 for (CustomIOLoop l : iloops) {
147 l.awaitStop(TIMEOUT);
148 }
149 aloop.awaitStop(TIMEOUT);
150
151 messages.freeze();
152 bytes.freeze();
153 }
154
155 /**
156 * Reports on the accumulated throughput trackers.
157 */
158 public void report() {
159 DecimalFormat f = new DecimalFormat("#,##0");
160 out.println(format("Server: %s messages; %s bytes; %s mps; %s Mbs",
161 f.format(messages.total()), f.format(bytes.total()),
162 f.format(messages.throughput()),
163 f.format(bytes.throughput() / (1024 * msgLength))));
164 }
165
166 /**
167 * Prunes the IO loops of stale message buffers.
tomf7e13b02014-09-26 11:12:25 -0700168 *
169 * @return number of remaining IO loops among all workers.
tomf110fff2014-09-26 00:38:18 -0700170 */
tomf7e13b02014-09-26 11:12:25 -0700171 public int prune() {
172 int count = 0;
tomf110fff2014-09-26 00:38:18 -0700173 for (CustomIOLoop l : iloops) {
tomf7e13b02014-09-26 11:12:25 -0700174 count += l.pruneStaleStreams();
tomf110fff2014-09-26 00:38:18 -0700175 }
tomf7e13b02014-09-26 11:12:25 -0700176 return count;
tomf110fff2014-09-26 00:38:18 -0700177 }
178
179 // Get the next worker to which a client should be assigned
180 private synchronized CustomIOLoop nextWorker() {
181 lastWorker = (lastWorker + 1) % workerCount;
182 return iloops.get(lastWorker);
183 }
184
185 // Loop for transfer of fixed-length messages
186 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
187
188 public CustomIOLoop() throws IOException {
189 super(500);
190 }
191
192 @Override
193 protected TestMessageStream createStream(ByteChannel channel) {
194 return new TestMessageStream(msgLength, channel, this);
195 }
196
197 @Override
198 protected void removeStream(MessageStream<TestMessage> stream) {
199 super.removeStream(stream);
200
201 messages.add(stream.messagesIn().total());
202 bytes.add(stream.bytesIn().total());
203
204// out.println(format("Disconnected server; inbound %s mps, %s Mbps; outbound %s mps, %s Mbps",
205// FORMAT.format(stream.messagesIn().throughput()),
206// FORMAT.format(stream.bytesIn().throughput() / (1024 * msgLength)),
207// FORMAT.format(stream.messagesOut().throughput()),
208// FORMAT.format(stream.bytesOut().throughput() / (1024 * msgLength))));
209 }
210
211 @Override
212 protected void processMessages(List<TestMessage> messages,
213 MessageStream<TestMessage> stream) {
214 try {
tom0e0863f2014-09-26 09:02:33 -0700215 stream.write(createResponses(messages));
tomf110fff2014-09-26 00:38:18 -0700216 } catch (IOException e) {
217 log.error("Unable to echo messages", e);
218 }
219 }
tom0e0863f2014-09-26 09:02:33 -0700220
221 private List<TestMessage> createResponses(List<TestMessage> messages) {
222 List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size());
223 for (TestMessage message : messages) {
224 responses.add(new TestMessage(message.length(), message.requestorTime(),
tom5f4df2d2014-09-26 12:19:51 -0700225 nanoTime(), message.padding()));
tom0e0863f2014-09-26 09:02:33 -0700226 }
227 return responses;
228 }
tomf110fff2014-09-26 00:38:18 -0700229 }
230
231 // Loop for accepting client connections
232 private class CustomAcceptLoop extends AcceptorLoop {
233
234 public CustomAcceptLoop(SocketAddress address) throws IOException {
235 super(500, address);
236 }
237
238 @Override
239 protected void acceptConnection(ServerSocketChannel channel) throws IOException {
240 SocketChannel sc = channel.accept();
241 sc.configureBlocking(false);
242
243 Socket so = sc.socket();
244 so.setTcpNoDelay(SO_NO_DELAY);
245 so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
246 so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
247
248 nextWorker().acceptStream(sc);
249 log.info("Connected client");
250 }
251 }
252
253}