minor name changes and javadoc
Change-Id: I43a05d75392efad9ac004867027a31adcc18c6f5
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index a294681..5edeb41 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -100,10 +100,10 @@
return CompletableFuture.allOf(partitions
.stream()
.map(db -> db.counters()
- .thenApply(m -> {
- counters.putAll(m);
- return null;
- }))
+ .thenApply(m -> {
+ counters.putAll(m);
+ return null;
+ }))
.toArray(CompletableFuture[]::new))
.thenApply(v -> counters);
}
@@ -113,9 +113,9 @@
checkState(isOpen.get(), DB_NOT_OPEN);
AtomicInteger totalSize = new AtomicInteger(0);
return CompletableFuture.allOf(partitions
- .stream()
- .map(p -> p.mapSize(mapName).thenApply(totalSize::addAndGet))
- .toArray(CompletableFuture[]::new))
+ .stream()
+ .map(p -> p.mapSize(mapName).thenApply(totalSize::addAndGet))
+ .toArray(CompletableFuture[]::new))
.thenApply(v -> totalSize.get());
}
@@ -136,10 +136,10 @@
checkState(isOpen.get(), DB_NOT_OPEN);
AtomicBoolean containsValue = new AtomicBoolean(false);
return CompletableFuture.allOf(partitions
- .stream()
- .map(p -> p.mapContainsValue(mapName, value)
- .thenApply(v -> containsValue.compareAndSet(false, v)))
- .toArray(CompletableFuture[]::new))
+ .stream()
+ .map(p -> p.mapContainsValue(mapName, value)
+ .thenApply(v -> containsValue.compareAndSet(false, v)))
+ .toArray(CompletableFuture[]::new))
.thenApply(v -> containsValue.get());
}
@@ -196,9 +196,9 @@
checkState(isOpen.get(), DB_NOT_OPEN);
Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
return CompletableFuture.allOf(partitions
- .stream()
- .map(p -> p.mapEntrySet(mapName).thenApply(entrySet::addAll))
- .toArray(CompletableFuture[]::new))
+ .stream()
+ .map(p -> p.mapEntrySet(mapName).thenApply(entrySet::addAll))
+ .toArray(CompletableFuture[]::new))
.thenApply(v -> entrySet);
}
@@ -220,6 +220,11 @@
return partitioner.getPartition(counterName, counterName).counterGetAndAdd(counterName, delta);
}
+ @Override
+ public CompletableFuture<Void> counterSet(String counterName, long value) {
+ checkState(isOpen.get(), DB_NOT_OPEN);
+ return partitioner.getPartition(counterName, counterName).counterSet(counterName, value);
+ }
@Override
public CompletableFuture<Long> queueSize(String queueName) {
@@ -268,8 +273,8 @@
AtomicBoolean status = new AtomicBoolean(true);
return CompletableFuture.allOf(subTransactions.entrySet()
.stream()
- .map(entry -> entry
- .getKey()
+ .map(entry -> entry
+ .getKey()
.prepare(entry.getValue())
.thenApply(v -> status.compareAndSet(true, v)))
.toArray(CompletableFuture[]::new))
@@ -282,15 +287,15 @@
AtomicBoolean success = new AtomicBoolean(true);
List<UpdateResult<String, byte[]>> allUpdates = Lists.newArrayList();
return CompletableFuture.allOf(subTransactions.entrySet()
- .stream()
- .map(entry -> entry.getKey().commit(entry.getValue())
- .thenAccept(response -> {
- success.set(success.get() && response.success());
- if (success.get()) {
- allUpdates.addAll(response.updates());
- }
- }))
- .toArray(CompletableFuture[]::new))
+ .stream()
+ .map(entry -> entry.getKey().commit(entry.getValue())
+ .thenAccept(response -> {
+ success.set(success.get() && response.success());
+ if (success.get()) {
+ allUpdates.addAll(response.updates());
+ }
+ }))
+ .toArray(CompletableFuture[]::new))
.thenApply(v -> success.get() ?
CommitResponse.success(allUpdates) : CommitResponse.failure());
}
@@ -301,7 +306,7 @@
return CompletableFuture.allOf(subTransactions.entrySet()
.stream()
.map(entry -> entry.getKey().rollback(entry.getValue()))
- .toArray(CompletableFuture[]::new))
+ .toArray(CompletableFuture[]::new))
.thenApply(v -> true);
}
@@ -384,3 +389,4 @@
partitions.forEach(p -> p.unregisterConsumer(consumer));
}
}
+