blob: c2d4aa32861032795d5a1a7709bdd2693de9b490 [file] [log] [blame]
Yuta HIGUCHId47eac32014-04-07 13:44:47 -07001package net.onrc.onos.core.datastore;
2
Yuta HIGUCHI44a0b352014-05-14 21:32:48 -07003import static org.junit.Assert.assertEquals;
4import static org.junit.Assert.assertNull;
Yuta HIGUCHId47eac32014-04-07 13:44:47 -07005
6import java.util.ArrayList;
7import java.util.List;
Yuta HIGUCHI3e7994c2014-05-12 21:01:33 -07008import java.util.UUID;
Yuta HIGUCHId47eac32014-04-07 13:44:47 -07009import java.util.concurrent.Callable;
10import java.util.concurrent.ConcurrentLinkedQueue;
11import java.util.concurrent.ConcurrentMap;
12import java.util.concurrent.ConcurrentNavigableMap;
13import java.util.concurrent.ConcurrentSkipListMap;
14import java.util.concurrent.ExecutionException;
15import java.util.concurrent.ExecutorService;
16import java.util.concurrent.Executors;
17import java.util.concurrent.Future;
18
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070019import org.junit.After;
20import org.junit.Before;
21import org.junit.Test;
22
23public class AtomicCounterTest {
24
Yuta HIGUCHIceb21b62014-04-17 15:46:05 -070025 static {
26 // configuration to quickly fall back to instance mode for faster test run
27 System.setProperty("net.onrc.onos.core.datastore.hazelcast.client.attemptLimit", "0");
28 }
29
Yuta HIGUCHI3e7994c2014-05-12 21:01:33 -070030 private static final String TEST_COUNTER = "TestCounter" + UUID.randomUUID();
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070031 private static final byte[] LONG_ZERO = {0, 0, 0, 0, 0, 0, 0, 0}; // 0L
Yuta HIGUCHIceb21b62014-04-17 15:46:05 -070032 private IKVTableID counterID;
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070033
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070034 @Before
Yuta HIGUCHIceb21b62014-04-17 15:46:05 -070035 @After
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070036 public void resetCounter() {
37 IKVClient client = DataStoreClient.getClient();
Yuta HIGUCHIceb21b62014-04-17 15:46:05 -070038 counterID = client.getTable(TEST_COUNTER).getTableId();
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070039 client.setCounter(counterID, LONG_ZERO, 0L);
40 client.destroyCounter(counterID, LONG_ZERO);
Yuta HIGUCHIc9552622014-04-24 11:01:48 -070041 counterID = client.getTable(TEST_COUNTER).getTableId();
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070042 }
43
44 @Test
45 public void testSetCounter() throws ObjectExistsException, ObjectDoesntExistException {
46 IKVClient client = DataStoreClient.getClient();
47
48 final long five = 5;
49 client.createCounter(counterID, LONG_ZERO, five);
50
51 final long three = 3;
52 client.setCounter(counterID, LONG_ZERO, three);
53
54 final long four = client.incrementCounter(counterID, LONG_ZERO, 1);
55 assertEquals(4, four);
56 }
57
58 @Test
59 public void testIncrementCounter() throws ObjectExistsException, ObjectDoesntExistException {
60
61 IKVClient client = DataStoreClient.getClient();
62
63 final long five = 5;
64 client.createCounter(counterID, LONG_ZERO, five);
65
66 final long six = client.incrementCounter(counterID, LONG_ZERO, 1);
67 assertEquals(6, six);
68
69
70 final long nine = client.incrementCounter(counterID, LONG_ZERO, 3);
71 assertEquals(9, nine);
72 }
73
74
Yuta HIGUCHI69200352014-04-11 12:32:44 -070075 private static final int NUM_INCREMENTS = Math.max(1, Integer
76 .valueOf(System.getProperty("AtomicCounterTest.NUM_INCREMENTS",
77 "500")));
78 private static final int NUM_THREADS = Math.max(1, Integer.valueOf(System
79 .getProperty("AtomicCounterTest.NUM_THREADS", "3")));
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070080
81 class Incrementor implements Callable<Long> {
Yuta HIGUCHI44a0b352014-05-14 21:32:48 -070082 private final ConcurrentMap<Long, Long> uniquenessTestSet;
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070083 private final ConcurrentLinkedQueue<Long> incrementTimes;
84
85 public Incrementor(ConcurrentMap<Long, Long> uniquenessTestSet, ConcurrentLinkedQueue<Long> incrementTimes) {
86 super();
87 this.uniquenessTestSet = uniquenessTestSet;
88 this.incrementTimes = incrementTimes;
89 }
90
91 @Override
92 public Long call() throws ObjectDoesntExistException {
93 IKVClient client = DataStoreClient.getClient();
Yuta HIGUCHI44a0b352014-05-14 21:32:48 -070094 for (int i = 0; i < NUM_INCREMENTS; ++i) {
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070095 final long start = System.nanoTime();
96 final long incremented = client.incrementCounter(counterID, LONG_ZERO, 1);
Yuta HIGUCHI44a0b352014-05-14 21:32:48 -070097 incrementTimes.add(System.nanoTime() - start);
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070098 final Long expectNull = uniquenessTestSet.putIfAbsent(incremented, incremented);
99 assertNull(expectNull);
100 }
101 return null;
102 }
103 }
104
105 @Test
Yuta HIGUCHI69200352014-04-11 12:32:44 -0700106 public void testParallelIncrementCounter() throws ObjectExistsException,
107 InterruptedException, ExecutionException {
108
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700109 IKVClient client = DataStoreClient.getClient();
110
111 client.createCounter(counterID, LONG_ZERO, 0L);
112
Yuta HIGUCHI69200352014-04-11 12:32:44 -0700113 ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
114 final int initThreads = Math.max(1, Integer.valueOf(System
115 .getProperty("AtomicCounterTest.initThreads",
116 String.valueOf(NUM_THREADS))));
Yuta HIGUCHI44a0b352014-05-14 21:32:48 -0700117 for (int numThreads = initThreads; numThreads <= NUM_THREADS; ++numThreads) {
Yuta HIGUCHI69200352014-04-11 12:32:44 -0700118 client.setCounter(counterID, LONG_ZERO, 0L);
Yuta HIGUCHI44a0b352014-05-14 21:32:48 -0700119 parallelIncrementCounter(executor, numThreads);
Yuta HIGUCHI69200352014-04-11 12:32:44 -0700120 }
121
122 executor.shutdown();
123 }
124
125 private void parallelIncrementCounter(final ExecutorService executor,
Yuta HIGUCHI44a0b352014-05-14 21:32:48 -0700126 final int numThreads) throws InterruptedException, ExecutionException {
Yuta HIGUCHI69200352014-04-11 12:32:44 -0700127
Yuta HIGUCHI44a0b352014-05-14 21:32:48 -0700128 ConcurrentNavigableMap<Long, Long> uniquenessTestSet = new ConcurrentSkipListMap<>();
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700129 ConcurrentLinkedQueue<Long> incrementTimes = new ConcurrentLinkedQueue<Long>();
130
Yuta HIGUCHI44a0b352014-05-14 21:32:48 -0700131 List<Callable<Long>> tasks = new ArrayList<>(numThreads);
132 for (int i = 0; i < numThreads; ++i) {
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700133 tasks.add(new Incrementor(uniquenessTestSet, incrementTimes));
134 }
135 List<Future<Long>> futures = executor.invokeAll(tasks);
136
137 // wait for all tasks to end
138 for (Future<Long> future : futures) {
139 future.get();
140 }
141
Yuta HIGUCHI44a0b352014-05-14 21:32:48 -0700142 assertEquals(numThreads * NUM_INCREMENTS , uniquenessTestSet.size());
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700143 long prevValue = 0;
Yuta HIGUCHI44a0b352014-05-14 21:32:48 -0700144 for (Long value : uniquenessTestSet.keySet()) {
145 assertEquals((prevValue + 1), value.longValue());
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700146 prevValue = value;
147 }
148
149 long max = 0L;
150 long min = Long.MAX_VALUE;
151 long sum = 0L;
152 for (Long time : incrementTimes) {
153 sum += time;
154 max = Math.max(max, time);
155 min = Math.min(min, time);
156 }
Yuta HIGUCHIc7b72c32014-04-17 19:49:18 -0700157 System.err.printf("incrementCounter: th, incs, N, avg(ns), min(ns), max(ns)\n");
158 System.err.printf("incrementCounter: %d, %d, %d, %f, %d, %d\n",
Yuta HIGUCHI44a0b352014-05-14 21:32:48 -0700159 numThreads, NUM_INCREMENTS, incrementTimes.size(),
160 sum / (double) incrementTimes.size(), min, max);
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700161 }
162
163}