blob: 70f2f768ca0174a42ae7f249f96862a3eeb55cf8 [file] [log] [blame]
Madan Jampanic26eede2015-04-16 11:42:16 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onlab.nio.service;
17
18import static org.onlab.util.Tools.groupedThreads;
19
20import java.io.IOException;
21import java.net.InetSocketAddress;
22import java.net.Socket;
23import java.net.SocketAddress;
24import java.nio.channels.ServerSocketChannel;
25import java.nio.channels.SocketChannel;
26import java.util.List;
27import java.util.concurrent.CompletableFuture;
28import java.util.concurrent.ConcurrentHashMap;
29import java.util.concurrent.ConcurrentMap;
30import java.util.concurrent.Executor;
31import java.util.concurrent.ExecutorService;
32import java.util.concurrent.Executors;
33import java.util.concurrent.TimeoutException;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070034import java.util.concurrent.atomic.AtomicBoolean;
Madan Jampanic26eede2015-04-16 11:42:16 -070035import java.util.concurrent.atomic.AtomicLong;
36import java.util.function.Consumer;
37import java.util.function.Function;
38
39import org.apache.commons.pool.KeyedPoolableObjectFactory;
40import org.apache.commons.pool.impl.GenericKeyedObjectPool;
41import org.onlab.nio.AcceptorLoop;
Thomas Vachuskab093c912015-04-20 10:28:26 -070042import org.onlab.nio.SelectorLoop;
Madan Jampanic26eede2015-04-16 11:42:16 -070043import org.onosproject.store.cluster.messaging.Endpoint;
44import org.onosproject.store.cluster.messaging.MessagingService;
45import org.slf4j.Logger;
46import org.slf4j.LoggerFactory;
47
48import com.google.common.cache.Cache;
49import com.google.common.cache.CacheBuilder;
50import com.google.common.cache.RemovalListener;
51import com.google.common.cache.RemovalNotification;
52import com.google.common.collect.Lists;
53
54/**
55 * MessagingService implementation based on IOLoop.
56 */
Madan Jampaniafeebbd2015-05-19 15:26:01 -070057public class IOLoopMessaging implements MessagingService {
Madan Jampanic26eede2015-04-16 11:42:16 -070058
59 private final Logger log = LoggerFactory.getLogger(getClass());
60
61 private static final String REPLY_MESSAGE_TYPE = "ONOS_REQUEST_REPLY";
62
Madan Jampanic26eede2015-04-16 11:42:16 -070063 static final long TIMEOUT = 1000;
64
65 static final boolean SO_NO_DELAY = false;
66 static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
67 static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
68
69 private static final int NUM_WORKERS = 8;
70
71 private AcceptorLoop acceptorLoop;
72 private final ExecutorService acceptorThreadPool =
73 Executors.newSingleThreadExecutor(groupedThreads("onos/nio/messaging", "acceptor"));
74 private final ExecutorService ioThreadPool =
75 Executors.newFixedThreadPool(NUM_WORKERS, groupedThreads("onos/nio/messaging", "io-loop-worker-%d"));
76
77 private final List<DefaultIOLoop> ioLoops = Lists.newArrayList();
78
79 private int lastWorker = -1;
80
Madan Jampaniafeebbd2015-05-19 15:26:01 -070081 private final AtomicBoolean started = new AtomicBoolean(false);
82 private Endpoint localEp;
Madan Jampanic26eede2015-04-16 11:42:16 -070083
84 private GenericKeyedObjectPool<Endpoint, DefaultMessageStream> streams =
Thomas Vachuskab093c912015-04-20 10:28:26 -070085 new GenericKeyedObjectPool<>(new DefaultMessageStreamFactory());
Madan Jampanic26eede2015-04-16 11:42:16 -070086
87 private final ConcurrentMap<String, Consumer<DefaultMessage>> handlers = new ConcurrentHashMap<>();
88 private final AtomicLong messageIdGenerator = new AtomicLong(0);
89 private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
90 .removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() {
91 @Override
92 public void onRemoval(RemovalNotification<Long, CompletableFuture<byte[]>> entry) {
93 if (entry.wasEvicted()) {
94 entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
95 }
96 }
97 })
98 .build();
99
Madan Jampanic26eede2015-04-16 11:42:16 -0700100 /**
101 * Activates IO Loops.
Thomas Vachuskab093c912015-04-20 10:28:26 -0700102 *
Madan Jampanic26eede2015-04-16 11:42:16 -0700103 * @throws IOException is activation fails
104 */
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700105 public void start(Endpoint localEp) throws IOException {
106 if (started.get()) {
107 log.warn("IOMessaging is already running at {}", localEp);
108 return;
109 }
110 this.localEp = localEp;
Madan Jampanic26eede2015-04-16 11:42:16 -0700111 streams.setLifo(false);
112 this.acceptorLoop = new DefaultAcceptorLoop(new InetSocketAddress(localEp.host().toString(), localEp.port()));
113
114 for (int i = 0; i < NUM_WORKERS; i++) {
115 ioLoops.add(new DefaultIOLoop(this::dispatchLocally));
116 }
117
Thomas Vachuskab093c912015-04-20 10:28:26 -0700118 ioLoops.forEach(ioThreadPool::execute);
Madan Jampanic26eede2015-04-16 11:42:16 -0700119 acceptorThreadPool.execute(acceptorLoop);
120 ioLoops.forEach(loop -> loop.awaitStart(TIMEOUT));
121 acceptorLoop.awaitStart(TIMEOUT);
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700122 started.set(true);
Madan Jampanic26eede2015-04-16 11:42:16 -0700123 }
124
125 /**
126 * Shuts down IO loops.
127 */
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700128 public void stop() {
129 if (started.get()) {
130 ioLoops.forEach(SelectorLoop::shutdown);
131 acceptorLoop.shutdown();
132 ioThreadPool.shutdown();
133 acceptorThreadPool.shutdown();
134 started.set(false);
135 }
Madan Jampanic26eede2015-04-16 11:42:16 -0700136 }
137
138
139 @Override
140 public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
141 DefaultMessage message = new DefaultMessage(
142 messageIdGenerator.incrementAndGet(),
143 localEp,
144 type,
145 payload);
146 sendAsync(ep, message);
147 }
148
149 protected void sendAsync(Endpoint ep, DefaultMessage message) throws IOException {
150 if (ep.equals(localEp)) {
151 dispatchLocally(message);
152 return;
153 }
154
155 DefaultMessageStream stream = null;
156 try {
157 stream = streams.borrowObject(ep);
158 } catch (Exception e) {
159 throw new IOException(e);
160 }
161 try {
162 stream.write(message);
163 } finally {
164 try {
165 streams.returnObject(ep, stream);
166 } catch (Exception e) {
167 log.warn("Failed to return stream to pool");
168 }
169 }
170 }
171
172 @Override
173 public CompletableFuture<byte[]> sendAndReceive(
174 Endpoint ep,
175 String type,
176 byte[] payload) {
177 CompletableFuture<byte[]> response = new CompletableFuture<>();
178 Long messageId = messageIdGenerator.incrementAndGet();
179 responseFutures.put(messageId, response);
180 DefaultMessage message = new DefaultMessage(messageId, localEp, type, payload);
181 try {
182 sendAsync(ep, message);
183 } catch (Exception e) {
184 responseFutures.invalidate(messageId);
185 response.completeExceptionally(e);
186 }
187 return response;
188 }
189
190 @Override
191 public void registerHandler(String type, Consumer<byte[]> handler, Executor executor) {
192 handlers.put(type, message -> executor.execute(() -> handler.accept(message.payload())));
193 }
194
195 @Override
196 public void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor) {
197 handlers.put(type, message -> executor.execute(() -> {
198 byte[] responsePayload = handler.apply(message.payload());
199 if (responsePayload != null) {
200 DefaultMessage response = new DefaultMessage(message.id(),
201 localEp,
202 REPLY_MESSAGE_TYPE,
203 responsePayload);
204 try {
205 sendAsync(message.sender(), response);
206 } catch (IOException e) {
207 log.debug("Failed to respond", e);
208 }
209 }
210 }));
211 }
212
213 @Override
214 public void unregisterHandler(String type) {
215 handlers.remove(type);
216 }
217
218 protected void dispatchLocally(DefaultMessage message) {
219 String type = message.type();
220 if (REPLY_MESSAGE_TYPE.equals(type)) {
221 try {
222 CompletableFuture<byte[]> futureResponse =
223 responseFutures.getIfPresent(message.id());
224 if (futureResponse != null) {
225 futureResponse.complete(message.payload());
226 } else {
227 log.warn("Received a reply for message id:[{}]. "
228 + " from {}. But was unable to locate the"
229 + " request handle", message.id(), message.sender());
230 }
231 } finally {
232 responseFutures.invalidate(message.id());
233 }
234 return;
235 }
236 Consumer<DefaultMessage> handler = handlers.get(type);
237 if (handler != null) {
238 handler.accept(message);
239 } else {
240 log.debug("No handler registered for {}", type);
241 }
242 }
243
244 // Get the next worker to which a client should be assigned
245 private synchronized DefaultIOLoop nextWorker() {
246 lastWorker = (lastWorker + 1) % NUM_WORKERS;
247 return ioLoops.get(lastWorker);
248 }
249
250 /**
251 * Initiates open connection request and registers the pending socket
252 * channel with the given IO loop.
253 *
254 * @param loop loop with which the channel should be registered
255 * @throws java.io.IOException if the socket could not be open or connected
256 */
257 private DefaultMessageStream createConnection(Endpoint ep, DefaultIOLoop loop) throws IOException {
258 SocketAddress sa = new InetSocketAddress(ep.host().toString(), ep.port());
259 SocketChannel ch = SocketChannel.open();
260 ch.configureBlocking(false);
261 DefaultMessageStream stream = loop.connectStream(ch);
262 ch.connect(sa);
263 return stream;
264 }
265
266 // Loop for accepting client connections
267 private class DefaultAcceptorLoop extends AcceptorLoop {
268
269 public DefaultAcceptorLoop(SocketAddress address) throws IOException {
270 super(DefaultIOLoop.SELECT_TIMEOUT_MILLIS, address);
271 }
272
273 @Override
274 protected void acceptConnection(ServerSocketChannel channel) throws IOException {
275 SocketChannel sc = channel.accept();
276 sc.configureBlocking(false);
277
278 Socket so = sc.socket();
279 so.setTcpNoDelay(SO_NO_DELAY);
280 so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
281 so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
282
283 nextWorker().acceptStream(sc);
284 }
285 }
286
287 private class DefaultMessageStreamFactory implements KeyedPoolableObjectFactory<Endpoint, DefaultMessageStream> {
288
289 @Override
290 public void activateObject(Endpoint endpoint, DefaultMessageStream stream) throws Exception {
291 }
292
293 @Override
294 public void destroyObject(Endpoint ep, DefaultMessageStream stream) throws Exception {
295 stream.close();
296 }
297
298 @Override
299 public DefaultMessageStream makeObject(Endpoint ep) throws Exception {
300 DefaultMessageStream stream = createConnection(ep, nextWorker()).connectedFuture().get();
301 log.info("Established a new connection to {}", ep);
302 return stream;
303 }
304
305 @Override
306 public void passivateObject(Endpoint ep, DefaultMessageStream stream) throws Exception {
307 }
308
309 @Override
310 public boolean validateObject(Endpoint ep, DefaultMessageStream stream) {
311 return stream.isClosed();
312 }
313 }
314}