Revamped ClusterCommunicationService API
Change-Id: I9326369de3d2413b0882b324979d10483c093de9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 42a79b3..204e0fd 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -15,7 +15,6 @@
*/
package org.onosproject.store.cluster.messaging.impl;
-import com.google.common.util.concurrent.ListenableFuture;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -37,8 +36,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Objects;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
@@ -122,7 +130,84 @@
return unicastUnchecked(message.subject(), message.getBytes(), toNodeId);
}
- private boolean unicast(MessageSubject subject, byte[] payload, NodeId toNodeId) throws IOException {
+ @Override
+ public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) {
+ SettableFuture<byte[]> response = SettableFuture.create();
+ sendAndReceive(message.subject(), message.getBytes(), toNodeId).whenComplete((r, e) -> {
+ if (e == null) {
+ response.set(r);
+ } else {
+ response.setException(e);
+ }
+ });
+ return response;
+ }
+
+ @Override
+ public <M> void broadcast(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder) {
+ multicast(message,
+ subject,
+ encoder,
+ clusterService.getNodes()
+ .stream()
+ .filter(node -> !Objects.equal(node, clusterService.getLocalNode()))
+ .map(ControllerNode::id)
+ .collect(Collectors.toSet()));
+ }
+
+ @Override
+ public <M> void broadcastIncludeSelf(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder) {
+ multicast(message,
+ subject,
+ encoder,
+ clusterService.getNodes()
+ .stream()
+ .map(ControllerNode::id)
+ .collect(Collectors.toSet()));
+ }
+
+ @Override
+ public <M> boolean unicast(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ NodeId toNodeId) {
+ byte[] payload = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ subject,
+ encoder.apply(message)).getBytes();
+ return unicastUnchecked(subject, payload, toNodeId);
+ }
+
+ @Override
+ public <M> void multicast(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ Set<NodeId> nodes) {
+ byte[] payload = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ subject,
+ encoder.apply(message)).getBytes();
+ nodes.forEach(nodeId -> unicastUnchecked(subject, payload, nodeId));
+ }
+
+ @Override
+ public <M, R> CompletableFuture<R> sendAndReceive(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ Function<byte[], R> decoder,
+ NodeId toNodeId) {
+ ClusterMessage envelope = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ subject,
+ encoder.apply(message));
+ return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder);
+ }
+
+ private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
@@ -131,37 +216,15 @@
return true;
} catch (IOException e) {
log.debug("Failed to send cluster message to nodeId: " + toNodeId, e);
- throw e;
- }
- }
-
- private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
- try {
- return unicast(subject, payload, toNodeId);
- } catch (IOException e) {
return false;
}
}
- @Override
- public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException {
+ private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
- try {
- return messagingService.sendAndReceive(nodeEp, message.subject().value(), message.getBytes());
-
- } catch (IOException e) {
- log.trace("Failed interaction with remote nodeId: " + toNodeId, e);
- throw e;
- }
- }
-
- @Override
- @Deprecated
- public void addSubscriber(MessageSubject subject,
- ClusterMessageHandler subscriber) {
- messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
+ return messagingService.sendAndReceive(nodeEp, subject.value(), payload);
}
@Override
@@ -202,6 +265,60 @@
}
}
+ @Override
+ public <M, R> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder,
+ Function<M, R> handler,
+ Function<R, byte[]> encoder,
+ ExecutorService executor) {
+ messagingService.registerHandler(subject.value(),
+ new InternalMessageResponder<>(decoder, encoder, handler),
+ executor);
+ }
+
+ @Override
+ public <M> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder,
+ Consumer<M> handler,
+ ExecutorService executor) {
+ messagingService.registerHandler(subject.value(),
+ new InternalMessageConsumer<>(decoder, handler),
+ executor);
+ }
+
+ private class InternalMessageResponder<M, R> implements MessageHandler {
+ private final Function<byte[], M> decoder;
+ private final Function<R, byte[]> encoder;
+ private final Function<M, R> handler;
+
+ public InternalMessageResponder(Function<byte[], M> decoder,
+ Function<R, byte[]> encoder,
+ Function<M, R> handler) {
+ this.decoder = decoder;
+ this.encoder = encoder;
+ this.handler = handler;
+ }
+ @Override
+ public void handle(Message message) throws IOException {
+ R response = handler.apply(decoder.apply(ClusterMessage.fromBytes(message.payload()).payload()));
+ message.respond(encoder.apply(response));
+ }
+ }
+
+ private class InternalMessageConsumer<M> implements MessageHandler {
+ private final Function<byte[], M> decoder;
+ private final Consumer<M> consumer;
+
+ public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
+ this.decoder = decoder;
+ this.consumer = consumer;
+ }
+ @Override
+ public void handle(Message message) throws IOException {
+ consumer.accept(decoder.apply(ClusterMessage.fromBytes(message.payload()).payload()));
+ }
+ }
+
public static final class InternalClusterMessage extends ClusterMessage {
private final Message rawMessage;