blob: c183523bcc6c12451d30039bbaa919276cd30308 [file] [log] [blame]
Madan Jampanic26eede2015-04-16 11:42:16 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onlab.nio.service;
17
18import static org.onlab.util.Tools.groupedThreads;
19
20import java.io.IOException;
21import java.net.InetSocketAddress;
22import java.net.Socket;
23import java.net.SocketAddress;
24import java.nio.channels.ServerSocketChannel;
25import java.nio.channels.SocketChannel;
26import java.util.List;
27import java.util.concurrent.CompletableFuture;
28import java.util.concurrent.ConcurrentHashMap;
29import java.util.concurrent.ConcurrentMap;
30import java.util.concurrent.Executor;
31import java.util.concurrent.ExecutorService;
32import java.util.concurrent.Executors;
33import java.util.concurrent.TimeoutException;
34import java.util.concurrent.atomic.AtomicLong;
35import java.util.function.Consumer;
36import java.util.function.Function;
37
38import org.apache.commons.pool.KeyedPoolableObjectFactory;
39import org.apache.commons.pool.impl.GenericKeyedObjectPool;
40import org.onlab.nio.AcceptorLoop;
Thomas Vachuskab093c912015-04-20 10:28:26 -070041import org.onlab.nio.SelectorLoop;
Madan Jampanic26eede2015-04-16 11:42:16 -070042import org.onlab.packet.IpAddress;
43import org.onosproject.store.cluster.messaging.Endpoint;
44import org.onosproject.store.cluster.messaging.MessagingService;
45import org.slf4j.Logger;
46import org.slf4j.LoggerFactory;
47
48import com.google.common.cache.Cache;
49import com.google.common.cache.CacheBuilder;
50import com.google.common.cache.RemovalListener;
51import com.google.common.cache.RemovalNotification;
52import com.google.common.collect.Lists;
53
54/**
55 * MessagingService implementation based on IOLoop.
56 */
Thomas Vachuskab093c912015-04-20 10:28:26 -070057public class IOLoopMessagingManager implements MessagingService {
Madan Jampanic26eede2015-04-16 11:42:16 -070058
59 private final Logger log = LoggerFactory.getLogger(getClass());
60
61 private static final String REPLY_MESSAGE_TYPE = "ONOS_REQUEST_REPLY";
62
63 static final int PORT = 9876;
64 static final long TIMEOUT = 1000;
65
66 static final boolean SO_NO_DELAY = false;
67 static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
68 static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
69
70 private static final int NUM_WORKERS = 8;
71
72 private AcceptorLoop acceptorLoop;
73 private final ExecutorService acceptorThreadPool =
74 Executors.newSingleThreadExecutor(groupedThreads("onos/nio/messaging", "acceptor"));
75 private final ExecutorService ioThreadPool =
76 Executors.newFixedThreadPool(NUM_WORKERS, groupedThreads("onos/nio/messaging", "io-loop-worker-%d"));
77
78 private final List<DefaultIOLoop> ioLoops = Lists.newArrayList();
79
80 private int lastWorker = -1;
81
82 private final Endpoint localEp;
83
84 private GenericKeyedObjectPool<Endpoint, DefaultMessageStream> streams =
Thomas Vachuskab093c912015-04-20 10:28:26 -070085 new GenericKeyedObjectPool<>(new DefaultMessageStreamFactory());
Madan Jampanic26eede2015-04-16 11:42:16 -070086
87 private final ConcurrentMap<String, Consumer<DefaultMessage>> handlers = new ConcurrentHashMap<>();
88 private final AtomicLong messageIdGenerator = new AtomicLong(0);
89 private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
90 .removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() {
91 @Override
92 public void onRemoval(RemovalNotification<Long, CompletableFuture<byte[]>> entry) {
93 if (entry.wasEvicted()) {
94 entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
95 }
96 }
97 })
98 .build();
99
100
Thomas Vachuskab093c912015-04-20 10:28:26 -0700101 public IOLoopMessagingManager(int port) {
Madan Jampanic26eede2015-04-16 11:42:16 -0700102 this(new Endpoint(IpAddress.valueOf("127.0.0.1"), port));
103 }
104
Thomas Vachuskab093c912015-04-20 10:28:26 -0700105 public IOLoopMessagingManager(IpAddress ip, int port) {
Madan Jampanic26eede2015-04-16 11:42:16 -0700106 this(new Endpoint(ip, port));
107 }
108
Thomas Vachuskab093c912015-04-20 10:28:26 -0700109 public IOLoopMessagingManager(Endpoint localEp) {
Madan Jampanic26eede2015-04-16 11:42:16 -0700110 this.localEp = localEp;
111 }
112
113 /**
114 * Returns the local endpoint.
Thomas Vachuskab093c912015-04-20 10:28:26 -0700115 *
Madan Jampanic26eede2015-04-16 11:42:16 -0700116 * @return local endpoint
117 */
118 public Endpoint localEp() {
119 return localEp;
120 }
121
122 /**
123 * Activates IO Loops.
Thomas Vachuskab093c912015-04-20 10:28:26 -0700124 *
Madan Jampanic26eede2015-04-16 11:42:16 -0700125 * @throws IOException is activation fails
126 */
127 public void activate() throws IOException {
128 streams.setLifo(false);
129 this.acceptorLoop = new DefaultAcceptorLoop(new InetSocketAddress(localEp.host().toString(), localEp.port()));
130
131 for (int i = 0; i < NUM_WORKERS; i++) {
132 ioLoops.add(new DefaultIOLoop(this::dispatchLocally));
133 }
134
Thomas Vachuskab093c912015-04-20 10:28:26 -0700135 ioLoops.forEach(ioThreadPool::execute);
Madan Jampanic26eede2015-04-16 11:42:16 -0700136 acceptorThreadPool.execute(acceptorLoop);
137 ioLoops.forEach(loop -> loop.awaitStart(TIMEOUT));
138 acceptorLoop.awaitStart(TIMEOUT);
139 }
140
141 /**
142 * Shuts down IO loops.
143 */
144 public void deactivate() {
Thomas Vachuskab093c912015-04-20 10:28:26 -0700145 ioLoops.forEach(SelectorLoop::shutdown);
Madan Jampanic26eede2015-04-16 11:42:16 -0700146 acceptorLoop.shutdown();
147 ioThreadPool.shutdown();
148 acceptorThreadPool.shutdown();
149 }
150
151
152 @Override
153 public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
154 DefaultMessage message = new DefaultMessage(
155 messageIdGenerator.incrementAndGet(),
156 localEp,
157 type,
158 payload);
159 sendAsync(ep, message);
160 }
161
162 protected void sendAsync(Endpoint ep, DefaultMessage message) throws IOException {
163 if (ep.equals(localEp)) {
164 dispatchLocally(message);
165 return;
166 }
167
168 DefaultMessageStream stream = null;
169 try {
170 stream = streams.borrowObject(ep);
171 } catch (Exception e) {
172 throw new IOException(e);
173 }
174 try {
175 stream.write(message);
176 } finally {
177 try {
178 streams.returnObject(ep, stream);
179 } catch (Exception e) {
180 log.warn("Failed to return stream to pool");
181 }
182 }
183 }
184
185 @Override
186 public CompletableFuture<byte[]> sendAndReceive(
187 Endpoint ep,
188 String type,
189 byte[] payload) {
190 CompletableFuture<byte[]> response = new CompletableFuture<>();
191 Long messageId = messageIdGenerator.incrementAndGet();
192 responseFutures.put(messageId, response);
193 DefaultMessage message = new DefaultMessage(messageId, localEp, type, payload);
194 try {
195 sendAsync(ep, message);
196 } catch (Exception e) {
197 responseFutures.invalidate(messageId);
198 response.completeExceptionally(e);
199 }
200 return response;
201 }
202
203 @Override
204 public void registerHandler(String type, Consumer<byte[]> handler, Executor executor) {
205 handlers.put(type, message -> executor.execute(() -> handler.accept(message.payload())));
206 }
207
208 @Override
209 public void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor) {
210 handlers.put(type, message -> executor.execute(() -> {
211 byte[] responsePayload = handler.apply(message.payload());
212 if (responsePayload != null) {
213 DefaultMessage response = new DefaultMessage(message.id(),
214 localEp,
215 REPLY_MESSAGE_TYPE,
216 responsePayload);
217 try {
218 sendAsync(message.sender(), response);
219 } catch (IOException e) {
220 log.debug("Failed to respond", e);
221 }
222 }
223 }));
224 }
225
226 @Override
227 public void unregisterHandler(String type) {
228 handlers.remove(type);
229 }
230
231 protected void dispatchLocally(DefaultMessage message) {
232 String type = message.type();
233 if (REPLY_MESSAGE_TYPE.equals(type)) {
234 try {
235 CompletableFuture<byte[]> futureResponse =
236 responseFutures.getIfPresent(message.id());
237 if (futureResponse != null) {
238 futureResponse.complete(message.payload());
239 } else {
240 log.warn("Received a reply for message id:[{}]. "
241 + " from {}. But was unable to locate the"
242 + " request handle", message.id(), message.sender());
243 }
244 } finally {
245 responseFutures.invalidate(message.id());
246 }
247 return;
248 }
249 Consumer<DefaultMessage> handler = handlers.get(type);
250 if (handler != null) {
251 handler.accept(message);
252 } else {
253 log.debug("No handler registered for {}", type);
254 }
255 }
256
257 // Get the next worker to which a client should be assigned
258 private synchronized DefaultIOLoop nextWorker() {
259 lastWorker = (lastWorker + 1) % NUM_WORKERS;
260 return ioLoops.get(lastWorker);
261 }
262
263 /**
264 * Initiates open connection request and registers the pending socket
265 * channel with the given IO loop.
266 *
267 * @param loop loop with which the channel should be registered
268 * @throws java.io.IOException if the socket could not be open or connected
269 */
270 private DefaultMessageStream createConnection(Endpoint ep, DefaultIOLoop loop) throws IOException {
271 SocketAddress sa = new InetSocketAddress(ep.host().toString(), ep.port());
272 SocketChannel ch = SocketChannel.open();
273 ch.configureBlocking(false);
274 DefaultMessageStream stream = loop.connectStream(ch);
275 ch.connect(sa);
276 return stream;
277 }
278
279 // Loop for accepting client connections
280 private class DefaultAcceptorLoop extends AcceptorLoop {
281
282 public DefaultAcceptorLoop(SocketAddress address) throws IOException {
283 super(DefaultIOLoop.SELECT_TIMEOUT_MILLIS, address);
284 }
285
286 @Override
287 protected void acceptConnection(ServerSocketChannel channel) throws IOException {
288 SocketChannel sc = channel.accept();
289 sc.configureBlocking(false);
290
291 Socket so = sc.socket();
292 so.setTcpNoDelay(SO_NO_DELAY);
293 so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
294 so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
295
296 nextWorker().acceptStream(sc);
297 }
298 }
299
300 private class DefaultMessageStreamFactory implements KeyedPoolableObjectFactory<Endpoint, DefaultMessageStream> {
301
302 @Override
303 public void activateObject(Endpoint endpoint, DefaultMessageStream stream) throws Exception {
304 }
305
306 @Override
307 public void destroyObject(Endpoint ep, DefaultMessageStream stream) throws Exception {
308 stream.close();
309 }
310
311 @Override
312 public DefaultMessageStream makeObject(Endpoint ep) throws Exception {
313 DefaultMessageStream stream = createConnection(ep, nextWorker()).connectedFuture().get();
314 log.info("Established a new connection to {}", ep);
315 return stream;
316 }
317
318 @Override
319 public void passivateObject(Endpoint ep, DefaultMessageStream stream) throws Exception {
320 }
321
322 @Override
323 public boolean validateObject(Endpoint ep, DefaultMessageStream stream) {
324 return stream.isClosed();
325 }
326 }
327}