Adds an abstract distributed primitive builder and refactors the AtomicCounter and AtomicValue builders to use it.
Change-Id: I56cef62673fabc54ca29634c27e4ff1f41ba6a88
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DefaultAtomicCounter.java b/core/api/src/main/java/org/onosproject/store/primitives/DefaultAtomicCounter.java
new file mode 100644
index 0000000..8283adc
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DefaultAtomicCounter.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives;
+
+import org.onosproject.store.service.AsyncAtomicCounter;
+import org.onosproject.store.service.AtomicCounter;
+import org.onosproject.store.service.StorageException;
+import org.onosproject.store.service.Synchronous;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Blocking {@link AtomicCounter} that delegates to an {@link AsyncAtomicCounter} and waits for each result.
+ */
+public class DefaultAtomicCounter extends Synchronous<AsyncAtomicCounter> implements AtomicCounter {
+
+    private final AsyncAtomicCounter asyncCounter;
+    private final long operationTimeoutMillis; // upper bound, in milliseconds, on how long each call may block
+
+    public DefaultAtomicCounter(AsyncAtomicCounter asyncCounter, long operationTimeoutMillis) {
+        super(asyncCounter);
+        this.asyncCounter = asyncCounter;
+        this.operationTimeoutMillis = operationTimeoutMillis;
+    }
+
+    @Override
+    public long incrementAndGet() {
+        return complete(asyncCounter.incrementAndGet());
+    }
+
+    @Override
+    public long getAndIncrement() {
+        return complete(asyncCounter.getAndIncrement());
+    }
+
+    @Override
+    public long getAndAdd(long delta) {
+        return complete(asyncCounter.getAndAdd(delta));
+    }
+
+    @Override
+    public long addAndGet(long delta) {
+        return complete(asyncCounter.addAndGet(delta)); // must not be getAndAdd: that yields the pre-add value
+    }
+
+    @Override
+    public void set(long value) {
+        complete(asyncCounter.set(value));
+    }
+
+    @Override
+    public boolean compareAndSet(long expectedValue, long updateValue) {
+        return complete(asyncCounter.compareAndSet(expectedValue, updateValue));
+    }
+
+    @Override
+    public long get() {
+        return complete(asyncCounter.get());
+    }
+
+    private <T> T complete(CompletableFuture<T> future) { // waits at most operationTimeoutMillis for the result
+        try {
+            return future.get(operationTimeoutMillis, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the flag so callers further up can still observe it
+            throw new StorageException.Interrupted();
+        } catch (TimeoutException e) {
+            throw new StorageException.Timeout();
+        } catch (ExecutionException e) {
+            throw new StorageException(e.getCause()); // report the underlying failure, not the wrapper
+        }
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DefaultAtomicValue.java b/core/api/src/main/java/org/onosproject/store/primitives/DefaultAtomicValue.java
new file mode 100644
index 0000000..a77f069
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DefaultAtomicValue.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.onosproject.store.service.AsyncAtomicValue;
+import org.onosproject.store.service.AtomicValue;
+import org.onosproject.store.service.AtomicValueEventListener;
+import org.onosproject.store.service.StorageException;
+import org.onosproject.store.service.Synchronous;
+
+/**
+ * Synchronous {@code AtomicValue} facade that forwards every call to an {@link AsyncAtomicValue} and blocks on it.
+ *
+ * @param <V> type of the held value
+ */
+public class DefaultAtomicValue<V> extends Synchronous<AsyncAtomicValue<V>> implements AtomicValue<V> {
+
+    private final long operationTimeoutMillis;
+    private final AsyncAtomicValue<V> asyncValue;
+
+    public DefaultAtomicValue(AsyncAtomicValue<V> asyncValue, long operationTimeoutMillis) {
+        super(asyncValue);
+        this.operationTimeoutMillis = operationTimeoutMillis;
+        this.asyncValue = asyncValue;
+    }
+
+    private <T> T awaitResult(CompletableFuture<T> pending) {
+        try {
+            return pending.get(operationTimeoutMillis, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException failed) {
+            throw new StorageException(failed.getCause());
+        } catch (TimeoutException timedOut) {
+            throw new StorageException.Timeout();
+        } catch (InterruptedException interrupted) {
+            Thread.currentThread().interrupt();
+            throw new StorageException.Interrupted();
+        }
+    }
+
+    @Override
+    public V get() {
+        return awaitResult(asyncValue.get());
+    }
+
+    @Override
+    public void set(V value) {
+        awaitResult(asyncValue.set(value));
+    }
+
+    @Override
+    public V getAndSet(V value) {
+        return awaitResult(asyncValue.getAndSet(value));
+    }
+
+    @Override
+    public boolean compareAndSet(V expect, V update) {
+        return awaitResult(asyncValue.compareAndSet(expect, update));
+    }
+
+    @Override
+    public void addListener(AtomicValueEventListener<V> listener) {
+        awaitResult(asyncValue.addListener(listener));
+    }
+
+    @Override
+    public void removeListener(AtomicValueEventListener<V> listener) {
+        awaitResult(asyncValue.removeListener(listener));
+    }
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveBuilder.java b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveBuilder.java
new file mode 100644
index 0000000..2792ac5
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveBuilder.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives;
+
+import org.onosproject.core.ApplicationId;
+import org.onosproject.store.service.DistributedPrimitive;
+import org.onosproject.store.service.Serializer;
+
+/**
+ * Abstract builder for distributed primitives; holds the options common to every primitive type.
+ *
+ * @param <T> distributed primitive type
+ */
+public abstract class DistributedPrimitiveBuilder<T extends DistributedPrimitive> {
+
+    private final DistributedPrimitive.Type type; // fixed at construction, never reassigned
+    private String name; // null until withName is called
+    private ApplicationId applicationId; // null until withApplicationId is called
+    private Serializer serializer; // null until withSerializer is called
+    private boolean partitionsDisabled; // false unless withPartitionsDisabled is called
+    private boolean meteringDisabled; // false unless withMeteringDisabled is called
+
+    public DistributedPrimitiveBuilder(DistributedPrimitive.Type type) { // invoked by subclasses via super(...)
+        this.type = type;
+    }
+
+    /**
+     * Sets the name of the primitive to build.
+     *
+     * @param name primitive name
+     * @return this builder
+     */
+    public DistributedPrimitiveBuilder<T> withName(String name) {
+        this.name = name;
+        return this;
+    }
+
+    /**
+     * Sets the serializer to use for transcoding the information held in the primitive.
+     *
+     * @param serializer serializer
+     * @return this builder
+     */
+    public DistributedPrimitiveBuilder<T> withSerializer(Serializer serializer) {
+        this.serializer = serializer;
+        return this;
+    }
+
+    /**
+     * Sets the identifier of the application that owns this primitive.
+     *
+     * @param applicationId application identifier
+     * @return this builder
+     */
+    public DistributedPrimitiveBuilder<T> withApplicationId(ApplicationId applicationId) {
+        this.applicationId = applicationId;
+        return this;
+    }
+
+    /**
+     * Creates this primitive on a special partition that comprises all members in the cluster.
+     * @deprecated usage of this method is discouraged for most common scenarios. Eventually it will be replaced
+     * with a better alternative that does not expose low-level details. Until then, avoid using this method.
+     * @return this builder
+     */
+    @Deprecated
+    public DistributedPrimitiveBuilder<T> withPartitionsDisabled() {
+        this.partitionsDisabled = true;
+        return this;
+    }
+
+    /**
+     * Disables recording of usage stats for this primitive.
+     * @deprecated usage of this method is discouraged for most common scenarios.
+     * @return this builder
+     */
+    @Deprecated
+    public DistributedPrimitiveBuilder<T> withMeteringDisabled() {
+        this.meteringDisabled = true;
+        return this;
+    }
+
+    /**
+     * Returns whether metering is enabled.
+     *
+     * @return {@code true} unless {@link #withMeteringDisabled()} was called; {@code false} otherwise
+     */
+    public final boolean meteringEnabled() {
+        return !meteringDisabled;
+    }
+
+    /**
+     * Returns whether partitions are disabled.
+     *
+     * @return {@code true} if {@link #withPartitionsDisabled()} was called; {@code false} otherwise
+     */
+    public final boolean partitionsDisabled() {
+        return partitionsDisabled;
+    }
+
+    /**
+     * Returns the serializer.
+     *
+     * @return serializer, or {@code null} if none was set
+     */
+    public final Serializer serializer() {
+        return serializer;
+    }
+
+    /**
+     * Returns the application identifier.
+     *
+     * @return application id, or {@code null} if none was set
+     */
+    public final ApplicationId applicationId() {
+        return applicationId;
+    }
+
+    /**
+     * Returns the name of the primitive.
+     *
+     * @return primitive name, or {@code null} if none was set
+     */
+    public final String name() {
+        return name;
+    }
+
+    /**
+     * Returns the primitive type.
+     *
+     * @return primitive type passed to the constructor
+     */
+    public final DistributedPrimitive.Type type() {
+        return type;
+    }
+
+    /**
+     * Constructs an instance of the distributed primitive.
+     * @return distributed primitive
+     */
+    public abstract T build();
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicCounter.java b/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicCounter.java
index aa20f4e..05326a2 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicCounter.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicCounter.java
@@ -17,6 +17,8 @@
import java.util.concurrent.CompletableFuture;
+import org.onosproject.store.primitives.DefaultAtomicCounter;
+
/**
* An async atomic counter dispenses monotonically increasing values.
*/
@@ -81,4 +83,23 @@
* @return true if the update occurred and the expected value was equal to the current value, false otherwise
*/
CompletableFuture<Boolean> compareAndSet(long expectedValue, long updateValue);
+
+ /**
+ * Returns a new {@link AtomicCounter} that is backed by this instance.
+ *
+ * @param timeoutMillis timeout in milliseconds applied to each operation of the returned AtomicCounter
+ * @return new {@code AtomicCounter} instance
+ */
+ default AtomicCounter asAtomicCounter(long timeoutMillis) {
+ return new DefaultAtomicCounter(this, timeoutMillis);
+ }
+
+ /**
+ * Returns a new {@link AtomicCounter} that is backed by this instance and uses the default operation timeout.
+ *
+ * @return new {@code AtomicCounter} instance using {@code DEFAULT_OPERTATION_TIMEOUT_MILLIS} as its timeout
+ */
+ default AtomicCounter asAtomicCounter() {
+ return new DefaultAtomicCounter(this, DEFAULT_OPERTATION_TIMEOUT_MILLIS);
+ }
}
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicValue.java b/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicValue.java
index 60d8337..a0e72ff 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicValue.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicValue.java
@@ -17,6 +17,8 @@
import java.util.concurrent.CompletableFuture;
+import org.onosproject.store.primitives.DefaultAtomicValue;
+
/**
* Distributed version of java.util.concurrent.atomic.AtomicReference.
* <p>
@@ -80,4 +82,23 @@
* @return CompletableFuture that will be completed when the operation finishes
*/
CompletableFuture<Void> removeListener(AtomicValueEventListener<V> listener);
+
+ /**
+ * Returns a new {@link AtomicValue} that is backed by this instance.
+ *
+ * @param timeoutMillis timeout in milliseconds applied to each operation of the returned AtomicValue
+ * @return new {@code AtomicValue} instance
+ */
+ default AtomicValue<V> asAtomicValue(long timeoutMillis) {
+ return new DefaultAtomicValue<>(this, timeoutMillis);
+ }
+
+ /**
+ * Returns a new {@link AtomicValue} that is backed by this instance and uses the default operation timeout.
+ *
+ * @return new {@code AtomicValue} instance using {@code DEFAULT_OPERTATION_TIMEOUT_MILLIS} as its timeout
+ */
+ default AtomicValue<V> asAtomicValue() {
+ return new DefaultAtomicValue<>(this, DEFAULT_OPERTATION_TIMEOUT_MILLIS);
+ }
}
diff --git a/core/api/src/main/java/org/onosproject/store/service/AtomicCounterBuilder.java b/core/api/src/main/java/org/onosproject/store/service/AtomicCounterBuilder.java
index 41a19f0..0728b20 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AtomicCounterBuilder.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AtomicCounterBuilder.java
@@ -15,61 +15,13 @@
*/
package org.onosproject.store.service;
+import org.onosproject.store.primitives.DistributedPrimitiveBuilder;
+
/**
* Builder for AtomicCounter.
*/
-public interface AtomicCounterBuilder {
-
- /**
- * Sets the name for the atomic counter.
- * <p>
- * Each atomic counter is identified by a unique name.
- * </p>
- * <p>
- * Note: This is a mandatory parameter.
- * </p>
- *
- * @param name name of the atomic counter
- * @return this AtomicCounterBuilder
- */
- AtomicCounterBuilder withName(String name);
-
- /**
- * Creates this counter on the partition that spans the entire cluster.
- * <p>
- * When partitioning is disabled, the counter state will be
- * ephemeral and does not survive a full cluster restart.
- * </p>
- * <p>
- * Note: By default partitions are enabled.
- * </p>
- * @return this AtomicCounterBuilder
- */
- AtomicCounterBuilder withPartitionsDisabled();
-
- /**
- * Instantiates Metering service to gather usage and performance metrics.
- * By default, usage data will be stored.
- *
- * @return this AtomicCounterBuilder
- */
- AtomicCounterBuilder withMeteringDisabled();
-
- /**
- * Builds a AtomicCounter based on the configuration options
- * supplied to this builder.
- *
- * @return new AtomicCounter
- * @throws java.lang.RuntimeException if a mandatory parameter is missing
- */
- AtomicCounter build();
-
- /**
- * Builds a AsyncAtomicCounter based on the configuration options
- * supplied to this builder.
- *
- * @return new AsyncAtomicCounter
- * @throws java.lang.RuntimeException if a mandatory parameter is missing
- */
- AsyncAtomicCounter buildAsyncCounter();
-}
+public abstract class AtomicCounterBuilder extends DistributedPrimitiveBuilder<AsyncAtomicCounter> {
+ public AtomicCounterBuilder() { // build() yields an AsyncAtomicCounter; asAtomicCounter() gives a blocking view
+ super(DistributedPrimitive.Type.COUNTER); // every counter builder reports Type.COUNTER from type()
+ }
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/store/service/AtomicValueBuilder.java b/core/api/src/main/java/org/onosproject/store/service/AtomicValueBuilder.java
index d0ba25e..81c484f 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AtomicValueBuilder.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AtomicValueBuilder.java
@@ -15,65 +15,16 @@
*/
package org.onosproject.store.service;
+import org.onosproject.store.primitives.DistributedPrimitiveBuilder;
+
/**
* Builder for constructing new AtomicValue instances.
*
* @param <V> atomic value type
*/
-public interface AtomicValueBuilder<V> {
- /**
- * Sets the name for the atomic value.
- * <p>
- * Each atomic value is identified by a unique name.
- * </p>
- * <p>
- * Note: This is a mandatory parameter.
- * </p>
- *
- * @param name name of the atomic value
- * @return this AtomicValueBuilder for method chaining
- */
- AtomicValueBuilder<V> withName(String name);
+public abstract class AtomicValueBuilder<V> extends DistributedPrimitiveBuilder<AsyncAtomicValue<V>> {
- /**
- * Sets a serializer that can be used to serialize the value.
- * <p>
- * Note: This is a mandatory parameter.
- * </p>
- *
- * @param serializer serializer
- * @return this AtomicValueBuilder for method chaining
- */
- AtomicValueBuilder<V> withSerializer(Serializer serializer);
-
- /**
- * Creates this atomic value on the partition that spans the entire cluster.
- * <p>
- * When partitioning is disabled, the value state will be
- * ephemeral and does not survive a full cluster restart.
- * </p>
- * <p>
- * Note: By default partitions are enabled.
- * </p>
- * @return this AtomicValueBuilder for method chaining
- */
- AtomicValueBuilder<V> withPartitionsDisabled();
-
- /**
- * Builds a AsyncAtomicValue based on the configuration options
- * supplied to this builder.
- *
- * @return new AsyncAtomicValue
- * @throws java.lang.RuntimeException if a mandatory parameter is missing
- */
- AsyncAtomicValue<V> buildAsyncValue();
-
- /**
- * Builds a AtomicValue based on the configuration options
- * supplied to this builder.
- *
- * @return new AtomicValue
- * @throws java.lang.RuntimeException if a mandatory parameter is missing
- */
- AtomicValue<V> build();
+ public AtomicValueBuilder() { // build() yields an AsyncAtomicValue; asAtomicValue() gives a blocking view
+ super(DistributedPrimitive.Type.VALUE); // every value builder reports Type.VALUE from type()
+ }
}
diff --git a/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java b/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java
index f69a1d3..abdb14d 100644
--- a/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java
+++ b/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java
@@ -64,6 +64,8 @@
LEADER_ELECTOR
}
+ static final long DEFAULT_OPERTATION_TIMEOUT_MILLIS = 5000L; // default timeout (ms) of the blocking wrappers; NOTE(review): name misspells "OPERATION", renaming must update every user
+
/**
* Returns the name of this primitive.
* @return name