ONOS-1983: Migrating all Copycat Raft protocol-specific communication to use ONOS cluster communication primitives

Change-Id: I3f07266e50106b1adc13f722c647686c2b42ef7f
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 7475819..21b0919 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -21,6 +21,7 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.Tools;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.NodeId;
@@ -182,11 +183,15 @@
                                                       Function<M, byte[]> encoder,
                                                       Function<byte[], R> decoder,
                                                       NodeId toNodeId) {
-        ClusterMessage envelope = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                subject,
-                encoder.apply(message));
-        return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder);
+        try {
+            ClusterMessage envelope = new ClusterMessage(
+                    clusterService.getLocalNode().id(),
+                    subject,
+                    encoder.apply(message));
+            return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder);
+        } catch (Exception e) {
+            return Tools.exceptionalFuture(e);
+        }
     }
 
     private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
@@ -223,7 +228,6 @@
         messagingService.unregisterHandler(subject.value());
     }
 
-
     @Override
     public <M, R> void addSubscriber(MessageSubject subject,
             Function<byte[], M> decoder,
@@ -231,8 +235,26 @@
             Function<R, byte[]> encoder,
             Executor executor) {
         messagingService.registerHandler(subject.value(),
-                new InternalMessageResponder<>(decoder, encoder, handler),
-                executor);
+                new InternalMessageResponder<M, R>(decoder, encoder, m -> {
+                    CompletableFuture<R> responseFuture = new CompletableFuture<>();
+                    executor.execute(() -> {
+                        try {
+                            responseFuture.complete(handler.apply(m));
+                        } catch (Exception e) {
+                            responseFuture.completeExceptionally(e);
+                        }
+                    });
+                    return responseFuture;
+                }));
+    }
+
+    @Override
+    public <M, R> void addSubscriber(MessageSubject subject,
+            Function<byte[], M> decoder,
+            Function<M, CompletableFuture<R>> handler,
+            Function<R, byte[]> encoder) {
+        messagingService.registerHandler(subject.value(),
+                new InternalMessageResponder<>(decoder, encoder, handler));
     }
 
     @Override
@@ -260,23 +282,22 @@
         }
     }
 
-    private class InternalMessageResponder<M, R> implements Function<byte[], byte[]> {
+    private class InternalMessageResponder<M, R> implements Function<byte[], CompletableFuture<byte[]>> {
         private final Function<byte[], M> decoder;
         private final Function<R, byte[]> encoder;
-        private final Function<M, R> handler;
+        private final Function<M, CompletableFuture<R>> handler;
 
         public InternalMessageResponder(Function<byte[], M> decoder,
                                         Function<R, byte[]> encoder,
-                                        Function<M, R> handler) {
+                                        Function<M, CompletableFuture<R>> handler) {
             this.decoder = decoder;
             this.encoder = encoder;
             this.handler = handler;
         }
 
         @Override
-        public byte[] apply(byte[] bytes) {
-            R reply = handler.apply(decoder.apply(ClusterMessage.fromBytes(bytes).payload()));
-            return encoder.apply(reply);
+        public CompletableFuture<byte[]> apply(byte[] bytes) {
+            return handler.apply(decoder.apply(ClusterMessage.fromBytes(bytes).payload())).thenApply(encoder);
         }
     }