[ONOS-6497] Add globally unique numeric ID generator primitive

Change-Id: Ic2d9214cfa885344694b8ba4250054dd6a33144e
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicIdGeneratorBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicIdGeneratorBuilder.java
new file mode 100644
index 0000000..ac294f4
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicIdGeneratorBuilder.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import org.onosproject.store.primitives.DistributedPrimitiveCreator;
+import org.onosproject.store.service.AsyncAtomicIdGenerator;
+import org.onosproject.store.service.AtomicIdGeneratorBuilder;
+
+/**
+ * {@link AtomicIdGeneratorBuilder} that obtains its generator from a
+ * {@link DistributedPrimitiveCreator}.
+ */
+public class DefaultAtomicIdGeneratorBuilder extends AtomicIdGeneratorBuilder {
+
+    private final DistributedPrimitiveCreator creator;
+
+    /**
+     * Creates a builder backed by the given primitive creator.
+     *
+     * @param primitiveCreator creator that {@link #build()} asks for the ID generator
+     */
+    public DefaultAtomicIdGeneratorBuilder(DistributedPrimitiveCreator primitiveCreator) {
+        creator = primitiveCreator;
+    }
+
+    @Override
+    public AsyncAtomicIdGenerator build() {
+        // Pass this builder's name() and executorSupplier() values on to the creator.
+        final String primitiveName = name();
+        return creator.newAsyncIdGenerator(primitiveName, executorSupplier());
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicIdGenerator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicIdGenerator.java
new file mode 100644
index 0000000..baf2b8a
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicIdGenerator.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import org.onosproject.store.service.AsyncAtomicIdGenerator;
+
+/**
+ * {@link AsyncAtomicIdGenerator} decorator that forwards to another generator and
+ * passes each resulting future through {@code asyncFuture}, so that asynchronous
+ * callbacks are executed on a user provided {@link Executor}.
+ */
+public class ExecutingAsyncAtomicIdGenerator extends ExecutingDistributedPrimitive implements AsyncAtomicIdGenerator {
+    private final AsyncAtomicIdGenerator delegate;
+
+    /**
+     * Wraps the given generator.
+     *
+     * @param delegateIdGenerator generator that actually produces the IDs
+     * @param orderedExecutor     executor passed to the superclass as its ordered executor
+     * @param threadPoolExecutor  executor passed to the superclass as its thread pool executor
+     */
+    public ExecutingAsyncAtomicIdGenerator(
+            AsyncAtomicIdGenerator delegateIdGenerator, Executor orderedExecutor, Executor threadPoolExecutor) {
+        super(delegateIdGenerator, orderedExecutor, threadPoolExecutor);
+        delegate = delegateIdGenerator;
+    }
+
+    @Override
+    public CompletableFuture<Long> nextId() {
+        CompletableFuture<Long> pendingId = delegate.nextId();
+        return asyncFuture(pendingId);
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
index 0358a7c..5564844 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
@@ -25,6 +25,7 @@
 import org.onosproject.store.primitives.DistributedPrimitiveCreator;
 import org.onosproject.store.service.AsyncAtomicCounter;
 import org.onosproject.store.service.AsyncAtomicCounterMap;
+import org.onosproject.store.service.AsyncAtomicIdGenerator;
 import org.onosproject.store.service.AsyncAtomicValue;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.AsyncConsistentMultimap;
@@ -103,6 +104,13 @@
     }
 
     @Override
+    public AsyncAtomicIdGenerator newAsyncIdGenerator(String name, Supplier<Executor> executorSupplier) {
+        // Look up the creator that getCreator() associates with this name, then let it build the generator.
+        DistributedPrimitiveCreator creator = getCreator(name);
+        return creator.newAsyncIdGenerator(name, executorSupplier);
+    }
+
+    @Override
     public <V> AsyncAtomicValue<V> newAsyncAtomicValue(
             String name, Serializer serializer, Supplier<Executor> executorSupplier) {
         return getCreator(name).newAsyncAtomicValue(name, serializer, executorSupplier);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index 1fc821d..8d346b6 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -47,6 +47,7 @@
 import org.onosproject.store.service.AsyncConsistentTreeMap;
 import org.onosproject.store.service.AtomicCounterBuilder;
 import org.onosproject.store.service.AtomicCounterMapBuilder;
+import org.onosproject.store.service.AtomicIdGeneratorBuilder;
 import org.onosproject.store.service.AtomicValueBuilder;
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.ConsistentMapBuilder;
@@ -166,6 +167,14 @@
     }
 
     @Override
+    public AtomicIdGeneratorBuilder atomicIdGeneratorBuilder() {
+        // Check STORAGE_WRITE before constructing anything (checkPermission presumably throws on denial).
+        checkPermission(STORAGE_WRITE);
+        AtomicIdGeneratorBuilder builder = new DefaultAtomicIdGeneratorBuilder(federatedPrimitiveCreator);
+        return builder;
+    }
+
+    @Override
     public <V> AtomicValueBuilder<V> atomicValueBuilder() {
         checkPermission(STORAGE_WRITE);
         Supplier<ConsistentMapBuilder<String, byte[]>> mapBuilderSupplier =
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index f06b052..d97868c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -46,11 +46,13 @@
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMap;
 import org.onosproject.store.primitives.resources.impl.AtomixCounter;
 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTree;
+import org.onosproject.store.primitives.resources.impl.AtomixIdGenerator;
 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
 import org.onosproject.store.primitives.resources.impl.AtomixWorkQueue;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncAtomicCounter;
 import org.onosproject.store.service.AsyncAtomicCounterMap;
+import org.onosproject.store.service.AsyncAtomicIdGenerator;
 import org.onosproject.store.service.AsyncAtomicValue;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.AsyncConsistentMultimap;
@@ -258,6 +260,15 @@
     }
 
     @Override
+    public AsyncAtomicIdGenerator newAsyncIdGenerator(String name, Supplier<Executor> executorSupplier) {
+        // join() blocks the calling thread until the client has returned the distributed long.
+        DistributedLong backingLong = client.getLong(name).join();
+        AsyncAtomicIdGenerator rawGenerator = new AtomixIdGenerator(name, backingLong);
+        Executor orderedExecutor = defaultExecutor(executorSupplier);
+        return new ExecutingAsyncAtomicIdGenerator(rawGenerator, orderedExecutor, sharedExecutor);
+    }
+
+    @Override
     public <V> AsyncAtomicValue<V> newAsyncAtomicValue(
             String name, Serializer serializer, Supplier<Executor> executorSupplier) {
        AsyncAtomicValue<V> asyncValue = new DefaultAsyncAtomicValue<>(name, serializer, onosAtomicValuesMap.get());
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixIdGenerator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixIdGenerator.java
new file mode 100644
index 0000000..971b60c
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixIdGenerator.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.atomix.variables.DistributedLong;
+import org.onosproject.store.service.AsyncAtomicIdGenerator;
+
+/**
+ * {@code AsyncAtomicIdGenerator} implementation backed by Atomix
+ * {@link DistributedLong}.
+ * <p>
+ * IDs are served from locally held batches. Each reservation advances the shared
+ * {@link DistributedLong} by the batch size; with {@code base} being the counter value
+ * before that addition, the IDs {@code base + 1} through {@code base + batchSize} are
+ * then handed out without another call to the {@link DistributedLong}.
+ * <p>
+ * {@link #nextId()} is synchronized, so calls on one instance are serialized.
+ */
+public class AtomixIdGenerator implements AsyncAtomicIdGenerator {
+
+    private static final long DEFAULT_BATCH_SIZE = 1000;
+    private final String name;
+    private final DistributedLong distLong;
+    private final long batchSize;
+    // Reservation backing the current batch; completes with that batch's base value.
+    private CompletableFuture<Long> reserveFuture;
+    // Number of IDs already handed out from the current batch.
+    private final AtomicLong delta = new AtomicLong();
+
+    /**
+     * Creates an ID generator that uses the default batch size.
+     *
+     * @param name     primitive name
+     * @param distLong distributed counter from which ID batches are reserved
+     */
+    public AtomixIdGenerator(String name, DistributedLong distLong) {
+        this(name, distLong, DEFAULT_BATCH_SIZE);
+    }
+
+    /**
+     * Creates an ID generator with an explicit batch size.
+     *
+     * @param name      primitive name
+     * @param distLong  distributed counter from which ID batches are reserved
+     * @param batchSize number of IDs claimed per reservation; must be positive
+     * @throws IllegalArgumentException if {@code batchSize} is not positive
+     */
+    AtomixIdGenerator(String name, DistributedLong distLong, long batchSize) {
+        // A zero or negative batch would not move the counter forward, so IDs that
+        // were already handed out would be produced again.
+        if (batchSize <= 0) {
+            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
+        }
+        this.name = name;
+        this.distLong = distLong;
+        this.batchSize = batchSize;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    /**
+     * Returns a future that is completed with the next ID.
+     * <p>
+     * A new batch is reserved on the first call, once the current batch is used up, and
+     * when the current reservation has completed exceptionally, so that a failed
+     * reservation is retried on the next call instead of failing the rest of the batch.
+     *
+     * @return future for the next ID; it fails if the reservation it depends on fails
+     */
+    @Override
+    public synchronized CompletableFuture<Long> nextId() {
+        boolean needsReservation = reserveFuture == null
+                || reserveFuture.isCompletedExceptionally()
+                || delta.get() >= batchSize;
+        if (needsReservation) {
+            delta.set(0);
+            reserve();
+        }
+        // Fix the offset now; the callback below may run after later calls changed delta.
+        long offset = delta.incrementAndGet();
+        return reserveFuture.thenApply(base -> base + offset);
+    }
+
+    /**
+     * Reserves the next batch and makes it the current reservation. Only called from
+     * {@link #nextId()}, which holds this object's monitor.
+     *
+     * @return future completed with the base value of the newly reserved batch
+     */
+    private CompletableFuture<Long> reserve() {
+        if (reserveFuture == null || reserveFuture.isDone()) {
+            reserveFuture = distLong.getAndAdd(batchSize);
+        } else {
+            // Queue behind the reservation that is still in flight. handle() absorbs a
+            // failure of that reservation so this attempt is still made.
+            reserveFuture = reserveFuture
+                    .handle((previousBase, error) -> previousBase)
+                    .thenCompose(ignored -> distLong.getAndAdd(batchSize));
+        }
+        return reserveFuture;
+    }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixIdGeneratorTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixIdGeneratorTest.java
new file mode 100644
index 0000000..1fbc464
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixIdGeneratorTest.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.util.concurrent.CompletableFuture;
+
+import io.atomix.resource.ResourceType;
+import io.atomix.variables.DistributedLong;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for {@code AtomixIdGenerator}.
+ */
+public class AtomixIdGeneratorTest extends AtomixTestBase {
+
+    @BeforeClass
+    public static void preTestSetup() throws Throwable {
+        createCopycatServers(3);
+    }
+
+    @AfterClass
+    public static void postTestCleanup() throws Exception {
+        clearTests();
+    }
+
+    @Override
+    protected ResourceType resourceType() {
+        return new ResourceType(DistributedLong.class);
+    }
+
+    /**
+     * Tests that a generator hands out consecutive IDs and that a second generator on the
+     * same counter continues after the first generator's batch.
+     */
+    @Test
+    public void testNextId() throws Throwable {
+        DistributedLong counterA = createAtomixClient().getLong("testNextId").join();
+        AtomixIdGenerator generatorA = new AtomixIdGenerator("testNextId", counterA);
+        DistributedLong counterB = createAtomixClient().getLong("testNextId").join();
+        AtomixIdGenerator generatorB = new AtomixIdGenerator("testNextId", counterB);
+
+        // Three requests are issued before any result is awaited.
+        CompletableFuture<Long> first = generatorA.nextId();
+        CompletableFuture<Long> second = generatorA.nextId();
+        CompletableFuture<Long> third = generatorA.nextId();
+        assertEquals(1L, first.join().longValue());
+        assertEquals(2L, second.join().longValue());
+        assertEquals(3L, third.join().longValue());
+
+        // Same generator again; the results are awaited in reverse request order.
+        CompletableFuture<Long> fourth = generatorA.nextId();
+        CompletableFuture<Long> fifth = generatorA.nextId();
+        CompletableFuture<Long> sixth = generatorA.nextId();
+        assertEquals(6L, sixth.join().longValue());
+        assertEquals(5L, fifth.join().longValue());
+        assertEquals(4L, fourth.join().longValue());
+
+        // The second generator starts after the default batch of 1000 held by the first.
+        CompletableFuture<Long> otherFirst = generatorB.nextId();
+        CompletableFuture<Long> otherSecond = generatorB.nextId();
+        CompletableFuture<Long> otherThird = generatorB.nextId();
+        assertEquals(1001L, otherFirst.join().longValue());
+        assertEquals(1002L, otherSecond.join().longValue());
+        assertEquals(1003L, otherThird.join().longValue());
+    }
+
+    /**
+     * Tests that two generators with a batch size of 2 interleave their batches when
+     * each of them runs past the end of its current batch.
+     */
+    @Test
+    public void testNextIdBatchRollover() throws Throwable {
+        DistributedLong counterA = createAtomixClient().getLong("testNextIdBatchRollover").join();
+        AtomixIdGenerator generatorA = new AtomixIdGenerator("testNextIdBatchRollover", counterA, 2);
+        DistributedLong counterB = createAtomixClient().getLong("testNextIdBatchRollover").join();
+        AtomixIdGenerator generatorB = new AtomixIdGenerator("testNextIdBatchRollover", counterB, 2);
+
+        // Generator A: 1 and 2 come from its first batch, 3 from a second one.
+        CompletableFuture<Long> a1 = generatorA.nextId();
+        CompletableFuture<Long> a2 = generatorA.nextId();
+        CompletableFuture<Long> a3 = generatorA.nextId();
+        assertEquals(1L, a1.join().longValue());
+        assertEquals(2L, a2.join().longValue());
+        assertEquals(3L, a3.join().longValue());
+
+        // Generator B: 5 and 6 come from its first batch, 7 from a second one.
+        CompletableFuture<Long> b1 = generatorB.nextId();
+        CompletableFuture<Long> b2 = generatorB.nextId();
+        CompletableFuture<Long> b3 = generatorB.nextId();
+        assertEquals(5L, b1.join().longValue());
+        assertEquals(6L, b2.join().longValue());
+        assertEquals(7L, b3.join().longValue());
+
+        // Generator A: 4 is left over from its second batch, 9 and 10 come from a third one.
+        CompletableFuture<Long> a4 = generatorA.nextId();
+        CompletableFuture<Long> a5 = generatorA.nextId();
+        CompletableFuture<Long> a6 = generatorA.nextId();
+        assertEquals(4L, a4.join().longValue());
+        assertEquals(9L, a5.join().longValue());
+        assertEquals(10L, a6.join().longValue());
+    }
+}