1. Fixed a synchronization issue with database update processing and expiry tracking.
2. Fixed a synchronization issue with MapDBLog appendEntries method.
3. DatabaseClient now uses ProtocolClient to interact with Raft cluster.
4. Misc javdoc and logging improvements

Change-Id: I147eb5bf859cf9827df452d62ab415d643a00aa4
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
index 72e4b95..876f6cc 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
@@ -1,21 +1,24 @@
 package org.onlab.onos.store.service.impl;
 
-import static com.google.common.base.Preconditions.checkNotNull;
 import static org.slf4j.LoggerFactory.getLogger;
 
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import net.kuujo.copycat.Copycat;
+import net.kuujo.copycat.cluster.Member;
+import net.kuujo.copycat.cluster.TcpMember;
 import net.kuujo.copycat.event.EventHandler;
 import net.kuujo.copycat.event.LeaderElectEvent;
+import net.kuujo.copycat.protocol.SubmitRequest;
+import net.kuujo.copycat.protocol.SubmitResponse;
+import net.kuujo.copycat.spi.protocol.ProtocolClient;
 
 import org.onlab.onos.store.service.BatchReadRequest;
 import org.onlab.onos.store.service.BatchWriteRequest;
@@ -28,7 +31,7 @@
 /**
  * Client for interacting with the Copycat Raft cluster.
  */
-public class DatabaseClient {
+public class DatabaseClient implements EventHandler<LeaderElectEvent> {
 
     private static final int RETRIES = 5;
 
@@ -36,137 +39,101 @@
 
     private final Logger log = getLogger(getClass());
 
-    private final Copycat copycat;
+    private final DatabaseProtocolService protocol;
+    private volatile ProtocolClient copycat = null;
+    private volatile Member currentLeader = null;
 
-    public DatabaseClient(Copycat copycat) {
-        this.copycat = checkNotNull(copycat);
+    public DatabaseClient(DatabaseProtocolService protocol) {
+        this.protocol = protocol;
+    }
+
+    @Override
+    public void handle(LeaderElectEvent event) {
+        Member newLeader = event.leader();
+        if (newLeader != null && !newLeader.equals(currentLeader)) {
+            currentLeader = newLeader;
+            if (copycat != null) {
+                copycat.close();
+            }
+            copycat = protocol.createClient((TcpMember) currentLeader);
+            copycat.connect();
+        }
+    }
+
+    private String nextRequestId() {
+        return UUID.randomUUID().toString();
     }
 
     public void waitForLeader() {
-        if (copycat.leader() != null) {
+        if (currentLeader != null) {
             return;
         }
 
         log.info("No leader in cluster, waiting for election.");
-        final CountDownLatch latch = new CountDownLatch(1);
-        final EventHandler<LeaderElectEvent> leaderLsnr = new EventHandler<LeaderElectEvent>() {
 
-            @Override
-            public void handle(LeaderElectEvent event) {
-                log.info("Leader chosen: {}", event);
-                latch.countDown();
-            }
-        };
-
-        copycat.event(LeaderElectEvent.class).registerHandler(leaderLsnr);
         try {
-            while (copycat.leader() == null) {
-                latch.await(200, TimeUnit.MILLISECONDS);
+            while (currentLeader == null) {
+                Thread.sleep(200);
             }
-            log.info("Leader appeared: {}", copycat.leader());
+            log.info("Leader appeared: {}", currentLeader);
             return;
         } catch (InterruptedException e) {
             log.error("Interrupted while waiting for Leader", e);
             Thread.currentThread().interrupt();
-        } finally {
-            copycat.event(LeaderElectEvent.class).unregisterHandler(leaderLsnr);
+        }
+    }
+
+    private <T> T submit(String operationName, Object... args) {
+        waitForLeader();
+        if (currentLeader == null) {
+            throw new DatabaseException("Raft cluster does not have a leader.");
+        }
+
+        SubmitRequest request =
+                new SubmitRequest(nextRequestId(), operationName, Arrays.asList(args));
+
+        CompletableFuture<SubmitResponse> submitResponse = copycat.submit(request);
+
+        log.debug("Sent {} to {}", request, currentLeader);
+
+        try {
+            return (T) submitResponse.get(TIMEOUT_MS, TimeUnit.MILLISECONDS).result();
+        } catch (ExecutionException | InterruptedException e) {
+            throw new DatabaseException(e);
+        } catch (TimeoutException e) {
+            throw new DatabaseException.Timeout(e);
         }
     }
 
     public boolean createTable(String tableName) {
-        waitForLeader();
-        CompletableFuture<Boolean> future = copycat.submit("createTable", tableName);
-        try {
-            return future.get();
-        } catch (InterruptedException | ExecutionException e) {
-            throw new DatabaseException(e);
-        }
+        return submit("createTable", tableName);
     }
 
     public boolean createTable(String tableName, int ttlMillis) {
-        waitForLeader();
-        CompletableFuture<Boolean> future = copycat.submit("createTableWithExpiration", tableName);
-        try {
-            return future.get();
-        } catch (InterruptedException | ExecutionException e) {
-            throw new DatabaseException(e);
-        }
+        return submit("createTable", tableName, ttlMillis);
     }
 
     public void dropTable(String tableName) {
-        waitForLeader();
-        CompletableFuture<Void> future = copycat.submit("dropTable", tableName);
-        try {
-            future.get();
-        } catch (InterruptedException | ExecutionException e) {
-            throw new DatabaseException(e);
-        }
+        submit("dropTable", tableName);
     }
 
     public void dropAllTables() {
-        waitForLeader();
-        CompletableFuture<Void> future = copycat.submit("dropAllTables");
-        try {
-            future.get();
-        } catch (InterruptedException | ExecutionException e) {
-            throw new DatabaseException(e);
-        }
+        submit("dropAllTables");
     }
 
     public Set<String> listTables() {
-        waitForLeader();
-        try {
-            for (int i = 0; i < RETRIES; ++i) {
-                CompletableFuture<Set<String>> future = copycat.submit("listTables");
-                try {
-                    return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
-                } catch (TimeoutException e) {
-                    log.debug("Timed out retrying {}", i);
-                    future.cancel(true);
-                    waitForLeader();
-                }
-            }
-            // TODO: proper timeout handling
-            log.error("Timed out");
-            return Collections.emptySet();
-        } catch (InterruptedException | ExecutionException e) {
-            throw new DatabaseException(e);
-        }
+        return submit("listTables");
     }
 
     public List<ReadResult> batchRead(BatchReadRequest batchRequest) {
-        waitForLeader();
-        CompletableFuture<List<ReadResult>> future = copycat.submit("read", batchRequest);
-        try {
-            return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException | ExecutionException e) {
-            throw new DatabaseException(e);
-        } catch (TimeoutException e) {
-            throw new DatabaseException(e);
-        }
+        return submit("read", batchRequest);
     }
 
     public List<WriteResult> batchWrite(BatchWriteRequest batchRequest) {
-        waitForLeader();
-        CompletableFuture<List<WriteResult>> future = copycat.submit("write", batchRequest);
-        try {
-            return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException | ExecutionException e) {
-            throw new DatabaseException(e);
-        } catch (TimeoutException e) {
-            throw new DatabaseException(e);
-        }
+        return submit("write", batchRequest);
     }
 
     public Map<String, VersionedValue> getAll(String tableName) {
-        waitForLeader();
-        CompletableFuture<Map<String, VersionedValue>> future = copycat.submit("getAll", tableName);
-        try {
-            return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException | ExecutionException e) {
-            throw new DatabaseException(e);
-        } catch (TimeoutException e) {
-            throw new DatabaseException(e);
-        }
+        return submit("getAll", tableName);
     }
 }