[ONOS-6342] Refactor transaction architecture to support a shared cache for transactional primitives

Change-Id: I2a17965100895f5aa4d2202028047bb980c11d26
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionManagerTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionManagerTest.java
new file mode 100644
index 0000000..9245a03
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionManagerTest.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.HashMap;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+import com.google.common.collect.Sets;
+import org.junit.Test;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.primitives.DistributedPrimitiveCreator;
+import org.onosproject.store.primitives.PartitionService;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.ConsistentMapBuilder;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Transaction manager test.
+ */
+public class TransactionManagerTest {
+
+    @Test
+    public void testTransactionMapCaching() throws Exception {
+        AsyncConsistentMap asyncMap = mock(AsyncConsistentMap.class);
+        expect(asyncMap.name()).andReturn("foo");
+        expect(asyncMap.addListener(anyObject(MapEventListener.class), anyObject(Executor.class)))
+                .andReturn(CompletableFuture.completedFuture(null)).anyTimes();
+        asyncMap.addStatusChangeListener(anyObject(Consumer.class));
+        expectLastCall().anyTimes();
+        expect(asyncMap.entrySet()).andReturn(CompletableFuture.completedFuture(new HashMap<>().entrySet())).anyTimes();
+
+        ConsistentMapBuilder mapBuilder = mock(ConsistentMapBuilder.class);
+        expect(mapBuilder.withName(anyString())).andReturn(mapBuilder).anyTimes();
+        expect(mapBuilder.withSerializer(anyObject(Serializer.class))).andReturn(mapBuilder).anyTimes();
+        expect(mapBuilder.buildAsyncMap()).andReturn(asyncMap).anyTimes();
+
+        DistributedPrimitiveCreator primitiveCreator = mock(DistributedPrimitiveCreator.class);
+        expect(primitiveCreator.newAsyncConsistentMap(anyString(), anyObject(Serializer.class)))
+                .andReturn(asyncMap).anyTimes();
+
+        StorageService storageService = mock(StorageService.class);
+        expect(storageService.consistentMapBuilder()).andReturn(mapBuilder);
+
+        PartitionService partitionService = mock(PartitionService.class);
+        Set<PartitionId> partitionIds = Sets.newHashSet(PartitionId.from(1), PartitionId.from(2), PartitionId.from(3));
+        expect(partitionService.getAllPartitionIds())
+                .andReturn(partitionIds).anyTimes();
+        expect(partitionService.getNumberOfPartitions())
+                .andReturn(partitionIds.size()).anyTimes();
+        expect(partitionService.getDistributedPrimitiveCreator(anyObject(PartitionId.class)))
+                .andReturn(primitiveCreator).anyTimes();
+
+        replay(storageService, partitionService, asyncMap, primitiveCreator, mapBuilder);
+
+        TransactionManager transactionManager = new TransactionManager(storageService, partitionService);
+        TransactionId transactionId = TransactionId.from(UUID.randomUUID().toString());
+        TransactionCoordinator transactionCoordinator = new TransactionCoordinator(transactionId, transactionManager);
+        Serializer serializer = Serializer.using(KryoNamespaces.API);
+
+        PartitionedTransactionalMap<String, String> transactionalMap1 = (PartitionedTransactionalMap)
+                transactionManager.getTransactionalMap("foo", serializer, transactionCoordinator);
+        PartitionedTransactionalMap<String, String> transactionalMap2 = (PartitionedTransactionalMap)
+                transactionManager.getTransactionalMap("foo", serializer, transactionCoordinator);
+
+        assertSame(transactionalMap1.partitions.get(PartitionId.from(1)).transaction.transactionalObject,
+                transactionalMap2.partitions.get(PartitionId.from(1)).transaction.transactionalObject);
+        assertSame(transactionalMap1.partitions.get(PartitionId.from(2)).transaction.transactionalObject,
+                transactionalMap2.partitions.get(PartitionId.from(2)).transaction.transactionalObject);
+        assertSame(transactionalMap1.partitions.get(PartitionId.from(3)).transaction.transactionalObject,
+                transactionalMap2.partitions.get(PartitionId.from(3)).transaction.transactionalObject);
+    }
+
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionTest.java
new file mode 100644
index 0000000..533ea97
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionTest.java
@@ -0,0 +1,397 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import com.google.common.hash.Hashing;
+import org.junit.Test;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.CommitStatus;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
+import org.onosproject.store.service.Versioned;
+
+import static junit.framework.TestCase.assertNull;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.strictMock;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Transaction test.
+ */
+public class TransactionTest {
+
+    @Test
+    public void testTransaction() throws Exception {
+        AsyncConsistentMap<String, String> asyncMap = strictMock(AsyncConsistentMap.class);
+        TransactionId transactionId = TransactionId.from("foo");
+        List<MapUpdate<String, String>> updates = Collections.singletonList(new MapUpdate<>());
+        Transaction<MapUpdate<String, String>> transaction = new Transaction<>(transactionId, asyncMap);
+        assertEquals(transactionId, transaction.transactionId());
+
+        expect(asyncMap.begin(transactionId))
+                .andReturn(CompletableFuture.completedFuture(new Version(1)));
+        expect(asyncMap.prepare(new TransactionLog<>(transactionId, updates)))
+                .andReturn(CompletableFuture.completedFuture(true));
+        expect(asyncMap.commit(transactionId))
+                .andReturn(CompletableFuture.completedFuture(null));
+        replay(asyncMap);
+
+        assertEquals(Transaction.State.ACTIVE, transaction.state());
+        assertEquals(1, transaction.begin().join().value());
+        assertEquals(Transaction.State.ACTIVE, transaction.state());
+        assertTrue(transaction.prepare(updates).join());
+        assertEquals(Transaction.State.PREPARED, transaction.state());
+        transaction.commit();
+        assertEquals(Transaction.State.COMMITTED, transaction.state());
+        verify(asyncMap);
+    }
+
+    @Test
+    public void testTransactionFailOnOutOfOrderCalls() throws Exception {
+        AsyncConsistentMap<String, String> asyncMap = strictMock(AsyncConsistentMap.class);
+        TransactionId transactionId = TransactionId.from("foo");
+        List<MapUpdate<String, String>> updates = Collections.singletonList(new MapUpdate<>());
+        Transaction<MapUpdate<String, String>> transaction = new Transaction<>(transactionId, asyncMap);
+
+        try {
+            transaction.prepare(updates);
+            fail();
+        } catch (IllegalStateException e) {
+        }
+
+        try {
+            transaction.commit();
+            fail();
+        } catch (IllegalStateException e) {
+        }
+
+        try {
+            transaction.rollback();
+            fail();
+        } catch (IllegalStateException e) {
+        }
+
+        expect(asyncMap.begin(transactionId))
+                .andReturn(CompletableFuture.completedFuture(new Version(1)));
+        expect(asyncMap.prepare(new TransactionLog<>(transactionId, updates)))
+                .andReturn(CompletableFuture.completedFuture(true));
+        replay(asyncMap);
+
+        assertFalse(transaction.isOpen());
+        assertEquals(Transaction.State.ACTIVE, transaction.state());
+        assertEquals(1, transaction.begin().join().value());
+        assertTrue(transaction.isOpen());
+        assertEquals(Transaction.State.ACTIVE, transaction.state());
+        assertTrue(transaction.prepare(updates).join());
+        assertEquals(Transaction.State.PREPARED, transaction.state());
+
+        try {
+            transaction.begin();
+            fail();
+        } catch (IllegalStateException e) {
+        }
+        verify(asyncMap);
+    }
+
+    @Test
+    public void testCoordinatedMapTransaction() throws Exception {
+        List<Object> mocks = new ArrayList<>();
+
+        Map<PartitionId, DefaultTransactionalMapParticipant<String, String>> participants = new HashMap<>();
+        List<PartitionId> sortedParticipants = new ArrayList<>();
+        TransactionId transactionId = TransactionId.from(UUID.randomUUID().toString());
+        for (int i = 1; i <= 3; i++) {
+            AsyncConsistentMap<String, String> asyncMap = mock(AsyncConsistentMap.class);
+            mocks.add(asyncMap);
+
+            ConsistentMap<String, String> consistentMap = new TestConsistentMap<>();
+            Transaction<MapUpdate<String, String>> transaction = new Transaction<>(transactionId, asyncMap);
+            PartitionId partitionId = PartitionId.from(i);
+            participants.put(partitionId, new DefaultTransactionalMapParticipant<>(consistentMap, transaction));
+            sortedParticipants.add(partitionId);
+        }
+
+        expect(participants.get(PartitionId.from(1)).transaction.transactionalObject
+                .begin(anyObject(TransactionId.class)))
+                .andReturn(CompletableFuture.completedFuture(new Version(1)));
+
+        expect(participants.get(PartitionId.from(1)).transaction.transactionalObject.prepare(
+                new TransactionLog<>(transactionId, Arrays.asList(
+                        MapUpdate.<String, String>newBuilder()
+                                .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
+                                .withKey("foo")
+                                .withCurrentVersion(1)
+                                .build(),
+                        MapUpdate.<String, String>newBuilder()
+                                .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
+                                .withKey("baz")
+                                .withCurrentVersion(2)
+                                .build()
+                )))).andReturn(CompletableFuture.completedFuture(true));
+
+        expect(participants.get(PartitionId.from(1)).transaction.transactionalObject.commit(transactionId))
+                .andReturn(CompletableFuture.completedFuture(null));
+
+        expect(participants.get(PartitionId.from(3)).transaction.transactionalObject
+                .begin(anyObject(TransactionId.class)))
+                .andReturn(CompletableFuture.completedFuture(new Version(1)));
+
+        expect(participants.get(PartitionId.from(3)).transaction.transactionalObject.prepare(
+                new TransactionLog<>(transactionId, Arrays.asList(
+                        MapUpdate.<String, String>newBuilder()
+                                .withType(MapUpdate.Type.PUT_IF_ABSENT)
+                                .withKey("bar")
+                                .withValue("baz")
+                                .build()
+                )))).andReturn(CompletableFuture.completedFuture(true));
+
+        expect(participants.get(PartitionId.from(3)).transaction.transactionalObject.commit(transactionId))
+                .andReturn(CompletableFuture.completedFuture(null));
+
+        TransactionManager transactionManager = mock(TransactionManager.class);
+        expect(transactionManager.updateState(anyObject(TransactionId.class), anyObject(Transaction.State.class)))
+                .andReturn(CompletableFuture.completedFuture(null))
+                .anyTimes();
+        expect(transactionManager.remove(anyObject(TransactionId.class)))
+                .andReturn(CompletableFuture.completedFuture(null))
+                .anyTimes();
+        mocks.add(transactionManager);
+
+        TransactionCoordinator transactionCoordinator = new TransactionCoordinator(transactionId, transactionManager);
+
+        Hasher<String> hasher = key -> {
+            int hashCode = Hashing.sha256().hashBytes(key.getBytes()).asInt();
+            return sortedParticipants.get(Math.abs(hashCode) % sortedParticipants.size());
+        };
+
+        expect(transactionManager.<String, String>getTransactionalMap(anyString(), anyObject(), anyObject()))
+                .andReturn(new PartitionedTransactionalMap(participants, hasher));
+
+        replay(mocks.toArray());
+
+        PartitionedTransactionalMap<String, String> transactionalMap = (PartitionedTransactionalMap)
+                transactionCoordinator.getTransactionalMap("foo", Serializer.using(KryoNamespaces.API));
+
+        // Sneak a couple entries in the first partition.
+        transactionalMap.partitions.get(PartitionId.from(1)).backingMap.put("foo", "bar");
+        transactionalMap.partitions.get(PartitionId.from(1)).backingMap.put("baz", "foo");
+
+        assertTrue(transactionalMap.containsKey("foo"));
+        assertEquals("bar", transactionalMap.remove("foo"));
+        assertFalse(transactionalMap.containsKey("bar"));
+        assertNull(transactionalMap.put("bar", "baz"));
+        assertTrue(transactionalMap.containsKey("bar"));
+        assertTrue(transactionalMap.containsKey("baz"));
+        assertFalse(transactionalMap.remove("baz", "baz"));
+        assertTrue(transactionalMap.remove("baz", "foo"));
+        assertFalse(transactionalMap.containsKey("baz"));
+
+        assertEquals(CommitStatus.SUCCESS, transactionCoordinator.commit().join());
+        verify(mocks.toArray());
+    }
+
+    private static class TestConsistentMap<K, V> implements ConsistentMap<K, V> {
+        private final Map<K, Versioned<V>> map = new HashMap<>();
+        private final AtomicLong version = new AtomicLong();
+
+        @Override
+        public String name() {
+            return null;
+        }
+
+        @Override
+        public Type primitiveType() {
+            return Type.CONSISTENT_MAP;
+        }
+
+        private long nextVersion() {
+            return version.incrementAndGet();
+        }
+
+        @Override
+        public int size() {
+            return map.size();
+        }
+
+        @Override
+        public boolean isEmpty() {
+            return map.isEmpty();
+        }
+
+        @Override
+        public boolean containsKey(K key) {
+            return map.containsKey(key);
+        }
+
+        @Override
+        public boolean containsValue(V value) {
+            return map.containsValue(value);
+        }
+
+        @Override
+        public Versioned<V> get(K key) {
+            return map.get(key);
+        }
+
+        @Override
+        public Versioned<V> getOrDefault(K key, V defaultValue) {
+            return map.getOrDefault(key, new Versioned<>(defaultValue, 0));
+        }
+
+        @Override
+        public Versioned<V> computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Versioned<V> compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Versioned<V> computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Versioned<V> computeIf(K key,
+                Predicate<? super V> condition, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Versioned<V> put(K key, V value) {
+            return map.put(key, new Versioned<>(value, nextVersion()));
+        }
+
+        @Override
+        public Versioned<V> putAndGet(K key, V value) {
+            return put(key, value);
+        }
+
+        @Override
+        public Versioned<V> remove(K key) {
+            return map.remove(key);
+        }
+
+        @Override
+        public void clear() {
+            map.clear();
+        }
+
+        @Override
+        public Set<K> keySet() {
+            return map.keySet();
+        }
+
+        @Override
+        public Collection<Versioned<V>> values() {
+            return map.values();
+        }
+
+        @Override
+        public Set<Map.Entry<K, Versioned<V>>> entrySet() {
+            return map.entrySet();
+        }
+
+        @Override
+        public Versioned<V> putIfAbsent(K key, V value) {
+            return map.putIfAbsent(key, new Versioned<>(value, nextVersion()));
+        }
+
+        @Override
+        public boolean remove(K key, V value) {
+            return map.remove(key, value);
+        }
+
+        @Override
+        public boolean remove(K key, long version) {
+            Versioned<V> value = map.get(key);
+            if (value != null && value.version() == version) {
+                map.remove(key);
+                return true;
+            }
+            return false;
+        }
+
+        @Override
+        public Versioned<V> replace(K key, V value) {
+            return map.replace(key, new Versioned<>(value, nextVersion()));
+        }
+
+        @Override
+        public boolean replace(K key, V oldValue, V newValue) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean replace(K key, long oldVersion, V newValue) {
+            Versioned<V> value = map.get(key);
+            if (value != null && value.version() == oldVersion) {
+                map.put(key, new Versioned<>(newValue, nextVersion()));
+                return true;
+            }
+            return false;
+        }
+
+        @Override
+        public void addListener(MapEventListener<K, V> listener, Executor executor) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void removeListener(MapEventListener<K, V> listener) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Map<K, V> asJavaMap() {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
index 23d528b..ac4c60d 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
@@ -26,7 +26,7 @@
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
 import org.onosproject.store.service.Versioned;
 
 import java.util.Arrays;
@@ -366,7 +366,8 @@
                 .withValue(value1)
                 .build();
 
-        MapTransaction<String, byte[]> tx = new MapTransaction<>(TransactionId.from("tx1"), Arrays.asList(update1));
+        TransactionLog<MapUpdate<String, byte[]>> tx =
+                new TransactionLog<>(TransactionId.from("tx1"), Arrays.asList(update1));
 
         map.prepare(tx).thenAccept(result -> {
             assertEquals(true, result);
@@ -416,7 +417,7 @@
                 .withCurrentVersion(currFooVersion)
                 .build();
 
-        tx = new MapTransaction<>(TransactionId.from("tx2"), Arrays.asList(remove1));
+        tx = new TransactionLog<>(TransactionId.from("tx2"), Arrays.asList(remove1));
 
         map.prepare(tx).thenAccept(result -> {
             assertTrue("prepare should succeed", result);
@@ -459,7 +460,8 @@
                 .withKey("foo")
                 .withValue(value1)
                 .build();
-        MapTransaction<String, byte[]> tx = new MapTransaction<>(TransactionId.from("tx1"), Arrays.asList(update1));
+        TransactionLog<MapUpdate<String, byte[]>> tx =
+                new TransactionLog<>(TransactionId.from("tx1"), Arrays.asList(update1));
         map.prepare(tx).thenAccept(result -> {
             assertEquals(true, result);
         }).join();