DistributedAtomicLong
- Add DistributedAtomicLong, which is required for the SharedLog sequencer.
Change-Id: I8b85970e67fcf18ea6938312c62eb84999c45105
diff --git a/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZDistributedAtomicLong.java b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZDistributedAtomicLong.java
new file mode 100644
index 0000000..5df8fc3
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/hazelcast/HZDistributedAtomicLong.java
@@ -0,0 +1,54 @@
+package net.onrc.onos.core.datastore.hazelcast;
+
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IAtomicLong;
+
+import net.onrc.onos.core.util.distributed.DistributedAtomicLong;
+
+/**
+ * {@link DistributedAtomicLong} backed by a Hazelcast {@link IAtomicLong}.
+ */
+public class HZDistributedAtomicLong implements DistributedAtomicLong {
+    private final IAtomicLong delegate;
+
+    // TODO remove dependency on HZClient if possible
+    /**
+     * Creates or gets the named counter using the Hazelcast instance of the client.
+     *
+     * @param client client whose Hazelcast instance is used
+     * @param name the name of the DistributedAtomicLong instance
+     */
+    public HZDistributedAtomicLong(final HZClient client, final String name) {
+        this(client.getHZInstance(), name);
+    }
+
+    /**
+     * Creates or gets the named counter on the given Hazelcast instance.
+     *
+     * @param instance HazelcastInstance to use
+     * @param name the name of the DistributedAtomicLong instance
+     */
+    public HZDistributedAtomicLong(final HazelcastInstance instance, final String name) {
+        this.delegate = instance.getAtomicLong(name);
+    }
+
+    @Override
+    public long get() {
+        return this.delegate.get();
+    }
+
+    @Override
+    public void set(long newValue) {
+        this.delegate.set(newValue);
+    }
+
+    @Override
+    public long addAndGet(long delta) {
+        return this.delegate.addAndGet(delta);
+    }
+
+    @Override
+    public long incrementAndGet() {
+        return this.delegate.incrementAndGet();
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCDistributedAtomicLong.java b/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCDistributedAtomicLong.java
new file mode 100644
index 0000000..dc75940
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/datastore/ramcloud/RCDistributedAtomicLong.java
@@ -0,0 +1,76 @@
+package net.onrc.onos.core.datastore.ramcloud;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.onrc.onos.core.datastore.IKVTable;
+import net.onrc.onos.core.datastore.IKVTableID;
+import net.onrc.onos.core.datastore.ObjectDoesntExistException;
+import net.onrc.onos.core.datastore.ObjectExistsException;
+import net.onrc.onos.core.datastore.IKVTable.IKVEntry;
+import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
+import net.onrc.onos.core.util.distributed.DistributedAtomicLong;
+
+/**
+ * RAMCloud implementation of DistributedAtomicLong.
+ */
+public class RCDistributedAtomicLong implements DistributedAtomicLong {
+    private static final Logger log = LoggerFactory.getLogger(RCDistributedAtomicLong.class);
+
+    // prepended to the counter name to form the name of its table
+    private static final String TABLE_NAME_PREFIX = "DAL:";
+    // key of the table entry holding the counter value
+    private static final byte[] COUNTER_KEY = {0};
+    // value (0L) written when the counter entry is first created
+    private static final byte[] INITIAL_VALUE = ByteArrayUtil.toLEBytes(0L);
+
+    private final RCClient client;
+    private final String name;
+    private final IKVTableID tableID;
+
+    /**
+     * Creates or gets the DistributedAtomicLong instance.
+     *
+     * @param client client to use.
+     * @param name name of the DistributedAtomicLong instance.
+     */
+    public RCDistributedAtomicLong(final RCClient client, final String name) {
+        this.client = client;
+        this.name = name;
+
+        final IKVTable counterTable = client.getTable(TABLE_NAME_PREFIX + name);
+        this.tableID = counterTable.getTableId();
+        try {
+            // create the entry with the initial value unless it is already there
+            counterTable.create(COUNTER_KEY, INITIAL_VALUE);
+        } catch (ObjectExistsException e) {
+            log.trace("RCDistributedAtomicLong {} already exists", name);
+        }
+    }
+
+    @Override
+    public long get() {
+        try {
+            final IKVEntry stored = client.read(tableID, COUNTER_KEY);
+            return ByteArrayUtil.fromLEBytes(stored.getValue());
+        } catch (ObjectDoesntExistException e) {
+            log.error("RCDistributedAtomicLong {} does not exist", name);
+            throw new IllegalStateException(name + " does not exist", e);
+        }
+    }
+
+    @Override
+    public long incrementAndGet() {
+        return addAndGet(1L);
+    }
+
+    @Override
+    public long addAndGet(long delta) {
+        return client.incrementCounter(tableID, COUNTER_KEY, delta);
+    }
+
+    @Override
+    public void set(long newValue) {
+        client.setCounter(tableID, COUNTER_KEY, newValue);
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/util/distributed/DistributedAtomicLong.java b/src/main/java/net/onrc/onos/core/util/distributed/DistributedAtomicLong.java
new file mode 100644
index 0000000..c7e1aea
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/util/distributed/DistributedAtomicLong.java
@@ -0,0 +1,39 @@
+package net.onrc.onos.core.util.distributed;
+
+// TODO Should it extend Number?
+// TODO Only minimum set required for sequencer is defined now. Add CAS, etc.
+/**
+ * Distributed version of AtomicLong.
+ * Only the minimum set of operations required for the sequencer is declared.
+ */
+public interface DistributedAtomicLong {
+
+    /**
+     * Gets the current value.
+     *
+     * @return current value
+     */
+    long get();
+
+    /**
+     * Atomically adds the given value to the current value.
+     *
+     * @param delta value to add
+     * @return updated value
+     */
+    long addAndGet(long delta);
+
+    /**
+     * Sets to the given value.
+     *
+     * @param newValue value to set
+     */
+    void set(long newValue);
+
+    /**
+     * Atomically adds one to the current value.
+     *
+     * @return updated value
+     */
+    long incrementAndGet();
+}
diff --git a/src/test/java/net/onrc/onos/core/datastore/hazelcast/HZDistributedAtomicLongTest.java b/src/test/java/net/onrc/onos/core/datastore/hazelcast/HZDistributedAtomicLongTest.java
new file mode 100644
index 0000000..fbfc058
--- /dev/null
+++ b/src/test/java/net/onrc/onos/core/datastore/hazelcast/HZDistributedAtomicLongTest.java
@@ -0,0 +1,181 @@
+package net.onrc.onos.core.datastore.hazelcast;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import net.onrc.onos.core.datastore.ObjectDoesntExistException;
+import net.onrc.onos.core.util.distributed.DistributedAtomicLong;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test cases for HZDistributedAtomicLong.
+ */
+public class HZDistributedAtomicLongTest {
+    static {
+        // configuration to quickly fall back to instance mode for faster test run
+        System.setProperty("net.onrc.onos.core.datastore.hazelcast.client.attemptLimit", "0");
+    }
+
+    static final String TEST_COUNTER_NAME = "Counter" + UUID.randomUUID();
+
+    DistributedAtomicLong counter;
+
+    @Before
+    public void setUp() throws Exception {
+        counter = new HZDistributedAtomicLong(
+                HZClient.getClient(), TEST_COUNTER_NAME);
+        counter.set(0L);
+    }
+
+    @Test
+    public void testGet() {
+        assertEquals(0L, counter.get());
+    }
+
+    @Test
+    public void testAddAndGet() {
+        assertEquals(0L, counter.get());
+        assertEquals(1L, counter.addAndGet(1L));
+        assertEquals(3L, counter.addAndGet(2L));
+        assertEquals(7L, counter.addAndGet(4L));
+    }
+
+    @Test
+    public void testSet() {
+        counter.set(42L);
+        assertEquals(42L, counter.get());
+    }
+
+    @Test
+    public void testIncrementAndGet() {
+        assertEquals(1L, counter.incrementAndGet());
+        assertEquals(2L, counter.incrementAndGet());
+        assertEquals(3L, counter.incrementAndGet());
+    }
+
+    /**
+     * Callable task incrementing the shared counter and recording each result.
+     */
+    private static final class AdderTask implements Callable<Long> {
+        // using Map as Set
+        private final ConcurrentMap<Long, Long> uniquenessTestSet;
+        // Queue given here should be Thread-safe
+        private final Queue<Long> incrementTimes;
+        private final DistributedAtomicLong counter;
+        private final int numIncrements;
+
+        /**
+         * Constructor.
+         *
+         * @param numIncrements number of increments to execute
+         * @param uniquenessTestSet ThreadSafe Map to store increment result
+         * @param incrementTimes ThreadSafe Queue to store time it
+         *                       took on each increment
+         */
+        public AdderTask(int numIncrements,
+                ConcurrentMap<Long, Long> uniquenessTestSet,
+                Queue<Long> incrementTimes) {
+            this.uniquenessTestSet = uniquenessTestSet;
+            this.incrementTimes = incrementTimes;
+            this.counter = new HZDistributedAtomicLong(
+                    HZClient.getClient(), TEST_COUNTER_NAME);
+            this.numIncrements = numIncrements;
+        }
+
+        @Override
+        public Long call() throws ObjectDoesntExistException {
+            for (int i = 0; i < numIncrements; ++i) {
+                final long start = System.nanoTime();
+                final long incremented = counter.addAndGet(1L);
+                incrementTimes.add(System.nanoTime() - start);
+                final Long expectNull = uniquenessTestSet.putIfAbsent(
+                        incremented, incremented);
+                assertNull(expectNull);
+            }
+            return null;
+        }
+    }
+
+    /** Number of worker threads; can be overridden through system properties. */
+    private static final int NUM_THREADS = Integer.parseInt(
+            System.getProperty(
+                    "HZDistributedAtomicLongTest.NUM_THREADS",
+                    System.getProperty("NUM_THREADS",
+                            "10")));
+
+    /** Number of increments per thread; can be overridden through system properties. */
+    private static final int NUM_INCREMENTS = Integer.parseInt(
+            System.getProperty(
+                    "HZDistributedAtomicLongTest.NUM_INCREMENTS",
+                    System.getProperty("NUM_INCREMENTS",
+                            "100")));
+
+    /**
+     * Increment using multiple threads to test addition is atomic.
+     */
+    @Test
+    public void testConcurrentAddAndGet() throws InterruptedException, ExecutionException {
+        // using Map as Set
+        ConcurrentMap<Long, Long> uniquenessTestSet = new ConcurrentSkipListMap<>();
+        Queue<Long> incrementTimes = new ConcurrentLinkedQueue<>();
+
+        // Prepare NUM_THREADS tasks incrementing NUM_INCREMENTS times each
+        List<Callable<Long>> tasks = new ArrayList<>(NUM_THREADS);
+        for (int i = 0; i < NUM_THREADS; ++i) {
+            tasks.add(new AdderTask(NUM_INCREMENTS,
+                    uniquenessTestSet,
+                    incrementTimes));
+        }
+
+        ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
+        try {
+            // invokeAll waits for all tasks to end; get() surfaces a task failure
+            for (Future<Long> future : executor.invokeAll(tasks)) {
+                future.get();
+            }
+        } finally {
+            // always release the pool threads, even when a task failed
+            executor.shutdown();
+        }
+
+        assertEquals(NUM_THREADS * NUM_INCREMENTS, uniquenessTestSet.size());
+
+        // keys iterate in ascending order, so they must be 1, 2, 3, ... without gaps
+        long prevValue = 0;
+        for (Long value : uniquenessTestSet.keySet()) {
+            assertEquals((prevValue + 1), value.longValue());
+            prevValue = value;
+        }
+
+        long max = 0L;
+        long min = Long.MAX_VALUE;
+        long sum = 0L;
+        for (Long time : incrementTimes) {
+            sum += time;
+            max = Math.max(max, time);
+            min = Math.min(min, time);
+        }
+        System.err.printf("incrementCounter: th, incs, tot_incs,"
+                + " avg(ns), min(ns), max(ns), T-put(1s/avg)\n");
+        System.err.printf("incrementCounter: %d, %d, %d,"
+                + " %f, %d, %d, %f\n",
+                NUM_THREADS, NUM_INCREMENTS, incrementTimes.size(),
+                sum / (double) incrementTimes.size(), min, max,
+                Math.pow(10, 9) * incrementTimes.size() / sum);
+    }
+
+}