Revamped ClusterCommunicationService API

Change-Id: I9326369de3d2413b0882b324979d10483c093de9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 42a79b3..204e0fd 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -15,7 +15,6 @@
  */
 package org.onosproject.store.cluster.messaging.impl;
 
-import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -37,8 +36,17 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Objects;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
 import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
@@ -122,7 +130,84 @@
         return unicastUnchecked(message.subject(), message.getBytes(), toNodeId);
     }
 
-    private boolean unicast(MessageSubject subject, byte[] payload, NodeId toNodeId) throws IOException {
+    @Override
+    public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) {
+        SettableFuture<byte[]> bridged = SettableFuture.create(); // Guava-side mirror of the async reply
+        sendAndReceive(message.subject(), message.getBytes(), toNodeId).whenComplete((reply, error) -> {
+            if (error != null) {
+                bridged.setException(error);
+            } else {
+                bridged.set(reply);
+            }
+        });
+        return bridged;
+    }
+
+    @Override
+    public <M> void broadcast(M message,
+                              MessageSubject subject,
+                              Function<M, byte[]> encoder) {
+        // Look up the local node once instead of once per cluster member inside the filter.
+        ControllerNode localNode = clusterService.getLocalNode();
+        Set<NodeId> peers = clusterService.getNodes()
+                .stream()
+                .filter(node -> !Objects.equal(node, localNode))
+                .map(ControllerNode::id)
+                .collect(Collectors.toSet());
+        multicast(message, subject, encoder, peers);
+    }
+
+    @Override
+    public <M> void broadcastIncludeSelf(M message,
+                                         MessageSubject subject,
+                                         Function<M, byte[]> encoder) {
+        // No filtering here: every node reported by the cluster service is a target,
+        // unlike broadcast(), which drops the local node.
+        Set<NodeId> everyone = clusterService.getNodes()
+                .stream()
+                .map(ControllerNode::id)
+                .collect(Collectors.toSet());
+        multicast(message, subject, encoder, everyone);
+    }
+
+    @Override
+    public <M> boolean unicast(M message,
+                               MessageSubject subject,
+                               Function<M, byte[]> encoder,
+                               NodeId toNodeId) {
+        // Build the ClusterMessage envelope from the local node id, the subject and the
+        // encoded message, then hand its serialized form to unicastUnchecked.
+        ClusterMessage envelope = new ClusterMessage(
+                clusterService.getLocalNode().id(), subject, encoder.apply(message));
+        return unicastUnchecked(subject, envelope.getBytes(), toNodeId);
+    }
+
+    @Override
+    public <M> void multicast(M message, MessageSubject subject,
+                              Function<M, byte[]> encoder, Set<NodeId> nodes) {
+        // The envelope is serialized once; the same bytes are sent to every target node.
+        ClusterMessage envelope = new ClusterMessage(
+                clusterService.getLocalNode().id(), subject, encoder.apply(message));
+        byte[] payload = envelope.getBytes();
+        for (NodeId nodeId : nodes) {
+            unicastUnchecked(subject, payload, nodeId);
+        }
+    }
+
+    @Override
+    public <M, R> CompletableFuture<R> sendAndReceive(M message,
+                                                      MessageSubject subject,
+                                                      Function<M, byte[]> encoder,
+                                                      Function<byte[], R> decoder,
+                                                      NodeId toNodeId) {
+        // Send the serialized envelope, then map the raw reply bytes through the decoder.
+        byte[] payload = new ClusterMessage(
+                clusterService.getLocalNode().id(), subject, encoder.apply(message)).getBytes();
+        CompletableFuture<byte[]> rawReply = sendAndReceive(subject, payload, toNodeId);
+        return rawReply.thenApply(decoder);
+    }
+
+    private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
         ControllerNode node = clusterService.getNode(toNodeId);
         checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
         Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
@@ -131,37 +216,15 @@
             return true;
         } catch (IOException e) {
             log.debug("Failed to send cluster message to nodeId: " + toNodeId, e);
-            throw e;
-        }
-    }
-
-    private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
-        try {
-            return unicast(subject, payload, toNodeId);
-        } catch (IOException e) {
             return false;
         }
     }
 
-    @Override
-    public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException {
+    private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
         ControllerNode node = clusterService.getNode(toNodeId);
         checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
         Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
-        try {
-            return messagingService.sendAndReceive(nodeEp, message.subject().value(), message.getBytes());
-
-        } catch (IOException e) {
-            log.trace("Failed interaction with remote nodeId: " + toNodeId, e);
-            throw e;
-        }
-    }
-
-    @Override
-    @Deprecated
-    public void addSubscriber(MessageSubject subject,
-                              ClusterMessageHandler subscriber) {
-        messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
+        return messagingService.sendAndReceive(nodeEp, subject.value(), payload);
     }
 
     @Override
@@ -202,6 +265,60 @@
         }
     }
 
+    @Override
+    public <M, R> void addSubscriber(MessageSubject subject,
+            Function<byte[], M> decoder,
+            Function<M, R> handler,
+            Function<R, byte[]> encoder,
+            ExecutorService executor) {
+        // The responder decodes each request, applies the handler and replies with the encoded result.
+        MessageHandler responder = new InternalMessageResponder<>(decoder, encoder, handler);
+        messagingService.registerHandler(subject.value(), responder, executor);
+    }
+
+    @Override
+    public <M> void addSubscriber(MessageSubject subject,
+            Function<byte[], M> decoder,
+            Consumer<M> handler,
+            ExecutorService executor) {
+        // One-way subscription: the decoded message is passed to the handler and no reply is sent.
+        MessageHandler oneWayHandler = new InternalMessageConsumer<>(decoder, handler);
+        messagingService.registerHandler(subject.value(), oneWayHandler, executor);
+    }
+
+    // Request/response handler; static because it needs no state from the enclosing manager.
+    private static class InternalMessageResponder<M, R> implements MessageHandler {
+        private final Function<byte[], M> decoder;
+        private final Function<R, byte[]> encoder;
+        private final Function<M, R> handler;
+
+        public InternalMessageResponder(Function<byte[], M> decoder, Function<R, byte[]> encoder,
+                                        Function<M, R> handler) {
+            this.decoder = decoder;
+            this.encoder = encoder;
+            this.handler = handler;
+        }
+        @Override
+        public void handle(Message message) throws IOException {
+            M request = decoder.apply(ClusterMessage.fromBytes(message.payload()).payload());
+            message.respond(encoder.apply(handler.apply(request)));
+        }
+    }
+
+    // One-way handler; static because it needs no state from the enclosing manager.
+    private static class InternalMessageConsumer<M> implements MessageHandler {
+        private final Function<byte[], M> decoder;
+        private final Consumer<M> consumer;
+        public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
+            this.decoder = decoder;
+            this.consumer = consumer;
+        }
+        @Override
+        public void handle(Message message) throws IOException {
+            consumer.accept(decoder.apply(ClusterMessage.fromBytes(message.payload()).payload()));
+        }
+    }
+
     public static final class InternalClusterMessage extends ClusterMessage {
 
         private final Message rawMessage;