blob: b686687d7c6f6f0c70a6c86b909b4fb2747d9f03 [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.impl;
17
18import java.util.ArrayList;
19import java.util.Arrays;
20import java.util.Collection;
21import java.util.Collections;
22import java.util.HashMap;
23import java.util.List;
24import java.util.Map;
25import java.util.Set;
26import java.util.UUID;
27import java.util.concurrent.CompletableFuture;
28import java.util.concurrent.Executor;
29import java.util.concurrent.atomic.AtomicLong;
30import java.util.function.BiFunction;
31import java.util.function.Function;
32import java.util.function.Predicate;
33
34import com.google.common.hash.Hashing;
35import org.junit.Test;
36import org.onosproject.cluster.PartitionId;
37import org.onosproject.store.primitives.MapUpdate;
38import org.onosproject.store.primitives.TransactionId;
39import org.onosproject.store.serializers.KryoNamespaces;
40import org.onosproject.store.service.AsyncConsistentMap;
41import org.onosproject.store.service.CommitStatus;
42import org.onosproject.store.service.ConsistentMap;
43import org.onosproject.store.service.MapEventListener;
44import org.onosproject.store.service.Serializer;
45import org.onosproject.store.service.TransactionLog;
46import org.onosproject.store.service.Version;
47import org.onosproject.store.service.Versioned;
48
49import static junit.framework.TestCase.assertNull;
50import static org.easymock.EasyMock.anyObject;
51import static org.easymock.EasyMock.anyString;
52import static org.easymock.EasyMock.expect;
53import static org.easymock.EasyMock.mock;
54import static org.easymock.EasyMock.replay;
55import static org.easymock.EasyMock.strictMock;
56import static org.easymock.EasyMock.verify;
57import static org.junit.Assert.assertEquals;
58import static org.junit.Assert.assertFalse;
59import static org.junit.Assert.assertTrue;
60import static org.junit.Assert.fail;
61
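/**
 * Unit tests for {@link Transaction} state transitions and for commits
 * coordinated across partitions by {@link TransactionCoordinator}.
 */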
65public class TransactionTest {
66
67 @Test
68 public void testTransaction() throws Exception {
69 AsyncConsistentMap<String, String> asyncMap = strictMock(AsyncConsistentMap.class);
70 TransactionId transactionId = TransactionId.from("foo");
71 List<MapUpdate<String, String>> updates = Collections.singletonList(new MapUpdate<>());
72 Transaction<MapUpdate<String, String>> transaction = new Transaction<>(transactionId, asyncMap);
73 assertEquals(transactionId, transaction.transactionId());
74
75 expect(asyncMap.begin(transactionId))
76 .andReturn(CompletableFuture.completedFuture(new Version(1)));
Jordan Halterman5f97a302017-04-26 23:41:31 -070077 expect(asyncMap.prepare(new TransactionLog<>(transactionId, 1, updates)))
Jordan Halterman948d6592017-04-20 17:18:24 -070078 .andReturn(CompletableFuture.completedFuture(true));
79 expect(asyncMap.commit(transactionId))
80 .andReturn(CompletableFuture.completedFuture(null));
81 replay(asyncMap);
82
83 assertEquals(Transaction.State.ACTIVE, transaction.state());
84 assertEquals(1, transaction.begin().join().value());
85 assertEquals(Transaction.State.ACTIVE, transaction.state());
86 assertTrue(transaction.prepare(updates).join());
87 assertEquals(Transaction.State.PREPARED, transaction.state());
88 transaction.commit();
89 assertEquals(Transaction.State.COMMITTED, transaction.state());
90 verify(asyncMap);
91 }
92
93 @Test
94 public void testTransactionFailOnOutOfOrderCalls() throws Exception {
95 AsyncConsistentMap<String, String> asyncMap = strictMock(AsyncConsistentMap.class);
96 TransactionId transactionId = TransactionId.from("foo");
97 List<MapUpdate<String, String>> updates = Collections.singletonList(new MapUpdate<>());
98 Transaction<MapUpdate<String, String>> transaction = new Transaction<>(transactionId, asyncMap);
99
100 try {
101 transaction.prepare(updates);
102 fail();
103 } catch (IllegalStateException e) {
104 }
105
106 try {
107 transaction.commit();
108 fail();
109 } catch (IllegalStateException e) {
110 }
111
112 try {
113 transaction.rollback();
114 fail();
115 } catch (IllegalStateException e) {
116 }
117
118 expect(asyncMap.begin(transactionId))
119 .andReturn(CompletableFuture.completedFuture(new Version(1)));
Jordan Halterman5f97a302017-04-26 23:41:31 -0700120 expect(asyncMap.prepare(new TransactionLog<>(transactionId, 1, updates)))
Jordan Halterman948d6592017-04-20 17:18:24 -0700121 .andReturn(CompletableFuture.completedFuture(true));
122 replay(asyncMap);
123
124 assertFalse(transaction.isOpen());
125 assertEquals(Transaction.State.ACTIVE, transaction.state());
126 assertEquals(1, transaction.begin().join().value());
127 assertTrue(transaction.isOpen());
128 assertEquals(Transaction.State.ACTIVE, transaction.state());
129 assertTrue(transaction.prepare(updates).join());
130 assertEquals(Transaction.State.PREPARED, transaction.state());
131
132 try {
133 transaction.begin();
134 fail();
135 } catch (IllegalStateException e) {
136 }
137 verify(asyncMap);
138 }
139
140 @Test
141 public void testCoordinatedMapTransaction() throws Exception {
142 List<Object> mocks = new ArrayList<>();
143
144 Map<PartitionId, DefaultTransactionalMapParticipant<String, String>> participants = new HashMap<>();
145 List<PartitionId> sortedParticipants = new ArrayList<>();
146 TransactionId transactionId = TransactionId.from(UUID.randomUUID().toString());
147 for (int i = 1; i <= 3; i++) {
148 AsyncConsistentMap<String, String> asyncMap = mock(AsyncConsistentMap.class);
149 mocks.add(asyncMap);
150
151 ConsistentMap<String, String> consistentMap = new TestConsistentMap<>();
152 Transaction<MapUpdate<String, String>> transaction = new Transaction<>(transactionId, asyncMap);
153 PartitionId partitionId = PartitionId.from(i);
154 participants.put(partitionId, new DefaultTransactionalMapParticipant<>(consistentMap, transaction));
155 sortedParticipants.add(partitionId);
156 }
157
158 expect(participants.get(PartitionId.from(1)).transaction.transactionalObject
159 .begin(anyObject(TransactionId.class)))
160 .andReturn(CompletableFuture.completedFuture(new Version(1)));
161
162 expect(participants.get(PartitionId.from(1)).transaction.transactionalObject.prepare(
Jordan Halterman5f97a302017-04-26 23:41:31 -0700163 new TransactionLog<>(transactionId, 1, Arrays.asList(
Jordan Halterman948d6592017-04-20 17:18:24 -0700164 MapUpdate.<String, String>newBuilder()
165 .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
166 .withKey("foo")
Jordan Halterman5f97a302017-04-26 23:41:31 -0700167 .withVersion(1)
Jordan Halterman948d6592017-04-20 17:18:24 -0700168 .build(),
169 MapUpdate.<String, String>newBuilder()
170 .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
171 .withKey("baz")
Jordan Halterman5f97a302017-04-26 23:41:31 -0700172 .withVersion(2)
Jordan Halterman948d6592017-04-20 17:18:24 -0700173 .build()
174 )))).andReturn(CompletableFuture.completedFuture(true));
175
176 expect(participants.get(PartitionId.from(1)).transaction.transactionalObject.commit(transactionId))
177 .andReturn(CompletableFuture.completedFuture(null));
178
179 expect(participants.get(PartitionId.from(3)).transaction.transactionalObject
180 .begin(anyObject(TransactionId.class)))
181 .andReturn(CompletableFuture.completedFuture(new Version(1)));
182
183 expect(participants.get(PartitionId.from(3)).transaction.transactionalObject.prepare(
Jordan Halterman5f97a302017-04-26 23:41:31 -0700184 new TransactionLog<>(transactionId, 1, Arrays.asList(
Jordan Halterman948d6592017-04-20 17:18:24 -0700185 MapUpdate.<String, String>newBuilder()
Jordan Halterman5f97a302017-04-26 23:41:31 -0700186 .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
Jordan Halterman948d6592017-04-20 17:18:24 -0700187 .withKey("bar")
188 .withValue("baz")
Jordan Halterman5f97a302017-04-26 23:41:31 -0700189 .withVersion(1)
Jordan Halterman948d6592017-04-20 17:18:24 -0700190 .build()
191 )))).andReturn(CompletableFuture.completedFuture(true));
192
193 expect(participants.get(PartitionId.from(3)).transaction.transactionalObject.commit(transactionId))
194 .andReturn(CompletableFuture.completedFuture(null));
195
196 TransactionManager transactionManager = mock(TransactionManager.class);
197 expect(transactionManager.updateState(anyObject(TransactionId.class), anyObject(Transaction.State.class)))
198 .andReturn(CompletableFuture.completedFuture(null))
199 .anyTimes();
200 expect(transactionManager.remove(anyObject(TransactionId.class)))
201 .andReturn(CompletableFuture.completedFuture(null))
202 .anyTimes();
203 mocks.add(transactionManager);
204
205 TransactionCoordinator transactionCoordinator = new TransactionCoordinator(transactionId, transactionManager);
206
207 Hasher<String> hasher = key -> {
208 int hashCode = Hashing.sha256().hashBytes(key.getBytes()).asInt();
209 return sortedParticipants.get(Math.abs(hashCode) % sortedParticipants.size());
210 };
211
212 expect(transactionManager.<String, String>getTransactionalMap(anyString(), anyObject(), anyObject()))
213 .andReturn(new PartitionedTransactionalMap(participants, hasher));
214
215 replay(mocks.toArray());
216
217 PartitionedTransactionalMap<String, String> transactionalMap = (PartitionedTransactionalMap)
218 transactionCoordinator.getTransactionalMap("foo", Serializer.using(KryoNamespaces.API));
219
220 // Sneak a couple entries in the first partition.
221 transactionalMap.partitions.get(PartitionId.from(1)).backingMap.put("foo", "bar");
222 transactionalMap.partitions.get(PartitionId.from(1)).backingMap.put("baz", "foo");
223
224 assertTrue(transactionalMap.containsKey("foo"));
225 assertEquals("bar", transactionalMap.remove("foo"));
226 assertFalse(transactionalMap.containsKey("bar"));
227 assertNull(transactionalMap.put("bar", "baz"));
228 assertTrue(transactionalMap.containsKey("bar"));
229 assertTrue(transactionalMap.containsKey("baz"));
230 assertFalse(transactionalMap.remove("baz", "baz"));
231 assertTrue(transactionalMap.remove("baz", "foo"));
232 assertFalse(transactionalMap.containsKey("baz"));
233
234 assertEquals(CommitStatus.SUCCESS, transactionCoordinator.commit().join());
235 verify(mocks.toArray());
236 }
237
238 private static class TestConsistentMap<K, V> implements ConsistentMap<K, V> {
239 private final Map<K, Versioned<V>> map = new HashMap<>();
240 private final AtomicLong version = new AtomicLong();
241
242 @Override
243 public String name() {
244 return null;
245 }
246
247 @Override
248 public Type primitiveType() {
249 return Type.CONSISTENT_MAP;
250 }
251
252 private long nextVersion() {
253 return version.incrementAndGet();
254 }
255
256 @Override
257 public int size() {
258 return map.size();
259 }
260
261 @Override
262 public boolean isEmpty() {
263 return map.isEmpty();
264 }
265
266 @Override
267 public boolean containsKey(K key) {
268 return map.containsKey(key);
269 }
270
271 @Override
272 public boolean containsValue(V value) {
273 return map.containsValue(value);
274 }
275
276 @Override
277 public Versioned<V> get(K key) {
278 return map.get(key);
279 }
280
281 @Override
282 public Versioned<V> getOrDefault(K key, V defaultValue) {
283 return map.getOrDefault(key, new Versioned<>(defaultValue, 0));
284 }
285
286 @Override
287 public Versioned<V> computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
288 throw new UnsupportedOperationException();
289 }
290
291 @Override
292 public Versioned<V> compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
293 throw new UnsupportedOperationException();
294 }
295
296 @Override
297 public Versioned<V> computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
298 throw new UnsupportedOperationException();
299 }
300
301 @Override
302 public Versioned<V> computeIf(K key,
303 Predicate<? super V> condition, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
304 throw new UnsupportedOperationException();
305 }
306
307 @Override
308 public Versioned<V> put(K key, V value) {
309 return map.put(key, new Versioned<>(value, nextVersion()));
310 }
311
312 @Override
313 public Versioned<V> putAndGet(K key, V value) {
314 return put(key, value);
315 }
316
317 @Override
318 public Versioned<V> remove(K key) {
319 return map.remove(key);
320 }
321
322 @Override
323 public void clear() {
324 map.clear();
325 }
326
327 @Override
328 public Set<K> keySet() {
329 return map.keySet();
330 }
331
332 @Override
333 public Collection<Versioned<V>> values() {
334 return map.values();
335 }
336
337 @Override
338 public Set<Map.Entry<K, Versioned<V>>> entrySet() {
339 return map.entrySet();
340 }
341
342 @Override
343 public Versioned<V> putIfAbsent(K key, V value) {
344 return map.putIfAbsent(key, new Versioned<>(value, nextVersion()));
345 }
346
347 @Override
348 public boolean remove(K key, V value) {
349 return map.remove(key, value);
350 }
351
352 @Override
353 public boolean remove(K key, long version) {
354 Versioned<V> value = map.get(key);
355 if (value != null && value.version() == version) {
356 map.remove(key);
357 return true;
358 }
359 return false;
360 }
361
362 @Override
363 public Versioned<V> replace(K key, V value) {
364 return map.replace(key, new Versioned<>(value, nextVersion()));
365 }
366
367 @Override
368 public boolean replace(K key, V oldValue, V newValue) {
369 throw new UnsupportedOperationException();
370 }
371
372 @Override
373 public boolean replace(K key, long oldVersion, V newValue) {
374 Versioned<V> value = map.get(key);
375 if (value != null && value.version() == oldVersion) {
376 map.put(key, new Versioned<>(newValue, nextVersion()));
377 return true;
378 }
379 return false;
380 }
381
382 @Override
383 public void addListener(MapEventListener<K, V> listener, Executor executor) {
384 throw new UnsupportedOperationException();
385 }
386
387 @Override
388 public void removeListener(MapEventListener<K, V> listener) {
389 throw new UnsupportedOperationException();
390 }
391
392 @Override
393 public Map<K, V> asJavaMap() {
394 throw new UnsupportedOperationException();
395 }
396 }
397
398}