Renamed IOLoop & Netty to *MessagingManager for consistency.
Change-Id: Id8859e24d0c7ac7f948516388069639093bad524
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
index c81079d..5d36bda 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -25,7 +25,7 @@
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.joda.time.DateTime;
-import org.onlab.netty.NettyMessagingService;
+import org.onlab.netty.NettyMessagingManager;
import org.onlab.packet.IpAddress;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterEvent;
@@ -108,7 +108,7 @@
private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
private final Map<NodeId, State> nodeStates = Maps.newConcurrentMap();
private final Map<NodeId, DateTime> nodeStateLastUpdatedTimes = Maps.newConcurrentMap();
- private NettyMessagingService messagingService;
+ private NettyMessagingManager messagingService;
private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
groupedThreads("onos/cluster/membership", "heartbeat-sender"));
private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
@@ -148,7 +148,7 @@
establishSelfIdentity();
- messagingService = new NettyMessagingService(HEARTBEAT_FD_PORT);
+ messagingService = new NettyMessagingManager(HEARTBEAT_FD_PORT);
try {
messagingService.activate();
} catch (InterruptedException e) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 6f47b48..1a4512d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -21,8 +21,8 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.onlab.netty.NettyMessagingService;
-import org.onlab.nio.service.IOLoopMessagingService;
+import org.onlab.netty.NettyMessagingManager;
+import org.onlab.nio.service.IOLoopMessagingManager;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
@@ -69,7 +69,7 @@
public void activate() {
ControllerNode localNode = clusterService.getLocalNode();
if (useNetty) {
- NettyMessagingService netty = new NettyMessagingService(localNode.ip(), localNode.tcpPort());
+ NettyMessagingManager netty = new NettyMessagingManager(localNode.ip(), localNode.tcpPort());
try {
netty.activate();
messagingService = netty;
@@ -77,7 +77,7 @@
log.error("NettyMessagingService#activate", e);
}
} else {
- IOLoopMessagingService ioLoop = new IOLoopMessagingService(localNode.ip(), localNode.tcpPort());
+ IOLoopMessagingManager ioLoop = new IOLoopMessagingManager(localNode.ip(), localNode.tcpPort());
try {
ioLoop.activate();
messagingService = ioLoop;
@@ -94,9 +94,9 @@
// FIXME: workaround until it becomes a service.
try {
if (useNetty) {
- ((NettyMessagingService) messagingService).deactivate();
+ ((NettyMessagingManager) messagingService).deactivate();
} else {
- ((IOLoopMessagingService) messagingService).deactivate();
+ ((IOLoopMessagingManager) messagingService).deactivate();
}
} catch (Exception e) {
log.error("MessagingService#deactivate", e);
diff --git a/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManagerTest.java
index e63a6d4..1a106ba 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManagerTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManagerTest.java
@@ -22,7 +22,7 @@
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.cluster.impl.ClusterNodesDelegate;
-import org.onlab.netty.NettyMessagingService;
+import org.onlab.netty.NettyMessagingManager;
import org.onlab.packet.IpAddress;
import java.util.concurrent.CountDownLatch;
@@ -56,7 +56,7 @@
@Before
public void setUp() throws Exception {
- NettyMessagingService messagingService = new NettyMessagingService();
+ NettyMessagingManager messagingService = new NettyMessagingManager();
messagingService.activate();
ccm1 = new ClusterCommunicationManager();
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingManager.java
similarity index 98%
rename from utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
rename to utils/netty/src/main/java/org/onlab/netty/NettyMessagingManager.java
index eeba05e..bfc1c70 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingManager.java
@@ -65,7 +65,7 @@
/**
* Implementation of MessagingService based on <a href="http://netty.io/">Netty</a> framework.
*/
-public class NettyMessagingService implements MessagingService {
+public class NettyMessagingManager implements MessagingService {
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -111,15 +111,15 @@
clientChannelClass = NioSocketChannel.class;
}
- public NettyMessagingService(IpAddress ip, int port) {
+ public NettyMessagingManager(IpAddress ip, int port) {
localEp = new Endpoint(ip, port);
}
- public NettyMessagingService() {
+ public NettyMessagingManager() {
this(8080);
}
- public NettyMessagingService(int port) {
+ public NettyMessagingManager(int port) {
try {
localEp = new Endpoint(IpAddress.valueOf(InetAddress.getLocalHost()), port);
} catch (UnknownHostException e) {
diff --git a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java b/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
index 61d8541..53a36e3 100644
--- a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
+++ b/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
@@ -38,8 +38,8 @@
@Ignore("Turning off fragile test")
@Test
public void testPingPong() throws Exception {
- NettyMessagingService pinger = new NettyMessagingService(8085);
- NettyMessagingService ponger = new NettyMessagingService(9086);
+ NettyMessagingManager pinger = new NettyMessagingManager(8085);
+ NettyMessagingManager ponger = new NettyMessagingManager(9086);
try {
pinger.activate();
ponger.activate();
diff --git a/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessagingService.java b/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessagingManager.java
similarity index 95%
rename from utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessagingService.java
rename to utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessagingManager.java
index ce917f7..c183523 100644
--- a/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessagingService.java
+++ b/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessagingManager.java
@@ -38,6 +38,7 @@
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.onlab.nio.AcceptorLoop;
+import org.onlab.nio.SelectorLoop;
import org.onlab.packet.IpAddress;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessagingService;
@@ -53,7 +54,7 @@
/**
* MessagingService implementation based on IOLoop.
*/
-public class IOLoopMessagingService implements MessagingService {
+public class IOLoopMessagingManager implements MessagingService {
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -81,7 +82,7 @@
private final Endpoint localEp;
private GenericKeyedObjectPool<Endpoint, DefaultMessageStream> streams =
- new GenericKeyedObjectPool<Endpoint, DefaultMessageStream>(new DefaultMessageStreamFactory());
+ new GenericKeyedObjectPool<>(new DefaultMessageStreamFactory());
private final ConcurrentMap<String, Consumer<DefaultMessage>> handlers = new ConcurrentHashMap<>();
private final AtomicLong messageIdGenerator = new AtomicLong(0);
@@ -97,20 +98,21 @@
.build();
- public IOLoopMessagingService(int port) {
+ public IOLoopMessagingManager(int port) {
this(new Endpoint(IpAddress.valueOf("127.0.0.1"), port));
}
- public IOLoopMessagingService(IpAddress ip, int port) {
+ public IOLoopMessagingManager(IpAddress ip, int port) {
this(new Endpoint(ip, port));
}
- public IOLoopMessagingService(Endpoint localEp) {
+ public IOLoopMessagingManager(Endpoint localEp) {
this.localEp = localEp;
}
/**
* Returns the local endpoint.
+ *
* @return local endpoint
*/
public Endpoint localEp() {
@@ -119,6 +121,7 @@
/**
* Activates IO Loops.
+ *
* @throws IOException is activation fails
*/
public void activate() throws IOException {
@@ -129,7 +132,7 @@
ioLoops.add(new DefaultIOLoop(this::dispatchLocally));
}
- ioLoops.forEach(loop -> ioThreadPool.execute(loop));
+ ioLoops.forEach(ioThreadPool::execute);
acceptorThreadPool.execute(acceptorLoop);
ioLoops.forEach(loop -> loop.awaitStart(TIMEOUT));
acceptorLoop.awaitStart(TIMEOUT);
@@ -139,7 +142,7 @@
* Shuts down IO loops.
*/
public void deactivate() {
- ioLoops.forEach(loop -> loop.shutdown());
+ ioLoops.forEach(SelectorLoop::shutdown);
acceptorLoop.shutdown();
ioThreadPool.shutdown();
acceptorThreadPool.shutdown();