blob: bb5fee75f6fd2241e702d6b639e452159f9426d5 [file] [log] [blame]
tomf110fff2014-09-26 00:38:18 -07001package org.onlab.onos.foo;
2
tom0e0863f2014-09-26 09:02:33 -07003import com.google.common.collect.Lists;
tomf110fff2014-09-26 00:38:18 -07004import org.onlab.nio.AcceptorLoop;
5import org.onlab.nio.IOLoop;
6import org.onlab.nio.MessageStream;
7import org.onlab.util.Counter;
8import org.slf4j.Logger;
9import org.slf4j.LoggerFactory;
10
11import java.io.IOException;
12import java.net.InetAddress;
13import java.net.InetSocketAddress;
14import java.net.Socket;
15import java.net.SocketAddress;
16import java.nio.channels.ByteChannel;
17import java.nio.channels.ServerSocketChannel;
18import java.nio.channels.SocketChannel;
19import java.text.DecimalFormat;
20import java.util.ArrayList;
21import java.util.List;
22import java.util.concurrent.ExecutorService;
23import java.util.concurrent.Executors;
24
25import static java.lang.String.format;
tom0e0863f2014-09-26 09:02:33 -070026import static java.lang.System.currentTimeMillis;
tomf110fff2014-09-26 00:38:18 -070027import static java.lang.System.out;
28import static org.onlab.util.Tools.delay;
29import static org.onlab.util.Tools.namedThreads;
30
31/**
32 * Auxiliary test fixture to measure speed of NIO-based channels.
33 */
34public class IOLoopTestServer {
35
36 private static Logger log = LoggerFactory.getLogger(IOLoopTestServer.class);
37
38 private static final int PRUNE_FREQUENCY = 1000;
39
40 static final int PORT = 9876;
41 static final long TIMEOUT = 1000;
42
43 static final boolean SO_NO_DELAY = false;
44 static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
45 static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
46
47 static final DecimalFormat FORMAT = new DecimalFormat("#,##0");
48
49 private final AcceptorLoop aloop;
50 private final ExecutorService apool = Executors.newSingleThreadExecutor(namedThreads("accept"));
51
52 private final List<CustomIOLoop> iloops = new ArrayList<>();
53 private final ExecutorService ipool;
54
55 private final int workerCount;
56 private final int msgLength;
57 private int lastWorker = -1;
58
59 Counter messages;
60 Counter bytes;
61
62 /**
63 * Main entry point to launch the server.
64 *
65 * @param args command-line arguments
66 * @throws java.io.IOException if unable to crate IO loops
67 */
68 public static void main(String[] args) throws IOException {
69 startStandalone(args);
70 System.exit(0);
71 }
72
73 /**
74 * Starts a standalone IO loop test server.
75 *
76 * @param args command-line arguments
77 */
78 public static void startStandalone(String[] args) throws IOException {
79 InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
80 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
81 int ml = args.length > 2 ? Integer.parseInt(args[2]) : 128;
82
83 log.info("Setting up the server with {} workers, {} byte messages on {}... ",
84 wc, ml, ip);
85 IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT);
86 server.start();
87
88 // Start pruning clients.
89 while (true) {
90 delay(PRUNE_FREQUENCY);
91 server.prune();
92 }
93 }
94
95 /**
96 * Creates a speed server.
97 *
98 * @param ip optional ip of the adapter where to bind
99 * @param wc worker count
100 * @param ml message length in bytes
101 * @param port listen port
102 * @throws java.io.IOException if unable to create IO loops
103 */
104 public IOLoopTestServer(InetAddress ip, int wc, int ml, int port) throws IOException {
105 this.workerCount = wc;
106 this.msgLength = ml;
107 this.ipool = Executors.newFixedThreadPool(workerCount, namedThreads("io-loop"));
108
109 this.aloop = new CustomAcceptLoop(new InetSocketAddress(ip, port));
110 for (int i = 0; i < workerCount; i++) {
111 iloops.add(new CustomIOLoop());
112 }
113 }
114
115 /**
116 * Start the server IO loops and kicks off throughput tracking.
117 */
118 public void start() {
119 messages = new Counter();
120 bytes = new Counter();
121
122 for (CustomIOLoop l : iloops) {
123 ipool.execute(l);
124 }
125 apool.execute(aloop);
126
127 for (CustomIOLoop l : iloops) {
128 l.awaitStart(TIMEOUT);
129 }
130 aloop.awaitStart(TIMEOUT);
131 }
132
133 /**
134 * Stop the server IO loops and freezes throughput tracking.
135 */
136 public void stop() {
137 aloop.shutdown();
138 for (CustomIOLoop l : iloops) {
139 l.shutdown();
140 }
141
142 for (CustomIOLoop l : iloops) {
143 l.awaitStop(TIMEOUT);
144 }
145 aloop.awaitStop(TIMEOUT);
146
147 messages.freeze();
148 bytes.freeze();
149 }
150
151 /**
152 * Reports on the accumulated throughput trackers.
153 */
154 public void report() {
155 DecimalFormat f = new DecimalFormat("#,##0");
156 out.println(format("Server: %s messages; %s bytes; %s mps; %s Mbs",
157 f.format(messages.total()), f.format(bytes.total()),
158 f.format(messages.throughput()),
159 f.format(bytes.throughput() / (1024 * msgLength))));
160 }
161
162 /**
163 * Prunes the IO loops of stale message buffers.
164 */
165 public void prune() {
166 for (CustomIOLoop l : iloops) {
167 l.pruneStaleStreams();
168 }
169 }
170
171 // Get the next worker to which a client should be assigned
172 private synchronized CustomIOLoop nextWorker() {
173 lastWorker = (lastWorker + 1) % workerCount;
174 return iloops.get(lastWorker);
175 }
176
177 // Loop for transfer of fixed-length messages
178 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
179
180 public CustomIOLoop() throws IOException {
181 super(500);
182 }
183
184 @Override
185 protected TestMessageStream createStream(ByteChannel channel) {
186 return new TestMessageStream(msgLength, channel, this);
187 }
188
189 @Override
190 protected void removeStream(MessageStream<TestMessage> stream) {
191 super.removeStream(stream);
192
193 messages.add(stream.messagesIn().total());
194 bytes.add(stream.bytesIn().total());
195
196// out.println(format("Disconnected server; inbound %s mps, %s Mbps; outbound %s mps, %s Mbps",
197// FORMAT.format(stream.messagesIn().throughput()),
198// FORMAT.format(stream.bytesIn().throughput() / (1024 * msgLength)),
199// FORMAT.format(stream.messagesOut().throughput()),
200// FORMAT.format(stream.bytesOut().throughput() / (1024 * msgLength))));
201 }
202
203 @Override
204 protected void processMessages(List<TestMessage> messages,
205 MessageStream<TestMessage> stream) {
206 try {
tom0e0863f2014-09-26 09:02:33 -0700207 stream.write(createResponses(messages));
tomf110fff2014-09-26 00:38:18 -0700208 } catch (IOException e) {
209 log.error("Unable to echo messages", e);
210 }
211 }
tom0e0863f2014-09-26 09:02:33 -0700212
213 private List<TestMessage> createResponses(List<TestMessage> messages) {
214 List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size());
215 for (TestMessage message : messages) {
216 responses.add(new TestMessage(message.length(), message.requestorTime(),
217 currentTimeMillis(), message.padding()));
218 }
219 return responses;
220 }
tomf110fff2014-09-26 00:38:18 -0700221 }
222
223 // Loop for accepting client connections
224 private class CustomAcceptLoop extends AcceptorLoop {
225
226 public CustomAcceptLoop(SocketAddress address) throws IOException {
227 super(500, address);
228 }
229
230 @Override
231 protected void acceptConnection(ServerSocketChannel channel) throws IOException {
232 SocketChannel sc = channel.accept();
233 sc.configureBlocking(false);
234
235 Socket so = sc.socket();
236 so.setTcpNoDelay(SO_NO_DELAY);
237 so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
238 so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
239
240 nextWorker().acceptStream(sc);
241 log.info("Connected client");
242 }
243 }
244
245}