DistributedAtomicLong
- Add DistributedAtomicLong, which is required for the SharedLog sequencer.
Change-Id: I8b85970e67fcf18ea6938312c62eb84999c45105
diff --git a/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZDistributedAtomicLong.java b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZDistributedAtomicLong.java
new file mode 100644
index 0000000..5df8fc3
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZDistributedAtomicLong.java
@@ -0,0 +1,54 @@
+package net.onrc.onos.core.datastore.hazelcast;
+
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IAtomicLong;
+
+import net.onrc.onos.core.util.distributed.DistributedAtomicLong;
+
+/**
+ * {@link DistributedAtomicLong} that delegates every call to a Hazelcast {@link IAtomicLong}.
+ */
+public class HZDistributedAtomicLong implements DistributedAtomicLong {
+    private final IAtomicLong counter;
+
+    /**
+     * Creates or gets the instance by asking the HazelcastInstance for the named atomic long.
+     *
+     * @param instance HazelcastInstance to obtain the atomic long from
+     * @param name the name of the DistributedAtomicLong instance
+     */
+    public HZDistributedAtomicLong(final HazelcastInstance instance, final String name) {
+        this.counter = instance.getAtomicLong(name);
+    }
+
+    // TODO remove dependency HZClient if possible
+    /**
+     * Creates or gets the instance using the HazelcastInstance held by the client.
+     *
+     * @param client client whose HazelcastInstance is used
+     * @param name the name of the DistributedAtomicLong instance
+     */
+    public HZDistributedAtomicLong(final HZClient client, final String name) {
+        this(client.getHZInstance(), name);
+    }
+
+    @Override
+    public long get() {
+        return counter.get();
+    }
+
+    @Override
+    public void set(final long newValue) {
+        counter.set(newValue);
+    }
+
+    @Override
+    public long addAndGet(final long delta) {
+        return counter.addAndGet(delta);
+    }
+
+    @Override
+    public long incrementAndGet() {
+        return counter.incrementAndGet();
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCDistributedAtomicLong.java b/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCDistributedAtomicLong.java
new file mode 100644
index 0000000..dc75940
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCDistributedAtomicLong.java
@@ -0,0 +1,76 @@
+package net.onrc.onos.core.datastore.ramcloud;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.onrc.onos.core.datastore.IKVTable;
+import net.onrc.onos.core.datastore.IKVTableID;
+import net.onrc.onos.core.datastore.ObjectDoesntExistException;
+import net.onrc.onos.core.datastore.ObjectExistsException;
+import net.onrc.onos.core.datastore.IKVTable.IKVEntry;
+import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
+import net.onrc.onos.core.util.distributed.DistributedAtomicLong;
+
+/**
+ * RAMCloud implementation of DistributedAtomicLong.
+ * <p>
+ * The value is stored under one fixed key in a table named after the instance.
+ */
+public class RCDistributedAtomicLong implements DistributedAtomicLong {
+    // Prefix prepended to the instance name to form the backing table name.
+    private static final String PREFIX = "DAL:";
+    // Key of the table entry that holds the value.
+    private static final byte[] KEY = {0};
+    // Encoded form of 0L, written as the value when the entry is created.
+    private static final byte[] ZERO = ByteArrayUtil.toLEBytes(0L);
+
+    private static final Logger log = LoggerFactory.getLogger(RCDistributedAtomicLong.class);
+
+    private final RCClient client;
+    private final String name;
+    private final IKVTableID tableID;
+
+    /**
+     * Creates or gets the DistributedAtomicLong instance.
+     *
+     * @param client client to use.
+     * @param name name of the DistributedAtomicLong instance.
+     */
+    public RCDistributedAtomicLong(final RCClient client, final String name) {
+        this.name = name;
+        this.client = client;
+        final IKVTable backing = client.getTable(PREFIX + name);
+        this.tableID = backing.getTableId();
+        try {
+            backing.create(KEY, ZERO);
+        } catch (ObjectExistsException ex) {
+            log.trace("RCDistributedAtomicLong {} already exists", name);
+        }
+    }
+
+    @Override
+    public long get() {
+        try {
+            final IKVEntry stored = client.read(tableID, KEY);
+            return ByteArrayUtil.fromLEBytes(stored.getValue());
+        } catch (ObjectDoesntExistException ex) {
+            log.error("RCDistributedAtomicLong {} does not exist", name);
+            throw new IllegalStateException(name + " does not exist", ex);
+        }
+    }
+
+    @Override
+    public void set(final long newValue) {
+        client.setCounter(tableID, KEY, newValue);
+    }
+
+    @Override
+    public long addAndGet(final long delta) {
+        return client.incrementCounter(tableID, KEY, delta);
+    }
+
+    @Override
+    public long incrementAndGet() {
+        return addAndGet(1L);
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/DistributedAtomicLong.java b/src/main/java/net/onrc/onos/core/util/distributed/DistributedAtomicLong.java
new file mode 100644
index 0000000..c7e1aea
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/DistributedAtomicLong.java
@@ -0,0 +1,39 @@
+package net.onrc.onos.core.util.distributed;
+
+// TODO Should it extend Number?
+// TODO Only the minimum set of operations required by the sequencer is defined for now. Add CAS, etc.
+/**
+ * Distributed version of {@link java.util.concurrent.atomic.AtomicLong},
+ * exposing only the subset of its operations declared below.
+ */
+public interface DistributedAtomicLong {
+
+    /**
+     * Gets the current value.
+     *
+     * @return current value
+     */
+    long get();
+
+    /**
+     * Atomically adds the given value to the current value.
+     *
+     * @param delta value to add
+     * @return updated value
+     */
+    long addAndGet(long delta);
+
+    /**
+     * Sets to the given value.
+     *
+     * @param newValue value to set
+     */
+    void set(long newValue);
+
+    /**
+     * Atomically adds one to the current value.
+     *
+     * @return updated value
+     */
+    long incrementAndGet();
+}