Added a messaging service implementation on top of IOLoop. Added the ability to easily switch between the Netty and IOLoop implementations (default is Netty).
Change-Id: Id9af0756bf0a542f832f3611b486b2ac680b91e4
diff --git a/utils/netty/pom.xml b/utils/netty/pom.xml
index c95412b..9e5677c 100644
--- a/utils/netty/pom.xml
+++ b/utils/netty/pom.xml
@@ -39,6 +39,10 @@
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
+ <artifactId>onos-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.onosproject</groupId>
<artifactId>onlab-misc</artifactId>
</dependency>
<dependency>
diff --git a/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java b/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java
deleted file mode 100644
index db38c19..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.netty;
-
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-//FIXME: Should be move out to test or app
-/**
- * Message handler that echos the message back to the sender.
- */
-public class EchoHandler implements MessageHandler {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- @Override
- public void handle(Message message) throws IOException {
- log.info("Received message. Echoing it back to the sender.");
- message.respond(message.payload());
- }
-}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Endpoint.java b/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
deleted file mode 100644
index af97763..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.netty;
-
-import static com.google.common.base.Preconditions.*;
-
-import java.util.Objects;
-
-import org.onlab.packet.IpAddress;
-
-import com.google.common.base.MoreObjects;
-
-/**
- * Representation of a TCP/UDP communication end point.
- */
-public final class Endpoint {
-
- private final int port;
- private final IpAddress ip;
-
- public Endpoint(IpAddress host, int port) {
- this.ip = checkNotNull(host);
- this.port = port;
- }
-
- public IpAddress host() {
- return ip;
- }
-
- public int port() {
- return port;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("ip", ip)
- .add("port", port)
- .toString();
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(ip, port);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- Endpoint that = (Endpoint) obj;
- return Objects.equals(this.port, that.port) &&
- Objects.equals(this.ip, that.ip);
- }
-}
diff --git a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
index 9d42b6d..102e2a2 100644
--- a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
+++ b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
@@ -15,9 +15,8 @@
*/
package org.onlab.netty;
-import java.io.IOException;
-
import org.onlab.util.ByteArraySizeHashPrinter;
+import org.onosproject.store.cluster.messaging.Endpoint;
import com.google.common.base.MoreObjects;
@@ -25,20 +24,14 @@
* Internal message representation with additional attributes
* for supporting, synchronous request/reply behavior.
*/
-public final class InternalMessage implements Message {
+public final class InternalMessage {
- public static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
+ private final long id;
+ private final Endpoint sender;
+ private final String type;
+ private final byte[] payload;
- private long id;
- private Endpoint sender;
- private String type;
- private byte[] payload;
- private transient NettyMessagingService messagingService;
-
- // Must be created using the Builder.
- private InternalMessage() {}
-
- InternalMessage(long id, Endpoint sender, String type, byte[] payload) {
+ public InternalMessage(long id, Endpoint sender, String type, byte[] payload) {
this.id = id;
this.sender = sender;
this.type = type;
@@ -57,26 +50,10 @@
return sender;
}
- @Override
public byte[] payload() {
return payload;
}
- protected void setMessagingService(NettyMessagingService messagingService) {
- this.messagingService = messagingService;
- }
-
- @Override
- public void respond(byte[] data) throws IOException {
- Builder builder = new Builder(messagingService);
- InternalMessage message = builder.withId(this.id)
- .withSender(messagingService.localEp())
- .withPayload(data)
- .withType(REPLY_MESSAGE_TYPE)
- .build();
- messagingService.sendAsync(sender, message);
- }
-
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
@@ -86,39 +63,4 @@
.add("payload", ByteArraySizeHashPrinter.of(payload))
.toString();
}
-
- /**
- * Builder for InternalMessages.
- */
- public static final class Builder {
- private InternalMessage message;
-
- public Builder(NettyMessagingService messagingService) {
- message = new InternalMessage();
- message.messagingService = messagingService;
- }
-
- public Builder withId(long id) {
- message.id = id;
- return this;
- }
-
- public Builder withType(String type) {
- message.type = type;
- return this;
- }
-
- public Builder withSender(Endpoint sender) {
- message.sender = sender;
- return this;
- }
- public Builder withPayload(byte[] payload) {
- message.payload = payload;
- return this;
- }
-
- public InternalMessage build() {
- return message;
- }
- }
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java b/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java
deleted file mode 100644
index d91e8ae..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.netty;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A MessageHandler that simply logs the information.
- */
-public class LoggingHandler implements MessageHandler {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- @Override
- public void handle(Message message) {
- log.info("Received message. Payload has {} bytes", message.payload().length);
- }
-}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Message.java b/utils/netty/src/main/java/org/onlab/netty/Message.java
deleted file mode 100644
index 2baea96..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/Message.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.netty;
-
-import java.io.IOException;
-
-/**
- * A unit of communication.
- * Has a payload. Also supports a feature to respond back to the sender.
- */
-public interface Message {
-
- /**
- * Returns the payload of this message.
- * @return message payload.
- */
- public byte[] payload();
-
- /**
- * Sends a reply back to the sender of this message.
- * @param data payload of the response.
- * @throws IOException if there is a communication error.
- */
- public void respond(byte[] data) throws IOException;
-}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
index 440fc52..c34d3cc 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -24,6 +24,7 @@
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpAddress.Version;
+import org.onosproject.store.cluster.messaging.Endpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,8 +37,6 @@
private final Logger log = LoggerFactory.getLogger(getClass());
- private final NettyMessagingService messagingService;
-
private long messageId;
private Version ipVersion;
private IpAddress senderIp;
@@ -46,9 +45,8 @@
private String messageType;
private int contentLength;
- public MessageDecoder(NettyMessagingService messagingService) {
+ public MessageDecoder() {
super(DecoderState.READ_MESSAGE_ID);
- this.messagingService = messagingService;
}
@Override
@@ -91,7 +89,6 @@
new Endpoint(senderIp, senderPort),
messageType,
payload);
- message.setMessagingService(messagingService);
out.add(message);
checkpoint(DecoderState.READ_MESSAGE_ID);
break;
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
index f5a8d2e..2b7784f 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
@@ -24,6 +24,7 @@
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpAddress.Version;
+import org.onosproject.store.cluster.messaging.Endpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageHandler.java b/utils/netty/src/main/java/org/onlab/netty/MessageHandler.java
deleted file mode 100644
index a87d8fc..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/MessageHandler.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.netty;
-
-import java.io.IOException;
-
-/**
- * Handler for a message.
- */
-public interface MessageHandler {
-
- /**
- * Handles the message.
- *
- * @param message message.
- * @throws IOException if an error is encountered handling the message
- */
- public void handle(Message message) throws IOException;
-}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
deleted file mode 100644
index 1b42b04..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.netty;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Interface for low level messaging primitives.
- */
-public interface MessagingService {
- /**
- * Sends a message asynchronously to the specified communication end point.
- * The message is specified using the type and payload.
- * @param ep end point to send the message to.
- * @param type type of message.
- * @param payload message payload bytes.
- * @throws IOException when I/O exception of some sort has occurred
- */
- public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException;
-
- /**
- * Sends a message synchronously and waits for a response.
- * @param ep end point to send the message to.
- * @param type type of message.
- * @param payload message payload.
- * @return a response future
- */
- public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload);
-
- /**
- * Registers a new message handler for message type.
- * @param type message type.
- * @param handler message handler
- * @param executor executor to use for running message handler logic.
- */
- public void registerHandler(String type, MessageHandler handler, Executor executor);
-
- /**
- * Registers a new message handler for message type.
- * @param type message type.
- * @param handler message handler
- */
- @Deprecated
- public void registerHandler(String type, MessageHandler handler);
-
- /**
- * Unregister current handler, if one exists for message type.
- * @param type message type
- */
- public void unregisterHandler(String type);
-}
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index da72886..eeba05e 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -46,10 +46,14 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Function;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.onlab.packet.IpAddress;
+import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.store.cluster.messaging.MessagingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,17 +63,18 @@
import com.google.common.cache.RemovalNotification;
/**
- * A Netty based implementation of MessagingService.
+ * Implementation of MessagingService based on <a href="http://netty.io/">Netty</a> framework.
*/
public class NettyMessagingService implements MessagingService {
private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
+
private final Endpoint localEp;
- private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
private final AtomicLong messageIdGenerator = new AtomicLong(0);
private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
- .maximumSize(100000)
.expireAfterWrite(10, TimeUnit.SECONDS)
.removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() {
@Override
@@ -124,6 +129,7 @@
}
public void activate() throws InterruptedException {
+ channels.setLifo(false);
channels.setTestOnBorrow(true);
channels.setTestOnReturn(true);
initEventLoopGroup();
@@ -146,12 +152,10 @@
@Override
public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
- InternalMessage message = new InternalMessage.Builder(this)
- .withId(messageIdGenerator.incrementAndGet())
- .withSender(localEp)
- .withType(type)
- .withPayload(payload)
- .build();
+ InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(),
+ localEp,
+ type,
+ payload);
sendAsync(ep, message);
}
@@ -164,7 +168,7 @@
try {
try {
channel = channels.borrowObject(ep);
- channel.eventLoop().execute(new WriteTask(channel, message));
+ channel.writeAndFlush(message).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} finally {
channels.returnObject(ep, channel);
}
@@ -173,7 +177,6 @@
} catch (Exception e) {
throw new IOException(e);
}
-
}
@Override
@@ -181,12 +184,7 @@
CompletableFuture<byte[]> response = new CompletableFuture<>();
Long messageId = messageIdGenerator.incrementAndGet();
responseFutures.put(messageId, response);
- InternalMessage message = new InternalMessage.Builder(this)
- .withId(messageId)
- .withSender(localEp)
- .withType(type)
- .withPayload(payload)
- .build();
+ InternalMessage message = new InternalMessage(messageId, localEp, type, payload);
try {
sendAsync(ep, message);
} catch (Exception e) {
@@ -197,24 +195,26 @@
}
@Override
- public void registerHandler(String type, MessageHandler handler) {
- handlers.put(type, handler);
+ public void registerHandler(String type, Consumer<byte[]> handler, Executor executor) {
+ handlers.put(type, message -> executor.execute(() -> handler.accept(message.payload())));
}
@Override
- public void registerHandler(String type, MessageHandler handler, Executor executor) {
- handlers.put(type, new MessageHandler() {
- @Override
- public void handle(Message message) throws IOException {
- executor.execute(() -> {
- try {
- handler.handle(message);
- } catch (Exception e) {
- log.debug("Failed to process message of type {}", type, e);
- }
- });
+ public void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor) {
+ handlers.put(type, message -> executor.execute(() -> {
+ byte[] responsePayload = handler.apply(message.payload());
+ if (responsePayload != null) {
+ InternalMessage response = new InternalMessage(message.id(),
+ localEp,
+ REPLY_MESSAGE_TYPE,
+ responsePayload);
+ try {
+ sendAsync(message.sender(), response);
+ } catch (IOException e) {
+ log.debug("Failed to respond", e);
+ }
}
- });
+ }));
}
@Override
@@ -222,14 +222,12 @@
handlers.remove(type);
}
- private MessageHandler getMessageHandler(String type) {
- return handlers.get(type);
- }
-
private void startAcceptingConnections() throws InterruptedException {
ServerBootstrap b = new ServerBootstrap();
b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
+ b.option(ChannelOption.SO_RCVBUF, 1048576);
+ b.option(ChannelOption.TCP_NODELAY, true);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.group(serverGroup, clientGroup)
.channel(serverChannelClass)
@@ -258,8 +256,9 @@
public Channel makeObject(Endpoint ep) throws Exception {
Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
- bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
- bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
+ bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 64 * 1024);
+ bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 10 * 32 * 1024);
+ bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
bootstrap.group(clientGroup);
// TODO: Make this faster:
// http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
@@ -268,6 +267,7 @@
bootstrap.handler(new OnosCommunicationChannelInitializer());
// Start the client.
ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port()).sync();
+ log.info("Established a new connection to {}", ep);
return f.channel();
}
@@ -291,27 +291,11 @@
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast("encoder", encoder)
- .addLast("decoder", new MessageDecoder(NettyMessagingService.this))
+ .addLast("decoder", new MessageDecoder())
.addLast("handler", dispatcher);
}
}
- private static class WriteTask implements Runnable {
-
- private final InternalMessage message;
- private final Channel channel;
-
- public WriteTask(Channel channel, InternalMessage message) {
- this.channel = channel;
- this.message = message;
- }
-
- @Override
- public void run() {
- channel.writeAndFlush(message).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
- }
- }
-
@ChannelHandler.Sharable
private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
@@ -329,10 +313,10 @@
private void dispatchLocally(InternalMessage message) throws IOException {
String type = message.type();
- if (InternalMessage.REPLY_MESSAGE_TYPE.equals(type)) {
+ if (REPLY_MESSAGE_TYPE.equals(type)) {
try {
CompletableFuture<byte[]> futureResponse =
- NettyMessagingService.this.responseFutures.getIfPresent(message.id());
+ responseFutures.getIfPresent(message.id());
if (futureResponse != null) {
futureResponse.complete(message.payload());
} else {
@@ -341,13 +325,13 @@
+ " request handle", message.id(), message.sender());
}
} finally {
- NettyMessagingService.this.responseFutures.invalidate(message.id());
+ responseFutures.invalidate(message.id());
}
return;
}
- MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
+ Consumer<InternalMessage> handler = handlers.get(type);
if (handler != null) {
- handler.handle(message);
+ handler.accept(message);
} else {
log.debug("No handler registered for {}", type);
}
diff --git a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java b/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
index 3eb0d39..61d8541 100644
--- a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
+++ b/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
@@ -18,13 +18,17 @@
import static org.junit.Assert.assertArrayEquals;
import java.net.InetAddress;
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import org.apache.commons.lang3.RandomUtils;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.packet.IpAddress;
+import org.onosproject.store.cluster.messaging.Endpoint;
+
+import com.google.common.util.concurrent.MoreExecutors;
/**
* Simple ping-pong test that exercises NettyMessagingService.
@@ -39,9 +43,9 @@
try {
pinger.activate();
ponger.activate();
- ponger.registerHandler("echo", new EchoHandler());
+ ponger.registerHandler("echo", Function.identity(), MoreExecutors.directExecutor());
byte[] payload = RandomUtils.nextBytes(100);
- Future<byte[]> responseFuture =
+ CompletableFuture<byte[]> responseFuture =
pinger.sendAndReceive(
new Endpoint(IpAddress.valueOf(InetAddress.getLocalHost()), 9086), "echo", payload);
assertArrayEquals(payload, responseFuture.get(10000, TimeUnit.MILLISECONDS));