DatabaseService that uses Copycat Raft to provide a strongly consistent and durable database.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolClient.java
new file mode 100644
index 0000000..a791990
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolClient.java
@@ -0,0 +1,148 @@
+package org.onlab.onos.store.service.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import net.kuujo.copycat.cluster.TcpMember;
+import net.kuujo.copycat.protocol.PingRequest;
+import net.kuujo.copycat.protocol.PingResponse;
+import net.kuujo.copycat.protocol.PollRequest;
+import net.kuujo.copycat.protocol.PollResponse;
+import net.kuujo.copycat.protocol.SubmitRequest;
+import net.kuujo.copycat.protocol.SubmitResponse;
+import net.kuujo.copycat.protocol.SyncRequest;
+import net.kuujo.copycat.protocol.SyncResponse;
+import net.kuujo.copycat.spi.protocol.ProtocolClient;
+
+import org.onlab.netty.Endpoint;
+import org.onlab.netty.NettyMessagingService;
+import org.slf4j.Logger;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * {@link NettyMessagingService} based Copycat protocol client.
+ */
+public class NettyProtocolClient implements ProtocolClient {
+
+ private final Logger log = getLogger(getClass());
+ private static final ThreadFactory THREAD_FACTORY =
+ new ThreadFactoryBuilder().setNameFormat("copycat-netty-messaging-%d").build();
+
+ // Remote endpoint, this client instance is used
+ // for communicating with.
+ private final Endpoint remoteEp;
+ private final NettyMessagingService messagingService;
+
+ // TODO: Is 10 the right number of threads?
+ private static final ScheduledExecutorService THREAD_POOL =
+ new ScheduledThreadPoolExecutor(10, THREAD_FACTORY);
+
+ public NettyProtocolClient(NettyProtocol protocol, TcpMember member) {
+ this(new Endpoint(member.host(), member.port()), protocol.getServer().getNettyMessagingService());
+ }
+
+ public NettyProtocolClient(Endpoint remoteEp, NettyMessagingService messagingService) {
+ this.remoteEp = remoteEp;
+ this.messagingService = messagingService;
+ }
+
+ @Override
+ public CompletableFuture<PingResponse> ping(PingRequest request) {
+ return requestReply(request);
+ }
+
+ @Override
+ public CompletableFuture<SyncResponse> sync(SyncRequest request) {
+ return requestReply(request);
+ }
+
+ @Override
+ public CompletableFuture<PollResponse> poll(PollRequest request) {
+ return requestReply(request);
+ }
+
+ @Override
+ public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
+ return requestReply(request);
+ }
+
+ @Override
+ public CompletableFuture<Void> connect() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> close() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ public <I> String messageType(I input) {
+ Class<?> clazz = input.getClass();
+ if (clazz.equals(PollRequest.class)) {
+ return NettyProtocol.COPYCAT_POLL;
+ } else if (clazz.equals(SyncRequest.class)) {
+ return NettyProtocol.COPYCAT_SYNC;
+ } else if (clazz.equals(SubmitRequest.class)) {
+ return NettyProtocol.COPYCAT_SUBMIT;
+ } else if (clazz.equals(PingRequest.class)) {
+ return NettyProtocol.COPYCAT_PING;
+ } else {
+ throw new IllegalArgumentException("Unknown class " + clazz.getName());
+ }
+
+ }
+
+ private <I, O> CompletableFuture<O> requestReply(I request) {
+ CompletableFuture<O> future = new CompletableFuture<>();
+ THREAD_POOL.schedule(new RPCTask<I, O>(request, future), 0, TimeUnit.MILLISECONDS);
+ return future;
+ }
+
+ private class RPCTask<I, O> implements Runnable {
+
+ private final String messageType;
+ private final byte[] payload;
+
+ private final CompletableFuture<O> future;
+
+ public RPCTask(I request, CompletableFuture<O> future) {
+ this.messageType = messageType(request);
+ this.payload = NettyProtocol.SERIALIZER.encode(request);
+ this.future = future;
+ }
+
+ @Override
+ public void run() {
+ try {
+ byte[] response = messagingService
+ .sendAndReceive(remoteEp, messageType, payload)
+ .get(NettyProtocol.RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+ future.complete(NettyProtocol.SERIALIZER.decode(response));
+
+ } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
+ if (messageType.equals(NettyProtocol.COPYCAT_SYNC) ||
+ messageType.equals(NettyProtocol.COPYCAT_PING)) {
+ log.warn("Request to {} failed. Will retry "
+ + "in {} ms", remoteEp, NettyProtocol.RETRY_INTERVAL_MILLIS);
+ THREAD_POOL.schedule(
+ this,
+ NettyProtocol.RETRY_INTERVAL_MILLIS,
+ TimeUnit.MILLISECONDS);
+ } else {
+ future.completeExceptionally(e);
+ }
+ } catch (Exception e) {
+ future.completeExceptionally(e);
+ }
+ }
+ }
+}