Adding atomic counter compare and set method

Change-Id: I5cf459e9e09ab1a84ced8160ef61d6a52ea4bea4
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
index b5f62f8..1d81f99 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
@@ -16,14 +16,14 @@
 
 package org.onosproject.store.consistent.impl;
 
+import org.onosproject.store.service.Transaction;
+import org.onosproject.store.service.Versioned;
+
 import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.Versioned;
-
 /**
  * Database proxy.
  */
@@ -160,6 +160,16 @@
     CompletableFuture<Void> counterSet(String counterName, long value);
 
     /**
+     * Atomically sets the counter to the update value if and only if its current value equals the expected value.
+     *
+     * @param counterName counter name
+     * @param expectedValue value to use for equivalence check
+     * @param update value to set if expected value is current value
+     * @return future that completes with true if an update occurred, false otherwise
+     */
+    CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long update);
+
+    /**
      * Returns the current value of the specified atomic counter.
      *
      * @param counterName counter name
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
index b3dd1c4..1136428 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
@@ -16,18 +16,17 @@
 
 package org.onosproject.store.consistent.impl;
 
-import java.util.Collection;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.Versioned;
-
 import net.kuujo.copycat.state.Command;
 import net.kuujo.copycat.state.Initializer;
 import net.kuujo.copycat.state.Query;
 import net.kuujo.copycat.state.StateContext;
+import org.onosproject.store.service.Transaction;
+import org.onosproject.store.service.Versioned;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 /**
  * Database state.
@@ -83,6 +82,9 @@
   Long counterAddAndGet(String counterName, long delta);
 
   @Command
+  Boolean counterCompareAndSet(String counterName, long expectedValue, long updateValue);
+
+  @Command
   Long counterGetAndAdd(String counterName, long delta);
 
   @Query
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
index 5e14e55..d851eaa 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
@@ -40,6 +40,7 @@
     private static final String ADD_AND_GET = "addAndGet";
     private static final String GET = "get";
     private static final String SET = "set";
+    private static final String COMPARE_AND_SET = "compareAndSet";
 
     public DefaultAsyncAtomicCounter(String name,
                                      Database database,
@@ -90,4 +91,11 @@
         return database.counterSet(name, value)
                 .whenComplete((r, e) -> timer.stop(e));
     }
+
+    @Override
+    public CompletableFuture<Boolean> compareAndSet(long expectedValue, long updateValue) {
+        final MeteringAgent.Context timer = monitor.startTimer(COMPARE_AND_SET);
+        return database.counterCompareAndSet(name, expectedValue, updateValue)
+                .whenComplete((r, e) -> timer.stop(e));
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
index 08f6e57..2d6a956 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
@@ -68,6 +68,11 @@
     }
 
     @Override
+    public boolean compareAndSet(long expectedValue, long updateValue) {
+        return complete(asyncCounter.compareAndSet(expectedValue, updateValue));
+    }
+
+    @Override
     public long get() {
         return complete(asyncCounter.get());
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
index fe94c9b..2a50fbd 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -16,12 +16,15 @@
 
 package org.onosproject.store.consistent.impl;
 
-import net.kuujo.copycat.state.StateMachine;
+import com.google.common.collect.Sets;
 import net.kuujo.copycat.resource.internal.AbstractResource;
 import net.kuujo.copycat.resource.internal.ResourceManager;
+import net.kuujo.copycat.state.StateMachine;
 import net.kuujo.copycat.state.internal.DefaultStateMachine;
 import net.kuujo.copycat.util.concurrent.Futures;
 import net.kuujo.copycat.util.function.TriConsumer;
+import org.onosproject.store.service.Transaction;
+import org.onosproject.store.service.Versioned;
 
 import java.util.Collection;
 import java.util.Map;
@@ -30,11 +33,6 @@
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.Versioned;
-
-import com.google.common.collect.Sets;
-
 /**
  * Default database.
  */
@@ -48,9 +46,9 @@
     public DefaultDatabase(ResourceManager context) {
         super(context);
         this.stateMachine = new DefaultStateMachine(context,
-                                                    DatabaseState.class,
-                                                    DefaultDatabaseState.class,
-                                                    DefaultDatabase.class.getClassLoader());
+                DatabaseState.class,
+                DefaultDatabaseState.class,
+                DefaultDatabase.class.getClassLoader());
         this.stateMachine.addStartupTask(() -> {
             stateMachine.registerWatcher(watcher);
             return CompletableFuture.completedFuture(null);
@@ -158,6 +156,11 @@
     }
 
     @Override
+    public CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long update) {
+        return checkOpen(() -> proxy.counterCompareAndSet(counterName, expectedValue, update));
+    }
+
+    @Override
     public CompletableFuture<Long> queueSize(String queueName) {
         return checkOpen(() -> proxy.queueSize(queueName));
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
index 9a55ffb..8943fc8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -16,27 +16,26 @@
 
 package org.onosproject.store.consistent.impl;
 
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import net.kuujo.copycat.state.Initializer;
+import net.kuujo.copycat.state.StateContext;
+import org.onosproject.store.service.DatabaseUpdate;
+import org.onosproject.store.service.Transaction;
+import org.onosproject.store.service.Versioned;
+
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
-import java.util.Set;
-
-import org.onosproject.store.service.DatabaseUpdate;
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.Versioned;
-import com.google.common.base.Objects;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import net.kuujo.copycat.state.Initializer;
-import net.kuujo.copycat.state.StateContext;
 
 /**
  * Default database state.
@@ -195,6 +194,11 @@
     }
 
     @Override
+    public Boolean counterCompareAndSet(String counterName, long expectedValue, long updateValue) {
+        return getCounter(counterName).compareAndSet(expectedValue, updateValue);
+    }
+
+    @Override
     public Long counterGet(String counterName) {
         return getCounter(counterName).get();
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index 5edeb41..f741b36 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -16,6 +16,17 @@
 
 package org.onosproject.store.consistent.impl;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import net.kuujo.copycat.Task;
+import net.kuujo.copycat.cluster.Cluster;
+import net.kuujo.copycat.resource.ResourceState;
+import org.onosproject.store.service.DatabaseUpdate;
+import org.onosproject.store.service.Transaction;
+import org.onosproject.store.service.Versioned;
+
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -28,18 +39,6 @@
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
-import org.onosproject.store.service.DatabaseUpdate;
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.Versioned;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import net.kuujo.copycat.Task;
-import net.kuujo.copycat.cluster.Cluster;
-import net.kuujo.copycat.resource.ResourceState;
 import static com.google.common.base.Preconditions.checkState;
 
 /**
@@ -227,6 +226,14 @@
     }
 
     @Override
+    public CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long updateValue) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
+        // Delegate to the partition that the partitioner selects for this counter name.
+        return partitioner.getPartition(counterName, counterName)
+                .counterCompareAndSet(counterName, expectedValue, updateValue);
+    }
+
+    @Override
     public CompletableFuture<Long> queueSize(String queueName) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(queueName, queueName).queueSize(queueName);