Add sample accessing database service to Foo
Change-Id: I514c57a278dea368448d284eb5bf0d41bb0013e3
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
index 9662976..9c323f0 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
@@ -3,8 +3,10 @@
import static org.slf4j.LoggerFactory.getLogger;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
+
import net.kuujo.copycat.Command;
import net.kuujo.copycat.Query;
import net.kuujo.copycat.StateMachine;
@@ -15,6 +17,7 @@
import org.onlab.onos.store.service.VersionedValue;
import org.onlab.onos.store.service.WriteRequest;
import org.onlab.onos.store.service.WriteResult;
+import org.onlab.onos.store.service.impl.InternalWriteResult.Status;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
@@ -32,6 +35,7 @@
private final Logger log = getLogger(getClass());
+ // serializer used for snapshot
public static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
@@ -87,6 +91,37 @@
return results;
}
+ /**
+  * Decides whether a write request's precondition holds against the
+  * value passed in for its key.
+  *
+  * @param request write request being validated
+  * @param value value to test the precondition against; may be {@code null}
+  * @return {@code OK} when the request can be applied,
+  *         {@code PREVIOUS_VALUE_MISMATCH} or {@code PREVIOUS_VERSION_MISMATCH}
+  *         when its precondition fails, or {@code ABORTED} when the request
+  *         type is not one of the handled cases
+  */
+ InternalWriteResult.Status checkIfApplicable(WriteRequest request,
+         VersionedValue value) {
+
+     final boolean present = (value != null);
+
+     switch (request.type()) {
+     case PUT:
+     case REMOVE:
+         // unconditional operations carry no precondition
+         return InternalWriteResult.Status.OK;
+
+     case PUT_IF_ABSENT:
+         // applicable only when nothing is present
+         return present
+                 ? InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH
+                 : InternalWriteResult.Status.OK;
+
+     case PUT_IF_VALUE:
+     case REMOVE_IF_VALUE:
+         // the present bytes must equal the expected old value
+         return (present && Arrays.equals(value.value(), request.oldValue()))
+                 ? InternalWriteResult.Status.OK
+                 : InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH;
+
+     case PUT_IF_VERSION:
+     case REMOVE_IF_VERSION:
+         // the present version must equal the expected previous version
+         return (present && request.previousVersion() == value.version())
+                 ? InternalWriteResult.Status.OK
+                 : InternalWriteResult.Status.PREVIOUS_VERSION_MISMATCH;
+
+     default:
+         log.error("Should never reach here {}", request);
+         return InternalWriteResult.Status.ABORTED;
+     }
+ }
+
@Command
public List<InternalWriteResult> write(List<WriteRequest> requests) {
@@ -100,25 +135,12 @@
abort = true;
continue;
}
- VersionedValue value = table.get(request.key());
- if (value == null) {
- if (request.oldValue() != null) {
- validationResults.add(InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH);
- abort = true;
- continue;
- } else if (request.previousVersion() >= 0) {
- validationResults.add(InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE);
- abort = true;
- continue;
- }
- }
- if (request.previousVersion() >= 0 && value.version() != request.previousVersion()) {
- validationResults.add(InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE);
+ final VersionedValue value = table.get(request.key());
+ Status result = checkIfApplicable(request, value);
+ validationResults.add(result);
+ if (result != Status.OK) {
abort = true;
- continue;
}
-
- validationResults.add(InternalWriteResult.Status.OK);
}
List<InternalWriteResult> results = new ArrayList<>(requests.size());
@@ -126,6 +148,7 @@
if (abort) {
for (InternalWriteResult.Status validationResult : validationResults) {
if (validationResult == InternalWriteResult.Status.OK) {
+ // aborted because the applicability check failed for another request
results.add(new InternalWriteResult(InternalWriteResult.Status.ABORTED, null));
} else {
results.add(new InternalWriteResult(validationResult, null));
@@ -141,12 +164,31 @@
// synchronization scope is wrong.
// Whole function including applicability check needs to be protected.
// Confirm copycat's thread safety requirement for StateMachine
+ // TODO: If we need isolation, we need to block reads also
synchronized (table) {
- VersionedValue previousValue =
- table.put(request.key(), new VersionedValue(request.newValue(), state.nextVersion()));
- results.add(new InternalWriteResult(
- InternalWriteResult.Status.OK,
- new WriteResult(request.tableName(), request.key(), previousValue)));
+ switch (request.type()) {
+ case PUT:
+ case PUT_IF_ABSENT:
+ case PUT_IF_VALUE:
+ case PUT_IF_VERSION:
+ VersionedValue newValue = new VersionedValue(request.newValue(), state.nextVersion());
+ VersionedValue previousValue = table.put(request.key(), newValue);
+ WriteResult putResult = new WriteResult(request.tableName(), request.key(), previousValue);
+ results.add(InternalWriteResult.ok(putResult));
+ break;
+
+ case REMOVE:
+ case REMOVE_IF_VALUE:
+ case REMOVE_IF_VERSION:
+ VersionedValue removedValue = table.remove(request.key());
+ WriteResult removeResult = new WriteResult(request.tableName(), request.key(), removedValue);
+ results.add(InternalWriteResult.ok(removeResult));
+ break;
+
+ default:
+ log.error("Invalid WriteRequest type {}", request.type());
+ break;
+ }
}
}
return results;