Expose atomic counter increment API to Java
This is part of ONOS-1080, ONOS-1081
- implement JNI code
- implement RCClient
- implement HZClient
- implement simple test case
Change-Id: I499ba2a0b648302c4fec8c653631ab28bf52039c
diff --git a/src/main/cpp/edu_stanford_ramcloud_JRamCloud.cc b/src/main/cpp/edu_stanford_ramcloud_JRamCloud.cc
index d895138..f6cfea7 100644
--- a/src/main/cpp/edu_stanford_ramcloud_JRamCloud.cc
+++ b/src/main/cpp/edu_stanford_ramcloud_JRamCloud.cc
@@ -885,3 +885,17 @@
}
return outJNIArray;
}
+
+/*
+ * Class: edu_stanford_ramcloud_JRamCloud
+ * Method: increment
+ * Signature: (J[BJ)J
+ */
+JNIEXPORT jlong JNICALL Java_edu_stanford_ramcloud_JRamCloud_increment (JNIEnv* env, jobject jRamCloud, jlong jTableId, jbyteArray jKey, jlong incrementValue) {
+ RamCloud* ramcloud = getRamCloud(env, jRamCloud); // native client that belongs to the calling JRamCloud object
+ JByteArrayReference key(env, jKey); // exposes the Java byte[] key as pointer + length
+ uint64_t version = VERSION_NONEXISTENT; // out-parameter for increment(); its value is not used afterwards
+ try {
+ return ramcloud->increment(jTableId, key.pointer, key.length, incrementValue, NULL, &version); // result of increment() is handed back to Java as the jlong
+ } EXCEPTION_CATCHER(VERSION_NONEXISTENT); // NOTE(review): presumably maps C++ exceptions to Java ones and returns the given value - confirm against the macro definition
+}
diff --git a/src/main/java/edu/stanford/ramcloud/JRamCloud.java b/src/main/java/edu/stanford/ramcloud/JRamCloud.java
index 6d5714c..ffc092f 100644
--- a/src/main/java/edu/stanford/ramcloud/JRamCloud.java
+++ b/src/main/java/edu/stanford/ramcloud/JRamCloud.java
@@ -395,6 +395,7 @@
public native long writeRule(long tableId, byte[] key, byte[] value, RejectRules rules);
public native MultiWriteRspObject[] multiWrite(long[] tableId, byte[] key[], short[] keyDataSize, byte[] value[], int[] valueDataSize, int requestNum, RejectRules[] rules);
public native TableEnumeratorObject getTableObjects(long tableId, long nextHash);
+ public native long increment(long tableId, byte[] key, long incrementValue) throws ObjectDoesntExistException; // JNI binding, implemented by Java_edu_stanford_ramcloud_JRamCloud_increment
/*
* The following exceptions may be thrown by the JNI functions:
diff --git a/src/main/java/net/onrc/onos/core/datastore/IKVClient.java b/src/main/java/net/onrc/onos/core/datastore/IKVClient.java
index c2d9e35..4736928 100644
--- a/src/main/java/net/onrc/onos/core/datastore/IKVClient.java
+++ b/src/main/java/net/onrc/onos/core/datastore/IKVClient.java
@@ -158,6 +158,56 @@
public boolean multiRead(final Collection<IMultiEntryOperation> ops);
/**
+ * Create atomic 64bit integer counter in data store.
+ *
+ * @param tableId table the counter is associated with
+ * @param key key identifying the counter
+ * @param initialValue value the newly created counter starts with
+ * @throws ObjectExistsException if a counter already exists for this table and key
+ */
+ public void createCounter(final IKVTableID tableId, final byte[] key, final long initialValue) throws ObjectExistsException;
+
+ /**
+ * Set atomic 64bit integer counter in data store to specified value.
+ * Unlike {@code createCounter}, no {@code ObjectExistsException} is declared.
+ *
+ * @param tableId table the counter is associated with
+ * @param key key identifying the counter
+ * @param value value to store in the counter
+ */
+ public void setCounter(final IKVTableID tableId, final byte[] key, final long value);
+
+ /**
+ * Atomically increment 64bit integer counter in data store.
+ *
+ * @param tableId table the counter is associated with
+ * @param key key where 64bit integer is stored
+ * @param incrementValue amount to add to the counter
+ * @return value after incrementing
+ */
+ public long incrementCounter(final IKVTableID tableId, final byte[] key, final long incrementValue);
+
+
+ /**
+ * Get atomic 64bit integer counter value in data store.
+ *
+ * @param tableId table the counter is associated with
+ * @param key key identifying the counter
+ * @return current value
+ * @throws ObjectDoesntExistException presumably when no such counter exists; implementations may differ - TODO confirm
+ */
+ public long getCounter(final IKVTableID tableId, final byte[] key)
+ throws ObjectDoesntExistException;
+
+ /**
+ * Destroy atomic 64bit integer counter in data store.
+ *
+ * @param tableId table the counter is associated with
+ * @param key key identifying the counter to destroy
+ */
+ public void destroyCounter(final IKVTableID tableId, final byte[] key);
+
+ /**
* Version number which represents that the object does not exist, or has
* never been read the DB before.
*/
diff --git a/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZClient.java b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZClient.java
index 3bfe157..4b6e8a1 100644
--- a/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZClient.java
+++ b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZClient.java
@@ -16,6 +16,7 @@
import net.onrc.onos.core.datastore.WrongVersionException;
import net.onrc.onos.core.datastore.hazelcast.HZTable.VersionedValue;
import net.onrc.onos.core.datastore.internal.IModifiableMultiEntryOperation;
+import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,6 +29,7 @@
import com.hazelcast.config.SerializationConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IAtomicLong;
import com.hazelcast.core.IMap;
public final class HZClient implements IKVClient {
@@ -339,5 +341,72 @@
return VERSION_NONEXISTENT;
}
+    private String getCounterName(final IKVTableID tableId, final byte[] key) {
+        // Counter name layout: <table name> '@' <key as ':'-separated hex>.
+        final StringBuffer name = new StringBuffer(tableId.getTableName()).append('@');
+        ByteArrayUtil.toHexStringBuffer(key, ":", name);
+        return name.toString();
+    }
+
+    private IAtomicLong getAtomicLong(final IKVTableID tableId, final byte[] key) {
+        final String counterName = getCounterName(tableId, key);
+        return hazelcastInstance.getAtomicLong(counterName); // TODO we probably want to implement some sort of caching
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p />
+     * Warning: the counter is a separate Hazelcast object, not the {@code key}
+     * entry of the IKVTable identified by {@code tableId}. The table API
+     * cannot be used to read or write counters.
+     *
+     * @param tableId Only getTableName() will be used.
+     * @param key tableId + key will be used as Counter name
+     */
+    @Override
+    public void createCounter(final IKVTableID tableId,
+            final byte[] key, final long initialValue)
+            throws ObjectExistsException {
+
+        final IAtomicLong atomicLong = getAtomicLong(tableId, key);
+        // Assumes a fresh AtomicLong starts at 0L, so a successful swap from 0L means "newly created".
+        if (atomicLong.compareAndSet(0L, initialValue)) {
+            return;
+        }
+        throw new ObjectExistsException("Atomic counter "
+                + getCounterName(tableId, key)
+                + " already exist with value:" + atomicLong.get());
+    }
+
+    @Override
+    public void setCounter(final IKVTableID tableId,
+            final byte[] key, final long value) {
+
+        // Stores the value unconditionally in the counter named by tableId + key.
+        getAtomicLong(tableId, key).set(value);
+    }
+
+    @Override
+    public long incrementCounter(final IKVTableID tableId,
+            final byte[] key, final long incrementValue) {
+
+        // addAndGet hands back the post-increment value.
+        return getAtomicLong(tableId, key).addAndGet(incrementValue);
+    }
+
+    @Override
+    public void destroyCounter(final IKVTableID tableId, final byte[] key) {
+        // Look the counter up by its tableId + key name, then destroy it.
+        final IAtomicLong atomicLong = getAtomicLong(tableId, key);
+        atomicLong.destroy();
+    }
+
+    @Override
+    public long getCounter(final IKVTableID tableId, final byte[] key)
+            throws ObjectDoesntExistException {
+
+        // NOTE(review): nothing on this path throws ObjectDoesntExistException; presumably getAtomicLong() creates missing counters - confirm.
+        return getAtomicLong(tableId, key).get();
+    }
}
diff --git a/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCClient.java b/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCClient.java
index 3644cfa..9a9141e 100644
--- a/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCClient.java
+++ b/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCClient.java
@@ -1,6 +1,8 @@
package net.onrc.onos.core.datastore.ramcloud;
import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -569,4 +571,85 @@
public long getVersionNonexistant() {
return VERSION_NONEXISTENT;
}
+
+    @Override
+    public void createCounter(final IKVTableID tableId, final byte[] key,
+            final long initialValue)
+            throws ObjectExistsException {
+        // The counter is stored as 8 little-endian bytes, the same layout getCounter() decodes.
+        final ByteBuffer encoded = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
+        encoded.putLong(initialValue).flip();
+        final long createdVersion = create(tableId, key, encoded.array());
+        if (!log.isTraceEnabled()) {
+            return;
+        }
+        log.trace("Created counter {}-{}={}@{}",
+                tableId, ByteArrayUtil.toHexStringBuffer(key, ":"),
+                initialValue, createdVersion);
+    }
+
+    @Override
+    public void setCounter(final IKVTableID tableId, final byte[] key,
+            final long value) {
+
+        // The counter is stored as 8 little-endian bytes, the same layout getCounter() decodes.
+        final ByteBuffer encoded = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
+        encoded.putLong(value).flip();
+        final long writtenVersion = forceCreate(tableId, key, encoded.array());
+        if (!log.isTraceEnabled()) {
+            return;
+        }
+        log.trace("set counter {}-{}={}@{}",
+                tableId, ByteArrayUtil.toHexStringBuffer(key, ":"),
+                value, writtenVersion);
+    }
+
+    /**
+     * {@inheritDoc}
+     * If the counter does not exist yet, it is created with value 0 and the increment is retried once.
+     */
+    @Override
+    public long incrementCounter(final IKVTableID tableId, final byte[] key,
+            final long incrementValue) {
+        final RCTableID rcTableId = (RCTableID) tableId;
+        final JRamCloud rcClient = RCClient.getJRamCloudClient();
+        try {
+            return rcClient.increment(rcTableId.getTableID(), key, incrementValue);
+        } catch (JRamCloud.ObjectDoesntExistException e) {
+            log.warn("Counter {}-{} was not present",
+                    tableId, ByteArrayUtil.toHexStringBuffer(key, ":"));
+            try {
+                createCounter(rcTableId, key, 0L); // creating counter initialized to 0
+            } catch (ObjectExistsException e1) {
+                // someone concurrently created it; the retry below still applies our increment
+                log.debug("Counter {}-{} seemed to be concurrently created.",
+                        tableId, ByteArrayUtil.toHexStringBuffer(key, ":"));
+            }
+            try {
+                return rcClient.increment(rcTableId.getTableID(), key, incrementValue);
+            } catch (JRamCloud.ObjectDoesntExistException e1) {
+                // Keep the original exception as cause so the failure can be diagnosed.
+                log.error("Should never happen", e1);
+                throw new IllegalStateException("Created counter disappeared.", e1);
+            }
+        }
+    }
+
+    @Override
+    public void destroyCounter(final IKVTableID tableId, final byte[] key) {
+
+        // The counter is the object stored under key in the RAMCloud table,
+        // so removing that object is what destroys the counter.
+        final RCTableID table = (RCTableID) tableId;
+        RCClient.getJRamCloudClient().remove(table.getTableID(), key);
+    }
+
+    @Override
+    public long getCounter(IKVTableID tableId, byte[] key)
+            throws ObjectDoesntExistException {
+
+        // Decode the stored bytes as a little-endian long, mirroring createCounter()/setCounter().
+        final byte[] stored = read(tableId, key).getValue();
+        return ByteBuffer.wrap(stored).order(ByteOrder.LITTLE_ENDIAN).getLong();
+    }
}