DistributedAtomicLong

- DistributedAtomicLong required for SharedLog sequencer.

Change-Id: I8b85970e67fcf18ea6938312c62eb84999c45105
diff --git a/src/test/java/net/onrc/onos/core/datastore/hazelcast/HZDistributedAtomicLongTest.java b/src/test/java/net/onrc/onos/core/datastore/hazelcast/HZDistributedAtomicLongTest.java
new file mode 100644
index 0000000..fbfc058
--- /dev/null
+++ b/src/test/java/net/onrc/onos/core/datastore/hazelcast/HZDistributedAtomicLongTest.java
@@ -0,0 +1,181 @@
+package net.onrc.onos.core.datastore.hazelcast;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import net.onrc.onos.core.datastore.ObjectDoesntExistException;
+import net.onrc.onos.core.util.distributed.DistributedAtomicLong;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test cases for HZDistributedAtomicLong.
+ */
+public class HZDistributedAtomicLongTest {
+    static {
+        // configuration to quickly fall back to instance mode for faster test run
+        System.setProperty("net.onrc.onos.core.datastore.hazelcast.client.attemptLimit", "0");
+    }
+
+    static final String TEST_COUNTER_NAME = "Counter" + UUID.randomUUID();
+
+    DistributedAtomicLong counter;
+
+    @Before
+    public void setUp() throws Exception {
+        counter = new HZDistributedAtomicLong(
+                    HZClient.getClient(), TEST_COUNTER_NAME);
+        counter.set(0L);
+    }
+
+    @Test
+    public void testGet() {
+        assertEquals(0L, counter.get());
+    }
+
+    @Test
+    public void testAddAndGet() {
+        assertEquals(0L, counter.get());
+        assertEquals(1L, counter.addAndGet(1L));
+        assertEquals(3L, counter.addAndGet(2L));
+        assertEquals(7L, counter.addAndGet(4L));
+    }
+
+    @Test
+    public void testSet() {
+        counter.set(42L);
+        assertEquals(42L, counter.get());
+    }
+
+    @Test
+    public void testIncrementAndGet() {
+        assertEquals(1L, counter.incrementAndGet());
+        assertEquals(2L, counter.incrementAndGet());
+        assertEquals(3L, counter.incrementAndGet());
+    }
+
+    /**
+     * Callable task incrementing atomicLong.
+     */
+    private static final class AdderTask implements Callable<Long> {
+        // using Map as Set
+        private final ConcurrentMap<Long, Long> uniquenessTestSet;
+        // Queue given here should be Thread-safe
+        private final Queue<Long> incrementTimes;
+        private final HZDistributedAtomicLong counter;
+        private final int numIncrements;
+
+        /**
+         * Constructor.
+         *
+         * @param numIncrements number of increments to execute
+         * @param uniquenessTestSet ThreadSafe Map to store increment result
+         * @param incrementTimes ThreadSafe Queue to store time it
+         *      took on each increment
+         */
+        public AdderTask(int numIncrements,
+                ConcurrentMap<Long, Long> uniquenessTestSet,
+                Queue<Long> incrementTimes) {
+
+            super();
+            this.uniquenessTestSet = uniquenessTestSet;
+            this.incrementTimes = incrementTimes;
+            this.counter = new HZDistributedAtomicLong(
+                    HZClient.getClient(), TEST_COUNTER_NAME);
+            this.numIncrements = numIncrements;
+        }
+
+        @Override
+        public Long call() throws ObjectDoesntExistException {
+            for (int i = 0; i < numIncrements; ++i) {
+                final long start = System.nanoTime();
+                final long incremented = counter.addAndGet(1L);
+                incrementTimes.add(System.nanoTime() - start);
+                final Long expectNull = uniquenessTestSet.putIfAbsent(
+                                                    incremented, incremented);
+                assertNull(expectNull);
+            }
+            return null;
+        }
+    }
+
+    private static final int NUM_THREADS = Integer.parseInt(
+                              System.getProperty(
+                                      "HZDistributedAtomicLongTest.NUM_THREADS",
+                                        System.getProperty("NUM_THREADS",
+                                                           "10")));
+
+    private static final int NUM_INCREMENTS = Integer.parseInt(
+                              System.getProperty(
+                                      "HZDistributedAtomicLongTest.NUM_INCREMENTS",
+                                        System.getProperty("NUM_INCREMENTS",
+                                                           "100")));
+
+    /**
+     * Increment using multiple threads to test addition is atomic.
+     */
+    @Test
+    public void testConcurrentAddAndGet() throws InterruptedException, ExecutionException {
+        ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
+
+        // using Map as Set
+        ConcurrentMap<Long, Long> uniquenessTestSet = new ConcurrentSkipListMap<>();
+        Queue<Long> incrementTimes = new ConcurrentLinkedQueue<>();
+
+        // Start NUM_THREADS threads and increment NUM_INCREMENTS times each
+        List<Callable<Long>> tasks = new ArrayList<>(NUM_THREADS);
+        for (int i = 0; i < NUM_THREADS; ++i) {
+            tasks.add(new AdderTask(NUM_INCREMENTS,
+                                      uniquenessTestSet,
+                                      incrementTimes));
+        }
+        List<Future<Long>> futures = executor.invokeAll(tasks);
+
+        // wait for all tasks to end
+        for (Future<Long> future : futures) {
+            future.get();
+        }
+
+
+        assertEquals(NUM_THREADS * NUM_INCREMENTS , uniquenessTestSet.size());
+
+        // check uniqueness of result
+        long prevValue = 0;
+        for (Long value : uniquenessTestSet.keySet()) {
+            assertEquals((prevValue + 1), value.longValue());
+            prevValue = value;
+        }
+
+        long max = 0L;
+        long min = Long.MAX_VALUE;
+        long sum = 0L;
+        for (Long time : incrementTimes) {
+            sum += time;
+            max = Math.max(max, time);
+            min = Math.min(min, time);
+        }
+        System.err.printf("incrementCounter: th, incs, tot_incs,"
+                        + " avg(ns), min(ns), max(ns), T-put(1s/avg)\n");
+        System.err.printf("incrementCounter: %d, %d, %d,"
+                        + " %f, %d, %d, %f\n",
+                          NUM_THREADS, NUM_INCREMENTS, incrementTimes.size(),
+                          sum / (double) incrementTimes.size(), min, max,
+                          Math.pow(10, 9) * incrementTimes.size() / sum);
+
+        executor.shutdown();
+    }
+
+}