DatabaseClient: fixed to use copycat instance instead
Change-Id: If13ec051f362e0d3bc8311dc30e2c0f70e55c42e
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
index c2beb59..c749197 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
@@ -1,15 +1,12 @@
package org.onlab.onos.store.service.impl;
-import java.util.Arrays;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import java.util.List;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import net.kuujo.copycat.protocol.Response.Status;
-import net.kuujo.copycat.protocol.SubmitRequest;
-import net.kuujo.copycat.protocol.SubmitResponse;
-import net.kuujo.copycat.spi.protocol.ProtocolClient;
+import net.kuujo.copycat.Copycat;
import org.onlab.onos.store.service.DatabaseException;
import org.onlab.onos.store.service.ReadRequest;
@@ -20,31 +17,17 @@
*/
public class DatabaseClient {
- private final ProtocolClient client;
+ private final Copycat copycat;
- public DatabaseClient(ProtocolClient client) {
- this.client = client;
- }
-
- private static String nextId() {
- return UUID.randomUUID().toString();
+ public DatabaseClient(Copycat copycat) {
+ this.copycat = checkNotNull(copycat);
}
public boolean createTable(String tableName) {
- SubmitRequest request =
- new SubmitRequest(
- nextId(),
- "createTable",
- Arrays.asList(tableName));
- CompletableFuture<SubmitResponse> future = client.submit(request);
+ CompletableFuture<Boolean> future = copycat.submit("createTable", tableName);
try {
- final SubmitResponse submitResponse = future.get();
- if (submitResponse.status() == Status.OK) {
- return (boolean) submitResponse.result();
- } else {
- throw new DatabaseException(submitResponse.error());
- }
+ return future.get();
} catch (InterruptedException | ExecutionException e) {
throw new DatabaseException(e);
}
@@ -52,17 +35,9 @@
public void dropTable(String tableName) {
- SubmitRequest request =
- new SubmitRequest(
- nextId(),
- "dropTable",
- Arrays.asList(tableName));
- CompletableFuture<SubmitResponse> future = client.submit(request);
+ CompletableFuture<Void> future = copycat.submit("dropTable", tableName);
try {
- if (future.get().status() != Status.OK) {
- throw new DatabaseException(future.get().toString());
- }
-
+ future.get();
} catch (InterruptedException | ExecutionException e) {
throw new DatabaseException(e);
}
@@ -70,79 +45,39 @@
public void dropAllTables() {
- SubmitRequest request =
- new SubmitRequest(
- nextId(),
- "dropAllTables",
- Arrays.asList());
- CompletableFuture<SubmitResponse> future = client.submit(request);
+ CompletableFuture<Void> future = copycat.submit("dropAllTables");
try {
- if (future.get().status() != Status.OK) {
- throw new DatabaseException(future.get().toString());
- }
+ future.get();
} catch (InterruptedException | ExecutionException e) {
throw new DatabaseException(e);
}
}
- @SuppressWarnings("unchecked")
public List<String> listTables() {
- SubmitRequest request =
- new SubmitRequest(
- nextId(),
- "listTables",
- Arrays.asList());
- CompletableFuture<SubmitResponse> future = client.submit(request);
+ CompletableFuture<List<String>> future = copycat.submit("listTables");
try {
- final SubmitResponse submitResponse = future.get();
- if (submitResponse.status() == Status.OK) {
- return (List<String>) submitResponse.result();
- } else {
- throw new DatabaseException(submitResponse.error());
- }
+ return future.get();
} catch (InterruptedException | ExecutionException e) {
throw new DatabaseException(e);
}
}
- @SuppressWarnings("unchecked")
public List<InternalReadResult> batchRead(List<ReadRequest> requests) {
- SubmitRequest request = new SubmitRequest(
- nextId(),
- "read",
- Arrays.asList(requests));
-
- CompletableFuture<SubmitResponse> future = client.submit(request);
+ CompletableFuture<List<InternalReadResult>> future = copycat.submit("read", requests);
try {
- final SubmitResponse submitResponse = future.get();
- if (submitResponse.status() == Status.OK) {
- return (List<InternalReadResult>) submitResponse.result();
- } else {
- throw new DatabaseException(submitResponse.error());
- }
+ return future.get();
} catch (InterruptedException | ExecutionException e) {
throw new DatabaseException(e);
}
}
- @SuppressWarnings("unchecked")
public List<InternalWriteResult> batchWrite(List<WriteRequest> requests) {
- SubmitRequest request = new SubmitRequest(
- nextId(),
- "write",
- Arrays.asList(requests));
-
- CompletableFuture<SubmitResponse> future = client.submit(request);
+ CompletableFuture<List<InternalWriteResult>> future = copycat.submit("write", requests);
try {
- final SubmitResponse submitResponse = future.get();
- if (submitResponse.status() == Status.OK) {
- return (List<InternalWriteResult>) submitResponse.result();
- } else {
- throw new DatabaseException(submitResponse.error());
- }
+ return future.get();
} catch (InterruptedException | ExecutionException e) {
throw new DatabaseException(e);
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index f328842..0d06e08 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -25,7 +25,6 @@
import net.kuujo.copycat.log.InMemoryLog;
import net.kuujo.copycat.log.Log;
-import org.apache.commons.lang3.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -53,7 +52,6 @@
import org.slf4j.Logger;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
/**
* Strongly consistent and durable state management service based on
@@ -125,30 +123,16 @@
}
final ControllerNode localNode = clusterService.getLocalNode();
- TcpMember clientHandler = null;
for (ControllerNode member : defaultMember) {
final TcpMember tcpMember = new TcpMember(member.ip().toString(),
member.tcpPort());
if (localNode.equals(member)) {
- clientHandler = tcpMember;
clusterConfig.setLocalMember(tcpMember);
} else {
clusterConfig.addRemoteMember(tcpMember);
}
}
- // TODO should be removed after DatabaseClient refactoring
- if (clientHandler == null) {
- Set<TcpMember> members = clusterConfig.getMembers();
- if (members.isEmpty()) {
- log.error("No member found in [{}] tablet configuration.",
- DEFAULT_TABLET);
- throw new IllegalStateException("No member found in tablet configuration");
- }
- int position = RandomUtils.nextInt(0, members.size());
- clientHandler = Iterables.get(members, position);
- }
-
// note: from this point beyond, clusterConfig requires synchronization
clusterEventLatch = new CountDownLatch(1);
clusterEventListener = new InternalClusterEventListener();
@@ -182,8 +166,7 @@
copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
copycat.start();
- // FIXME Redo DatabaseClient. Needs fall back mechanism etc.
- client = new DatabaseClient(copycatMessagingProtocol.createClient(clientHandler));
+ client = new DatabaseClient(copycat);
log.info("Started.");
}