Added distributed transaction support through a two phase commit protocol

Change-Id: I85d64234a24823fee8b3c2ea830abbb6867dad38
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
index e63a3d8..c190a28 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -18,43 +18,58 @@
 
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.stream.Collectors;
 import java.util.Set;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.DatabaseUpdate;
+import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
+import org.onosproject.store.service.DatabaseUpdate.Type;
 
+import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
 
 import net.kuujo.copycat.state.Initializer;
 import net.kuujo.copycat.state.StateContext;
 
 /**
  * Default database state.
- *
- * @param <K> key type
- * @param <V> value type
  */
-public class DefaultDatabaseState<K, V> implements DatabaseState<K, V> {
-
+public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
     private Long nextVersion;
-    private Map<String, Map<K, Versioned<V>>> tables;
+    private Map<String, Map<String, Versioned<byte[]>>> tables;
+
+    /**
+     * This locks map has a structure similar to the "tables" map above and
+     * holds all the provisional updates made during a transaction's prepare phase.
+     * The entry value is represented as the tuple: (transactionId, newValue)
+     * If newValue == null that signifies this update is attempting to
+     * delete the existing value.
+     * This map also serves as a lock on the entries that are being updated.
+     * The presence of a entry in this map indicates that element is
+     * participating in a transaction and is currently locked for updates.
+     */
+    private Map<String, Map<String, Pair<Long, byte[]>>> locks;
 
     @Initializer
     @Override
-    public void init(StateContext<DatabaseState<K, V>> context) {
+    public void init(StateContext<DatabaseState<String, byte[]>> context) {
         tables = context.get("tables");
         if (tables == null) {
-            tables = new HashMap<>();
+            tables = Maps.newConcurrentMap();
             context.put("tables", tables);
         }
+        locks = context.get("locks");
+        if (locks == null) {
+            locks = Maps.newConcurrentMap();
+            context.put("locks", locks);
+        }
         nextVersion = context.get("nextVersion");
         if (nextVersion == null) {
             nextVersion = new Long(0);
@@ -62,15 +77,6 @@
         }
     }
 
-    private Map<K, Versioned<V>> getTableMap(String tableName) {
-        Map<K, Versioned<V>> table = tables.get(tableName);
-        if (table == null) {
-            table = new HashMap<>();
-            tables.put(tableName, table);
-        }
-        return table;
-    }
-
     @Override
     public Set<String> tableNames() {
         return new HashSet<>(tables.keySet());
@@ -87,47 +93,55 @@
     }
 
     @Override
-    public boolean containsKey(String tableName, K key) {
+    public boolean containsKey(String tableName, String key) {
         return getTableMap(tableName).containsKey(key);
     }
 
     @Override
-    public boolean containsValue(String tableName, V value) {
-        return getTableMap(tableName).values().stream().anyMatch(v -> checkEquality(v.value(), value));
+    public boolean containsValue(String tableName, byte[] value) {
+        return getTableMap(tableName).values().stream().anyMatch(v -> Arrays.equals(v.value(), value));
     }
 
     @Override
-    public Versioned<V> get(String tableName, K key) {
+    public Versioned<byte[]> get(String tableName, String key) {
         return getTableMap(tableName).get(key);
     }
 
     @Override
-    public Versioned<V> put(String tableName, K key, V value) {
-        return getTableMap(tableName).put(key, new Versioned<>(value, ++nextVersion));
+    public Result<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
+        return isLockedForUpdates(tableName, key)
+                ? Result.locked()
+                : Result.ok(getTableMap(tableName).put(key, new Versioned<>(value, ++nextVersion)));
     }
 
     @Override
-    public Versioned<V> remove(String tableName, K key) {
-        return getTableMap(tableName).remove(key);
+    public Result<Versioned<byte[]>> remove(String tableName, String key) {
+        return isLockedForUpdates(tableName, key)
+                ? Result.locked()
+                : Result.ok(getTableMap(tableName).remove(key));
     }
 
     @Override
-    public void clear(String tableName) {
+    public Result<Void> clear(String tableName) {
+        if (areTransactionsInProgress(tableName)) {
+            return Result.locked();
+        }
         getTableMap(tableName).clear();
+        return Result.ok(null);
     }
 
     @Override
-    public Set<K> keySet(String tableName) {
+    public Set<String> keySet(String tableName) {
         return ImmutableSet.copyOf(getTableMap(tableName).keySet());
     }
 
     @Override
-    public Collection<Versioned<V>> values(String tableName) {
+    public Collection<Versioned<byte[]>> values(String tableName) {
         return ImmutableList.copyOf(getTableMap(tableName).values());
     }
 
     @Override
-    public Set<Entry<K, Versioned<V>>> entrySet(String tableName) {
+    public Set<Entry<String, Versioned<byte[]>>> entrySet(String tableName) {
         return ImmutableSet.copyOf(getTableMap(tableName)
                 .entrySet()
                 .stream()
@@ -136,93 +150,113 @@
     }
 
     @Override
-    public Versioned<V> putIfAbsent(String tableName, K key, V value) {
-        Versioned<V> existingValue = getTableMap(tableName).get(key);
-        return existingValue != null ? existingValue : put(tableName, key, value);
-    }
-
-    @Override
-    public boolean remove(String tableName, K key, V value) {
-        Versioned<V> existing = getTableMap(tableName).get(key);
-        if (existing != null && checkEquality(existing.value(), value)) {
-            getTableMap(tableName).remove(key);
-            return true;
+    public Result<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
+        if (isLockedForUpdates(tableName, key)) {
+            return Result.locked();
         }
-        return false;
+        Versioned<byte[]> existingValue = get(tableName, key);
+        Versioned<byte[]> currentValue = existingValue != null ? existingValue : put(tableName, key, value).value();
+        return Result.ok(currentValue);
     }
 
     @Override
-    public boolean remove(String tableName, K key, long version) {
-        Versioned<V> existing = getTableMap(tableName).get(key);
+    public Result<Boolean> remove(String tableName, String key, byte[] value) {
+        if (isLockedForUpdates(tableName, key)) {
+            return Result.locked();
+        }
+        Versioned<byte[]> existing = get(tableName, key);
+        if (existing != null && Arrays.equals(existing.value(), value)) {
+            getTableMap(tableName).remove(key);
+            return Result.ok(true);
+        }
+        return Result.ok(false);
+    }
+
+    @Override
+    public Result<Boolean> remove(String tableName, String key, long version) {
+        if (isLockedForUpdates(tableName, key)) {
+            return Result.locked();
+        }
+        Versioned<byte[]> existing = get(tableName, key);
         if (existing != null && existing.version() == version) {
             remove(tableName, key);
-            return true;
+            return Result.ok(true);
         }
-        return false;
+        return Result.ok(false);
     }
 
     @Override
-    public boolean replace(String tableName, K key, V oldValue, V newValue) {
-        Versioned<V> existing = getTableMap(tableName).get(key);
-        if (existing != null && checkEquality(existing.value(), oldValue)) {
+    public Result<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
+        if (isLockedForUpdates(tableName, key)) {
+            return Result.locked();
+        }
+        Versioned<byte[]> existing = get(tableName, key);
+        if (existing != null && Arrays.equals(existing.value(), oldValue)) {
             put(tableName, key, newValue);
-            return true;
+            return Result.ok(true);
         }
-        return false;
+        return Result.ok(false);
     }
 
     @Override
-    public boolean replace(String tableName, K key, long oldVersion, V newValue) {
-        Versioned<V> existing = getTableMap(tableName).get(key);
+    public Result<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
+        if (isLockedForUpdates(tableName, key)) {
+            return Result.locked();
+        }
+        Versioned<byte[]> existing = get(tableName, key);
         if (existing != null && existing.version() == oldVersion) {
             put(tableName, key, newValue);
+            return Result.ok(true);
+        }
+        return Result.ok(false);
+    }
+
+    @Override
+    public boolean prepareAndCommit(Transaction transaction) {
+        if (prepare(transaction)) {
+            return commit(transaction);
+        }
+        return false;
+    }
+
+    @Override
+    public boolean prepare(Transaction transaction) {
+        if (transaction.updates().stream().anyMatch(update ->
+                    isLockedByAnotherTransaction(update.tableName(),
+                                                 update.key(),
+                                                 transaction.id()))) {
+            return false;
+        }
+
+        if (transaction.updates().stream().allMatch(this::isUpdatePossible)) {
+            transaction.updates().forEach(update -> doProvisionalUpdate(update, transaction.id()));
             return true;
         }
         return false;
     }
 
     @Override
-    public boolean batchUpdate(List<UpdateOperation<K, V>> updates) {
-        if (updates.stream().anyMatch(update -> !checkIfUpdateIsPossible(update))) {
-            return false;
-        } else {
-            updates.stream().forEach(this::doUpdate);
-            return true;
-        }
+    public boolean commit(Transaction transaction) {
+        transaction.updates().forEach(update -> commitProvisionalUpdate(update, transaction.id()));
+        return true;
     }
 
-    private void doUpdate(UpdateOperation<K, V> update) {
-        String tableName = update.tableName();
-        K key = update.key();
-        switch (update.type()) {
-        case PUT:
-            put(tableName, key, update.value());
-            return;
-        case REMOVE:
-            remove(tableName, key);
-            return;
-        case PUT_IF_ABSENT:
-            putIfAbsent(tableName, key, update.value());
-            return;
-        case PUT_IF_VERSION_MATCH:
-            replace(tableName, key, update.currentValue(), update.value());
-            return;
-        case PUT_IF_VALUE_MATCH:
-            replace(tableName, key, update.currentVersion(), update.value());
-            return;
-        case REMOVE_IF_VERSION_MATCH:
-            remove(tableName, key, update.currentVersion());
-            return;
-        case REMOVE_IF_VALUE_MATCH:
-            remove(tableName, key, update.currentValue());
-            return;
-        default:
-            throw new IllegalStateException("Unsupported type: " + update.type());
-        }
+    @Override
+    public boolean rollback(Transaction transaction) {
+        transaction.updates().forEach(update -> undoProvisionalUpdate(update, transaction.id()));
+        return true;
     }
 
-    private boolean checkIfUpdateIsPossible(UpdateOperation<K, V> update) {
-        Versioned<V> existingEntry = get(update.tableName(), update.key());
+    private Map<String, Versioned<byte[]>> getTableMap(String tableName) {
+        return tables.computeIfAbsent(tableName, name -> Maps.newConcurrentMap());
+    }
+
+    private Map<String, Pair<Long, byte[]>> getLockMap(String tableName) {
+        return locks.computeIfAbsent(tableName, name -> Maps.newConcurrentMap());
+    }
+
+    private boolean isUpdatePossible(DatabaseUpdate update) {
+        Versioned<byte[]> existingEntry = get(update.tableName(), update.key());
         switch (update.type()) {
         case PUT:
         case REMOVE:
@@ -232,20 +266,85 @@
         case PUT_IF_VERSION_MATCH:
             return existingEntry != null && existingEntry.version() == update.currentVersion();
         case PUT_IF_VALUE_MATCH:
-            return existingEntry != null && checkEquality(existingEntry.value(), update.currentValue());
+            return existingEntry != null && Arrays.equals(existingEntry.value(), update.currentValue());
         case REMOVE_IF_VERSION_MATCH:
             return existingEntry == null || existingEntry.version() == update.currentVersion();
         case REMOVE_IF_VALUE_MATCH:
-            return existingEntry == null || checkEquality(existingEntry.value(), update.currentValue());
+            return existingEntry == null || Arrays.equals(existingEntry.value(), update.currentValue());
         default:
             throw new IllegalStateException("Unsupported type: " + update.type());
         }
     }
 
-    private boolean checkEquality(V value1, V value2) {
-        if (value1 instanceof byte[]) {
-            return Arrays.equals((byte[]) value1, (byte[]) value2);
+    private void doProvisionalUpdate(DatabaseUpdate update, long transactionId) {
+        Map<String, Pair<Long, byte[]>> lockMap = getLockMap(update.tableName());
+        switch (update.type()) {
+        case PUT:
+        case PUT_IF_ABSENT:
+        case PUT_IF_VERSION_MATCH:
+        case PUT_IF_VALUE_MATCH:
+            lockMap.put(update.key(), Pair.of(transactionId, update.value()));
+            break;
+        case REMOVE:
+        case REMOVE_IF_VERSION_MATCH:
+        case REMOVE_IF_VALUE_MATCH:
+            lockMap.put(update.key(), null);
+            break;
+        default:
+            throw new IllegalStateException("Unsupported type: " + update.type());
         }
-        return value1.equals(value2);
+    }
+
+    private void commitProvisionalUpdate(DatabaseUpdate update, long transactionId) {
+        String tableName = update.tableName();
+        String key = update.key();
+        Type type = update.type();
+        Pair<Long, byte[]> provisionalUpdate = getLockMap(tableName).get(key);
+        if (Objects.equal(transactionId, provisionalUpdate.getLeft()))  {
+            getLockMap(tableName).remove(key);
+        } else {
+            return;
+        }
+
+        switch (type) {
+        case PUT:
+        case PUT_IF_ABSENT:
+        case PUT_IF_VERSION_MATCH:
+        case PUT_IF_VALUE_MATCH:
+            put(tableName, key, provisionalUpdate.getRight());
+            break;
+        case REMOVE:
+        case REMOVE_IF_VERSION_MATCH:
+        case REMOVE_IF_VALUE_MATCH:
+            remove(tableName, key);
+            break;
+        default:
+            break;
+        }
+    }
+
+    private void undoProvisionalUpdate(DatabaseUpdate update, long transactionId) {
+        String tableName = update.tableName();
+        String key = update.key();
+        Pair<Long, byte[]> provisionalUpdate = getLockMap(tableName).get(key);
+        if (provisionalUpdate == null) {
+            return;
+        }
+        if (Objects.equal(transactionId, provisionalUpdate.getLeft()))  {
+            getLockMap(tableName).remove(key);
+        }
+    }
+
+    private boolean isLockedByAnotherTransaction(String tableName, String key, long transactionId) {
+        Pair<Long, byte[]> update = getLockMap(tableName).get(key);
+        return update != null && !Objects.equal(transactionId, update.getLeft());
+    }
+
+    private boolean isLockedForUpdates(String tableName, String key) {
+        return getLockMap(tableName).containsKey(key);
+    }
+
+    private boolean areTransactionsInProgress(String tableName) {
+        return !getLockMap(tableName).isEmpty();
     }
 }