blob: b64b8923a1378a1483e62d24907b650657638466 [file] [log] [blame]
Yuta HIGUCHId47eac32014-04-07 13:44:47 -07001package net.onrc.onos.core.datastore;
2
3import static org.junit.Assert.*;
4
5import java.util.ArrayList;
6import java.util.List;
7import java.util.concurrent.Callable;
8import java.util.concurrent.ConcurrentLinkedQueue;
9import java.util.concurrent.ConcurrentMap;
10import java.util.concurrent.ConcurrentNavigableMap;
11import java.util.concurrent.ConcurrentSkipListMap;
12import java.util.concurrent.ExecutionException;
13import java.util.concurrent.ExecutorService;
14import java.util.concurrent.Executors;
15import java.util.concurrent.Future;
16
17import net.onrc.onos.core.datastore.DataStoreClient;
18import net.onrc.onos.core.datastore.IKVClient;
19import net.onrc.onos.core.datastore.IKVTableID;
20import net.onrc.onos.core.datastore.ObjectDoesntExistException;
21import net.onrc.onos.core.datastore.ObjectExistsException;
22
23import org.junit.After;
24import org.junit.Before;
25import org.junit.Test;
26
27public class AtomicCounterTest {
28
29 private static final String TEST_COUNTER = "TestCounter";
30 private static final byte[] LONG_ZERO = {0, 0, 0, 0, 0, 0, 0, 0}; // 0L
31 private static final IKVTableID counterID = DataStoreClient.getClient().getTable(TEST_COUNTER).getTableId();
32
33 @After
34 @Before
35 public void resetCounter() {
36 IKVClient client = DataStoreClient.getClient();
37 client.setCounter(counterID, LONG_ZERO, 0L);
38 client.destroyCounter(counterID, LONG_ZERO);
39 }
40
41 @Test
42 public void testSetCounter() throws ObjectExistsException, ObjectDoesntExistException {
43 IKVClient client = DataStoreClient.getClient();
44
45 final long five = 5;
46 client.createCounter(counterID, LONG_ZERO, five);
47
48 final long three = 3;
49 client.setCounter(counterID, LONG_ZERO, three);
50
51 final long four = client.incrementCounter(counterID, LONG_ZERO, 1);
52 assertEquals(4, four);
53 }
54
55 @Test
56 public void testIncrementCounter() throws ObjectExistsException, ObjectDoesntExistException {
57
58 IKVClient client = DataStoreClient.getClient();
59
60 final long five = 5;
61 client.createCounter(counterID, LONG_ZERO, five);
62
63 final long six = client.incrementCounter(counterID, LONG_ZERO, 1);
64 assertEquals(6, six);
65
66
67 final long nine = client.incrementCounter(counterID, LONG_ZERO, 3);
68 assertEquals(9, nine);
69 }
70
71
72 private static final int NUM_INCREMENTS = 500;
73 private static final int NUM_THREADS = 5;
74
75 class Incrementor implements Callable<Long> {
76 private final ConcurrentMap<Long,Long> uniquenessTestSet;
77 private final ConcurrentLinkedQueue<Long> incrementTimes;
78
79 public Incrementor(ConcurrentMap<Long, Long> uniquenessTestSet, ConcurrentLinkedQueue<Long> incrementTimes) {
80 super();
81 this.uniquenessTestSet = uniquenessTestSet;
82 this.incrementTimes = incrementTimes;
83 }
84
85 @Override
86 public Long call() throws ObjectDoesntExistException {
87 IKVClient client = DataStoreClient.getClient();
88 for (int i = 0 ; i < NUM_INCREMENTS ; ++i) {
89 final long start = System.nanoTime();
90 final long incremented = client.incrementCounter(counterID, LONG_ZERO, 1);
91 incrementTimes.add( System.nanoTime() - start );
92 final Long expectNull = uniquenessTestSet.putIfAbsent(incremented, incremented);
93 assertNull(expectNull);
94 }
95 return null;
96 }
97 }
98
99 @Test
100 public void testParallelIncrementCounter() throws ObjectExistsException, InterruptedException, ExecutionException {
101 IKVClient client = DataStoreClient.getClient();
102
103 client.createCounter(counterID, LONG_ZERO, 0L);
104
105 ConcurrentNavigableMap<Long,Long> uniquenessTestSet = new ConcurrentSkipListMap<>();
106 ConcurrentLinkedQueue<Long> incrementTimes = new ConcurrentLinkedQueue<Long>();
107
108 ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
109
110 List<Callable<Long>> tasks = new ArrayList<>(NUM_THREADS);
111 for (int i = 0 ; i < NUM_THREADS ; ++i) {
112 tasks.add(new Incrementor(uniquenessTestSet, incrementTimes));
113 }
114 List<Future<Long>> futures = executor.invokeAll(tasks);
115
116 // wait for all tasks to end
117 for (Future<Long> future : futures) {
118 future.get();
119 }
120
121 assertEquals(NUM_THREADS * NUM_INCREMENTS , uniquenessTestSet.size() );
122 long prevValue = 0;
123 for (Long value : uniquenessTestSet.keySet() ) {
124 assertTrue( (prevValue + 1) == value );
125 prevValue = value;
126 }
127
128 long max = 0L;
129 long min = Long.MAX_VALUE;
130 long sum = 0L;
131 for (Long time : incrementTimes) {
132 sum += time;
133 max = Math.max(max, time);
134 min = Math.min(min, time);
135 }
136 System.err.printf("incrementCounter avg:%f (ns) min:%d (ns) max:%d (ns) N:%d\n", sum/(double)incrementTimes.size(), min, max, incrementTimes.size() );
137 }
138
139}