blob: 160e015d367ca796cb61941e3fc4c698e6e604b1 [file] [log] [blame]
Yuta HIGUCHI598e57e2014-06-16 21:19:01 -07001package net.onrc.onos.core.datastore.hazelcast;
2
3import static org.junit.Assert.*;
4
5import java.util.ArrayList;
6import java.util.List;
7import java.util.Queue;
8import java.util.UUID;
9import java.util.concurrent.Callable;
10import java.util.concurrent.ConcurrentLinkedQueue;
11import java.util.concurrent.ConcurrentMap;
12import java.util.concurrent.ConcurrentSkipListMap;
13import java.util.concurrent.ExecutionException;
14import java.util.concurrent.ExecutorService;
15import java.util.concurrent.Executors;
16import java.util.concurrent.Future;
17
18import net.onrc.onos.core.datastore.ObjectDoesntExistException;
19import net.onrc.onos.core.util.distributed.DistributedAtomicLong;
20
Yuta HIGUCHIcdb5d852014-07-31 10:27:43 -070021import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
22import org.junit.Assume;
Yuta HIGUCHI598e57e2014-06-16 21:19:01 -070023import org.junit.Before;
24import org.junit.Test;
25
Yuta HIGUCHIcdb5d852014-07-31 10:27:43 -070026import com.hazelcast.core.HazelcastInstance;
27
Yuta HIGUCHI598e57e2014-06-16 21:19:01 -070028/**
29 * Test cases for HZDistributedAtomicLong.
30 */
31public class HZDistributedAtomicLongTest {
Yuta HIGUCHI598e57e2014-06-16 21:19:01 -070032
33 static final String TEST_COUNTER_NAME = "Counter" + UUID.randomUUID();
Yuta HIGUCHIcdb5d852014-07-31 10:27:43 -070034 private static final int SEC_IN_NANO = 1000000000;
Yuta HIGUCHI598e57e2014-06-16 21:19:01 -070035
Yuta HIGUCHIcdb5d852014-07-31 10:27:43 -070036 private DistributedAtomicLong counter;
Yuta HIGUCHI598e57e2014-06-16 21:19:01 -070037
38 @Before
39 public void setUp() throws Exception {
40 counter = new HZDistributedAtomicLong(
41 HZClient.getClient(), TEST_COUNTER_NAME);
42 counter.set(0L);
43 }
44
45 @Test
46 public void testGet() {
47 assertEquals(0L, counter.get());
48 }
49
50 @Test
51 public void testAddAndGet() {
52 assertEquals(0L, counter.get());
53 assertEquals(1L, counter.addAndGet(1L));
54 assertEquals(3L, counter.addAndGet(2L));
55 assertEquals(7L, counter.addAndGet(4L));
56 }
57
58 @Test
59 public void testSet() {
60 counter.set(42L);
61 assertEquals(42L, counter.get());
62 }
63
64 @Test
65 public void testIncrementAndGet() {
66 assertEquals(1L, counter.incrementAndGet());
67 assertEquals(2L, counter.incrementAndGet());
68 assertEquals(3L, counter.incrementAndGet());
69 }
70
71 /**
72 * Callable task incrementing atomicLong.
73 */
74 private static final class AdderTask implements Callable<Long> {
75 // using Map as Set
76 private final ConcurrentMap<Long, Long> uniquenessTestSet;
77 // Queue given here should be Thread-safe
78 private final Queue<Long> incrementTimes;
79 private final HZDistributedAtomicLong counter;
80 private final int numIncrements;
81
82 /**
83 * Constructor.
84 *
85 * @param numIncrements number of increments to execute
86 * @param uniquenessTestSet ThreadSafe Map to store increment result
87 * @param incrementTimes ThreadSafe Queue to store time it
88 * took on each increment
89 */
90 public AdderTask(int numIncrements,
91 ConcurrentMap<Long, Long> uniquenessTestSet,
92 Queue<Long> incrementTimes) {
93
94 super();
95 this.uniquenessTestSet = uniquenessTestSet;
96 this.incrementTimes = incrementTimes;
97 this.counter = new HZDistributedAtomicLong(
98 HZClient.getClient(), TEST_COUNTER_NAME);
99 this.numIncrements = numIncrements;
100 }
101
102 @Override
103 public Long call() throws ObjectDoesntExistException {
104 for (int i = 0; i < numIncrements; ++i) {
105 final long start = System.nanoTime();
106 final long incremented = counter.addAndGet(1L);
107 incrementTimes.add(System.nanoTime() - start);
108 final Long expectNull = uniquenessTestSet.putIfAbsent(
109 incremented, incremented);
110 assertNull(expectNull);
111 }
112 return null;
113 }
114 }
115
116 private static final int NUM_THREADS = Integer.parseInt(
117 System.getProperty(
118 "HZDistributedAtomicLongTest.NUM_THREADS",
119 System.getProperty("NUM_THREADS",
120 "10")));
121
122 private static final int NUM_INCREMENTS = Integer.parseInt(
123 System.getProperty(
124 "HZDistributedAtomicLongTest.NUM_INCREMENTS",
125 System.getProperty("NUM_INCREMENTS",
126 "100")));
127
Yuta HIGUCHIcdb5d852014-07-31 10:27:43 -0700128 private static final int ROUNDS = Integer.parseInt(
129 System.getProperty(
130 "HZDistributedAtomicLongTest.ROUNDS",
131 System.getProperty("ROUNDS",
132 "100")));
133
Yuta HIGUCHI598e57e2014-06-16 21:19:01 -0700134 /**
135 * Increment using multiple threads to test addition is atomic.
136 */
137 @Test
138 public void testConcurrentAddAndGet() throws InterruptedException, ExecutionException {
139 ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
140
141 // using Map as Set
142 ConcurrentMap<Long, Long> uniquenessTestSet = new ConcurrentSkipListMap<>();
143 Queue<Long> incrementTimes = new ConcurrentLinkedQueue<>();
144
145 // Start NUM_THREADS threads and increment NUM_INCREMENTS times each
146 List<Callable<Long>> tasks = new ArrayList<>(NUM_THREADS);
147 for (int i = 0; i < NUM_THREADS; ++i) {
148 tasks.add(new AdderTask(NUM_INCREMENTS,
149 uniquenessTestSet,
150 incrementTimes));
151 }
152 List<Future<Long>> futures = executor.invokeAll(tasks);
153
154 // wait for all tasks to end
155 for (Future<Long> future : futures) {
156 future.get();
157 }
158
159
160 assertEquals(NUM_THREADS * NUM_INCREMENTS , uniquenessTestSet.size());
161
162 // check uniqueness of result
163 long prevValue = 0;
164 for (Long value : uniquenessTestSet.keySet()) {
165 assertEquals((prevValue + 1), value.longValue());
166 prevValue = value;
167 }
168
169 long max = 0L;
170 long min = Long.MAX_VALUE;
171 long sum = 0L;
172 for (Long time : incrementTimes) {
173 sum += time;
174 max = Math.max(max, time);
175 min = Math.min(min, time);
176 }
177 System.err.printf("incrementCounter: th, incs, tot_incs,"
178 + " avg(ns), min(ns), max(ns), T-put(1s/avg)\n");
179 System.err.printf("incrementCounter: %d, %d, %d,"
180 + " %f, %d, %d, %f\n",
181 NUM_THREADS, NUM_INCREMENTS, incrementTimes.size(),
182 sum / (double) incrementTimes.size(), min, max,
183 Math.pow(10, 9) * incrementTimes.size() / sum);
184
185 executor.shutdown();
186 }
187
Yuta HIGUCHIcdb5d852014-07-31 10:27:43 -0700188
189 /**
190 * incrementAndGet throughput measurement.
191 */
192 @Test
193 public void incrementThroughput() throws InterruptedException {
194 // This test will run only if -Dbenchmark is set to something
195 // e.g., mvn test -Dtest=HZDistributedAtomicLongTest#incrementThroughput
196 // -Dbenchmark -DNUM_INCREMENTS=1000 -DROUNDS=10
197 Assume.assumeNotNull(System.getProperty("benchmark"));
198
199 // Warmup
200 counter.set(0);
201 for (int i = 0; i < NUM_INCREMENTS; i++) {
202 counter.incrementAndGet();
203 }
204 counter.set(0);
205 try {
206 Thread.sleep(5000);
207 } catch (InterruptedException e) { // CHECKSTYLE IGNORE THIS LINE
208 }
209
210 HazelcastInstance hz = HZClient.getClient().getHZInstance();
211 final int clusterSize = hz.getCluster().getMembers().size();
212
213 System.out.println("Starting benchmark with cluster size: " + clusterSize);
214 DescriptiveStatistics stats = new DescriptiveStatistics();
215
216 // Measurements
217 // Throughput calculated from total time it took to inc NUM_INCREMENTS
218 // Repeating ROUNDS time to get average Throughput, etc.
219 for (int i = 0; i < ROUNDS; i++) {
220 long timeBegin = System.nanoTime();
221 for (int j = 0; j < NUM_INCREMENTS; j++) {
222 counter.incrementAndGet();
223 }
224 long timeEnd = System.nanoTime();
225 double throughput = (double) NUM_INCREMENTS * SEC_IN_NANO
226 / (timeEnd - timeBegin);
227 stats.addValue(throughput);
228 System.out.println("Increments: " + NUM_INCREMENTS
229 + " IncrementThroughput(ops/s): " + throughput);
230 }
231
232 System.out.println();
233
234 System.out.println("incrementAndGet Throughput (ops/s) "
235 + "[ " + NUM_INCREMENTS + " increments] "
236 + "[ " + clusterSize + " HZnodes]");
237
238 System.out.println(stats.toString());
239 //DescriptiveStatistics:
240 // n: 100
241 // min: 1137.5270162666363
242 // max: 4056.1369351829317
243 // mean: 2727.695488835985
244 // std dev: 704.2206793204389
245 // median: 2729.6338956455156
246 // skewness: -0.17084469855647005
247 // kurtosis: -0.6018103898245659
248
249 // Wait for other instances stops incrementing, before exiting
250 long prev = counter.get();
251 while (prev != counter.get()) {
252 prev = counter.get();
253 Thread.sleep(1000);
254 }
255 }
Yuta HIGUCHI598e57e2014-06-16 21:19:01 -0700256}