blob: 439dd53df663d8c3251ffe1379e18ab3d989cd6b [file] [log] [blame]
Yuta HIGUCHId47eac32014-04-07 13:44:47 -07001package net.onrc.onos.core.datastore;
2
3import static org.junit.Assert.*;
4
5import java.util.ArrayList;
6import java.util.List;
7import java.util.concurrent.Callable;
8import java.util.concurrent.ConcurrentLinkedQueue;
9import java.util.concurrent.ConcurrentMap;
10import java.util.concurrent.ConcurrentNavigableMap;
11import java.util.concurrent.ConcurrentSkipListMap;
12import java.util.concurrent.ExecutionException;
13import java.util.concurrent.ExecutorService;
14import java.util.concurrent.Executors;
15import java.util.concurrent.Future;
16
17import net.onrc.onos.core.datastore.DataStoreClient;
18import net.onrc.onos.core.datastore.IKVClient;
19import net.onrc.onos.core.datastore.IKVTableID;
20import net.onrc.onos.core.datastore.ObjectDoesntExistException;
21import net.onrc.onos.core.datastore.ObjectExistsException;
22
23import org.junit.After;
24import org.junit.Before;
25import org.junit.Test;
26
27public class AtomicCounterTest {
28
Yuta HIGUCHIceb21b62014-04-17 15:46:05 -070029 static {
30 // configuration to quickly fall back to instance mode for faster test run
31 System.setProperty("net.onrc.onos.core.datastore.hazelcast.client.attemptLimit", "0");
32 }
33
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070034 private static final String TEST_COUNTER = "TestCounter";
35 private static final byte[] LONG_ZERO = {0, 0, 0, 0, 0, 0, 0, 0}; // 0L
Yuta HIGUCHIceb21b62014-04-17 15:46:05 -070036 private IKVTableID counterID;
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070037
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070038 @Before
Yuta HIGUCHIceb21b62014-04-17 15:46:05 -070039 @After
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070040 public void resetCounter() {
41 IKVClient client = DataStoreClient.getClient();
Yuta HIGUCHIceb21b62014-04-17 15:46:05 -070042 counterID = client.getTable(TEST_COUNTER).getTableId();
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070043 client.setCounter(counterID, LONG_ZERO, 0L);
44 client.destroyCounter(counterID, LONG_ZERO);
45 }
46
47 @Test
48 public void testSetCounter() throws ObjectExistsException, ObjectDoesntExistException {
49 IKVClient client = DataStoreClient.getClient();
50
51 final long five = 5;
52 client.createCounter(counterID, LONG_ZERO, five);
53
54 final long three = 3;
55 client.setCounter(counterID, LONG_ZERO, three);
56
57 final long four = client.incrementCounter(counterID, LONG_ZERO, 1);
58 assertEquals(4, four);
59 }
60
61 @Test
62 public void testIncrementCounter() throws ObjectExistsException, ObjectDoesntExistException {
63
64 IKVClient client = DataStoreClient.getClient();
65
66 final long five = 5;
67 client.createCounter(counterID, LONG_ZERO, five);
68
69 final long six = client.incrementCounter(counterID, LONG_ZERO, 1);
70 assertEquals(6, six);
71
72
73 final long nine = client.incrementCounter(counterID, LONG_ZERO, 3);
74 assertEquals(9, nine);
75 }
76
77
Yuta HIGUCHI69200352014-04-11 12:32:44 -070078 private static final int NUM_INCREMENTS = Math.max(1, Integer
79 .valueOf(System.getProperty("AtomicCounterTest.NUM_INCREMENTS",
80 "500")));
81 private static final int NUM_THREADS = Math.max(1, Integer.valueOf(System
82 .getProperty("AtomicCounterTest.NUM_THREADS", "3")));
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070083
84 class Incrementor implements Callable<Long> {
85 private final ConcurrentMap<Long,Long> uniquenessTestSet;
86 private final ConcurrentLinkedQueue<Long> incrementTimes;
87
88 public Incrementor(ConcurrentMap<Long, Long> uniquenessTestSet, ConcurrentLinkedQueue<Long> incrementTimes) {
89 super();
90 this.uniquenessTestSet = uniquenessTestSet;
91 this.incrementTimes = incrementTimes;
92 }
93
94 @Override
95 public Long call() throws ObjectDoesntExistException {
96 IKVClient client = DataStoreClient.getClient();
97 for (int i = 0 ; i < NUM_INCREMENTS ; ++i) {
98 final long start = System.nanoTime();
99 final long incremented = client.incrementCounter(counterID, LONG_ZERO, 1);
100 incrementTimes.add( System.nanoTime() - start );
101 final Long expectNull = uniquenessTestSet.putIfAbsent(incremented, incremented);
102 assertNull(expectNull);
103 }
104 return null;
105 }
106 }
107
108 @Test
Yuta HIGUCHI69200352014-04-11 12:32:44 -0700109 public void testParallelIncrementCounter() throws ObjectExistsException,
110 InterruptedException, ExecutionException {
111
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700112 IKVClient client = DataStoreClient.getClient();
113
114 client.createCounter(counterID, LONG_ZERO, 0L);
115
Yuta HIGUCHI69200352014-04-11 12:32:44 -0700116 ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
117 final int initThreads = Math.max(1, Integer.valueOf(System
118 .getProperty("AtomicCounterTest.initThreads",
119 String.valueOf(NUM_THREADS))));
120 for (int num_threads = initThreads; num_threads <= NUM_THREADS; ++num_threads) {
121 client.setCounter(counterID, LONG_ZERO, 0L);
122 parallelIncrementCounter(executor, num_threads);
123 }
124
125 executor.shutdown();
126 }
127
128 private void parallelIncrementCounter(final ExecutorService executor,
129 final int num_threads) throws InterruptedException, ExecutionException {
130
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700131 ConcurrentNavigableMap<Long,Long> uniquenessTestSet = new ConcurrentSkipListMap<>();
132 ConcurrentLinkedQueue<Long> incrementTimes = new ConcurrentLinkedQueue<Long>();
133
Yuta HIGUCHI69200352014-04-11 12:32:44 -0700134 List<Callable<Long>> tasks = new ArrayList<>(num_threads);
135 for (int i = 0 ; i < num_threads ; ++i) {
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700136 tasks.add(new Incrementor(uniquenessTestSet, incrementTimes));
137 }
138 List<Future<Long>> futures = executor.invokeAll(tasks);
139
140 // wait for all tasks to end
141 for (Future<Long> future : futures) {
142 future.get();
143 }
144
Yuta HIGUCHI69200352014-04-11 12:32:44 -0700145 assertEquals(num_threads * NUM_INCREMENTS , uniquenessTestSet.size());
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700146 long prevValue = 0;
147 for (Long value : uniquenessTestSet.keySet() ) {
Yuta HIGUCHI69200352014-04-11 12:32:44 -0700148 assertEquals( (prevValue + 1), value.longValue() );
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700149 prevValue = value;
150 }
151
152 long max = 0L;
153 long min = Long.MAX_VALUE;
154 long sum = 0L;
155 for (Long time : incrementTimes) {
156 sum += time;
157 max = Math.max(max, time);
158 min = Math.min(min, time);
159 }
Yuta HIGUCHI69200352014-04-11 12:32:44 -0700160 System.err.printf("incrementCounter(th:%d , incs:%d ) N:%d\tavg:%f (ns) min:%d (ns) max:%d (ns)\n",
161 num_threads, NUM_INCREMENTS, incrementTimes.size(),
162 sum/(double)incrementTimes.size(), min, max );
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700163 }
164
165}