Wait for Leader to appear

- DatabaseClient: wait for Leader before DB access
- DatabaseManager: wait for Leader before activate

Change-Id: I5102e7cae1d33f49662bf452b1fba020173a51a0
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
index d512635..bd898b6 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
@@ -1,14 +1,19 @@
 package org.onlab.onos.store.service.impl;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import net.kuujo.copycat.Copycat;
+import net.kuujo.copycat.event.EventHandler;
+import net.kuujo.copycat.event.LeaderElectEvent;
 
 import org.onlab.onos.store.service.BatchReadRequest;
 import org.onlab.onos.store.service.BatchWriteRequest;
@@ -16,20 +21,54 @@
 import org.onlab.onos.store.service.ReadResult;
 import org.onlab.onos.store.service.VersionedValue;
 import org.onlab.onos.store.service.WriteResult;
+import org.slf4j.Logger;
 
 /**
  * Client for interacting with the Copycat Raft cluster.
  */
 public class DatabaseClient {
 
+    private final Logger log = getLogger(getClass());
+
     private final Copycat copycat;
 
     public DatabaseClient(Copycat copycat) {
         this.copycat = checkNotNull(copycat);
     }
 
-    public boolean createTable(String tableName) {
+    public void waitForLeader() {
+        if (copycat.leader() != null) {
+            return;
+        }
 
+        log.info("No leader in cluster, waiting for election.");
+        final CountDownLatch latch = new CountDownLatch(1);
+        final EventHandler<LeaderElectEvent> leaderLsnr = new EventHandler<LeaderElectEvent>() {
+
+            @Override
+            public void handle(LeaderElectEvent event) {
+                log.info("Leader chosen: {}", event);
+                latch.countDown();
+            }
+        };
+
+        copycat.event(LeaderElectEvent.class).registerHandler(leaderLsnr);
+        try {
+            while (copycat.leader() == null) {
+                latch.await(200, TimeUnit.MILLISECONDS);
+            }
+            log.info("Leader appeared: {}", copycat.leader());
+            return;
+        } catch (InterruptedException e) {
+            log.error("Interrupted while waiting for Leader", e);
+            Thread.currentThread().interrupt();
+        } finally {
+            copycat.event(LeaderElectEvent.class).unregisterHandler(leaderLsnr);
+        }
+    }
+
+    public boolean createTable(String tableName) {
+        waitForLeader();
         CompletableFuture<Boolean> future = copycat.submit("createTable", tableName);
         try {
             return future.get();
@@ -39,7 +78,7 @@
     }
 
     public boolean createTable(String tableName, int ttlMillis) {
-
+        waitForLeader();
         CompletableFuture<Boolean> future = copycat.submit("createTableWithExpiration", tableName);
         try {
             return future.get();
@@ -49,7 +88,7 @@
     }
 
     public void dropTable(String tableName) {
-
+        waitForLeader();
         CompletableFuture<Void> future = copycat.submit("dropTable", tableName);
         try {
             future.get();
@@ -59,7 +98,7 @@
     }
 
     public void dropAllTables() {
-
+        waitForLeader();
         CompletableFuture<Void> future = copycat.submit("dropAllTables");
         try {
             future.get();
@@ -69,7 +108,7 @@
     }
 
     public Set<String> listTables() {
-
+        waitForLeader();
         CompletableFuture<Set<String>> future = copycat.submit("listTables");
         try {
             return future.get();
@@ -79,7 +118,7 @@
     }
 
     public List<ReadResult> batchRead(BatchReadRequest batchRequest) {
-
+        waitForLeader();
         CompletableFuture<List<ReadResult>> future = copycat.submit("read", batchRequest);
         try {
             return future.get();
@@ -89,7 +128,7 @@
     }
 
     public List<WriteResult> batchWrite(BatchWriteRequest batchRequest) {
-
+        waitForLeader();
         CompletableFuture<List<WriteResult>> future = copycat.submit("write", batchRequest);
         try {
             return future.get();
@@ -99,6 +138,7 @@
     }
 
     public Map<String, VersionedValue> getAll(String tableName) {
+        waitForLeader();
         CompletableFuture<Map<String, VersionedValue>> future = copycat.submit("getAll", tableName);
         try {
             return future.get();
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index d84df22..645de00 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -10,6 +10,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import net.kuujo.copycat.Copycat;
@@ -99,7 +100,7 @@
     private boolean autoAddMember = false;
 
     @Activate
-    public void activate() {
+    public void activate() throws InterruptedException, ExecutionException {
 
         // TODO: Not every node should be part of the consensus ring.
 
@@ -176,9 +177,10 @@
 
         copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
 
-        copycat.start();
+        copycat.start().get();
 
         client = new DatabaseClient(copycat);
+        client.waitForLeader();
 
         log.info("Started.");
     }