ONOS-1983: Migrating all copycat Raft protocol specific communication to use ONOS cluster communication primitives
Change-Id: I3f07266e50106b1adc13f722c647686c2b42ef7f
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
index bef0bc2..eb36202 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
@@ -164,7 +164,7 @@
*
* @param subject message subject
* @param decoder decoder for resurrecting incoming message
- * @param handler handler function that process the incoming message and produces a reply
+ * @param handler handler function that processes the incoming message and produces a reply
* @param encoder encoder for serializing reply
* @param executor executor to run this handler on
* @param <M> incoming message type
@@ -180,6 +180,21 @@
* Adds a new subscriber for the specified message subject.
*
* @param subject message subject
+ * @param decoder decoder for resurrecting incoming message
+ * @param handler handler function that processes the incoming message and produces a reply
+ * @param encoder encoder for serializing reply
+ * @param <M> incoming message type
+ * @param <R> reply message type
+ */
+ <M, R> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder,
+ Function<M, CompletableFuture<R>> handler,
+ Function<R, byte[]> encoder);
+
+ /**
+ * Adds a new subscriber for the specified message subject.
+ *
+ * @param subject message subject
* @param decoder decoder to resurrecting incoming message
* @param handler handler for handling message
* @param executor executor to run this handler on
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
index 3fe335b..09f15f8 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
@@ -62,6 +62,13 @@
void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor);
/**
+ * Registers a new message handler for message type.
+ * @param type message type.
+ * @param handler message handler
+ */
+ void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler);
+
+ /**
* Unregister current handler, if one exists for message type.
* @param type message type
*/
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 7475819..21b0919 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -21,6 +21,7 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
@@ -182,11 +183,15 @@
Function<M, byte[]> encoder,
Function<byte[], R> decoder,
NodeId toNodeId) {
- ClusterMessage envelope = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- encoder.apply(message));
- return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder);
+ try {
+ ClusterMessage envelope = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ subject,
+ encoder.apply(message));
+ return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder);
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
}
private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
@@ -223,7 +228,6 @@
messagingService.unregisterHandler(subject.value());
}
-
@Override
public <M, R> void addSubscriber(MessageSubject subject,
Function<byte[], M> decoder,
@@ -231,8 +235,26 @@
Function<R, byte[]> encoder,
Executor executor) {
messagingService.registerHandler(subject.value(),
- new InternalMessageResponder<>(decoder, encoder, handler),
- executor);
+ new InternalMessageResponder<M, R>(decoder, encoder, m -> {
+ CompletableFuture<R> responseFuture = new CompletableFuture<>();
+ executor.execute(() -> {
+ try {
+ responseFuture.complete(handler.apply(m));
+ } catch (Exception e) {
+ responseFuture.completeExceptionally(e);
+ }
+ });
+ return responseFuture;
+ }));
+ }
+
+ @Override
+ public <M, R> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder,
+ Function<M, CompletableFuture<R>> handler,
+ Function<R, byte[]> encoder) {
+ messagingService.registerHandler(subject.value(),
+ new InternalMessageResponder<>(decoder, encoder, handler));
}
@Override
@@ -260,23 +282,22 @@
}
}
- private class InternalMessageResponder<M, R> implements Function<byte[], byte[]> {
+ private class InternalMessageResponder<M, R> implements Function<byte[], CompletableFuture<byte[]>> {
private final Function<byte[], M> decoder;
private final Function<R, byte[]> encoder;
- private final Function<M, R> handler;
+ private final Function<M, CompletableFuture<R>> handler;
public InternalMessageResponder(Function<byte[], M> decoder,
Function<R, byte[]> encoder,
- Function<M, R> handler) {
+ Function<M, CompletableFuture<R>> handler) {
this.decoder = decoder;
this.encoder = encoder;
this.handler = handler;
}
@Override
- public byte[] apply(byte[] bytes) {
- R reply = handler.apply(decoder.apply(ClusterMessage.fromBytes(bytes).payload()));
- return encoder.apply(reply);
+ public CompletableFuture<byte[]> apply(byte[] bytes) {
+ return handler.apply(decoder.apply(ClusterMessage.fromBytes(bytes).payload())).thenApply(encoder);
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CopycatCommunicationProtocol.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CopycatCommunicationProtocol.java
new file mode 100644
index 0000000..628ff71
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CopycatCommunicationProtocol.java
@@ -0,0 +1,119 @@
+package org.onosproject.store.consistent.impl;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+
+import net.kuujo.copycat.protocol.AbstractProtocol;
+import net.kuujo.copycat.protocol.ProtocolClient;
+import net.kuujo.copycat.protocol.ProtocolHandler;
+import net.kuujo.copycat.protocol.ProtocolServer;
+import net.kuujo.copycat.util.Configurable;
+
+/**
+ * Protocol for Copycat communication that employs
+ * {@code ClusterCommunicationService}.
+ */
+public class CopycatCommunicationProtocol extends AbstractProtocol {
+
+ private static final MessageSubject COPYCAT_MESSAGE_SUBJECT =
+ new MessageSubject("onos-copycat-message");
+
+ protected ClusterService clusterService;
+ protected ClusterCommunicationService clusterCommunicator;
+
+ public CopycatCommunicationProtocol(ClusterService clusterService,
+ ClusterCommunicationService clusterCommunicator) {
+ this.clusterService = clusterService;
+ this.clusterCommunicator = clusterCommunicator;
+ }
+
+ @Override
+ public Configurable copy() {
+ return this;
+ }
+
+ @Override
+ public ProtocolClient createClient(URI uri) {
+ NodeId nodeId = uriToNodeId(uri);
+ if (nodeId == null) {
+ throw new IllegalStateException("Unknown peer " + uri);
+ }
+ return new Client(nodeId);
+ }
+
+ @Override
+ public ProtocolServer createServer(URI uri) {
+ return new Server();
+ }
+
+ private class Server implements ProtocolServer {
+
+ @Override
+ public void handler(ProtocolHandler handler) {
+ if (handler == null) {
+ clusterCommunicator.removeSubscriber(COPYCAT_MESSAGE_SUBJECT);
+ } else {
+ clusterCommunicator.addSubscriber(COPYCAT_MESSAGE_SUBJECT,
+ ByteBuffer::wrap,
+ handler,
+ Tools::byteBuffertoArray);
+ // FIXME: Tools::byteBuffertoArray involves a array copy.
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> listen() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> close() {
+ clusterCommunicator.removeSubscriber(COPYCAT_MESSAGE_SUBJECT);
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
+ private class Client implements ProtocolClient {
+ private final NodeId peer;
+
+ public Client(NodeId peer) {
+ this.peer = peer;
+ }
+
+ @Override
+ public CompletableFuture<ByteBuffer> write(ByteBuffer request) {
+ return clusterCommunicator.sendAndReceive(request,
+ COPYCAT_MESSAGE_SUBJECT,
+ Tools::byteBuffertoArray,
+ ByteBuffer::wrap,
+ peer);
+ }
+
+ @Override
+ public CompletableFuture<Void> connect() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> close() {
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
+ private NodeId uriToNodeId(URI uri) {
+ return clusterService.getNodes()
+ .stream()
+ .filter(node -> uri.getHost().equals(node.ip().toString()))
+ .map(ControllerNode::id)
+ .findAny()
+ .orElse(null);
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index c06ebdf..390a444 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -107,7 +107,7 @@
protected ClusterCommunicationService clusterCommunicator;
protected String nodeToUri(NodeInfo node) {
- return String.format("tcp://%s:%d", node.getIp(), COPYCAT_TCP_PORT);
+ return String.format("onos://%s:%d", node.getIp(), node.getTcpPort());
}
@Activate
@@ -136,9 +136,10 @@
.toArray(String[]::new);
String localNodeUri = nodeToUri(NodeInfo.of(clusterService.getLocalNode()));
+ Protocol protocol = new CopycatCommunicationProtocol(clusterService, clusterCommunicator);
ClusterConfig clusterConfig = new ClusterConfig()
- .withProtocol(newNettyProtocol())
+ .withProtocol(protocol)
.withElectionTimeout(electionTimeoutMillis(activeNodeUris))
.withHeartbeatInterval(heartbeatTimeoutMillis(activeNodeUris))
.withMembers(activeNodeUris)
@@ -232,6 +233,7 @@
.collect(Collectors.toList());
}
+ @SuppressWarnings("unused")
private Protocol newNettyProtocol() {
return new NettyTcpProtocol()
.withSsl(false)
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index c215f5b..1b41d0c 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -785,6 +785,12 @@
}
@Override
+ public <M, R> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder, Function<M, CompletableFuture<R>> handler,
+ Function<R, byte[]> encoder) {
+ }
+
+ @Override
public <M> void addSubscriber(MessageSubject subject,
Function<byte[], M> decoder, Consumer<M> handler,
Executor executor) {
diff --git a/tools/test/bin/onos-gen-partitions b/tools/test/bin/onos-gen-partitions
index 7f32281..a255839 100755
--- a/tools/test/bin/onos-gen-partitions
+++ b/tools/test/bin/onos-gen-partitions
@@ -22,7 +22,7 @@
vars.append(var)
return sorted(vars, key=alphanum_key)
-def get_nodes(vars, port=7238):
+def get_nodes(vars, port=9876):
node = lambda k: { 'id': k, 'ip': k, 'tcpPort': port }
return [ node(environ[v]) for v in vars ]
diff --git a/utils/misc/src/main/java/org/onlab/util/Tools.java b/utils/misc/src/main/java/org/onlab/util/Tools.java
index 4a93068..61d3c56 100644
--- a/utils/misc/src/main/java/org/onlab/util/Tools.java
+++ b/utils/misc/src/main/java/org/onlab/util/Tools.java
@@ -25,6 +25,7 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
@@ -34,9 +35,11 @@
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Dictionary;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
@@ -388,6 +391,37 @@
}
}
+ /**
+ * Returns a future that is completed exceptionally.
+ * @param t exception
+ * @param <T> future value type
+ * @return future
+ */
+ public static <T> CompletableFuture<T> exceptionalFuture(Throwable t) {
+ CompletableFuture<T> future = new CompletableFuture<>();
+ future.completeExceptionally(t);
+ return future;
+ }
+
+ /**
+ * Returns the contents of {@code ByteBuffer} as byte array.
+ * <p>
+ * WARNING: There is a performance cost due to array copy
+ * when using this method.
+ * @param buffer byte buffer
+ * @return byte array containing the byte buffer contents
+ */
+ public static byte[] byteBuffertoArray(ByteBuffer buffer) {
+ int length = buffer.remaining();
+ if (buffer.hasArray()) {
+ int offset = buffer.arrayOffset() + buffer.position();
+ return Arrays.copyOfRange(buffer.array(), offset, offset + length);
+ }
+ byte[] bytes = new byte[length];
+ buffer.duplicate().get(bytes);
+ return bytes;
+ }
+
// Auxiliary path visitor for recursive directory structure copying.
private static class DirectoryCopier extends SimpleFileVisitor<Path> {
private Path src;
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
index 44b7027..c19dc59 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
@@ -203,6 +203,25 @@
}
@Override
+ public void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler) {
+ handlers.put(type, message -> {
+ handler.apply(message.payload()).whenComplete((result, error) -> {
+ if (error == null) {
+ InternalMessage response = new InternalMessage(message.id(),
+ localEp,
+ REPLY_MESSAGE_TYPE,
+ result);
+ try {
+ sendAsync(message.sender(), response);
+ } catch (IOException e) {
+ log.debug("Failed to respond", e);
+ }
+ }
+ });
+ });
+ }
+
+ @Override
public void unregisterHandler(String type) {
handlers.remove(type);
}
diff --git a/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java b/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java
index 69b312a..37a6535 100644
--- a/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java
+++ b/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java
@@ -212,6 +212,23 @@
}
@Override
+ public void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler) {
+ handlers.put(type, message -> handler.apply(message.payload()).whenComplete((result, error) -> {
+ if (error == null) {
+ DefaultMessage response = new DefaultMessage(message.id(),
+ localEp,
+ REPLY_MESSAGE_TYPE,
+ result);
+ try {
+ sendAsync(message.sender(), response);
+ } catch (IOException e) {
+ log.debug("Failed to respond", e);
+ }
+ }
+ }));
+ }
+
+ @Override
public void unregisterHandler(String type) {
handlers.remove(type);
}
@@ -312,4 +329,4 @@
return stream.isClosed();
}
}
-}
\ No newline at end of file
+}