Expose atomic counter increment API to Java
This is part of ONOS-1080, ONOS-1081
- implement JNI code
- implement RCClient
- implement HZClient
- implement simple test case
Change-Id: I499ba2a0b648302c4fec8c653631ab28bf52039c
diff --git a/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZClient.java b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZClient.java
index 3bfe157..4b6e8a1 100644
--- a/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZClient.java
+++ b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZClient.java
@@ -16,6 +16,7 @@
import net.onrc.onos.core.datastore.WrongVersionException;
import net.onrc.onos.core.datastore.hazelcast.HZTable.VersionedValue;
import net.onrc.onos.core.datastore.internal.IModifiableMultiEntryOperation;
+import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,6 +29,7 @@
import com.hazelcast.config.SerializationConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IAtomicLong;
import com.hazelcast.core.IMap;
public final class HZClient implements IKVClient {
@@ -339,5 +341,72 @@
return VERSION_NONEXISTENT;
}
+ /**
+  * Builds the name under which the counter for the given table/key pair is
+  * registered: the table name, an {@code '@'} separator, then whatever
+  * {@code ByteArrayUtil.toHexStringBuffer} appends for {@code key}
+  * (presumably the key bytes in hex joined by {@code ":"} -- confirm
+  * against ByteArrayUtil).
+  *
+  * @param tableId table whose {@code getTableName()} prefixes the name
+  * @param key counter key
+  * @return counter name derived from {@code tableId} and {@code key}
+  */
+ private String getCounterName(final IKVTableID tableId, final byte[] key) {
+     // StringBuffer (not StringBuilder) is kept on purpose: it is handed to
+     // ByteArrayUtil.toHexStringBuffer, whose parameter type is not visible here.
+     final StringBuffer name = new StringBuffer(tableId.getTableName()).append('@');
+     ByteArrayUtil.toHexStringBuffer(key, ":", name);
+     return name.toString();
+ }
+
+ /**
+  * Looks up the Hazelcast atomic long that backs the counter identified by
+  * the given table/key pair.
+  *
+  * @param tableId table the counter is associated with
+  * @param key counter key
+  * @return the {@link IAtomicLong} obtained from the Hazelcast instance
+  */
+ private IAtomicLong getAtomicLong(final IKVTableID tableId, final byte[] key) {
+     // TODO we probably want to implement some sort of caching
+     final String counterName = getCounterName(tableId, key);
+     return hazelcastInstance.getAtomicLong(counterName);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Warning: the counter is a separate object from the {@code key} entry of
+  * the IKVTable identified by {@code tableId}; the table API cannot be used
+  * to read or write counters.
+  * <p>
+  * Creation is implemented as a compare-and-set from {@code 0L} to
+  * {@code initialValue}. Consequently a counter whose current value is
+  * {@code 0L} is treated the same as a counter that does not exist yet.
+  *
+  * @param tableId Only getTableName() will be used.
+  * @param key tableId + key will be used as Counter name
+  * @param initialValue value the counter is set to when creation succeeds
+  * @throws ObjectExistsException if the counter's current value is not
+  *         {@code 0L}
+  */
+ @Override
+ public void createCounter(final IKVTableID tableId,
+         final byte[] key, final long initialValue)
+         throws ObjectExistsException {
+     final IAtomicLong atomicLong = getAtomicLong(tableId, key);
+     // Relies on the assumption that a fresh AtomicLong starts out at 0L.
+     if (atomicLong.compareAndSet(0L, initialValue)) {
+         return;
+     }
+     throw new ObjectExistsException("Atomic counter "
+             + getCounterName(tableId, key)
+             + " already exist with value:" + atomicLong.get());
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Unconditionally overwrites the atomic long named after
+  * {@code tableId} + {@code key} with {@code value}.
+  */
+ @Override
+ public void setCounter(final IKVTableID tableId,
+         final byte[] key, final long value) {
+     getAtomicLong(tableId, key).set(value);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Adds {@code incrementValue} to the atomic long named after
+  * {@code tableId} + {@code key}.
+  *
+  * @return the value reported by {@code addAndGet}, i.e. the counter value
+  *         after the addition
+  */
+ @Override
+ public long incrementCounter(final IKVTableID tableId,
+         final byte[] key, final long incrementValue) {
+     final IAtomicLong atomicLong = getAtomicLong(tableId, key);
+     final long updated = atomicLong.addAndGet(incrementValue);
+     return updated;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Calls {@code destroy()} on the atomic long named after
+  * {@code tableId} + {@code key}.
+  */
+ @Override
+ public void destroyCounter(final IKVTableID tableId, final byte[] key) {
+     getAtomicLong(tableId, key).destroy();
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Returns the current value of the atomic long named after
+  * {@code tableId} + {@code key}.
+  * <p>
+  * NOTE(review): this method body contains no {@code throw} statement for
+  * {@link ObjectDoesntExistException}; a counter that was never created is
+  * presumably reported with the atomic long's initial value -- confirm
+  * against the Hazelcast IAtomicLong contract.
+  */
+ @Override
+ public long getCounter(final IKVTableID tableId, final byte[] key)
+         throws ObjectDoesntExistException {
+     final IAtomicLong atomicLong = getAtomicLong(tableId, key);
+     final long current = atomicLong.get();
+     return current;
+ }
}