Migrating netty messaging into netty messaging manager.
Change-Id: I971db195c9dc155cdf76850f0427ef9b9210113c
diff --git a/utils/netty/src/main/java/org/onlab/netty/DecoderState.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java
similarity index 89%
rename from utils/netty/src/main/java/org/onlab/netty/DecoderState.java
rename to core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java
index c439301..e113a3f 100644
--- a/utils/netty/src/main/java/org/onlab/netty/DecoderState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2014-2015 Open Networking Laboratory
+ * Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onlab.netty;
+package org.onosproject.store.cluster.messaging.impl;
/**
* State transitions a decoder goes through as it is decoding an incoming message.
diff --git a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
similarity index 94%
rename from utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
rename to core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
index 102e2a2..9deec66 100644
--- a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2014-2015 Open Networking Laboratory
+ * Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,12 +13,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onlab.netty;
-
-import org.onlab.util.ByteArraySizeHashPrinter;
-import org.onosproject.store.cluster.messaging.Endpoint;
+package org.onosproject.store.cluster.messaging.impl;
import com.google.common.base.MoreObjects;
+import org.onlab.util.ByteArraySizeHashPrinter;
+import org.onosproject.store.cluster.messaging.Endpoint;
/**
* Internal message representation with additional attributes
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
similarity index 90%
rename from utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
rename to core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
index 1c582a1..149b706 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2014-2015 Open Networking Laboratory
+ * Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,21 +13,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onlab.netty;
+package org.onosproject.store.cluster.messaging.impl;
+import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
-
-import java.util.List;
-
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpAddress.Version;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Charsets;
+import java.util.List;
import static com.google.common.base.Preconditions.checkState;
@@ -54,7 +52,7 @@
}
@Override
- @java.lang.SuppressWarnings("squid:S128") // suppress switch fall through warning
+ @SuppressWarnings("squid:S128") // suppress switch fall through warning
protected void decode(
ChannelHandlerContext context,
ByteBuf buffer,
@@ -97,9 +95,9 @@
byte[] payload = new byte[contentLength];
buffer.readBytes(payload);
InternalMessage message = new InternalMessage(messageId,
- new Endpoint(senderIp, senderPort),
- messageType,
- payload);
+ new Endpoint(senderIp, senderPort),
+ messageType,
+ payload);
out.add(message);
checkpoint(DecoderState.READ_MESSAGE_PREAMBLE);
break;
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java
similarity index 96%
rename from utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
rename to core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java
index c74c1de..48c75dd 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2014-2015 Open Networking Laboratory
+ * Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,22 +13,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onlab.netty;
+package org.onosproject.store.cluster.messaging.impl;
+import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
-
-import java.io.IOException;
-
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpAddress.Version;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Charsets;
+import java.io.IOException;
/**
* Encode InternalMessage out into a byte buffer.
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index ca6f9c1..3d4a4f7 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -17,29 +17,114 @@
import com.google.common.base.Strings;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.ServerChannel;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.commons.pool.KeyedPoolableObjectFactory;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.onlab.netty.NettyMessaging;
+import org.onlab.netty.InternalMessage;
+import org.onlab.netty.MessageDecoder;
+import org.onlab.netty.MessageEncoder;
+import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.store.cluster.messaging.MessagingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.KeyStore;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+
/**
* Netty based MessagingService.
*/
@Component(immediate = true, enabled = true)
@Service
-public class NettyMessagingManager extends NettyMessaging {
+public class NettyMessagingManager implements MessagingService {
+
+ private static final short MIN_KS_LENGTH = 6;
private final Logger log = LoggerFactory.getLogger(getClass());
- private static final short MIN_KS_LENGTH = 6;
+ private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
+
+ private Endpoint localEp;
+ private int preamble;
+ private final AtomicBoolean started = new AtomicBoolean(false);
+ private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
+ private final AtomicLong messageIdGenerator = new AtomicLong(0);
+ private final Cache<Long, Callback> callbacks = CacheBuilder.newBuilder()
+ .expireAfterWrite(10, TimeUnit.SECONDS)
+ .removalListener(new RemovalListener<Long, Callback>() {
+ @Override
+ public void onRemoval(RemovalNotification<Long, Callback> entry) {
+ if (entry.wasEvicted()) {
+ entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
+ }
+ }
+ })
+ .build();
+
+ private final GenericKeyedObjectPool<Endpoint, Connection> channels
+ = new GenericKeyedObjectPool<Endpoint, Connection>(new OnosCommunicationChannelFactory());
+
+ private EventLoopGroup serverGroup;
+ private EventLoopGroup clientGroup;
+ private Class<? extends ServerChannel> serverChannelClass;
+ private Class<? extends Channel> clientChannelClass;
+
+ protected static final boolean TLS_DISABLED = false;
+ protected boolean enableNettyTls = TLS_DISABLED;
+
+ protected String ksLocation;
+ protected String tsLocation;
+ protected char[] ksPwd;
+ protected char[] tsPwd;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterMetadataService clusterMetadataService;
@@ -48,14 +133,32 @@
public void activate() throws Exception {
ControllerNode localNode = clusterMetadataService.getLocalNode();
getTlsParameters();
- super.start(clusterMetadataService.getClusterMetadata().getName().hashCode(),
- new Endpoint(localNode.ip(), localNode.tcpPort()));
+
+ if (started.get()) {
+ log.warn("Already running at local endpoint: {}", localEp);
+ return;
+ }
+ this.preamble = clusterMetadataService.getClusterMetadata().getName().hashCode();
+ this.localEp = new Endpoint(localNode.ip(), localNode.tcpPort());
+ channels.setLifo(true);
+ channels.setTestOnBorrow(true);
+ channels.setTestOnReturn(true);
+ channels.setMinEvictableIdleTimeMillis(60_000L);
+ channels.setTimeBetweenEvictionRunsMillis(30_000L);
+ initEventLoopGroup();
+ startAcceptingConnections();
+ started.set(true);
log.info("Started");
}
@Deactivate
public void deactivate() throws Exception {
- super.stop();
+ if (started.get()) {
+ channels.close();
+ serverGroup.shutdownGracefully();
+ clientGroup.shutdownGracefully();
+ started.set(false);
+ }
log.info("Stopped");
}
@@ -86,4 +189,409 @@
}
}
}
+ private void initEventLoopGroup() {
+ // try Epoll first and if that does work, use nio.
+ try {
+ clientGroup = new EpollEventLoopGroup();
+ serverGroup = new EpollEventLoopGroup();
+ serverChannelClass = EpollServerSocketChannel.class;
+ clientChannelClass = EpollSocketChannel.class;
+ return;
+ } catch (Throwable e) {
+ log.debug("Failed to initialize native (epoll) transport. "
+ + "Reason: {}. Proceeding with nio.", e.getMessage());
+ }
+ clientGroup = new NioEventLoopGroup();
+ serverGroup = new NioEventLoopGroup();
+ serverChannelClass = NioServerSocketChannel.class;
+ clientChannelClass = NioSocketChannel.class;
+ }
+
+ @Override
+ public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
+ InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(),
+ localEp,
+ type,
+ payload);
+ return sendAsync(ep, message);
+ }
+
+ protected CompletableFuture<Void> sendAsync(Endpoint ep, InternalMessage message) {
+ if (ep.equals(localEp)) {
+ try {
+ dispatchLocally(message);
+ } catch (IOException e) {
+ return Tools.exceptionalFuture(e);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ try {
+ Connection connection = null;
+ try {
+ connection = channels.borrowObject(ep);
+ connection.send(message, future);
+ } finally {
+ channels.returnObject(ep, connection);
+ }
+ } catch (Exception e) {
+ future.completeExceptionally(e);
+ }
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
+ return sendAndReceive(ep, type, payload, MoreExecutors.directExecutor());
+ }
+
+ @Override
+ public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
+ CompletableFuture<byte[]> response = new CompletableFuture<>();
+ Callback callback = new Callback(response, executor);
+ Long messageId = messageIdGenerator.incrementAndGet();
+ callbacks.put(messageId, callback);
+ InternalMessage message = new InternalMessage(messageId, localEp, type, payload);
+ return sendAsync(ep, message).whenComplete((r, e) -> {
+ if (e != null) {
+ callbacks.invalidate(messageId);
+ }
+ }).thenCompose(v -> response);
+ }
+
+ @Override
+ public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
+ handlers.put(type, message -> executor.execute(() -> handler.accept(message.sender(), message.payload())));
+ }
+
+ @Override
+ public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
+ handlers.put(type, message -> executor.execute(() -> {
+ byte[] responsePayload = handler.apply(message.sender(), message.payload());
+ if (responsePayload != null) {
+ InternalMessage response = new InternalMessage(message.id(),
+ localEp,
+ REPLY_MESSAGE_TYPE,
+ responsePayload);
+ sendAsync(message.sender(), response).whenComplete((result, error) -> {
+ if (error != null) {
+ log.debug("Failed to respond", error);
+ }
+ });
+ }
+ }));
+ }
+
+ @Override
+ public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
+ handlers.put(type, message -> {
+ handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
+ if (error == null) {
+ InternalMessage response = new InternalMessage(message.id(),
+ localEp,
+ REPLY_MESSAGE_TYPE,
+ result);
+ sendAsync(message.sender(), response).whenComplete((r, e) -> {
+ if (e != null) {
+ log.debug("Failed to respond", e);
+ }
+ });
+ }
+ });
+ });
+ }
+
+ @Override
+ public void unregisterHandler(String type) {
+ handlers.remove(type);
+ }
+
+ private void startAcceptingConnections() throws InterruptedException {
+ ServerBootstrap b = new ServerBootstrap();
+ b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
+ b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
+ b.option(ChannelOption.SO_RCVBUF, 1048576);
+ b.option(ChannelOption.TCP_NODELAY, true);
+ b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+ b.group(serverGroup, clientGroup);
+ b.channel(serverChannelClass);
+ if (enableNettyTls) {
+ b.childHandler(new SslServerCommunicationChannelInitializer());
+ } else {
+ b.childHandler(new OnosCommunicationChannelInitializer());
+ }
+ b.option(ChannelOption.SO_BACKLOG, 128);
+ b.childOption(ChannelOption.SO_KEEPALIVE, true);
+
+ // Bind and start to accept incoming connections.
+ b.bind(localEp.port()).sync().addListener(future -> {
+ if (future.isSuccess()) {
+ log.info("{} accepting incoming connections on port {}", localEp.host(), localEp.port());
+ } else {
+ log.warn("{} failed to bind to port {}", localEp.host(), localEp.port(), future.cause());
+ }
+ });
+ }
+
+ private class OnosCommunicationChannelFactory
+ implements KeyedPoolableObjectFactory<Endpoint, Connection> {
+
+ @Override
+ public void activateObject(Endpoint endpoint, Connection connection)
+ throws Exception {
+ }
+
+ @Override
+ public void destroyObject(Endpoint ep, Connection connection) throws Exception {
+ log.debug("Closing connection to {}", ep);
+ //Is this the right way to destroy?
+ connection.destroy();
+ }
+
+ @Override
+ public Connection makeObject(Endpoint ep) throws Exception {
+ Bootstrap bootstrap = new Bootstrap();
+ bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+ bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 64 * 1024);
+ bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 10 * 32 * 1024);
+ bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
+ bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
+ bootstrap.group(clientGroup);
+ // TODO: Make this faster:
+ // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
+ bootstrap.channel(clientChannelClass);
+ bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+ if (enableNettyTls) {
+ bootstrap.handler(new SslClientCommunicationChannelInitializer());
+ } else {
+ bootstrap.handler(new OnosCommunicationChannelInitializer());
+ }
+ // Start the client.
+ CompletableFuture<Channel> retFuture = new CompletableFuture<>();
+ ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port());
+
+ f.addListener(future -> {
+ if (future.isSuccess()) {
+ retFuture.complete(f.channel());
+ } else {
+ retFuture.completeExceptionally(future.cause());
+ }
+ });
+ log.debug("Established a new connection to {}", ep);
+ return new Connection(retFuture);
+ }
+
+ @Override
+ public void passivateObject(Endpoint ep, Connection connection)
+ throws Exception {
+ }
+
+ @Override
+ public boolean validateObject(Endpoint ep, Connection connection) {
+ return connection.validate();
+ }
+ }
+
+ private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
+
+ private final ChannelHandler dispatcher = new InboundMessageDispatcher();
+ private final ChannelHandler encoder = new MessageEncoder(preamble);
+
+ @Override
+ protected void initChannel(SocketChannel channel) throws Exception {
+ TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ KeyStore ts = KeyStore.getInstance("JKS");
+ ts.load(new FileInputStream(tsLocation), tsPwd);
+ tmFactory.init(ts);
+
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ KeyStore ks = KeyStore.getInstance("JKS");
+ ks.load(new FileInputStream(ksLocation), ksPwd);
+ kmf.init(ks, ksPwd);
+
+ SSLContext serverContext = SSLContext.getInstance("TLS");
+ serverContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
+
+ SSLEngine serverSslEngine = serverContext.createSSLEngine();
+
+ serverSslEngine.setNeedClientAuth(true);
+ serverSslEngine.setUseClientMode(false);
+ serverSslEngine.setEnabledProtocols(serverSslEngine.getSupportedProtocols());
+ serverSslEngine.setEnabledCipherSuites(serverSslEngine.getSupportedCipherSuites());
+ serverSslEngine.setEnableSessionCreation(true);
+
+ channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
+ .addLast("encoder", encoder)
+ .addLast("decoder", new MessageDecoder(preamble))
+ .addLast("handler", dispatcher);
+ }
+ }
+
+ private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
+
+ private final ChannelHandler dispatcher = new InboundMessageDispatcher();
+ private final ChannelHandler encoder = new MessageEncoder(preamble);
+
+ @Override
+ protected void initChannel(SocketChannel channel) throws Exception {
+ TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ KeyStore ts = KeyStore.getInstance("JKS");
+ ts.load(new FileInputStream(tsLocation), tsPwd);
+ tmFactory.init(ts);
+
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ KeyStore ks = KeyStore.getInstance("JKS");
+ ks.load(new FileInputStream(ksLocation), ksPwd);
+ kmf.init(ks, ksPwd);
+
+ SSLContext clientContext = SSLContext.getInstance("TLS");
+ clientContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
+
+ SSLEngine clientSslEngine = clientContext.createSSLEngine();
+
+ clientSslEngine.setUseClientMode(true);
+ clientSslEngine.setEnabledProtocols(clientSslEngine.getSupportedProtocols());
+ clientSslEngine.setEnabledCipherSuites(clientSslEngine.getSupportedCipherSuites());
+ clientSslEngine.setEnableSessionCreation(true);
+
+ channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
+ .addLast("encoder", encoder)
+ .addLast("decoder", new MessageDecoder(preamble))
+ .addLast("handler", dispatcher);
+ }
+ }
+
+ private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
+
+ private final ChannelHandler dispatcher = new InboundMessageDispatcher();
+ private final ChannelHandler encoder = new MessageEncoder(preamble);
+
+ @Override
+ protected void initChannel(SocketChannel channel) throws Exception {
+ channel.pipeline()
+ .addLast("encoder", encoder)
+ .addLast("decoder", new MessageDecoder(preamble))
+ .addLast("handler", dispatcher);
+ }
+ }
+
+ @ChannelHandler.Sharable
+ private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
+ try {
+ dispatchLocally(message);
+ } catch (RejectedExecutionException e) {
+ log.warn("Unable to dispatch message due to {}", e.getMessage());
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
+ log.error("Exception inside channel handling pipeline.", cause);
+ context.close();
+ }
+ }
+ private void dispatchLocally(InternalMessage message) throws IOException {
+ String type = message.type();
+ if (REPLY_MESSAGE_TYPE.equals(type)) {
+ try {
+ Callback callback =
+ callbacks.getIfPresent(message.id());
+ if (callback != null) {
+ callback.complete(message.payload());
+ } else {
+ log.warn("Received a reply for message id:[{}]. "
+ + " from {}. But was unable to locate the"
+ + " request handle", message.id(), message.sender());
+ }
+ } finally {
+ callbacks.invalidate(message.id());
+ }
+ return;
+ }
+ Consumer<InternalMessage> handler = handlers.get(type);
+ if (handler != null) {
+ handler.accept(message);
+ } else {
+ log.debug("No handler registered for {}", type);
+ }
+ }
+
+ private final class Callback {
+ private final CompletableFuture<byte[]> future;
+ private final Executor executor;
+
+ public Callback(CompletableFuture<byte[]> future, Executor executor) {
+ this.future = future;
+ this.executor = executor;
+ }
+
+ public void complete(byte[] value) {
+ executor.execute(() -> future.complete(value));
+ }
+
+ public void completeExceptionally(Throwable error) {
+ executor.execute(() -> future.completeExceptionally(error));
+ }
+ }
+ private final class Connection {
+ private final CompletableFuture<Channel> internalFuture;
+
+ public Connection(CompletableFuture<Channel> internalFuture) {
+ this.internalFuture = internalFuture;
+ }
+
+ /**
+ * Sends a message out on its channel and associated the message with a
+ * completable future used for signaling.
+ * @param message the message to be sent
+ * @param future a future that is completed normally or exceptionally if
+ * message sending succeeds or fails respectively
+ */
+ public void send(Object message, CompletableFuture<Void> future) {
+ internalFuture.whenComplete((channel, throwable) -> {
+ if (throwable == null) {
+ channel.writeAndFlush(message).addListener(channelFuture -> {
+ if (!channelFuture.isSuccess()) {
+ future.completeExceptionally(channelFuture.cause());
+ } else {
+ future.complete(null);
+ }
+ });
+ } else {
+ future.completeExceptionally(throwable);
+ }
+ });
+ }
+
+ /**
+ * Destroys a channel by closing its channel (if it exists) and
+ * cancelling its future.
+ */
+ public void destroy() {
+ Channel channel = internalFuture.getNow(null);
+ if (channel != null) {
+ channel.close();
+ }
+ internalFuture.cancel(false);
+ }
+
+ /**
+ * Determines whether the connection is valid meaning it is either
+ * complete with and active channel
+ * or it has not yet completed.
+ * @return true if the channel has an active connection or has not
+ * yet completed
+ */
+ public boolean validate() {
+ if (internalFuture.isCompletedExceptionally()) {
+ return false;
+ }
+ Channel channel = internalFuture.getNow(null);
+ return channel == null || channel.isActive();
+ }
+ }
}
diff --git a/utils/netty/src/test/java/org/onlab/netty/NettyMessagingTest.java b/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java
similarity index 60%
rename from utils/netty/src/test/java/org/onlab/netty/NettyMessagingTest.java
rename to core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java
index f7737d2..d04c777 100644
--- a/utils/netty/src/test/java/org/onlab/netty/NettyMessagingTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java
@@ -1,4 +1,4 @@
-package org.onlab.netty;
+package org.onosproject.store.cluster.messaging.impl;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
@@ -9,10 +9,17 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
+import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onlab.packet.IpAddress;
+import org.onosproject.cluster.ClusterMetadata;
+import org.onosproject.cluster.ClusterMetadataEventListener;
+import org.onosproject.cluster.ClusterMetadataService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.cluster.messaging.Endpoint;
import com.google.common.util.concurrent.MoreExecutors;
@@ -24,34 +31,39 @@
/**
* Unit tests for NettyMessaging.
*/
-public class NettyMessagingTest {
+public class NettyMessagingManagerTest {
- NettyMessaging netty1;
- NettyMessaging netty2;
+ NettyMessagingManager netty1;
+ NettyMessagingManager netty2;
- Endpoint ep1 = new Endpoint(IpAddress.valueOf("127.0.0.1"), 5001);
- Endpoint ep2 = new Endpoint(IpAddress.valueOf("127.0.0.1"), 5002);
- Endpoint invalidEndPoint = new Endpoint(IpAddress.valueOf("127.0.0.1"), 5003);
+ private static final String DUMMY_NAME = "node";
+ private static final String IP_STRING = "127.0.0.1";
+
+ Endpoint ep1 = new Endpoint(IpAddress.valueOf(IP_STRING), 5001);
+ Endpoint ep2 = new Endpoint(IpAddress.valueOf(IP_STRING), 5002);
+ Endpoint invalidEndPoint = new Endpoint(IpAddress.valueOf(IP_STRING), 5003);
@Before
public void setUp() throws Exception {
ep1 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5001));
- netty1 = new NettyMessaging();
- netty1.start(12, ep1);
+ netty1 = new NettyMessagingManager();
+ netty1.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep1);
+ netty1.activate();
ep2 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5003));
- netty2 = new NettyMessaging();
- netty2.start(12, ep2);
+ netty2 = new NettyMessagingManager();
+ netty2.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep2);
+ netty2.activate();
}
@After
public void tearDown() throws Exception {
if (netty1 != null) {
- netty1.stop();
+ netty1.deactivate();
}
if (netty2 != null) {
- netty2.stop();
+ netty2.deactivate();
}
}
@@ -113,9 +125,9 @@
netty2.registerHandler("test-subject", handler, handlerExecutor);
CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2,
- "test-subject",
- "hello world".getBytes(),
- completionExecutor);
+ "test-subject",
+ "hello world".getBytes(),
+ completionExecutor);
response.whenComplete((r, e) -> {
completionThreadName.set(Thread.currentThread().getName());
});
@@ -125,4 +137,40 @@
assertEquals("completion-thread", completionThreadName.get());
assertEquals("handler-thread", handlerThreadName.get());
}
-}
+
+ private ClusterMetadataService dummyMetadataService(String name, String ipAddress, Endpoint ep) {
+ return new ClusterMetadataService() {
+ @Override
+ public ClusterMetadata getClusterMetadata() {
+ return new ClusterMetadata(new ProviderId(DUMMY_NAME, DUMMY_NAME),
+ name, Sets.newHashSet(), Sets.newHashSet());
+ }
+
+ @Override
+ public ControllerNode getLocalNode() {
+ return new ControllerNode() {
+ @Override
+ public NodeId id() {
+ return null;
+ }
+
+ @Override
+ public IpAddress ip() {
+ return IpAddress.valueOf(ipAddress);
+ }
+
+ @Override
+ public int tcpPort() {
+ return ep.port();
+ }
+ };
+ }
+
+ @Override
+ public void addListener(ClusterMetadataEventListener listener) {}
+
+ @Override
+ public void removeListener(ClusterMetadataEventListener listener) {}
+ };
+ }
+}
\ No newline at end of file
diff --git a/utils/netty/pom.xml b/utils/netty/pom.xml
deleted file mode 100644
index e4ec42e..0000000
--- a/utils/netty/pom.xml
+++ /dev/null
@@ -1,83 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Copyright 2014 Open Networking Laboratory
- ~
- ~ Licensed under the Apache License, Version 2.0 (the "License");
- ~ you may not use this file except in compliance with the License.
- ~ You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.onosproject</groupId>
- <artifactId>onlab-utils</artifactId>
- <version>1.5.0-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
-
- <artifactId>onlab-netty</artifactId>
- <packaging>bundle</packaging>
-
- <description>Network I/O using Netty framework</description>
-
- <dependencies>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava-testlib</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.onosproject</groupId>
- <artifactId>onos-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.onosproject</groupId>
- <artifactId>onlab-misc</artifactId>
- </dependency>
- <dependency>
- <groupId>org.onosproject</groupId>
- <artifactId>onlab-junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>commons-pool</groupId>
- <artifactId>commons-pool</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-common</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-buffer</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-transport</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-handler</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-codec</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-transport-native-epoll</artifactId>
- <version>${netty4.version}</version>
- </dependency>
- </dependencies>
-</project>
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
deleted file mode 100644
index 8b003f0..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
+++ /dev/null
@@ -1,552 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.netty;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.util.concurrent.MoreExecutors;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.ServerChannel;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollServerSocketChannel;
-import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-
-import org.apache.commons.pool.KeyedPoolableObjectFactory;
-import org.apache.commons.pool.impl.GenericKeyedObjectPool;
-import org.onlab.util.Tools;
-import org.onosproject.store.cluster.messaging.Endpoint;
-import org.onosproject.store.cluster.messaging.MessagingService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.TrustManagerFactory;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.security.KeyStore;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
-import java.util.function.Consumer;
-
-/**
- * Implementation of MessagingService based on <a href="http://netty.io/">Netty</a> framework.
- */
-public class NettyMessaging implements MessagingService {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
-
- private Endpoint localEp;
- private int preamble;
- private final AtomicBoolean started = new AtomicBoolean(false);
- private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
- private final AtomicLong messageIdGenerator = new AtomicLong(0);
- private final Cache<Long, Callback> callbacks = CacheBuilder.newBuilder()
- .expireAfterWrite(10, TimeUnit.SECONDS)
- .removalListener(new RemovalListener<Long, Callback>() {
- @Override
- public void onRemoval(RemovalNotification<Long, Callback> entry) {
- if (entry.wasEvicted()) {
- entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
- }
- }
- })
- .build();
-
- private final GenericKeyedObjectPool<Endpoint, Connection> channels
- = new GenericKeyedObjectPool<Endpoint, Connection>(new OnosCommunicationChannelFactory());
-
- private EventLoopGroup serverGroup;
- private EventLoopGroup clientGroup;
- private Class<? extends ServerChannel> serverChannelClass;
- private Class<? extends Channel> clientChannelClass;
-
- protected static final boolean TLS_DISABLED = false;
- protected boolean enableNettyTls = TLS_DISABLED;
-
- protected String ksLocation;
- protected String tsLocation;
- protected char[] ksPwd;
- protected char[] tsPwd;
-
- @SuppressWarnings("squid:S1181")
- // We really need to catch Throwable due to netty native epoll() handling
- private void initEventLoopGroup() {
- // try Epoll first and if that does work, use nio.
- try {
- clientGroup = new EpollEventLoopGroup();
- serverGroup = new EpollEventLoopGroup();
- serverChannelClass = EpollServerSocketChannel.class;
- clientChannelClass = EpollSocketChannel.class;
- return;
- } catch (Throwable e) {
- log.debug("Failed to initialize native (epoll) transport. "
- + "Reason: {}. Proceeding with nio.", e.getMessage());
- }
- clientGroup = new NioEventLoopGroup();
- serverGroup = new NioEventLoopGroup();
- serverChannelClass = NioServerSocketChannel.class;
- clientChannelClass = NioSocketChannel.class;
- }
-
- public void start(int preamble, Endpoint localEp) throws Exception {
- if (started.get()) {
- log.warn("Already running at local endpoint: {}", localEp);
- return;
- }
- this.preamble = preamble;
- this.localEp = localEp;
- channels.setLifo(true);
- channels.setTestOnBorrow(true);
- channels.setTestOnReturn(true);
- channels.setMinEvictableIdleTimeMillis(60_000L);
- channels.setTimeBetweenEvictionRunsMillis(30_000L);
- initEventLoopGroup();
- startAcceptingConnections();
- started.set(true);
- }
-
- public void stop() throws Exception {
- if (started.get()) {
- channels.close();
- serverGroup.shutdownGracefully();
- clientGroup.shutdownGracefully();
- started.set(false);
- }
- }
-
- @Override
- public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
- InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(),
- localEp,
- type,
- payload);
- return sendAsync(ep, message);
- }
-
- protected CompletableFuture<Void> sendAsync(Endpoint ep, InternalMessage message) {
- if (ep.equals(localEp)) {
- try {
- dispatchLocally(message);
- } catch (IOException e) {
- return Tools.exceptionalFuture(e);
- }
- return CompletableFuture.completedFuture(null);
- }
-
- CompletableFuture<Void> future = new CompletableFuture<>();
- try {
- Connection connection = null;
- try {
- connection = channels.borrowObject(ep);
- connection.send(message, future);
-
- } finally {
- channels.returnObject(ep, connection);
- }
- } catch (Exception e) {
- future.completeExceptionally(e);
- }
- return future;
- }
-
- @Override
- public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
- return sendAndReceive(ep, type, payload, MoreExecutors.directExecutor());
- }
-
- @Override
- public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
- CompletableFuture<byte[]> response = new CompletableFuture<>();
- Callback callback = new Callback(response, executor);
- Long messageId = messageIdGenerator.incrementAndGet();
- callbacks.put(messageId, callback);
- InternalMessage message = new InternalMessage(messageId, localEp, type, payload);
- return sendAsync(ep, message).whenComplete((r, e) -> {
- if (e != null) {
- callbacks.invalidate(messageId);
- }
- }).thenCompose(v -> response);
- }
-
- @Override
- public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
- handlers.put(type, message -> executor.execute(() -> handler.accept(message.sender(), message.payload())));
- }
-
- @Override
- public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
- handlers.put(type, message -> executor.execute(() -> {
- byte[] responsePayload = handler.apply(message.sender(), message.payload());
- if (responsePayload != null) {
- InternalMessage response = new InternalMessage(message.id(),
- localEp,
- REPLY_MESSAGE_TYPE,
- responsePayload);
- sendAsync(message.sender(), response).whenComplete((result, error) -> {
- if (error != null) {
- log.debug("Failed to respond", error);
- }
- });
- }
- }));
- }
-
- @Override
- public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
- handlers.put(type, message -> {
- handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
- if (error == null) {
- InternalMessage response = new InternalMessage(message.id(),
- localEp,
- REPLY_MESSAGE_TYPE,
- result);
- sendAsync(message.sender(), response).whenComplete((r, e) -> {
- if (e != null) {
- log.debug("Failed to respond", e);
- }
- });
- }
- });
- });
- }
-
- @Override
- public void unregisterHandler(String type) {
- handlers.remove(type);
- }
-
- private void startAcceptingConnections() throws InterruptedException {
- ServerBootstrap b = new ServerBootstrap();
- b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
- b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
- b.option(ChannelOption.SO_RCVBUF, 1048576);
- b.option(ChannelOption.TCP_NODELAY, true);
- b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
- b.group(serverGroup, clientGroup);
- b.channel(serverChannelClass);
- if (enableNettyTls) {
- b.childHandler(new SslServerCommunicationChannelInitializer());
- } else {
- b.childHandler(new OnosCommunicationChannelInitializer());
- }
- b.option(ChannelOption.SO_BACKLOG, 128);
- b.childOption(ChannelOption.SO_KEEPALIVE, true);
-
- // Bind and start to accept incoming connections.
- b.bind(localEp.port()).sync().addListener(future -> {
- if (future.isSuccess()) {
- log.info("{} accepting incoming connections on port {}", localEp.host(), localEp.port());
- } else {
- log.warn("{} failed to bind to port {}", localEp.host(), localEp.port(), future.cause());
- }
- });
- }
-
- private class OnosCommunicationChannelFactory
- implements KeyedPoolableObjectFactory<Endpoint, Connection> {
-
- @Override
- public void activateObject(Endpoint endpoint, Connection connection)
- throws Exception {
- }
-
- @Override
- public void destroyObject(Endpoint ep, Connection connection) throws Exception {
- log.debug("Closing connection to {}", ep);
- //Is this the right way to destroy?
- connection.destroy();
- }
-
- @Override
- public Connection makeObject(Endpoint ep) throws Exception {
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
- bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 64 * 1024);
- bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 10 * 32 * 1024);
- bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
- bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
- bootstrap.group(clientGroup);
- // TODO: Make this faster:
- // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
- bootstrap.channel(clientChannelClass);
- bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
- if (enableNettyTls) {
- bootstrap.handler(new SslClientCommunicationChannelInitializer());
- } else {
- bootstrap.handler(new OnosCommunicationChannelInitializer());
- }
- // Start the client.
- CompletableFuture<Channel> retFuture = new CompletableFuture<>();
- ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port());
-
- f.addListener(future -> {
- if (future.isSuccess()) {
- retFuture.complete(f.channel());
- } else {
- retFuture.completeExceptionally(future.cause());
- }
- });
- log.debug("Established a new connection to {}", ep);
- return new Connection(retFuture);
- }
-
- @Override
- public void passivateObject(Endpoint ep, Connection connection)
- throws Exception {
- }
-
- @Override
- public boolean validateObject(Endpoint ep, Connection connection) {
- return connection.validate();
- }
- }
-
- private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
-
- private final ChannelHandler dispatcher = new InboundMessageDispatcher();
- private final ChannelHandler encoder = new MessageEncoder(preamble);
-
- @Override
- protected void initChannel(SocketChannel channel) throws Exception {
- TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
- KeyStore ts = KeyStore.getInstance("JKS");
- ts.load(new FileInputStream(tsLocation), tsPwd);
- tmFactory.init(ts);
-
- KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
- KeyStore ks = KeyStore.getInstance("JKS");
- ks.load(new FileInputStream(ksLocation), ksPwd);
- kmf.init(ks, ksPwd);
-
- SSLContext serverContext = SSLContext.getInstance("TLS");
- serverContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
-
- SSLEngine serverSslEngine = serverContext.createSSLEngine();
-
- serverSslEngine.setNeedClientAuth(true);
- serverSslEngine.setUseClientMode(false);
- serverSslEngine.setEnabledProtocols(serverSslEngine.getSupportedProtocols());
- serverSslEngine.setEnabledCipherSuites(serverSslEngine.getSupportedCipherSuites());
- serverSslEngine.setEnableSessionCreation(true);
-
- channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
- .addLast("encoder", encoder)
- .addLast("decoder", new MessageDecoder(preamble))
- .addLast("handler", dispatcher);
- }
-
- }
-
- private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
-
- private final ChannelHandler dispatcher = new InboundMessageDispatcher();
- private final ChannelHandler encoder = new MessageEncoder(preamble);
-
- @Override
- protected void initChannel(SocketChannel channel) throws Exception {
- TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
- KeyStore ts = KeyStore.getInstance("JKS");
- ts.load(new FileInputStream(tsLocation), tsPwd);
- tmFactory.init(ts);
-
- KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
- KeyStore ks = KeyStore.getInstance("JKS");
- ks.load(new FileInputStream(ksLocation), ksPwd);
- kmf.init(ks, ksPwd);
-
- SSLContext clientContext = SSLContext.getInstance("TLS");
- clientContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
-
- SSLEngine clientSslEngine = clientContext.createSSLEngine();
-
- clientSslEngine.setUseClientMode(true);
- clientSslEngine.setEnabledProtocols(clientSslEngine.getSupportedProtocols());
- clientSslEngine.setEnabledCipherSuites(clientSslEngine.getSupportedCipherSuites());
- clientSslEngine.setEnableSessionCreation(true);
-
- channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
- .addLast("encoder", encoder)
- .addLast("decoder", new MessageDecoder(preamble))
- .addLast("handler", dispatcher);
- }
-
- }
-
- private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
-
- private final ChannelHandler dispatcher = new InboundMessageDispatcher();
- private final ChannelHandler encoder = new MessageEncoder(preamble);
-
- @Override
- protected void initChannel(SocketChannel channel) throws Exception {
- channel.pipeline()
- .addLast("encoder", encoder)
- .addLast("decoder", new MessageDecoder(preamble))
- .addLast("handler", dispatcher);
- }
- }
-
- @ChannelHandler.Sharable
- private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
- try {
- dispatchLocally(message);
- } catch (RejectedExecutionException e) {
- log.warn("Unable to dispatch message due to {}", e.getMessage());
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
- log.error("Exception inside channel handling pipeline.", cause);
- context.close();
- }
- }
- private void dispatchLocally(InternalMessage message) throws IOException {
- String type = message.type();
- if (REPLY_MESSAGE_TYPE.equals(type)) {
- try {
- Callback callback =
- callbacks.getIfPresent(message.id());
- if (callback != null) {
- callback.complete(message.payload());
- } else {
- log.warn("Received a reply for message id:[{}]. "
- + " from {}. But was unable to locate the"
- + " request handle", message.id(), message.sender());
- }
- } finally {
- callbacks.invalidate(message.id());
- }
- return;
- }
- Consumer<InternalMessage> handler = handlers.get(type);
- if (handler != null) {
- handler.accept(message);
- } else {
- log.debug("No handler registered for {}", type);
- }
- }
-
- private final class Callback {
- private final CompletableFuture<byte[]> future;
- private final Executor executor;
-
- public Callback(CompletableFuture<byte[]> future, Executor executor) {
- this.future = future;
- this.executor = executor;
- }
-
- public void complete(byte[] value) {
- executor.execute(() -> future.complete(value));
- }
-
- public void completeExceptionally(Throwable error) {
- executor.execute(() -> future.completeExceptionally(error));
- }
- }
- private final class Connection {
- private final CompletableFuture<Channel> internalFuture;
-
- public Connection(CompletableFuture<Channel> internalFuture) {
- this.internalFuture = internalFuture;
- }
-
- /**
- * Sends a message out on its channel and associated the message with a
- * completable future used for signaling.
- * @param message the message to be sent
- * @param future a future that is completed normally or exceptionally if
- * message sending succeeds or fails respectively
- */
- public void send(Object message, CompletableFuture<Void> future) {
- internalFuture.whenComplete((channel, throwable) -> {
- if (throwable == null) {
- channel.writeAndFlush(message).addListener(channelFuture -> {
- if (!channelFuture.isSuccess()) {
- future.completeExceptionally(channelFuture.cause());
- } else {
- future.complete(null);
- }
- });
- } else {
- future.completeExceptionally(throwable);
- }
-
- });
- }
-
- /**
- * Destroys a channel by closing its channel (if it exists) and
- * cancelling its future.
- */
- public void destroy() {
- Channel channel = internalFuture.getNow(null);
- if (channel != null) {
- channel.close();
- }
- internalFuture.cancel(false);
- }
-
- /**
- * Determines whether the connection is valid meaning it is either
- * complete with and active channel
- * or it has not yet completed.
- * @return true if the channel has an active connection or has not
- * yet completed
- */
- public boolean validate() {
- if (internalFuture.isCompletedExceptionally()) {
- return false;
- }
- Channel channel = internalFuture.getNow(null);
- return channel == null || channel.isActive();
- }
- }
-}
diff --git a/utils/netty/src/main/java/org/onlab/netty/package-info.java b/utils/netty/src/main/java/org/onlab/netty/package-info.java
deleted file mode 100644
index 2d6257f..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Asynchronous messaging APIs implemented using the Netty framework.
- */
-package org.onlab.netty;
diff --git a/utils/pom.xml b/utils/pom.xml
index a407052..6c27a7d 100644
--- a/utils/pom.xml
+++ b/utils/pom.xml
@@ -34,7 +34,6 @@
<modules>
<module>junit</module>
<module>misc</module>
- <module>netty</module>
<module>nio</module>
<module>yangutils</module>
<module>osgi</module>