blob: 5d9e5d823a25624816eb921108281d67e2a7fcba [file] [log] [blame]
Jordan Halterman948d6592017-04-20 17:18:24 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Jordan Halterman948d6592017-04-20 17:18:24 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.impl;
17
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
34
35import com.google.common.hash.Hashing;
36import org.junit.Test;
37import org.onosproject.cluster.PartitionId;
38import org.onosproject.store.primitives.MapUpdate;
39import org.onosproject.store.primitives.TransactionId;
40import org.onosproject.store.serializers.KryoNamespaces;
41import org.onosproject.store.service.AsyncConsistentMap;
42import org.onosproject.store.service.CommitStatus;
43import org.onosproject.store.service.ConsistentMap;
44import org.onosproject.store.service.MapEventListener;
45import org.onosproject.store.service.Serializer;
46import org.onosproject.store.service.TransactionLog;
47import org.onosproject.store.service.Version;
48import org.onosproject.store.service.Versioned;
49
50import static junit.framework.TestCase.assertNull;
51import static org.easymock.EasyMock.anyObject;
52import static org.easymock.EasyMock.anyString;
53import static org.easymock.EasyMock.expect;
54import static org.easymock.EasyMock.mock;
55import static org.easymock.EasyMock.replay;
56import static org.easymock.EasyMock.strictMock;
57import static org.easymock.EasyMock.verify;
58import static org.junit.Assert.assertEquals;
59import static org.junit.Assert.assertFalse;
60import static org.junit.Assert.assertTrue;
61import static org.junit.Assert.fail;
62
/**
 * Unit tests for {@link Transaction} and {@link TransactionCoordinator}.
 */
66public class TransactionTest {
67
68 @Test
69 public void testTransaction() throws Exception {
70 AsyncConsistentMap<String, String> asyncMap = strictMock(AsyncConsistentMap.class);
71 TransactionId transactionId = TransactionId.from("foo");
72 List<MapUpdate<String, String>> updates = Collections.singletonList(new MapUpdate<>());
73 Transaction<MapUpdate<String, String>> transaction = new Transaction<>(transactionId, asyncMap);
74 assertEquals(transactionId, transaction.transactionId());
75
76 expect(asyncMap.begin(transactionId))
77 .andReturn(CompletableFuture.completedFuture(new Version(1)));
Jordan Halterman5f97a302017-04-26 23:41:31 -070078 expect(asyncMap.prepare(new TransactionLog<>(transactionId, 1, updates)))
Jordan Halterman948d6592017-04-20 17:18:24 -070079 .andReturn(CompletableFuture.completedFuture(true));
80 expect(asyncMap.commit(transactionId))
81 .andReturn(CompletableFuture.completedFuture(null));
82 replay(asyncMap);
83
84 assertEquals(Transaction.State.ACTIVE, transaction.state());
85 assertEquals(1, transaction.begin().join().value());
86 assertEquals(Transaction.State.ACTIVE, transaction.state());
87 assertTrue(transaction.prepare(updates).join());
88 assertEquals(Transaction.State.PREPARED, transaction.state());
89 transaction.commit();
90 assertEquals(Transaction.State.COMMITTED, transaction.state());
91 verify(asyncMap);
92 }
93
94 @Test
95 public void testTransactionFailOnOutOfOrderCalls() throws Exception {
96 AsyncConsistentMap<String, String> asyncMap = strictMock(AsyncConsistentMap.class);
97 TransactionId transactionId = TransactionId.from("foo");
98 List<MapUpdate<String, String>> updates = Collections.singletonList(new MapUpdate<>());
99 Transaction<MapUpdate<String, String>> transaction = new Transaction<>(transactionId, asyncMap);
100
101 try {
102 transaction.prepare(updates);
103 fail();
104 } catch (IllegalStateException e) {
105 }
106
107 try {
108 transaction.commit();
109 fail();
110 } catch (IllegalStateException e) {
111 }
112
113 try {
114 transaction.rollback();
115 fail();
116 } catch (IllegalStateException e) {
117 }
118
119 expect(asyncMap.begin(transactionId))
120 .andReturn(CompletableFuture.completedFuture(new Version(1)));
Jordan Halterman5f97a302017-04-26 23:41:31 -0700121 expect(asyncMap.prepare(new TransactionLog<>(transactionId, 1, updates)))
Jordan Halterman948d6592017-04-20 17:18:24 -0700122 .andReturn(CompletableFuture.completedFuture(true));
123 replay(asyncMap);
124
125 assertFalse(transaction.isOpen());
126 assertEquals(Transaction.State.ACTIVE, transaction.state());
127 assertEquals(1, transaction.begin().join().value());
128 assertTrue(transaction.isOpen());
129 assertEquals(Transaction.State.ACTIVE, transaction.state());
130 assertTrue(transaction.prepare(updates).join());
131 assertEquals(Transaction.State.PREPARED, transaction.state());
132
133 try {
134 transaction.begin();
135 fail();
136 } catch (IllegalStateException e) {
137 }
138 verify(asyncMap);
139 }
140
141 @Test
142 public void testCoordinatedMapTransaction() throws Exception {
143 List<Object> mocks = new ArrayList<>();
144
145 Map<PartitionId, DefaultTransactionalMapParticipant<String, String>> participants = new HashMap<>();
146 List<PartitionId> sortedParticipants = new ArrayList<>();
147 TransactionId transactionId = TransactionId.from(UUID.randomUUID().toString());
148 for (int i = 1; i <= 3; i++) {
149 AsyncConsistentMap<String, String> asyncMap = mock(AsyncConsistentMap.class);
150 mocks.add(asyncMap);
151
152 ConsistentMap<String, String> consistentMap = new TestConsistentMap<>();
153 Transaction<MapUpdate<String, String>> transaction = new Transaction<>(transactionId, asyncMap);
154 PartitionId partitionId = PartitionId.from(i);
155 participants.put(partitionId, new DefaultTransactionalMapParticipant<>(consistentMap, transaction));
156 sortedParticipants.add(partitionId);
157 }
158
159 expect(participants.get(PartitionId.from(1)).transaction.transactionalObject
160 .begin(anyObject(TransactionId.class)))
161 .andReturn(CompletableFuture.completedFuture(new Version(1)));
162
163 expect(participants.get(PartitionId.from(1)).transaction.transactionalObject.prepare(
Jordan Halterman5f97a302017-04-26 23:41:31 -0700164 new TransactionLog<>(transactionId, 1, Arrays.asList(
Jordan Halterman948d6592017-04-20 17:18:24 -0700165 MapUpdate.<String, String>newBuilder()
166 .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
167 .withKey("foo")
Jordan Halterman5f97a302017-04-26 23:41:31 -0700168 .withVersion(1)
Jordan Halterman948d6592017-04-20 17:18:24 -0700169 .build(),
170 MapUpdate.<String, String>newBuilder()
171 .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
172 .withKey("baz")
Jordan Halterman5f97a302017-04-26 23:41:31 -0700173 .withVersion(2)
Jordan Halterman948d6592017-04-20 17:18:24 -0700174 .build()
175 )))).andReturn(CompletableFuture.completedFuture(true));
176
177 expect(participants.get(PartitionId.from(1)).transaction.transactionalObject.commit(transactionId))
178 .andReturn(CompletableFuture.completedFuture(null));
179
180 expect(participants.get(PartitionId.from(3)).transaction.transactionalObject
181 .begin(anyObject(TransactionId.class)))
182 .andReturn(CompletableFuture.completedFuture(new Version(1)));
183
184 expect(participants.get(PartitionId.from(3)).transaction.transactionalObject.prepare(
Jordan Halterman5f97a302017-04-26 23:41:31 -0700185 new TransactionLog<>(transactionId, 1, Arrays.asList(
Jordan Halterman948d6592017-04-20 17:18:24 -0700186 MapUpdate.<String, String>newBuilder()
Jordan Halterman5f97a302017-04-26 23:41:31 -0700187 .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
Jordan Halterman948d6592017-04-20 17:18:24 -0700188 .withKey("bar")
189 .withValue("baz")
Jordan Halterman5f97a302017-04-26 23:41:31 -0700190 .withVersion(1)
Jordan Halterman948d6592017-04-20 17:18:24 -0700191 .build()
192 )))).andReturn(CompletableFuture.completedFuture(true));
193
194 expect(participants.get(PartitionId.from(3)).transaction.transactionalObject.commit(transactionId))
195 .andReturn(CompletableFuture.completedFuture(null));
196
197 TransactionManager transactionManager = mock(TransactionManager.class);
198 expect(transactionManager.updateState(anyObject(TransactionId.class), anyObject(Transaction.State.class)))
199 .andReturn(CompletableFuture.completedFuture(null))
200 .anyTimes();
201 expect(transactionManager.remove(anyObject(TransactionId.class)))
202 .andReturn(CompletableFuture.completedFuture(null))
203 .anyTimes();
204 mocks.add(transactionManager);
205
206 TransactionCoordinator transactionCoordinator = new TransactionCoordinator(transactionId, transactionManager);
207
208 Hasher<String> hasher = key -> {
209 int hashCode = Hashing.sha256().hashBytes(key.getBytes()).asInt();
210 return sortedParticipants.get(Math.abs(hashCode) % sortedParticipants.size());
211 };
212
213 expect(transactionManager.<String, String>getTransactionalMap(anyString(), anyObject(), anyObject()))
214 .andReturn(new PartitionedTransactionalMap(participants, hasher));
215
216 replay(mocks.toArray());
217
218 PartitionedTransactionalMap<String, String> transactionalMap = (PartitionedTransactionalMap)
219 transactionCoordinator.getTransactionalMap("foo", Serializer.using(KryoNamespaces.API));
220
221 // Sneak a couple entries in the first partition.
222 transactionalMap.partitions.get(PartitionId.from(1)).backingMap.put("foo", "bar");
223 transactionalMap.partitions.get(PartitionId.from(1)).backingMap.put("baz", "foo");
224
225 assertTrue(transactionalMap.containsKey("foo"));
226 assertEquals("bar", transactionalMap.remove("foo"));
227 assertFalse(transactionalMap.containsKey("bar"));
228 assertNull(transactionalMap.put("bar", "baz"));
229 assertTrue(transactionalMap.containsKey("bar"));
230 assertTrue(transactionalMap.containsKey("baz"));
231 assertFalse(transactionalMap.remove("baz", "baz"));
232 assertTrue(transactionalMap.remove("baz", "foo"));
233 assertFalse(transactionalMap.containsKey("baz"));
234
235 assertEquals(CommitStatus.SUCCESS, transactionCoordinator.commit().join());
236 verify(mocks.toArray());
237 }
238
239 private static class TestConsistentMap<K, V> implements ConsistentMap<K, V> {
240 private final Map<K, Versioned<V>> map = new HashMap<>();
241 private final AtomicLong version = new AtomicLong();
242
243 @Override
244 public String name() {
245 return null;
246 }
247
248 @Override
249 public Type primitiveType() {
250 return Type.CONSISTENT_MAP;
251 }
252
253 private long nextVersion() {
254 return version.incrementAndGet();
255 }
256
257 @Override
258 public int size() {
259 return map.size();
260 }
261
262 @Override
263 public boolean isEmpty() {
264 return map.isEmpty();
265 }
266
267 @Override
268 public boolean containsKey(K key) {
269 return map.containsKey(key);
270 }
271
272 @Override
273 public boolean containsValue(V value) {
274 return map.containsValue(value);
275 }
276
277 @Override
278 public Versioned<V> get(K key) {
279 return map.get(key);
280 }
281
282 @Override
283 public Versioned<V> getOrDefault(K key, V defaultValue) {
284 return map.getOrDefault(key, new Versioned<>(defaultValue, 0));
285 }
286
287 @Override
288 public Versioned<V> computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
289 throw new UnsupportedOperationException();
290 }
291
292 @Override
293 public Versioned<V> compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
294 throw new UnsupportedOperationException();
295 }
296
297 @Override
298 public Versioned<V> computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
299 throw new UnsupportedOperationException();
300 }
301
302 @Override
303 public Versioned<V> computeIf(K key,
304 Predicate<? super V> condition, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
305 throw new UnsupportedOperationException();
306 }
307
308 @Override
309 public Versioned<V> put(K key, V value) {
310 return map.put(key, new Versioned<>(value, nextVersion()));
311 }
312
313 @Override
314 public Versioned<V> putAndGet(K key, V value) {
315 return put(key, value);
316 }
317
318 @Override
319 public Versioned<V> remove(K key) {
320 return map.remove(key);
321 }
322
323 @Override
324 public void clear() {
325 map.clear();
326 }
327
328 @Override
329 public Set<K> keySet() {
330 return map.keySet();
331 }
332
333 @Override
334 public Collection<Versioned<V>> values() {
335 return map.values();
336 }
337
338 @Override
339 public Set<Map.Entry<K, Versioned<V>>> entrySet() {
340 return map.entrySet();
341 }
342
343 @Override
344 public Versioned<V> putIfAbsent(K key, V value) {
345 return map.putIfAbsent(key, new Versioned<>(value, nextVersion()));
346 }
347
348 @Override
349 public boolean remove(K key, V value) {
350 return map.remove(key, value);
351 }
352
353 @Override
354 public boolean remove(K key, long version) {
355 Versioned<V> value = map.get(key);
356 if (value != null && value.version() == version) {
357 map.remove(key);
358 return true;
359 }
360 return false;
361 }
362
363 @Override
364 public Versioned<V> replace(K key, V value) {
365 return map.replace(key, new Versioned<>(value, nextVersion()));
366 }
367
368 @Override
369 public boolean replace(K key, V oldValue, V newValue) {
370 throw new UnsupportedOperationException();
371 }
372
373 @Override
374 public boolean replace(K key, long oldVersion, V newValue) {
375 Versioned<V> value = map.get(key);
376 if (value != null && value.version() == oldVersion) {
377 map.put(key, new Versioned<>(newValue, nextVersion()));
378 return true;
379 }
380 return false;
381 }
382
383 @Override
Jordan Haltermandae11602018-07-03 00:00:47 -0700384 public Iterator<Map.Entry<K, Versioned<V>>> iterator() {
385 return map.entrySet().iterator();
386 }
387
388 @Override
Jordan Halterman948d6592017-04-20 17:18:24 -0700389 public void addListener(MapEventListener<K, V> listener, Executor executor) {
390 throw new UnsupportedOperationException();
391 }
392
393 @Override
394 public void removeListener(MapEventListener<K, V> listener) {
395 throw new UnsupportedOperationException();
396 }
397
398 @Override
399 public Map<K, V> asJavaMap() {
400 throw new UnsupportedOperationException();
401 }
402 }
403
404}