Add support for a distributed counter
Change-Id: I346e9baa28556fac13e53771021f5f6fbcd75ac9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
index c190a28..bad3782 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -21,6 +21,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.Set;
@@ -43,6 +44,7 @@
*/
public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
private Long nextVersion;
+ private Map<String, AtomicLong> counters;
private Map<String, Map<String, Versioned<byte[]>>> tables;
/**
@@ -60,6 +62,11 @@
@Initializer
@Override
public void init(StateContext<DatabaseState<String, byte[]>> context) {
+ counters = context.get("counters");
+ if (counters == null) {
+ counters = Maps.newConcurrentMap();
+ context.put("counters", counters);
+ }
tables = context.get("tables");
if (tables == null) {
tables = Maps.newConcurrentMap();
@@ -83,6 +90,13 @@
}
@Override
+ public Map<String, Long> counters() {
+     // Build a detached name -> value snapshot; each AtomicLong is read once via get().
+     Map<String, Long> snapshot = Maps.newHashMap();
+     for (Entry<String, AtomicLong> counterEntry : counters.entrySet()) {
+         snapshot.put(counterEntry.getKey(), counterEntry.getValue().get());
+     }
+     return snapshot;
+ }
+
+ @Override
public int size(String tableName) {
return getTableMap(tableName).size();
}
@@ -212,6 +226,16 @@
}
@Override
+ public Long nextValue(String counterName) {
+     // getCounter() supplies the counter for this name; the value returned is the
+     // one produced by incrementing it.
+     AtomicLong counter = getCounter(counterName);
+     return counter.incrementAndGet();
+ }
+
+ @Override
+ public Long currentValue(String counterName) {
+     // Use a plain lookup rather than getCounter(): getCounter() inserts a
+     // zero-valued entry for an unknown name, so merely reading a counter would
+     // create it and make it show up in counters(). A counter that has never
+     // been incremented still reads as 0, exactly as before.
+     AtomicLong counter = counters.get(counterName);
+     return counter == null ? 0L : counter.get();
+ }
+
+ @Override
public boolean prepareAndCommit(Transaction transaction) {
if (prepare(transaction)) {
return commit(transaction);
@@ -255,6 +279,10 @@
return locks.computeIfAbsent(tableName, name -> Maps.newConcurrentMap());
}
+ private AtomicLong getCounter(String counterName) {
+     // Return the counter registered under this name, creating it with an
+     // initial value of zero the first time the name is requested.
+     return counters.computeIfAbsent(counterName, unusedName -> new AtomicLong());
+ }
+
private boolean isUpdatePossible(DatabaseUpdate update) {
Versioned<byte[]> existingEntry = get(update.tableName(), update.key());
switch (update.type()) {