blob: 1de93228c546c8464009bf8dd4f88472665215e6 [file] [log] [blame]
Yuta HIGUCHId47eac32014-04-07 13:44:47 -07001package net.onrc.onos.core.datastore;
2
Yuta HIGUCHI44a0b352014-05-14 21:32:48 -07003import static org.junit.Assert.assertEquals;
4import static org.junit.Assert.assertNull;
Yuta HIGUCHId47eac32014-04-07 13:44:47 -07005
6import java.util.ArrayList;
7import java.util.List;
Yuta HIGUCHI3e7994c2014-05-12 21:01:33 -07008import java.util.UUID;
Yuta HIGUCHId47eac32014-04-07 13:44:47 -07009import java.util.concurrent.Callable;
10import java.util.concurrent.ConcurrentLinkedQueue;
11import java.util.concurrent.ConcurrentMap;
12import java.util.concurrent.ConcurrentNavigableMap;
13import java.util.concurrent.ConcurrentSkipListMap;
14import java.util.concurrent.ExecutionException;
15import java.util.concurrent.ExecutorService;
16import java.util.concurrent.Executors;
17import java.util.concurrent.Future;
18
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070019import org.junit.After;
20import org.junit.Before;
21import org.junit.Test;
22
23public class AtomicCounterTest {
24
Yuta HIGUCHI3e7994c2014-05-12 21:01:33 -070025 private static final String TEST_COUNTER = "TestCounter" + UUID.randomUUID();
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070026 private static final byte[] LONG_ZERO = {0, 0, 0, 0, 0, 0, 0, 0}; // 0L
Yuta HIGUCHIceb21b62014-04-17 15:46:05 -070027 private IKVTableID counterID;
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070028
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070029 @Before
Yuta HIGUCHIceb21b62014-04-17 15:46:05 -070030 @After
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070031 public void resetCounter() {
32 IKVClient client = DataStoreClient.getClient();
Yuta HIGUCHIceb21b62014-04-17 15:46:05 -070033 counterID = client.getTable(TEST_COUNTER).getTableId();
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070034 client.setCounter(counterID, LONG_ZERO, 0L);
35 client.destroyCounter(counterID, LONG_ZERO);
Yuta HIGUCHIc9552622014-04-24 11:01:48 -070036 counterID = client.getTable(TEST_COUNTER).getTableId();
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070037 }
38
39 @Test
40 public void testSetCounter() throws ObjectExistsException, ObjectDoesntExistException {
41 IKVClient client = DataStoreClient.getClient();
42
43 final long five = 5;
44 client.createCounter(counterID, LONG_ZERO, five);
45
46 final long three = 3;
47 client.setCounter(counterID, LONG_ZERO, three);
48
49 final long four = client.incrementCounter(counterID, LONG_ZERO, 1);
50 assertEquals(4, four);
51 }
52
53 @Test
54 public void testIncrementCounter() throws ObjectExistsException, ObjectDoesntExistException {
55
56 IKVClient client = DataStoreClient.getClient();
57
58 final long five = 5;
59 client.createCounter(counterID, LONG_ZERO, five);
60
61 final long six = client.incrementCounter(counterID, LONG_ZERO, 1);
62 assertEquals(6, six);
63
64
65 final long nine = client.incrementCounter(counterID, LONG_ZERO, 3);
66 assertEquals(9, nine);
67 }
68
69
Yuta HIGUCHI69200352014-04-11 12:32:44 -070070 private static final int NUM_INCREMENTS = Math.max(1, Integer
71 .valueOf(System.getProperty("AtomicCounterTest.NUM_INCREMENTS",
72 "500")));
73 private static final int NUM_THREADS = Math.max(1, Integer.valueOf(System
74 .getProperty("AtomicCounterTest.NUM_THREADS", "3")));
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070075
76 class Incrementor implements Callable<Long> {
Yuta HIGUCHI44a0b352014-05-14 21:32:48 -070077 private final ConcurrentMap<Long, Long> uniquenessTestSet;
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070078 private final ConcurrentLinkedQueue<Long> incrementTimes;
79
80 public Incrementor(ConcurrentMap<Long, Long> uniquenessTestSet, ConcurrentLinkedQueue<Long> incrementTimes) {
81 super();
82 this.uniquenessTestSet = uniquenessTestSet;
83 this.incrementTimes = incrementTimes;
84 }
85
86 @Override
87 public Long call() throws ObjectDoesntExistException {
88 IKVClient client = DataStoreClient.getClient();
Yuta HIGUCHI44a0b352014-05-14 21:32:48 -070089 for (int i = 0; i < NUM_INCREMENTS; ++i) {
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070090 final long start = System.nanoTime();
91 final long incremented = client.incrementCounter(counterID, LONG_ZERO, 1);
Yuta HIGUCHI44a0b352014-05-14 21:32:48 -070092 incrementTimes.add(System.nanoTime() - start);
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070093 final Long expectNull = uniquenessTestSet.putIfAbsent(incremented, incremented);
94 assertNull(expectNull);
95 }
96 return null;
97 }
98 }
99
100 @Test
Yuta HIGUCHI69200352014-04-11 12:32:44 -0700101 public void testParallelIncrementCounter() throws ObjectExistsException,
102 InterruptedException, ExecutionException {
103
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700104 IKVClient client = DataStoreClient.getClient();
105
106 client.createCounter(counterID, LONG_ZERO, 0L);
107
Yuta HIGUCHI69200352014-04-11 12:32:44 -0700108 ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
109 final int initThreads = Math.max(1, Integer.valueOf(System
110 .getProperty("AtomicCounterTest.initThreads",
111 String.valueOf(NUM_THREADS))));
Yuta HIGUCHI44a0b352014-05-14 21:32:48 -0700112 for (int numThreads = initThreads; numThreads <= NUM_THREADS; ++numThreads) {
Yuta HIGUCHI69200352014-04-11 12:32:44 -0700113 client.setCounter(counterID, LONG_ZERO, 0L);
Yuta HIGUCHI44a0b352014-05-14 21:32:48 -0700114 parallelIncrementCounter(executor, numThreads);
Yuta HIGUCHI69200352014-04-11 12:32:44 -0700115 }
116
117 executor.shutdown();
118 }
119
120 private void parallelIncrementCounter(final ExecutorService executor,
Yuta HIGUCHI44a0b352014-05-14 21:32:48 -0700121 final int numThreads) throws InterruptedException, ExecutionException {
Yuta HIGUCHI69200352014-04-11 12:32:44 -0700122
Yuta HIGUCHI44a0b352014-05-14 21:32:48 -0700123 ConcurrentNavigableMap<Long, Long> uniquenessTestSet = new ConcurrentSkipListMap<>();
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700124 ConcurrentLinkedQueue<Long> incrementTimes = new ConcurrentLinkedQueue<Long>();
125
Yuta HIGUCHI44a0b352014-05-14 21:32:48 -0700126 List<Callable<Long>> tasks = new ArrayList<>(numThreads);
127 for (int i = 0; i < numThreads; ++i) {
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700128 tasks.add(new Incrementor(uniquenessTestSet, incrementTimes));
129 }
130 List<Future<Long>> futures = executor.invokeAll(tasks);
131
132 // wait for all tasks to end
133 for (Future<Long> future : futures) {
134 future.get();
135 }
136
Yuta HIGUCHI44a0b352014-05-14 21:32:48 -0700137 assertEquals(numThreads * NUM_INCREMENTS , uniquenessTestSet.size());
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700138 long prevValue = 0;
Yuta HIGUCHI44a0b352014-05-14 21:32:48 -0700139 for (Long value : uniquenessTestSet.keySet()) {
140 assertEquals((prevValue + 1), value.longValue());
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700141 prevValue = value;
142 }
143
144 long max = 0L;
145 long min = Long.MAX_VALUE;
146 long sum = 0L;
147 for (Long time : incrementTimes) {
148 sum += time;
149 max = Math.max(max, time);
150 min = Math.min(min, time);
151 }
Yuta HIGUCHIc7b72c32014-04-17 19:49:18 -0700152 System.err.printf("incrementCounter: th, incs, N, avg(ns), min(ns), max(ns)\n");
153 System.err.printf("incrementCounter: %d, %d, %d, %f, %d, %d\n",
Yuta HIGUCHI44a0b352014-05-14 21:32:48 -0700154 numThreads, NUM_INCREMENTS, incrementTimes.size(),
155 sum / (double) incrementTimes.size(), min, max);
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700156 }
157
158}