[ONOS-6342] Refactor transaction architecture to support a shared cache for transactional primitives
Change-Id: I2a17965100895f5aa4d2202028047bb980c11d26
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionManagerTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionManagerTest.java
new file mode 100644
index 0000000..9245a03
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionManagerTest.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.HashMap;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+import com.google.common.collect.Sets;
+import org.junit.Test;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.primitives.DistributedPrimitiveCreator;
+import org.onosproject.store.primitives.PartitionService;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.ConsistentMapBuilder;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Transaction manager test.
+ */
+public class TransactionManagerTest {
+
+ @Test
+ public void testTransactionMapCaching() throws Exception {
+ AsyncConsistentMap asyncMap = mock(AsyncConsistentMap.class);
+ expect(asyncMap.name()).andReturn("foo");
+ expect(asyncMap.addListener(anyObject(MapEventListener.class), anyObject(Executor.class)))
+ .andReturn(CompletableFuture.completedFuture(null)).anyTimes();
+ asyncMap.addStatusChangeListener(anyObject(Consumer.class));
+ expectLastCall().anyTimes();
+ expect(asyncMap.entrySet()).andReturn(CompletableFuture.completedFuture(new HashMap<>().entrySet())).anyTimes();
+
+ ConsistentMapBuilder mapBuilder = mock(ConsistentMapBuilder.class);
+ expect(mapBuilder.withName(anyString())).andReturn(mapBuilder).anyTimes();
+ expect(mapBuilder.withSerializer(anyObject(Serializer.class))).andReturn(mapBuilder).anyTimes();
+ expect(mapBuilder.buildAsyncMap()).andReturn(asyncMap).anyTimes();
+
+ DistributedPrimitiveCreator primitiveCreator = mock(DistributedPrimitiveCreator.class);
+ expect(primitiveCreator.newAsyncConsistentMap(anyString(), anyObject(Serializer.class)))
+ .andReturn(asyncMap).anyTimes();
+
+ StorageService storageService = mock(StorageService.class);
+ expect(storageService.consistentMapBuilder()).andReturn(mapBuilder);
+
+ PartitionService partitionService = mock(PartitionService.class);
+ Set<PartitionId> partitionIds = Sets.newHashSet(PartitionId.from(1), PartitionId.from(2), PartitionId.from(3));
+ expect(partitionService.getAllPartitionIds())
+ .andReturn(partitionIds).anyTimes();
+ expect(partitionService.getNumberOfPartitions())
+ .andReturn(partitionIds.size()).anyTimes();
+ expect(partitionService.getDistributedPrimitiveCreator(anyObject(PartitionId.class)))
+ .andReturn(primitiveCreator).anyTimes();
+
+ replay(storageService, partitionService, asyncMap, primitiveCreator, mapBuilder);
+
+ TransactionManager transactionManager = new TransactionManager(storageService, partitionService);
+ TransactionId transactionId = TransactionId.from(UUID.randomUUID().toString());
+ TransactionCoordinator transactionCoordinator = new TransactionCoordinator(transactionId, transactionManager);
+ Serializer serializer = Serializer.using(KryoNamespaces.API);
+
+ PartitionedTransactionalMap<String, String> transactionalMap1 = (PartitionedTransactionalMap)
+ transactionManager.getTransactionalMap("foo", serializer, transactionCoordinator);
+ PartitionedTransactionalMap<String, String> transactionalMap2 = (PartitionedTransactionalMap)
+ transactionManager.getTransactionalMap("foo", serializer, transactionCoordinator);
+
+ assertSame(transactionalMap1.partitions.get(PartitionId.from(1)).transaction.transactionalObject,
+ transactionalMap2.partitions.get(PartitionId.from(1)).transaction.transactionalObject);
+ assertSame(transactionalMap1.partitions.get(PartitionId.from(2)).transaction.transactionalObject,
+ transactionalMap2.partitions.get(PartitionId.from(2)).transaction.transactionalObject);
+ assertSame(transactionalMap1.partitions.get(PartitionId.from(3)).transaction.transactionalObject,
+ transactionalMap2.partitions.get(PartitionId.from(3)).transaction.transactionalObject);
+ }
+
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionTest.java
new file mode 100644
index 0000000..533ea97
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionTest.java
@@ -0,0 +1,397 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import com.google.common.hash.Hashing;
+import org.junit.Test;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.CommitStatus;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
+import org.onosproject.store.service.Versioned;
+
+import static junit.framework.TestCase.assertNull;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.strictMock;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Transaction test.
+ */
+public class TransactionTest {
+
+ @Test
+ public void testTransaction() throws Exception {
+ AsyncConsistentMap<String, String> asyncMap = strictMock(AsyncConsistentMap.class);
+ TransactionId transactionId = TransactionId.from("foo");
+ List<MapUpdate<String, String>> updates = Collections.singletonList(new MapUpdate<>());
+ Transaction<MapUpdate<String, String>> transaction = new Transaction<>(transactionId, asyncMap);
+ assertEquals(transactionId, transaction.transactionId());
+
+ expect(asyncMap.begin(transactionId))
+ .andReturn(CompletableFuture.completedFuture(new Version(1)));
+ expect(asyncMap.prepare(new TransactionLog<>(transactionId, updates)))
+ .andReturn(CompletableFuture.completedFuture(true));
+ expect(asyncMap.commit(transactionId))
+ .andReturn(CompletableFuture.completedFuture(null));
+ replay(asyncMap);
+
+ assertEquals(Transaction.State.ACTIVE, transaction.state());
+ assertEquals(1, transaction.begin().join().value());
+ assertEquals(Transaction.State.ACTIVE, transaction.state());
+ assertTrue(transaction.prepare(updates).join());
+ assertEquals(Transaction.State.PREPARED, transaction.state());
+ transaction.commit();
+ assertEquals(Transaction.State.COMMITTED, transaction.state());
+ verify(asyncMap);
+ }
+
+ @Test
+ public void testTransactionFailOnOutOfOrderCalls() throws Exception {
+ AsyncConsistentMap<String, String> asyncMap = strictMock(AsyncConsistentMap.class);
+ TransactionId transactionId = TransactionId.from("foo");
+ List<MapUpdate<String, String>> updates = Collections.singletonList(new MapUpdate<>());
+ Transaction<MapUpdate<String, String>> transaction = new Transaction<>(transactionId, asyncMap);
+
+ try {
+ transaction.prepare(updates);
+ fail();
+ } catch (IllegalStateException e) {
+ }
+
+ try {
+ transaction.commit();
+ fail();
+ } catch (IllegalStateException e) {
+ }
+
+ try {
+ transaction.rollback();
+ fail();
+ } catch (IllegalStateException e) {
+ }
+
+ expect(asyncMap.begin(transactionId))
+ .andReturn(CompletableFuture.completedFuture(new Version(1)));
+ expect(asyncMap.prepare(new TransactionLog<>(transactionId, updates)))
+ .andReturn(CompletableFuture.completedFuture(true));
+ replay(asyncMap);
+
+ assertFalse(transaction.isOpen());
+ assertEquals(Transaction.State.ACTIVE, transaction.state());
+ assertEquals(1, transaction.begin().join().value());
+ assertTrue(transaction.isOpen());
+ assertEquals(Transaction.State.ACTIVE, transaction.state());
+ assertTrue(transaction.prepare(updates).join());
+ assertEquals(Transaction.State.PREPARED, transaction.state());
+
+ try {
+ transaction.begin();
+ fail();
+ } catch (IllegalStateException e) {
+ }
+ verify(asyncMap);
+ }
+
+ @Test
+ public void testCoordinatedMapTransaction() throws Exception {
+ List<Object> mocks = new ArrayList<>();
+
+ Map<PartitionId, DefaultTransactionalMapParticipant<String, String>> participants = new HashMap<>();
+ List<PartitionId> sortedParticipants = new ArrayList<>();
+ TransactionId transactionId = TransactionId.from(UUID.randomUUID().toString());
+ for (int i = 1; i <= 3; i++) {
+ AsyncConsistentMap<String, String> asyncMap = mock(AsyncConsistentMap.class);
+ mocks.add(asyncMap);
+
+ ConsistentMap<String, String> consistentMap = new TestConsistentMap<>();
+ Transaction<MapUpdate<String, String>> transaction = new Transaction<>(transactionId, asyncMap);
+ PartitionId partitionId = PartitionId.from(i);
+ participants.put(partitionId, new DefaultTransactionalMapParticipant<>(consistentMap, transaction));
+ sortedParticipants.add(partitionId);
+ }
+
+ expect(participants.get(PartitionId.from(1)).transaction.transactionalObject
+ .begin(anyObject(TransactionId.class)))
+ .andReturn(CompletableFuture.completedFuture(new Version(1)));
+
+ expect(participants.get(PartitionId.from(1)).transaction.transactionalObject.prepare(
+ new TransactionLog<>(transactionId, Arrays.asList(
+ MapUpdate.<String, String>newBuilder()
+ .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
+ .withKey("foo")
+ .withCurrentVersion(1)
+ .build(),
+ MapUpdate.<String, String>newBuilder()
+ .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
+ .withKey("baz")
+ .withCurrentVersion(2)
+ .build()
+ )))).andReturn(CompletableFuture.completedFuture(true));
+
+ expect(participants.get(PartitionId.from(1)).transaction.transactionalObject.commit(transactionId))
+ .andReturn(CompletableFuture.completedFuture(null));
+
+ expect(participants.get(PartitionId.from(3)).transaction.transactionalObject
+ .begin(anyObject(TransactionId.class)))
+ .andReturn(CompletableFuture.completedFuture(new Version(1)));
+
+ expect(participants.get(PartitionId.from(3)).transaction.transactionalObject.prepare(
+ new TransactionLog<>(transactionId, Arrays.asList(
+ MapUpdate.<String, String>newBuilder()
+ .withType(MapUpdate.Type.PUT_IF_ABSENT)
+ .withKey("bar")
+ .withValue("baz")
+ .build()
+ )))).andReturn(CompletableFuture.completedFuture(true));
+
+ expect(participants.get(PartitionId.from(3)).transaction.transactionalObject.commit(transactionId))
+ .andReturn(CompletableFuture.completedFuture(null));
+
+ TransactionManager transactionManager = mock(TransactionManager.class);
+ expect(transactionManager.updateState(anyObject(TransactionId.class), anyObject(Transaction.State.class)))
+ .andReturn(CompletableFuture.completedFuture(null))
+ .anyTimes();
+ expect(transactionManager.remove(anyObject(TransactionId.class)))
+ .andReturn(CompletableFuture.completedFuture(null))
+ .anyTimes();
+ mocks.add(transactionManager);
+
+ TransactionCoordinator transactionCoordinator = new TransactionCoordinator(transactionId, transactionManager);
+
+ Hasher<String> hasher = key -> {
+ int hashCode = Hashing.sha256().hashBytes(key.getBytes()).asInt();
+ return sortedParticipants.get(Math.abs(hashCode) % sortedParticipants.size());
+ };
+
+ expect(transactionManager.<String, String>getTransactionalMap(anyString(), anyObject(), anyObject()))
+ .andReturn(new PartitionedTransactionalMap(participants, hasher));
+
+ replay(mocks.toArray());
+
+ PartitionedTransactionalMap<String, String> transactionalMap = (PartitionedTransactionalMap)
+ transactionCoordinator.getTransactionalMap("foo", Serializer.using(KryoNamespaces.API));
+
+ // Sneak a couple entries in the first partition.
+ transactionalMap.partitions.get(PartitionId.from(1)).backingMap.put("foo", "bar");
+ transactionalMap.partitions.get(PartitionId.from(1)).backingMap.put("baz", "foo");
+
+ assertTrue(transactionalMap.containsKey("foo"));
+ assertEquals("bar", transactionalMap.remove("foo"));
+ assertFalse(transactionalMap.containsKey("bar"));
+ assertNull(transactionalMap.put("bar", "baz"));
+ assertTrue(transactionalMap.containsKey("bar"));
+ assertTrue(transactionalMap.containsKey("baz"));
+ assertFalse(transactionalMap.remove("baz", "baz"));
+ assertTrue(transactionalMap.remove("baz", "foo"));
+ assertFalse(transactionalMap.containsKey("baz"));
+
+ assertEquals(CommitStatus.SUCCESS, transactionCoordinator.commit().join());
+ verify(mocks.toArray());
+ }
+
+ private static class TestConsistentMap<K, V> implements ConsistentMap<K, V> {
+ private final Map<K, Versioned<V>> map = new HashMap<>();
+ private final AtomicLong version = new AtomicLong();
+
+ @Override
+ public String name() {
+ return null;
+ }
+
+ @Override
+ public Type primitiveType() {
+ return Type.CONSISTENT_MAP;
+ }
+
+ private long nextVersion() {
+ return version.incrementAndGet();
+ }
+
+ @Override
+ public int size() {
+ return map.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return map.isEmpty();
+ }
+
+ @Override
+ public boolean containsKey(K key) {
+ return map.containsKey(key);
+ }
+
+ @Override
+ public boolean containsValue(V value) {
+ return map.containsValue(value);
+ }
+
+ @Override
+ public Versioned<V> get(K key) {
+ return map.get(key);
+ }
+
+ @Override
+ public Versioned<V> getOrDefault(K key, V defaultValue) {
+ return map.getOrDefault(key, new Versioned<>(defaultValue, 0));
+ }
+
+ @Override
+ public Versioned<V> computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Versioned<V> compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Versioned<V> computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Versioned<V> computeIf(K key,
+ Predicate<? super V> condition, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Versioned<V> put(K key, V value) {
+ return map.put(key, new Versioned<>(value, nextVersion()));
+ }
+
+ @Override
+ public Versioned<V> putAndGet(K key, V value) {
+ return put(key, value);
+ }
+
+ @Override
+ public Versioned<V> remove(K key) {
+ return map.remove(key);
+ }
+
+ @Override
+ public void clear() {
+ map.clear();
+ }
+
+ @Override
+ public Set<K> keySet() {
+ return map.keySet();
+ }
+
+ @Override
+ public Collection<Versioned<V>> values() {
+ return map.values();
+ }
+
+ @Override
+ public Set<Map.Entry<K, Versioned<V>>> entrySet() {
+ return map.entrySet();
+ }
+
+ @Override
+ public Versioned<V> putIfAbsent(K key, V value) {
+ return map.putIfAbsent(key, new Versioned<>(value, nextVersion()));
+ }
+
+ @Override
+ public boolean remove(K key, V value) {
+ return map.remove(key, value);
+ }
+
+ @Override
+ public boolean remove(K key, long version) {
+ Versioned<V> value = map.get(key);
+ if (value != null && value.version() == version) {
+ map.remove(key);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public Versioned<V> replace(K key, V value) {
+ return map.replace(key, new Versioned<>(value, nextVersion()));
+ }
+
+ @Override
+ public boolean replace(K key, V oldValue, V newValue) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean replace(K key, long oldVersion, V newValue) {
+ Versioned<V> value = map.get(key);
+ if (value != null && value.version() == oldVersion) {
+ map.put(key, new Versioned<>(newValue, nextVersion()));
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void addListener(MapEventListener<K, V> listener, Executor executor) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void removeListener(MapEventListener<K, V> listener) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Map<K, V> asJavaMap() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
index 23d528b..ac4c60d 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
@@ -26,7 +26,7 @@
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
import org.onosproject.store.service.Versioned;
import java.util.Arrays;
@@ -366,7 +366,8 @@
.withValue(value1)
.build();
- MapTransaction<String, byte[]> tx = new MapTransaction<>(TransactionId.from("tx1"), Arrays.asList(update1));
+ TransactionLog<MapUpdate<String, byte[]>> tx =
+ new TransactionLog<>(TransactionId.from("tx1"), Arrays.asList(update1));
map.prepare(tx).thenAccept(result -> {
assertEquals(true, result);
@@ -416,7 +417,7 @@
.withCurrentVersion(currFooVersion)
.build();
- tx = new MapTransaction<>(TransactionId.from("tx2"), Arrays.asList(remove1));
+ tx = new TransactionLog<>(TransactionId.from("tx2"), Arrays.asList(remove1));
map.prepare(tx).thenAccept(result -> {
assertTrue("prepare should succeed", result);
@@ -459,7 +460,8 @@
.withKey("foo")
.withValue(value1)
.build();
- MapTransaction<String, byte[]> tx = new MapTransaction<>(TransactionId.from("tx1"), Arrays.asList(update1));
+ TransactionLog<MapUpdate<String, byte[]>> tx =
+ new TransactionLog<>(TransactionId.from("tx1"), Arrays.asList(update1));
map.prepare(tx).thenAccept(result -> {
assertEquals(true, result);
}).join();