ONOS-1982: MessagingService is now an OSGi service. It has implementations based on Netty and IOLoop.
Change-Id: Ia4c99de18e91be1b49bd1fddd86fe89fb83e859c
diff --git a/utils/netty/pom.xml b/utils/netty/pom.xml
index 9e5677c..f54c93f 100644
--- a/utils/netty/pom.xml
+++ b/utils/netty/pom.xml
@@ -80,5 +80,4 @@
<version>${netty4.version}</version>
</dependency>
</dependencies>
-
</project>
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingManager.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
similarity index 89%
rename from utils/netty/src/main/java/org/onlab/netty/NettyMessagingManager.java
rename to utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
index 0bbb8c1..44b7027 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingManager.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
@@ -37,22 +37,20 @@
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
-import org.onlab.packet.IpAddress;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.slf4j.Logger;
@@ -66,14 +64,15 @@
/**
* Implementation of MessagingService based on <a href="http://netty.io/">Netty</a> framework.
*/
-public class NettyMessagingManager implements MessagingService {
+public class NettyMessaging implements MessagingService {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
- private final Endpoint localEp;
- private final ConcurrentMap<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
+ private Endpoint localEp;
+ private final AtomicBoolean started = new AtomicBoolean(false);
+ private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
private final AtomicLong messageIdGenerator = new AtomicLong(0);
private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
.expireAfterWrite(10, TimeUnit.SECONDS)
@@ -104,7 +103,8 @@
clientChannelClass = EpollSocketChannel.class;
return;
} catch (Throwable e) {
- log.warn("Failed to initialize native (epoll) transport. Reason: {}. Proceeding with nio.", e.getMessage());
+ log.debug("Failed to initialize native (epoll) transport. "
+ + "Reason: {}. Proceeding with nio.", e.getMessage());
}
clientGroup = new NioEventLoopGroup();
serverGroup = new NioEventLoopGroup();
@@ -112,43 +112,27 @@
clientChannelClass = NioSocketChannel.class;
}
- public NettyMessagingManager(IpAddress ip, int port) {
- localEp = new Endpoint(ip, port);
- }
-
- public NettyMessagingManager() {
- this(8080);
- }
-
- public NettyMessagingManager(int port) {
- try {
- localEp = new Endpoint(IpAddress.valueOf(InetAddress.getLocalHost()), port);
- } catch (UnknownHostException e) {
- // Cannot resolve the local host, something is very wrong. Bailing out.
- throw new IllegalStateException("Cannot resolve local host", e);
+ public void start(Endpoint localEp) throws Exception {
+ if (started.get()) {
+ log.warn("Already running at local endpoint: {}", localEp);
+ return;
}
- }
-
- public void activate() throws InterruptedException {
+ this.localEp = localEp;
channels.setLifo(false);
channels.setTestOnBorrow(true);
channels.setTestOnReturn(true);
initEventLoopGroup();
startAcceptingConnections();
+ started.set(true);
}
- public void deactivate() throws Exception {
- channels.close();
- serverGroup.shutdownGracefully();
- clientGroup.shutdownGracefully();
- }
-
- /**
- * Returns the local endpoint for this instance.
- * @return local end point.
- */
- public Endpoint localEp() {
- return localEp;
+ public void stop() throws Exception {
+ if (started.get()) {
+ channels.close();
+ serverGroup.shutdownGracefully();
+ clientGroup.shutdownGracefully();
+ started.set(false);
+ }
}
@Override
@@ -237,7 +221,13 @@
.childOption(ChannelOption.SO_KEEPALIVE, true);
// Bind and start to accept incoming connections.
- b.bind(localEp.port()).sync();
+ b.bind(localEp.port()).sync().addListener(future -> {
+ if (future.isSuccess()) {
+ log.info("{} accepting incoming connections on port {}", localEp.host(), localEp.port());
+ } else {
+ log.warn("{} failed to bind to port {}", localEp.host(), localEp.port(), future.cause());
+ }
+ });
}
private class OnosCommunicationChannelFactory
diff --git a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java b/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
deleted file mode 100644
index 53a36e3..0000000
--- a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.netty;
-
-import static org.junit.Assert.assertArrayEquals;
-
-import java.net.InetAddress;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-
-import org.apache.commons.lang3.RandomUtils;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.onlab.packet.IpAddress;
-import org.onosproject.store.cluster.messaging.Endpoint;
-
-import com.google.common.util.concurrent.MoreExecutors;
-
-/**
- * Simple ping-pong test that exercises NettyMessagingService.
- */
-public class PingPongTest {
-
- @Ignore("Turning off fragile test")
- @Test
- public void testPingPong() throws Exception {
- NettyMessagingManager pinger = new NettyMessagingManager(8085);
- NettyMessagingManager ponger = new NettyMessagingManager(9086);
- try {
- pinger.activate();
- ponger.activate();
- ponger.registerHandler("echo", Function.identity(), MoreExecutors.directExecutor());
- byte[] payload = RandomUtils.nextBytes(100);
- CompletableFuture<byte[]> responseFuture =
- pinger.sendAndReceive(
- new Endpoint(IpAddress.valueOf(InetAddress.getLocalHost()), 9086), "echo", payload);
- assertArrayEquals(payload, responseFuture.get(10000, TimeUnit.MILLISECONDS));
- } finally {
- pinger.deactivate();
- ponger.deactivate();
- }
- }
-}