blob: a193f504a2d834027a2754b2ac6ae3706e23acaf [file] [log] [blame]
toma7083182014-09-25 21:38:03 -07001package org.onlab.nio;
2
3import org.slf4j.Logger;
4import org.slf4j.LoggerFactory;
5
6import java.io.IOException;
7import java.net.InetAddress;
8import java.net.InetSocketAddress;
9import java.net.Socket;
10import java.net.SocketAddress;
11import java.nio.channels.ByteChannel;
12import java.nio.channels.ServerSocketChannel;
13import java.nio.channels.SocketChannel;
14import java.text.DecimalFormat;
15import java.util.ArrayList;
16import java.util.List;
17import java.util.concurrent.ExecutorService;
18import java.util.concurrent.Executors;
19
20import static org.onlab.junit.TestTools.delay;
21import static org.onlab.util.Tools.namedThreads;
22
23/**
24 * Auxiliary test fixture to measure speed of NIO-based channels.
25 */
26public class StandaloneSpeedServer {
27
28 private static Logger log = LoggerFactory.getLogger(StandaloneSpeedServer.class);
29
30 private static final int PRUNE_FREQUENCY = 1000;
31
32 static final int PORT = 9876;
33 static final long TIMEOUT = 1000;
34
35 static final boolean SO_NO_DELAY = false;
36 static final int SO_SEND_BUFFER_SIZE = 1024 * 1024;
37 static final int SO_RCV_BUFFER_SIZE = 1024 * 1024;
38
39 static final DecimalFormat FORMAT = new DecimalFormat("#,##0");
40
41 private final AcceptorLoop aloop;
42 private final ExecutorService apool = Executors.newSingleThreadExecutor(namedThreads("accept"));
43
44 private final List<CustomIOLoop> iloops = new ArrayList<>();
45 private final ExecutorService ipool;
46
47 private final int workerCount;
48 private final int msgLength;
49 private int lastWorker = -1;
50
51// ThroughputTracker messages;
52// ThroughputTracker bytes;
53
54 /**
55 * Main entry point to launch the server.
56 *
57 * @param args command-line arguments
58 * @throws IOException if unable to crate IO loops
59 */
60 public static void main(String[] args) throws IOException {
61 InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
62 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
63 int ml = args.length > 2 ? Integer.parseInt(args[2]) : 128;
64
65 log.info("Setting up the server with {} workers, {} byte messages on {}... ",
66 wc, ml, ip);
67 StandaloneSpeedServer ss = new StandaloneSpeedServer(ip, wc, ml, PORT);
68 ss.start();
69
70 // Start pruning clients.
71 while (true) {
72 delay(PRUNE_FREQUENCY);
73 ss.prune();
74 }
75 }
76
77 /**
78 * Creates a speed server.
79 *
80 * @param ip optional ip of the adapter where to bind
81 * @param wc worker count
82 * @param ml message length in bytes
83 * @param port listen port
84 * @throws IOException if unable to create IO loops
85 */
86 public StandaloneSpeedServer(InetAddress ip, int wc, int ml, int port) throws IOException {
87 this.workerCount = wc;
88 this.msgLength = ml;
89 this.ipool = Executors.newFixedThreadPool(workerCount, namedThreads("io-loop"));
90
91 this.aloop = new CustomAcceptLoop(new InetSocketAddress(ip, port));
92 for (int i = 0; i < workerCount; i++) {
93 iloops.add(new CustomIOLoop());
94 }
95 }
96
97 /**
98 * Start the server IO loops and kicks off throughput tracking.
99 */
100 public void start() {
101// messages = new ThroughputTracker();
102// bytes = new ThroughputTracker();
103
104 for (CustomIOLoop l : iloops) {
105 ipool.execute(l);
106 }
107 apool.execute(aloop);
108//
109// for (CustomIOLoop l : iloops)
110// l.waitForStart(TIMEOUT);
111// aloop.waitForStart(TIMEOUT);
112 }
113
114 /**
115 * Stop the server IO loops and freezes throughput tracking.
116 */
117 public void stop() {
118 aloop.shutdown();
119 for (CustomIOLoop l : iloops) {
120 l.shutdown();
121 }
122
123// for (CustomIOLoop l : iloops)
124// l.waitForFinish(TIMEOUT);
125// aloop.waitForFinish(TIMEOUT);
126//
127// messages.freeze();
128// bytes.freeze();
129 }
130
131 /**
132 * Reports on the accumulated throughput trackers.
133 */
134 public void report() {
135// DecimalFormat f = new DecimalFormat("#,##0");
136// log.info("{} messages; {} bytes; {} mps; {} Mbs",
137// f.format(messages.total()),
138// f.format(bytes.total()),
139// f.format(messages.throughput()),
140// f.format(bytes.throughput() / (1024 * 128)));
141 }
142
143 /**
144 * Prunes the IO loops of stale message buffers.
145 */
146 public void prune() {
147 for (CustomIOLoop l : iloops) {
148 l.pruneStaleStreams();
149 }
150 }
151
152 // Get the next worker to which a client should be assigned
153 private synchronized CustomIOLoop nextWorker() {
154 lastWorker = (lastWorker + 1) % workerCount;
155 return iloops.get(lastWorker);
156 }
157
158 // Loop for transfer of fixed-length messages
159 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
160
161 public CustomIOLoop() throws IOException {
162 super(500);
163 }
164
165 @Override
166 protected TestMessageStream createStream(ByteChannel channel) {
167 return new TestMessageStream(msgLength, channel, this);
168 }
169
170 @Override
171 protected void removeStream(MessageStream<TestMessage> stream) {
172 super.removeStream(stream);
173//
174// messages.add(b.inMessages().total());
175// bytes.add(b.inBytes().total());
176//
177// log.info("Disconnected client; inbound {} mps, {} Mbps; outbound {} mps, {} Mbps",
178// format.format(b.inMessages().throughput()),
179// format.format(b.inBytes().throughput() / (1024 * 128)),
180// format.format(b.outMessages().throughput()),
181// format.format(b.outBytes().throughput() / (1024 * 128)));
182 }
183
184 @Override
185 protected void processMessages(List<TestMessage> messages,
186 MessageStream<TestMessage> stream) {
187 try {
188 stream.write(messages);
189 } catch (IOException e) {
190 log.error("Unable to echo messages", e);
191 }
192 }
193 }
194
195 // Loop for accepting client connections
196 private class CustomAcceptLoop extends AcceptorLoop {
197
198 public CustomAcceptLoop(SocketAddress address) throws IOException {
199 super(500, address);
200 }
201
202 @Override
203 protected void acceptConnection(ServerSocketChannel channel) throws IOException {
204 SocketChannel sc = channel.accept();
205 sc.configureBlocking(false);
206
207 Socket so = sc.socket();
208 so.setTcpNoDelay(SO_NO_DELAY);
209 so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
210 so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
211
212 nextWorker().acceptStream(sc);
213 log.info("Connected client");
214 }
215 }
216
217}