blob: 6bae01ee16ab115c78ccc8134c34163a81f7da6a [file] [log] [blame]
Yuta HIGUCHId47eac32014-04-07 13:44:47 -07001package net.onrc.onos.core.datastore;
2
3import static org.junit.Assert.*;
4
5import java.util.ArrayList;
6import java.util.List;
7import java.util.concurrent.Callable;
8import java.util.concurrent.ConcurrentLinkedQueue;
9import java.util.concurrent.ConcurrentMap;
10import java.util.concurrent.ConcurrentNavigableMap;
11import java.util.concurrent.ConcurrentSkipListMap;
12import java.util.concurrent.ExecutionException;
13import java.util.concurrent.ExecutorService;
14import java.util.concurrent.Executors;
15import java.util.concurrent.Future;
16
17import net.onrc.onos.core.datastore.DataStoreClient;
18import net.onrc.onos.core.datastore.IKVClient;
19import net.onrc.onos.core.datastore.IKVTableID;
20import net.onrc.onos.core.datastore.ObjectDoesntExistException;
21import net.onrc.onos.core.datastore.ObjectExistsException;
22
23import org.junit.After;
24import org.junit.Before;
25import org.junit.Test;
26
27public class AtomicCounterTest {
28
Yuta HIGUCHIceb21b62014-04-17 15:46:05 -070029 static {
30 // configuration to quickly fall back to instance mode for faster test run
31 System.setProperty("net.onrc.onos.core.datastore.hazelcast.client.attemptLimit", "0");
32 }
33
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070034 private static final String TEST_COUNTER = "TestCounter";
35 private static final byte[] LONG_ZERO = {0, 0, 0, 0, 0, 0, 0, 0}; // 0L
Yuta HIGUCHIceb21b62014-04-17 15:46:05 -070036 private IKVTableID counterID;
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070037
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070038 @Before
Yuta HIGUCHIceb21b62014-04-17 15:46:05 -070039 @After
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070040 public void resetCounter() {
41 IKVClient client = DataStoreClient.getClient();
Yuta HIGUCHIceb21b62014-04-17 15:46:05 -070042 counterID = client.getTable(TEST_COUNTER).getTableId();
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070043 client.setCounter(counterID, LONG_ZERO, 0L);
44 client.destroyCounter(counterID, LONG_ZERO);
Yuta HIGUCHIc9552622014-04-24 11:01:48 -070045 counterID = client.getTable(TEST_COUNTER).getTableId();
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070046 }
47
48 @Test
49 public void testSetCounter() throws ObjectExistsException, ObjectDoesntExistException {
50 IKVClient client = DataStoreClient.getClient();
51
52 final long five = 5;
53 client.createCounter(counterID, LONG_ZERO, five);
54
55 final long three = 3;
56 client.setCounter(counterID, LONG_ZERO, three);
57
58 final long four = client.incrementCounter(counterID, LONG_ZERO, 1);
59 assertEquals(4, four);
60 }
61
62 @Test
63 public void testIncrementCounter() throws ObjectExistsException, ObjectDoesntExistException {
64
65 IKVClient client = DataStoreClient.getClient();
66
67 final long five = 5;
68 client.createCounter(counterID, LONG_ZERO, five);
69
70 final long six = client.incrementCounter(counterID, LONG_ZERO, 1);
71 assertEquals(6, six);
72
73
74 final long nine = client.incrementCounter(counterID, LONG_ZERO, 3);
75 assertEquals(9, nine);
76 }
77
78
Yuta HIGUCHI69200352014-04-11 12:32:44 -070079 private static final int NUM_INCREMENTS = Math.max(1, Integer
80 .valueOf(System.getProperty("AtomicCounterTest.NUM_INCREMENTS",
81 "500")));
82 private static final int NUM_THREADS = Math.max(1, Integer.valueOf(System
83 .getProperty("AtomicCounterTest.NUM_THREADS", "3")));
Yuta HIGUCHId47eac32014-04-07 13:44:47 -070084
85 class Incrementor implements Callable<Long> {
86 private final ConcurrentMap<Long,Long> uniquenessTestSet;
87 private final ConcurrentLinkedQueue<Long> incrementTimes;
88
89 public Incrementor(ConcurrentMap<Long, Long> uniquenessTestSet, ConcurrentLinkedQueue<Long> incrementTimes) {
90 super();
91 this.uniquenessTestSet = uniquenessTestSet;
92 this.incrementTimes = incrementTimes;
93 }
94
95 @Override
96 public Long call() throws ObjectDoesntExistException {
97 IKVClient client = DataStoreClient.getClient();
98 for (int i = 0 ; i < NUM_INCREMENTS ; ++i) {
99 final long start = System.nanoTime();
100 final long incremented = client.incrementCounter(counterID, LONG_ZERO, 1);
101 incrementTimes.add( System.nanoTime() - start );
102 final Long expectNull = uniquenessTestSet.putIfAbsent(incremented, incremented);
103 assertNull(expectNull);
104 }
105 return null;
106 }
107 }
108
109 @Test
Yuta HIGUCHI69200352014-04-11 12:32:44 -0700110 public void testParallelIncrementCounter() throws ObjectExistsException,
111 InterruptedException, ExecutionException {
112
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700113 IKVClient client = DataStoreClient.getClient();
114
115 client.createCounter(counterID, LONG_ZERO, 0L);
116
Yuta HIGUCHI69200352014-04-11 12:32:44 -0700117 ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
118 final int initThreads = Math.max(1, Integer.valueOf(System
119 .getProperty("AtomicCounterTest.initThreads",
120 String.valueOf(NUM_THREADS))));
121 for (int num_threads = initThreads; num_threads <= NUM_THREADS; ++num_threads) {
122 client.setCounter(counterID, LONG_ZERO, 0L);
123 parallelIncrementCounter(executor, num_threads);
124 }
125
126 executor.shutdown();
127 }
128
129 private void parallelIncrementCounter(final ExecutorService executor,
130 final int num_threads) throws InterruptedException, ExecutionException {
131
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700132 ConcurrentNavigableMap<Long,Long> uniquenessTestSet = new ConcurrentSkipListMap<>();
133 ConcurrentLinkedQueue<Long> incrementTimes = new ConcurrentLinkedQueue<Long>();
134
Yuta HIGUCHI69200352014-04-11 12:32:44 -0700135 List<Callable<Long>> tasks = new ArrayList<>(num_threads);
136 for (int i = 0 ; i < num_threads ; ++i) {
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700137 tasks.add(new Incrementor(uniquenessTestSet, incrementTimes));
138 }
139 List<Future<Long>> futures = executor.invokeAll(tasks);
140
141 // wait for all tasks to end
142 for (Future<Long> future : futures) {
143 future.get();
144 }
145
Yuta HIGUCHI69200352014-04-11 12:32:44 -0700146 assertEquals(num_threads * NUM_INCREMENTS , uniquenessTestSet.size());
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700147 long prevValue = 0;
148 for (Long value : uniquenessTestSet.keySet() ) {
Yuta HIGUCHI69200352014-04-11 12:32:44 -0700149 assertEquals( (prevValue + 1), value.longValue() );
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700150 prevValue = value;
151 }
152
153 long max = 0L;
154 long min = Long.MAX_VALUE;
155 long sum = 0L;
156 for (Long time : incrementTimes) {
157 sum += time;
158 max = Math.max(max, time);
159 min = Math.min(min, time);
160 }
Yuta HIGUCHIc7b72c32014-04-17 19:49:18 -0700161 System.err.printf("incrementCounter: th, incs, N, avg(ns), min(ns), max(ns)\n");
162 System.err.printf("incrementCounter: %d, %d, %d, %f, %d, %d\n",
Yuta HIGUCHI69200352014-04-11 12:32:44 -0700163 num_threads, NUM_INCREMENTS, incrementTimes.size(),
164 sum/(double)incrementTimes.size(), min, max );
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700165 }
166
167}