Wait for Leader to appear
- DatabaseClient: wait for Leader before DB access
- DatabaseManager: wait for Leader before activate
Change-Id: I5102e7cae1d33f49662bf452b1fba020173a51a0
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
index d512635..bd898b6 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
@@ -1,14 +1,19 @@
package org.onlab.onos.store.service.impl;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import net.kuujo.copycat.Copycat;
+import net.kuujo.copycat.event.EventHandler;
+import net.kuujo.copycat.event.LeaderElectEvent;
import org.onlab.onos.store.service.BatchReadRequest;
import org.onlab.onos.store.service.BatchWriteRequest;
@@ -16,20 +21,54 @@
import org.onlab.onos.store.service.ReadResult;
import org.onlab.onos.store.service.VersionedValue;
import org.onlab.onos.store.service.WriteResult;
+import org.slf4j.Logger;
/**
* Client for interacting with the Copycat Raft cluster.
*/
public class DatabaseClient {
+ private final Logger log = getLogger(getClass());
+
private final Copycat copycat;
public DatabaseClient(Copycat copycat) {
this.copycat = checkNotNull(copycat);
}
- public boolean createTable(String tableName) {
+ public void waitForLeader() {
+ if (copycat.leader() != null) {
+ return;
+ }
+ log.info("No leader in cluster, waiting for election.");
+ final CountDownLatch latch = new CountDownLatch(1);
+ final EventHandler<LeaderElectEvent> leaderLsnr = new EventHandler<LeaderElectEvent>() {
+
+ @Override
+ public void handle(LeaderElectEvent event) {
+ log.info("Leader chosen: {}", event);
+ latch.countDown();
+ }
+ };
+
+ copycat.event(LeaderElectEvent.class).registerHandler(leaderLsnr);
+ try {
+ while (copycat.leader() == null) {
+ latch.await(200, TimeUnit.MILLISECONDS);
+ }
+ log.info("Leader appeared: {}", copycat.leader());
+ return;
+ } catch (InterruptedException e) {
+ log.error("Interrupted while waiting for Leader", e);
+ Thread.currentThread().interrupt();
+ } finally {
+ copycat.event(LeaderElectEvent.class).unregisterHandler(leaderLsnr);
+ }
+ }
+
+ public boolean createTable(String tableName) {
+ waitForLeader();
CompletableFuture<Boolean> future = copycat.submit("createTable", tableName);
try {
return future.get();
@@ -39,7 +78,7 @@
}
public boolean createTable(String tableName, int ttlMillis) {
-
+ waitForLeader();
CompletableFuture<Boolean> future = copycat.submit("createTableWithExpiration", tableName);
try {
return future.get();
@@ -49,7 +88,7 @@
}
public void dropTable(String tableName) {
-
+ waitForLeader();
CompletableFuture<Void> future = copycat.submit("dropTable", tableName);
try {
future.get();
@@ -59,7 +98,7 @@
}
public void dropAllTables() {
-
+ waitForLeader();
CompletableFuture<Void> future = copycat.submit("dropAllTables");
try {
future.get();
@@ -69,7 +108,7 @@
}
public Set<String> listTables() {
-
+ waitForLeader();
CompletableFuture<Set<String>> future = copycat.submit("listTables");
try {
return future.get();
@@ -79,7 +118,7 @@
}
public List<ReadResult> batchRead(BatchReadRequest batchRequest) {
-
+ waitForLeader();
CompletableFuture<List<ReadResult>> future = copycat.submit("read", batchRequest);
try {
return future.get();
@@ -89,7 +128,7 @@
}
public List<WriteResult> batchWrite(BatchWriteRequest batchRequest) {
-
+ waitForLeader();
CompletableFuture<List<WriteResult>> future = copycat.submit("write", batchRequest);
try {
return future.get();
@@ -99,6 +138,7 @@
}
public Map<String, VersionedValue> getAll(String tableName) {
+ waitForLeader();
CompletableFuture<Map<String, VersionedValue>> future = copycat.submit("getAll", tableName);
try {
return future.get();
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index d84df22..645de00 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -10,6 +10,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import net.kuujo.copycat.Copycat;
@@ -99,7 +100,7 @@
private boolean autoAddMember = false;
@Activate
- public void activate() {
+ public void activate() throws InterruptedException, ExecutionException {
// TODO: Not every node should be part of the consensus ring.
@@ -176,9 +177,10 @@
copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
- copycat.start();
+ copycat.start().get();
client = new DatabaseClient(copycat);
+ client.waitForLeader();
log.info("Started.");
}