Added distributed transaction support through a two phase commit protocol
Change-Id: I85d64234a24823fee8b3c2ea830abbb6867dad38
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
index e63a3d8..c190a28 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -18,43 +18,58 @@
import java.util.Arrays;
import java.util.Collection;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
-import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.DatabaseUpdate;
+import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
+import org.onosproject.store.service.DatabaseUpdate.Type;
+import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
import net.kuujo.copycat.state.Initializer;
import net.kuujo.copycat.state.StateContext;
/**
* Default database state.
- *
- * @param <K> key type
- * @param <V> value type
*/
-public class DefaultDatabaseState<K, V> implements DatabaseState<K, V> {
-
+public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
private Long nextVersion;
- private Map<String, Map<K, Versioned<V>>> tables;
+ private Map<String, Map<String, Versioned<byte[]>>> tables;
+
+ /**
+ * This locks map has a structure similar to the "tables" map above and
+ * holds all the provisional updates made during a transaction's prepare phase.
+ * The entry value is represented as the tuple: (transactionId, newValue)
+ * If newValue == null that signifies this update is attempting to
+ * delete the existing value.
+ * This map also serves as a lock on the entries that are being updated.
+ * The presence of a entry in this map indicates that element is
+ * participating in a transaction and is currently locked for updates.
+ */
+ private Map<String, Map<String, Pair<Long, byte[]>>> locks;
@Initializer
@Override
- public void init(StateContext<DatabaseState<K, V>> context) {
+ public void init(StateContext<DatabaseState<String, byte[]>> context) {
tables = context.get("tables");
if (tables == null) {
- tables = new HashMap<>();
+ tables = Maps.newConcurrentMap();
context.put("tables", tables);
}
+ locks = context.get("locks");
+ if (locks == null) {
+ locks = Maps.newConcurrentMap();
+ context.put("locks", locks);
+ }
nextVersion = context.get("nextVersion");
if (nextVersion == null) {
nextVersion = new Long(0);
@@ -62,15 +77,6 @@
}
}
- private Map<K, Versioned<V>> getTableMap(String tableName) {
- Map<K, Versioned<V>> table = tables.get(tableName);
- if (table == null) {
- table = new HashMap<>();
- tables.put(tableName, table);
- }
- return table;
- }
-
@Override
public Set<String> tableNames() {
return new HashSet<>(tables.keySet());
@@ -87,47 +93,55 @@
}
@Override
- public boolean containsKey(String tableName, K key) {
+ public boolean containsKey(String tableName, String key) {
return getTableMap(tableName).containsKey(key);
}
@Override
- public boolean containsValue(String tableName, V value) {
- return getTableMap(tableName).values().stream().anyMatch(v -> checkEquality(v.value(), value));
+ public boolean containsValue(String tableName, byte[] value) {
+ return getTableMap(tableName).values().stream().anyMatch(v -> Arrays.equals(v.value(), value));
}
@Override
- public Versioned<V> get(String tableName, K key) {
+ public Versioned<byte[]> get(String tableName, String key) {
return getTableMap(tableName).get(key);
}
@Override
- public Versioned<V> put(String tableName, K key, V value) {
- return getTableMap(tableName).put(key, new Versioned<>(value, ++nextVersion));
+ public Result<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
+ return isLockedForUpdates(tableName, key)
+ ? Result.locked()
+ : Result.ok(getTableMap(tableName).put(key, new Versioned<>(value, ++nextVersion)));
}
@Override
- public Versioned<V> remove(String tableName, K key) {
- return getTableMap(tableName).remove(key);
+ public Result<Versioned<byte[]>> remove(String tableName, String key) {
+ return isLockedForUpdates(tableName, key)
+ ? Result.locked()
+ : Result.ok(getTableMap(tableName).remove(key));
}
@Override
- public void clear(String tableName) {
+ public Result<Void> clear(String tableName) {
+ if (areTransactionsInProgress(tableName)) {
+ return Result.locked();
+ }
getTableMap(tableName).clear();
+ return Result.ok(null);
}
@Override
- public Set<K> keySet(String tableName) {
+ public Set<String> keySet(String tableName) {
return ImmutableSet.copyOf(getTableMap(tableName).keySet());
}
@Override
- public Collection<Versioned<V>> values(String tableName) {
+ public Collection<Versioned<byte[]>> values(String tableName) {
return ImmutableList.copyOf(getTableMap(tableName).values());
}
@Override
- public Set<Entry<K, Versioned<V>>> entrySet(String tableName) {
+ public Set<Entry<String, Versioned<byte[]>>> entrySet(String tableName) {
return ImmutableSet.copyOf(getTableMap(tableName)
.entrySet()
.stream()
@@ -136,93 +150,113 @@
}
@Override
- public Versioned<V> putIfAbsent(String tableName, K key, V value) {
- Versioned<V> existingValue = getTableMap(tableName).get(key);
- return existingValue != null ? existingValue : put(tableName, key, value);
- }
-
- @Override
- public boolean remove(String tableName, K key, V value) {
- Versioned<V> existing = getTableMap(tableName).get(key);
- if (existing != null && checkEquality(existing.value(), value)) {
- getTableMap(tableName).remove(key);
- return true;
+ public Result<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
+ if (isLockedForUpdates(tableName, key)) {
+ return Result.locked();
}
- return false;
+ Versioned<byte[]> existingValue = get(tableName, key);
+ Versioned<byte[]> currentValue = existingValue != null ? existingValue : put(tableName, key, value).value();
+ return Result.ok(currentValue);
}
@Override
- public boolean remove(String tableName, K key, long version) {
- Versioned<V> existing = getTableMap(tableName).get(key);
+ public Result<Boolean> remove(String tableName, String key, byte[] value) {
+ if (isLockedForUpdates(tableName, key)) {
+ return Result.locked();
+ }
+ Versioned<byte[]> existing = get(tableName, key);
+ if (existing != null && Arrays.equals(existing.value(), value)) {
+ getTableMap(tableName).remove(key);
+ return Result.ok(true);
+ }
+ return Result.ok(false);
+ }
+
+ @Override
+ public Result<Boolean> remove(String tableName, String key, long version) {
+ if (isLockedForUpdates(tableName, key)) {
+ return Result.locked();
+ }
+ Versioned<byte[]> existing = get(tableName, key);
if (existing != null && existing.version() == version) {
remove(tableName, key);
- return true;
+ return Result.ok(true);
}
- return false;
+ return Result.ok(false);
}
@Override
- public boolean replace(String tableName, K key, V oldValue, V newValue) {
- Versioned<V> existing = getTableMap(tableName).get(key);
- if (existing != null && checkEquality(existing.value(), oldValue)) {
+ public Result<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
+ if (isLockedForUpdates(tableName, key)) {
+ return Result.locked();
+ }
+ Versioned<byte[]> existing = get(tableName, key);
+ if (existing != null && Arrays.equals(existing.value(), oldValue)) {
put(tableName, key, newValue);
- return true;
+ return Result.ok(true);
}
- return false;
+ return Result.ok(false);
}
@Override
- public boolean replace(String tableName, K key, long oldVersion, V newValue) {
- Versioned<V> existing = getTableMap(tableName).get(key);
+ public Result<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
+ if (isLockedForUpdates(tableName, key)) {
+ return Result.locked();
+ }
+ Versioned<byte[]> existing = get(tableName, key);
if (existing != null && existing.version() == oldVersion) {
put(tableName, key, newValue);
+ return Result.ok(true);
+ }
+ return Result.ok(false);
+ }
+
+ @Override
+ public boolean prepareAndCommit(Transaction transaction) {
+ if (prepare(transaction)) {
+ return commit(transaction);
+ }
+ return false;
+ }
+
+ @Override
+ public boolean prepare(Transaction transaction) {
+ if (transaction.updates().stream().anyMatch(update ->
+ isLockedByAnotherTransaction(update.tableName(),
+ update.key(),
+ transaction.id()))) {
+ return false;
+ }
+
+ if (transaction.updates().stream().allMatch(this::isUpdatePossible)) {
+ transaction.updates().forEach(update -> doProvisionalUpdate(update, transaction.id()));
return true;
}
return false;
}
@Override
- public boolean batchUpdate(List<UpdateOperation<K, V>> updates) {
- if (updates.stream().anyMatch(update -> !checkIfUpdateIsPossible(update))) {
- return false;
- } else {
- updates.stream().forEach(this::doUpdate);
- return true;
- }
+ public boolean commit(Transaction transaction) {
+ transaction.updates().forEach(update -> commitProvisionalUpdate(update, transaction.id()));
+ return true;
}
- private void doUpdate(UpdateOperation<K, V> update) {
- String tableName = update.tableName();
- K key = update.key();
- switch (update.type()) {
- case PUT:
- put(tableName, key, update.value());
- return;
- case REMOVE:
- remove(tableName, key);
- return;
- case PUT_IF_ABSENT:
- putIfAbsent(tableName, key, update.value());
- return;
- case PUT_IF_VERSION_MATCH:
- replace(tableName, key, update.currentValue(), update.value());
- return;
- case PUT_IF_VALUE_MATCH:
- replace(tableName, key, update.currentVersion(), update.value());
- return;
- case REMOVE_IF_VERSION_MATCH:
- remove(tableName, key, update.currentVersion());
- return;
- case REMOVE_IF_VALUE_MATCH:
- remove(tableName, key, update.currentValue());
- return;
- default:
- throw new IllegalStateException("Unsupported type: " + update.type());
- }
+ @Override
+ public boolean rollback(Transaction transaction) {
+ transaction.updates().forEach(update -> undoProvisionalUpdate(update, transaction.id()));
+ return true;
}
- private boolean checkIfUpdateIsPossible(UpdateOperation<K, V> update) {
- Versioned<V> existingEntry = get(update.tableName(), update.key());
+ private Map<String, Versioned<byte[]>> getTableMap(String tableName) {
+ return tables.computeIfAbsent(tableName, name -> Maps.newConcurrentMap());
+ }
+
+ private Map<String, Pair<Long, byte[]>> getLockMap(String tableName) {
+ return locks.computeIfAbsent(tableName, name -> Maps.newConcurrentMap());
+ }
+
+ private boolean isUpdatePossible(DatabaseUpdate update) {
+ Versioned<byte[]> existingEntry = get(update.tableName(), update.key());
switch (update.type()) {
case PUT:
case REMOVE:
@@ -232,20 +266,85 @@
case PUT_IF_VERSION_MATCH:
return existingEntry != null && existingEntry.version() == update.currentVersion();
case PUT_IF_VALUE_MATCH:
- return existingEntry != null && checkEquality(existingEntry.value(), update.currentValue());
+ return existingEntry != null && Arrays.equals(existingEntry.value(), update.currentValue());
case REMOVE_IF_VERSION_MATCH:
return existingEntry == null || existingEntry.version() == update.currentVersion();
case REMOVE_IF_VALUE_MATCH:
- return existingEntry == null || checkEquality(existingEntry.value(), update.currentValue());
+ return existingEntry == null || Arrays.equals(existingEntry.value(), update.currentValue());
default:
throw new IllegalStateException("Unsupported type: " + update.type());
}
}
- private boolean checkEquality(V value1, V value2) {
- if (value1 instanceof byte[]) {
- return Arrays.equals((byte[]) value1, (byte[]) value2);
+ private void doProvisionalUpdate(DatabaseUpdate update, long transactionId) {
+ Map<String, Pair<Long, byte[]>> lockMap = getLockMap(update.tableName());
+ switch (update.type()) {
+ case PUT:
+ case PUT_IF_ABSENT:
+ case PUT_IF_VERSION_MATCH:
+ case PUT_IF_VALUE_MATCH:
+ lockMap.put(update.key(), Pair.of(transactionId, update.value()));
+ break;
+ case REMOVE:
+ case REMOVE_IF_VERSION_MATCH:
+ case REMOVE_IF_VALUE_MATCH:
+ lockMap.put(update.key(), null);
+ break;
+ default:
+ throw new IllegalStateException("Unsupported type: " + update.type());
}
- return value1.equals(value2);
+ }
+
+ private void commitProvisionalUpdate(DatabaseUpdate update, long transactionId) {
+ String tableName = update.tableName();
+ String key = update.key();
+ Type type = update.type();
+ Pair<Long, byte[]> provisionalUpdate = getLockMap(tableName).get(key);
+ if (Objects.equal(transactionId, provisionalUpdate.getLeft())) {
+ getLockMap(tableName).remove(key);
+ } else {
+ return;
+ }
+
+ switch (type) {
+ case PUT:
+ case PUT_IF_ABSENT:
+ case PUT_IF_VERSION_MATCH:
+ case PUT_IF_VALUE_MATCH:
+ put(tableName, key, provisionalUpdate.getRight());
+ break;
+ case REMOVE:
+ case REMOVE_IF_VERSION_MATCH:
+ case REMOVE_IF_VALUE_MATCH:
+ remove(tableName, key);
+ break;
+ default:
+ break;
+ }
+ }
+
+ private void undoProvisionalUpdate(DatabaseUpdate update, long transactionId) {
+ String tableName = update.tableName();
+ String key = update.key();
+ Pair<Long, byte[]> provisionalUpdate = getLockMap(tableName).get(key);
+ if (provisionalUpdate == null) {
+ return;
+ }
+ if (Objects.equal(transactionId, provisionalUpdate.getLeft())) {
+ getLockMap(tableName).remove(key);
+ }
+ }
+
+ private boolean isLockedByAnotherTransaction(String tableName, String key, long transactionId) {
+ Pair<Long, byte[]> update = getLockMap(tableName).get(key);
+ return update != null && !Objects.equal(transactionId, update.getLeft());
+ }
+
+ private boolean isLockedForUpdates(String tableName, String key) {
+ return getLockMap(tableName).containsKey(key);
+ }
+
+ private boolean areTransactionsInProgress(String tableName) {
+ return !getLockMap(tableName).isEmpty();
}
}