DistributedAtomicLong

- Add DistributedAtomicLong, required for the SharedLog sequencer.

Change-Id: I8b85970e67fcf18ea6938312c62eb84999c45105
diff --git a/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZDistributedAtomicLong.java b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZDistributedAtomicLong.java
new file mode 100644
index 0000000..5df8fc3
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZDistributedAtomicLong.java
@@ -0,0 +1,54 @@
+package net.onrc.onos.core.datastore.hazelcast;
+
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IAtomicLong;
+
+import net.onrc.onos.core.util.distributed.DistributedAtomicLong;
+
+/**
+ * Hazelcast-backed implementation of DistributedAtomicLong.
+ */
+public class HZDistributedAtomicLong implements DistributedAtomicLong {
+    private final IAtomicLong counter;
+
+    // TODO remove dependency HZClient if possible
+    /**
+     * Creates or gets the named counter via the client's Hazelcast instance.
+     *
+     * @param client client whose HazelcastInstance is used
+     * @param name the name of the DistributedAtomicLong instance
+     */
+    public HZDistributedAtomicLong(final HZClient client, final String name) {
+        this(client.getHZInstance(), name);
+    }
+
+    /**
+     * Creates or gets the named counter on the given Hazelcast instance.
+     *
+     * @param instance HazelcastInstance to use
+     * @param name the name of the DistributedAtomicLong instance
+     */
+    public HZDistributedAtomicLong(final HazelcastInstance instance, final String name) {
+        this.counter = instance.getAtomicLong(name);
+    }
+
+    @Override
+    public long get() {
+        return this.counter.get();
+    }
+
+    @Override
+    public void set(long newValue) {
+        this.counter.set(newValue);
+    }
+
+    @Override
+    public long addAndGet(long delta) {
+        return this.counter.addAndGet(delta);
+    }
+
+    @Override
+    public long incrementAndGet() {
+        return this.counter.incrementAndGet();
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCDistributedAtomicLong.java b/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCDistributedAtomicLong.java
new file mode 100644
index 0000000..dc75940
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCDistributedAtomicLong.java
@@ -0,0 +1,76 @@
+package net.onrc.onos.core.datastore.ramcloud;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.onrc.onos.core.datastore.IKVTable;
+import net.onrc.onos.core.datastore.IKVTableID;
+import net.onrc.onos.core.datastore.ObjectDoesntExistException;
+import net.onrc.onos.core.datastore.ObjectExistsException;
+import net.onrc.onos.core.datastore.IKVTable.IKVEntry;
+import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
+import net.onrc.onos.core.util.distributed.DistributedAtomicLong;
+
+/**
+ * RAMCloud implementation of DistributedAtomicLong.
+ */
+public class RCDistributedAtomicLong implements DistributedAtomicLong {
+    private static final String TABLE_PREFIX = "DAL:";
+    private static final byte[] COUNTER_KEY = {0};
+    private static final byte[] INITIAL_VALUE = ByteArrayUtil.toLEBytes(0L);
+
+    private static final Logger log = LoggerFactory.getLogger(RCDistributedAtomicLong.class);
+
+    // The counter is stored under COUNTER_KEY in the table named TABLE_PREFIX + name.
+    private final RCClient client;
+    private final String name;
+    private final IKVTableID tableID;
+
+
+    /**
+     * Creates or gets the DistributedAtomicLong instance.
+     *
+     * @param client client used to access the counter
+     * @param name name of the DistributedAtomicLong instance
+     */
+    public RCDistributedAtomicLong(final RCClient client, final String name) {
+        final IKVTable counterTable = client.getTable(TABLE_PREFIX + name);
+        this.client = client;
+        this.name = name;
+        this.tableID = counterTable.getTableId();
+
+        // Seed the counter with zero; nothing to do if the entry already exists.
+        try {
+            counterTable.create(COUNTER_KEY, INITIAL_VALUE);
+        } catch (ObjectExistsException alreadyCreated) {
+            log.trace("RCDistributedAtomicLong {} already exists", name);
+        }
+    }
+
+    @Override
+    public long get() {
+        try {
+            final IKVEntry stored = this.client.read(this.tableID, COUNTER_KEY);
+            return ByteArrayUtil.fromLEBytes(stored.getValue());
+        } catch (ObjectDoesntExistException missing) {
+            log.error("RCDistributedAtomicLong {} does not exist", this.name);
+            throw new IllegalStateException(this.name + " does not exist", missing);
+        }
+    }
+
+    @Override
+    public void set(long newValue) {
+        this.client.setCounter(this.tableID, COUNTER_KEY, newValue);
+    }
+
+    @Override
+    public long addAndGet(long delta) {
+        return this.client.incrementCounter(this.tableID, COUNTER_KEY, delta);
+    }
+
+    @Override
+    public long incrementAndGet() {
+        return this.addAndGet(1L);
+    }
+
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/DistributedAtomicLong.java b/src/main/java/net/onrc/onos/core/util/distributed/DistributedAtomicLong.java
new file mode 100644
index 0000000..c7e1aea
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/DistributedAtomicLong.java
@@ -0,0 +1,39 @@
+package net.onrc.onos.core.util.distributed;
+
+// TODO Should it extend Number?
+// TODO Only minimum set required for sequencer is defined now. Add CAS, etc.
+/**
+ * Distributed version of AtomicLong.
+ */
+public interface DistributedAtomicLong {
+
+    /**
+     * Gets the current value.
+     *
+     * @return current value
+     */
+    long get();
+
+    /**
+     * Atomically adds the given value to the current value.
+     *
+     * @param delta value to add
+     * @return updated value
+     */
+    long addAndGet(long delta);
+
+
+    /**
+     * Sets the current value to the given value.
+     *
+     * @param newValue value to set
+     */
+    void set(long newValue);
+
+    /**
+     * Atomically adds one to the current value.
+     *
+     * @return updated value
+     */
+    long incrementAndGet();
+}