blob: 69b312a0dd6057b583352704c6343567c6708e11 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onlab.nio.service;
17
18import static org.onlab.util.Tools.groupedThreads;
19
20import java.io.IOException;
21import java.net.InetSocketAddress;
22import java.net.Socket;
23import java.net.SocketAddress;
24import java.nio.channels.ServerSocketChannel;
25import java.nio.channels.SocketChannel;
26import java.util.List;
27import java.util.concurrent.CompletableFuture;
28import java.util.concurrent.ConcurrentHashMap;
29import java.util.concurrent.ConcurrentMap;
30import java.util.concurrent.Executor;
31import java.util.concurrent.ExecutorService;
32import java.util.concurrent.Executors;
33import java.util.concurrent.TimeoutException;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070034import java.util.concurrent.atomic.AtomicBoolean;
Madan Jampanic26eede2015-04-16 11:42:16 -070035import java.util.concurrent.atomic.AtomicLong;
36import java.util.function.Consumer;
37import java.util.function.Function;
38
39import org.apache.commons.pool.KeyedPoolableObjectFactory;
40import org.apache.commons.pool.impl.GenericKeyedObjectPool;
41import org.onlab.nio.AcceptorLoop;
Thomas Vachuskab093c912015-04-20 10:28:26 -070042import org.onlab.nio.SelectorLoop;
Madan Jampanic26eede2015-04-16 11:42:16 -070043import org.onosproject.store.cluster.messaging.Endpoint;
44import org.onosproject.store.cluster.messaging.MessagingService;
45import org.slf4j.Logger;
46import org.slf4j.LoggerFactory;
47
48import com.google.common.cache.Cache;
49import com.google.common.cache.CacheBuilder;
50import com.google.common.cache.RemovalListener;
51import com.google.common.cache.RemovalNotification;
52import com.google.common.collect.Lists;
53
54/**
55 * MessagingService implementation based on IOLoop.
56 */
Madan Jampaniafeebbd2015-05-19 15:26:01 -070057public class IOLoopMessaging implements MessagingService {
Madan Jampanic26eede2015-04-16 11:42:16 -070058
59 private final Logger log = LoggerFactory.getLogger(getClass());
60
61 private static final String REPLY_MESSAGE_TYPE = "ONOS_REQUEST_REPLY";
62
Madan Jampanic26eede2015-04-16 11:42:16 -070063 static final long TIMEOUT = 1000;
64
65 static final boolean SO_NO_DELAY = false;
66 static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
67 static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
68
69 private static final int NUM_WORKERS = 8;
70
71 private AcceptorLoop acceptorLoop;
72 private final ExecutorService acceptorThreadPool =
73 Executors.newSingleThreadExecutor(groupedThreads("onos/nio/messaging", "acceptor"));
74 private final ExecutorService ioThreadPool =
75 Executors.newFixedThreadPool(NUM_WORKERS, groupedThreads("onos/nio/messaging", "io-loop-worker-%d"));
76
77 private final List<DefaultIOLoop> ioLoops = Lists.newArrayList();
78
79 private int lastWorker = -1;
80
Madan Jampaniafeebbd2015-05-19 15:26:01 -070081 private final AtomicBoolean started = new AtomicBoolean(false);
82 private Endpoint localEp;
Madan Jampanic26eede2015-04-16 11:42:16 -070083
84 private GenericKeyedObjectPool<Endpoint, DefaultMessageStream> streams =
Thomas Vachuskab093c912015-04-20 10:28:26 -070085 new GenericKeyedObjectPool<>(new DefaultMessageStreamFactory());
Madan Jampanic26eede2015-04-16 11:42:16 -070086
87 private final ConcurrentMap<String, Consumer<DefaultMessage>> handlers = new ConcurrentHashMap<>();
88 private final AtomicLong messageIdGenerator = new AtomicLong(0);
89 private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
90 .removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() {
91 @Override
92 public void onRemoval(RemovalNotification<Long, CompletableFuture<byte[]>> entry) {
93 if (entry.wasEvicted()) {
94 entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
95 }
96 }
97 })
98 .build();
99
Madan Jampanic26eede2015-04-16 11:42:16 -0700100 /**
101 * Activates IO Loops.
Thomas Vachuskab093c912015-04-20 10:28:26 -0700102 *
Thomas Vachuska5d410a22015-05-19 17:52:37 -0700103 * @param localEp local end-point
Madan Jampanic26eede2015-04-16 11:42:16 -0700104 * @throws IOException is activation fails
105 */
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700106 public void start(Endpoint localEp) throws IOException {
107 if (started.get()) {
108 log.warn("IOMessaging is already running at {}", localEp);
109 return;
110 }
111 this.localEp = localEp;
Madan Jampanic26eede2015-04-16 11:42:16 -0700112 streams.setLifo(false);
113 this.acceptorLoop = new DefaultAcceptorLoop(new InetSocketAddress(localEp.host().toString(), localEp.port()));
114
115 for (int i = 0; i < NUM_WORKERS; i++) {
116 ioLoops.add(new DefaultIOLoop(this::dispatchLocally));
117 }
118
Thomas Vachuskab093c912015-04-20 10:28:26 -0700119 ioLoops.forEach(ioThreadPool::execute);
Madan Jampanic26eede2015-04-16 11:42:16 -0700120 acceptorThreadPool.execute(acceptorLoop);
121 ioLoops.forEach(loop -> loop.awaitStart(TIMEOUT));
122 acceptorLoop.awaitStart(TIMEOUT);
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700123 started.set(true);
Madan Jampanic26eede2015-04-16 11:42:16 -0700124 }
125
126 /**
127 * Shuts down IO loops.
128 */
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700129 public void stop() {
130 if (started.get()) {
131 ioLoops.forEach(SelectorLoop::shutdown);
132 acceptorLoop.shutdown();
133 ioThreadPool.shutdown();
134 acceptorThreadPool.shutdown();
135 started.set(false);
136 }
Madan Jampanic26eede2015-04-16 11:42:16 -0700137 }
138
139
140 @Override
141 public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
142 DefaultMessage message = new DefaultMessage(
143 messageIdGenerator.incrementAndGet(),
144 localEp,
145 type,
146 payload);
147 sendAsync(ep, message);
148 }
149
150 protected void sendAsync(Endpoint ep, DefaultMessage message) throws IOException {
151 if (ep.equals(localEp)) {
152 dispatchLocally(message);
153 return;
154 }
155
156 DefaultMessageStream stream = null;
157 try {
158 stream = streams.borrowObject(ep);
159 } catch (Exception e) {
160 throw new IOException(e);
161 }
162 try {
163 stream.write(message);
164 } finally {
165 try {
166 streams.returnObject(ep, stream);
167 } catch (Exception e) {
168 log.warn("Failed to return stream to pool");
169 }
170 }
171 }
172
173 @Override
174 public CompletableFuture<byte[]> sendAndReceive(
175 Endpoint ep,
176 String type,
177 byte[] payload) {
178 CompletableFuture<byte[]> response = new CompletableFuture<>();
179 Long messageId = messageIdGenerator.incrementAndGet();
180 responseFutures.put(messageId, response);
181 DefaultMessage message = new DefaultMessage(messageId, localEp, type, payload);
182 try {
183 sendAsync(ep, message);
184 } catch (Exception e) {
185 responseFutures.invalidate(messageId);
186 response.completeExceptionally(e);
187 }
188 return response;
189 }
190
191 @Override
192 public void registerHandler(String type, Consumer<byte[]> handler, Executor executor) {
193 handlers.put(type, message -> executor.execute(() -> handler.accept(message.payload())));
194 }
195
196 @Override
197 public void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor) {
198 handlers.put(type, message -> executor.execute(() -> {
199 byte[] responsePayload = handler.apply(message.payload());
200 if (responsePayload != null) {
201 DefaultMessage response = new DefaultMessage(message.id(),
202 localEp,
203 REPLY_MESSAGE_TYPE,
204 responsePayload);
205 try {
206 sendAsync(message.sender(), response);
207 } catch (IOException e) {
208 log.debug("Failed to respond", e);
209 }
210 }
211 }));
212 }
213
214 @Override
215 public void unregisterHandler(String type) {
216 handlers.remove(type);
217 }
218
219 protected void dispatchLocally(DefaultMessage message) {
220 String type = message.type();
221 if (REPLY_MESSAGE_TYPE.equals(type)) {
222 try {
223 CompletableFuture<byte[]> futureResponse =
224 responseFutures.getIfPresent(message.id());
225 if (futureResponse != null) {
226 futureResponse.complete(message.payload());
227 } else {
228 log.warn("Received a reply for message id:[{}]. "
229 + " from {}. But was unable to locate the"
230 + " request handle", message.id(), message.sender());
231 }
232 } finally {
233 responseFutures.invalidate(message.id());
234 }
235 return;
236 }
237 Consumer<DefaultMessage> handler = handlers.get(type);
238 if (handler != null) {
239 handler.accept(message);
240 } else {
241 log.debug("No handler registered for {}", type);
242 }
243 }
244
245 // Get the next worker to which a client should be assigned
246 private synchronized DefaultIOLoop nextWorker() {
247 lastWorker = (lastWorker + 1) % NUM_WORKERS;
248 return ioLoops.get(lastWorker);
249 }
250
251 /**
252 * Initiates open connection request and registers the pending socket
253 * channel with the given IO loop.
254 *
255 * @param loop loop with which the channel should be registered
256 * @throws java.io.IOException if the socket could not be open or connected
257 */
258 private DefaultMessageStream createConnection(Endpoint ep, DefaultIOLoop loop) throws IOException {
259 SocketAddress sa = new InetSocketAddress(ep.host().toString(), ep.port());
260 SocketChannel ch = SocketChannel.open();
261 ch.configureBlocking(false);
262 DefaultMessageStream stream = loop.connectStream(ch);
263 ch.connect(sa);
264 return stream;
265 }
266
267 // Loop for accepting client connections
268 private class DefaultAcceptorLoop extends AcceptorLoop {
269
270 public DefaultAcceptorLoop(SocketAddress address) throws IOException {
271 super(DefaultIOLoop.SELECT_TIMEOUT_MILLIS, address);
272 }
273
274 @Override
275 protected void acceptConnection(ServerSocketChannel channel) throws IOException {
276 SocketChannel sc = channel.accept();
277 sc.configureBlocking(false);
278
279 Socket so = sc.socket();
280 so.setTcpNoDelay(SO_NO_DELAY);
281 so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
282 so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
283
284 nextWorker().acceptStream(sc);
285 }
286 }
287
288 private class DefaultMessageStreamFactory implements KeyedPoolableObjectFactory<Endpoint, DefaultMessageStream> {
289
290 @Override
291 public void activateObject(Endpoint endpoint, DefaultMessageStream stream) throws Exception {
292 }
293
294 @Override
295 public void destroyObject(Endpoint ep, DefaultMessageStream stream) throws Exception {
296 stream.close();
297 }
298
299 @Override
300 public DefaultMessageStream makeObject(Endpoint ep) throws Exception {
301 DefaultMessageStream stream = createConnection(ep, nextWorker()).connectedFuture().get();
302 log.info("Established a new connection to {}", ep);
303 return stream;
304 }
305
306 @Override
307 public void passivateObject(Endpoint ep, DefaultMessageStream stream) throws Exception {
308 }
309
310 @Override
311 public boolean validateObject(Endpoint ep, DefaultMessageStream stream) {
312 return stream.isClosed();
313 }
314 }
315}