1. Fixed a synchronization issue with database update processing and expiry tracking.
2. Fixed a synchronization issue with the MapDBLog appendEntries method.
3. DatabaseClient now uses ProtocolClient to interact with the Raft cluster.
4. Misc javadoc and logging improvements.
Change-Id: I147eb5bf859cf9827df452d62ab415d643a00aa4
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
index 72e4b95..876f6cc 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
@@ -1,21 +1,24 @@
package org.onlab.onos.store.service.impl;
-import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
-import java.util.Collections;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import net.kuujo.copycat.Copycat;
+import net.kuujo.copycat.cluster.Member;
+import net.kuujo.copycat.cluster.TcpMember;
import net.kuujo.copycat.event.EventHandler;
import net.kuujo.copycat.event.LeaderElectEvent;
+import net.kuujo.copycat.protocol.SubmitRequest;
+import net.kuujo.copycat.protocol.SubmitResponse;
+import net.kuujo.copycat.spi.protocol.ProtocolClient;
import org.onlab.onos.store.service.BatchReadRequest;
import org.onlab.onos.store.service.BatchWriteRequest;
@@ -28,7 +31,7 @@
/**
* Client for interacting with the Copycat Raft cluster.
*/
-public class DatabaseClient {
+public class DatabaseClient implements EventHandler<LeaderElectEvent> {
private static final int RETRIES = 5;
@@ -36,137 +39,101 @@
private final Logger log = getLogger(getClass());
- private final Copycat copycat;
+ private final DatabaseProtocolService protocol;
+ private volatile ProtocolClient copycat = null;
+ private volatile Member currentLeader = null;
- public DatabaseClient(Copycat copycat) {
- this.copycat = checkNotNull(copycat);
+ public DatabaseClient(DatabaseProtocolService protocol) {
+ this.protocol = protocol;
+ }
+
+ @Override
+ public void handle(LeaderElectEvent event) {
+ Member newLeader = event.leader();
+ if (newLeader != null && !newLeader.equals(currentLeader)) {
+ currentLeader = newLeader;
+ if (copycat != null) {
+ copycat.close();
+ }
+ copycat = protocol.createClient((TcpMember) currentLeader);
+ copycat.connect();
+ }
+ }
+
+ private String nextRequestId() {
+ return UUID.randomUUID().toString();
}
public void waitForLeader() {
- if (copycat.leader() != null) {
+ if (currentLeader != null) {
return;
}
log.info("No leader in cluster, waiting for election.");
- final CountDownLatch latch = new CountDownLatch(1);
- final EventHandler<LeaderElectEvent> leaderLsnr = new EventHandler<LeaderElectEvent>() {
- @Override
- public void handle(LeaderElectEvent event) {
- log.info("Leader chosen: {}", event);
- latch.countDown();
- }
- };
-
- copycat.event(LeaderElectEvent.class).registerHandler(leaderLsnr);
try {
- while (copycat.leader() == null) {
- latch.await(200, TimeUnit.MILLISECONDS);
+ while (currentLeader == null) {
+ Thread.sleep(200);
}
- log.info("Leader appeared: {}", copycat.leader());
+ log.info("Leader appeared: {}", currentLeader);
return;
} catch (InterruptedException e) {
log.error("Interrupted while waiting for Leader", e);
Thread.currentThread().interrupt();
- } finally {
- copycat.event(LeaderElectEvent.class).unregisterHandler(leaderLsnr);
+ }
+ }
+
+ private <T> T submit(String operationName, Object... args) {
+ waitForLeader();
+ if (currentLeader == null) {
+ throw new DatabaseException("Raft cluster does not have a leader.");
+ }
+
+ SubmitRequest request =
+ new SubmitRequest(nextRequestId(), operationName, Arrays.asList(args));
+
+ CompletableFuture<SubmitResponse> submitResponse = copycat.submit(request);
+
+ log.debug("Sent {} to {}", request, currentLeader);
+
+ try {
+ return (T) submitResponse.get(TIMEOUT_MS, TimeUnit.MILLISECONDS).result();
+ } catch (ExecutionException | InterruptedException e) {
+ throw new DatabaseException(e);
+ } catch (TimeoutException e) {
+ throw new DatabaseException.Timeout(e);
}
}
public boolean createTable(String tableName) {
- waitForLeader();
- CompletableFuture<Boolean> future = copycat.submit("createTable", tableName);
- try {
- return future.get();
- } catch (InterruptedException | ExecutionException e) {
- throw new DatabaseException(e);
- }
+ return submit("createTable", tableName);
}
public boolean createTable(String tableName, int ttlMillis) {
- waitForLeader();
- CompletableFuture<Boolean> future = copycat.submit("createTableWithExpiration", tableName);
- try {
- return future.get();
- } catch (InterruptedException | ExecutionException e) {
- throw new DatabaseException(e);
- }
+ return submit("createTable", tableName, ttlMillis);
}
public void dropTable(String tableName) {
- waitForLeader();
- CompletableFuture<Void> future = copycat.submit("dropTable", tableName);
- try {
- future.get();
- } catch (InterruptedException | ExecutionException e) {
- throw new DatabaseException(e);
- }
+ submit("dropTable", tableName);
}
public void dropAllTables() {
- waitForLeader();
- CompletableFuture<Void> future = copycat.submit("dropAllTables");
- try {
- future.get();
- } catch (InterruptedException | ExecutionException e) {
- throw new DatabaseException(e);
- }
+ submit("dropAllTables");
}
public Set<String> listTables() {
- waitForLeader();
- try {
- for (int i = 0; i < RETRIES; ++i) {
- CompletableFuture<Set<String>> future = copycat.submit("listTables");
- try {
- return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
- } catch (TimeoutException e) {
- log.debug("Timed out retrying {}", i);
- future.cancel(true);
- waitForLeader();
- }
- }
- // TODO: proper timeout handling
- log.error("Timed out");
- return Collections.emptySet();
- } catch (InterruptedException | ExecutionException e) {
- throw new DatabaseException(e);
- }
+ return submit("listTables");
}
public List<ReadResult> batchRead(BatchReadRequest batchRequest) {
- waitForLeader();
- CompletableFuture<List<ReadResult>> future = copycat.submit("read", batchRequest);
- try {
- return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
- } catch (InterruptedException | ExecutionException e) {
- throw new DatabaseException(e);
- } catch (TimeoutException e) {
- throw new DatabaseException(e);
- }
+ return submit("read", batchRequest);
}
public List<WriteResult> batchWrite(BatchWriteRequest batchRequest) {
- waitForLeader();
- CompletableFuture<List<WriteResult>> future = copycat.submit("write", batchRequest);
- try {
- return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
- } catch (InterruptedException | ExecutionException e) {
- throw new DatabaseException(e);
- } catch (TimeoutException e) {
- throw new DatabaseException(e);
- }
+ return submit("write", batchRequest);
}
public Map<String, VersionedValue> getAll(String tableName) {
- waitForLeader();
- CompletableFuture<Map<String, VersionedValue>> future = copycat.submit("getAll", tableName);
- try {
- return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
- } catch (InterruptedException | ExecutionException e) {
- throw new DatabaseException(e);
- } catch (TimeoutException e) {
- throw new DatabaseException(e);
- }
+ return submit("getAll", tableName);
}
}