[ONOS-6497] Add globally unique numeric ID generator primitive
Change-Id: Ic2d9214cfa885344694b8ba4250054dd6a33144e
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicIdGeneratorBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicIdGeneratorBuilder.java
new file mode 100644
index 0000000..ac294f4
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicIdGeneratorBuilder.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import org.onosproject.store.primitives.DistributedPrimitiveCreator;
+import org.onosproject.store.service.AsyncAtomicIdGenerator;
+import org.onosproject.store.service.AtomicIdGeneratorBuilder;
+
+/**
+ * Default implementation of AtomicIdGeneratorBuilder.
+ */
+public class DefaultAtomicIdGeneratorBuilder extends AtomicIdGeneratorBuilder {
+
+ private final DistributedPrimitiveCreator primitiveCreator;
+
+ public DefaultAtomicIdGeneratorBuilder(DistributedPrimitiveCreator primitiveCreator) {
+ this.primitiveCreator = primitiveCreator;
+ }
+
+ @Override
+ public AsyncAtomicIdGenerator build() {
+ return primitiveCreator.newAsyncIdGenerator(name(), executorSupplier());
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicIdGenerator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicIdGenerator.java
new file mode 100644
index 0000000..baf2b8a
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicIdGenerator.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import org.onosproject.store.service.AsyncAtomicIdGenerator;
+
+/**
+ * {@link AsyncAtomicIdGenerator} that executes asynchronous callbacks on a user provided
+ * {@link Executor}.
+ */
+public class ExecutingAsyncAtomicIdGenerator extends ExecutingDistributedPrimitive implements AsyncAtomicIdGenerator {
+ private final AsyncAtomicIdGenerator delegateIdGenerator;
+
+ public ExecutingAsyncAtomicIdGenerator(
+ AsyncAtomicIdGenerator delegateIdGenerator, Executor orderedExecutor, Executor threadPoolExecutor) {
+ super(delegateIdGenerator, orderedExecutor, threadPoolExecutor);
+ this.delegateIdGenerator = delegateIdGenerator;
+ }
+
+ @Override
+ public CompletableFuture<Long> nextId() {
+ return asyncFuture(delegateIdGenerator.nextId());
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
index 0358a7c..5564844 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
@@ -25,6 +25,7 @@
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.service.AsyncAtomicCounter;
import org.onosproject.store.service.AsyncAtomicCounterMap;
+import org.onosproject.store.service.AsyncAtomicIdGenerator;
import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncConsistentMultimap;
@@ -103,6 +104,11 @@
}
@Override
+ public AsyncAtomicIdGenerator newAsyncIdGenerator(String name, Supplier<Executor> executorSupplier) {
+ return getCreator(name).newAsyncIdGenerator(name, executorSupplier);
+ }
+
+ @Override
public <V> AsyncAtomicValue<V> newAsyncAtomicValue(
String name, Serializer serializer, Supplier<Executor> executorSupplier) {
return getCreator(name).newAsyncAtomicValue(name, serializer, executorSupplier);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index 1fc821d..8d346b6 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -47,6 +47,7 @@
import org.onosproject.store.service.AsyncConsistentTreeMap;
import org.onosproject.store.service.AtomicCounterBuilder;
import org.onosproject.store.service.AtomicCounterMapBuilder;
+import org.onosproject.store.service.AtomicIdGeneratorBuilder;
import org.onosproject.store.service.AtomicValueBuilder;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapBuilder;
@@ -166,6 +167,12 @@
}
@Override
+ public AtomicIdGeneratorBuilder atomicIdGeneratorBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new DefaultAtomicIdGeneratorBuilder(federatedPrimitiveCreator);
+ }
+
+ @Override
public <V> AtomicValueBuilder<V> atomicValueBuilder() {
checkPermission(STORAGE_WRITE);
Supplier<ConsistentMapBuilder<String, byte[]>> mapBuilderSupplier =
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index f06b052..d97868c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -46,11 +46,13 @@
import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMap;
import org.onosproject.store.primitives.resources.impl.AtomixCounter;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTree;
+import org.onosproject.store.primitives.resources.impl.AtomixIdGenerator;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueue;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncAtomicCounter;
import org.onosproject.store.service.AsyncAtomicCounterMap;
+import org.onosproject.store.service.AsyncAtomicIdGenerator;
import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncConsistentMultimap;
@@ -258,6 +260,13 @@
}
@Override
+ public AsyncAtomicIdGenerator newAsyncIdGenerator(String name, Supplier<Executor> executorSupplier) {
+ DistributedLong distributedLong = client.getLong(name).join();
+ AsyncAtomicIdGenerator asyncIdGenerator = new AtomixIdGenerator(name, distributedLong);
+ return new ExecutingAsyncAtomicIdGenerator(asyncIdGenerator, defaultExecutor(executorSupplier), sharedExecutor);
+ }
+
+ @Override
public <V> AsyncAtomicValue<V> newAsyncAtomicValue(
String name, Serializer serializer, Supplier<Executor> executorSupplier) {
AsyncAtomicValue<V> asyncValue = new DefaultAsyncAtomicValue<>(name, serializer, onosAtomicValuesMap.get());
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixIdGenerator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixIdGenerator.java
new file mode 100644
index 0000000..971b60c
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixIdGenerator.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.atomix.variables.DistributedLong;
+import org.onosproject.store.service.AsyncAtomicIdGenerator;
+
+/**
+ * {@code AsyncAtomicIdGenerator} implementation backed by Atomix
+ * {@link DistributedLong}.
+ */
+public class AtomixIdGenerator implements AsyncAtomicIdGenerator {
+
+ private static final long DEFAULT_BATCH_SIZE = 1000;
+ private final String name;
+ private final DistributedLong distLong;
+ private final long batchSize;
+ private CompletableFuture<Long> reserveFuture;
+ private long base;
+ private final AtomicLong delta = new AtomicLong();
+
+ public AtomixIdGenerator(String name, DistributedLong distLong) {
+ this(name, distLong, DEFAULT_BATCH_SIZE);
+ }
+
+ AtomixIdGenerator(String name, DistributedLong distLong, long batchSize) {
+ this.name = name;
+ this.distLong = distLong;
+ this.batchSize = batchSize;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public synchronized CompletableFuture<Long> nextId() {
+ long nextDelta = delta.incrementAndGet();
+ if ((base == 0 && reserveFuture == null) || nextDelta > batchSize) {
+ delta.set(0);
+ long delta = this.delta.incrementAndGet();
+ return reserve().thenApply(base -> base + delta);
+ } else {
+ return reserveFuture.thenApply(base -> base + nextDelta);
+ }
+ }
+
+ private CompletableFuture<Long> reserve() {
+ if (reserveFuture == null || reserveFuture.isDone()) {
+ reserveFuture = distLong.getAndAdd(batchSize);
+ } else {
+ reserveFuture = reserveFuture.thenCompose(v -> distLong.getAndAdd(batchSize));
+ }
+ reserveFuture = reserveFuture.thenApply(base -> {
+ this.base = base;
+ return base;
+ });
+ return reserveFuture;
+ }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixIdGeneratorTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixIdGeneratorTest.java
new file mode 100644
index 0000000..1fbc464
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixIdGeneratorTest.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.util.concurrent.CompletableFuture;
+
+import io.atomix.resource.ResourceType;
+import io.atomix.variables.DistributedLong;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit test for {@code AtomixIdGenerator}.
+ */
+public class AtomixIdGeneratorTest extends AtomixTestBase {
+
+ @BeforeClass
+ public static void preTestSetup() throws Throwable {
+ createCopycatServers(3);
+ }
+
+ @AfterClass
+ public static void postTestCleanup() throws Exception {
+ clearTests();
+ }
+
+ @Override
+ protected ResourceType resourceType() {
+ return new ResourceType(DistributedLong.class);
+ }
+
+ /**
+ * Tests generating IDs.
+ */
+ @Test
+ public void testNextId() throws Throwable {
+ AtomixIdGenerator idGenerator1 = new AtomixIdGenerator("testNextId",
+ createAtomixClient().getLong("testNextId").join());
+ AtomixIdGenerator idGenerator2 = new AtomixIdGenerator("testNextId",
+ createAtomixClient().getLong("testNextId").join());
+
+ CompletableFuture<Long> future11 = idGenerator1.nextId();
+ CompletableFuture<Long> future12 = idGenerator1.nextId();
+ CompletableFuture<Long> future13 = idGenerator1.nextId();
+ assertEquals(Long.valueOf(1), future11.join());
+ assertEquals(Long.valueOf(2), future12.join());
+ assertEquals(Long.valueOf(3), future13.join());
+
+ CompletableFuture<Long> future21 = idGenerator1.nextId();
+ CompletableFuture<Long> future22 = idGenerator1.nextId();
+ CompletableFuture<Long> future23 = idGenerator1.nextId();
+ assertEquals(Long.valueOf(6), future23.join());
+ assertEquals(Long.valueOf(5), future22.join());
+ assertEquals(Long.valueOf(4), future21.join());
+
+ CompletableFuture<Long> future31 = idGenerator2.nextId();
+ CompletableFuture<Long> future32 = idGenerator2.nextId();
+ CompletableFuture<Long> future33 = idGenerator2.nextId();
+ assertEquals(Long.valueOf(1001), future31.join());
+ assertEquals(Long.valueOf(1002), future32.join());
+ assertEquals(Long.valueOf(1003), future33.join());
+ }
+
+ /**
+ * Tests generating IDs.
+ */
+ @Test
+ public void testNextIdBatchRollover() throws Throwable {
+ AtomixIdGenerator idGenerator1 = new AtomixIdGenerator("testNextIdBatchRollover",
+ createAtomixClient().getLong("testNextIdBatchRollover").join(), 2);
+ AtomixIdGenerator idGenerator2 = new AtomixIdGenerator("testNextIdBatchRollover",
+ createAtomixClient().getLong("testNextIdBatchRollover").join(), 2);
+
+ CompletableFuture<Long> future11 = idGenerator1.nextId();
+ CompletableFuture<Long> future12 = idGenerator1.nextId();
+ CompletableFuture<Long> future13 = idGenerator1.nextId();
+ assertEquals(Long.valueOf(1), future11.join());
+ assertEquals(Long.valueOf(2), future12.join());
+ assertEquals(Long.valueOf(3), future13.join());
+
+ CompletableFuture<Long> future21 = idGenerator2.nextId();
+ CompletableFuture<Long> future22 = idGenerator2.nextId();
+ CompletableFuture<Long> future23 = idGenerator2.nextId();
+ assertEquals(Long.valueOf(5), future21.join());
+ assertEquals(Long.valueOf(6), future22.join());
+ assertEquals(Long.valueOf(7), future23.join());
+
+ CompletableFuture<Long> future14 = idGenerator1.nextId();
+ CompletableFuture<Long> future15 = idGenerator1.nextId();
+ CompletableFuture<Long> future16 = idGenerator1.nextId();
+ assertEquals(Long.valueOf(4), future14.join());
+ assertEquals(Long.valueOf(9), future15.join());
+ assertEquals(Long.valueOf(10), future16.join());
+ }
+}