Support for a distributed counter

Change-Id: I346e9baa28556fac13e53771021f5f6fbcd75ac9
diff --git a/cli/src/main/java/org/onosproject/cli/net/CountersListCommand.java b/cli/src/main/java/org/onosproject/cli/net/CountersListCommand.java
new file mode 100644
index 0000000..2adba25
--- /dev/null
+++ b/cli/src/main/java/org/onosproject/cli/net/CountersListCommand.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cli.net;
+
+import java.util.Map;
+
+import org.apache.karaf.shell.commands.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.store.service.StorageAdminService;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * Command to list the various counters in the system.
+ */
+@Command(scope = "onos", name = "counters",
+        description = "Lists information about atomic counters in the system")
+public class CountersListCommand extends AbstractShellCommand {
+
+    private static final String FMT = "name=%s next_value=%d";
+
+    /**
+     * Displays counters as text.
+     *
+     * @param mapInfo map descriptions
+     */
+    private void displayCounters(Map<String, Long> counters) {
+        counters.forEach((name, nextValue) -> print(FMT, name, nextValue));
+    }
+
+    /**
+     * Converts info for counters into a JSON object.
+     *
+     * @param counters counter info
+     */
+    private JsonNode json(Map<String, Long> counters) {
+        ObjectMapper mapper = new ObjectMapper();
+        ArrayNode jsonCounters = mapper.createArrayNode();
+
+        // Create a JSON node for each counter
+        counters.forEach((name, value) -> {
+                ObjectNode jsonCounter = mapper.createObjectNode();
+                jsonCounter.put("name", name)
+                   .put("value", value);
+                jsonCounters.add(jsonCounter);
+            });
+
+        return jsonCounters;
+    }
+
+    @Override
+    protected void execute() {
+        StorageAdminService storageAdminService = get(StorageAdminService.class);
+        Map<String, Long> counters = storageAdminService.getCounters();
+        if (outputJson()) {
+            print("%s", json(counters));
+        } else {
+            displayCounters(counters);
+        }
+    }
+}
diff --git a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
index 01d6e62..96a907d 100644
--- a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
+++ b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
@@ -233,6 +233,9 @@
             <action class="org.onosproject.cli.net.MapsListCommand"/>
         </command>
         <command>
+            <action class="org.onosproject.cli.net.CountersListCommand"/>
+        </command>
+        <command>
             <action class="org.onosproject.cli.net.TransactionsCommand"/>
         </command>
         <command>
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicCounter.java b/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicCounter.java
new file mode 100644
index 0000000..8c27666
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicCounter.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * An async atomic counter dispenses monotonically increasing values.
+ */
+public interface AsyncAtomicCounter {
+
+    /**
+     * Atomically increment by one the current value.
+     *
+     * @return updated value
+     */
+    CompletableFuture<Long> incrementAndGet();
+
+    /**
+     * Returns the current value of the counter without modifying it.
+     *
+     * @return current value
+     */
+    CompletableFuture<Long> get();
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/AtomicCounter.java b/core/api/src/main/java/org/onosproject/store/service/AtomicCounter.java
new file mode 100644
index 0000000..4385ce0
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/AtomicCounter.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+/**
+ * An atomic counter dispenses monotonically increasing values.
+ */
+public interface AtomicCounter {
+
+    /**
+     * Atomically increment by one the current value.
+     *
+     * @return updated value
+     */
+    long incrementAndGet();
+
+    /**
+     * Returns the current value of the counter without modifying it.
+     *
+     * @return current value
+     */
+    long get();
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/AtomicCounterBuilder.java b/core/api/src/main/java/org/onosproject/store/service/AtomicCounterBuilder.java
new file mode 100644
index 0000000..ee8481d2
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/AtomicCounterBuilder.java
@@ -0,0 +1,52 @@
+package org.onosproject.store.service;
+
+/**
+ * Builder for AtomicCounter.
+ */
+public interface AtomicCounterBuilder {
+
+    /**
+     * Sets the name for the atomic counter.
+     * <p>
+     * Each atomic counter is identified by a unique name.
+     * </p>
+     * <p>
+     * Note: This is a mandatory parameter.
+     * </p>
+     *
+     * @param name name of the atomic counter
+     * @return this AtomicCounterBuilder
+     */
+    public AtomicCounterBuilder withName(String name);
+
+    /**
+     * Creates this counter on the partition that spans the entire cluster.
+     * <p>
+     * When partitioning is disabled, the counter state will be
+     * ephemeral and does not survive a full cluster restart.
+     * </p>
+     * <p>
+     * Note: By default partitions are enabled.
+     * </p>
+     * @return this AtomicCounterBuilder
+     */
+    public AtomicCounterBuilder withPartitionsDisabled();
+
+    /**
+     * Builds a AtomicCounter based on the configuration options
+     * supplied to this builder.
+     *
+     * @return new AtomicCounter
+     * @throws java.lang.RuntimeException if a mandatory parameter is missing
+     */
+    public AtomicCounter build();
+
+    /**
+     * Builds a AsyncAtomicCounter based on the configuration options
+     * supplied to this builder.
+     *
+     * @return new AsyncAtomicCounter
+     * @throws java.lang.RuntimeException if a mandatory parameter is missing
+     */
+    public AsyncAtomicCounter buildAsyncCounter();
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMapException.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMapException.java
index 7d6ad4d..94ed649 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMapException.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMapException.java
@@ -20,7 +20,7 @@
  * Top level exception for ConsistentMap failures.
  */
 @SuppressWarnings("serial")
-public class ConsistentMapException extends RuntimeException {
+public class ConsistentMapException extends StorageException {
     public ConsistentMapException() {
     }
 
diff --git a/core/api/src/main/java/org/onosproject/store/service/Serializer.java b/core/api/src/main/java/org/onosproject/store/service/Serializer.java
index 466943e..32ae585 100644
--- a/core/api/src/main/java/org/onosproject/store/service/Serializer.java
+++ b/core/api/src/main/java/org/onosproject/store/service/Serializer.java
@@ -16,6 +16,8 @@
 
 package org.onosproject.store.service;
 
+import org.onlab.util.KryoNamespace;
+
 /**
  * Interface for serialization for store artifacts.
  */
@@ -35,4 +37,24 @@
      * @param <T> decoded type
      */
     <T> T decode(byte[] bytes);
+
+    /**
+     * Creates a new Serializer instance from a KryoNamespace.
+     *
+     * @param kryo kryo namespace
+     * @return Serializer instance
+     */
+    public static Serializer using(KryoNamespace kryo) {
+        return new Serializer() {
+            @Override
+            public <T> byte[] encode(T object) {
+                return kryo.serialize(object);
+            }
+
+            @Override
+            public <T> T decode(byte[] bytes) {
+                return kryo.deserialize(bytes);
+            }
+        };
+    }
 }
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java b/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java
index eb129fe..204e35b 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java
@@ -17,6 +17,7 @@
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Service for administering storage instances.
@@ -38,6 +39,13 @@
     List<MapInfo> getMapInfo();
 
     /**
+     * Returns information about all the atomic counters in the system.
+     *
+     * @return mapping from counter name to that counter's next value
+     */
+    Map<String, Long> getCounters();
+
+    /**
      * Returns all the transactions in the system.
      *
      * @return collection of transactions
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageException.java b/core/api/src/main/java/org/onosproject/store/service/StorageException.java
new file mode 100644
index 0000000..a66fc3e
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageException.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.service;
+
+/**
+ * Top level exception for Store failures.
+ */
+@SuppressWarnings("serial")
+public class StorageException extends RuntimeException {
+    public StorageException() {
+    }
+
+    public StorageException(Throwable t) {
+        super(t);
+    }
+
+    /**
+     * Store operation timeout.
+     */
+    public static class Timeout extends StorageException {
+    }
+
+    /**
+     * Store update conflicts with an in flight transaction.
+     */
+    public static class ConcurrentModification extends StorageException {
+    }
+
+    /**
+     * Store operation interrupted.
+     */
+    public static class Interrupted extends StorageException {
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageService.java b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
index 5ea0420..91cc273 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
@@ -16,6 +16,7 @@
 
 package org.onosproject.store.service;
 
+
 /**
  * Storage service.
  * <p>
@@ -55,6 +56,13 @@
     <E> SetBuilder<E> setBuilder();
 
     /**
+     * Creates a new AtomicCounterBuilder.
+     *
+     * @return atomic counter builder
+     */
+    AtomicCounterBuilder atomicCounterBuilder();
+
+    /**
      * Creates a new transaction context.
      *
      * @return transaction context
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index 363f330..dd8088c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -18,6 +18,7 @@
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import net.kuujo.copycat.CopycatConfig;
@@ -47,6 +48,7 @@
 import org.onosproject.store.cluster.impl.NodeInfo;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl;
+import org.onosproject.store.service.AtomicCounterBuilder;
 import org.onosproject.store.service.ConsistentMapBuilder;
 import org.onosproject.store.service.ConsistentMapException;
 import org.onosproject.store.service.EventuallyConsistentMapBuilder;
@@ -324,6 +326,11 @@
     }
 
     @Override
+    public AtomicCounterBuilder atomicCounterBuilder() {
+        return new DefaultAtomicCounterBuilder(inMemoryDatabase, partitionedDatabase);
+    }
+
+    @Override
     public List<MapInfo> getMapInfo() {
         List<MapInfo> maps = Lists.newArrayList();
         maps.addAll(getMapInfo(inMemoryDatabase));
@@ -339,6 +346,15 @@
             .collect(Collectors.toList());
     }
 
+
+    @Override
+    public Map<String, Long> getCounters() {
+        Map<String, Long> counters = Maps.newHashMap();
+        counters.putAll(complete(inMemoryDatabase.counters()));
+        counters.putAll(complete(partitionedDatabase.counters()));
+        return counters;
+    }
+
     @Override
     public Collection<Transaction> getTransactions() {
         return complete(transactionManager.getTransactions());
@@ -361,4 +377,4 @@
     public void redriveTransactions() {
         getTransactions().stream().forEach(transactionManager::execute);
     }
-}
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
index 3578525..fdd93c3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
@@ -35,6 +35,12 @@
      */
     CompletableFuture<Set<String>> tableNames();
 
+    /**
+     * Returns a mapping from counter name to next value.
+     * @return A completable future to be completed with the result once complete.
+     */
+    CompletableFuture<Map<String, Long>> counters();
+
   /**
    * Gets the table size.
    *
@@ -182,6 +188,23 @@
   CompletableFuture<Result<Boolean>> replace(String tableName, K key, long oldVersion, V newValue);
 
   /**
+   * Returns the next value for the specified atomic counter after
+   * incrementing the current value by one.
+   *
+   * @param counterName counter name
+   * @return next value for the specified counter
+   */
+  CompletableFuture<Long> nextValue(String counterName);
+
+  /**
+   * Returns the current value for the specified atomic counter.
+   *
+   * @param counterName counter name
+   * @return current value for the specified counter
+   */
+  CompletableFuture<Long> currentValue(String counterName);
+
+  /**
    * Prepare and commit the specified transaction.
    *
    * @param transaction transaction to commit (after preparation)
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
index 6685dde..5f955fd 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
@@ -17,6 +17,7 @@
 package org.onosproject.store.consistent.impl;
 
 import java.util.Collection;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
@@ -46,6 +47,9 @@
   Set<String> tableNames();
 
   @Query
+  Map<String, Long> counters();
+
+  @Query
   int size(String tableName);
 
   @Query
@@ -94,6 +98,12 @@
   Result<Boolean> replace(String tableName, K key, long oldVersion, V newValue);
 
   @Command
+  Long nextValue(String counterName);
+
+  @Query
+  Long currentValue(String counterName);
+
+  @Command
   boolean prepareAndCommit(Transaction transaction);
 
   @Command
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
new file mode 100644
index 0000000..3cad25f
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+import java.util.concurrent.CompletableFuture;
+import org.onosproject.store.service.AsyncAtomicCounter;
+import static com.google.common.base.Preconditions.*;
+
+/**
+ * Default implementation for a distributed AsyncAtomicCounter backed by
+ * partitioned Raft DB.
+ * <p>
+ * The initial value will be zero.
+ */
+public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter {
+
+    private final String name;
+    private final Database database;
+
+    public DefaultAsyncAtomicCounter(String name, Database database) {
+        this.name = checkNotNull(name);
+        this.database = checkNotNull(database);
+    }
+
+    @Override
+    public CompletableFuture<Long> incrementAndGet() {
+        return database.nextValue(name);
+    }
+
+    @Override
+    public CompletableFuture<Long> get() {
+        return database.currentValue(name);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
new file mode 100644
index 0000000..e479028
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.onosproject.store.service.AsyncAtomicCounter;
+import org.onosproject.store.service.AtomicCounter;
+import org.onosproject.store.service.StorageException;
+
+/**
+ * Default implementation for a distributed AtomicCounter backed by
+ * partitioned Raft DB.
+ * <p>
+ * The initial value will be zero.
+ */
+public class DefaultAtomicCounter implements AtomicCounter {
+
+    private static final int OPERATION_TIMEOUT_MILLIS = 5000;
+
+    private final AsyncAtomicCounter asyncCounter;
+
+    public DefaultAtomicCounter(String name, Database database) {
+        asyncCounter = new DefaultAsyncAtomicCounter(name, database);
+    }
+
+    @Override
+    public long incrementAndGet() {
+        return complete(asyncCounter.incrementAndGet());
+    }
+
+    @Override
+    public long get() {
+        return complete(asyncCounter.get());
+    }
+
+    private static <T> T complete(CompletableFuture<T> future) {
+        try {
+            return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new StorageException.Interrupted();
+        } catch (TimeoutException e) {
+            throw new StorageException.Timeout();
+        } catch (ExecutionException e) {
+            throw new StorageException(e.getCause());
+        }
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java
new file mode 100644
index 0000000..c84cff0
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java
@@ -0,0 +1,48 @@
+package org.onosproject.store.consistent.impl;
+
+import org.onosproject.store.service.AsyncAtomicCounter;
+import org.onosproject.store.service.AtomicCounter;
+import org.onosproject.store.service.AtomicCounterBuilder;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Default implementation of AtomicCounterBuilder.
+ */
+public class DefaultAtomicCounterBuilder implements AtomicCounterBuilder {
+
+    private String name;
+    private boolean partitionsEnabled = true;
+    private final Database partitionedDatabase;
+    private final Database inMemoryDatabase;
+
+    public DefaultAtomicCounterBuilder(Database inMemoryDatabase, Database partitionedDatabase) {
+        this.inMemoryDatabase = inMemoryDatabase;
+        this.partitionedDatabase = partitionedDatabase;
+    }
+
+    @Override
+    public AtomicCounterBuilder withName(String name) {
+        checkArgument(name != null && !name.isEmpty());
+        this.name = name;
+        return this;
+    }
+
+    @Override
+    public AtomicCounterBuilder withPartitionsDisabled() {
+        partitionsEnabled = false;
+        return this;
+    }
+
+    @Override
+    public AtomicCounter build() {
+        Database database = partitionsEnabled ? partitionedDatabase : inMemoryDatabase;
+        return new DefaultAtomicCounter(name, database);
+    }
+
+    @Override
+    public AsyncAtomicCounter buildAsyncCounter() {
+        Database database = partitionsEnabled ? partitionedDatabase : inMemoryDatabase;
+        return new DefaultAsyncAtomicCounter(name, database);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
index 8ed7670..4bdfcb3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -65,6 +65,11 @@
     }
 
     @Override
+    public CompletableFuture<Map<String, Long>> counters() {
+        return checkOpen(() -> proxy.counters());
+    }
+
+    @Override
     public CompletableFuture<Integer> size(String tableName) {
         return checkOpen(() -> proxy.size(tableName));
     }
@@ -145,6 +150,16 @@
     }
 
     @Override
+    public CompletableFuture<Long> nextValue(String counterName) {
+        return checkOpen(() -> proxy.nextValue(counterName));
+    }
+
+    @Override
+    public CompletableFuture<Long> currentValue(String counterName) {
+        return checkOpen(() -> proxy.currentValue(counterName));
+    }
+
+    @Override
     public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
         return checkOpen(() -> proxy.prepareAndCommit(transaction));
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
index c190a28..bad3782 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -21,6 +21,7 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.Set;
 
@@ -43,6 +44,7 @@
  */
 public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
     private Long nextVersion;
+    private Map<String, AtomicLong> counters;
     private Map<String, Map<String, Versioned<byte[]>>> tables;
 
     /**
@@ -60,6 +62,11 @@
     @Initializer
     @Override
     public void init(StateContext<DatabaseState<String, byte[]>> context) {
+        counters = context.get("counters");
+        if (counters == null) {
+            counters = Maps.newConcurrentMap();
+            context.put("counters", counters);
+        }
         tables = context.get("tables");
         if (tables == null) {
             tables = Maps.newConcurrentMap();
@@ -83,6 +90,13 @@
     }
 
     @Override
+    public Map<String, Long> counters() {
+        Map<String, Long> counterMap = Maps.newHashMap();
+        counters.forEach((k, v) -> counterMap.put(k, v.get()));
+        return counterMap;
+    }
+
+    @Override
     public int size(String tableName) {
       return getTableMap(tableName).size();
     }
@@ -212,6 +226,16 @@
     }
 
     @Override
+    public Long nextValue(String counterName) {
+        return getCounter(counterName).incrementAndGet();
+    }
+
+    @Override
+    public Long currentValue(String counterName) {
+        return getCounter(counterName).get();
+    }
+
+    @Override
     public boolean prepareAndCommit(Transaction transaction) {
         if (prepare(transaction)) {
             return commit(transaction);
@@ -255,6 +279,10 @@
         return locks.computeIfAbsent(tableName, name -> Maps.newConcurrentMap());
     }
 
+    private AtomicLong getCounter(String counterName) {
+        return counters.computeIfAbsent(counterName, name -> new AtomicLong(0));
+    }
+
     private boolean isUpdatePossible(DatabaseUpdate update) {
         Versioned<byte[]> existingEntry = get(update.tableName(), update.key());
         switch (update.type()) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index 571e309..39b4844 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -103,18 +103,7 @@
     public void activate() {
         lockMap = storageService.<String, NodeId>consistentMapBuilder()
                     .withName("onos-leader-locks")
-                    .withSerializer(new Serializer() {
-                        KryoNamespace kryo = new KryoNamespace.Builder().register(KryoNamespaces.API).build();
-                        @Override
-                        public <T> byte[] encode(T object) {
-                            return kryo.serialize(object);
-                        }
-
-                        @Override
-                        public <T> T decode(byte[] bytes) {
-                            return kryo.deserialize(bytes);
-                        }
-                    })
+                    .withSerializer(Serializer.using(new KryoNamespace.Builder().register(KryoNamespaces.API).build()))
                     .withPartitionsDisabled().build();
 
         localNodeId = clusterService.getLocalNode().id();
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index 178f49f..bbbd9b6 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -90,6 +90,21 @@
     }
 
     @Override
+    public CompletableFuture<Map<String, Long>> counters() {
+        checkState(isOpen.get(), DB_NOT_OPEN);
+        Map<String, Long> counters = Maps.newConcurrentMap();
+        return CompletableFuture.allOf(partitions
+                .stream()
+                .map(db -> db.counters()
+                             .thenApply(m -> {
+                                 counters.putAll(m);
+                                 return null;
+                             }))
+                .toArray(CompletableFuture[]::new))
+            .thenApply(v -> counters);
+    }
+
+    @Override
     public CompletableFuture<Integer> size(String tableName) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         AtomicInteger totalSize = new AtomicInteger(0);
@@ -219,6 +234,18 @@
     }
 
     @Override
+    public CompletableFuture<Long> nextValue(String counterName) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
+        return partitioner.getPartition(counterName, counterName).nextValue(counterName);
+    }
+
+    @Override
+    public CompletableFuture<Long> currentValue(String counterName) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
+        return partitioner.getPartition(counterName, counterName).currentValue(counterName);
+    }
+
+    @Override
     public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
         Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
         if (subTransactions.isEmpty()) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java
index 865934b..8a3603a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java
@@ -36,34 +36,22 @@
  */
 public class TransactionManager {
 
+    private static final KryoNamespace KRYO_NAMESPACE = KryoNamespace.newBuilder()
+            .register(KryoNamespaces.BASIC)
+            .nextId(KryoNamespace.FLOATING_ID)
+            .register(Versioned.class)
+            .register(DatabaseUpdate.class)
+            .register(DatabaseUpdate.Type.class)
+            .register(DefaultTransaction.class)
+            .register(Transaction.State.class)
+            .register(Pair.class)
+            .register(ImmutablePair.class)
+            .build();
+
+    private final Serializer serializer = Serializer.using(KRYO_NAMESPACE);
     private final Database database;
     private final AsyncConsistentMap<Long, Transaction> transactions;
 
-    private final Serializer serializer = new Serializer() {
-
-        private KryoNamespace kryo = KryoNamespace.newBuilder()
-                .register(KryoNamespaces.BASIC)
-                .nextId(KryoNamespace.FLOATING_ID)
-                .register(Versioned.class)
-                .register(DatabaseUpdate.class)
-                .register(DatabaseUpdate.Type.class)
-                .register(DefaultTransaction.class)
-                .register(Transaction.State.class)
-                .register(Pair.class)
-                .register(ImmutablePair.class)
-                .build();
-
-        @Override
-        public <T> byte[] encode(T object) {
-            return kryo.serialize(object);
-        }
-
-        @Override
-        public <T> T decode(byte[] bytes) {
-            return kryo.deserialize(bytes);
-        }
-    };
-
     /**
      * Constructs a new TransactionManager for the specified database instance.
      *