Reworked DatabaseService API.
Initial implementation of LockManager.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index afeaef9..e3de134 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -4,8 +4,6 @@
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -22,7 +20,6 @@
import net.kuujo.copycat.cluster.TcpCluster;
import net.kuujo.copycat.cluster.TcpClusterConfig;
import net.kuujo.copycat.cluster.TcpMember;
-import net.kuujo.copycat.log.InMemoryLog;
import net.kuujo.copycat.log.Log;
import org.apache.felix.scr.annotations.Activate;
@@ -37,17 +34,18 @@
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.service.BatchReadRequest;
+import org.onlab.onos.store.service.BatchReadResult;
+import org.onlab.onos.store.service.BatchWriteRequest;
+import org.onlab.onos.store.service.BatchWriteResult;
import org.onlab.onos.store.service.DatabaseAdminService;
import org.onlab.onos.store.service.DatabaseException;
import org.onlab.onos.store.service.DatabaseService;
-import org.onlab.onos.store.service.NoSuchTableException;
-import org.onlab.onos.store.service.OptimisticLockException;
-import org.onlab.onos.store.service.OptionalResult;
-import org.onlab.onos.store.service.ReadRequest;
import org.onlab.onos.store.service.ReadResult;
-import org.onlab.onos.store.service.WriteAborted;
-import org.onlab.onos.store.service.WriteRequest;
+import org.onlab.onos.store.service.ReadStatus;
+import org.onlab.onos.store.service.VersionedValue;
import org.onlab.onos.store.service.WriteResult;
+import org.onlab.onos.store.service.WriteStatus;
import org.onlab.packet.IpAddress;
import org.slf4j.Logger;
@@ -199,66 +197,121 @@
}
@Override
- public ReadResult read(ReadRequest request) {
- return batchRead(Arrays.asList(request)).get(0).get();
- }
-
- @Override
- public List<OptionalResult<ReadResult, DatabaseException>> batchRead(
- List<ReadRequest> batch) {
- List<OptionalResult<ReadResult, DatabaseException>> readResults = new ArrayList<>(batch.size());
- for (InternalReadResult internalReadResult : client.batchRead(batch)) {
- if (internalReadResult.status() == InternalReadResult.Status.NO_SUCH_TABLE) {
- readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
- new NoSuchTableException()));
- } else {
- readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
- internalReadResult.result()));
- }
+    public VersionedValue get(String tableName, String key) {
+        // A point read is expressed as a batch that carries exactly one get operation.
+        final BatchReadRequest singleRead = new BatchReadRequest.Builder()
+                .get(tableName, key)
+                .build();
+        final ReadResult outcome = batchRead(singleRead).getAsList().get(0);
+        // Any status other than OK is reported to the caller as a DatabaseException.
+        if (!outcome.status().equals(ReadStatus.OK)) {
+            throw new DatabaseException("get failed due to status: " + outcome.status());
}
-        return readResults;
+        return outcome.value();
}
@Override
- public OptionalResult<WriteResult, DatabaseException> writeNothrow(WriteRequest request) {
- return batchWrite(Arrays.asList(request)).get(0);
+    public BatchReadResult batchRead(BatchReadRequest batchRequest) {
+        // Hand the whole batch to the client unchanged and wrap whatever it
+        // returns in the BatchReadResult type exposed by this method.
+        return new BatchReadResult(client.batchRead(batchRequest));
}
@Override
- public WriteResult write(WriteRequest request) {
-// throws OptimisticLockException, PreconditionFailedException {
- return writeNothrow(request).get();
+    public BatchWriteResult batchWrite(BatchWriteRequest batchRequest) {
+        // Hand the whole batch to the client unchanged and wrap whatever it
+        // returns in the BatchWriteResult type exposed by this method.
+        return new BatchWriteResult(client.batchWrite(batchRequest));
}
@Override
- public List<OptionalResult<WriteResult, DatabaseException>> batchWrite(
- List<WriteRequest> batch) {
- List<OptionalResult<WriteResult, DatabaseException>> writeResults = new ArrayList<>(batch.size());
- for (InternalWriteResult internalWriteResult : client.batchWrite(batch)) {
- if (internalWriteResult.status() == InternalWriteResult.Status.NO_SUCH_TABLE) {
- writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
- new NoSuchTableException()));
- } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VERSION_MISMATCH) {
- writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
- new OptimisticLockException()));
- } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH) {
- // TODO: throw a different exception?
- writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
- new OptimisticLockException()));
- } else if (internalWriteResult.status() == InternalWriteResult.Status.ABORTED) {
- writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
- new WriteAborted()));
- } else {
- writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
- internalWriteResult.result()));
- }
+    public VersionedValue put(String tableName, String key, byte[] value) {
+        // A single put is expressed as a batch that carries exactly one write operation.
+        final BatchWriteRequest singleWrite = new BatchWriteRequest.Builder()
+                .put(tableName, key, value)
+                .build();
+        final WriteResult outcome = batchWrite(singleWrite).getAsList().get(0);
+        // Any status other than OK is reported to the caller as a DatabaseException.
+        if (!outcome.status().equals(WriteStatus.OK)) {
+            throw new DatabaseException("put failed due to status: " + outcome.status());
+        }
-        return writeResults;
+        return outcome.previousValue();
+    }
+    /**
+     * Issues a one-operation batch write built with the builder's
+     * {@code putIfAbsent} and translates the resulting status.
+     *
+     * @param tableName table name handed to the request builder
+     * @param key key handed to the request builder
+     * @param value value handed to the request builder
+     * @return {@code true} when the write status is {@code OK},
+     *         {@code false} when it is {@code PRECONDITION_VIOLATION}
+     * @throws DatabaseException for every other write status
+     */
+    @Override
+    public boolean putIfAbsent(String tableName, String key, byte[] value) {
+        final BatchWriteRequest singleWrite = new BatchWriteRequest.Builder()
+                .putIfAbsent(tableName, key, value)
+                .build();
+        final WriteResult outcome = batchWrite(singleWrite).getAsList().get(0);
+        if (!outcome.status().equals(WriteStatus.OK)) {
+            // Only a violated precondition is an expected "no"; anything else is an error.
+            if (outcome.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
+                return false;
+            }
+            throw new DatabaseException("putIfAbsent failed due to status: " + outcome.status());
+        }
+        return true;
+    }
+
+    /**
+     * Issues a one-operation batch write built with the builder's
+     * {@code putIfVersionMatches} and translates the resulting status.
+     *
+     * @param tableName table name handed to the request builder
+     * @param key key handed to the request builder
+     * @param value value handed to the request builder
+     * @param version version handed to the request builder
+     * @return {@code true} when the write status is {@code OK},
+     *         {@code false} when it is {@code PRECONDITION_VIOLATION}
+     * @throws DatabaseException for every other write status
+     */
+    @Override
+    public boolean putIfVersionMatches(String tableName, String key,
+            byte[] value, long version) {
+        final BatchWriteRequest singleWrite = new BatchWriteRequest.Builder()
+                .putIfVersionMatches(tableName, key, value, version)
+                .build();
+        final WriteResult outcome = batchWrite(singleWrite).getAsList().get(0);
+        if (!outcome.status().equals(WriteStatus.OK)) {
+            // Only a violated precondition is an expected "no"; anything else is an error.
+            if (outcome.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
+                return false;
+            }
+            throw new DatabaseException("putIfVersionMatches failed due to status: " + outcome.status());
+        }
+        return true;
+    }
+
+    /**
+     * Issues a one-operation batch write built with the builder's
+     * {@code putIfValueMatches} and translates the resulting status.
+     *
+     * @param tableName table name handed to the request builder
+     * @param key key handed to the request builder
+     * @param oldValue first value argument handed to the request builder
+     * @param newValue second value argument handed to the request builder
+     * @return {@code true} when the write status is {@code OK},
+     *         {@code false} when it is {@code PRECONDITION_VIOLATION}
+     * @throws DatabaseException for every other write status
+     */
+    @Override
+    public boolean putIfValueMatches(String tableName, String key,
+            byte[] oldValue, byte[] newValue) {
+        final BatchWriteRequest singleWrite = new BatchWriteRequest.Builder()
+                .putIfValueMatches(tableName, key, oldValue, newValue)
+                .build();
+        final WriteResult outcome = batchWrite(singleWrite).getAsList().get(0);
+        if (!outcome.status().equals(WriteStatus.OK)) {
+            // Only a violated precondition is an expected "no"; anything else is an error.
+            if (outcome.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
+                return false;
+            }
+            throw new DatabaseException("putIfValueMatches failed due to status: " + outcome.status());
+        }
+        return true;
+    }
+
+    /**
+     * Issues a one-operation batch write built with the builder's
+     * {@code remove} and returns the previous value reported by the result.
+     *
+     * @param tableName table name handed to the request builder
+     * @param key key handed to the request builder
+     * @return the write result's {@code previousValue()} when the status is {@code OK}
+     * @throws DatabaseException for every status other than {@code OK}
+     */
+    @Override
+    public VersionedValue remove(String tableName, String key) {
+        final BatchWriteRequest singleWrite = new BatchWriteRequest.Builder()
+                .remove(tableName, key)
+                .build();
+        final WriteResult outcome = batchWrite(singleWrite).getAsList().get(0);
+        if (!outcome.status().equals(WriteStatus.OK)) {
+            throw new DatabaseException("remove failed due to status: " + outcome.status());
+        }
+        return outcome.previousValue();
+    }
+
+    /**
+     * Issues a one-operation batch write built with the builder's
+     * {@code removeIfVersionMatches} and translates the resulting status.
+     *
+     * @param tableName table name handed to the request builder
+     * @param key key handed to the request builder
+     * @param version version handed to the request builder
+     * @return {@code true} when the write status is {@code OK},
+     *         {@code false} when it is {@code PRECONDITION_VIOLATION}
+     * @throws DatabaseException for every other write status
+     */
+    @Override
+    public boolean removeIfVersionMatches(String tableName, String key,
+            long version) {
+        final BatchWriteRequest singleWrite = new BatchWriteRequest.Builder()
+                .removeIfVersionMatches(tableName, key, version)
+                .build();
+        final WriteResult outcome = batchWrite(singleWrite).getAsList().get(0);
+        if (!outcome.status().equals(WriteStatus.OK)) {
+            // Only a violated precondition is an expected "no"; anything else is an error.
+            if (outcome.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
+                return false;
+            }
+            throw new DatabaseException("removeIfVersionMatches failed due to status: " + outcome.status());
+        }
+        return true;
+    }
+
+    /**
+     * Issues a one-operation batch write built with the builder's
+     * {@code removeIfValueMatches} and translates the resulting status.
+     *
+     * @param tableName table name handed to the request builder
+     * @param key key handed to the request builder
+     * @param value value handed to the request builder
+     * @return {@code true} when the write status is {@code OK},
+     *         {@code false} when it is {@code PRECONDITION_VIOLATION}
+     * @throws DatabaseException for every other write status
+     */
+    @Override
+    public boolean removeIfValueMatches(String tableName, String key,
+            byte[] value) {
+        final BatchWriteRequest singleWrite = new BatchWriteRequest.Builder()
+                .removeIfValueMatches(tableName, key, value)
+                .build();
+        final WriteResult outcome = batchWrite(singleWrite).getAsList().get(0);
+        if (!outcome.status().equals(WriteStatus.OK)) {
+            // Only a violated precondition is an expected "no"; anything else is an error.
+            if (outcome.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
+                return false;
+            }
+            throw new DatabaseException("removeIfValueMatches failed due to status: " + outcome.status());
+        }
+        return true;
+    }
+
+    /**
+     * Registers the given controller node as a remote member of the cluster
+     * configuration.
+     *
+     * @param node node whose IP string and TCP port identify the new member
+     */
+    @Override
+    public void addMember(final ControllerNode node) {
+        final TcpMember remote = new TcpMember(node.ip().toString(), node.tcpPort());
+        log.info("{} was added to the cluster", remote);
+        // clusterConfig doubles as the lock guarding its own membership changes.
+        synchronized (clusterConfig) {
+            clusterConfig.addRemoteMember(remote);
+        }
}
private final class InternalClusterEventListener
- implements ClusterEventListener {
+ implements ClusterEventListener {
@Override
public void event(ClusterEvent event) {
@@ -266,7 +319,7 @@
final ControllerNode node = event.subject();
final TcpMember tcpMember = new TcpMember(node.ip().toString(),
- node.tcpPort());
+ node.tcpPort());
switch (event.type()) {
case INSTANCE_ACTIVATED:
@@ -284,8 +337,8 @@
case INSTANCE_REMOVED:
if (autoAddMember) {
Set<DefaultControllerNode> members
- = tabletMembers.getOrDefault(DEFAULT_TABLET,
- Collections.emptySet());
+ = tabletMembers.getOrDefault(DEFAULT_TABLET,
+ Collections.emptySet());
// remove only if not the initial members
if (!members.contains(node)) {
synchronized (clusterConfig) {
@@ -308,63 +361,6 @@
}
- public static final class KryoRegisteredInMemoryLog extends InMemoryLog {
- public KryoRegisteredInMemoryLog() {
- super();
- // required to deserialize object across bundles
- super.kryo.register(TcpMember.class, new TcpMemberSerializer());
- super.kryo.register(TcpClusterConfig.class, new TcpClusterConfigSerializer());
- }
- }
-
- private class DatabaseOperationResult<R, E extends DatabaseException> implements OptionalResult<R, E> {
-
- private final R result;
- private final DatabaseException exception;
-
- public DatabaseOperationResult(R result) {
- this.result = result;
- this.exception = null;
- }
-
- public DatabaseOperationResult(DatabaseException exception) {
- this.result = null;
- this.exception = exception;
- }
-
- @Override
- public R get() {
- if (result != null) {
- return result;
- }
- throw exception;
- }
-
- @Override
- public boolean hasValidResult() {
- return result != null;
- }
-
- @Override
- public String toString() {
- if (result != null) {
- return result.toString();
- } else {
- return exception.toString();
- }
- }
- }
-
- @Override
- public void addMember(final ControllerNode node) {
- final TcpMember tcpMember = new TcpMember(node.ip().toString(),
- node.tcpPort());
- log.info("{} was added to the cluster", tcpMember);
- synchronized (clusterConfig) {
- clusterConfig.addRemoteMember(tcpMember);
- }
- }
-
@Override
public void removeMember(final ControllerNode node) {
final TcpMember tcpMember = new TcpMember(node.ip().toString(),