Added distributed transaction support through a two-phase commit protocol
Change-Id: I85d64234a24823fee8b3c2ea830abbb6867dad38
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
index c14e57c..8ed7670 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -23,13 +23,12 @@
import net.kuujo.copycat.util.concurrent.Futures;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
-import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
/**
@@ -39,7 +38,7 @@
private final StateMachine<DatabaseState<String, byte[]>> stateMachine;
private DatabaseProxy<String, byte[]> proxy;
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({ "unchecked", "rawtypes" })
public DefaultDatabase(ResourceContext context) {
super(context);
this.stateMachine = new DefaultStateMachine(context, DatabaseState.class, DefaultDatabaseState.class);
@@ -91,17 +90,17 @@
}
@Override
- public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
+ public CompletableFuture<Result<Versioned<byte[]>>> put(String tableName, String key, byte[] value) {
return checkOpen(() -> proxy.put(tableName, key, value));
}
@Override
- public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
+ public CompletableFuture<Result<Versioned<byte[]>>> remove(String tableName, String key) {
return checkOpen(() -> proxy.remove(tableName, key));
}
@Override
- public CompletableFuture<Void> clear(String tableName) {
+ public CompletableFuture<Result<Void>> clear(String tableName) {
return checkOpen(() -> proxy.clear(tableName));
}
@@ -121,33 +120,48 @@
}
@Override
- public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
+ public CompletableFuture<Result<Versioned<byte[]>>> putIfAbsent(String tableName, String key, byte[] value) {
return checkOpen(() -> proxy.putIfAbsent(tableName, key, value));
}
@Override
- public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
+ public CompletableFuture<Result<Boolean>> remove(String tableName, String key, byte[] value) {
return checkOpen(() -> proxy.remove(tableName, key, value));
}
@Override
- public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
+ public CompletableFuture<Result<Boolean>> remove(String tableName, String key, long version) {
return checkOpen(() -> proxy.remove(tableName, key, version));
}
@Override
- public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
+ public CompletableFuture<Result<Boolean>> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
return checkOpen(() -> proxy.replace(tableName, key, oldValue, newValue));
}
@Override
- public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
+ public CompletableFuture<Result<Boolean>> replace(String tableName, String key, long oldVersion, byte[] newValue) {
return checkOpen(() -> proxy.replace(tableName, key, oldVersion, newValue));
}
@Override
- public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
- return checkOpen(() -> proxy.atomicBatchUpdate(updates));
+ public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
+ return checkOpen(() -> proxy.prepareAndCommit(transaction));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepare(Transaction transaction) {
+ return checkOpen(() -> proxy.prepare(transaction));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> commit(Transaction transaction) {
+ return checkOpen(() -> proxy.commit(transaction));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> rollback(Transaction transaction) {
+ return checkOpen(() -> proxy.rollback(transaction));
}
@Override
@@ -180,4 +194,4 @@
}
return false;
}
-}
\ No newline at end of file
+}