ONOS-1983: Migrating all Copycat Raft-protocol-specific communication to use ONOS cluster communication primitives
Change-Id: I3f07266e50106b1adc13f722c647686c2b42ef7f
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 7475819..21b0919 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -21,6 +21,7 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
@@ -182,11 +183,15 @@
Function<M, byte[]> encoder,
Function<byte[], R> decoder,
NodeId toNodeId) {
- ClusterMessage envelope = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- encoder.apply(message));
- return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder);
+ try {
+ ClusterMessage envelope = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ subject,
+ encoder.apply(message));
+ return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder);
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
}
private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
@@ -223,7 +228,6 @@
messagingService.unregisterHandler(subject.value());
}
-
@Override
public <M, R> void addSubscriber(MessageSubject subject,
Function<byte[], M> decoder,
@@ -231,8 +235,26 @@
Function<R, byte[]> encoder,
Executor executor) {
messagingService.registerHandler(subject.value(),
- new InternalMessageResponder<>(decoder, encoder, handler),
- executor);
+ new InternalMessageResponder<M, R>(decoder, encoder, m -> {
+ CompletableFuture<R> responseFuture = new CompletableFuture<>();
+ executor.execute(() -> {
+ try {
+ responseFuture.complete(handler.apply(m));
+ } catch (Exception e) {
+ responseFuture.completeExceptionally(e);
+ }
+ });
+ return responseFuture;
+ }));
+ }
+
+ @Override
+ public <M, R> void addSubscriber(MessageSubject subject, // overload without an Executor parameter
+ Function<byte[], M> decoder, // turns the received payload bytes into the handler's input
+ Function<M, CompletableFuture<R>> handler, // supplies its reply as a future
+ Function<R, byte[]> encoder) { // turns the handler's reply into response bytes
+ messagingService.registerHandler(subject.value(),
+ new InternalMessageResponder<>(decoder, encoder, handler)); // handler is passed through unwrapped
}
@Override
@@ -260,23 +282,22 @@
}
}
- private class InternalMessageResponder<M, R> implements Function<byte[], byte[]> {
+ private class InternalMessageResponder<M, R> implements Function<byte[], CompletableFuture<byte[]>> {
private final Function<byte[], M> decoder;
private final Function<R, byte[]> encoder;
- private final Function<M, R> handler;
+ private final Function<M, CompletableFuture<R>> handler;
public InternalMessageResponder(Function<byte[], M> decoder,
Function<R, byte[]> encoder,
- Function<M, R> handler) {
+ Function<M, CompletableFuture<R>> handler) {
this.decoder = decoder;
this.encoder = encoder;
this.handler = handler;
}
@Override
- public byte[] apply(byte[] bytes) {
- R reply = handler.apply(decoder.apply(ClusterMessage.fromBytes(bytes).payload()));
- return encoder.apply(reply);
+ public CompletableFuture<byte[]> apply(byte[] bytes) {
+ return handler.apply(decoder.apply(ClusterMessage.fromBytes(bytes).payload())).thenApply(encoder);
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CopycatCommunicationProtocol.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CopycatCommunicationProtocol.java
new file mode 100644
index 0000000..628ff71
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CopycatCommunicationProtocol.java
@@ -0,0 +1,119 @@
+package org.onosproject.store.consistent.impl;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+
+import net.kuujo.copycat.protocol.AbstractProtocol;
+import net.kuujo.copycat.protocol.ProtocolClient;
+import net.kuujo.copycat.protocol.ProtocolHandler;
+import net.kuujo.copycat.protocol.ProtocolServer;
+import net.kuujo.copycat.util.Configurable;
+
+/**
+ * Copycat protocol whose clients and servers exchange messages on a single subject through the
+ * {@code ClusterCommunicationService} supplied to the constructor.
+ */
+public class CopycatCommunicationProtocol extends AbstractProtocol {
+
+ private static final MessageSubject COPYCAT_MESSAGE_SUBJECT = // the one subject used by every Client and Server
+ new MessageSubject("onos-copycat-message");
+
+ protected ClusterService clusterService; // read by uriToNodeId to look up cluster nodes
+ protected ClusterCommunicationService clusterCommunicator; // used for subscribing and for sendAndReceive
+
+ public CopycatCommunicationProtocol(ClusterService clusterService,
+ ClusterCommunicationService clusterCommunicator) {
+ this.clusterService = clusterService;
+ this.clusterCommunicator = clusterCommunicator;
+ }
+
+ @Override
+ public Configurable copy() {
+ return this; // hands back this same instance rather than a new object
+ }
+
+ @Override
+ public ProtocolClient createClient(URI uri) {
+ NodeId nodeId = uriToNodeId(uri); // null when no known node's IP string equals the URI host
+ if (nodeId == null) {
+ throw new IllegalStateException("Unknown peer " + uri);
+ }
+ return new Client(nodeId);
+ }
+
+ @Override
+ public ProtocolServer createServer(URI uri) {
+ return new Server(); // the uri argument is not used
+ }
+
+ private class Server implements ProtocolServer {
+
+ @Override
+ public void handler(ProtocolHandler handler) {
+ if (handler == null) { // a null handler removes the subscription instead of adding one
+ clusterCommunicator.removeSubscriber(COPYCAT_MESSAGE_SUBJECT);
+ } else {
+ clusterCommunicator.addSubscriber(COPYCAT_MESSAGE_SUBJECT,
+ ByteBuffer::wrap, // decoder argument: received byte[] -> ByteBuffer
+ handler,
+ Tools::byteBuffertoArray); // encoder argument: reply ByteBuffer -> byte[]
+ // FIXME: Tools::byteBuffertoArray involves an array copy.
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> listen() {
+ return CompletableFuture.completedFuture(null); // returns an already-completed future; no other work here
+ }
+
+ @Override
+ public CompletableFuture<Void> close() {
+ clusterCommunicator.removeSubscriber(COPYCAT_MESSAGE_SUBJECT); // same call as handler(null)
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
+ private class Client implements ProtocolClient {
+ private final NodeId peer; // destination node passed to every sendAndReceive call in write()
+
+ public Client(NodeId peer) {
+ this.peer = peer;
+ }
+
+ @Override
+ public CompletableFuture<ByteBuffer> write(ByteBuffer request) {
+ return clusterCommunicator.sendAndReceive(request,
+ COPYCAT_MESSAGE_SUBJECT,
+ Tools::byteBuffertoArray, // encoder argument: request ByteBuffer -> byte[]
+ ByteBuffer::wrap, // decoder argument: reply byte[] -> ByteBuffer
+ peer);
+ }
+
+ @Override
+ public CompletableFuture<Void> connect() {
+ return CompletableFuture.completedFuture(null); // returns an already-completed future; no other work here
+ }
+
+ @Override
+ public CompletableFuture<Void> close() {
+ return CompletableFuture.completedFuture(null); // returns an already-completed future; no other work here
+ }
+ }
+
+ private NodeId uriToNodeId(URI uri) { // maps a peer URI to the id of a node whose IP string equals the URI host
+ return clusterService.getNodes()
+ .stream()
+ .filter(node -> uri.getHost().equals(node.ip().toString())) // NOTE(review): NPE if uri.getHost() is null
+ .map(ControllerNode::id)
+ .findAny()
+ .orElse(null); // null signals "no match"; createClient turns it into IllegalStateException
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index c06ebdf..390a444 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -107,7 +107,7 @@
protected ClusterCommunicationService clusterCommunicator;
protected String nodeToUri(NodeInfo node) {
- return String.format("tcp://%s:%d", node.getIp(), COPYCAT_TCP_PORT);
+ return String.format("onos://%s:%d", node.getIp(), node.getTcpPort());
}
@Activate
@@ -136,9 +136,10 @@
.toArray(String[]::new);
String localNodeUri = nodeToUri(NodeInfo.of(clusterService.getLocalNode()));
+ Protocol protocol = new CopycatCommunicationProtocol(clusterService, clusterCommunicator);
ClusterConfig clusterConfig = new ClusterConfig()
- .withProtocol(newNettyProtocol())
+ .withProtocol(protocol)
.withElectionTimeout(electionTimeoutMillis(activeNodeUris))
.withHeartbeatInterval(heartbeatTimeoutMillis(activeNodeUris))
.withMembers(activeNodeUris)
@@ -232,6 +233,7 @@
.collect(Collectors.toList());
}
+ @SuppressWarnings("unused")
private Protocol newNettyProtocol() {
return new NettyTcpProtocol()
.withSsl(false)