Added distributed transaction support through a two phase commit protocol

Change-Id: I85d64234a24823fee8b3c2ea830abbb6867dad38
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index ad049e6..178f49f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -27,7 +27,8 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
-import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.DatabaseUpdate;
+import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
 
 import com.google.common.collect.Lists;
@@ -129,24 +130,27 @@
     }
 
     @Override
-    public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
+    public CompletableFuture<Result<Versioned<byte[]>>> put(String tableName, String key, byte[] value) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).put(tableName, key, value);
     }
 
     @Override
-    public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
+    public CompletableFuture<Result<Versioned<byte[]>>> remove(String tableName, String key) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).remove(tableName, key);
     }
 
     @Override
-    public CompletableFuture<Void> clear(String tableName) {
+    public CompletableFuture<Result<Void>> clear(String tableName) {
+        AtomicBoolean isLocked = new AtomicBoolean(false);
         checkState(isOpen.get(), DB_NOT_OPEN);
         return CompletableFuture.allOf(partitions
                     .stream()
-                    .map(p -> p.clear(tableName))
-                    .toArray(CompletableFuture[]::new));
+                    .map(p -> p.clear(tableName)
+                            .thenApply(v -> isLocked.compareAndSet(false, Result.Status.LOCKED == v.status())))
+                    .toArray(CompletableFuture[]::new))
+                .thenApply(v -> isLocked.get() ? Result.locked() : Result.ok(null));
     }
 
     @Override
@@ -183,59 +187,86 @@
     }
 
     @Override
-    public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
+    public CompletableFuture<Result<Versioned<byte[]>>> putIfAbsent(String tableName, String key, byte[] value) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).putIfAbsent(tableName, key, value);
     }
 
     @Override
-    public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
+    public CompletableFuture<Result<Boolean>> remove(String tableName, String key, byte[] value) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).remove(tableName, key, value);
     }
 
     @Override
-    public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
+    public CompletableFuture<Result<Boolean>> remove(String tableName, String key, long version) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).remove(tableName, key, version);
     }
 
     @Override
-    public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
+    public CompletableFuture<Result<Boolean>> replace(
+            String tableName, String key, byte[] oldValue, byte[] newValue) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).replace(tableName, key, oldValue, newValue);
     }
 
     @Override
-    public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
+    public CompletableFuture<Result<Boolean>> replace(
+            String tableName, String key, long oldVersion, byte[] newValue) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(tableName, key).replace(tableName, key, oldVersion, newValue);
     }
 
     @Override
-    public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
-        checkState(isOpen.get(), DB_NOT_OPEN);
-        Map<Database, List<UpdateOperation<String, byte[]>>> perPartitionUpdates = Maps.newHashMap();
-        for (UpdateOperation<String, byte[]> update : updates) {
-            Database partition = partitioner.getPartition(update.tableName(), update.key());
-            List<UpdateOperation<String, byte[]>> partitionUpdates = perPartitionUpdates.get(partition);
-            if (partitionUpdates == null) {
-                partitionUpdates = Lists.newArrayList();
-                perPartitionUpdates.put(partition, partitionUpdates);
-            }
-            partitionUpdates.add(update);
-        }
-        if (perPartitionUpdates.size() > 1) {
-            // TODO
-            throw new UnsupportedOperationException("Cross partition transactional updates are not supported.");
+    public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
+        Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
+        if (subTransactions.isEmpty()) {
+            return CompletableFuture.completedFuture(true);
+        } else if (subTransactions.size() == 1) {
+            Entry<Database, Transaction> entry =
+                    subTransactions.entrySet().iterator().next();
+            return entry.getKey().prepareAndCommit(entry.getValue());
         } else {
-            Entry<Database, List<UpdateOperation<String, byte[]>>> only =
-                    perPartitionUpdates.entrySet().iterator().next();
-            return only.getKey().atomicBatchUpdate(only.getValue());
+            return new TransactionManager(this).execute(transaction);
         }
     }
 
     @Override
+    public CompletableFuture<Boolean> prepare(Transaction transaction) {
+        Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
+        AtomicBoolean status = new AtomicBoolean(true);
+        return CompletableFuture.allOf(subTransactions.entrySet()
+                .stream()
+                .map(entry -> entry
+                        .getKey()
+                        .prepare(entry.getValue())
+                        .thenApply(v -> status.compareAndSet(true, v)))
+                .toArray(CompletableFuture[]::new))
+            .thenApply(v -> status.get());
+    }
+
+    @Override
+    public CompletableFuture<Boolean> commit(Transaction transaction) {
+        Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
+        return CompletableFuture.allOf(subTransactions.entrySet()
+                .stream()
+                .map(entry -> entry.getKey().commit(entry.getValue()))
+                .toArray(CompletableFuture[]::new))
+        .thenApply(v -> true);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> rollback(Transaction transaction) {
+        Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
+        return CompletableFuture.allOf(subTransactions.entrySet()
+                .stream()
+                .map(entry -> entry.getKey().rollback(entry.getValue()))
+                .toArray(CompletableFuture[]::new))
+            .thenApply(v -> true);
+    }
+
+    @Override
     public CompletableFuture<Database> open() {
         return CompletableFuture.allOf(partitions
                     .stream()
@@ -243,7 +274,8 @@
                     .toArray(CompletableFuture[]::new))
                 .thenApply(v -> {
                     isOpen.set(true);
-                    return this; });
+                    return this;
+                });
     }
 
     @Override
@@ -279,4 +311,19 @@
     public Database addShutdownTask(Task<CompletableFuture<Void>> task) {
         throw new UnsupportedOperationException();
     }
-}
\ No newline at end of file
+
+    private Map<Database, Transaction> createSubTransactions(
+            Transaction transaction) {
+        Map<Database, List<DatabaseUpdate>> perPartitionUpdates = Maps.newHashMap();
+        for (DatabaseUpdate update : transaction.updates()) {
+            Database partition = partitioner.getPartition(update.tableName(), update.key());
+            List<DatabaseUpdate> partitionUpdates =
+                    perPartitionUpdates.computeIfAbsent(partition, k -> Lists.newLinkedList());
+            partitionUpdates.add(update);
+        }
+        Map<Database, Transaction> subTransactions = Maps.newHashMap();
+        perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new DefaultTransaction(transaction.id(), v)));
+
+        return subTransactions;
+    }
+}