// Source blob: 8a190ecdaf6184c9e6bf759b1a6eacb6c73bef62
package org.onlab.nio;

import org.onlab.util.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.ByteChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static java.lang.String.format;
import static java.lang.System.out;
import static org.onlab.junit.TestTools.delay;
import static org.onlab.util.Tools.namedThreads;

26/**
27 * Auxiliary test fixture to measure speed of NIO-based channels.
28 */
tom74d49652014-09-25 23:48:46 -070029public class IOLoopTestServer {
toma7083182014-09-25 21:38:03 -070030
tom74d49652014-09-25 23:48:46 -070031 private static Logger log = LoggerFactory.getLogger(IOLoopTestServer.class);
toma7083182014-09-25 21:38:03 -070032
33 private static final int PRUNE_FREQUENCY = 1000;
34
35 static final int PORT = 9876;
36 static final long TIMEOUT = 1000;
37
38 static final boolean SO_NO_DELAY = false;
tom74d49652014-09-25 23:48:46 -070039 static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
40 static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
toma7083182014-09-25 21:38:03 -070041
42 static final DecimalFormat FORMAT = new DecimalFormat("#,##0");
43
44 private final AcceptorLoop aloop;
45 private final ExecutorService apool = Executors.newSingleThreadExecutor(namedThreads("accept"));
46
47 private final List<CustomIOLoop> iloops = new ArrayList<>();
48 private final ExecutorService ipool;
49
50 private final int workerCount;
51 private final int msgLength;
52 private int lastWorker = -1;
53
tom2d6d3972014-09-25 22:38:57 -070054 Counter messages;
55 Counter bytes;
toma7083182014-09-25 21:38:03 -070056
57 /**
58 * Main entry point to launch the server.
59 *
60 * @param args command-line arguments
61 * @throws IOException if unable to crate IO loops
62 */
63 public static void main(String[] args) throws IOException {
tom74d49652014-09-25 23:48:46 -070064 startStandalone(args);
65 System.exit(0);
66 }
67
68 /**
69 * Starts a standalone IO loop test server.
70 *
71 * @param args command-line arguments
72 */
73 private static void startStandalone(String[] args) throws IOException {
toma7083182014-09-25 21:38:03 -070074 InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
75 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
76 int ml = args.length > 2 ? Integer.parseInt(args[2]) : 128;
77
78 log.info("Setting up the server with {} workers, {} byte messages on {}... ",
79 wc, ml, ip);
tom74d49652014-09-25 23:48:46 -070080 IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT);
81 server.start();
toma7083182014-09-25 21:38:03 -070082
83 // Start pruning clients.
84 while (true) {
85 delay(PRUNE_FREQUENCY);
tom74d49652014-09-25 23:48:46 -070086 server.prune();
toma7083182014-09-25 21:38:03 -070087 }
88 }
89
90 /**
91 * Creates a speed server.
92 *
93 * @param ip optional ip of the adapter where to bind
94 * @param wc worker count
95 * @param ml message length in bytes
96 * @param port listen port
97 * @throws IOException if unable to create IO loops
98 */
tom74d49652014-09-25 23:48:46 -070099 public IOLoopTestServer(InetAddress ip, int wc, int ml, int port) throws IOException {
toma7083182014-09-25 21:38:03 -0700100 this.workerCount = wc;
101 this.msgLength = ml;
102 this.ipool = Executors.newFixedThreadPool(workerCount, namedThreads("io-loop"));
103
104 this.aloop = new CustomAcceptLoop(new InetSocketAddress(ip, port));
105 for (int i = 0; i < workerCount; i++) {
106 iloops.add(new CustomIOLoop());
107 }
108 }
109
110 /**
111 * Start the server IO loops and kicks off throughput tracking.
112 */
113 public void start() {
tom2d6d3972014-09-25 22:38:57 -0700114 messages = new Counter();
115 bytes = new Counter();
toma7083182014-09-25 21:38:03 -0700116
117 for (CustomIOLoop l : iloops) {
118 ipool.execute(l);
119 }
120 apool.execute(aloop);
tom2d6d3972014-09-25 22:38:57 -0700121
tom74d49652014-09-25 23:48:46 -0700122 for (CustomIOLoop l : iloops) {
123 l.awaitStart(TIMEOUT);
124 }
125 aloop.awaitStart(TIMEOUT);
toma7083182014-09-25 21:38:03 -0700126 }
127
128 /**
129 * Stop the server IO loops and freezes throughput tracking.
130 */
131 public void stop() {
132 aloop.shutdown();
133 for (CustomIOLoop l : iloops) {
134 l.shutdown();
135 }
136
tom74d49652014-09-25 23:48:46 -0700137 for (CustomIOLoop l : iloops) {
138 l.awaitStop(TIMEOUT);
139 }
140 aloop.awaitStop(TIMEOUT);
141
tom2d6d3972014-09-25 22:38:57 -0700142 messages.freeze();
143 bytes.freeze();
toma7083182014-09-25 21:38:03 -0700144 }
145
146 /**
147 * Reports on the accumulated throughput trackers.
148 */
149 public void report() {
tom2d6d3972014-09-25 22:38:57 -0700150 DecimalFormat f = new DecimalFormat("#,##0");
tom74d49652014-09-25 23:48:46 -0700151 out.println(format("Server: %s messages; %s bytes; %s mps; %s Mbs",
152 f.format(messages.total()), f.format(bytes.total()),
153 f.format(messages.throughput()),
154 f.format(bytes.throughput() / (1024 * msgLength))));
toma7083182014-09-25 21:38:03 -0700155 }
156
157 /**
158 * Prunes the IO loops of stale message buffers.
159 */
160 public void prune() {
161 for (CustomIOLoop l : iloops) {
162 l.pruneStaleStreams();
163 }
164 }
165
166 // Get the next worker to which a client should be assigned
167 private synchronized CustomIOLoop nextWorker() {
168 lastWorker = (lastWorker + 1) % workerCount;
169 return iloops.get(lastWorker);
170 }
171
172 // Loop for transfer of fixed-length messages
173 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
174
175 public CustomIOLoop() throws IOException {
176 super(500);
177 }
178
179 @Override
180 protected TestMessageStream createStream(ByteChannel channel) {
181 return new TestMessageStream(msgLength, channel, this);
182 }
183
184 @Override
185 protected void removeStream(MessageStream<TestMessage> stream) {
186 super.removeStream(stream);
tom2d6d3972014-09-25 22:38:57 -0700187
188 messages.add(stream.messagesIn().total());
189 bytes.add(stream.bytesIn().total());
190
tom74d49652014-09-25 23:48:46 -0700191// out.println(format("Disconnected server; inbound %s mps, %s Mbps; outbound %s mps, %s Mbps",
192// FORMAT.format(stream.messagesIn().throughput()),
193// FORMAT.format(stream.bytesIn().throughput() / (1024 * msgLength)),
194// FORMAT.format(stream.messagesOut().throughput()),
195// FORMAT.format(stream.bytesOut().throughput() / (1024 * msgLength))));
toma7083182014-09-25 21:38:03 -0700196 }
197
198 @Override
199 protected void processMessages(List<TestMessage> messages,
200 MessageStream<TestMessage> stream) {
201 try {
202 stream.write(messages);
203 } catch (IOException e) {
204 log.error("Unable to echo messages", e);
205 }
206 }
207 }
208
209 // Loop for accepting client connections
210 private class CustomAcceptLoop extends AcceptorLoop {
211
212 public CustomAcceptLoop(SocketAddress address) throws IOException {
213 super(500, address);
214 }
215
216 @Override
217 protected void acceptConnection(ServerSocketChannel channel) throws IOException {
218 SocketChannel sc = channel.accept();
219 sc.configureBlocking(false);
220
221 Socket so = sc.socket();
222 so.setTcpNoDelay(SO_NO_DELAY);
223 so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
224 so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
225
226 nextWorker().acceptStream(sc);
227 log.info("Connected client");
228 }
229 }
230
231}