Expose atomic counter increment API to Java

This is part of ONOS-1080, ONOS-1081
- implement JNI code
- implement RCClient
- implement HZClient
- implement simple test case

Change-Id: I499ba2a0b648302c4fec8c653631ab28bf52039c
diff --git a/src/main/cpp/edu_stanford_ramcloud_JRamCloud.cc b/src/main/cpp/edu_stanford_ramcloud_JRamCloud.cc
index d895138..f6cfea7 100644
--- a/src/main/cpp/edu_stanford_ramcloud_JRamCloud.cc
+++ b/src/main/cpp/edu_stanford_ramcloud_JRamCloud.cc
@@ -885,3 +885,17 @@
     }
     return outJNIArray;
 }
+
+/*
+ * Class:     edu_stanford_ramcloud_JRamCloud
+ * Method:    increment
+ * Signature: (J[BJ)J
+ */
+JNIEXPORT jlong JNICALL Java_edu_stanford_ramcloud_JRamCloud_increment (JNIEnv* env, jobject jRamCloud, jlong jTableId, jbyteArray jKey, jlong incrementValue) {
+    RamCloud* ramcloud = getRamCloud(env, jRamCloud);
+    JByteArrayReference key(env, jKey);
+    uint64_t version = VERSION_NONEXISTENT;
+    try {
+        return ramcloud->increment(jTableId, key.pointer, key.length, incrementValue, NULL, &version);
+    } EXCEPTION_CATCHER(VERSION_NONEXISTENT);
+}
diff --git a/src/main/java/edu/stanford/ramcloud/JRamCloud.java b/src/main/java/edu/stanford/ramcloud/JRamCloud.java
index 6d5714c..ffc092f 100644
--- a/src/main/java/edu/stanford/ramcloud/JRamCloud.java
+++ b/src/main/java/edu/stanford/ramcloud/JRamCloud.java
@@ -395,6 +395,7 @@
     public native long writeRule(long tableId, byte[] key, byte[] value, RejectRules rules);
     public native MultiWriteRspObject[] multiWrite(long[] tableId, byte[] key[], short[] keyDataSize, byte[] value[], int[] valueDataSize, int requestNum, RejectRules[] rules);
     public native TableEnumeratorObject getTableObjects(long tableId, long nextHash);
+    public native long increment(long tableId, byte[] key, long incrementValue) throws ObjectDoesntExistException;
 
     /*
      * The following exceptions may be thrown by the JNI functions:
diff --git a/src/main/java/net/onrc/onos/core/datastore/IKVClient.java b/src/main/java/net/onrc/onos/core/datastore/IKVClient.java
index c2d9e35..4736928 100644
--- a/src/main/java/net/onrc/onos/core/datastore/IKVClient.java
+++ b/src/main/java/net/onrc/onos/core/datastore/IKVClient.java
@@ -158,6 +158,56 @@
     public boolean multiRead(final Collection<IMultiEntryOperation> ops);
 
     /**
+     * Create atomic 64bit integer counter in data store.
+     *
+     * @param tableId
+     * @param key
+     * @param initialValue
+     * @throws ObjectExistsException if the counter already exists
+     */
+    public void createCounter(final IKVTableID tableId, final byte[] key, final long initialValue) throws ObjectExistsException;
+
+    /**
+     * Set atomic 64bit integer counter in data store to specified value.
+     *
+     * @param tableId
+     * @param key
+     * @param value new counter value; unlike createCounter, an existing
+     *              counter is overwritten rather than rejected
+     */
+    public void setCounter(final IKVTableID tableId, final byte[] key, final long value);
+
+    /**
+     * Atomically increment 64bit integer counter in data store.
+     *
+     * @param tableId
+     * @param key key where 64bit integer is stored
+     * @param incrementValue
+     * @return value after incrementing
+     */
+    public long incrementCounter(final IKVTableID tableId, final byte[] key, final long incrementValue);
+
+
+    /**
+     * Get atomic 64bit integer counter value in data store.
+     *
+     * @param tableId
+     * @param key
+     * @return current value
+     * @throws ObjectDoesntExistException
+     */
+    public long getCounter(final IKVTableID tableId, final byte[] key)
+            throws ObjectDoesntExistException;
+
+    /**
+     * Destroy atomic 64bit integer counter in data store.
+     *
+     * @param tableId
+     * @param key
+     */
+    public void destroyCounter(final IKVTableID tableId, final byte[] key);
+
+    /**
      * Version number which represents that the object does not exist, or has
      * never been read the DB before.
      */
diff --git a/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZClient.java b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZClient.java
index 3bfe157..4b6e8a1 100644
--- a/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZClient.java
+++ b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZClient.java
@@ -16,6 +16,7 @@
 import net.onrc.onos.core.datastore.WrongVersionException;
 import net.onrc.onos.core.datastore.hazelcast.HZTable.VersionedValue;
 import net.onrc.onos.core.datastore.internal.IModifiableMultiEntryOperation;
+import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,6 +29,7 @@
 import com.hazelcast.config.SerializationConfig;
 import com.hazelcast.core.Hazelcast;
 import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IAtomicLong;
 import com.hazelcast.core.IMap;
 
 public final class HZClient implements IKVClient {
@@ -339,5 +341,72 @@
         return VERSION_NONEXISTENT;
     }
 
+    private String getCounterName(final IKVTableID tableId, final byte[] key) {
+        StringBuffer buf = new StringBuffer(tableId.getTableName());
+        buf.append('@');
+        ByteArrayUtil.toHexStringBuffer(key, ":", buf);
+        return buf.toString();
+    }
+
+    private IAtomicLong getAtomicLong(final IKVTableID tableId, final byte[] key) {
+        // TODO we probably want to implement some sort of caching
+        return hazelcastInstance.getAtomicLong(getCounterName(tableId, key));
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Warning: The counter is a separate object from the {@code key} entry
+     * of the IKVTable identified by {@code tableId}. You cannot use the table
+     * API to read or write counters.
+     *
+     * @param tableId Only getTableName() will be used.
+     * @param key tableId + key will be used as Counter name
+     */
+    @Override
+    public void createCounter(final IKVTableID tableId,
+                              final byte[] key, final long initialValue)
+                                      throws ObjectExistsException {
+
+        IAtomicLong counter = getAtomicLong(tableId, key);
+        // Assumption here is that AtomicLong is initialized to 0L
+        final boolean success = counter.compareAndSet(0L, initialValue);
+        if (!success) {
+            throw new ObjectExistsException("Atomic counter "
+                    + getCounterName(tableId, key)
+                    + " already exist with value:" + counter.get());
+        }
+    }
+
+    @Override
+    public void setCounter(final IKVTableID tableId,
+                           final byte[] key, final long value) {
+
+        IAtomicLong counter = getAtomicLong(tableId, key);
+        counter.set(value);
+    }
+
+    @Override
+    public long incrementCounter(final IKVTableID tableId,
+                                 final byte[] key, final long incrementValue) {
+
+        IAtomicLong counter = getAtomicLong(tableId, key);
+        return counter.addAndGet(incrementValue);
+    }
+
+    @Override
+    public void destroyCounter(final IKVTableID tableId, final byte[] key) {
+        IAtomicLong counter = getAtomicLong(tableId, key);
+        counter.destroy();
+
+    }
+
+    @Override
+    public long getCounter(final IKVTableID tableId, final byte[] key)
+            throws ObjectDoesntExistException {
+
+        IAtomicLong counter = getAtomicLong(tableId, key);
+        return counter.get();
+    }
 
 }
diff --git a/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCClient.java b/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCClient.java
index 3644cfa..9a9141e 100644
--- a/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCClient.java
+++ b/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCClient.java
@@ -1,6 +1,8 @@
 package net.onrc.onos.core.datastore.ramcloud;
 
 import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -569,4 +571,85 @@
     public long getVersionNonexistant() {
         return VERSION_NONEXISTENT;
     }
+
+    @Override
+    public void createCounter(final IKVTableID tableId, final byte[] key,
+            final long initialValue)
+            throws ObjectExistsException {
+
+        ByteBuffer valueBytes = ByteBuffer.allocate(8)
+                .order(ByteOrder.LITTLE_ENDIAN).putLong(initialValue);
+        valueBytes.flip();
+        final long version = create(tableId, key, valueBytes.array());
+        if (log.isTraceEnabled()) {
+            log.trace("Created counter {}-{}={}@{}",
+                    tableId, ByteArrayUtil.toHexStringBuffer(key, ":"),
+                    initialValue, version);
+        }
+    }
+
+    @Override
+    public void setCounter(final IKVTableID tableId, final byte[] key,
+            final long value) {
+
+        ByteBuffer valueBytes = ByteBuffer.allocate(8)
+                .order(ByteOrder.LITTLE_ENDIAN).putLong(value);
+        valueBytes.flip();
+
+        final long version = forceCreate(tableId, key, valueBytes.array());
+        if (log.isTraceEnabled()) {
+            log.trace("set counter {}-{}={}@{}",
+                    tableId, ByteArrayUtil.toHexStringBuffer(key, ":"),
+                    value, version);
+        }
+    }
+
+    @Override
+    public long incrementCounter(final IKVTableID tableId, final byte[] key,
+            final long incrementValue) {
+
+        RCTableID rcTableId = (RCTableID) tableId;
+        JRamCloud rcClient = RCClient.getJRamCloudClient();
+
+        try {
+            return rcClient.increment(rcTableId.getTableID(), key, incrementValue);
+        } catch (JRamCloud.ObjectDoesntExistException e) {
+            log.warn("Counter {}-{} was not present",
+                    tableId,
+                    ByteArrayUtil.toHexStringBuffer(key, ":"));
+            try {
+                // creating counter initialized to 0
+                createCounter(rcTableId, key, 0L);
+            } catch (ObjectExistsException e1) {
+                // someone concurrently created it
+                log.debug("Counter {}-{} seemed to be concurrently created.",
+                        tableId,
+                        ByteArrayUtil.toHexStringBuffer(key, ":"));
+            }
+            try {
+                return rcClient.increment(rcTableId.getTableID(), key, incrementValue);
+            } catch (edu.stanford.ramcloud.JRamCloud.ObjectDoesntExistException e1) {
+                log.error("Should never happen");
+                throw new IllegalStateException("Created counter disappeared.");
+            }
+        }
+    }
+
+    @Override
+    public void destroyCounter(final IKVTableID tableId, final byte[] key) {
+
+        RCTableID rcTableId = (RCTableID) tableId;
+        JRamCloud rcClient = RCClient.getJRamCloudClient();
+
+        rcClient.remove(rcTableId.getTableID(), key);
+    }
+
+    @Override
+    public long getCounter(IKVTableID tableId, byte[] key)
+            throws ObjectDoesntExistException {
+
+        IKVEntry entry = read(tableId, key);
+        ByteBuffer counter = ByteBuffer.wrap(entry.getValue()).order(ByteOrder.LITTLE_ENDIAN);
+        return counter.getLong();
+    }
 }
diff --git a/src/test/java/net/onrc/onos/core/datastore/AtomicCounterTest.java b/src/test/java/net/onrc/onos/core/datastore/AtomicCounterTest.java
new file mode 100644
index 0000000..b64b892
--- /dev/null
+++ b/src/test/java/net/onrc/onos/core/datastore/AtomicCounterTest.java
@@ -0,0 +1,139 @@
+package net.onrc.onos.core.datastore;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import net.onrc.onos.core.datastore.DataStoreClient;
+import net.onrc.onos.core.datastore.IKVClient;
+import net.onrc.onos.core.datastore.IKVTableID;
+import net.onrc.onos.core.datastore.ObjectDoesntExistException;
+import net.onrc.onos.core.datastore.ObjectExistsException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AtomicCounterTest {
+
+    private static final String TEST_COUNTER = "TestCounter";
+    private static final byte[] LONG_ZERO = {0, 0, 0, 0, 0, 0, 0, 0}; // 0L
+    private static final IKVTableID counterID = DataStoreClient.getClient().getTable(TEST_COUNTER).getTableId();
+
+    @After
+    @Before
+    public void resetCounter() {
+        IKVClient client = DataStoreClient.getClient();
+        client.setCounter(counterID, LONG_ZERO, 0L);
+        client.destroyCounter(counterID, LONG_ZERO);
+    }
+
+    @Test
+    public void testSetCounter() throws ObjectExistsException, ObjectDoesntExistException {
+        IKVClient client = DataStoreClient.getClient();
+
+        final long five = 5;
+        client.createCounter(counterID, LONG_ZERO, five);
+
+        final long three = 3;
+        client.setCounter(counterID, LONG_ZERO, three);
+
+        final long four = client.incrementCounter(counterID, LONG_ZERO, 1);
+        assertEquals(4, four);
+    }
+
+    @Test
+    public void testIncrementCounter() throws ObjectExistsException, ObjectDoesntExistException {
+
+        IKVClient client = DataStoreClient.getClient();
+
+        final long five = 5;
+        client.createCounter(counterID, LONG_ZERO, five);
+
+        final long six = client.incrementCounter(counterID, LONG_ZERO, 1);
+        assertEquals(6, six);
+
+
+        final long nine = client.incrementCounter(counterID, LONG_ZERO, 3);
+        assertEquals(9, nine);
+    }
+
+
+    private static final int NUM_INCREMENTS = 500;
+    private static final int NUM_THREADS = 5;
+
+    class Incrementor implements Callable<Long> {
+        private final ConcurrentMap<Long,Long> uniquenessTestSet;
+        private final ConcurrentLinkedQueue<Long> incrementTimes;
+
+        public Incrementor(ConcurrentMap<Long, Long> uniquenessTestSet, ConcurrentLinkedQueue<Long> incrementTimes) {
+            super();
+            this.uniquenessTestSet = uniquenessTestSet;
+            this.incrementTimes = incrementTimes;
+        }
+
+        @Override
+        public Long call() throws ObjectDoesntExistException {
+            IKVClient client = DataStoreClient.getClient();
+            for (int i = 0 ; i < NUM_INCREMENTS ; ++i) {
+                final long start = System.nanoTime();
+                final long incremented = client.incrementCounter(counterID, LONG_ZERO, 1);
+                incrementTimes.add( System.nanoTime() - start );
+                final Long expectNull = uniquenessTestSet.putIfAbsent(incremented, incremented);
+                assertNull(expectNull);
+            }
+            return null;
+        }
+    }
+
+    @Test
+    public void testParallelIncrementCounter() throws ObjectExistsException, InterruptedException, ExecutionException {
+        IKVClient client = DataStoreClient.getClient();
+
+        client.createCounter(counterID, LONG_ZERO, 0L);
+
+        ConcurrentNavigableMap<Long,Long> uniquenessTestSet = new ConcurrentSkipListMap<>();
+        ConcurrentLinkedQueue<Long> incrementTimes = new ConcurrentLinkedQueue<Long>();
+
+        ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
+
+        List<Callable<Long>> tasks = new ArrayList<>(NUM_THREADS);
+        for (int i = 0 ; i < NUM_THREADS ; ++i) {
+            tasks.add(new Incrementor(uniquenessTestSet, incrementTimes));
+        }
+        List<Future<Long>> futures = executor.invokeAll(tasks);
+
+        // wait for all tasks to end
+        for (Future<Long> future : futures) {
+            future.get();
+        }
+
+        assertEquals(NUM_THREADS * NUM_INCREMENTS , uniquenessTestSet.size() );
+        long prevValue = 0;
+        for (Long value : uniquenessTestSet.keySet() ) {
+            assertTrue( (prevValue + 1) == value );
+            prevValue = value;
+        }
+
+        long max = 0L;
+        long min = Long.MAX_VALUE;
+        long sum = 0L;
+        for (Long time : incrementTimes) {
+            sum += time;
+            max = Math.max(max, time);
+            min = Math.min(min, time);
+        }
+        System.err.printf("incrementCounter avg:%f (ns) min:%d (ns) max:%d (ns) N:%d\n", sum/(double)incrementTimes.size(), min, max, incrementTimes.size() );
+    }
+
+}