blob: d5ce5f3961cd8e01a5eb1abfbe293d2ebf4e7720 [file] [log] [blame]
Thomas Vachuska24c849c2014-10-27 09:53:05 -07001/*
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07002 * Copyright 2014 Open Networking Laboratory
Thomas Vachuska24c849c2014-10-27 09:53:05 -07003 *
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07004 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
Thomas Vachuska24c849c2014-10-27 09:53:05 -07007 *
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07008 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
Thomas Vachuska24c849c2014-10-27 09:53:05 -070015 */
toma7083182014-09-25 21:38:03 -070016package org.onlab.nio;
17
tom1ae3d162014-09-26 09:38:16 -070018import com.google.common.collect.Lists;
tom2d6d3972014-09-25 22:38:57 -070019import org.onlab.util.Counter;
toma7083182014-09-25 21:38:03 -070020import org.slf4j.Logger;
21import org.slf4j.LoggerFactory;
22
23import java.io.IOException;
24import java.net.InetAddress;
25import java.net.InetSocketAddress;
26import java.net.Socket;
27import java.net.SocketAddress;
28import java.nio.channels.ByteChannel;
29import java.nio.channels.ServerSocketChannel;
30import java.nio.channels.SocketChannel;
31import java.text.DecimalFormat;
32import java.util.ArrayList;
33import java.util.List;
34import java.util.concurrent.ExecutorService;
35import java.util.concurrent.Executors;
36
tom74d49652014-09-25 23:48:46 -070037import static java.lang.String.format;
38import static java.lang.System.out;
tom1ae3d162014-09-26 09:38:16 -070039import static org.onlab.util.Tools.delay;
toma7083182014-09-25 21:38:03 -070040import static org.onlab.util.Tools.namedThreads;
41
42/**
43 * Auxiliary test fixture to measure speed of NIO-based channels.
44 */
tom74d49652014-09-25 23:48:46 -070045public class IOLoopTestServer {
toma7083182014-09-25 21:38:03 -070046
tom74d49652014-09-25 23:48:46 -070047 private static Logger log = LoggerFactory.getLogger(IOLoopTestServer.class);
toma7083182014-09-25 21:38:03 -070048
49 private static final int PRUNE_FREQUENCY = 1000;
50
51 static final int PORT = 9876;
52 static final long TIMEOUT = 1000;
53
54 static final boolean SO_NO_DELAY = false;
tom74d49652014-09-25 23:48:46 -070055 static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
56 static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
toma7083182014-09-25 21:38:03 -070057
58 static final DecimalFormat FORMAT = new DecimalFormat("#,##0");
59
60 private final AcceptorLoop aloop;
61 private final ExecutorService apool = Executors.newSingleThreadExecutor(namedThreads("accept"));
62
63 private final List<CustomIOLoop> iloops = new ArrayList<>();
64 private final ExecutorService ipool;
65
66 private final int workerCount;
67 private final int msgLength;
68 private int lastWorker = -1;
69
tom2d6d3972014-09-25 22:38:57 -070070 Counter messages;
71 Counter bytes;
toma7083182014-09-25 21:38:03 -070072
73 /**
74 * Main entry point to launch the server.
75 *
76 * @param args command-line arguments
tom1ae3d162014-09-26 09:38:16 -070077 * @throws java.io.IOException if unable to crate IO loops
toma7083182014-09-25 21:38:03 -070078 */
79 public static void main(String[] args) throws IOException {
tom74d49652014-09-25 23:48:46 -070080 startStandalone(args);
81 System.exit(0);
82 }
83
84 /**
85 * Starts a standalone IO loop test server.
86 *
87 * @param args command-line arguments
88 */
tomf110fff2014-09-26 00:38:18 -070089 public static void startStandalone(String[] args) throws IOException {
toma7083182014-09-25 21:38:03 -070090 InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
91 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
92 int ml = args.length > 2 ? Integer.parseInt(args[2]) : 128;
93
94 log.info("Setting up the server with {} workers, {} byte messages on {}... ",
95 wc, ml, ip);
tom74d49652014-09-25 23:48:46 -070096 IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT);
97 server.start();
toma7083182014-09-25 21:38:03 -070098
tom5f4df2d2014-09-26 12:19:51 -070099 // Start pruning clients and keep going until their number goes to 0.
100 int remaining = -1;
101 while (remaining == -1 || remaining > 0) {
toma7083182014-09-25 21:38:03 -0700102 delay(PRUNE_FREQUENCY);
tom5f4df2d2014-09-26 12:19:51 -0700103 int r = server.prune();
104 remaining = remaining == -1 && r == 0 ? remaining : r;
toma7083182014-09-25 21:38:03 -0700105 }
tom5f4df2d2014-09-26 12:19:51 -0700106 server.stop();
toma7083182014-09-25 21:38:03 -0700107 }
108
109 /**
110 * Creates a speed server.
111 *
112 * @param ip optional ip of the adapter where to bind
113 * @param wc worker count
114 * @param ml message length in bytes
115 * @param port listen port
tom1ae3d162014-09-26 09:38:16 -0700116 * @throws java.io.IOException if unable to create IO loops
toma7083182014-09-25 21:38:03 -0700117 */
tom74d49652014-09-25 23:48:46 -0700118 public IOLoopTestServer(InetAddress ip, int wc, int ml, int port) throws IOException {
toma7083182014-09-25 21:38:03 -0700119 this.workerCount = wc;
120 this.msgLength = ml;
121 this.ipool = Executors.newFixedThreadPool(workerCount, namedThreads("io-loop"));
122
123 this.aloop = new CustomAcceptLoop(new InetSocketAddress(ip, port));
124 for (int i = 0; i < workerCount; i++) {
125 iloops.add(new CustomIOLoop());
126 }
127 }
128
129 /**
130 * Start the server IO loops and kicks off throughput tracking.
131 */
132 public void start() {
tom2d6d3972014-09-25 22:38:57 -0700133 messages = new Counter();
134 bytes = new Counter();
toma7083182014-09-25 21:38:03 -0700135
136 for (CustomIOLoop l : iloops) {
137 ipool.execute(l);
138 }
139 apool.execute(aloop);
tom2d6d3972014-09-25 22:38:57 -0700140
tom74d49652014-09-25 23:48:46 -0700141 for (CustomIOLoop l : iloops) {
142 l.awaitStart(TIMEOUT);
143 }
144 aloop.awaitStart(TIMEOUT);
toma7083182014-09-25 21:38:03 -0700145 }
146
147 /**
148 * Stop the server IO loops and freezes throughput tracking.
149 */
150 public void stop() {
151 aloop.shutdown();
152 for (CustomIOLoop l : iloops) {
153 l.shutdown();
154 }
155
tom74d49652014-09-25 23:48:46 -0700156 for (CustomIOLoop l : iloops) {
157 l.awaitStop(TIMEOUT);
158 }
159 aloop.awaitStop(TIMEOUT);
160
tom2d6d3972014-09-25 22:38:57 -0700161 messages.freeze();
162 bytes.freeze();
toma7083182014-09-25 21:38:03 -0700163 }
164
165 /**
tombf474382014-10-02 07:36:50 -0700166 * Reports on the accumulated throughput and latency.
toma7083182014-09-25 21:38:03 -0700167 */
168 public void report() {
tom2d6d3972014-09-25 22:38:57 -0700169 DecimalFormat f = new DecimalFormat("#,##0");
tom14dc4d02014-09-26 12:43:14 -0700170 out.println(format("Server: %s messages; %s bytes; %s mps; %s MBs",
tom74d49652014-09-25 23:48:46 -0700171 f.format(messages.total()), f.format(bytes.total()),
172 f.format(messages.throughput()),
173 f.format(bytes.throughput() / (1024 * msgLength))));
toma7083182014-09-25 21:38:03 -0700174 }
175
176 /**
177 * Prunes the IO loops of stale message buffers.
tom5f4df2d2014-09-26 12:19:51 -0700178 *
179 * @return number of remaining IO loops among all workers.
toma7083182014-09-25 21:38:03 -0700180 */
tom5f4df2d2014-09-26 12:19:51 -0700181 public int prune() {
182 int count = 0;
toma7083182014-09-25 21:38:03 -0700183 for (CustomIOLoop l : iloops) {
tom5f4df2d2014-09-26 12:19:51 -0700184 count += l.pruneStaleStreams();
toma7083182014-09-25 21:38:03 -0700185 }
tom5f4df2d2014-09-26 12:19:51 -0700186 return count;
toma7083182014-09-25 21:38:03 -0700187 }
188
189 // Get the next worker to which a client should be assigned
190 private synchronized CustomIOLoop nextWorker() {
191 lastWorker = (lastWorker + 1) % workerCount;
192 return iloops.get(lastWorker);
193 }
194
195 // Loop for transfer of fixed-length messages
196 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
197
198 public CustomIOLoop() throws IOException {
199 super(500);
200 }
201
202 @Override
203 protected TestMessageStream createStream(ByteChannel channel) {
204 return new TestMessageStream(msgLength, channel, this);
205 }
206
207 @Override
208 protected void removeStream(MessageStream<TestMessage> stream) {
209 super.removeStream(stream);
tom2d6d3972014-09-25 22:38:57 -0700210 messages.add(stream.messagesIn().total());
211 bytes.add(stream.bytesIn().total());
toma7083182014-09-25 21:38:03 -0700212 }
213
214 @Override
215 protected void processMessages(List<TestMessage> messages,
216 MessageStream<TestMessage> stream) {
217 try {
tom1ae3d162014-09-26 09:38:16 -0700218 stream.write(createResponses(messages));
toma7083182014-09-25 21:38:03 -0700219 } catch (IOException e) {
220 log.error("Unable to echo messages", e);
221 }
222 }
tom1ae3d162014-09-26 09:38:16 -0700223
224 private List<TestMessage> createResponses(List<TestMessage> messages) {
225 List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size());
226 for (TestMessage message : messages) {
227 responses.add(new TestMessage(message.length(), message.requestorTime(),
tom5f4df2d2014-09-26 12:19:51 -0700228 System.nanoTime(), message.padding()));
tom1ae3d162014-09-26 09:38:16 -0700229 }
230 return responses;
231 }
toma7083182014-09-25 21:38:03 -0700232 }
233
234 // Loop for accepting client connections
235 private class CustomAcceptLoop extends AcceptorLoop {
236
237 public CustomAcceptLoop(SocketAddress address) throws IOException {
238 super(500, address);
239 }
240
241 @Override
242 protected void acceptConnection(ServerSocketChannel channel) throws IOException {
243 SocketChannel sc = channel.accept();
244 sc.configureBlocking(false);
245
246 Socket so = sc.socket();
247 so.setTcpNoDelay(SO_NO_DELAY);
248 so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
249 so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
250
251 nextWorker().acceptStream(sc);
252 log.info("Connected client");
253 }
254 }
255
256}