blob: 4813672986bfa879623483771cdf3e9cad076e5f [file] [log] [blame]
Yuta HIGUCHI598e57e2014-06-16 21:19:01 -07001package net.onrc.onos.core.datastore.hazelcast;
2
3import static org.junit.Assert.*;
4
5import java.util.ArrayList;
6import java.util.List;
7import java.util.Queue;
8import java.util.UUID;
9import java.util.concurrent.Callable;
10import java.util.concurrent.ConcurrentLinkedQueue;
11import java.util.concurrent.ConcurrentMap;
12import java.util.concurrent.ConcurrentSkipListMap;
13import java.util.concurrent.ExecutionException;
14import java.util.concurrent.ExecutorService;
15import java.util.concurrent.Executors;
16import java.util.concurrent.Future;
17
18import net.onrc.onos.core.datastore.ObjectDoesntExistException;
19import net.onrc.onos.core.util.distributed.DistributedAtomicLong;
20
21import org.junit.Before;
22import org.junit.Test;
23
24/**
25 * Test cases for HZDistributedAtomicLong.
26 */
27public class HZDistributedAtomicLongTest {
Yuta HIGUCHI598e57e2014-06-16 21:19:01 -070028
29 static final String TEST_COUNTER_NAME = "Counter" + UUID.randomUUID();
30
31 DistributedAtomicLong counter;
32
33 @Before
34 public void setUp() throws Exception {
35 counter = new HZDistributedAtomicLong(
36 HZClient.getClient(), TEST_COUNTER_NAME);
37 counter.set(0L);
38 }
39
40 @Test
41 public void testGet() {
42 assertEquals(0L, counter.get());
43 }
44
45 @Test
46 public void testAddAndGet() {
47 assertEquals(0L, counter.get());
48 assertEquals(1L, counter.addAndGet(1L));
49 assertEquals(3L, counter.addAndGet(2L));
50 assertEquals(7L, counter.addAndGet(4L));
51 }
52
53 @Test
54 public void testSet() {
55 counter.set(42L);
56 assertEquals(42L, counter.get());
57 }
58
59 @Test
60 public void testIncrementAndGet() {
61 assertEquals(1L, counter.incrementAndGet());
62 assertEquals(2L, counter.incrementAndGet());
63 assertEquals(3L, counter.incrementAndGet());
64 }
65
66 /**
67 * Callable task incrementing atomicLong.
68 */
69 private static final class AdderTask implements Callable<Long> {
70 // using Map as Set
71 private final ConcurrentMap<Long, Long> uniquenessTestSet;
72 // Queue given here should be Thread-safe
73 private final Queue<Long> incrementTimes;
74 private final HZDistributedAtomicLong counter;
75 private final int numIncrements;
76
77 /**
78 * Constructor.
79 *
80 * @param numIncrements number of increments to execute
81 * @param uniquenessTestSet ThreadSafe Map to store increment result
82 * @param incrementTimes ThreadSafe Queue to store time it
83 * took on each increment
84 */
85 public AdderTask(int numIncrements,
86 ConcurrentMap<Long, Long> uniquenessTestSet,
87 Queue<Long> incrementTimes) {
88
89 super();
90 this.uniquenessTestSet = uniquenessTestSet;
91 this.incrementTimes = incrementTimes;
92 this.counter = new HZDistributedAtomicLong(
93 HZClient.getClient(), TEST_COUNTER_NAME);
94 this.numIncrements = numIncrements;
95 }
96
97 @Override
98 public Long call() throws ObjectDoesntExistException {
99 for (int i = 0; i < numIncrements; ++i) {
100 final long start = System.nanoTime();
101 final long incremented = counter.addAndGet(1L);
102 incrementTimes.add(System.nanoTime() - start);
103 final Long expectNull = uniquenessTestSet.putIfAbsent(
104 incremented, incremented);
105 assertNull(expectNull);
106 }
107 return null;
108 }
109 }
110
111 private static final int NUM_THREADS = Integer.parseInt(
112 System.getProperty(
113 "HZDistributedAtomicLongTest.NUM_THREADS",
114 System.getProperty("NUM_THREADS",
115 "10")));
116
117 private static final int NUM_INCREMENTS = Integer.parseInt(
118 System.getProperty(
119 "HZDistributedAtomicLongTest.NUM_INCREMENTS",
120 System.getProperty("NUM_INCREMENTS",
121 "100")));
122
123 /**
124 * Increment using multiple threads to test addition is atomic.
125 */
126 @Test
127 public void testConcurrentAddAndGet() throws InterruptedException, ExecutionException {
128 ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
129
130 // using Map as Set
131 ConcurrentMap<Long, Long> uniquenessTestSet = new ConcurrentSkipListMap<>();
132 Queue<Long> incrementTimes = new ConcurrentLinkedQueue<>();
133
134 // Start NUM_THREADS threads and increment NUM_INCREMENTS times each
135 List<Callable<Long>> tasks = new ArrayList<>(NUM_THREADS);
136 for (int i = 0; i < NUM_THREADS; ++i) {
137 tasks.add(new AdderTask(NUM_INCREMENTS,
138 uniquenessTestSet,
139 incrementTimes));
140 }
141 List<Future<Long>> futures = executor.invokeAll(tasks);
142
143 // wait for all tasks to end
144 for (Future<Long> future : futures) {
145 future.get();
146 }
147
148
149 assertEquals(NUM_THREADS * NUM_INCREMENTS , uniquenessTestSet.size());
150
151 // check uniqueness of result
152 long prevValue = 0;
153 for (Long value : uniquenessTestSet.keySet()) {
154 assertEquals((prevValue + 1), value.longValue());
155 prevValue = value;
156 }
157
158 long max = 0L;
159 long min = Long.MAX_VALUE;
160 long sum = 0L;
161 for (Long time : incrementTimes) {
162 sum += time;
163 max = Math.max(max, time);
164 min = Math.min(min, time);
165 }
166 System.err.printf("incrementCounter: th, incs, tot_incs,"
167 + " avg(ns), min(ns), max(ns), T-put(1s/avg)\n");
168 System.err.printf("incrementCounter: %d, %d, %d,"
169 + " %f, %d, %d, %f\n",
170 NUM_THREADS, NUM_INCREMENTS, incrementTimes.size(),
171 sum / (double) incrementTimes.size(), min, max,
172 Math.pow(10, 9) * incrementTimes.size() / sum);
173
174 executor.shutdown();
175 }
176
177}