Copycat messaging new happens over the same cluster messaging used for all other ONOS p2p communication
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseService.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseService.java
index 449fd71..11cc5ff 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseService.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseService.java
@@ -26,7 +26,7 @@
 
     /**
      * Performs a write operation on the database.
-     * @param request
+     * @param request write request
      * @return write result.
      * @throws DatabaseException if there is failure in execution write.
      */
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadResult.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadResult.java
index 0db5dfe..33b57d2 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadResult.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadResult.java
@@ -17,15 +17,15 @@
     }
 
     /**
-     * Database table name.
-     * @return
+     * Returns database table name.
+     * @return table name.
      */
     public String tableName() {
         return tableName;
     }
 
     /**
-     * Database table key.
+     * Returns database table key.
      * @return key.
      */
     public String key() {
@@ -33,7 +33,7 @@
     }
 
     /**
-     * value associated with the key.
+     * Returns value associated with the key.
      * @return non-null value if the table contains one, null otherwise.
      */
     public VersionedValue value() {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/VersionedValue.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/VersionedValue.java
index ee1d0f0..d88d35e 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/VersionedValue.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/VersionedValue.java
@@ -12,8 +12,8 @@
 
     /**
      * Creates a new instance with the specified value and version.
-     * @param value
-     * @param version
+     * @param value value
+     * @param version version
      */
     public VersionedValue(byte[] value, long version) {
         this.value = value;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
new file mode 100644
index 0000000..2e7fe11
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
@@ -0,0 +1,196 @@
+package org.onlab.onos.store.service.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Vector;
+
+import net.kuujo.copycat.cluster.TcpClusterConfig;
+import net.kuujo.copycat.cluster.TcpMember;
+import net.kuujo.copycat.internal.log.ConfigurationEntry;
+import net.kuujo.copycat.internal.log.CopycatEntry;
+import net.kuujo.copycat.internal.log.OperationEntry;
+import net.kuujo.copycat.internal.log.SnapshotEntry;
+import net.kuujo.copycat.protocol.PingRequest;
+import net.kuujo.copycat.protocol.PingResponse;
+import net.kuujo.copycat.protocol.PollRequest;
+import net.kuujo.copycat.protocol.PollResponse;
+import net.kuujo.copycat.protocol.Response.Status;
+import net.kuujo.copycat.protocol.SubmitRequest;
+import net.kuujo.copycat.protocol.SubmitResponse;
+import net.kuujo.copycat.protocol.SyncRequest;
+import net.kuujo.copycat.protocol.SyncResponse;
+import net.kuujo.copycat.spi.protocol.Protocol;
+import net.kuujo.copycat.spi.protocol.ProtocolClient;
+import net.kuujo.copycat.spi.protocol.ProtocolServer;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.serializers.ImmutableListSerializer;
+import org.onlab.onos.store.serializers.ImmutableMapSerializer;
+import org.onlab.onos.store.serializers.ImmutableSetSerializer;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.service.ReadRequest;
+import org.onlab.onos.store.service.ReadResult;
+import org.onlab.onos.store.service.VersionedValue;
+import org.onlab.onos.store.service.WriteRequest;
+import org.onlab.onos.store.service.WriteResult;
+import org.onlab.util.KryoNamespace;
+import org.slf4j.Logger;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.serializers.CollectionSerializer;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under
+ * the License.
+ */
+
+@Component(immediate = true)
+@Service
+public class ClusterMessagingProtocol implements Protocol<TcpMember> {
+
+    private final Logger log = getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    ClusterCommunicationService clusterCommunicator;
+
+    public static final MessageSubject COPYCAT_PING =
+            new MessageSubject("copycat-raft-consensus-ping");
+    public static final MessageSubject COPYCAT_SYNC =
+            new MessageSubject("copycat-raft-consensus-sync");
+    public static final MessageSubject COPYCAT_POLL =
+            new MessageSubject("copycat-raft-consensus-poll");
+    public static final MessageSubject COPYCAT_SUBMIT =
+            new MessageSubject("copycat-raft-consensus-submit");
+
+    private static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
+            .register(PingRequest.class)
+            .register(PingResponse.class)
+            .register(PollRequest.class)
+            .register(PollResponse.class)
+            .register(SyncRequest.class)
+            .register(SyncResponse.class)
+            .register(SubmitRequest.class)
+            .register(SubmitResponse.class)
+            .register(Status.class)
+            .register(ConfigurationEntry.class)
+            .register(SnapshotEntry.class)
+            .register(CopycatEntry.class)
+            .register(OperationEntry.class)
+            .register(TcpClusterConfig.class)
+            .register(TcpMember.class)
+            .build();
+
+    private static final KryoNamespace DATABASE = KryoNamespace.newBuilder()
+            .register(ReadRequest.class)
+            .register(WriteRequest.class)
+            .register(InternalReadResult.class)
+            .register(InternalWriteResult.class)
+            .register(InternalReadResult.Status.class)
+            .register(WriteResult.class)
+            .register(ReadResult.class)
+            .register(InternalWriteResult.Status.class)
+            .register(VersionedValue.class)
+            .build();
+
+    public static final KryoNamespace COMMON = KryoNamespace.newBuilder()
+            .register(Arrays.asList().getClass(), new CollectionSerializer() {
+                @Override
+                @SuppressWarnings("rawtypes")
+                protected Collection<?> create(Kryo kryo, Input input, Class<Collection> type) {
+                    return new ArrayList();
+                }
+            })
+            .register(ImmutableMap.class, new ImmutableMapSerializer())
+            .register(ImmutableList.class, new ImmutableListSerializer())
+            .register(ImmutableSet.class, new ImmutableSetSerializer())
+            .register(
+                    Vector.class,
+                    ArrayList.class,
+                    Arrays.asList().getClass(),
+                    HashMap.class,
+                    HashSet.class,
+                    LinkedList.class,
+                    byte[].class)
+            .build();
+
+    public static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        @Override
+        protected void setupKryoPool() {
+            serializerPool = KryoNamespace.newBuilder()
+                    .register(COPYCAT)
+                    .register(COMMON)
+                    .register(DATABASE)
+                    .build()
+                    .populate(1);
+        }
+    };
+
+    @Activate
+    public void activate() {
+        log.info("Started.");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        log.info("Stopped.");
+    }
+
+    @Override
+    public ProtocolServer createServer(TcpMember member) {
+        return new ClusterMessagingProtocolServer(clusterCommunicator);
+    }
+
+    @Override
+    public ProtocolClient createClient(TcpMember member) {
+        ControllerNode node = getControllerNode(member.host(), member.port());
+        checkNotNull(node, "A valid controller node is expected");
+        return new ClusterMessagingProtocolClient(
+                clusterCommunicator, node);
+    }
+
+    private ControllerNode getControllerNode(String host, int port) {
+        for (ControllerNode node : clusterService.getNodes()) {
+            if (node.ip().toString().equals(host) && node.tcpPort() == port) {
+                return node;
+            }
+        }
+        return null;
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
new file mode 100644
index 0000000..f638444
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
@@ -0,0 +1,165 @@
+package org.onlab.onos.store.service.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import net.kuujo.copycat.protocol.PingRequest;
+import net.kuujo.copycat.protocol.PingResponse;
+import net.kuujo.copycat.protocol.PollRequest;
+import net.kuujo.copycat.protocol.PollResponse;
+import net.kuujo.copycat.protocol.SubmitRequest;
+import net.kuujo.copycat.protocol.SubmitResponse;
+import net.kuujo.copycat.protocol.SyncRequest;
+import net.kuujo.copycat.protocol.SyncResponse;
+import net.kuujo.copycat.spi.protocol.ProtocolClient;
+
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.slf4j.Logger;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under
+ * the License.
+ */
+
+public class ClusterMessagingProtocolClient implements ProtocolClient {
+
+    private final Logger log = getLogger(getClass());
+
+    private static final ThreadFactory THREAD_FACTORY =
+            new ThreadFactoryBuilder().setNameFormat("copycat-netty-messaging-%d").build();
+
+    public static final long RETRY_INTERVAL_MILLIS = 2000;
+
+    private final ClusterCommunicationService clusterCommunicator;
+    private final ControllerNode remoteNode;
+
+    // FIXME: Thread pool sizing.
+    private static final ScheduledExecutorService THREAD_POOL =
+            new ScheduledThreadPoolExecutor(10, THREAD_FACTORY);
+
+    public ClusterMessagingProtocolClient(
+            ClusterCommunicationService clusterCommunicator,
+            ControllerNode remoteNode) {
+        this.clusterCommunicator = clusterCommunicator;
+        this.remoteNode = remoteNode;
+    }
+
+    @Override
+    public CompletableFuture<PingResponse> ping(PingRequest request) {
+        return requestReply(request);
+    }
+
+    @Override
+    public CompletableFuture<SyncResponse> sync(SyncRequest request) {
+        return requestReply(request);
+    }
+
+    @Override
+    public CompletableFuture<PollResponse> poll(PollRequest request) {
+        return requestReply(request);
+    }
+
+    @Override
+    public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
+        return requestReply(request);
+    }
+
+    @Override
+    public CompletableFuture<Void> connect() {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<Void> close() {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    public <I> MessageSubject messageType(I input) {
+        Class<?> clazz = input.getClass();
+        if (clazz.equals(PollRequest.class)) {
+            return ClusterMessagingProtocol.COPYCAT_POLL;
+        } else if (clazz.equals(SyncRequest.class)) {
+            return ClusterMessagingProtocol.COPYCAT_SYNC;
+        } else if (clazz.equals(SubmitRequest.class)) {
+            return ClusterMessagingProtocol.COPYCAT_SUBMIT;
+        } else if (clazz.equals(PingRequest.class)) {
+            return ClusterMessagingProtocol.COPYCAT_PING;
+        } else {
+            throw new IllegalArgumentException("Unknown class " + clazz.getName());
+        }
+
+    }
+
+    private <I, O> CompletableFuture<O> requestReply(I request) {
+        CompletableFuture<O> future = new CompletableFuture<>();
+        THREAD_POOL.schedule(new RPCTask<I, O>(request, future), 0, TimeUnit.MILLISECONDS);
+        return future;
+    }
+
+    private class RPCTask<I, O> implements Runnable {
+
+        private final ClusterMessage message;
+        private final CompletableFuture<O> future;
+
+        public RPCTask(I request, CompletableFuture<O> future) {
+            this.message =
+                    new ClusterMessage(
+                            null,
+                            messageType(request),
+                            ClusterMessagingProtocol.SERIALIZER.encode(request));
+            this.future = future;
+        }
+
+        @Override
+        public void run() {
+            try {
+                byte[] response = clusterCommunicator
+                    .sendAndReceive(message, remoteNode.id())
+                    .get(RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+                future.complete(ClusterMessagingProtocol.SERIALIZER.decode(response));
+
+            } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
+                if (message.subject().equals(ClusterMessagingProtocol.COPYCAT_SYNC) ||
+                        message.subject().equals(ClusterMessagingProtocol.COPYCAT_PING)) {
+                    log.warn("Request to {} failed. Will retry "
+                            + "in {} ms", remoteNode, RETRY_INTERVAL_MILLIS);
+                    THREAD_POOL.schedule(
+                            this,
+                            RETRY_INTERVAL_MILLIS,
+                            TimeUnit.MILLISECONDS);
+                } else {
+                    future.completeExceptionally(e);
+                }
+            } catch (Exception e) {
+                future.completeExceptionally(e);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
new file mode 100644
index 0000000..0449b8a
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
@@ -0,0 +1,110 @@
+package org.onlab.onos.store.service.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.concurrent.CompletableFuture;
+
+import net.kuujo.copycat.protocol.PingRequest;
+import net.kuujo.copycat.protocol.PollRequest;
+import net.kuujo.copycat.protocol.RequestHandler;
+import net.kuujo.copycat.protocol.SubmitRequest;
+import net.kuujo.copycat.protocol.SyncRequest;
+import net.kuujo.copycat.spi.protocol.ProtocolServer;
+
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.slf4j.Logger;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under
+ * the License.
+ */
+
+public class ClusterMessagingProtocolServer implements ProtocolServer {
+
+    private final Logger log = getLogger(getClass());
+    private RequestHandler handler;
+
+    public ClusterMessagingProtocolServer(ClusterCommunicationService clusterCommunicator) {
+
+        clusterCommunicator.addSubscriber(
+                ClusterMessagingProtocol.COPYCAT_PING, new CopycatMessageHandler<PingRequest>());
+        clusterCommunicator.addSubscriber(
+                ClusterMessagingProtocol.COPYCAT_SYNC, new CopycatMessageHandler<SyncRequest>());
+        clusterCommunicator.addSubscriber(
+                ClusterMessagingProtocol.COPYCAT_POLL, new CopycatMessageHandler<PollRequest>());
+        clusterCommunicator.addSubscriber(
+                ClusterMessagingProtocol.COPYCAT_SUBMIT, new CopycatMessageHandler<SubmitRequest>());
+    }
+
+    @Override
+    public void requestHandler(RequestHandler handler) {
+        this.handler = handler;
+    }
+
+    @Override
+    public CompletableFuture<Void> listen() {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<Void> close() {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    private class CopycatMessageHandler<T> implements ClusterMessageHandler {
+
+        @Override
+        public void handle(ClusterMessage message) {
+            T request = ClusterMessagingProtocol.SERIALIZER.decode(message.payload());
+            if (request.getClass().equals(PingRequest.class)) {
+                handler.ping((PingRequest) request).whenComplete((response, error) -> {
+                    try {
+                        message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
+                    } catch (Exception e) {
+                        log.error("Failed to respond to ping request", e);
+                    }
+                });
+            } else if (request.getClass().equals(PollRequest.class)) {
+                handler.poll((PollRequest) request).whenComplete((response, error) -> {
+                    try {
+                        message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
+                    } catch (Exception e) {
+                        log.error("Failed to respond to poll request", e);
+                    }
+                });
+            } else if (request.getClass().equals(SyncRequest.class)) {
+                handler.sync((SyncRequest) request).whenComplete((response, error) -> {
+                    try {
+                        message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
+                    } catch (Exception e) {
+                        log.error("Failed to respond to sync request", e);
+                    }
+                });
+            } else if (request.getClass().equals(SubmitRequest.class)) {
+                handler.submit((SubmitRequest) request).whenComplete((response, error) -> {
+                    try {
+                        message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
+                    } catch (Exception e) {
+                        log.error("Failed to respond to submit request", e);
+                    }
+                });
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
index 3c92800..d07d1d3 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
@@ -11,37 +11,22 @@
 import net.kuujo.copycat.protocol.SubmitResponse;
 import net.kuujo.copycat.spi.protocol.ProtocolClient;
 
-import org.apache.commons.lang3.RandomUtils;
-import org.onlab.netty.Endpoint;
-import org.onlab.netty.NettyMessagingService;
 import org.onlab.onos.store.service.DatabaseException;
 import org.onlab.onos.store.service.ReadRequest;
 import org.onlab.onos.store.service.WriteRequest;
 
 public class DatabaseClient {
 
-    private final Endpoint copycatEp;
-    ProtocolClient client;
-    NettyMessagingService messagingService;
+    private final ProtocolClient client;
 
-    public DatabaseClient(Endpoint copycatEp) {
-        this.copycatEp = copycatEp;
+    public DatabaseClient(ProtocolClient client) {
+        this.client = client;
     }
 
     private static String nextId() {
         return UUID.randomUUID().toString();
     }
 
-    public void activate() throws Exception {
-        messagingService = new NettyMessagingService(RandomUtils.nextInt(10000, 40000));
-        messagingService.activate();
-        client = new NettyProtocolClient(copycatEp, messagingService);
-    }
-
-    public void deactivate() throws Exception {
-        messagingService.deactivate();
-    }
-
     public boolean createTable(String tableName) {
 
         SubmitRequest request =
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index 8042cc6..44d3041 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -18,7 +18,6 @@
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.onlab.netty.Endpoint;
 import org.onlab.onos.cluster.ClusterService;
 import org.onlab.onos.cluster.ControllerNode;
 import org.onlab.onos.store.service.DatabaseAdminService;
@@ -50,6 +49,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     ClusterService clusterService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    ClusterMessagingProtocol copycatMessagingProtocol;
+
     public static final String LOG_FILE_PREFIX = "onos-copy-cat-log";
 
     private Copycat copycat;
@@ -57,15 +59,14 @@
 
     @Activate
     public void activate() {
-        // FIXME hack tcpPort +1 for copycat communication
         TcpMember localMember =
                 new TcpMember(
                         clusterService.getLocalNode().ip().toString(),
-                        clusterService.getLocalNode().tcpPort() + 1);
+                        clusterService.getLocalNode().tcpPort());
         List<TcpMember> remoteMembers = Lists.newArrayList();
 
         for (ControllerNode node : clusterService.getNodes()) {
-            TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort() + 1);
+            TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort());
             if (!member.equals(localMember)) {
                 remoteMembers.add(member);
             }
@@ -84,10 +85,10 @@
         ControllerNode thisNode = clusterService.getLocalNode();
         Log consensusLog = new ChronicleLog(LOG_FILE_PREFIX + "_" + thisNode.id());
 
-        copycat = new Copycat(stateMachine, consensusLog, cluster, new NettyProtocol());
+        copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
         copycat.start();
 
-        client = new DatabaseClient(new Endpoint(localMember.host(), localMember.port()));
+        client = new DatabaseClient(copycatMessagingProtocol.createClient(localMember));
 
         log.info("Started.");
     }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
index 663e9e4..c974486 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
@@ -27,7 +27,7 @@
             serializerPool = KryoNamespace.newBuilder()
                     .register(VersionedValue.class)
                     .register(State.class)
-                    .register(NettyProtocol.COMMON)
+                    .register(ClusterMessagingProtocol.COMMON)
                     .build()
                     .populate(1);
         }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocol.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocol.java
deleted file mode 100644
index 9a2259a..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocol.java
+++ /dev/null
@@ -1,146 +0,0 @@
-package org.onlab.onos.store.service.impl;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Vector;
-
-import net.kuujo.copycat.cluster.TcpClusterConfig;
-import net.kuujo.copycat.cluster.TcpMember;
-import net.kuujo.copycat.internal.log.ConfigurationEntry;
-import net.kuujo.copycat.internal.log.CopycatEntry;
-import net.kuujo.copycat.internal.log.OperationEntry;
-import net.kuujo.copycat.internal.log.SnapshotEntry;
-import net.kuujo.copycat.protocol.PingRequest;
-import net.kuujo.copycat.protocol.PingResponse;
-import net.kuujo.copycat.protocol.PollRequest;
-import net.kuujo.copycat.protocol.PollResponse;
-import net.kuujo.copycat.protocol.Response.Status;
-import net.kuujo.copycat.protocol.SubmitRequest;
-import net.kuujo.copycat.protocol.SubmitResponse;
-import net.kuujo.copycat.protocol.SyncRequest;
-import net.kuujo.copycat.protocol.SyncResponse;
-import net.kuujo.copycat.spi.protocol.Protocol;
-import net.kuujo.copycat.spi.protocol.ProtocolClient;
-import net.kuujo.copycat.spi.protocol.ProtocolServer;
-
-import org.onlab.onos.store.serializers.ImmutableListSerializer;
-import org.onlab.onos.store.serializers.ImmutableMapSerializer;
-import org.onlab.onos.store.serializers.ImmutableSetSerializer;
-import org.onlab.onos.store.serializers.KryoSerializer;
-import org.onlab.onos.store.service.ReadRequest;
-import org.onlab.onos.store.service.ReadResult;
-import org.onlab.onos.store.service.VersionedValue;
-import org.onlab.onos.store.service.WriteRequest;
-import org.onlab.onos.store.service.WriteResult;
-import org.onlab.util.KryoNamespace;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.serializers.CollectionSerializer;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
-/**
- * {@link Protocol} based on {@link org.onlab.netty.NettyMessagingService}.
- */
-public class NettyProtocol implements Protocol<TcpMember> {
-
-    public static final String COPYCAT_PING = "copycat-raft-consensus-ping";
-    public static final String COPYCAT_SYNC = "copycat-raft-consensus-sync";
-    public static final String COPYCAT_POLL = "copycat-raft-consensus-poll";
-    public static final String COPYCAT_SUBMIT = "copycat-raft-consensus-submit";
-
-    // TODO: make this configurable.
-    public static final long RETRY_INTERVAL_MILLIS = 2000;
-
-    private static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
-            .register(PingRequest.class)
-            .register(PingResponse.class)
-            .register(PollRequest.class)
-            .register(PollResponse.class)
-            .register(SyncRequest.class)
-            .register(SyncResponse.class)
-            .register(SubmitRequest.class)
-            .register(SubmitResponse.class)
-            .register(Status.class)
-            .register(ConfigurationEntry.class)
-            .register(SnapshotEntry.class)
-            .register(CopycatEntry.class)
-            .register(OperationEntry.class)
-            .register(TcpClusterConfig.class)
-            .register(TcpMember.class)
-            .build();
-
-    // TODO: Move to the right place.
-    private static final KryoNamespace CRAFT = KryoNamespace.newBuilder()
-            .register(ReadRequest.class)
-            .register(WriteRequest.class)
-            .register(InternalReadResult.class)
-            .register(InternalWriteResult.class)
-            .register(InternalReadResult.Status.class)
-            .register(WriteResult.class)
-            .register(ReadResult.class)
-            .register(InternalWriteResult.Status.class)
-            .register(VersionedValue.class)
-            .build();
-
-    public static final KryoNamespace COMMON = KryoNamespace.newBuilder()
-            .register(Arrays.asList().getClass(), new CollectionSerializer() {
-                @Override
-                @SuppressWarnings("rawtypes")
-                protected Collection<?> create(Kryo kryo, Input input, Class<Collection> type) {
-                    return new ArrayList();
-                }
-            })
-            .register(ImmutableMap.class, new ImmutableMapSerializer())
-            .register(ImmutableList.class, new ImmutableListSerializer())
-            .register(ImmutableSet.class, new ImmutableSetSerializer())
-            .register(
-                    Vector.class,
-                    ArrayList.class,
-                    Arrays.asList().getClass(),
-                    HashMap.class,
-                    HashSet.class,
-                    LinkedList.class,
-                    byte[].class)
-            .build();
-
-    public static final KryoSerializer SERIALIZER = new KryoSerializer() {
-        @Override
-        protected void setupKryoPool() {
-            serializerPool = KryoNamespace.newBuilder()
-                    .register(COPYCAT)
-                    .register(COMMON)
-                    .register(CRAFT)
-                    .build()
-                    .populate(1);
-        }
-    };
-
-    private NettyProtocolServer server = null;
-
-    // FIXME: This is a total hack.Assumes
-    // ProtocolServer is initialized before ProtocolClient
-    protected NettyProtocolServer getServer() {
-        if (server == null) {
-            throw new IllegalStateException("ProtocolServer is not initialized yet!");
-        }
-        return server;
-    }
-
-    @Override
-    public ProtocolServer createServer(TcpMember member) {
-        server = new NettyProtocolServer(member);
-        return server;
-    }
-
-    @Override
-    public ProtocolClient createClient(TcpMember member) {
-        return new NettyProtocolClient(this, member);
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolClient.java
deleted file mode 100644
index a791990..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolClient.java
+++ /dev/null
@@ -1,148 +0,0 @@
-package org.onlab.onos.store.service.impl;
-
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import net.kuujo.copycat.cluster.TcpMember;
-import net.kuujo.copycat.protocol.PingRequest;
-import net.kuujo.copycat.protocol.PingResponse;
-import net.kuujo.copycat.protocol.PollRequest;
-import net.kuujo.copycat.protocol.PollResponse;
-import net.kuujo.copycat.protocol.SubmitRequest;
-import net.kuujo.copycat.protocol.SubmitResponse;
-import net.kuujo.copycat.protocol.SyncRequest;
-import net.kuujo.copycat.protocol.SyncResponse;
-import net.kuujo.copycat.spi.protocol.ProtocolClient;
-
-import org.onlab.netty.Endpoint;
-import org.onlab.netty.NettyMessagingService;
-import org.slf4j.Logger;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/**
- * {@link NettyMessagingService} based Copycat protocol client.
- */
-public class NettyProtocolClient implements ProtocolClient {
-
-    private final Logger log = getLogger(getClass());
-    private static final ThreadFactory THREAD_FACTORY =
-            new ThreadFactoryBuilder().setNameFormat("copycat-netty-messaging-%d").build();
-
-    // Remote endpoint, this client instance is used
-    // for communicating with.
-    private final Endpoint remoteEp;
-    private final NettyMessagingService messagingService;
-
-    // TODO: Is 10 the right number of threads?
-    private static final ScheduledExecutorService THREAD_POOL =
-            new ScheduledThreadPoolExecutor(10, THREAD_FACTORY);
-
-    public NettyProtocolClient(NettyProtocol protocol, TcpMember member) {
-        this(new Endpoint(member.host(), member.port()), protocol.getServer().getNettyMessagingService());
-    }
-
-    public NettyProtocolClient(Endpoint remoteEp, NettyMessagingService messagingService) {
-        this.remoteEp = remoteEp;
-        this.messagingService = messagingService;
-    }
-
-    @Override
-    public CompletableFuture<PingResponse> ping(PingRequest request) {
-        return requestReply(request);
-    }
-
-    @Override
-    public CompletableFuture<SyncResponse> sync(SyncRequest request) {
-        return requestReply(request);
-    }
-
-    @Override
-    public CompletableFuture<PollResponse> poll(PollRequest request) {
-        return requestReply(request);
-    }
-
-    @Override
-    public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
-        return requestReply(request);
-    }
-
-    @Override
-    public CompletableFuture<Void> connect() {
-        return CompletableFuture.completedFuture(null);
-    }
-
-    @Override
-    public CompletableFuture<Void> close() {
-        return CompletableFuture.completedFuture(null);
-    }
-
-    public <I> String messageType(I input) {
-        Class<?> clazz = input.getClass();
-        if (clazz.equals(PollRequest.class)) {
-            return NettyProtocol.COPYCAT_POLL;
-        } else if (clazz.equals(SyncRequest.class)) {
-            return NettyProtocol.COPYCAT_SYNC;
-        } else if (clazz.equals(SubmitRequest.class)) {
-            return NettyProtocol.COPYCAT_SUBMIT;
-        } else if (clazz.equals(PingRequest.class)) {
-            return NettyProtocol.COPYCAT_PING;
-        } else {
-            throw new IllegalArgumentException("Unknown class " + clazz.getName());
-        }
-
-    }
-
-    private <I, O> CompletableFuture<O> requestReply(I request) {
-        CompletableFuture<O> future = new CompletableFuture<>();
-        THREAD_POOL.schedule(new RPCTask<I, O>(request, future), 0, TimeUnit.MILLISECONDS);
-        return future;
-    }
-
-    private class RPCTask<I, O> implements Runnable {
-
-        private final String messageType;
-        private final byte[] payload;
-
-        private final CompletableFuture<O> future;
-
-        public RPCTask(I request, CompletableFuture<O> future) {
-            this.messageType = messageType(request);
-            this.payload = NettyProtocol.SERIALIZER.encode(request);
-            this.future = future;
-        }
-
-        @Override
-        public void run() {
-            try {
-                byte[] response = messagingService
-                    .sendAndReceive(remoteEp, messageType, payload)
-                    .get(NettyProtocol.RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
-                future.complete(NettyProtocol.SERIALIZER.decode(response));
-
-            } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
-                if (messageType.equals(NettyProtocol.COPYCAT_SYNC) ||
-                        messageType.equals(NettyProtocol.COPYCAT_PING)) {
-                    log.warn("Request to {} failed. Will retry "
-                            + "in {} ms", remoteEp, NettyProtocol.RETRY_INTERVAL_MILLIS);
-                    THREAD_POOL.schedule(
-                            this,
-                            NettyProtocol.RETRY_INTERVAL_MILLIS,
-                            TimeUnit.MILLISECONDS);
-                } else {
-                    future.completeExceptionally(e);
-                }
-            } catch (Exception e) {
-                future.completeExceptionally(e);
-            }
-        }
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolServer.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolServer.java
deleted file mode 100644
index d06999e..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolServer.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package org.onlab.onos.store.service.impl;
-
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-
-import net.kuujo.copycat.cluster.TcpMember;
-import net.kuujo.copycat.protocol.PingRequest;
-import net.kuujo.copycat.protocol.PollRequest;
-import net.kuujo.copycat.protocol.RequestHandler;
-import net.kuujo.copycat.protocol.SubmitRequest;
-import net.kuujo.copycat.protocol.SyncRequest;
-import net.kuujo.copycat.spi.protocol.ProtocolServer;
-
-import org.onlab.netty.Message;
-import org.onlab.netty.MessageHandler;
-import org.onlab.netty.NettyMessagingService;
-import org.slf4j.Logger;
-
-/**
- * {@link NettyMessagingService} based Copycat protocol server.
- */
-public class NettyProtocolServer implements ProtocolServer {
-
-    private final Logger log = getLogger(getClass());
-
-    private final NettyMessagingService messagingService;
-    private RequestHandler handler;
-
-
-    public NettyProtocolServer(TcpMember member) {
-        messagingService = new NettyMessagingService(member.host(), member.port());
-
-        messagingService.registerHandler(NettyProtocol.COPYCAT_PING, new CopycatMessageHandler<PingRequest>());
-        messagingService.registerHandler(NettyProtocol.COPYCAT_SYNC, new CopycatMessageHandler<SyncRequest>());
-        messagingService.registerHandler(NettyProtocol.COPYCAT_POLL, new CopycatMessageHandler<PollRequest>());
-        messagingService.registerHandler(NettyProtocol.COPYCAT_SUBMIT, new CopycatMessageHandler<SubmitRequest>());
-    }
-
-    protected NettyMessagingService getNettyMessagingService() {
-        return messagingService;
-    }
-
-    @Override
-    public void requestHandler(RequestHandler handler) {
-        this.handler = handler;
-    }
-
-    @Override
-    public CompletableFuture<Void> listen() {
-        try {
-            messagingService.activate();
-            return CompletableFuture.completedFuture(null);
-        } catch (Exception e) {
-            CompletableFuture<Void> future = new CompletableFuture<>();
-            future.completeExceptionally(e);
-            return future;
-        }
-    }
-
-    @Override
-    public CompletableFuture<Void> close() {
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        try {
-            messagingService.deactivate();
-            future.complete(null);
-            return future;
-        } catch (Exception e) {
-            future.completeExceptionally(e);
-            return future;
-        }
-    }
-
-    private class CopycatMessageHandler<T> implements MessageHandler {
-
-        @Override
-        public void handle(Message message) throws IOException {
-            T request = NettyProtocol.SERIALIZER.decode(message.payload());
-            if (request.getClass().equals(PingRequest.class)) {
-                handler.ping((PingRequest) request).whenComplete((response, error) -> {
-                    try {
-                        message.respond(NettyProtocol.SERIALIZER.encode(response));
-                    } catch (Exception e) {
-                        log.error("Failed to respond to ping request", e);
-                    }
-                });
-            } else if (request.getClass().equals(PollRequest.class)) {
-                handler.poll((PollRequest) request).whenComplete((response, error) -> {
-                    try {
-                        message.respond(NettyProtocol.SERIALIZER.encode(response));
-                    } catch (Exception e) {
-                        log.error("Failed to respond to poll request", e);
-                    }
-                });
-            } else if (request.getClass().equals(SyncRequest.class)) {
-                handler.sync((SyncRequest) request).whenComplete((response, error) -> {
-                    try {
-                        message.respond(NettyProtocol.SERIALIZER.encode(response));
-                    } catch (Exception e) {
-                        log.error("Failed to respond to sync request", e);
-                    }
-                });
-            } else if (request.getClass().equals(SubmitRequest.class)) {
-                handler.submit((SubmitRequest) request).whenComplete((response, error) -> {
-                    try {
-                        message.respond(NettyProtocol.SERIALIZER.encode(response));
-                    } catch (Exception e) {
-                        log.error("Failed to respond to submit request", e);
-                    }
-                });
-            }
-        }
-    }
-}