Copycat messaging new happens over the same cluster messaging used for all other ONOS p2p communication
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
new file mode 100644
index 0000000..f638444
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
@@ -0,0 +1,165 @@
+package org.onlab.onos.store.service.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import net.kuujo.copycat.protocol.PingRequest;
+import net.kuujo.copycat.protocol.PingResponse;
+import net.kuujo.copycat.protocol.PollRequest;
+import net.kuujo.copycat.protocol.PollResponse;
+import net.kuujo.copycat.protocol.SubmitRequest;
+import net.kuujo.copycat.protocol.SubmitResponse;
+import net.kuujo.copycat.protocol.SyncRequest;
+import net.kuujo.copycat.protocol.SyncResponse;
+import net.kuujo.copycat.spi.protocol.ProtocolClient;
+
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.slf4j.Logger;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under
+ * the License.
+ */
+
+public class ClusterMessagingProtocolClient implements ProtocolClient {
+
+    private final Logger log = getLogger(getClass());
+
+    private static final ThreadFactory THREAD_FACTORY =
+            new ThreadFactoryBuilder().setNameFormat("copycat-netty-messaging-%d").build();
+
+    public static final long RETRY_INTERVAL_MILLIS = 2000;
+
+    private final ClusterCommunicationService clusterCommunicator;
+    private final ControllerNode remoteNode;
+
+    // FIXME: Thread pool sizing.
+    private static final ScheduledExecutorService THREAD_POOL =
+            new ScheduledThreadPoolExecutor(10, THREAD_FACTORY);
+
+    public ClusterMessagingProtocolClient(
+            ClusterCommunicationService clusterCommunicator,
+            ControllerNode remoteNode) {
+        this.clusterCommunicator = clusterCommunicator;
+        this.remoteNode = remoteNode;
+    }
+
+    @Override
+    public CompletableFuture<PingResponse> ping(PingRequest request) {
+        return requestReply(request);
+    }
+
+    @Override
+    public CompletableFuture<SyncResponse> sync(SyncRequest request) {
+        return requestReply(request);
+    }
+
+    @Override
+    public CompletableFuture<PollResponse> poll(PollRequest request) {
+        return requestReply(request);
+    }
+
+    @Override
+    public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
+        return requestReply(request);
+    }
+
+    @Override
+    public CompletableFuture<Void> connect() {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<Void> close() {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    public <I> MessageSubject messageType(I input) {
+        Class<?> clazz = input.getClass();
+        if (clazz.equals(PollRequest.class)) {
+            return ClusterMessagingProtocol.COPYCAT_POLL;
+        } else if (clazz.equals(SyncRequest.class)) {
+            return ClusterMessagingProtocol.COPYCAT_SYNC;
+        } else if (clazz.equals(SubmitRequest.class)) {
+            return ClusterMessagingProtocol.COPYCAT_SUBMIT;
+        } else if (clazz.equals(PingRequest.class)) {
+            return ClusterMessagingProtocol.COPYCAT_PING;
+        } else {
+            throw new IllegalArgumentException("Unknown class " + clazz.getName());
+        }
+
+    }
+
+    private <I, O> CompletableFuture<O> requestReply(I request) {
+        CompletableFuture<O> future = new CompletableFuture<>();
+        THREAD_POOL.schedule(new RPCTask<I, O>(request, future), 0, TimeUnit.MILLISECONDS);
+        return future;
+    }
+
+    private class RPCTask<I, O> implements Runnable {
+
+        private final ClusterMessage message;
+        private final CompletableFuture<O> future;
+
+        public RPCTask(I request, CompletableFuture<O> future) {
+            this.message =
+                    new ClusterMessage(
+                            null,
+                            messageType(request),
+                            ClusterMessagingProtocol.SERIALIZER.encode(request));
+            this.future = future;
+        }
+
+        @Override
+        public void run() {
+            try {
+                byte[] response = clusterCommunicator
+                    .sendAndReceive(message, remoteNode.id())
+                    .get(RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+                future.complete(ClusterMessagingProtocol.SERIALIZER.decode(response));
+
+            } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
+                if (message.subject().equals(ClusterMessagingProtocol.COPYCAT_SYNC) ||
+                        message.subject().equals(ClusterMessagingProtocol.COPYCAT_PING)) {
+                    log.warn("Request to {} failed. Will retry "
+                            + "in {} ms", remoteNode, RETRY_INTERVAL_MILLIS);
+                    THREAD_POOL.schedule(
+                            this,
+                            RETRY_INTERVAL_MILLIS,
+                            TimeUnit.MILLISECONDS);
+                } else {
+                    future.completeExceptionally(e);
+                }
+            } catch (Exception e) {
+                future.completeExceptionally(e);
+            }
+        }
+    }
+}
\ No newline at end of file