Added distributed transaction support through a two phase commit protocol

Change-Id: I85d64234a24823fee8b3c2ea830abbb6867dad38
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMapException.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMapException.java
index 21e3bb4..7d6ad4d 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMapException.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMapException.java
@@ -35,6 +35,12 @@
     }
 
     /**
+     * ConsistentMap update conflicts with an in flight transaction.
+     */
+    public static class ConcurrentModification extends ConsistentMapException {
+    }
+
+    /**
      * ConsistentMap operation interrupted.
      */
     public static class Interrupted extends ConsistentMapException {
diff --git a/core/api/src/main/java/org/onosproject/store/service/DatabaseUpdate.java b/core/api/src/main/java/org/onosproject/store/service/DatabaseUpdate.java
new file mode 100644
index 0000000..fd4373e
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/DatabaseUpdate.java
@@ -0,0 +1,220 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.service;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Database update operation.
+ *
+ */
+public final class DatabaseUpdate {
+
+    /**
+     * Type of database update operation.
+     */
+    public static enum Type {
+        /**
+         * Insert/Update entry without any checks.
+         */
+        PUT,
+        /**
+         * Insert an entry iff there is no existing entry for that key.
+         */
+        PUT_IF_ABSENT,
+
+        /**
+         * Update entry if the current version matches specified version.
+         */
+        PUT_IF_VERSION_MATCH,
+
+        /**
+         * Update entry if the current value matches specified value.
+         */
+        PUT_IF_VALUE_MATCH,
+
+        /**
+         * Remove entry without any checks.
+         */
+        REMOVE,
+
+        /**
+         * Remove entry if the current version matches specified version.
+         */
+        REMOVE_IF_VERSION_MATCH,
+
+        /**
+         * Remove entry if the current value matches specified value.
+         */
+        REMOVE_IF_VALUE_MATCH,
+    }
+
+    private Type type;
+    private String tableName;
+    private String key;
+    private byte[] value;
+    private byte[] currentValue;
+    private long currentVersion = -1;
+
+    /**
+     * Returns the type of update operation.
+     * @return type of update.
+     */
+    public Type type() {
+        return type;
+    }
+
+    /**
+     * Returns the tableName being updated.
+     * @return table name.
+     */
+    public String tableName() {
+        return tableName;
+    }
+
+    /**
+     * Returns the item key being updated.
+     * @return item key
+     */
+    public String key() {
+        return key;
+    }
+
+    /**
+     * Returns the new value.
+     * @return item's target value.
+     */
+    public byte[] value() {
+        return value;
+    }
+
+    /**
+     * Returns the expected current value in the database value for the key.
+     * @return current value in database.
+     */
+    public byte[] currentValue() {
+        return currentValue;
+    }
+
+    /**
+     * Returns the expected current version in the database for the key.
+     * @return expected version.
+     */
+    public long currentVersion() {
+        return currentVersion;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("type", type)
+            .add("tableName", tableName)
+            .add("key", key)
+            .add("value", value)
+            .add("currentValue", currentValue)
+            .add("currentVersion", currentVersion)
+            .toString();
+    }
+
+    /**
+     * Creates a new builder instance.
+     *
+     * @return builder.
+     */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * DatabaseUpdate builder.
+     *
+     */
+    public static final class Builder {
+
+        private DatabaseUpdate update = new DatabaseUpdate();
+
+        public DatabaseUpdate build() {
+            validateInputs();
+            return update;
+        }
+
+        public Builder withType(Type type) {
+            update.type = checkNotNull(type, "type cannot be null");
+            return this;
+        }
+
+        public Builder withTableName(String tableName) {
+            update.tableName = checkNotNull(tableName, "tableName cannot be null");
+            return this;
+        }
+
+        public Builder withKey(String key) {
+            update.key = checkNotNull(key, "key cannot be null");
+            return this;
+        }
+
+        public Builder withCurrentValue(byte[] value) {
+            update.currentValue = checkNotNull(value, "currentValue cannot be null");
+            return this;
+        }
+
+        public Builder withValue(byte[] value) {
+            update.value = checkNotNull(value, "value cannot be null");
+            return this;
+        }
+
+        public Builder withCurrentVersion(long version) {
+            checkArgument(version >= 0, "version cannot be negative");
+            update.currentVersion = version;
+            return this;
+        }
+
+        private void validateInputs() {
+            checkNotNull(update.type, "type must be specified");
+            checkNotNull(update.tableName, "table name must be specified");
+            checkNotNull(update.key, "key must be specified");
+            switch (update.type) {
+            case PUT:
+            case PUT_IF_ABSENT:
+                checkNotNull(update.value, "value must be specified.");
+                break;
+            case PUT_IF_VERSION_MATCH:
+                checkNotNull(update.value, "value must be specified.");
+                checkState(update.currentVersion >= 0, "current version must be specified");
+                break;
+            case PUT_IF_VALUE_MATCH:
+                checkNotNull(update.value, "value must be specified.");
+                checkNotNull(update.currentValue, "currentValue must be specified.");
+                break;
+            case REMOVE:
+                break;
+            case REMOVE_IF_VERSION_MATCH:
+                checkState(update.currentVersion >= 0, "current version must be specified");
+                break;
+            case REMOVE_IF_VALUE_MATCH:
+                checkNotNull(update.currentValue, "currentValue must be specified.");
+                break;
+            default:
+                throw new IllegalStateException("Unknown operation type");
+            }
+        }
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java b/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java
index 572867c..eb129fe 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.store.service;
 
+import java.util.Collection;
 import java.util.List;
 
 /**
@@ -35,4 +36,16 @@
      * @return list of map information
      */
     List<MapInfo> getMapInfo();
+
+    /**
+     * Returns all the transactions in the system.
+     *
+     * @return collection of transactions
+     */
+    Collection<Transaction> getTransactions();
+
+    /**
+     * Redrives stuck transactions while removing those that are done.
+     */
+    void redriveTransactions();
 }
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageService.java b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
index e165a95..5ea0420 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
@@ -29,13 +29,6 @@
 public interface StorageService {
 
     /**
-     * Creates a new transaction context.
-     *
-     * @return transaction context
-     */
-    TransactionContext createTransactionContext();
-
-    /**
      * Creates a new EventuallyConsistentMapBuilder.
      *
      * @param <K> key type
@@ -45,11 +38,11 @@
     <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder();
 
     /**
-     * Creates a new EventuallyConsistentMapBuilder.
+     * Creates a new ConsistentMapBuilder.
      *
      * @param <K> key type
      * @param <V> value type
-     * @return builder for an eventually consistent map
+     * @return builder for a consistent map
      */
     <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder();
 
@@ -60,4 +53,11 @@
      * @return builder for an distributed set
      */
     <E> SetBuilder<E> setBuilder();
-}
\ No newline at end of file
+
+    /**
+     * Creates a new transaction context.
+     *
+     * @return transaction context
+     */
+    TransactionContext createTransactionContext();
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/Transaction.java b/core/api/src/main/java/org/onosproject/store/service/Transaction.java
new file mode 100644
index 0000000..514e1ab
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/Transaction.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.util.List;
+
+/**
+ * An immutable transaction object.
+ */
+public interface Transaction {
+
+    public enum State {
+        /**
+         * Indicates a new transaction that is about to be prepared. All transactions
+         * start their life in this state.
+         */
+        PREPARING,
+
+        /**
+         * Indicates a transaction that is successfully prepared i.e. all participants voted to commit
+         */
+        PREPARED,
+
+        /**
+         * Indicates a transaction that is about to be committed.
+         */
+        COMMITTING,
+
+        /**
+         * Indicates a transaction that has successfully committed.
+         */
+        COMMITTED,
+
+        /**
+         * Indicates a transaction that is about to be rolled back.
+         */
+        ROLLINGBACK,
+
+        /**
+         * Indicates a transaction that has been rolled back and all locks are released.
+         */
+        ROLLEDBACK
+    }
+
+    /**
+     * Returns the transaction Id.
+     *
+     * @return transaction id
+     */
+    long id();
+
+    /**
+     * Returns the list of updates that are part of this transaction.
+     *
+     * @return list of database updates
+     */
+    List<DatabaseUpdate> updates();
+
+    /**
+     * Returns the current state of this transaction.
+     *
+     * @return transaction state
+     */
+    State state();
+
+    /**
+     * Returns true if this transaction has completed execution.
+     *
+     * @return true is yes, false otherwise
+     */
+    public default boolean isDone() {
+        return state() == State.COMMITTED || state() == State.ROLLEDBACK;
+    }
+
+    /**
+     * Returns a new transaction that is created by transitioning this one to the specified state.
+     *
+     * @param newState destination state
+     * @return a new transaction instance similar to the current one but its state set to specified state
+     */
+    Transaction transition(State newState);
+
+    /**
+     * Returns the system time when the transaction was last updated.
+     *
+     * @return last update time
+     */
+    public long lastUpdated();
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/store/service/TransactionContext.java b/core/api/src/main/java/org/onosproject/store/service/TransactionContext.java
index b404bae..94942e2 100644
--- a/core/api/src/main/java/org/onosproject/store/service/TransactionContext.java
+++ b/core/api/src/main/java/org/onosproject/store/service/TransactionContext.java
@@ -19,21 +19,31 @@
 /**
  * Provides a context for transactional operations.
  * <p>
- * A transaction context provides a boundary within which transactions
- * are run. It also is a place where all modifications made within a transaction
- * are cached until the point when the transaction commits or aborts. It thus ensures
- * isolation of work happening with in the transaction boundary.
- * <p>
  * A transaction context is a vehicle for grouping operations into a unit with the
  * properties of atomicity, isolation, and durability. Transactions also provide the
  * ability to maintain an application's invariants or integrity constraints,
  * supporting the property of consistency. Together these properties are known as ACID.
+ * <p>
+ * A transaction context provides a boundary within which transactions
+ * are run. It also is a place where all modifications made within a transaction
+ * are cached until the point when the transaction commits or aborts. It thus ensures
+ * isolation of work happening with in the transaction boundary. Within a transaction
+ * context isolation level is REPEATABLE_READS i.e. only data that is committed can be read.
+ * The only uncommitted data that can be read is the data modified by the current transaction.
  */
 public interface TransactionContext {
 
     /**
+     * Returns the unique transactionId.
+     *
+     * @return transaction id
+     */
+    long transactionId();
+
+    /**
      * Returns if this transaction context is open.
-     * @return true if open, false otherwise.
+     *
+     * @return true if open, false otherwise
      */
     boolean isOpen();
 
@@ -45,22 +55,24 @@
     /**
      * Commits a transaction that was previously started thereby making its changes permanent
      * and externally visible.
-     * @throws TransactionException if transaction fails to commit.
+     *
+     * @throws TransactionException if transaction fails to commit
      */
     void commit();
 
     /**
-     * Rolls back the current transaction, discarding all its changes.
+     * Aborts any changes made in this transaction context and discarding all locally cached updates.
      */
-    void rollback();
+    void abort();
 
     /**
-     * Creates a new transactional map.
+     * Returns a transactional map data structure with the specified name.
+     *
      * @param <K> key type
      * @param <V> value type
-     * @param mapName name of the transactional map.
-     * @param serializer serializer to use for encoding/decoding keys and vaulues.
-     * @return new Transactional Map.
+     * @param mapName name of the transactional map
+     * @param serializer serializer to use for encoding/decoding keys and values of the map
+     * @return Transactional Map
      */
-    <K, V> TransactionalMap<K, V> createTransactionalMap(String mapName, Serializer serializer);
+    <K, V> TransactionalMap<K, V> getTransactionalMap(String mapName, Serializer serializer);
 }
diff --git a/core/api/src/main/java/org/onosproject/store/service/TransactionException.java b/core/api/src/main/java/org/onosproject/store/service/TransactionException.java
index 8261fbd..6135394 100644
--- a/core/api/src/main/java/org/onosproject/store/service/TransactionException.java
+++ b/core/api/src/main/java/org/onosproject/store/service/TransactionException.java
@@ -41,8 +41,14 @@
     }
 
     /**
-     * Transaction failure due to optimistic concurrency failure.
+     * Transaction failure due to optimistic concurrency violation.
      */
     public static class OptimisticConcurrencyFailure extends TransactionException {
     }
+
+    /**
+     * Transaction failure due to a conflicting transaction in progress.
+     */
+    public static class ConcurrentModification extends TransactionException {
+    }
 }
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/store/service/TransactionalMap.java b/core/api/src/main/java/org/onosproject/store/service/TransactionalMap.java
index c59600c..ebc6611 100644
--- a/core/api/src/main/java/org/onosproject/store/service/TransactionalMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/TransactionalMap.java
@@ -16,15 +16,12 @@
 
 package org.onosproject.store.service;
 
-import java.util.Collection;
-import java.util.Set;
-import java.util.Map.Entry;
 
 /**
  * Transactional Map data structure.
  * <p>
- * A TransactionalMap is created by invoking {@link TransactionContext#createTransactionalMap createTransactionalMap}
- * method. All operations performed on this map with in a transaction boundary are invisible externally
+ * A TransactionalMap is created by invoking {@link TransactionContext#getTransactionalMap getTransactionalMap}
+ * method. All operations performed on this map within a transaction boundary are invisible externally
  * until the point when the transaction commits. A commit usually succeeds in the absence of conflicts.
  *
  * @param <K> type of key.
@@ -33,36 +30,6 @@
 public interface TransactionalMap<K, V> {
 
     /**
-     * Returns the number of entries in the map.
-     *
-     * @return map size.
-     */
-    int size();
-
-    /**
-     * Returns true if the map is empty.
-     *
-     * @return true if map has no entries, false otherwise.
-     */
-    boolean isEmpty();
-
-    /**
-     * Returns true if this map contains a mapping for the specified key.
-     *
-     * @param key key
-     * @return true if map contains key, false otherwise.
-     */
-    boolean containsKey(K key);
-
-    /**
-     * Returns true if this map contains the specified value.
-     *
-     * @param value value
-     * @return true if map contains value, false otherwise.
-     */
-    boolean containsValue(V value);
-
-    /**
      * Returns the value to which the specified key is mapped, or null if this
      * map contains no mapping for the key.
      *
@@ -94,45 +61,6 @@
     V remove(K key);
 
     /**
-     * Removes all of the mappings from this map (optional operation).
-     * The map will be empty after this call returns.
-     */
-    void clear();
-
-    /**
-     * Returns a Set view of the keys contained in this map.
-     * This method differs from the behavior of java.util.Map.keySet() in that
-     * what is returned is a unmodifiable snapshot view of the keys in the ConsistentMap.
-     * Attempts to modify the returned set, whether direct or via its iterator,
-     * result in an UnsupportedOperationException.
-     *
-     * @return a set of the keys contained in this map
-     */
-    Set<K> keySet();
-
-    /**
-     * Returns the collection of values contained in this map.
-     * This method differs from the behavior of java.util.Map.values() in that
-     * what is returned is a unmodifiable snapshot view of the values in the ConsistentMap.
-     * Attempts to modify the returned collection, whether direct or via its iterator,
-     * result in an UnsupportedOperationException.
-     *
-     * @return a collection of the values contained in this map
-     */
-    Collection<V> values();
-
-    /**
-     * Returns the set of entries contained in this map.
-     * This method differs from the behavior of java.util.Map.entrySet() in that
-     * what is returned is a unmodifiable snapshot view of the entries in the ConsistentMap.
-     * Attempts to modify the returned set, whether direct or via its iterator,
-     * result in an UnsupportedOperationException.
-     *
-     * @return set of entries contained in this map.
-     */
-    Set<Entry<K, V>> entrySet();
-
-    /**
      * If the specified key is not already associated with a value
      * associates it with the given value and returns null, else returns the current value.
      *
diff --git a/core/api/src/main/java/org/onosproject/store/service/UpdateOperation.java b/core/api/src/main/java/org/onosproject/store/service/UpdateOperation.java
deleted file mode 100644
index b4b05fc..0000000
--- a/core/api/src/main/java/org/onosproject/store/service/UpdateOperation.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.service;
-
-import static com.google.common.base.Preconditions.*;
-
-import com.google.common.base.MoreObjects;
-
-/**
- * Database update operation.
- *
- * @param <K> key type.
- * @param <V> value type.
- */
-public class UpdateOperation<K, V> {
-
-    /**
-     * Type of database update operation.
-     */
-    public static enum Type {
-        PUT,
-        PUT_IF_ABSENT,
-        PUT_IF_VERSION_MATCH,
-        PUT_IF_VALUE_MATCH,
-        REMOVE,
-        REMOVE_IF_VERSION_MATCH,
-        REMOVE_IF_VALUE_MATCH,
-    }
-
-    private Type type;
-    private String tableName;
-    private K key;
-    private V value;
-    private V currentValue;
-    private long currentVersion = -1;
-
-    /**
-     * Returns the type of update operation.
-     * @return type of update.
-     */
-    public Type type() {
-        return type;
-    }
-
-    /**
-     * Returns the tableName being updated.
-     * @return table name.
-     */
-    public String tableName() {
-        return tableName;
-    }
-
-    /**
-     * Returns the item key being updated.
-     * @return item key
-     */
-    public K key() {
-        return key;
-    }
-
-    /**
-     * Returns the new value.
-     * @return item's target value.
-     */
-    public V value() {
-        return value;
-    }
-
-    /**
-     * Returns the expected current value in the database value for the key.
-     * @return current value in database.
-     */
-    public V currentValue() {
-        return currentValue;
-    }
-
-    /**
-     * Returns the expected current version in the database for the key.
-     * @return expected version.
-     */
-    public long currentVersion() {
-        return currentVersion;
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(this)
-            .add("type", type)
-            .add("tableName", tableName)
-            .add("key", key)
-            .add("value", value)
-            .add("currentValue", currentValue)
-            .add("currentVersion", currentVersion)
-            .toString();
-    }
-
-    /**
-     * Creates a new builder instance.
-     * @param <K> key type.
-     * @param <V> value type.
-     *
-     * @return builder.
-     */
-    public static <K, V> Builder<K, V> newBuilder() {
-        return new Builder<>();
-    }
-
-    /**
-     * UpdatOperation builder.
-     *
-     * @param <K> key type.
-     * @param <V> value type.
-     */
-    public static final class Builder<K, V> {
-
-        private UpdateOperation<K, V> operation = new UpdateOperation<>();
-
-        public UpdateOperation<K, V> build() {
-            validateInputs();
-            return operation;
-        }
-
-        public Builder<K, V> withType(Type type) {
-            operation.type = checkNotNull(type, "type cannot be null");
-            return this;
-        }
-
-        public Builder<K, V> withTableName(String tableName) {
-            operation.tableName = checkNotNull(tableName, "tableName cannot be null");
-            return this;
-        }
-
-        public Builder<K, V> withKey(K key) {
-            operation.key = checkNotNull(key, "key cannot be null");
-            return this;
-        }
-
-        public Builder<K, V> withCurrentValue(V value) {
-            operation.currentValue = checkNotNull(value, "currentValue cannot be null");
-            return this;
-        }
-
-        public Builder<K, V> withValue(V value) {
-            operation.value = checkNotNull(value, "value cannot be null");
-            return this;
-        }
-
-        public Builder<K, V> withCurrentVersion(long version) {
-            checkArgument(version >= 0, "version cannot be negative");
-            operation.currentVersion = version;
-            return this;
-        }
-
-        private void validateInputs() {
-            checkNotNull(operation.type, "type must be specified");
-            checkNotNull(operation.tableName, "table name must be specified");
-            checkNotNull(operation.key, "key must be specified");
-            switch (operation.type) {
-            case PUT:
-            case PUT_IF_ABSENT:
-                checkNotNull(operation.value, "value must be specified.");
-                break;
-            case PUT_IF_VERSION_MATCH:
-                checkNotNull(operation.value, "value must be specified.");
-                checkState(operation.currentVersion >= 0, "current version must be specified");
-                break;
-            case PUT_IF_VALUE_MATCH:
-                checkNotNull(operation.value, "value must be specified.");
-                checkNotNull(operation.currentValue, "currentValue must be specified.");
-                break;
-            case REMOVE:
-                break;
-            case REMOVE_IF_VERSION_MATCH:
-                checkState(operation.currentVersion >= 0, "current version must be specified");
-                break;
-            case REMOVE_IF_VALUE_MATCH:
-                checkNotNull(operation.currentValue, "currentValue must be specified.");
-                break;
-            default:
-                throw new IllegalStateException("Unknown operation type");
-            }
-        }
-    }
-}