Support for a distributed counter

Change-Id: I346e9baa28556fac13e53771021f5f6fbcd75ac9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
index c190a28..bad3782 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -21,6 +21,7 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.Set;
 
@@ -43,6 +44,7 @@
  */
 public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
     private Long nextVersion;
+    private Map<String, AtomicLong> counters;
     private Map<String, Map<String, Versioned<byte[]>>> tables;
 
     /**
@@ -60,6 +62,11 @@
     @Initializer
     @Override
     public void init(StateContext<DatabaseState<String, byte[]>> context) {
+        counters = context.get("counters");
+        if (counters == null) {
+            counters = Maps.newConcurrentMap();
+            context.put("counters", counters);
+        }
         tables = context.get("tables");
         if (tables == null) {
             tables = Maps.newConcurrentMap();
@@ -83,6 +90,13 @@
     }
 
     @Override
+    public Map<String, Long> counters() {
+        Map<String, Long> counterMap = Maps.newHashMap();
+        counters.forEach((k, v) -> counterMap.put(k, v.get()));
+        return counterMap;
+    }
+
+    @Override
     public int size(String tableName) {
       return getTableMap(tableName).size();
     }
@@ -212,6 +226,16 @@
     }
 
     @Override
+    public Long nextValue(String counterName) {
+        return getCounter(counterName).incrementAndGet();
+    }
+
+    @Override
+    public Long currentValue(String counterName) {
+        return getCounter(counterName).get();
+    }
+
+    @Override
     public boolean prepareAndCommit(Transaction transaction) {
         if (prepare(transaction)) {
             return commit(transaction);
@@ -255,6 +279,10 @@
         return locks.computeIfAbsent(tableName, name -> Maps.newConcurrentMap());
     }
 
+    private AtomicLong getCounter(String counterName) {
+        return counters.computeIfAbsent(counterName, name -> new AtomicLong(0));
+    }
+
     private boolean isUpdatePossible(DatabaseUpdate update) {
         Versioned<byte[]> existingEntry = get(update.tableName(), update.key());
         switch (update.type()) {