Expose atomic counter increment API to Java
This is part of ONOS-1080, ONOS-1081
- implement JNI code
- implement RCClient
- implement HZClient
- implement simple test case
Change-Id: I499ba2a0b648302c4fec8c653631ab28bf52039c
diff --git a/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCClient.java b/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCClient.java
index 3644cfa..9a9141e 100644
--- a/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCClient.java
+++ b/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCClient.java
@@ -1,6 +1,8 @@
package net.onrc.onos.core.datastore.ramcloud;
import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -569,4 +571,85 @@
public long getVersionNonexistant() {
return VERSION_NONEXISTENT;
}
+
+ /**
+  * Creates a counter entry initialized to {@code initialValue}.
+  * <p>
+  * The value is stored as an 8-byte little-endian long, which is the same
+  * layout {@code getCounter} decodes.
+  * NOTE(review): presumably this is also the layout the native increment
+  * operates on; confirm against the JNI code.
+  *
+  * @param tableId table to create the counter in
+  * @param key key of the counter
+  * @param initialValue value the counter starts from
+  * @throws ObjectExistsException propagated from {@code create};
+  *         presumably raised when an entry already exists under {@code key}
+  */
+ @Override
+ public void createCounter(final IKVTableID tableId, final byte[] key,
+         final long initialValue)
+         throws ObjectExistsException {
+
+     // array() returns the whole 8-byte backing array regardless of the
+     // buffer position, so no flip() is required before handing it over.
+     final byte[] encoded = ByteBuffer.allocate(8)
+             .order(ByteOrder.LITTLE_ENDIAN)
+             .putLong(initialValue)
+             .array();
+
+     final long createdVersion = create(tableId, key, encoded);
+     if (log.isTraceEnabled()) {
+         // Guarded because the hex rendering of the key is computed eagerly.
+         log.trace("Created counter {}-{}={}@{}",
+                 tableId, ByteArrayUtil.toHexStringBuffer(key, ":"),
+                 initialValue, createdVersion);
+     }
+ }
+
+ /**
+  * Stores {@code value} as the counter value by calling {@code forceCreate}.
+  * <p>
+  * The value is written as an 8-byte little-endian long.
+  * NOTE(review): {@code forceCreate} presumably overwrites any existing
+  * entry unconditionally; confirm against its definition.
+  *
+  * @param tableId table holding the counter
+  * @param key key of the counter
+  * @param value value to store
+  */
+ @Override
+ public void setCounter(final IKVTableID tableId, final byte[] key,
+         final long value) {
+
+     final ByteBuffer buffer = ByteBuffer.allocate(8);
+     buffer.order(ByteOrder.LITTLE_ENDIAN);
+     buffer.putLong(value);
+
+     // The backing array always spans all 8 bytes, whatever the position is.
+     final long writtenVersion = forceCreate(tableId, key, buffer.array());
+     if (!log.isTraceEnabled()) {
+         return;
+     }
+     log.trace("set counter {}-{}={}@{}",
+             tableId, ByteArrayUtil.toHexStringBuffer(key, ":"),
+             value, writtenVersion);
+ }
+
+ /**
+  * Adds {@code incrementValue} to a counter through
+  * {@code JRamCloud#increment}, creating the counter first if it is absent.
+  * <p>
+  * When the first increment reports that the object does not exist, the
+  * counter is created with value 0 and the increment is attempted once more.
+  * Losing the creation race to another writer is tolerated: the retry then
+  * increments the counter that writer created.
+  *
+  * @param tableId table holding the counter; must be an {@code RCTableID}
+  * @param key key of the counter
+  * @param incrementValue amount to add
+  * @return the value returned by {@code JRamCloud#increment}
+  *         (presumably the counter value after the increment; confirm)
+  * @throws ClassCastException if {@code tableId} is not an {@code RCTableID}
+  * @throws IllegalStateException if the counter is still missing on the
+  *         retry; the underlying RAMCloud exception is attached as the cause
+  */
+ @Override
+ public long incrementCounter(final IKVTableID tableId, final byte[] key,
+         final long incrementValue) {
+
+     RCTableID rcTableId = (RCTableID) tableId;
+     JRamCloud rcClient = RCClient.getJRamCloudClient();
+
+     try {
+         return rcClient.increment(rcTableId.getTableID(), key, incrementValue);
+     } catch (JRamCloud.ObjectDoesntExistException e) {
+         log.warn("Counter {}-{} was not present",
+                 tableId,
+                 ByteArrayUtil.toHexStringBuffer(key, ":"));
+         try {
+             // creating counter initialized to 0
+             createCounter(rcTableId, key, 0L);
+         } catch (ObjectExistsException e1) {
+             // someone concurrently created it
+             log.debug("Counter {}-{} seemed to be concurrently created.",
+                     tableId,
+                     ByteArrayUtil.toHexStringBuffer(key, ":"));
+         }
+         try {
+             return rcClient.increment(rcTableId.getTableID(), key, incrementValue);
+         } catch (JRamCloud.ObjectDoesntExistException e1) {
+             // NOTE(review): a concurrent destroyCounter between the create
+             // and this retry looks like one way to end up here.
+             // Keep the original failure attached so it is not lost.
+             log.error("Should never happen", e1);
+             throw new IllegalStateException("Created counter disappeared.", e1);
+         }
+     }
+ }
+
+ /**
+  * Removes the entry stored under {@code key} by calling
+  * {@code JRamCloud#remove}; whatever that call returns is discarded.
+  *
+  * @param tableId table holding the counter; must be an {@code RCTableID}
+  * @param key key of the counter to remove
+  * @throws ClassCastException if {@code tableId} is not an {@code RCTableID}
+  */
+ @Override
+ public void destroyCounter(final IKVTableID tableId, final byte[] key) {
+
+     // Cast before looking up the client so a wrong table type fails first.
+     final RCTableID ramcloudTable = (RCTableID) tableId;
+     final JRamCloud client = RCClient.getJRamCloudClient();
+
+     client.remove(ramcloudTable.getTableID(), key);
+ }
+
+ /**
+  * Reads the current value of a counter.
+  * <p>
+  * The entry is fetched with {@code read} and its value is decoded as a
+  * little-endian long starting at the first byte.
+  *
+  * @param tableId table holding the counter
+  * @param key key of the counter
+  * @return the decoded counter value
+  * @throws ObjectDoesntExistException propagated from {@code read};
+  *         presumably raised when no entry exists under {@code key}
+  */
+ @Override
+ public long getCounter(IKVTableID tableId, byte[] key)
+         throws ObjectDoesntExistException {
+
+     final byte[] stored = read(tableId, key).getValue();
+     return ByteBuffer.wrap(stored)
+             .order(ByteOrder.LITTLE_ENDIAN)
+             .getLong();
+ }
}