blob: 9c1f649bd484bebb0ea2ac0f02b29d273f6f70be [file] [log] [blame]
tomf110fff2014-09-26 00:38:18 -07001package org.onlab.onos.foo;
2
3import org.onlab.nio.AcceptorLoop;
4import org.onlab.nio.IOLoop;
5import org.onlab.nio.MessageStream;
6import org.onlab.util.Counter;
7import org.slf4j.Logger;
8import org.slf4j.LoggerFactory;
9
10import java.io.IOException;
11import java.net.InetAddress;
12import java.net.InetSocketAddress;
13import java.net.Socket;
14import java.net.SocketAddress;
15import java.nio.channels.ByteChannel;
16import java.nio.channels.ServerSocketChannel;
17import java.nio.channels.SocketChannel;
18import java.text.DecimalFormat;
19import java.util.ArrayList;
20import java.util.List;
21import java.util.concurrent.ExecutorService;
22import java.util.concurrent.Executors;
23
24import static java.lang.String.format;
25import static java.lang.System.out;
26import static org.onlab.util.Tools.delay;
27import static org.onlab.util.Tools.namedThreads;
28
29/**
30 * Auxiliary test fixture to measure speed of NIO-based channels.
31 */
32public class IOLoopTestServer {
33
34 private static Logger log = LoggerFactory.getLogger(IOLoopTestServer.class);
35
36 private static final int PRUNE_FREQUENCY = 1000;
37
38 static final int PORT = 9876;
39 static final long TIMEOUT = 1000;
40
41 static final boolean SO_NO_DELAY = false;
42 static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
43 static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
44
45 static final DecimalFormat FORMAT = new DecimalFormat("#,##0");
46
47 private final AcceptorLoop aloop;
48 private final ExecutorService apool = Executors.newSingleThreadExecutor(namedThreads("accept"));
49
50 private final List<CustomIOLoop> iloops = new ArrayList<>();
51 private final ExecutorService ipool;
52
53 private final int workerCount;
54 private final int msgLength;
55 private int lastWorker = -1;
56
57 Counter messages;
58 Counter bytes;
59
60 /**
61 * Main entry point to launch the server.
62 *
63 * @param args command-line arguments
64 * @throws java.io.IOException if unable to crate IO loops
65 */
66 public static void main(String[] args) throws IOException {
67 startStandalone(args);
68 System.exit(0);
69 }
70
71 /**
72 * Starts a standalone IO loop test server.
73 *
74 * @param args command-line arguments
75 */
76 public static void startStandalone(String[] args) throws IOException {
77 InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
78 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
79 int ml = args.length > 2 ? Integer.parseInt(args[2]) : 128;
80
81 log.info("Setting up the server with {} workers, {} byte messages on {}... ",
82 wc, ml, ip);
83 IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT);
84 server.start();
85
86 // Start pruning clients.
87 while (true) {
88 delay(PRUNE_FREQUENCY);
89 server.prune();
90 }
91 }
92
93 /**
94 * Creates a speed server.
95 *
96 * @param ip optional ip of the adapter where to bind
97 * @param wc worker count
98 * @param ml message length in bytes
99 * @param port listen port
100 * @throws java.io.IOException if unable to create IO loops
101 */
102 public IOLoopTestServer(InetAddress ip, int wc, int ml, int port) throws IOException {
103 this.workerCount = wc;
104 this.msgLength = ml;
105 this.ipool = Executors.newFixedThreadPool(workerCount, namedThreads("io-loop"));
106
107 this.aloop = new CustomAcceptLoop(new InetSocketAddress(ip, port));
108 for (int i = 0; i < workerCount; i++) {
109 iloops.add(new CustomIOLoop());
110 }
111 }
112
113 /**
114 * Start the server IO loops and kicks off throughput tracking.
115 */
116 public void start() {
117 messages = new Counter();
118 bytes = new Counter();
119
120 for (CustomIOLoop l : iloops) {
121 ipool.execute(l);
122 }
123 apool.execute(aloop);
124
125 for (CustomIOLoop l : iloops) {
126 l.awaitStart(TIMEOUT);
127 }
128 aloop.awaitStart(TIMEOUT);
129 }
130
131 /**
132 * Stop the server IO loops and freezes throughput tracking.
133 */
134 public void stop() {
135 aloop.shutdown();
136 for (CustomIOLoop l : iloops) {
137 l.shutdown();
138 }
139
140 for (CustomIOLoop l : iloops) {
141 l.awaitStop(TIMEOUT);
142 }
143 aloop.awaitStop(TIMEOUT);
144
145 messages.freeze();
146 bytes.freeze();
147 }
148
149 /**
150 * Reports on the accumulated throughput trackers.
151 */
152 public void report() {
153 DecimalFormat f = new DecimalFormat("#,##0");
154 out.println(format("Server: %s messages; %s bytes; %s mps; %s Mbs",
155 f.format(messages.total()), f.format(bytes.total()),
156 f.format(messages.throughput()),
157 f.format(bytes.throughput() / (1024 * msgLength))));
158 }
159
160 /**
161 * Prunes the IO loops of stale message buffers.
162 */
163 public void prune() {
164 for (CustomIOLoop l : iloops) {
165 l.pruneStaleStreams();
166 }
167 }
168
169 // Get the next worker to which a client should be assigned
170 private synchronized CustomIOLoop nextWorker() {
171 lastWorker = (lastWorker + 1) % workerCount;
172 return iloops.get(lastWorker);
173 }
174
175 // Loop for transfer of fixed-length messages
176 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
177
178 public CustomIOLoop() throws IOException {
179 super(500);
180 }
181
182 @Override
183 protected TestMessageStream createStream(ByteChannel channel) {
184 return new TestMessageStream(msgLength, channel, this);
185 }
186
187 @Override
188 protected void removeStream(MessageStream<TestMessage> stream) {
189 super.removeStream(stream);
190
191 messages.add(stream.messagesIn().total());
192 bytes.add(stream.bytesIn().total());
193
194// out.println(format("Disconnected server; inbound %s mps, %s Mbps; outbound %s mps, %s Mbps",
195// FORMAT.format(stream.messagesIn().throughput()),
196// FORMAT.format(stream.bytesIn().throughput() / (1024 * msgLength)),
197// FORMAT.format(stream.messagesOut().throughput()),
198// FORMAT.format(stream.bytesOut().throughput() / (1024 * msgLength))));
199 }
200
201 @Override
202 protected void processMessages(List<TestMessage> messages,
203 MessageStream<TestMessage> stream) {
204 try {
205 stream.write(messages);
206 } catch (IOException e) {
207 log.error("Unable to echo messages", e);
208 }
209 }
210 }
211
212 // Loop for accepting client connections
213 private class CustomAcceptLoop extends AcceptorLoop {
214
215 public CustomAcceptLoop(SocketAddress address) throws IOException {
216 super(500, address);
217 }
218
219 @Override
220 protected void acceptConnection(ServerSocketChannel channel) throws IOException {
221 SocketChannel sc = channel.accept();
222 sc.configureBlocking(false);
223
224 Socket so = sc.socket();
225 so.setTcpNoDelay(SO_NO_DELAY);
226 so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
227 so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
228
229 nextWorker().acceptStream(sc);
230 log.info("Connected client");
231 }
232 }
233
234}