blob: f2d2ca3b0a7458dca87bdca5ab1e56d2f8fbb4ad [file] [log] [blame]
Thomas Vachuska781d18b2014-10-27 10:31:25 -07001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
tomf110fff2014-09-26 00:38:18 -070019package org.onlab.onos.foo;
20
tom0e0863f2014-09-26 09:02:33 -070021import com.google.common.collect.Lists;
tomf110fff2014-09-26 00:38:18 -070022import org.onlab.nio.AcceptorLoop;
23import org.onlab.nio.IOLoop;
24import org.onlab.nio.MessageStream;
25import org.onlab.util.Counter;
26import org.slf4j.Logger;
27import org.slf4j.LoggerFactory;
28
29import java.io.IOException;
30import java.net.InetAddress;
31import java.net.InetSocketAddress;
32import java.net.Socket;
33import java.net.SocketAddress;
34import java.nio.channels.ByteChannel;
35import java.nio.channels.ServerSocketChannel;
36import java.nio.channels.SocketChannel;
37import java.text.DecimalFormat;
38import java.util.ArrayList;
39import java.util.List;
40import java.util.concurrent.ExecutorService;
41import java.util.concurrent.Executors;
42
43import static java.lang.String.format;
tom5f4df2d2014-09-26 12:19:51 -070044import static java.lang.System.nanoTime;
tomf110fff2014-09-26 00:38:18 -070045import static java.lang.System.out;
46import static org.onlab.util.Tools.delay;
47import static org.onlab.util.Tools.namedThreads;
48
49/**
50 * Auxiliary test fixture to measure speed of NIO-based channels.
51 */
52public class IOLoopTestServer {
53
54 private static Logger log = LoggerFactory.getLogger(IOLoopTestServer.class);
55
56 private static final int PRUNE_FREQUENCY = 1000;
57
58 static final int PORT = 9876;
59 static final long TIMEOUT = 1000;
60
61 static final boolean SO_NO_DELAY = false;
62 static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
63 static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
64
65 static final DecimalFormat FORMAT = new DecimalFormat("#,##0");
66
67 private final AcceptorLoop aloop;
68 private final ExecutorService apool = Executors.newSingleThreadExecutor(namedThreads("accept"));
69
70 private final List<CustomIOLoop> iloops = new ArrayList<>();
71 private final ExecutorService ipool;
72
73 private final int workerCount;
74 private final int msgLength;
75 private int lastWorker = -1;
76
77 Counter messages;
78 Counter bytes;
79
80 /**
81 * Main entry point to launch the server.
82 *
83 * @param args command-line arguments
84 * @throws java.io.IOException if unable to crate IO loops
85 */
86 public static void main(String[] args) throws IOException {
87 startStandalone(args);
88 System.exit(0);
89 }
90
91 /**
92 * Starts a standalone IO loop test server.
93 *
94 * @param args command-line arguments
95 */
96 public static void startStandalone(String[] args) throws IOException {
97 InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
98 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
99 int ml = args.length > 2 ? Integer.parseInt(args[2]) : 128;
100
101 log.info("Setting up the server with {} workers, {} byte messages on {}... ",
102 wc, ml, ip);
103 IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT);
104 server.start();
105
tomf7e13b02014-09-26 11:12:25 -0700106 // Start pruning clients and keep going until their number goes to 0.
107 int remaining = -1;
108 while (remaining == -1 || remaining > 0) {
tomf110fff2014-09-26 00:38:18 -0700109 delay(PRUNE_FREQUENCY);
tomf7e13b02014-09-26 11:12:25 -0700110 int r = server.prune();
111 remaining = remaining == -1 && r == 0 ? remaining : r;
tomf110fff2014-09-26 00:38:18 -0700112 }
tom5f4df2d2014-09-26 12:19:51 -0700113 server.stop();
tomf110fff2014-09-26 00:38:18 -0700114 }
115
116 /**
117 * Creates a speed server.
118 *
119 * @param ip optional ip of the adapter where to bind
120 * @param wc worker count
121 * @param ml message length in bytes
122 * @param port listen port
123 * @throws java.io.IOException if unable to create IO loops
124 */
125 public IOLoopTestServer(InetAddress ip, int wc, int ml, int port) throws IOException {
126 this.workerCount = wc;
127 this.msgLength = ml;
128 this.ipool = Executors.newFixedThreadPool(workerCount, namedThreads("io-loop"));
129
130 this.aloop = new CustomAcceptLoop(new InetSocketAddress(ip, port));
131 for (int i = 0; i < workerCount; i++) {
132 iloops.add(new CustomIOLoop());
133 }
134 }
135
136 /**
137 * Start the server IO loops and kicks off throughput tracking.
138 */
139 public void start() {
140 messages = new Counter();
141 bytes = new Counter();
142
143 for (CustomIOLoop l : iloops) {
144 ipool.execute(l);
145 }
146 apool.execute(aloop);
147
148 for (CustomIOLoop l : iloops) {
149 l.awaitStart(TIMEOUT);
150 }
151 aloop.awaitStart(TIMEOUT);
152 }
153
154 /**
155 * Stop the server IO loops and freezes throughput tracking.
156 */
157 public void stop() {
158 aloop.shutdown();
159 for (CustomIOLoop l : iloops) {
160 l.shutdown();
161 }
162
163 for (CustomIOLoop l : iloops) {
164 l.awaitStop(TIMEOUT);
165 }
166 aloop.awaitStop(TIMEOUT);
167
168 messages.freeze();
169 bytes.freeze();
170 }
171
172 /**
tombf474382014-10-02 07:36:50 -0700173 * Reports on the accumulated throughput and latency.
tomf110fff2014-09-26 00:38:18 -0700174 */
175 public void report() {
176 DecimalFormat f = new DecimalFormat("#,##0");
tom14dc4d02014-09-26 12:43:14 -0700177 out.println(format("Server: %s messages; %s bytes; %s mps; %s MBs",
tomf110fff2014-09-26 00:38:18 -0700178 f.format(messages.total()), f.format(bytes.total()),
179 f.format(messages.throughput()),
180 f.format(bytes.throughput() / (1024 * msgLength))));
181 }
182
183 /**
184 * Prunes the IO loops of stale message buffers.
tomf7e13b02014-09-26 11:12:25 -0700185 *
186 * @return number of remaining IO loops among all workers.
tomf110fff2014-09-26 00:38:18 -0700187 */
tomf7e13b02014-09-26 11:12:25 -0700188 public int prune() {
189 int count = 0;
tomf110fff2014-09-26 00:38:18 -0700190 for (CustomIOLoop l : iloops) {
tomf7e13b02014-09-26 11:12:25 -0700191 count += l.pruneStaleStreams();
tomf110fff2014-09-26 00:38:18 -0700192 }
tomf7e13b02014-09-26 11:12:25 -0700193 return count;
tomf110fff2014-09-26 00:38:18 -0700194 }
195
196 // Get the next worker to which a client should be assigned
197 private synchronized CustomIOLoop nextWorker() {
198 lastWorker = (lastWorker + 1) % workerCount;
199 return iloops.get(lastWorker);
200 }
201
202 // Loop for transfer of fixed-length messages
203 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
204
205 public CustomIOLoop() throws IOException {
206 super(500);
207 }
208
209 @Override
210 protected TestMessageStream createStream(ByteChannel channel) {
211 return new TestMessageStream(msgLength, channel, this);
212 }
213
214 @Override
215 protected void removeStream(MessageStream<TestMessage> stream) {
216 super.removeStream(stream);
tomf110fff2014-09-26 00:38:18 -0700217 messages.add(stream.messagesIn().total());
218 bytes.add(stream.bytesIn().total());
tomf110fff2014-09-26 00:38:18 -0700219 }
220
221 @Override
222 protected void processMessages(List<TestMessage> messages,
223 MessageStream<TestMessage> stream) {
224 try {
tom0e0863f2014-09-26 09:02:33 -0700225 stream.write(createResponses(messages));
tomf110fff2014-09-26 00:38:18 -0700226 } catch (IOException e) {
227 log.error("Unable to echo messages", e);
228 }
229 }
tom0e0863f2014-09-26 09:02:33 -0700230
231 private List<TestMessage> createResponses(List<TestMessage> messages) {
232 List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size());
233 for (TestMessage message : messages) {
234 responses.add(new TestMessage(message.length(), message.requestorTime(),
tom5f4df2d2014-09-26 12:19:51 -0700235 nanoTime(), message.padding()));
tom0e0863f2014-09-26 09:02:33 -0700236 }
237 return responses;
238 }
tomf110fff2014-09-26 00:38:18 -0700239 }
240
241 // Loop for accepting client connections
242 private class CustomAcceptLoop extends AcceptorLoop {
243
244 public CustomAcceptLoop(SocketAddress address) throws IOException {
245 super(500, address);
246 }
247
248 @Override
249 protected void acceptConnection(ServerSocketChannel channel) throws IOException {
250 SocketChannel sc = channel.accept();
251 sc.configureBlocking(false);
252
253 Socket so = sc.socket();
254 so.setTcpNoDelay(SO_NO_DELAY);
255 so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
256 so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
257
258 nextWorker().acceptStream(sc);
259 log.info("Connected client");
260 }
261 }
262
263}