ONOS-1983: Migrating all copycat Raft protocol specific communication to use ONOS cluster communication primitives

Change-Id: I3f07266e50106b1adc13f722c647686c2b42ef7f
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
index bef0bc2..eb36202 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
@@ -164,7 +164,7 @@
      *
      * @param subject message subject
      * @param decoder decoder for resurrecting incoming message
-     * @param handler handler function that process the incoming message and produces a reply
+     * @param handler handler function that processes the incoming message and produces a reply
      * @param encoder encoder for serializing reply
      * @param executor executor to run this handler on
      * @param <M> incoming message type
@@ -180,6 +180,21 @@
      * Adds a new subscriber for the specified message subject.
      *
      * @param subject message subject
+     * @param decoder decoder for resurrecting incoming message
+     * @param handler handler function that processes the incoming message and produces a reply
+     * @param encoder encoder for serializing reply
+     * @param <M> incoming message type
+     * @param <R> reply message type
+     */
+    <M, R> void addSubscriber(MessageSubject subject,
+                              Function<byte[], M> decoder,
+                              Function<M, CompletableFuture<R>> handler,
+                              Function<R, byte[]> encoder);
+
+    /**
+     * Adds a new subscriber for the specified message subject.
+     *
+     * @param subject message subject
      * @param decoder decoder to resurrecting incoming message
      * @param handler handler for handling message
      * @param executor executor to run this handler on
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
index 3fe335b..09f15f8 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
@@ -62,6 +62,13 @@
     void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor);
 
     /**
+     * Registers a new message handler for message type.
+     * @param type message type.
+     * @param handler message handler
+     */
+    void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler);
+
+    /**
      * Unregister current handler, if one exists for message type.
      * @param type message type
      */
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 7475819..21b0919 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -21,6 +21,7 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.Tools;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.NodeId;
@@ -182,11 +183,15 @@
                                                       Function<M, byte[]> encoder,
                                                       Function<byte[], R> decoder,
                                                       NodeId toNodeId) {
-        ClusterMessage envelope = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                subject,
-                encoder.apply(message));
-        return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder);
+        try {
+            ClusterMessage envelope = new ClusterMessage(
+                    clusterService.getLocalNode().id(),
+                    subject,
+                    encoder.apply(message));
+            return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder);
+        } catch (Exception e) {
+            return Tools.exceptionalFuture(e);
+        }
     }
 
     private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
@@ -223,7 +228,6 @@
         messagingService.unregisterHandler(subject.value());
     }
 
-
     @Override
     public <M, R> void addSubscriber(MessageSubject subject,
             Function<byte[], M> decoder,
@@ -231,8 +235,26 @@
             Function<R, byte[]> encoder,
             Executor executor) {
         messagingService.registerHandler(subject.value(),
-                new InternalMessageResponder<>(decoder, encoder, handler),
-                executor);
+                new InternalMessageResponder<M, R>(decoder, encoder, m -> {
+                    CompletableFuture<R> responseFuture = new CompletableFuture<>();
+                    executor.execute(() -> {
+                        try {
+                            responseFuture.complete(handler.apply(m));
+                        } catch (Exception e) {
+                            responseFuture.completeExceptionally(e);
+                        }
+                    });
+                    return responseFuture;
+                }));
+    }
+
+    @Override
+    public <M, R> void addSubscriber(MessageSubject subject,
+            Function<byte[], M> decoder,
+            Function<M, CompletableFuture<R>> handler,
+            Function<R, byte[]> encoder) {
+        messagingService.registerHandler(subject.value(),
+                new InternalMessageResponder<>(decoder, encoder, handler));
     }
 
     @Override
@@ -260,23 +282,22 @@
         }
     }
 
-    private class InternalMessageResponder<M, R> implements Function<byte[], byte[]> {
+    private class InternalMessageResponder<M, R> implements Function<byte[], CompletableFuture<byte[]>> {
         private final Function<byte[], M> decoder;
         private final Function<R, byte[]> encoder;
-        private final Function<M, R> handler;
+        private final Function<M, CompletableFuture<R>> handler;
 
         public InternalMessageResponder(Function<byte[], M> decoder,
                                         Function<R, byte[]> encoder,
-                                        Function<M, R> handler) {
+                                        Function<M, CompletableFuture<R>> handler) {
             this.decoder = decoder;
             this.encoder = encoder;
             this.handler = handler;
         }
 
         @Override
-        public byte[] apply(byte[] bytes) {
-            R reply = handler.apply(decoder.apply(ClusterMessage.fromBytes(bytes).payload()));
-            return encoder.apply(reply);
+        public CompletableFuture<byte[]> apply(byte[] bytes) {
+            return handler.apply(decoder.apply(ClusterMessage.fromBytes(bytes).payload())).thenApply(encoder);
         }
     }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CopycatCommunicationProtocol.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CopycatCommunicationProtocol.java
new file mode 100644
index 0000000..628ff71
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CopycatCommunicationProtocol.java
@@ -0,0 +1,119 @@
+package org.onosproject.store.consistent.impl;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+
+import net.kuujo.copycat.protocol.AbstractProtocol;
+import net.kuujo.copycat.protocol.ProtocolClient;
+import net.kuujo.copycat.protocol.ProtocolHandler;
+import net.kuujo.copycat.protocol.ProtocolServer;
+import net.kuujo.copycat.util.Configurable;
+
+/**
+ * Protocol for Copycat communication that employs
+ * {@code ClusterCommunicationService}.
+ */
+public class CopycatCommunicationProtocol extends AbstractProtocol {
+
+    private static final MessageSubject COPYCAT_MESSAGE_SUBJECT =
+            new MessageSubject("onos-copycat-message");
+
+    protected ClusterService clusterService;
+    protected ClusterCommunicationService clusterCommunicator;
+
+    public CopycatCommunicationProtocol(ClusterService clusterService,
+                                        ClusterCommunicationService clusterCommunicator) {
+        this.clusterService = clusterService;
+        this.clusterCommunicator = clusterCommunicator;
+    }
+
+    @Override
+    public Configurable copy() {
+        return this;
+    }
+
+    @Override
+    public ProtocolClient createClient(URI uri) {
+        NodeId nodeId = uriToNodeId(uri);
+        if (nodeId == null) {
+            throw new IllegalStateException("Unknown peer " + uri);
+        }
+        return new Client(nodeId);
+    }
+
+    @Override
+    public ProtocolServer createServer(URI uri) {
+        return new Server();
+    }
+
+    private class Server implements ProtocolServer {
+
+        @Override
+        public void handler(ProtocolHandler handler) {
+            if (handler == null) {
+                clusterCommunicator.removeSubscriber(COPYCAT_MESSAGE_SUBJECT);
+            } else {
+                clusterCommunicator.addSubscriber(COPYCAT_MESSAGE_SUBJECT,
+                        ByteBuffer::wrap,
+                        handler,
+                        Tools::byteBuffertoArray);
+                // FIXME: Tools::byteBuffertoArray involves a array copy.
+            }
+        }
+
+        @Override
+        public CompletableFuture<Void> listen() {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<Void> close() {
+            clusterCommunicator.removeSubscriber(COPYCAT_MESSAGE_SUBJECT);
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
+    private class Client implements ProtocolClient {
+        private final NodeId peer;
+
+        public Client(NodeId peer) {
+            this.peer = peer;
+        }
+
+        @Override
+        public CompletableFuture<ByteBuffer> write(ByteBuffer request) {
+            return clusterCommunicator.sendAndReceive(request,
+                    COPYCAT_MESSAGE_SUBJECT,
+                    Tools::byteBuffertoArray,
+                    ByteBuffer::wrap,
+                    peer);
+        }
+
+        @Override
+        public CompletableFuture<Void> connect() {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<Void> close() {
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
+    private NodeId uriToNodeId(URI uri) {
+        return clusterService.getNodes()
+                             .stream()
+                             .filter(node -> uri.getHost().equals(node.ip().toString()))
+                             .map(ControllerNode::id)
+                             .findAny()
+                             .orElse(null);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index c06ebdf..390a444 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -107,7 +107,7 @@
     protected ClusterCommunicationService clusterCommunicator;
 
     protected String nodeToUri(NodeInfo node) {
-        return String.format("tcp://%s:%d", node.getIp(), COPYCAT_TCP_PORT);
+        return String.format("onos://%s:%d", node.getIp(), node.getTcpPort());
     }
 
     @Activate
@@ -136,9 +136,10 @@
                     .toArray(String[]::new);
 
         String localNodeUri = nodeToUri(NodeInfo.of(clusterService.getLocalNode()));
+        Protocol protocol = new CopycatCommunicationProtocol(clusterService, clusterCommunicator);
 
         ClusterConfig clusterConfig = new ClusterConfig()
-            .withProtocol(newNettyProtocol())
+            .withProtocol(protocol)
             .withElectionTimeout(electionTimeoutMillis(activeNodeUris))
             .withHeartbeatInterval(heartbeatTimeoutMillis(activeNodeUris))
             .withMembers(activeNodeUris)
@@ -232,6 +233,7 @@
                 .collect(Collectors.toList());
     }
 
+    @SuppressWarnings("unused")
     private Protocol newNettyProtocol() {
         return new NettyTcpProtocol()
             .withSsl(false)
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index c215f5b..1b41d0c 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -785,6 +785,12 @@
         }
 
         @Override
+        public <M, R> void addSubscriber(MessageSubject subject,
+                Function<byte[], M> decoder, Function<M, CompletableFuture<R>> handler,
+                Function<R, byte[]> encoder) {
+        }
+
+        @Override
         public <M> void addSubscriber(MessageSubject subject,
                 Function<byte[], M> decoder, Consumer<M> handler,
                 Executor executor) {
diff --git a/tools/test/bin/onos-gen-partitions b/tools/test/bin/onos-gen-partitions
index 7f32281..a255839 100755
--- a/tools/test/bin/onos-gen-partitions
+++ b/tools/test/bin/onos-gen-partitions
@@ -22,7 +22,7 @@
       vars.append(var)
   return sorted(vars, key=alphanum_key)
 
-def get_nodes(vars, port=7238):
+def get_nodes(vars, port=9876):
   node = lambda k: { 'id': k, 'ip': k, 'tcpPort': port }
   return [ node(environ[v]) for v in vars ]
 
diff --git a/utils/misc/src/main/java/org/onlab/util/Tools.java b/utils/misc/src/main/java/org/onlab/util/Tools.java
index 4a93068..61d3c56 100644
--- a/utils/misc/src/main/java/org/onlab/util/Tools.java
+++ b/utils/misc/src/main/java/org/onlab/util/Tools.java
@@ -25,6 +25,7 @@
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
@@ -34,9 +35,11 @@
 import java.nio.file.StandardCopyOption;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Dictionary;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
@@ -388,6 +391,37 @@
         }
     }
 
+    /**
+     * Returns a future that is completed exceptionally.
+     * @param t exception
+     * @param <T> future value type
+     * @return future
+     */
+    public static <T> CompletableFuture<T> exceptionalFuture(Throwable t) {
+        CompletableFuture<T> future = new CompletableFuture<>();
+        future.completeExceptionally(t);
+        return future;
+    }
+
+    /**
+     * Returns the contents of {@code ByteBuffer} as byte array.
+     * <p>
+     * WARNING: There is a performance cost due to array copy
+     * when using this method.
+     * @param buffer byte buffer
+     * @return byte array containing the byte buffer contents
+     */
+    public static byte[] byteBuffertoArray(ByteBuffer buffer) {
+        int length = buffer.remaining();
+        if (buffer.hasArray()) {
+            int offset = buffer.arrayOffset() + buffer.position();
+            return Arrays.copyOfRange(buffer.array(), offset, offset + length);
+        }
+        byte[] bytes = new byte[length];
+        buffer.duplicate().get(bytes);
+        return bytes;
+    }
+
     // Auxiliary path visitor for recursive directory structure copying.
     private static class DirectoryCopier extends SimpleFileVisitor<Path> {
         private Path src;
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
index 44b7027..c19dc59 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
@@ -203,6 +203,25 @@
     }
 
     @Override
+    public void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler) {
+        handlers.put(type, message -> {
+            handler.apply(message.payload()).whenComplete((result, error) -> {
+            if (error == null) {
+                InternalMessage response = new InternalMessage(message.id(),
+                        localEp,
+                        REPLY_MESSAGE_TYPE,
+                        result);
+                try {
+                    sendAsync(message.sender(), response);
+                } catch (IOException e) {
+                    log.debug("Failed to respond", e);
+                }
+            }
+            });
+        });
+    }
+
+    @Override
     public void unregisterHandler(String type) {
         handlers.remove(type);
     }
diff --git a/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java b/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java
index 69b312a..37a6535 100644
--- a/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java
+++ b/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java
@@ -212,6 +212,23 @@
     }
 
     @Override
+    public void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler) {
+        handlers.put(type, message -> handler.apply(message.payload()).whenComplete((result, error) -> {
+            if (error == null) {
+                DefaultMessage response = new DefaultMessage(message.id(),
+                        localEp,
+                        REPLY_MESSAGE_TYPE,
+                        result);
+                try {
+                    sendAsync(message.sender(), response);
+                } catch (IOException e) {
+                    log.debug("Failed to respond", e);
+                }
+            }
+        }));
+    }
+
+    @Override
     public void unregisterHandler(String type) {
         handlers.remove(type);
     }
@@ -312,4 +329,4 @@
             return stream.isClosed();
         }
     }
-}
\ No newline at end of file
+}