blob: fbfc058f25abb9715341e1dc97d712085e6a2ad6 [file] [log] [blame]
Yuta HIGUCHI598e57e2014-06-16 21:19:01 -07001package net.onrc.onos.core.datastore.hazelcast;
2
3import static org.junit.Assert.*;
4
5import java.util.ArrayList;
6import java.util.List;
7import java.util.Queue;
8import java.util.UUID;
9import java.util.concurrent.Callable;
10import java.util.concurrent.ConcurrentLinkedQueue;
11import java.util.concurrent.ConcurrentMap;
12import java.util.concurrent.ConcurrentSkipListMap;
13import java.util.concurrent.ExecutionException;
14import java.util.concurrent.ExecutorService;
15import java.util.concurrent.Executors;
16import java.util.concurrent.Future;
17
18import net.onrc.onos.core.datastore.ObjectDoesntExistException;
19import net.onrc.onos.core.util.distributed.DistributedAtomicLong;
20
21import org.junit.Before;
22import org.junit.Test;
23
24/**
25 * Test cases for HZDistributedAtomicLong.
26 */
27public class HZDistributedAtomicLongTest {
28 static {
29 // configuration to quickly fall back to instance mode for faster test run
30 System.setProperty("net.onrc.onos.core.datastore.hazelcast.client.attemptLimit", "0");
31 }
32
33 static final String TEST_COUNTER_NAME = "Counter" + UUID.randomUUID();
34
35 DistributedAtomicLong counter;
36
37 @Before
38 public void setUp() throws Exception {
39 counter = new HZDistributedAtomicLong(
40 HZClient.getClient(), TEST_COUNTER_NAME);
41 counter.set(0L);
42 }
43
44 @Test
45 public void testGet() {
46 assertEquals(0L, counter.get());
47 }
48
49 @Test
50 public void testAddAndGet() {
51 assertEquals(0L, counter.get());
52 assertEquals(1L, counter.addAndGet(1L));
53 assertEquals(3L, counter.addAndGet(2L));
54 assertEquals(7L, counter.addAndGet(4L));
55 }
56
57 @Test
58 public void testSet() {
59 counter.set(42L);
60 assertEquals(42L, counter.get());
61 }
62
63 @Test
64 public void testIncrementAndGet() {
65 assertEquals(1L, counter.incrementAndGet());
66 assertEquals(2L, counter.incrementAndGet());
67 assertEquals(3L, counter.incrementAndGet());
68 }
69
70 /**
71 * Callable task incrementing atomicLong.
72 */
73 private static final class AdderTask implements Callable<Long> {
74 // using Map as Set
75 private final ConcurrentMap<Long, Long> uniquenessTestSet;
76 // Queue given here should be Thread-safe
77 private final Queue<Long> incrementTimes;
78 private final HZDistributedAtomicLong counter;
79 private final int numIncrements;
80
81 /**
82 * Constructor.
83 *
84 * @param numIncrements number of increments to execute
85 * @param uniquenessTestSet ThreadSafe Map to store increment result
86 * @param incrementTimes ThreadSafe Queue to store time it
87 * took on each increment
88 */
89 public AdderTask(int numIncrements,
90 ConcurrentMap<Long, Long> uniquenessTestSet,
91 Queue<Long> incrementTimes) {
92
93 super();
94 this.uniquenessTestSet = uniquenessTestSet;
95 this.incrementTimes = incrementTimes;
96 this.counter = new HZDistributedAtomicLong(
97 HZClient.getClient(), TEST_COUNTER_NAME);
98 this.numIncrements = numIncrements;
99 }
100
101 @Override
102 public Long call() throws ObjectDoesntExistException {
103 for (int i = 0; i < numIncrements; ++i) {
104 final long start = System.nanoTime();
105 final long incremented = counter.addAndGet(1L);
106 incrementTimes.add(System.nanoTime() - start);
107 final Long expectNull = uniquenessTestSet.putIfAbsent(
108 incremented, incremented);
109 assertNull(expectNull);
110 }
111 return null;
112 }
113 }
114
115 private static final int NUM_THREADS = Integer.parseInt(
116 System.getProperty(
117 "HZDistributedAtomicLongTest.NUM_THREADS",
118 System.getProperty("NUM_THREADS",
119 "10")));
120
121 private static final int NUM_INCREMENTS = Integer.parseInt(
122 System.getProperty(
123 "HZDistributedAtomicLongTest.NUM_INCREMENTS",
124 System.getProperty("NUM_INCREMENTS",
125 "100")));
126
127 /**
128 * Increment using multiple threads to test addition is atomic.
129 */
130 @Test
131 public void testConcurrentAddAndGet() throws InterruptedException, ExecutionException {
132 ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
133
134 // using Map as Set
135 ConcurrentMap<Long, Long> uniquenessTestSet = new ConcurrentSkipListMap<>();
136 Queue<Long> incrementTimes = new ConcurrentLinkedQueue<>();
137
138 // Start NUM_THREADS threads and increment NUM_INCREMENTS times each
139 List<Callable<Long>> tasks = new ArrayList<>(NUM_THREADS);
140 for (int i = 0; i < NUM_THREADS; ++i) {
141 tasks.add(new AdderTask(NUM_INCREMENTS,
142 uniquenessTestSet,
143 incrementTimes));
144 }
145 List<Future<Long>> futures = executor.invokeAll(tasks);
146
147 // wait for all tasks to end
148 for (Future<Long> future : futures) {
149 future.get();
150 }
151
152
153 assertEquals(NUM_THREADS * NUM_INCREMENTS , uniquenessTestSet.size());
154
155 // check uniqueness of result
156 long prevValue = 0;
157 for (Long value : uniquenessTestSet.keySet()) {
158 assertEquals((prevValue + 1), value.longValue());
159 prevValue = value;
160 }
161
162 long max = 0L;
163 long min = Long.MAX_VALUE;
164 long sum = 0L;
165 for (Long time : incrementTimes) {
166 sum += time;
167 max = Math.max(max, time);
168 min = Math.min(min, time);
169 }
170 System.err.printf("incrementCounter: th, incs, tot_incs,"
171 + " avg(ns), min(ns), max(ns), T-put(1s/avg)\n");
172 System.err.printf("incrementCounter: %d, %d, %d,"
173 + " %f, %d, %d, %f\n",
174 NUM_THREADS, NUM_INCREMENTS, incrementTimes.size(),
175 sum / (double) incrementTimes.size(), min, max,
176 Math.pow(10, 9) * incrementTimes.size() / sum);
177
178 executor.shutdown();
179 }
180
181}