DatabaseClient: Add timeout
- timeout + retry to listTable
- timeout to service API
Change-Id: I8b54dd24d380dcc9e8d44baf3bbf5e379ccca53b
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
index bd898b6..72e4b95 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
@@ -3,6 +3,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -10,6 +11,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import net.kuujo.copycat.Copycat;
import net.kuujo.copycat.event.EventHandler;
@@ -28,6 +30,10 @@
*/
public class DatabaseClient {
+ private static final int RETRIES = 5;
+
+ private static final int TIMEOUT_MS = 2000;
+
private final Logger log = getLogger(getClass());
private final Copycat copycat;
@@ -109,9 +115,20 @@
public Set<String> listTables() {
waitForLeader();
- CompletableFuture<Set<String>> future = copycat.submit("listTables");
try {
- return future.get();
+ for (int i = 0; i < RETRIES; ++i) {
+ CompletableFuture<Set<String>> future = copycat.submit("listTables");
+ try {
+ return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ log.debug("Timed out retrying {}", i);
+ future.cancel(true);
+ waitForLeader();
+ }
+ }
+ // TODO: proper timeout handling
+ log.error("Timed out");
+ return Collections.emptySet();
} catch (InterruptedException | ExecutionException e) {
throw new DatabaseException(e);
}
@@ -121,9 +138,11 @@
waitForLeader();
CompletableFuture<List<ReadResult>> future = copycat.submit("read", batchRequest);
try {
- return future.get();
+ return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException e) {
throw new DatabaseException(e);
+ } catch (TimeoutException e) {
+ throw new DatabaseException(e);
}
}
@@ -131,9 +150,11 @@
waitForLeader();
CompletableFuture<List<WriteResult>> future = copycat.submit("write", batchRequest);
try {
- return future.get();
+ return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException e) {
throw new DatabaseException(e);
+ } catch (TimeoutException e) {
+ throw new DatabaseException(e);
}
}
@@ -141,9 +162,11 @@
waitForLeader();
CompletableFuture<Map<String, VersionedValue>> future = copycat.submit("getAll", tableName);
try {
- return future.get();
+ return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException e) {
throw new DatabaseException(e);
+ } catch (TimeoutException e) {
+ throw new DatabaseException(e);
}
}
}