DistributedAtomicLong

- DistributedAtomicLong required for SharedLog sequencer.

Change-Id: I8b85970e67fcf18ea6938312c62eb84999c45105
diff --git a/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZDistributedAtomicLong.java b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZDistributedAtomicLong.java
new file mode 100644
index 0000000..5df8fc3
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZDistributedAtomicLong.java
@@ -0,0 +1,54 @@
+package net.onrc.onos.core.datastore.hazelcast;
+
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IAtomicLong;
+
+import net.onrc.onos.core.util.distributed.DistributedAtomicLong;
+
+/**
+ * Hazelcast implementation of DistributedAtomicLong.
+ */
+public class HZDistributedAtomicLong implements DistributedAtomicLong {
+    private final IAtomicLong hzAtomicLong;
+
+    // TODO remove dependency HZClient if possible
+    /**
+     * Creates or Gets the DistributedAtomicLong instance.
+     *
+     * @param client client to use
+     * @param name the name of the DistributedAtomicLong instance
+     */
+    public HZDistributedAtomicLong(HZClient client, String name) {
+        this(client.getHZInstance(), name);
+    }
+
+    /**
+     * Creates or Gets the DistributedAtomicLong instance.
+     *
+     * @param instance HazelcastInstance to use
+     * @param name the name of the DistributedAtomicLong instance.
+     */
+    public HZDistributedAtomicLong(HazelcastInstance instance, String name) {
+        hzAtomicLong = instance.getAtomicLong(name);
+    }
+
+    @Override
+    public long get() {
+        return hzAtomicLong.get();
+    }
+
+    @Override
+    public long addAndGet(long delta) {
+        return hzAtomicLong.addAndGet(delta);
+    }
+
+    @Override
+    public void set(long newValue) {
+        hzAtomicLong.set(newValue);
+    }
+
+    @Override
+    public long incrementAndGet() {
+        return hzAtomicLong.incrementAndGet();
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCDistributedAtomicLong.java b/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCDistributedAtomicLong.java
new file mode 100644
index 0000000..dc75940
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCDistributedAtomicLong.java
@@ -0,0 +1,76 @@
+package net.onrc.onos.core.datastore.ramcloud;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.onrc.onos.core.datastore.IKVTable;
+import net.onrc.onos.core.datastore.IKVTableID;
+import net.onrc.onos.core.datastore.ObjectDoesntExistException;
+import net.onrc.onos.core.datastore.ObjectExistsException;
+import net.onrc.onos.core.datastore.IKVTable.IKVEntry;
+import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
+import net.onrc.onos.core.util.distributed.DistributedAtomicLong;
+
+/**
+ * RAMCloudImplementation of DistributedAtomicLong.
+ */
+public class RCDistributedAtomicLong implements DistributedAtomicLong {
+    private static final String PREFIX = "DAL:";
+    private static final byte[] KEY = {0};
+    private static final byte[] ZERO = ByteArrayUtil.toLEBytes(0L);
+
+    private static final Logger log = LoggerFactory.getLogger(RCDistributedAtomicLong.class);
+
+
+    private final RCClient client;
+    private final String name;
+    private final IKVTableID tableID;
+
+
+    /**
+     * Creates or Gets the DistributedAtomicLong instance.
+     *
+     * @param client client to use.
+     * @param name name of the DistributedAtomicLong instance.
+     */
+    public RCDistributedAtomicLong(final RCClient client, final String name) {
+
+        this.client = client;
+        this.name = name;
+        IKVTable table = client.getTable(PREFIX + name);
+        this.tableID = table.getTableId();
+
+        try {
+            table.create(KEY, ZERO);
+        } catch (ObjectExistsException e) {
+            log.trace("RCDistributedAtomicLong {} already exists", name);
+        }
+    }
+
+    @Override
+    public long get() {
+        try {
+            IKVEntry entry = client.read(tableID, KEY);
+            return ByteArrayUtil.fromLEBytes(entry.getValue());
+        } catch (ObjectDoesntExistException e) {
+            log.error("RCDistributedAtomicLong {} does not exist", name);
+            throw new IllegalStateException(name + " does not exist", e);
+        }
+    }
+
+    @Override
+    public long addAndGet(long delta) {
+        return client.incrementCounter(tableID, KEY, delta);
+    }
+
+    @Override
+    public void set(long newValue) {
+        client.setCounter(tableID, KEY, newValue);
+    }
+
+    @Override
+    public long incrementAndGet() {
+        return addAndGet(1L);
+    }
+
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/DistributedAtomicLong.java b/src/main/java/net/onrc/onos/core/util/distributed/DistributedAtomicLong.java
new file mode 100644
index 0000000..c7e1aea
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/DistributedAtomicLong.java
@@ -0,0 +1,39 @@
+package net.onrc.onos.core.util.distributed;
+
+// TODO Should it extend Number?
+// TODO Only minimum set required for sequencer is defined now. Add CAS, etc.
+/**
+ * Distributed version of AtomicLong.
+ */
+public interface DistributedAtomicLong {
+
+    /**
+     * Gets the current value.
+     *
+     * @return current value
+     */
+    long get();
+
+    /**
+     * Atomically adds the given value to the current value.
+     *
+     * @param delta value to add
+     * @return updated value
+     */
+    long addAndGet(long delta);
+
+
+    /**
+     * Sets to the given value.
+     *
+     * @param newValue value to set.
+     */
+    public void set(long newValue);
+
+    /**
+     * Atomically adds one to the current value.
+     *
+     * @return updated value
+     */
+    public long incrementAndGet();
+}
diff --git a/src/test/java/net/onrc/onos/core/datastore/hazelcast/HZDistributedAtomicLongTest.java b/src/test/java/net/onrc/onos/core/datastore/hazelcast/HZDistributedAtomicLongTest.java
new file mode 100644
index 0000000..fbfc058
--- /dev/null
+++ b/src/test/java/net/onrc/onos/core/datastore/hazelcast/HZDistributedAtomicLongTest.java
@@ -0,0 +1,181 @@
+package net.onrc.onos.core.datastore.hazelcast;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import net.onrc.onos.core.datastore.ObjectDoesntExistException;
+import net.onrc.onos.core.util.distributed.DistributedAtomicLong;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test cases for HZDistributedAtomicLong.
+ */
+public class HZDistributedAtomicLongTest {
+    static {
+        // configuration to quickly fall back to instance mode for faster test run
+        System.setProperty("net.onrc.onos.core.datastore.hazelcast.client.attemptLimit", "0");
+    }
+
+    static final String TEST_COUNTER_NAME = "Counter" + UUID.randomUUID();
+
+    DistributedAtomicLong counter;
+
+    @Before
+    public void setUp() throws Exception {
+        counter = new HZDistributedAtomicLong(
+                    HZClient.getClient(), TEST_COUNTER_NAME);
+        counter.set(0L);
+    }
+
+    @Test
+    public void testGet() {
+        assertEquals(0L, counter.get());
+    }
+
+    @Test
+    public void testAddAndGet() {
+        assertEquals(0L, counter.get());
+        assertEquals(1L, counter.addAndGet(1L));
+        assertEquals(3L, counter.addAndGet(2L));
+        assertEquals(7L, counter.addAndGet(4L));
+    }
+
+    @Test
+    public void testSet() {
+        counter.set(42L);
+        assertEquals(42L, counter.get());
+    }
+
+    @Test
+    public void testIncrementAndGet() {
+        assertEquals(1L, counter.incrementAndGet());
+        assertEquals(2L, counter.incrementAndGet());
+        assertEquals(3L, counter.incrementAndGet());
+    }
+
+    /**
+     * Callable task incrementing atomicLong.
+     */
+    private static final class AdderTask implements Callable<Long> {
+        // using Map as Set
+        private final ConcurrentMap<Long, Long> uniquenessTestSet;
+        // Queue given here should be Thread-safe
+        private final Queue<Long> incrementTimes;
+        private final HZDistributedAtomicLong counter;
+        private final int numIncrements;
+
+        /**
+         * Constructor.
+         *
+         * @param numIncrements number of increments to execute
+         * @param uniquenessTestSet ThreadSafe Map to store increment result
+         * @param incrementTimes ThreadSafe Queue to store time it
+         *      took on each increment
+         */
+        public AdderTask(int numIncrements,
+                ConcurrentMap<Long, Long> uniquenessTestSet,
+                Queue<Long> incrementTimes) {
+
+            super();
+            this.uniquenessTestSet = uniquenessTestSet;
+            this.incrementTimes = incrementTimes;
+            this.counter = new HZDistributedAtomicLong(
+                    HZClient.getClient(), TEST_COUNTER_NAME);
+            this.numIncrements = numIncrements;
+        }
+
+        @Override
+        public Long call() throws ObjectDoesntExistException {
+            for (int i = 0; i < numIncrements; ++i) {
+                final long start = System.nanoTime();
+                final long incremented = counter.addAndGet(1L);
+                incrementTimes.add(System.nanoTime() - start);
+                final Long expectNull = uniquenessTestSet.putIfAbsent(
+                                                    incremented, incremented);
+                assertNull(expectNull);
+            }
+            return null;
+        }
+    }
+
+    private static final int NUM_THREADS = Integer.parseInt(
+                              System.getProperty(
+                                      "HZDistributedAtomicLongTest.NUM_THREADS",
+                                        System.getProperty("NUM_THREADS",
+                                                           "10")));
+
+    private static final int NUM_INCREMENTS = Integer.parseInt(
+                              System.getProperty(
+                                      "HZDistributedAtomicLongTest.NUM_INCREMENTS",
+                                        System.getProperty("NUM_INCREMENTS",
+                                                           "100")));
+
+    /**
+     * Increment using multiple threads to test addition is atomic.
+     */
+    @Test
+    public void testConcurrentAddAndGet() throws InterruptedException, ExecutionException {
+        ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
+
+        // using Map as Set
+        ConcurrentMap<Long, Long> uniquenessTestSet = new ConcurrentSkipListMap<>();
+        Queue<Long> incrementTimes = new ConcurrentLinkedQueue<>();
+
+        // Start NUM_THREADS threads and increment NUM_INCREMENTS times each
+        List<Callable<Long>> tasks = new ArrayList<>(NUM_THREADS);
+        for (int i = 0; i < NUM_THREADS; ++i) {
+            tasks.add(new AdderTask(NUM_INCREMENTS,
+                                      uniquenessTestSet,
+                                      incrementTimes));
+        }
+        List<Future<Long>> futures = executor.invokeAll(tasks);
+
+        // wait for all tasks to end
+        for (Future<Long> future : futures) {
+            future.get();
+        }
+
+
+        assertEquals(NUM_THREADS * NUM_INCREMENTS , uniquenessTestSet.size());
+
+        // check uniqueness of result
+        long prevValue = 0;
+        for (Long value : uniquenessTestSet.keySet()) {
+            assertEquals((prevValue + 1), value.longValue());
+            prevValue = value;
+        }
+
+        long max = 0L;
+        long min = Long.MAX_VALUE;
+        long sum = 0L;
+        for (Long time : incrementTimes) {
+            sum += time;
+            max = Math.max(max, time);
+            min = Math.min(min, time);
+        }
+        System.err.printf("incrementCounter: th, incs, tot_incs,"
+                        + " avg(ns), min(ns), max(ns), T-put(1s/avg)\n");
+        System.err.printf("incrementCounter: %d, %d, %d,"
+                        + " %f, %d, %d, %f\n",
+                          NUM_THREADS, NUM_INCREMENTS, incrementTimes.size(),
+                          sum / (double) incrementTimes.size(), min, max,
+                          Math.pow(10, 9) * incrementTimes.size() / sum);
+
+        executor.shutdown();
+    }
+
+}