DatabaseService that uses Copycat Raft to provide a strongly consistent and durable database.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
new file mode 100644
index 0000000..3c92800
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
@@ -0,0 +1,144 @@
+package org.onlab.onos.store.service.impl;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import net.kuujo.copycat.protocol.Response.Status;
+import net.kuujo.copycat.protocol.SubmitRequest;
+import net.kuujo.copycat.protocol.SubmitResponse;
+import net.kuujo.copycat.spi.protocol.ProtocolClient;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.onlab.netty.Endpoint;
+import org.onlab.netty.NettyMessagingService;
+import org.onlab.onos.store.service.DatabaseException;
+import org.onlab.onos.store.service.ReadRequest;
+import org.onlab.onos.store.service.WriteRequest;
+
+public class DatabaseClient {
+
+    private final Endpoint copycatEp;
+    ProtocolClient client;
+    NettyMessagingService messagingService;
+
+    public DatabaseClient(Endpoint copycatEp) {
+        this.copycatEp = copycatEp;
+    }
+
+    private static String nextId() {
+        return UUID.randomUUID().toString();
+    }
+
+    public void activate() throws Exception {
+        messagingService = new NettyMessagingService(RandomUtils.nextInt(10000, 40000));
+        messagingService.activate();
+        client = new NettyProtocolClient(copycatEp, messagingService);
+    }
+
+    public void deactivate() throws Exception {
+        messagingService.deactivate();
+    }
+
+    public boolean createTable(String tableName) {
+
+        SubmitRequest request =
+                new SubmitRequest(
+                        nextId(),
+                        "createTable",
+                        Arrays.asList(tableName));
+        CompletableFuture<SubmitResponse> future = client.submit(request);
+        try {
+            return (boolean) future.get().result();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new DatabaseException(e);
+        }
+    }
+
+    public void dropTable(String tableName) {
+
+        SubmitRequest request =
+                new SubmitRequest(
+                        nextId(),
+                        "dropTable",
+                        Arrays.asList(tableName));
+        CompletableFuture<SubmitResponse> future = client.submit(request);
+        try {
+            if (future.get().status() == Status.OK) {
+                throw new DatabaseException(future.get().toString());
+            }
+
+        } catch (InterruptedException | ExecutionException e) {
+            throw new DatabaseException(e);
+        }
+    }
+
+    public void dropAllTables() {
+
+        SubmitRequest request =
+                new SubmitRequest(
+                        nextId(),
+                        "dropAllTables",
+                        Arrays.asList());
+        CompletableFuture<SubmitResponse> future = client.submit(request);
+        try {
+            if (future.get().status() != Status.OK) {
+                throw new DatabaseException(future.get().toString());
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            throw new DatabaseException(e);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public List<String> listTables() {
+
+        SubmitRequest request =
+                new SubmitRequest(
+                        nextId(),
+                        "listTables",
+                        Arrays.asList());
+        CompletableFuture<SubmitResponse> future = client.submit(request);
+        try {
+            return (List<String>) future.get().result();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new DatabaseException(e);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public List<InternalReadResult> batchRead(List<ReadRequest> requests) {
+
+        SubmitRequest request = new SubmitRequest(
+                        nextId(),
+                        "read",
+                        Arrays.asList(requests));
+
+        CompletableFuture<SubmitResponse> future = client.submit(request);
+        try {
+            List<InternalReadResult> internalReadResults = (List<InternalReadResult>) future.get().result();
+            return internalReadResults;
+        } catch (InterruptedException | ExecutionException e) {
+            throw new DatabaseException(e);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public List<InternalWriteResult> batchWrite(List<WriteRequest> requests) {
+
+        SubmitRequest request = new SubmitRequest(
+                        nextId(),
+                        "write",
+                        Arrays.asList(requests));
+
+        CompletableFuture<SubmitResponse> future = client.submit(request);
+        try {
+            List<InternalWriteResult> internalWriteResults = (List<InternalWriteResult>) future.get().result();
+            return internalWriteResults;
+        } catch (InterruptedException | ExecutionException e) {
+            throw new DatabaseException(e);
+        }
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
new file mode 100644
index 0000000..00ce12d
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -0,0 +1,210 @@
+package org.onlab.onos.store.service.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import net.kuujo.copycat.Copycat;
+import net.kuujo.copycat.StateMachine;
+import net.kuujo.copycat.cluster.TcpCluster;
+import net.kuujo.copycat.cluster.TcpClusterConfig;
+import net.kuujo.copycat.cluster.TcpMember;
+import net.kuujo.copycat.log.ChronicleLog;
+import net.kuujo.copycat.log.Log;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.netty.Endpoint;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.store.service.DatabaseAdminService;
+import org.onlab.onos.store.service.DatabaseException;
+import org.onlab.onos.store.service.DatabaseService;
+import org.onlab.onos.store.service.NoSuchTableException;
+import org.onlab.onos.store.service.OptimisticLockException;
+import org.onlab.onos.store.service.OptionalResult;
+import org.onlab.onos.store.service.PreconditionFailedException;
+import org.onlab.onos.store.service.ReadRequest;
+import org.onlab.onos.store.service.ReadResult;
+import org.onlab.onos.store.service.WriteAborted;
+import org.onlab.onos.store.service.WriteRequest;
+import org.onlab.onos.store.service.WriteResult;
+import org.slf4j.Logger;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Strongly consistent and durable state management service based on
+ * Copycat implementation of Raft consensus protocol.
+ */
+@Component(immediate = true)
+@Service
+public class DatabaseManager implements DatabaseService, DatabaseAdminService {
+
+    private final Logger log = getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    ClusterService clusterService;
+
+    public static final String LOG_FILE_PREFIX = "onos-copy-cat-log";
+
+    private Copycat copycat;
+    private DatabaseClient client;
+
+    @Activate
+    public void activate() {
+        TcpMember localMember =
+                new TcpMember(
+                        clusterService.getLocalNode().ip().toString(),
+                        clusterService.getLocalNode().tcpPort());
+        List<TcpMember> remoteMembers = Lists.newArrayList();
+
+        for (ControllerNode node : clusterService.getNodes()) {
+            TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort());
+            if (!member.equals(localMember)) {
+                remoteMembers.add(member);
+            }
+        }
+
+        // Configure the cluster.
+        TcpClusterConfig config = new TcpClusterConfig();
+
+        config.setLocalMember(localMember);
+        config.setRemoteMembers(remoteMembers.toArray(new TcpMember[]{}));
+
+        // Create the cluster.
+        TcpCluster cluster = new TcpCluster(config);
+
+        StateMachine stateMachine = new DatabaseStateMachine();
+        ControllerNode thisNode = clusterService.getLocalNode();
+        Log consensusLog = new ChronicleLog(LOG_FILE_PREFIX + "_" + thisNode.id());
+
+        copycat = new Copycat(stateMachine, consensusLog, cluster, new NettyProtocol());
+        copycat.start();
+
+        client = new DatabaseClient(new Endpoint(localMember.host(), localMember.port()));
+
+        log.info("Started.");
+    }
+
+    @Activate
+    public void deactivate() {
+        copycat.stop();
+    }
+
+    @Override
+    public boolean createTable(String name) {
+        return client.createTable(name);
+    }
+
+    @Override
+    public void dropTable(String name) {
+        client.dropTable(name);
+    }
+
+    @Override
+    public void dropAllTables() {
+        client.dropAllTables();
+    }
+
+    @Override
+    public List<String> listTables() {
+        return client.listTables();
+    }
+
+    @Override
+    public ReadResult read(ReadRequest request) {
+        return batchRead(Arrays.asList(request)).get(0).get();
+    }
+
+    @Override
+    public List<OptionalResult<ReadResult, DatabaseException>> batchRead(
+            List<ReadRequest> batch) {
+        List<OptionalResult<ReadResult, DatabaseException>> readResults = new ArrayList<>(batch.size());
+        for (InternalReadResult internalReadResult : client.batchRead(batch)) {
+            if (internalReadResult.status() == InternalReadResult.Status.NO_SUCH_TABLE) {
+                readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
+                        new NoSuchTableException()));
+            } else {
+                readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
+                        internalReadResult.result()));
+            }
+        }
+        return readResults;
+    }
+
+    @Override
+    public WriteResult write(WriteRequest request) {
+        return batchWrite(Arrays.asList(request)).get(0).get();
+    }
+
+    @Override
+    public List<OptionalResult<WriteResult, DatabaseException>> batchWrite(
+            List<WriteRequest> batch) {
+        List<OptionalResult<WriteResult, DatabaseException>> writeResults = new ArrayList<>(batch.size());
+        for (InternalWriteResult internalWriteResult : client.batchWrite(batch)) {
+            if (internalWriteResult.status() == InternalWriteResult.Status.NO_SUCH_TABLE) {
+                writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
+                        new NoSuchTableException()));
+            } else if (internalWriteResult.status() == InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE) {
+                writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
+                        new OptimisticLockException()));
+            } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH) {
+                // TODO: throw a different exception?
+                writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
+                        new PreconditionFailedException()));
+            } else if (internalWriteResult.status() == InternalWriteResult.Status.ABORTED) {
+                writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
+                        new WriteAborted()));
+            } else {
+                writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
+                        internalWriteResult.result()));
+            }
+        }
+        return writeResults;
+
+    }
+
+    private class DatabaseOperationResult<R, E extends DatabaseException> implements OptionalResult<R, E> {
+
+        private final R result;
+        private final DatabaseException exception;
+
+        public DatabaseOperationResult(R result) {
+            this.result = result;
+            this.exception = null;
+        }
+
+        public DatabaseOperationResult(DatabaseException exception) {
+            this.result = null;
+            this.exception = exception;
+        }
+
+        @Override
+        public R get() {
+            if (result != null) {
+                return result;
+            }
+            throw exception;
+        }
+
+        @Override
+        public boolean hasValidResult() {
+            return result != null;
+        }
+
+        @Override
+        public String toString() {
+            if (result != null) {
+                return result.toString();
+            } else {
+                return exception.toString();
+            }
+        }
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
new file mode 100644
index 0000000..cbca729
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
@@ -0,0 +1,169 @@
+package org.onlab.onos.store.service.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import net.kuujo.copycat.Command;
+import net.kuujo.copycat.Query;
+import net.kuujo.copycat.StateMachine;
+
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.service.ReadRequest;
+import org.onlab.onos.store.service.ReadResult;
+import org.onlab.onos.store.service.WriteRequest;
+import org.onlab.onos.store.service.WriteResult;
+import org.onlab.util.KryoNamespace;
+
+import com.google.common.collect.Maps;
+
+public class DatabaseStateMachine implements StateMachine {
+
+    public static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        @Override
+        protected void setupKryoPool() {
+            serializerPool = KryoNamespace.newBuilder()
+                    .register(VersionedValue.class)
+                    .register(State.class)
+                    .register(NettyProtocol.COMMON)
+                    .build()
+                    .populate(1);
+        }
+    };
+
+    private State state = new State();
+
+    @Command
+    public boolean createTable(String tableName) {
+        return state.getTables().putIfAbsent(tableName, Maps.newHashMap()) == null;
+    }
+
+    @Command
+    public boolean dropTable(String tableName) {
+        return state.getTables().remove(tableName) != null;
+    }
+
+    @Command
+    public boolean dropAllTables() {
+        state.getTables().clear();
+        return true;
+    }
+
+    @Query
+    public Set<String> listTables() {
+        return state.getTables().keySet();
+    }
+
+    @Query
+    public List<InternalReadResult> read(List<ReadRequest> requests) {
+        List<InternalReadResult> results = new ArrayList<>(requests.size());
+        for (ReadRequest request : requests) {
+            Map<String, VersionedValue> table = state.getTables().get(request.tableName());
+            if (table == null) {
+                results.add(new InternalReadResult(InternalReadResult.Status.NO_SUCH_TABLE, null));
+                continue;
+            }
+            VersionedValue value = table.get(request.key());
+            results.add(new InternalReadResult(
+                    InternalReadResult.Status.OK,
+                    new ReadResult(
+                            request.tableName(),
+                            request.key(),
+                            value)));
+        }
+        return results;
+    }
+
+    @Command
+    public List<InternalWriteResult> write(List<WriteRequest> requests) {
+        boolean abort = false;
+        List<InternalWriteResult.Status> validationResults = new ArrayList<>(requests.size());
+        for (WriteRequest request : requests) {
+            Map<String, VersionedValue> table = state.getTables().get(request.tableName());
+            if (table == null) {
+                validationResults.add(InternalWriteResult.Status.NO_SUCH_TABLE);
+                abort = true;
+                continue;
+            }
+            VersionedValue value = table.get(request.key());
+            if (value == null) {
+                if (request.oldValue() != null) {
+                    validationResults.add(InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH);
+                    abort = true;
+                    continue;
+                } else if (request.previousVersion() >= 0) {
+                    validationResults.add(InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE);
+                    abort = true;
+                    continue;
+                }
+            }
+            if (request.previousVersion() >= 0 && value.version() != request.previousVersion()) {
+                validationResults.add(InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE);
+                abort = true;
+                continue;
+            }
+
+            validationResults.add(InternalWriteResult.Status.OK);
+        }
+
+        List<InternalWriteResult> results = new ArrayList<>(requests.size());
+
+        if (abort) {
+            for (InternalWriteResult.Status validationResult : validationResults) {
+                if (validationResult == InternalWriteResult.Status.OK) {
+                    results.add(new InternalWriteResult(InternalWriteResult.Status.ABORTED, null));
+                } else {
+                    results.add(new InternalWriteResult(validationResult, null));
+                }
+            }
+            return results;
+        }
+
+        for (WriteRequest request : requests) {
+            Map<String, VersionedValue> table = state.getTables().get(request.tableName());
+            synchronized (table) {
+                VersionedValue previousValue =
+                        table.put(request.key(), new VersionedValue(request.newValue(), state.nextVersion()));
+                results.add(new InternalWriteResult(
+                        InternalWriteResult.Status.OK,
+                        new WriteResult(request.tableName(), request.key(), previousValue)));
+            }
+        }
+        return results;
+    }
+
+    public class State {
+
+        private final Map<String, Map<String, VersionedValue>> tables =
+                Maps.newHashMap();
+        private long versionCounter = 1;
+
+        Map<String, Map<String, VersionedValue>> getTables() {
+            return tables;
+        }
+
+        long nextVersion() {
+            return versionCounter++;
+        }
+    }
+
+    @Override
+    public byte[] takeSnapshot() {
+        try {
+            return SERIALIZER.encode(state);
+        } catch (Exception e) {
+            e.printStackTrace();
+            return null;
+        }
+    }
+
+    @Override
+    public void installSnapshot(byte[] data) {
+        try {
+            this.state = SERIALIZER.decode(data);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalReadResult.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalReadResult.java
new file mode 100644
index 0000000..f6fcf51
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalReadResult.java
@@ -0,0 +1,33 @@
+package org.onlab.onos.store.service.impl;
+
+import org.onlab.onos.store.service.ReadResult;
+
+public class InternalReadResult {
+
+    public enum Status {
+        OK,
+        NO_SUCH_TABLE
+    }
+
+    private final Status status;
+    private final ReadResult result;
+
+    public InternalReadResult(Status status, ReadResult result) {
+        this.status = status;
+        this.result = result;
+    }
+
+    public Status status() {
+        return status;
+    }
+
+    public ReadResult result() {
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "InternalReadResult [status=" + status + ", result=" + result
+                + "]";
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalWriteResult.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalWriteResult.java
new file mode 100644
index 0000000..e6dbb1f
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalWriteResult.java
@@ -0,0 +1,30 @@
+package org.onlab.onos.store.service.impl;
+
+import org.onlab.onos.store.service.WriteResult;
+
+public class InternalWriteResult {
+
+    public enum Status {
+        OK,
+        ABORTED,
+        NO_SUCH_TABLE,
+        OPTIMISTIC_LOCK_FAILURE,
+        PREVIOUS_VALUE_MISMATCH
+    }
+
+    private final Status status;
+    private final WriteResult result;
+
+    public InternalWriteResult(Status status, WriteResult result) {
+        this.status = status;
+        this.result = result;
+    }
+
+    public Status status() {
+        return status;
+    }
+
+    public WriteResult result() {
+        return result;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocol.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocol.java
new file mode 100644
index 0000000..9855ec6
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocol.java
@@ -0,0 +1,145 @@
+package org.onlab.onos.store.service.impl;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Vector;
+
+import net.kuujo.copycat.cluster.TcpClusterConfig;
+import net.kuujo.copycat.cluster.TcpMember;
+import net.kuujo.copycat.internal.log.ConfigurationEntry;
+import net.kuujo.copycat.internal.log.CopycatEntry;
+import net.kuujo.copycat.internal.log.OperationEntry;
+import net.kuujo.copycat.internal.log.SnapshotEntry;
+import net.kuujo.copycat.protocol.PingRequest;
+import net.kuujo.copycat.protocol.PingResponse;
+import net.kuujo.copycat.protocol.PollRequest;
+import net.kuujo.copycat.protocol.PollResponse;
+import net.kuujo.copycat.protocol.Response.Status;
+import net.kuujo.copycat.protocol.SubmitRequest;
+import net.kuujo.copycat.protocol.SubmitResponse;
+import net.kuujo.copycat.protocol.SyncRequest;
+import net.kuujo.copycat.protocol.SyncResponse;
+import net.kuujo.copycat.spi.protocol.Protocol;
+import net.kuujo.copycat.spi.protocol.ProtocolClient;
+import net.kuujo.copycat.spi.protocol.ProtocolServer;
+
+import org.onlab.onos.store.serializers.ImmutableListSerializer;
+import org.onlab.onos.store.serializers.ImmutableMapSerializer;
+import org.onlab.onos.store.serializers.ImmutableSetSerializer;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.service.ReadRequest;
+import org.onlab.onos.store.service.ReadResult;
+import org.onlab.onos.store.service.WriteRequest;
+import org.onlab.onos.store.service.WriteResult;
+import org.onlab.util.KryoNamespace;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.serializers.CollectionSerializer;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * {@link Protocol} based on {@link org.onlab.netty.NettyMessagingService}.
+ */
+public class NettyProtocol implements Protocol<TcpMember> {
+
+    public static final String COPYCAT_PING = "copycat-raft-consensus-ping";
+    public static final String COPYCAT_SYNC = "copycat-raft-consensus-sync";
+    public static final String COPYCAT_POLL = "copycat-raft-consensus-poll";
+    public static final String COPYCAT_SUBMIT = "copycat-raft-consensus-submit";
+
+    // TODO: make this configurable.
+    public static final long RETRY_INTERVAL_MILLIS = 2000;
+
+    private static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
+            .register(PingRequest.class)
+            .register(PingResponse.class)
+            .register(PollRequest.class)
+            .register(PollResponse.class)
+            .register(SyncRequest.class)
+            .register(SyncResponse.class)
+            .register(SubmitRequest.class)
+            .register(SubmitResponse.class)
+            .register(Status.class)
+            .register(ConfigurationEntry.class)
+            .register(SnapshotEntry.class)
+            .register(CopycatEntry.class)
+            .register(OperationEntry.class)
+            .register(TcpClusterConfig.class)
+            .register(TcpMember.class)
+            .build();
+
+    // TODO: Move to the right place.
+    private static final KryoNamespace CRAFT = KryoNamespace.newBuilder()
+            .register(ReadRequest.class)
+            .register(WriteRequest.class)
+            .register(InternalReadResult.class)
+            .register(InternalWriteResult.class)
+            .register(InternalReadResult.Status.class)
+            .register(WriteResult.class)
+            .register(ReadResult.class)
+            .register(InternalWriteResult.Status.class)
+            .register(VersionedValue.class)
+            .build();
+
+    public static final KryoNamespace COMMON = KryoNamespace.newBuilder()
+            .register(Arrays.asList().getClass(), new CollectionSerializer() {
+                @Override
+                @SuppressWarnings("rawtypes")
+                protected Collection<?> create(Kryo kryo, Input input, Class<Collection> type) {
+                    return new ArrayList();
+                }
+            })
+            .register(ImmutableMap.class, new ImmutableMapSerializer())
+            .register(ImmutableList.class, new ImmutableListSerializer())
+            .register(ImmutableSet.class, new ImmutableSetSerializer())
+            .register(
+                    Vector.class,
+                    ArrayList.class,
+                    Arrays.asList().getClass(),
+                    HashMap.class,
+                    HashSet.class,
+                    LinkedList.class,
+                    byte[].class)
+            .build();
+
+    public static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        @Override
+        protected void setupKryoPool() {
+            serializerPool = KryoNamespace.newBuilder()
+                    .register(COPYCAT)
+                    .register(COMMON)
+                    .register(CRAFT)
+                    .build()
+                    .populate(1);
+        }
+    };
+
+    private NettyProtocolServer server = null;
+
+    // FIXME: This is a total hack.Assumes
+    // ProtocolServer is initialized before ProtocolClient
+    protected NettyProtocolServer getServer() {
+        if (server == null) {
+            throw new IllegalStateException("ProtocolServer is not initialized yet!");
+        }
+        return server;
+    }
+
+    @Override
+    public ProtocolServer createServer(TcpMember member) {
+        server = new NettyProtocolServer(member);
+        return server;
+    }
+
+    @Override
+    public ProtocolClient createClient(TcpMember member) {
+        return new NettyProtocolClient(this, member);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolClient.java
new file mode 100644
index 0000000..a791990
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolClient.java
@@ -0,0 +1,148 @@
+package org.onlab.onos.store.service.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import net.kuujo.copycat.cluster.TcpMember;
+import net.kuujo.copycat.protocol.PingRequest;
+import net.kuujo.copycat.protocol.PingResponse;
+import net.kuujo.copycat.protocol.PollRequest;
+import net.kuujo.copycat.protocol.PollResponse;
+import net.kuujo.copycat.protocol.SubmitRequest;
+import net.kuujo.copycat.protocol.SubmitResponse;
+import net.kuujo.copycat.protocol.SyncRequest;
+import net.kuujo.copycat.protocol.SyncResponse;
+import net.kuujo.copycat.spi.protocol.ProtocolClient;
+
+import org.onlab.netty.Endpoint;
+import org.onlab.netty.NettyMessagingService;
+import org.slf4j.Logger;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * {@link NettyMessagingService} based Copycat protocol client.
+ */
+public class NettyProtocolClient implements ProtocolClient {
+
+    private final Logger log = getLogger(getClass());
+    private static final ThreadFactory THREAD_FACTORY =
+            new ThreadFactoryBuilder().setNameFormat("copycat-netty-messaging-%d").build();
+
+    // Remote endpoint, this client instance is used
+    // for communicating with.
+    private final Endpoint remoteEp;
+    private final NettyMessagingService messagingService;
+
+    // TODO: Is 10 the right number of threads?
+    private static final ScheduledExecutorService THREAD_POOL =
+            new ScheduledThreadPoolExecutor(10, THREAD_FACTORY);
+
+    public NettyProtocolClient(NettyProtocol protocol, TcpMember member) {
+        this(new Endpoint(member.host(), member.port()), protocol.getServer().getNettyMessagingService());
+    }
+
+    public NettyProtocolClient(Endpoint remoteEp, NettyMessagingService messagingService) {
+        this.remoteEp = remoteEp;
+        this.messagingService = messagingService;
+    }
+
+    @Override
+    public CompletableFuture<PingResponse> ping(PingRequest request) {
+        return requestReply(request);
+    }
+
+    @Override
+    public CompletableFuture<SyncResponse> sync(SyncRequest request) {
+        return requestReply(request);
+    }
+
+    @Override
+    public CompletableFuture<PollResponse> poll(PollRequest request) {
+        return requestReply(request);
+    }
+
+    @Override
+    public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
+        return requestReply(request);
+    }
+
+    @Override
+    public CompletableFuture<Void> connect() {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<Void> close() {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    public <I> String messageType(I input) {
+        Class<?> clazz = input.getClass();
+        if (clazz.equals(PollRequest.class)) {
+            return NettyProtocol.COPYCAT_POLL;
+        } else if (clazz.equals(SyncRequest.class)) {
+            return NettyProtocol.COPYCAT_SYNC;
+        } else if (clazz.equals(SubmitRequest.class)) {
+            return NettyProtocol.COPYCAT_SUBMIT;
+        } else if (clazz.equals(PingRequest.class)) {
+            return NettyProtocol.COPYCAT_PING;
+        } else {
+            throw new IllegalArgumentException("Unknown class " + clazz.getName());
+        }
+
+    }
+
+    private <I, O> CompletableFuture<O> requestReply(I request) {
+        CompletableFuture<O> future = new CompletableFuture<>();
+        THREAD_POOL.schedule(new RPCTask<I, O>(request, future), 0, TimeUnit.MILLISECONDS);
+        return future;
+    }
+
+    private class RPCTask<I, O> implements Runnable {
+
+        private final String messageType;
+        private final byte[] payload;
+
+        private final CompletableFuture<O> future;
+
+        public RPCTask(I request, CompletableFuture<O> future) {
+            this.messageType = messageType(request);
+            this.payload = NettyProtocol.SERIALIZER.encode(request);
+            this.future = future;
+        }
+
+        @Override
+        public void run() {
+            try {
+                byte[] response = messagingService
+                    .sendAndReceive(remoteEp, messageType, payload)
+                    .get(NettyProtocol.RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+                future.complete(NettyProtocol.SERIALIZER.decode(response));
+
+            } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
+                if (messageType.equals(NettyProtocol.COPYCAT_SYNC) ||
+                        messageType.equals(NettyProtocol.COPYCAT_PING)) {
+                    log.warn("Request to {} failed. Will retry "
+                            + "in {} ms", remoteEp, NettyProtocol.RETRY_INTERVAL_MILLIS);
+                    THREAD_POOL.schedule(
+                            this,
+                            NettyProtocol.RETRY_INTERVAL_MILLIS,
+                            TimeUnit.MILLISECONDS);
+                } else {
+                    future.completeExceptionally(e);
+                }
+            } catch (Exception e) {
+                future.completeExceptionally(e);
+            }
+        }
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolServer.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolServer.java
new file mode 100644
index 0000000..d06999e
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolServer.java
@@ -0,0 +1,115 @@
+package org.onlab.onos.store.service.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import net.kuujo.copycat.cluster.TcpMember;
+import net.kuujo.copycat.protocol.PingRequest;
+import net.kuujo.copycat.protocol.PollRequest;
+import net.kuujo.copycat.protocol.RequestHandler;
+import net.kuujo.copycat.protocol.SubmitRequest;
+import net.kuujo.copycat.protocol.SyncRequest;
+import net.kuujo.copycat.spi.protocol.ProtocolServer;
+
+import org.onlab.netty.Message;
+import org.onlab.netty.MessageHandler;
+import org.onlab.netty.NettyMessagingService;
+import org.slf4j.Logger;
+
+/**
+ * {@link NettyMessagingService} based Copycat protocol server.
+ */
+public class NettyProtocolServer implements ProtocolServer {
+
+    private final Logger log = getLogger(getClass());
+
+    private final NettyMessagingService messagingService;
+    private RequestHandler handler;
+
+
+    public NettyProtocolServer(TcpMember member) {
+        messagingService = new NettyMessagingService(member.host(), member.port());
+
+        messagingService.registerHandler(NettyProtocol.COPYCAT_PING, new CopycatMessageHandler<PingRequest>());
+        messagingService.registerHandler(NettyProtocol.COPYCAT_SYNC, new CopycatMessageHandler<SyncRequest>());
+        messagingService.registerHandler(NettyProtocol.COPYCAT_POLL, new CopycatMessageHandler<PollRequest>());
+        messagingService.registerHandler(NettyProtocol.COPYCAT_SUBMIT, new CopycatMessageHandler<SubmitRequest>());
+    }
+
+    protected NettyMessagingService getNettyMessagingService() {
+        return messagingService;
+    }
+
+    @Override
+    public void requestHandler(RequestHandler handler) {
+        this.handler = handler;
+    }
+
+    @Override
+    public CompletableFuture<Void> listen() {
+        try {
+            messagingService.activate();
+            return CompletableFuture.completedFuture(null);
+        } catch (Exception e) {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            future.completeExceptionally(e);
+            return future;
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> close() {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        try {
+            messagingService.deactivate();
+            future.complete(null);
+            return future;
+        } catch (Exception e) {
+            future.completeExceptionally(e);
+            return future;
+        }
+    }
+
+    private class CopycatMessageHandler<T> implements MessageHandler {
+
+        @Override
+        public void handle(Message message) throws IOException {
+            T request = NettyProtocol.SERIALIZER.decode(message.payload());
+            if (request.getClass().equals(PingRequest.class)) {
+                handler.ping((PingRequest) request).whenComplete((response, error) -> {
+                    try {
+                        message.respond(NettyProtocol.SERIALIZER.encode(response));
+                    } catch (Exception e) {
+                        log.error("Failed to respond to ping request", e);
+                    }
+                });
+            } else if (request.getClass().equals(PollRequest.class)) {
+                handler.poll((PollRequest) request).whenComplete((response, error) -> {
+                    try {
+                        message.respond(NettyProtocol.SERIALIZER.encode(response));
+                    } catch (Exception e) {
+                        log.error("Failed to respond to poll request", e);
+                    }
+                });
+            } else if (request.getClass().equals(SyncRequest.class)) {
+                handler.sync((SyncRequest) request).whenComplete((response, error) -> {
+                    try {
+                        message.respond(NettyProtocol.SERIALIZER.encode(response));
+                    } catch (Exception e) {
+                        log.error("Failed to respond to sync request", e);
+                    }
+                });
+            } else if (request.getClass().equals(SubmitRequest.class)) {
+                handler.submit((SubmitRequest) request).whenComplete((response, error) -> {
+                    try {
+                        message.respond(NettyProtocol.SERIALIZER.encode(response));
+                    } catch (Exception e) {
+                        log.error("Failed to respond to submit request", e);
+                    }
+                });
+            }
+        }
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/VersionedValue.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/VersionedValue.java
new file mode 100644
index 0000000..31bdcc2
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/VersionedValue.java
@@ -0,0 +1,44 @@
+package org.onlab.onos.store.service.impl;
+
+import java.util.Arrays;
+
+/**
+ * Wrapper object that holds the object (as byte array) and its version.
+ */
+public class VersionedValue {
+
+    private final byte[] value;
+    private final long version;
+
+    /**
+     * Creates a new instance with the specified value and version.
+     * @param value
+     * @param version
+     */
+    public VersionedValue(byte[] value, long version) {
+        this.value = value;
+        this.version = version;
+    }
+
+    /**
+     * Returns the value.
+     * @return value.
+     */
+    public byte[] value() {
+        return value;
+    }
+
+    /**
+     * Returns the version.
+     * @return version.
+     */
+    public long version() {
+        return version;
+    }
+
+    @Override
+    public String toString() {
+        return "VersionedValue [value=" + Arrays.toString(value) + ", version="
+                + version + "]";
+    }
+}