blob: a4aed30f161d55ce03a0166306f53c86f8fd4106 [file] [log] [blame]
Thomas Vachuska781d18b2014-10-27 10:31:25 -07001/*
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07002 * Copyright 2014 Open Networking Laboratory
Thomas Vachuska781d18b2014-10-27 10:31:25 -07003 *
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07004 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
Thomas Vachuska781d18b2014-10-27 10:31:25 -07007 *
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07008 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
Thomas Vachuska781d18b2014-10-27 10:31:25 -070015 */
tomf110fff2014-09-26 00:38:18 -070016package org.onlab.onos.foo;
17
tom0e0863f2014-09-26 09:02:33 -070018import com.google.common.collect.Lists;
tomf110fff2014-09-26 00:38:18 -070019import org.onlab.nio.AcceptorLoop;
20import org.onlab.nio.IOLoop;
21import org.onlab.nio.MessageStream;
22import org.onlab.util.Counter;
23import org.slf4j.Logger;
24import org.slf4j.LoggerFactory;
25
26import java.io.IOException;
27import java.net.InetAddress;
28import java.net.InetSocketAddress;
29import java.net.Socket;
30import java.net.SocketAddress;
31import java.nio.channels.ByteChannel;
32import java.nio.channels.ServerSocketChannel;
33import java.nio.channels.SocketChannel;
34import java.text.DecimalFormat;
35import java.util.ArrayList;
36import java.util.List;
37import java.util.concurrent.ExecutorService;
38import java.util.concurrent.Executors;
39
40import static java.lang.String.format;
tom5f4df2d2014-09-26 12:19:51 -070041import static java.lang.System.nanoTime;
tomf110fff2014-09-26 00:38:18 -070042import static java.lang.System.out;
43import static org.onlab.util.Tools.delay;
44import static org.onlab.util.Tools.namedThreads;
45
46/**
47 * Auxiliary test fixture to measure speed of NIO-based channels.
48 */
49public class IOLoopTestServer {
50
51 private static Logger log = LoggerFactory.getLogger(IOLoopTestServer.class);
52
53 private static final int PRUNE_FREQUENCY = 1000;
54
55 static final int PORT = 9876;
56 static final long TIMEOUT = 1000;
57
58 static final boolean SO_NO_DELAY = false;
59 static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
60 static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
61
62 static final DecimalFormat FORMAT = new DecimalFormat("#,##0");
63
64 private final AcceptorLoop aloop;
65 private final ExecutorService apool = Executors.newSingleThreadExecutor(namedThreads("accept"));
66
67 private final List<CustomIOLoop> iloops = new ArrayList<>();
68 private final ExecutorService ipool;
69
70 private final int workerCount;
71 private final int msgLength;
72 private int lastWorker = -1;
73
74 Counter messages;
75 Counter bytes;
76
77 /**
78 * Main entry point to launch the server.
79 *
80 * @param args command-line arguments
81 * @throws java.io.IOException if unable to crate IO loops
82 */
83 public static void main(String[] args) throws IOException {
84 startStandalone(args);
85 System.exit(0);
86 }
87
88 /**
89 * Starts a standalone IO loop test server.
90 *
91 * @param args command-line arguments
92 */
93 public static void startStandalone(String[] args) throws IOException {
94 InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
95 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
96 int ml = args.length > 2 ? Integer.parseInt(args[2]) : 128;
97
98 log.info("Setting up the server with {} workers, {} byte messages on {}... ",
99 wc, ml, ip);
100 IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT);
101 server.start();
102
tomf7e13b02014-09-26 11:12:25 -0700103 // Start pruning clients and keep going until their number goes to 0.
104 int remaining = -1;
105 while (remaining == -1 || remaining > 0) {
tomf110fff2014-09-26 00:38:18 -0700106 delay(PRUNE_FREQUENCY);
tomf7e13b02014-09-26 11:12:25 -0700107 int r = server.prune();
108 remaining = remaining == -1 && r == 0 ? remaining : r;
tomf110fff2014-09-26 00:38:18 -0700109 }
tom5f4df2d2014-09-26 12:19:51 -0700110 server.stop();
tomf110fff2014-09-26 00:38:18 -0700111 }
112
113 /**
114 * Creates a speed server.
115 *
116 * @param ip optional ip of the adapter where to bind
117 * @param wc worker count
118 * @param ml message length in bytes
119 * @param port listen port
120 * @throws java.io.IOException if unable to create IO loops
121 */
122 public IOLoopTestServer(InetAddress ip, int wc, int ml, int port) throws IOException {
123 this.workerCount = wc;
124 this.msgLength = ml;
125 this.ipool = Executors.newFixedThreadPool(workerCount, namedThreads("io-loop"));
126
127 this.aloop = new CustomAcceptLoop(new InetSocketAddress(ip, port));
128 for (int i = 0; i < workerCount; i++) {
129 iloops.add(new CustomIOLoop());
130 }
131 }
132
133 /**
134 * Start the server IO loops and kicks off throughput tracking.
135 */
136 public void start() {
137 messages = new Counter();
138 bytes = new Counter();
139
140 for (CustomIOLoop l : iloops) {
141 ipool.execute(l);
142 }
143 apool.execute(aloop);
144
145 for (CustomIOLoop l : iloops) {
146 l.awaitStart(TIMEOUT);
147 }
148 aloop.awaitStart(TIMEOUT);
149 }
150
151 /**
152 * Stop the server IO loops and freezes throughput tracking.
153 */
154 public void stop() {
155 aloop.shutdown();
156 for (CustomIOLoop l : iloops) {
157 l.shutdown();
158 }
159
160 for (CustomIOLoop l : iloops) {
161 l.awaitStop(TIMEOUT);
162 }
163 aloop.awaitStop(TIMEOUT);
164
165 messages.freeze();
166 bytes.freeze();
167 }
168
169 /**
tombf474382014-10-02 07:36:50 -0700170 * Reports on the accumulated throughput and latency.
tomf110fff2014-09-26 00:38:18 -0700171 */
172 public void report() {
173 DecimalFormat f = new DecimalFormat("#,##0");
tom14dc4d02014-09-26 12:43:14 -0700174 out.println(format("Server: %s messages; %s bytes; %s mps; %s MBs",
tomf110fff2014-09-26 00:38:18 -0700175 f.format(messages.total()), f.format(bytes.total()),
176 f.format(messages.throughput()),
177 f.format(bytes.throughput() / (1024 * msgLength))));
178 }
179
180 /**
181 * Prunes the IO loops of stale message buffers.
tomf7e13b02014-09-26 11:12:25 -0700182 *
183 * @return number of remaining IO loops among all workers.
tomf110fff2014-09-26 00:38:18 -0700184 */
tomf7e13b02014-09-26 11:12:25 -0700185 public int prune() {
186 int count = 0;
tomf110fff2014-09-26 00:38:18 -0700187 for (CustomIOLoop l : iloops) {
tomf7e13b02014-09-26 11:12:25 -0700188 count += l.pruneStaleStreams();
tomf110fff2014-09-26 00:38:18 -0700189 }
tomf7e13b02014-09-26 11:12:25 -0700190 return count;
tomf110fff2014-09-26 00:38:18 -0700191 }
192
193 // Get the next worker to which a client should be assigned
194 private synchronized CustomIOLoop nextWorker() {
195 lastWorker = (lastWorker + 1) % workerCount;
196 return iloops.get(lastWorker);
197 }
198
199 // Loop for transfer of fixed-length messages
200 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
201
202 public CustomIOLoop() throws IOException {
203 super(500);
204 }
205
206 @Override
207 protected TestMessageStream createStream(ByteChannel channel) {
208 return new TestMessageStream(msgLength, channel, this);
209 }
210
211 @Override
212 protected void removeStream(MessageStream<TestMessage> stream) {
213 super.removeStream(stream);
tomf110fff2014-09-26 00:38:18 -0700214 messages.add(stream.messagesIn().total());
215 bytes.add(stream.bytesIn().total());
tomf110fff2014-09-26 00:38:18 -0700216 }
217
218 @Override
219 protected void processMessages(List<TestMessage> messages,
220 MessageStream<TestMessage> stream) {
221 try {
tom0e0863f2014-09-26 09:02:33 -0700222 stream.write(createResponses(messages));
tomf110fff2014-09-26 00:38:18 -0700223 } catch (IOException e) {
224 log.error("Unable to echo messages", e);
225 }
226 }
tom0e0863f2014-09-26 09:02:33 -0700227
228 private List<TestMessage> createResponses(List<TestMessage> messages) {
229 List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size());
230 for (TestMessage message : messages) {
231 responses.add(new TestMessage(message.length(), message.requestorTime(),
tom5f4df2d2014-09-26 12:19:51 -0700232 nanoTime(), message.padding()));
tom0e0863f2014-09-26 09:02:33 -0700233 }
234 return responses;
235 }
tomf110fff2014-09-26 00:38:18 -0700236 }
237
238 // Loop for accepting client connections
239 private class CustomAcceptLoop extends AcceptorLoop {
240
241 public CustomAcceptLoop(SocketAddress address) throws IOException {
242 super(500, address);
243 }
244
245 @Override
246 protected void acceptConnection(ServerSocketChannel channel) throws IOException {
247 SocketChannel sc = channel.accept();
248 sc.configureBlocking(false);
249
250 Socket so = sc.socket();
251 so.setTcpNoDelay(SO_NO_DELAY);
252 so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
253 so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
254
255 nextWorker().acceptStream(sc);
256 log.info("Connected client");
257 }
258 }
259
260}