Adding atomic countetr compare and set method
Change-Id: I5cf459e9e09ab1a84ced8160ef61d6a52ea4bea4
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicCounter.java b/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicCounter.java
index 92efd39..c0df713 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicCounter.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicCounter.java
@@ -66,4 +66,13 @@
* @return future void
*/
CompletableFuture<Void> set(long value);
+
+ /**
+ * Atomically sets the given counter to the updated value if the current value is the expected value, otherwise
+ * no change occurs.
+ * @param expectedValue the expected current value of the counter
+ * @param updateValue the new value to be set
+ * @return true if the update occurred and the expected value was equal to the current value, false otherwise
+ */
+ CompletableFuture<Boolean> compareAndSet(long expectedValue, long updateValue);
}
diff --git a/core/api/src/main/java/org/onosproject/store/service/AtomicCounter.java b/core/api/src/main/java/org/onosproject/store/service/AtomicCounter.java
index 051838a..3c9e02c 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AtomicCounter.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AtomicCounter.java
@@ -57,6 +57,14 @@
*/
void set(long value);
+ /**
+ * Atomically sets the given counter to the updated value if the current value is the expected value, otherwise
+ * no change occurs.
+ * @param expectedValue the expected current value of the counter
+ * @param updateValue the new value to be set
+ * @return true if the update occurred and the expected value was equal to the current value, false otherwise
+ */
+ boolean compareAndSet(long expectedValue, long updateValue);
/**
* Returns the current value of the counter without modifying it.
diff --git a/core/api/src/test/java/org/onosproject/store/service/TestAtomicCounter.java b/core/api/src/test/java/org/onosproject/store/service/TestAtomicCounter.java
index 26a5040..8c577df 100644
--- a/core/api/src/test/java/org/onosproject/store/service/TestAtomicCounter.java
+++ b/core/api/src/test/java/org/onosproject/store/service/TestAtomicCounter.java
@@ -53,6 +53,11 @@
}
@Override
+ public boolean compareAndSet(long expectedValue, long updateValue) {
+ return value.compareAndSet(expectedValue, updateValue);
+ }
+
+ @Override
public long get() {
return value.get();
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
index b5f62f8..1d81f99 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
@@ -16,14 +16,14 @@
package org.onosproject.store.consistent.impl;
+import org.onosproject.store.service.Transaction;
+import org.onosproject.store.service.Versioned;
+
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.Versioned;
-
/**
* Database proxy.
*/
@@ -160,6 +160,16 @@
CompletableFuture<Void> counterSet(String counterName, long value);
/**
+ * Atomically sets the given counter to the specified update value if and only if the current value is equal to the
+ * expected value.
+ * @param counterName counter name
+ * @param expectedValue value to use for equivalence check
+ * @param update value to set if expected value is current value
+ * @return true if an update occurred, false otherwise
+ */
+ CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long update);
+
+ /**
* Returns the current value of the specified atomic counter.
*
* @param counterName counter name
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
index b3dd1c4..1136428 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
@@ -16,18 +16,17 @@
package org.onosproject.store.consistent.impl;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.Versioned;
-
import net.kuujo.copycat.state.Command;
import net.kuujo.copycat.state.Initializer;
import net.kuujo.copycat.state.Query;
import net.kuujo.copycat.state.StateContext;
+import org.onosproject.store.service.Transaction;
+import org.onosproject.store.service.Versioned;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
/**
* Database state.
@@ -83,6 +82,9 @@
Long counterAddAndGet(String counterName, long delta);
@Command
+ Boolean counterCompareAndSet(String counterName, long expectedValue, long updateValue);
+
+ @Command
Long counterGetAndAdd(String counterName, long delta);
@Query
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
index 5e14e55..d851eaa 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
@@ -40,6 +40,7 @@
private static final String ADD_AND_GET = "addAndGet";
private static final String GET = "get";
private static final String SET = "set";
+ private static final String COMPARE_AND_SET = "compareAndSet";
public DefaultAsyncAtomicCounter(String name,
Database database,
@@ -90,4 +91,11 @@
return database.counterSet(name, value)
.whenComplete((r, e) -> timer.stop(e));
}
+
+ @Override
+ public CompletableFuture<Boolean> compareAndSet(long expectedValue, long updateValue) {
+ final MeteringAgent.Context timer = monitor.startTimer(COMPARE_AND_SET);
+ return database.counterCompareAndSet(name, expectedValue, updateValue)
+ .whenComplete((r, e) -> timer.stop(e));
+ }
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
index 08f6e57..2d6a956 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
@@ -68,6 +68,11 @@
}
@Override
+ public boolean compareAndSet(long expectedValue, long updateValue) {
+ return complete(asyncCounter.compareAndSet(expectedValue, updateValue));
+ }
+
+ @Override
public long get() {
return complete(asyncCounter.get());
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
index fe94c9b..2a50fbd 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -16,12 +16,15 @@
package org.onosproject.store.consistent.impl;
-import net.kuujo.copycat.state.StateMachine;
+import com.google.common.collect.Sets;
import net.kuujo.copycat.resource.internal.AbstractResource;
import net.kuujo.copycat.resource.internal.ResourceManager;
+import net.kuujo.copycat.state.StateMachine;
import net.kuujo.copycat.state.internal.DefaultStateMachine;
import net.kuujo.copycat.util.concurrent.Futures;
import net.kuujo.copycat.util.function.TriConsumer;
+import org.onosproject.store.service.Transaction;
+import org.onosproject.store.service.Versioned;
import java.util.Collection;
import java.util.Map;
@@ -30,11 +33,6 @@
import java.util.function.Consumer;
import java.util.function.Supplier;
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.Versioned;
-
-import com.google.common.collect.Sets;
-
/**
* Default database.
*/
@@ -48,9 +46,9 @@
public DefaultDatabase(ResourceManager context) {
super(context);
this.stateMachine = new DefaultStateMachine(context,
- DatabaseState.class,
- DefaultDatabaseState.class,
- DefaultDatabase.class.getClassLoader());
+ DatabaseState.class,
+ DefaultDatabaseState.class,
+ DefaultDatabase.class.getClassLoader());
this.stateMachine.addStartupTask(() -> {
stateMachine.registerWatcher(watcher);
return CompletableFuture.completedFuture(null);
@@ -158,6 +156,11 @@
}
@Override
+ public CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long update) {
+ return checkOpen(() -> proxy.counterCompareAndSet(counterName, expectedValue, update));
+ }
+
+ @Override
public CompletableFuture<Long> queueSize(String queueName) {
return checkOpen(() -> proxy.queueSize(queueName));
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
index 9a55ffb..8943fc8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -16,27 +16,26 @@
package org.onosproject.store.consistent.impl;
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import net.kuujo.copycat.state.Initializer;
+import net.kuujo.copycat.state.StateContext;
+import org.onosproject.store.service.DatabaseUpdate;
+import org.onosproject.store.service.Transaction;
+import org.onosproject.store.service.Versioned;
+
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
-import java.util.Set;
-
-import org.onosproject.store.service.DatabaseUpdate;
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.Versioned;
-import com.google.common.base.Objects;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import net.kuujo.copycat.state.Initializer;
-import net.kuujo.copycat.state.StateContext;
/**
* Default database state.
@@ -195,6 +194,11 @@
}
@Override
+ public Boolean counterCompareAndSet(String counterName, long expectedValue, long updateValue) {
+ return getCounter(counterName).compareAndSet(expectedValue, updateValue);
+ }
+
+ @Override
public Long counterGet(String counterName) {
return getCounter(counterName).get();
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index 5edeb41..f741b36 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -16,6 +16,17 @@
package org.onosproject.store.consistent.impl;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import net.kuujo.copycat.Task;
+import net.kuujo.copycat.cluster.Cluster;
+import net.kuujo.copycat.resource.ResourceState;
+import org.onosproject.store.service.DatabaseUpdate;
+import org.onosproject.store.service.Transaction;
+import org.onosproject.store.service.Versioned;
+
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -28,18 +39,6 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;
-import org.onosproject.store.service.DatabaseUpdate;
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.Versioned;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import net.kuujo.copycat.Task;
-import net.kuujo.copycat.cluster.Cluster;
-import net.kuujo.copycat.resource.ResourceState;
import static com.google.common.base.Preconditions.checkState;
/**
@@ -227,6 +226,14 @@
}
@Override
+ public CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long updateValue) {
+ checkState(isOpen.get(), DB_NOT_OPEN);
+ return partitioner.getPartition(counterName, counterName).
+ counterCompareAndSet(counterName, expectedValue, updateValue);
+
+ }
+
+ @Override
public CompletableFuture<Long> queueSize(String queueName) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(queueName, queueName).queueSize(queueName);