ONOS-1983: Migrating all copycat Raft protocol specific communication to use ONOS cluster communication primitives

Change-Id: I3f07266e50106b1adc13f722c647686c2b42ef7f
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 7475819..21b0919 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -21,6 +21,7 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.Tools;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.NodeId;
@@ -182,11 +183,15 @@
                                                       Function<M, byte[]> encoder,
                                                       Function<byte[], R> decoder,
                                                       NodeId toNodeId) {
-        ClusterMessage envelope = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                subject,
-                encoder.apply(message));
-        return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder);
+        try {
+            ClusterMessage envelope = new ClusterMessage(
+                    clusterService.getLocalNode().id(),
+                    subject,
+                    encoder.apply(message));
+            return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder);
+        } catch (Exception e) {
+            return Tools.exceptionalFuture(e);
+        }
     }
 
     private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
@@ -223,7 +228,6 @@
         messagingService.unregisterHandler(subject.value());
     }
 
-
     @Override
     public <M, R> void addSubscriber(MessageSubject subject,
             Function<byte[], M> decoder,
@@ -231,8 +235,26 @@
             Function<R, byte[]> encoder,
             Executor executor) {
         messagingService.registerHandler(subject.value(),
-                new InternalMessageResponder<>(decoder, encoder, handler),
-                executor);
+                new InternalMessageResponder<M, R>(decoder, encoder, m -> {
+                    CompletableFuture<R> responseFuture = new CompletableFuture<>();
+                    executor.execute(() -> {
+                        try {
+                            responseFuture.complete(handler.apply(m));
+                        } catch (Exception e) {
+                            responseFuture.completeExceptionally(e);
+                        }
+                    });
+                    return responseFuture;
+                }));
+    }
+
+    @Override
+    public <M, R> void addSubscriber(MessageSubject subject,
+            Function<byte[], M> decoder,
+            Function<M, CompletableFuture<R>> handler,
+            Function<R, byte[]> encoder) {
+        messagingService.registerHandler(subject.value(),
+                new InternalMessageResponder<>(decoder, encoder, handler));
     }
 
     @Override
@@ -260,23 +282,22 @@
         }
     }
 
-    private class InternalMessageResponder<M, R> implements Function<byte[], byte[]> {
+    private class InternalMessageResponder<M, R> implements Function<byte[], CompletableFuture<byte[]>> {
         private final Function<byte[], M> decoder;
         private final Function<R, byte[]> encoder;
-        private final Function<M, R> handler;
+        private final Function<M, CompletableFuture<R>> handler;
 
         public InternalMessageResponder(Function<byte[], M> decoder,
                                         Function<R, byte[]> encoder,
-                                        Function<M, R> handler) {
+                                        Function<M, CompletableFuture<R>> handler) {
             this.decoder = decoder;
             this.encoder = encoder;
             this.handler = handler;
         }
 
         @Override
-        public byte[] apply(byte[] bytes) {
-            R reply = handler.apply(decoder.apply(ClusterMessage.fromBytes(bytes).payload()));
-            return encoder.apply(reply);
+        public CompletableFuture<byte[]> apply(byte[] bytes) {
+            return handler.apply(decoder.apply(ClusterMessage.fromBytes(bytes).payload())).thenApply(encoder);
         }
     }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CopycatCommunicationProtocol.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CopycatCommunicationProtocol.java
new file mode 100644
index 0000000..628ff71
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CopycatCommunicationProtocol.java
@@ -0,0 +1,119 @@
+package org.onosproject.store.consistent.impl;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+
+import net.kuujo.copycat.protocol.AbstractProtocol;
+import net.kuujo.copycat.protocol.ProtocolClient;
+import net.kuujo.copycat.protocol.ProtocolHandler;
+import net.kuujo.copycat.protocol.ProtocolServer;
+import net.kuujo.copycat.util.Configurable;
+
+/**
+ * Protocol for Copycat communication that employs
+ * {@code ClusterCommunicationService}.
+ */
+public class CopycatCommunicationProtocol extends AbstractProtocol {
+
+    private static final MessageSubject COPYCAT_MESSAGE_SUBJECT =
+            new MessageSubject("onos-copycat-message");
+
+    protected ClusterService clusterService;
+    protected ClusterCommunicationService clusterCommunicator;
+
+    public CopycatCommunicationProtocol(ClusterService clusterService,
+                                        ClusterCommunicationService clusterCommunicator) {
+        this.clusterService = clusterService;
+        this.clusterCommunicator = clusterCommunicator;
+    }
+
+    @Override
+    public Configurable copy() {
+        return this;
+    }
+
+    @Override
+    public ProtocolClient createClient(URI uri) {
+        NodeId nodeId = uriToNodeId(uri);
+        if (nodeId == null) {
+            throw new IllegalStateException("Unknown peer " + uri);
+        }
+        return new Client(nodeId);
+    }
+
+    @Override
+    public ProtocolServer createServer(URI uri) {
+        return new Server();
+    }
+
+    private class Server implements ProtocolServer {
+
+        @Override
+        public void handler(ProtocolHandler handler) {
+            if (handler == null) {
+                clusterCommunicator.removeSubscriber(COPYCAT_MESSAGE_SUBJECT);
+            } else {
+                clusterCommunicator.addSubscriber(COPYCAT_MESSAGE_SUBJECT,
+                        ByteBuffer::wrap,
+                        handler,
+                        Tools::byteBuffertoArray);
+                // FIXME: Tools::byteBuffertoArray involves a array copy.
+            }
+        }
+
+        @Override
+        public CompletableFuture<Void> listen() {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<Void> close() {
+            clusterCommunicator.removeSubscriber(COPYCAT_MESSAGE_SUBJECT);
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
+    private class Client implements ProtocolClient {
+        private final NodeId peer;
+
+        public Client(NodeId peer) {
+            this.peer = peer;
+        }
+
+        @Override
+        public CompletableFuture<ByteBuffer> write(ByteBuffer request) {
+            return clusterCommunicator.sendAndReceive(request,
+                    COPYCAT_MESSAGE_SUBJECT,
+                    Tools::byteBuffertoArray,
+                    ByteBuffer::wrap,
+                    peer);
+        }
+
+        @Override
+        public CompletableFuture<Void> connect() {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<Void> close() {
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
+    private NodeId uriToNodeId(URI uri) {
+        return clusterService.getNodes()
+                             .stream()
+                             .filter(node -> uri.getHost().equals(node.ip().toString()))
+                             .map(ControllerNode::id)
+                             .findAny()
+                             .orElse(null);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index c06ebdf..390a444 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -107,7 +107,7 @@
     protected ClusterCommunicationService clusterCommunicator;
 
     protected String nodeToUri(NodeInfo node) {
-        return String.format("tcp://%s:%d", node.getIp(), COPYCAT_TCP_PORT);
+        return String.format("onos://%s:%d", node.getIp(), node.getTcpPort());
     }
 
     @Activate
@@ -136,9 +136,10 @@
                     .toArray(String[]::new);
 
         String localNodeUri = nodeToUri(NodeInfo.of(clusterService.getLocalNode()));
+        Protocol protocol = new CopycatCommunicationProtocol(clusterService, clusterCommunicator);
 
         ClusterConfig clusterConfig = new ClusterConfig()
-            .withProtocol(newNettyProtocol())
+            .withProtocol(protocol)
             .withElectionTimeout(electionTimeoutMillis(activeNodeUris))
             .withHeartbeatInterval(heartbeatTimeoutMillis(activeNodeUris))
             .withMembers(activeNodeUris)
@@ -232,6 +233,7 @@
                 .collect(Collectors.toList());
     }
 
+    @SuppressWarnings("unused")
     private Protocol newNettyProtocol() {
         return new NettyTcpProtocol()
             .withSsl(false)