blob: 69b312a0dd6057b583352704c6343567c6708e11 [file] [log] [blame]
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.nio.service;
import static org.onlab.util.Tools.groupedThreads;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.onlab.nio.AcceptorLoop;
import org.onlab.nio.SelectorLoop;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Lists;
/**
 * MessagingService implementation based on IOLoop.
 * <p>
 * Outbound connections are pooled per remote end-point. Inbound connections are
 * accepted by a single acceptor loop and handed out round-robin to a fixed set
 * of worker IO loops. Messages addressed to the local end-point are dispatched
 * in-process without going through a stream.
 */
public class IOLoopMessaging implements MessagingService {

    private final Logger log = LoggerFactory.getLogger(getClass());

    // Message type used to tag replies to sendAndReceive requests.
    private static final String REPLY_MESSAGE_TYPE = "ONOS_REQUEST_REPLY";

    // Value passed to awaitStart() for each loop in start().
    static final long TIMEOUT = 1000;

    // Socket options applied to accepted connections.
    static final boolean SO_NO_DELAY = false;
    static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
    static final int SO_RCV_BUFFER_SIZE = 128 * 1024;

    private static final int NUM_WORKERS = 8;

    private AcceptorLoop acceptorLoop;
    private final ExecutorService acceptorThreadPool =
            Executors.newSingleThreadExecutor(groupedThreads("onos/nio/messaging", "acceptor"));
    private final ExecutorService ioThreadPool =
            Executors.newFixedThreadPool(NUM_WORKERS, groupedThreads("onos/nio/messaging", "io-loop-worker-%d"));

    private final List<DefaultIOLoop> ioLoops = Lists.newArrayList();

    // Index of the worker most recently returned by nextWorker(); guarded by "this".
    private int lastWorker = -1;

    private final AtomicBoolean started = new AtomicBoolean(false);
    private Endpoint localEp;

    // Pool of outbound streams keyed by remote end-point.
    private GenericKeyedObjectPool<Endpoint, DefaultMessageStream> streams =
            new GenericKeyedObjectPool<>(new DefaultMessageStreamFactory());

    private final ConcurrentMap<String, Consumer<DefaultMessage>> handlers = new ConcurrentHashMap<>();
    private final AtomicLong messageIdGenerator = new AtomicLong(0);

    // Pending request futures keyed by message id.
    // NOTE(review): no expiry, size bound or weak/soft references are configured on
    // this cache, so the wasEvicted() branch below looks unreachable and a request
    // whose reply never arrives stays here indefinitely. Consider adding an
    // expireAfterWrite(...) policy once a suitable reply timeout has been agreed.
    private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
            .removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() {
                @Override
                public void onRemoval(RemovalNotification<Long, CompletableFuture<byte[]>> entry) {
                    if (entry.wasEvicted()) {
                        entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
                    }
                }
            })
            .build();

    /**
     * Activates IO Loops.
     *
     * @param localEp local end-point
     * @throws IOException if activation fails
     */
    public void start(Endpoint localEp) throws IOException {
        if (started.get()) {
            // Report the end-point actually in use, not the one just requested.
            log.warn("IOMessaging is already running at {}", this.localEp);
            return;
        }
        this.localEp = localEp;
        streams.setLifo(false);
        this.acceptorLoop = new DefaultAcceptorLoop(new InetSocketAddress(localEp.host().toString(), localEp.port()));

        for (int i = 0; i < NUM_WORKERS; i++) {
            ioLoops.add(new DefaultIOLoop(this::dispatchLocally));
        }

        ioLoops.forEach(ioThreadPool::execute);
        acceptorThreadPool.execute(acceptorLoop);
        ioLoops.forEach(loop -> loop.awaitStart(TIMEOUT));
        acceptorLoop.awaitStart(TIMEOUT);
        started.set(true);
    }

    /**
     * Shuts down IO loops and releases pooled outbound streams.
     */
    public void stop() {
        if (started.get()) {
            // Release idle pooled connections; the executors below are shut down for
            // good, so this instance cannot be restarted and the pool is not reused.
            try {
                streams.close();
            } catch (Exception e) {
                log.warn("Failed to close the stream pool", e);
            }
            ioLoops.forEach(SelectorLoop::shutdown);
            acceptorLoop.shutdown();
            ioThreadPool.shutdown();
            acceptorThreadPool.shutdown();
            started.set(false);
        }
    }

    @Override
    public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
        DefaultMessage message = new DefaultMessage(
                messageIdGenerator.incrementAndGet(),
                localEp,
                type,
                payload);
        sendAsync(ep, message);
    }

    /**
     * Sends the given message to the specified end-point. Messages addressed to
     * the local end-point are dispatched in-process; all others are written to a
     * stream borrowed from the pool, which is returned to the pool afterwards.
     *
     * @param ep      destination end-point
     * @param message message to send
     * @throws IOException if a stream to the end-point could not be obtained
     */
    protected void sendAsync(Endpoint ep, DefaultMessage message) throws IOException {
        if (ep.equals(localEp)) {
            dispatchLocally(message);
            return;
        }

        final DefaultMessageStream stream;
        try {
            stream = streams.borrowObject(ep);
        } catch (Exception e) {
            throw new IOException(e);
        }

        try {
            stream.write(message);
        } finally {
            try {
                streams.returnObject(ep, stream);
            } catch (Exception e) {
                // Keep the cause so a failing pool can be diagnosed.
                log.warn("Failed to return stream to pool", e);
            }
        }
    }

    @Override
    public CompletableFuture<byte[]> sendAndReceive(
            Endpoint ep,
            String type,
            byte[] payload) {
        CompletableFuture<byte[]> response = new CompletableFuture<>();
        Long messageId = messageIdGenerator.incrementAndGet();
        // Register the future before sending so a fast reply always finds it.
        responseFutures.put(messageId, response);
        DefaultMessage message = new DefaultMessage(messageId, localEp, type, payload);
        try {
            sendAsync(ep, message);
        } catch (Exception e) {
            responseFutures.invalidate(messageId);
            response.completeExceptionally(e);
        }
        return response;
    }

    @Override
    public void registerHandler(String type, Consumer<byte[]> handler, Executor executor) {
        handlers.put(type, message -> executor.execute(() -> handler.accept(message.payload())));
    }

    @Override
    public void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor) {
        handlers.put(type, message -> executor.execute(() -> {
            byte[] responsePayload = handler.apply(message.payload());
            // A null result means no reply is sent back to the sender.
            if (responsePayload != null) {
                // The reply reuses the request id so the sender can correlate it.
                DefaultMessage response = new DefaultMessage(message.id(),
                        localEp,
                        REPLY_MESSAGE_TYPE,
                        responsePayload);
                try {
                    sendAsync(message.sender(), response);
                } catch (IOException e) {
                    log.debug("Failed to respond", e);
                }
            }
        }));
    }

    @Override
    public void unregisterHandler(String type) {
        handlers.remove(type);
    }

    /**
     * Delivers a message within this process. Replies complete the pending
     * request future with a matching id; any other message is passed to the
     * handler registered for its type, if there is one.
     *
     * @param message message to dispatch
     */
    protected void dispatchLocally(DefaultMessage message) {
        String type = message.type();
        if (REPLY_MESSAGE_TYPE.equals(type)) {
            try {
                CompletableFuture<byte[]> futureResponse =
                        responseFutures.getIfPresent(message.id());
                if (futureResponse != null) {
                    futureResponse.complete(message.payload());
                } else {
                    log.warn("Received a reply for message id:[{}]. "
                            + " from {}. But was unable to locate the"
                            + " request handle", message.id(), message.sender());
                }
            } finally {
                responseFutures.invalidate(message.id());
            }
            return;
        }

        Consumer<DefaultMessage> handler = handlers.get(type);
        if (handler != null) {
            handler.accept(message);
        } else {
            log.debug("No handler registered for {}", type);
        }
    }

    // Get the next worker to which a client should be assigned
    private synchronized DefaultIOLoop nextWorker() {
        lastWorker = (lastWorker + 1) % NUM_WORKERS;
        return ioLoops.get(lastWorker);
    }

    /**
     * Initiates open connection request and registers the pending socket
     * channel with the given IO loop.
     *
     * @param ep   remote end-point to connect to
     * @param loop loop with which the channel should be registered
     * @return message stream bound to the newly opened channel
     * @throws java.io.IOException if the socket could not be open or connected
     */
    private DefaultMessageStream createConnection(Endpoint ep, DefaultIOLoop loop) throws IOException {
        SocketAddress sa = new InetSocketAddress(ep.host().toString(), ep.port());
        SocketChannel ch = SocketChannel.open();
        try {
            ch.configureBlocking(false);
            DefaultMessageStream stream = loop.connectStream(ch);
            ch.connect(sa);
            return stream;
        } catch (IOException | RuntimeException e) {
            // Do not leak the channel when registration or connect fails.
            closeQuietly(ch, e);
            throw e;
        }
    }

    // Closes the channel, attaching any close failure to the original cause.
    private static void closeQuietly(SocketChannel ch, Throwable cause) {
        try {
            ch.close();
        } catch (IOException closeFailure) {
            cause.addSuppressed(closeFailure);
        }
    }

    // Loop for accepting client connections
    private class DefaultAcceptorLoop extends AcceptorLoop {

        public DefaultAcceptorLoop(SocketAddress address) throws IOException {
            super(DefaultIOLoop.SELECT_TIMEOUT_MILLIS, address);
        }

        @Override
        protected void acceptConnection(ServerSocketChannel channel) throws IOException {
            SocketChannel sc = channel.accept();
            if (sc == null) {
                // accept() returns null on a non-blocking channel with no pending connection.
                return;
            }
            try {
                sc.configureBlocking(false);

                Socket so = sc.socket();
                so.setTcpNoDelay(SO_NO_DELAY);
                so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
                so.setSendBufferSize(SO_SEND_BUFFER_SIZE);

                nextWorker().acceptStream(sc);
            } catch (IOException | RuntimeException e) {
                // Do not leak the accepted channel when setup or hand-off fails.
                closeQuietly(sc, e);
                throw e;
            }
        }
    }

    // Factory used by the stream pool to create, validate and destroy outbound streams.
    private class DefaultMessageStreamFactory implements KeyedPoolableObjectFactory<Endpoint, DefaultMessageStream> {

        @Override
        public void activateObject(Endpoint endpoint, DefaultMessageStream stream) throws Exception {
        }

        @Override
        public void destroyObject(Endpoint ep, DefaultMessageStream stream) throws Exception {
            stream.close();
        }

        @Override
        public DefaultMessageStream makeObject(Endpoint ep) throws Exception {
            // Blocks until the connected future of the new stream completes.
            DefaultMessageStream stream = createConnection(ep, nextWorker()).connectedFuture().get();
            log.info("Established a new connection to {}", ep);
            return stream;
        }

        @Override
        public void passivateObject(Endpoint ep, DefaultMessageStream stream) throws Exception {
        }

        @Override
        public boolean validateObject(Endpoint ep, DefaultMessageStream stream) {
            // The pool contract is "true means usable", so a closed stream is invalid.
            // The pool only consults this when one of its test-on-* options is enabled.
            return !stream.isClosed();
        }
    }
}