blob: 384cc0380f1b1c50f2d8b94d5fa5ec3a23a766df [file] [log] [blame]
Yuta HIGUCHId47eac32014-04-07 13:44:47 -07001package net.onrc.onos.core.datastore;
2
3import static org.junit.Assert.*;
4
5import java.util.ArrayList;
6import java.util.List;
Yuta HIGUCHI3e7994c2014-05-12 21:01:33 -07007import java.util.UUID;
Yuta HIGUCHId47eac32014-04-07 13:44:47 -07008import java.util.concurrent.Callable;
9import java.util.concurrent.ConcurrentLinkedQueue;
10import java.util.concurrent.ConcurrentMap;
11import java.util.concurrent.ConcurrentNavigableMap;
12import java.util.concurrent.ConcurrentSkipListMap;
13import java.util.concurrent.ExecutionException;
14import java.util.concurrent.ExecutorService;
15import java.util.concurrent.Executors;
16import java.util.concurrent.Future;
17
18import net.onrc.onos.core.datastore.DataStoreClient;
19import net.onrc.onos.core.datastore.IKVClient;
20import net.onrc.onos.core.datastore.IKVTableID;
21import net.onrc.onos.core.datastore.ObjectDoesntExistException;
22import net.onrc.onos.core.datastore.ObjectExistsException;
23
24import org.junit.After;
25import org.junit.Before;
26import org.junit.Test;
27
28public class AtomicCounterTest {
29
Yuta HIGUCHIceb21b62014-04-17 15:46:05 -070030 static {
31 // configuration to quickly fall back to instance mode for faster test run
32 System.setProperty("net.onrc.onos.core.datastore.hazelcast.client.attemptLimit", "0");
33 }
34
Yuta HIGUCHI3e7994c2014-05-12 21:01:33 -070035 private static final String TEST_COUNTER = "TestCounter" + UUID.randomUUID();
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070036 private static final byte[] LONG_ZERO = {0, 0, 0, 0, 0, 0, 0, 0}; // 0L
Yuta HIGUCHIceb21b62014-04-17 15:46:05 -070037 private IKVTableID counterID;
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070038
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070039 @Before
Yuta HIGUCHIceb21b62014-04-17 15:46:05 -070040 @After
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070041 public void resetCounter() {
42 IKVClient client = DataStoreClient.getClient();
Yuta HIGUCHIceb21b62014-04-17 15:46:05 -070043 counterID = client.getTable(TEST_COUNTER).getTableId();
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070044 client.setCounter(counterID, LONG_ZERO, 0L);
45 client.destroyCounter(counterID, LONG_ZERO);
Yuta HIGUCHIc9552622014-04-24 11:01:48 -070046 counterID = client.getTable(TEST_COUNTER).getTableId();
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070047 }
48
49 @Test
50 public void testSetCounter() throws ObjectExistsException, ObjectDoesntExistException {
51 IKVClient client = DataStoreClient.getClient();
52
53 final long five = 5;
54 client.createCounter(counterID, LONG_ZERO, five);
55
56 final long three = 3;
57 client.setCounter(counterID, LONG_ZERO, three);
58
59 final long four = client.incrementCounter(counterID, LONG_ZERO, 1);
60 assertEquals(4, four);
61 }
62
63 @Test
64 public void testIncrementCounter() throws ObjectExistsException, ObjectDoesntExistException {
65
66 IKVClient client = DataStoreClient.getClient();
67
68 final long five = 5;
69 client.createCounter(counterID, LONG_ZERO, five);
70
71 final long six = client.incrementCounter(counterID, LONG_ZERO, 1);
72 assertEquals(6, six);
73
74
75 final long nine = client.incrementCounter(counterID, LONG_ZERO, 3);
76 assertEquals(9, nine);
77 }
78
79
Yuta HIGUCHI69200352014-04-11 12:32:44 -070080 private static final int NUM_INCREMENTS = Math.max(1, Integer
81 .valueOf(System.getProperty("AtomicCounterTest.NUM_INCREMENTS",
82 "500")));
83 private static final int NUM_THREADS = Math.max(1, Integer.valueOf(System
84 .getProperty("AtomicCounterTest.NUM_THREADS", "3")));
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070085
86 class Incrementor implements Callable<Long> {
87 private final ConcurrentMap<Long,Long> uniquenessTestSet;
88 private final ConcurrentLinkedQueue<Long> incrementTimes;
89
90 public Incrementor(ConcurrentMap<Long, Long> uniquenessTestSet, ConcurrentLinkedQueue<Long> incrementTimes) {
91 super();
92 this.uniquenessTestSet = uniquenessTestSet;
93 this.incrementTimes = incrementTimes;
94 }
95
96 @Override
97 public Long call() throws ObjectDoesntExistException {
98 IKVClient client = DataStoreClient.getClient();
99 for (int i = 0 ; i < NUM_INCREMENTS ; ++i) {
100 final long start = System.nanoTime();
101 final long incremented = client.incrementCounter(counterID, LONG_ZERO, 1);
102 incrementTimes.add( System.nanoTime() - start );
103 final Long expectNull = uniquenessTestSet.putIfAbsent(incremented, incremented);
104 assertNull(expectNull);
105 }
106 return null;
107 }
108 }
109
110 @Test
Yuta HIGUCHI69200352014-04-11 12:32:44 -0700111 public void testParallelIncrementCounter() throws ObjectExistsException,
112 InterruptedException, ExecutionException {
113
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700114 IKVClient client = DataStoreClient.getClient();
115
116 client.createCounter(counterID, LONG_ZERO, 0L);
117
Yuta HIGUCHI69200352014-04-11 12:32:44 -0700118 ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
119 final int initThreads = Math.max(1, Integer.valueOf(System
120 .getProperty("AtomicCounterTest.initThreads",
121 String.valueOf(NUM_THREADS))));
122 for (int num_threads = initThreads; num_threads <= NUM_THREADS; ++num_threads) {
123 client.setCounter(counterID, LONG_ZERO, 0L);
124 parallelIncrementCounter(executor, num_threads);
125 }
126
127 executor.shutdown();
128 }
129
130 private void parallelIncrementCounter(final ExecutorService executor,
131 final int num_threads) throws InterruptedException, ExecutionException {
132
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700133 ConcurrentNavigableMap<Long,Long> uniquenessTestSet = new ConcurrentSkipListMap<>();
134 ConcurrentLinkedQueue<Long> incrementTimes = new ConcurrentLinkedQueue<Long>();
135
Yuta HIGUCHI69200352014-04-11 12:32:44 -0700136 List<Callable<Long>> tasks = new ArrayList<>(num_threads);
137 for (int i = 0 ; i < num_threads ; ++i) {
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700138 tasks.add(new Incrementor(uniquenessTestSet, incrementTimes));
139 }
140 List<Future<Long>> futures = executor.invokeAll(tasks);
141
142 // wait for all tasks to end
143 for (Future<Long> future : futures) {
144 future.get();
145 }
146
Yuta HIGUCHI69200352014-04-11 12:32:44 -0700147 assertEquals(num_threads * NUM_INCREMENTS , uniquenessTestSet.size());
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700148 long prevValue = 0;
149 for (Long value : uniquenessTestSet.keySet() ) {
Yuta HIGUCHI69200352014-04-11 12:32:44 -0700150 assertEquals( (prevValue + 1), value.longValue() );
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700151 prevValue = value;
152 }
153
154 long max = 0L;
155 long min = Long.MAX_VALUE;
156 long sum = 0L;
157 for (Long time : incrementTimes) {
158 sum += time;
159 max = Math.max(max, time);
160 min = Math.min(min, time);
161 }
Yuta HIGUCHIc7b72c32014-04-17 19:49:18 -0700162 System.err.printf("incrementCounter: th, incs, N, avg(ns), min(ns), max(ns)\n");
163 System.err.printf("incrementCounter: %d, %d, %d, %f, %d, %d\n",
Yuta HIGUCHI69200352014-04-11 12:32:44 -0700164 num_threads, NUM_INCREMENTS, incrementTimes.size(),
165 sum/(double)incrementTimes.size(), min, max );
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700166 }
167
168}