minor name changes and javadoc
Change-Id: I43a05d75392efad9ac004867027a31adcc18c6f5
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicCounter.java b/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicCounter.java
index a879cc5..92efd39 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicCounter.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicCounter.java
@@ -58,4 +58,12 @@
* @return current value
*/
CompletableFuture<Long> get();
+
+
+ /**
+ * Atomically sets the given value to the current value.
+ *
+ * @return future void
+ */
+ CompletableFuture<Void> set(long value);
}
diff --git a/core/api/src/main/java/org/onosproject/store/service/AtomicCounter.java b/core/api/src/main/java/org/onosproject/store/service/AtomicCounter.java
index f620e08..051838a 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AtomicCounter.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AtomicCounter.java
@@ -51,6 +51,14 @@
long addAndGet(long delta);
/**
+ * Atomically sets the given value to the current value.
+ *
+ * @param value the value to set
+ */
+ void set(long value);
+
+
+ /**
* Returns the current value of the counter without modifying it.
*
* @return current value
diff --git a/core/api/src/test/java/org/onosproject/store/service/TestAtomicCounter.java b/core/api/src/test/java/org/onosproject/store/service/TestAtomicCounter.java
index 01209be..26a5040 100644
--- a/core/api/src/test/java/org/onosproject/store/service/TestAtomicCounter.java
+++ b/core/api/src/test/java/org/onosproject/store/service/TestAtomicCounter.java
@@ -48,6 +48,11 @@
}
@Override
+ public void set(long value) {
+ this.value.set(value);
+ }
+
+ @Override
public long get() {
return value.get();
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
index 95f9e39..b5f62f8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
@@ -45,6 +45,7 @@
/**
* Returns the number of entries in map.
+ *
* @param mapName map name
* @return A completable future to be completed with the result once complete.
*/
@@ -62,7 +63,7 @@
* Checks whether the map contains a key.
*
* @param mapName map name
- * @param key key to check.
+ * @param key key to check.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Boolean> mapContainsKey(String mapName, K key);
@@ -71,7 +72,7 @@
* Checks whether the map contains a value.
*
* @param mapName map name
- * @param value The value to check.
+ * @param value The value to check.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Boolean> mapContainsValue(String mapName, V value);
@@ -80,7 +81,7 @@
* Gets a value from the map.
*
* @param mapName map name
- * @param key The key to get.
+ * @param key The key to get.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Versioned<V>> mapGet(String mapName, K key);
@@ -88,11 +89,11 @@
/**
* Updates the map.
*
- * @param mapName map name
- * @param key The key to set
- * @param valueMatch match for checking existing value
- * @param versionMatch match for checking existing version
- * @param value new value
+ * @param mapName map name
+ * @param key The key to set
+ * @param valueMatch match for checking existing value
+ * @param versionMatch match for checking existing version
+ * @param value new value
* @return A completable future to be completed with the result once complete
*/
CompletableFuture<Result<UpdateResult<K, V>>> mapUpdate(
@@ -130,11 +131,11 @@
*/
CompletableFuture<Set<Map.Entry<K, Versioned<V>>>> mapEntrySet(String mapName);
- /**
+ /**
* Atomically add the given value to current value of the specified counter.
*
* @param counterName counter name
- * @param delta value to add
+ * @param delta value to add
* @return updated value
*/
CompletableFuture<Long> counterAddAndGet(String counterName, long delta);
@@ -143,11 +144,21 @@
* Atomically add the given value to current value of the specified counter.
*
* @param counterName counter name
- * @param delta value to add
+ * @param delta value to add
* @return previous value
*/
CompletableFuture<Long> counterGetAndAdd(String counterName, long delta);
+
+ /**
+ * Atomically sets the given value to current value of the specified counter.
+ *
+ * @param counterName counter name
+ * @param value value to set
+ * @return void future
+ */
+ CompletableFuture<Void> counterSet(String counterName, long value);
+
/**
* Returns the current value of the specified atomic counter.
*
@@ -158,6 +169,7 @@
/**
* Returns the size of queue.
+ *
* @param queueName queue name
* @return queue size
*/
@@ -165,14 +177,16 @@
/**
* Inserts an entry into the queue.
+ *
* @param queueName queue name
- * @param entry queue entry
+ * @param entry queue entry
* @return void future
*/
CompletableFuture<Void> queuePush(String queueName, byte[] entry);
/**
* Removes an entry from the queue if the queue is non-empty.
+ *
* @param queueName queue name
* @return entry future. Can be completed with null if queue is empty
*/
@@ -180,6 +194,7 @@
/**
* Returns but does not remove an entry from the queue.
+ *
* @param queueName queue name
* @return entry. Can be null if queue is empty
*/
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
index 7a439c3..5e14e55 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
@@ -18,6 +18,7 @@
import org.onosproject.store.service.AsyncAtomicCounter;
import java.util.concurrent.CompletableFuture;
+
import static com.google.common.base.Preconditions.checkNotNull;
/**
@@ -38,6 +39,7 @@
private static final String GET_AND_ADD = "getAndAdd";
private static final String ADD_AND_GET = "addAndGet";
private static final String GET = "get";
+ private static final String SET = "set";
public DefaultAsyncAtomicCounter(String name,
Database database,
@@ -72,13 +74,20 @@
public CompletableFuture<Long> getAndAdd(long delta) {
final MeteringAgent.Context timer = monitor.startTimer(GET_AND_ADD);
return database.counterGetAndAdd(name, delta)
- .whenComplete((r, e) -> timer.stop(e));
+ .whenComplete((r, e) -> timer.stop(e));
}
@Override
public CompletableFuture<Long> addAndGet(long delta) {
final MeteringAgent.Context timer = monitor.startTimer(ADD_AND_GET);
return database.counterAddAndGet(name, delta)
- .whenComplete((r, e) -> timer.stop(e));
+ .whenComplete((r, e) -> timer.stop(e));
+ }
+
+ @Override
+ public CompletableFuture<Void> set(long value) {
+ final MeteringAgent.Context timer = monitor.startTimer(SET);
+ return database.counterSet(name, value)
+ .whenComplete((r, e) -> timer.stop(e));
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
index 64886e4..08f6e57 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
@@ -63,6 +63,11 @@
}
@Override
+ public void set(long value) {
+ complete(asyncCounter.set(value));
+ }
+
+ @Override
public long get() {
return complete(asyncCounter.get());
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
index 4d9776e..fe94c9b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -44,13 +44,13 @@
private final Set<Consumer<StateMachineUpdate>> consumers = Sets.newCopyOnWriteArraySet();
private final TriConsumer<String, Object, Object> watcher = new InternalStateMachineWatcher();
- @SuppressWarnings({ "unchecked", "rawtypes" })
+ @SuppressWarnings({"unchecked", "rawtypes"})
public DefaultDatabase(ResourceManager context) {
super(context);
this.stateMachine = new DefaultStateMachine(context,
- DatabaseState.class,
- DefaultDatabaseState.class,
- DefaultDatabase.class.getClassLoader());
+ DatabaseState.class,
+ DefaultDatabaseState.class,
+ DefaultDatabase.class.getClassLoader());
this.stateMachine.addStartupTask(() -> {
stateMachine.registerWatcher(watcher);
return CompletableFuture.completedFuture(null);
@@ -66,7 +66,7 @@
* return the completed future result.
*
* @param supplier The supplier to call if the database is open.
- * @param <T> The future result type.
+ * @param <T> The future result type.
* @return A completable future that if this database is closed is immediately failed.
*/
protected <T> CompletableFuture<T> checkOpen(Supplier<CompletableFuture<T>> supplier) {
@@ -153,6 +153,11 @@
}
@Override
+ public CompletableFuture<Void> counterSet(String counterName, long value) {
+ return checkOpen(() -> proxy.counterSet(counterName, value));
+ }
+
+ @Override
public CompletableFuture<Long> queueSize(String queueName) {
return checkOpen(() -> proxy.queueSize(queueName));
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index a294681..5edeb41 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -100,10 +100,10 @@
return CompletableFuture.allOf(partitions
.stream()
.map(db -> db.counters()
- .thenApply(m -> {
- counters.putAll(m);
- return null;
- }))
+ .thenApply(m -> {
+ counters.putAll(m);
+ return null;
+ }))
.toArray(CompletableFuture[]::new))
.thenApply(v -> counters);
}
@@ -113,9 +113,9 @@
checkState(isOpen.get(), DB_NOT_OPEN);
AtomicInteger totalSize = new AtomicInteger(0);
return CompletableFuture.allOf(partitions
- .stream()
- .map(p -> p.mapSize(mapName).thenApply(totalSize::addAndGet))
- .toArray(CompletableFuture[]::new))
+ .stream()
+ .map(p -> p.mapSize(mapName).thenApply(totalSize::addAndGet))
+ .toArray(CompletableFuture[]::new))
.thenApply(v -> totalSize.get());
}
@@ -136,10 +136,10 @@
checkState(isOpen.get(), DB_NOT_OPEN);
AtomicBoolean containsValue = new AtomicBoolean(false);
return CompletableFuture.allOf(partitions
- .stream()
- .map(p -> p.mapContainsValue(mapName, value)
- .thenApply(v -> containsValue.compareAndSet(false, v)))
- .toArray(CompletableFuture[]::new))
+ .stream()
+ .map(p -> p.mapContainsValue(mapName, value)
+ .thenApply(v -> containsValue.compareAndSet(false, v)))
+ .toArray(CompletableFuture[]::new))
.thenApply(v -> containsValue.get());
}
@@ -196,9 +196,9 @@
checkState(isOpen.get(), DB_NOT_OPEN);
Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
return CompletableFuture.allOf(partitions
- .stream()
- .map(p -> p.mapEntrySet(mapName).thenApply(entrySet::addAll))
- .toArray(CompletableFuture[]::new))
+ .stream()
+ .map(p -> p.mapEntrySet(mapName).thenApply(entrySet::addAll))
+ .toArray(CompletableFuture[]::new))
.thenApply(v -> entrySet);
}
@@ -220,6 +220,11 @@
return partitioner.getPartition(counterName, counterName).counterGetAndAdd(counterName, delta);
}
+ @Override
+ public CompletableFuture<Void> counterSet(String counterName, long value) {
+ checkState(isOpen.get(), DB_NOT_OPEN);
+ return partitioner.getPartition(counterName, counterName).counterSet(counterName, value);
+ }
@Override
public CompletableFuture<Long> queueSize(String queueName) {
@@ -268,8 +273,8 @@
AtomicBoolean status = new AtomicBoolean(true);
return CompletableFuture.allOf(subTransactions.entrySet()
.stream()
- .map(entry -> entry
- .getKey()
+ .map(entry -> entry
+ .getKey()
.prepare(entry.getValue())
.thenApply(v -> status.compareAndSet(true, v)))
.toArray(CompletableFuture[]::new))
@@ -282,15 +287,15 @@
AtomicBoolean success = new AtomicBoolean(true);
List<UpdateResult<String, byte[]>> allUpdates = Lists.newArrayList();
return CompletableFuture.allOf(subTransactions.entrySet()
- .stream()
- .map(entry -> entry.getKey().commit(entry.getValue())
- .thenAccept(response -> {
- success.set(success.get() && response.success());
- if (success.get()) {
- allUpdates.addAll(response.updates());
- }
- }))
- .toArray(CompletableFuture[]::new))
+ .stream()
+ .map(entry -> entry.getKey().commit(entry.getValue())
+ .thenAccept(response -> {
+ success.set(success.get() && response.success());
+ if (success.get()) {
+ allUpdates.addAll(response.updates());
+ }
+ }))
+ .toArray(CompletableFuture[]::new))
.thenApply(v -> success.get() ?
CommitResponse.success(allUpdates) : CommitResponse.failure());
}
@@ -301,7 +306,7 @@
return CompletableFuture.allOf(subTransactions.entrySet()
.stream()
.map(entry -> entry.getKey().rollback(entry.getValue()))
- .toArray(CompletableFuture[]::new))
+ .toArray(CompletableFuture[]::new))
.thenApply(v -> true);
}
@@ -384,3 +389,4 @@
partitions.forEach(p -> p.unregisterConsumer(consumer));
}
}
+