WIP: Consistent map implementation.

Change-Id: I51b2d954b7a8ff2c51c425d9a8125937d4eaa6b0

Change-Id: Ib27799d4eb60fc4bfaa8d2f21a904365ff5437eb

Change-Id: I95c937600ceb8f282a482280217671c471f40b9c
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMap.java
new file mode 100644
index 0000000..4d7f33b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMap.java
@@ -0,0 +1,200 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.Map.Entry;
+
+/**
+ * A distributed, strongly consistent map.
+ * <p>
+ * This map offers strong read-after-update (where update == create/update/delete)
+ * consistency. All operations to the map are serialized and applied in a consistent
+ * manner.
+ * <p>
+ * The stronger consistency comes at the expense of availability in
+ * the event of a network partition. A network partition can be either due to
+ * a temporary disruption in network connectivity between participating nodes
+ * or due to a node being temporarily down.
+ * </p><p>
+ * All values stored in this map are versioned and the API supports optimistic
+ * concurrency by allowing conditional updates that take into consideration
+ * the version or value that was previously read.
+ * </p><p>
+ * The map also supports atomic batch updates (transactions). One can provide a list
+ * of updates to be applied atomically if and only if all the operations are guaranteed
+ * to succeed i.e. all their preconditions are met. For example, the precondition
+ * for a putIfAbsent API call is absence of a mapping for the key. Similarly, the
+ * precondition for a conditional replace operation is the presence of an expected
+ * version or value
+ * </p><p>
+ * This map does not allow null values. All methods can throw a ConsistentMapException
+ * (which extends RuntimeException) to indicate failures.
+ *
+ */
+public interface ConsistentMap<K, V> {
+
+    /**
+     * Returns the number of entries in the map.
+     *
+     * @return map size.
+     */
+    int size();
+
+    /**
+     * Returns true if the map is empty.
+     *
+     * @return true if map has no entries, false otherwise.
+     */
+    boolean isEmpty();
+
+    /**
+     * Returns true if this map contains a mapping for the specified key.
+     *
+     * @param key key
+     * @return true if map contains key, false otherwise.
+     */
+    boolean containsKey(K key);
+
+    /**
+     * Returns true if this map contains the specified value.
+     *
+     * @param value value
+     * @return true if map contains value, false otherwise.
+     */
+    boolean containsValue(V value);
+
+    /**
+     * Returns the value (and version) to which the specified key is mapped, or null if this
+     * map contains no mapping for the key.
+     *
+     * @param key the key whose associated value (and version) is to be returned
+     * @return the value (and version) to which the specified key is mapped, or null if
+     * this map contains no mapping for the key
+     */
+    Versioned<V> get(K key);
+
+    /**
+     * Associates the specified value with the specified key in this map (optional operation).
+     * If the map previously contained a mapping for the key, the old value is replaced by the
+     * specified value.
+     *
+     * @param key key with which the specified value is to be associated
+     * @param value value to be associated with the specified key
+     * @return the previous value (and version) associated with key, or null if there was
+     * no mapping for key.
+     */
+    Versioned<V> put(K key, V value);
+
+    /**
+     * Removes the mapping for a key from this map if it is present (optional operation).
+     *
+     * @param key key whose value is to be removed from the map
+     * @return the value (and version) to which this map previously associated the key,
+     * or null if the map contained no mapping for the key.
+     */
+    Versioned<V> remove(K key);
+
+    /**
+     * Removes all of the mappings from this map (optional operation).
+     * The map will be empty after this call returns.
+     */
+    void clear();
+
+    /**
+     * Returns a Set view of the keys contained in this map.
+     * This method differs from the behavior of java.util.Map.keySet() in that
+     * what is returned is a unmodifiable snapshot view of the keys in the ConsistentMap.
+     * Attempts to modify the returned set, whether direct or via its iterator,
+     * result in an UnsupportedOperationException.
+     *
+     * @return a set of the keys contained in this map
+     */
+    Set<K> keySet();
+
+    /**
+     * Returns the collection of values (and associated versions) contained in this map.
+     * This method differs from the behavior of java.util.Map.values() in that
+     * what is returned is a unmodifiable snapshot view of the values in the ConsistentMap.
+     * Attempts to modify the returned collection, whether direct or via its iterator,
+     * result in an UnsupportedOperationException.
+     *
+     * @return a collection of the values (and associated versions) contained in this map
+     */
+    Collection<Versioned<V>> values();
+
+    /**
+     * Returns the set of entries contained in this map.
+     * This method differs from the behavior of java.util.Map.entrySet() in that
+     * what is returned is a unmodifiable snapshot view of the entries in the ConsistentMap.
+     * Attempts to modify the returned set, whether direct or via its iterator,
+     * result in an UnsupportedOperationException.
+     *
+     * @return set of entries contained in this map.
+     */
+    Set<Entry<K, Versioned<V>>> entrySet();
+
+    /**
+     * If the specified key is not already associated with a value
+     * associates it with the given value and returns null, else returns the current value.
+     *
+     * @param key key with which the specified value is to be associated
+     * @param value value to be associated with the specified key
+     * @return the previous value associated with the specified key or null
+     * if key does not already mapped to a value.
+     */
+    Versioned<V> putIfAbsent(K key, V value);
+
+    /**
+     * Removes the entry for the specified key only if it is currently
+     * mapped to the specified value.
+     *
+     * @param key key with which the specified value is associated
+     * @param value value expected to be associated with the specified key
+     * @return true if the value was removed
+     */
+    boolean remove(K key, V value);
+
+    /**
+     * Removes the entry for the specified key only if its current
+     * version in the map is equal to the specified version.
+     *
+     * @param key key with which the specified version is associated
+     * @param version version expected to be associated with the specified key
+     * @return true if the value was removed
+     */
+    boolean remove(K key, long version);
+
+    /**
+     * Replaces the entry for the specified key only if currently mapped
+     * to the specified value.
+     *
+     * @param key key with which the specified value is associated
+     * @param oldValue value expected to be associated with the specified key
+     * @param newValue value to be associated with the specified key
+     * @return true if the value was replaced
+     */
+    boolean replace(K key, V oldValue, V newValue);
+
+    /**
+     * Replaces the entry for the specified key only if it is currently mapped to the
+     * specified version.
+     *
+     * @param key key key with which the specified value is associated
+     * @param oldVersion version expected to be associated with the specified key
+     * @param newValue value to be associated with the specified key
+     * @return true if the value was replaced
+     */
+    boolean replace(K key, long oldVersion, V newValue);
+
+    /**
+     * Atomically apply the specified list of updates to the map.
+     * If any of the updates cannot be applied due to a precondition
+     * violation, none of the updates will be applied and the state of
+     * the map remains unaltered.
+     *
+     * @param updates list of updates to apply atomically.
+     * @return true if the map was updated.
+     */
+    boolean batchUpdate(List<UpdateOperation<K, V>> updates);
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapException.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapException.java
new file mode 100644
index 0000000..d98e5a3
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapException.java
@@ -0,0 +1,26 @@
+package org.onosproject.store.consistent.impl;
+
+/**
+ * Top level exception for ConsistentMap failures.
+ */
+@SuppressWarnings("serial")
+public class ConsistentMapException extends RuntimeException {
+    public ConsistentMapException() {
+    }
+
+    public ConsistentMapException(Throwable t) {
+        super(t);
+    }
+
+    /**
+     * ConsistentMap operation timeout.
+     */
+    public static class Timeout extends ConsistentMapException {
+    }
+
+    /**
+     * ConsistentMap operation interrupted.
+     */
+    public static class Interrupted extends ConsistentMapException {
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java
new file mode 100644
index 0000000..1c7ef7f
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java
@@ -0,0 +1,234 @@
+package org.onosproject.store.consistent.impl;
+
+import static com.google.common.base.Preconditions.*;
+
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.Set;
+
+import org.onlab.util.HexString;
+import org.onosproject.store.serializers.StoreSerializer;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+/**
+ * ConsistentMap implementation that is backed by a Raft consensus
+ * based database.
+ *
+ * @param <K> type of key.
+ * @param <V> type of value.
+ */
+public class ConsistentMapImpl<K, V> implements ConsistentMap<K, V> {
+
+    private final String name;
+    private final DatabaseProxy<String, byte[]> proxy;
+    private final StoreSerializer serializer;
+
+    private static final int OPERATION_TIMEOUT_MILLIS = 1000;
+    private static final String ERROR_NULL_KEY = "Key cannot be null";
+    private static final String ERROR_NULL_VALUE = "Null values are not allowed";
+
+    private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder()
+            .softValues()
+            .build(new CacheLoader<K, String>() {
+
+                @Override
+                public String load(K key) {
+                    return HexString.toHexString(serializer.encode(key));
+                }
+            });
+
+    protected K dK(String key) {
+        return serializer.decode(HexString.fromHexString(key));
+    }
+
+    ConsistentMapImpl(String name,
+            DatabaseProxy<String, byte[]> proxy,
+            StoreSerializer serializer) {
+        this.name = checkNotNull(name, "map name cannot be null");
+        this.proxy = checkNotNull(proxy, "database proxy cannot be null");
+        this.serializer = checkNotNull(serializer, "serializer cannot be null");
+    }
+
+    @Override
+    public int size() {
+        return complete(proxy.size(name));
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return complete(proxy.isEmpty(name));
+    }
+
+    @Override
+    public boolean containsKey(K key) {
+        checkNotNull(key, ERROR_NULL_KEY);
+        return complete(proxy.containsKey(name, keyCache.getUnchecked(key)));
+    }
+
+    @Override
+    public boolean containsValue(V value) {
+        checkNotNull(value, ERROR_NULL_VALUE);
+        return complete(proxy.containsValue(name, serializer.encode(value)));
+    }
+
+    @Override
+    public Versioned<V> get(K key) {
+        checkNotNull(key, ERROR_NULL_KEY);
+        Versioned<byte[]> value = complete(proxy.get(name, keyCache.getUnchecked(key)));
+        return new Versioned<>(serializer.decode(value.value()), value.version());
+    }
+
+    @Override
+    public Versioned<V> put(K key, V value) {
+        checkNotNull(key, ERROR_NULL_KEY);
+        checkNotNull(value, ERROR_NULL_VALUE);
+        Versioned<byte[]> previousValue = complete(proxy.get(name, keyCache.getUnchecked(key)));
+        return (previousValue != null) ?
+                new Versioned<>(serializer.decode(previousValue.value()), previousValue.version()) : null;
+
+    }
+
+    @Override
+    public Versioned<V> remove(K key) {
+        checkNotNull(key, ERROR_NULL_KEY);
+        Versioned<byte[]> value = complete(proxy.get(name, keyCache.getUnchecked(key)));
+        return (value != null) ? new Versioned<>(serializer.decode(value.value()), value.version()) : null;
+    }
+
+    @Override
+    public void clear() {
+        complete(proxy.clear(name));
+    }
+
+    @Override
+    public Set<K> keySet() {
+        return Collections.unmodifiableSet(complete(proxy.keySet(name))
+                .stream()
+                .map(this::dK)
+                .collect(Collectors.toSet()));
+    }
+
+    @Override
+    public Collection<Versioned<V>> values() {
+        return Collections.unmodifiableList(complete(proxy.values(name))
+            .stream()
+            .map(v -> new Versioned<V>(serializer.decode(v.value()), v.version()))
+            .collect(Collectors.toList()));
+    }
+
+    @Override
+    public Set<Entry<K, Versioned<V>>> entrySet() {
+        return Collections.unmodifiableSet(complete(proxy.entrySet(name))
+                .stream()
+                .map(this::fromRawEntry)
+                .collect(Collectors.toSet()));
+    }
+
+    @Override
+    public Versioned<V> putIfAbsent(K key, V value) {
+        checkNotNull(key, ERROR_NULL_KEY);
+        checkNotNull(value, ERROR_NULL_VALUE);
+        Versioned<byte[]> existingValue = complete(proxy.putIfAbsent(
+                name, keyCache.getUnchecked(key), serializer.encode(value)));
+        return (existingValue != null) ?
+                new Versioned<>(serializer.decode(existingValue.value()), existingValue.version()) : null;
+    }
+
+    @Override
+    public boolean remove(K key, V value) {
+        checkNotNull(key, ERROR_NULL_KEY);
+        checkNotNull(value, ERROR_NULL_VALUE);
+        return complete(proxy.remove(name, keyCache.getUnchecked(key), serializer.encode(value)));
+    }
+
+    @Override
+    public boolean remove(K key, long version) {
+        checkNotNull(key, ERROR_NULL_KEY);
+        return complete(proxy.remove(name, keyCache.getUnchecked(key), version));
+
+    }
+
+    @Override
+    public boolean replace(K key, V oldValue, V newValue) {
+        checkNotNull(key, ERROR_NULL_KEY);
+        checkNotNull(newValue, ERROR_NULL_VALUE);
+        byte[] existing = oldValue != null ? serializer.encode(oldValue) : null;
+        return complete(proxy.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue)));
+    }
+
+    @Override
+    public boolean replace(K key, long oldVersion, V newValue) {
+        checkNotNull(key, ERROR_NULL_KEY);
+        checkNotNull(newValue, ERROR_NULL_VALUE);
+        return complete(proxy.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue)));
+    }
+
+    @Override
+    public boolean batchUpdate(List<UpdateOperation<K, V>> updates) {
+        checkNotNull(updates, "updates cannot be null");
+        return complete(proxy.atomicBatchUpdate(updates
+                .stream()
+                .map(this::toRawUpdateOperation)
+                .collect(Collectors.toList())));
+    }
+
+    private static <T> T complete(CompletableFuture<T> future) {
+        try {
+            return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new ConsistentMapException.Interrupted();
+        } catch (TimeoutException e) {
+            throw new ConsistentMapException.Timeout();
+        } catch (ExecutionException e) {
+            throw new ConsistentMapException(e.getCause());
+        }
+    }
+
+    private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
+        return new AbstractMap.SimpleEntry<>(
+                dK(e.getKey()),
+                new Versioned<>(
+                        serializer.decode(e.getValue().value()),
+                        e.getValue().version()));
+    }
+
+    private UpdateOperation<String, byte[]> toRawUpdateOperation(UpdateOperation<K, V> update) {
+
+        checkArgument(name.equals(update.tableName()), "Unexpected table name");
+
+        UpdateOperation.Builder<String, byte[]> rawUpdate = UpdateOperation.<String, byte[]>newBuilder();
+
+        rawUpdate = rawUpdate.withKey(keyCache.getUnchecked(update.key()))
+            .withCurrentVersion(update.currentVersion())
+            .withType(update.type());
+
+        rawUpdate = rawUpdate.withTableName(update.tableName());
+
+        if (update.value() != null) {
+            rawUpdate = rawUpdate.withValue(serializer.encode(update.value()));
+        } else {
+            checkState(update.type() == UpdateOperation.Type.REMOVE
+                    || update.type() == UpdateOperation.Type.REMOVE_IF_VERSION_MATCH,
+                    ERROR_NULL_VALUE);
+        }
+
+        if (update.currentValue() != null) {
+            rawUpdate = rawUpdate.withCurrentValue(serializer.encode(update.currentValue()));
+        }
+
+        return rawUpdate.build();
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseDefinitionStore.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseDefinitionStore.java
new file mode 100644
index 0000000..f2d5269
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseDefinitionStore.java
@@ -0,0 +1,125 @@
+package org.onosproject.store.consistent.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.onosproject.cluster.DefaultControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onlab.packet.IpAddress;
+import org.slf4j.Logger;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Maps;
+
+/**
+ * Allows for reading and writing partitioned database definition as a JSON file.
+ */
+public class DatabaseDefinitionStore {
+
+    private final Logger log = getLogger(getClass());
+
+    private final File definitionfile;
+
+    /**
+     * Creates a reader/writer of the database definition file.
+     *
+     * @param filePath location of the definition file
+     */
+    public DatabaseDefinitionStore(String filePath) {
+        definitionfile = new File(filePath);
+    }
+
+    /**
+     * Creates a reader/writer of the database definition file.
+     *
+     * @param filePath location of the definition file
+     */
+    public DatabaseDefinitionStore(File filePath) {
+        definitionfile = checkNotNull(filePath);
+    }
+
+    /**
+     * Returns the Map from database partition name to set of initial active member nodes.
+     *
+     * @return Map from partition name to set of active member nodes
+     * @throws IOException when I/O exception of some sort has occurred.
+     */
+    public Map<String, Set<DefaultControllerNode>> read() throws IOException {
+
+        final Map<String, Set<DefaultControllerNode>> partitions = Maps.newHashMap();
+
+        final ObjectMapper mapper = new ObjectMapper();
+        final ObjectNode tabletNodes = (ObjectNode) mapper.readTree(definitionfile);
+        final Iterator<Entry<String, JsonNode>> fields = tabletNodes.fields();
+        while (fields.hasNext()) {
+            final Entry<String, JsonNode> next = fields.next();
+            final Set<DefaultControllerNode> nodes = new HashSet<>();
+            final Iterator<JsonNode> elements = next.getValue().elements();
+            while (elements.hasNext()) {
+                ObjectNode nodeDef = (ObjectNode) elements.next();
+                nodes.add(new DefaultControllerNode(new NodeId(nodeDef.get("id").asText()),
+                                                    IpAddress.valueOf(nodeDef.get("ip").asText()),
+                                                    nodeDef.get("tcpPort").asInt(DatabaseManager.COPYCAT_TCP_PORT)));
+            }
+
+            partitions.put(next.getKey(), nodes);
+        }
+        return partitions;
+    }
+
+    /**
+     * Updates the Map from database partition name to set of member nodes.
+     *
+     * @param partitionName name of the database partition to update
+     * @param nodes set of initial member nodes
+     * @throws IOException when I/O exception of some sort has occurred.
+     */
+    public void write(String partitionName, Set<DefaultControllerNode> nodes) throws IOException {
+        checkNotNull(partitionName);
+        checkArgument(partitionName.isEmpty(), "Partition name cannot be empty");
+
+        // load current
+        Map<String, Set<DefaultControllerNode>> config;
+        try {
+            config = read();
+        } catch (IOException e) {
+            log.info("Reading partition config failed, assuming empty definition.");
+            config = new HashMap<>();
+        }
+        // update with specified
+        config.put(partitionName, nodes);
+
+        // write back to file
+        final ObjectMapper mapper = new ObjectMapper();
+        final ObjectNode partitionNodes = mapper.createObjectNode();
+        for (Entry<String, Set<DefaultControllerNode>> tablet : config.entrySet()) {
+            ArrayNode nodeDefs = mapper.createArrayNode();
+            partitionNodes.set(tablet.getKey(), nodeDefs);
+
+            for (DefaultControllerNode node : tablet.getValue()) {
+                ObjectNode nodeDef = mapper.createObjectNode();
+                nodeDef.put("id", node.id().toString())
+                       .put("ip", node.ip().toString())
+                       .put("tcpPort", node.tcpPort());
+                nodeDefs.add(nodeDef);
+            }
+        }
+        mapper.writeTree(new JsonFactory().createGenerator(definitionfile, JsonEncoding.UTF8),
+                         partitionNodes);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
new file mode 100644
index 0000000..0fcb895
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -0,0 +1,122 @@
+package org.onosproject.store.consistent.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import net.kuujo.copycat.cluster.ClusterConfig;
+import net.kuujo.copycat.log.FileLog;
+import net.kuujo.copycat.netty.NettyTcpProtocol;
+import net.kuujo.copycat.protocol.Consistency;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.DefaultControllerNode;
+import org.onosproject.store.serializers.StoreSerializer;
+import org.slf4j.Logger;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Database manager.
+ */
+@Component(immediate = true, enabled = true)
+@Service
+public class DatabaseManager implements DatabaseService {
+
+    private final Logger log = getLogger(getClass());
+    private PartitionedDatabase partitionedDatabase;
+    public static final int COPYCAT_TCP_PORT = 7238; //  7238 = RAFT
+    private static final String CONFIG_DIR = "../config";
+    private static final String PARTITION_DEFINITION_FILE = "tablets.json";
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    protected String nodeToUri(ControllerNode node) {
+        return "tcp://" + node.ip() + ":" + COPYCAT_TCP_PORT;
+    }
+
+    @Activate
+    public void activate() {
+
+        final String logDir = System.getProperty("karaf.data", "./data");
+
+        // load database configuration
+        File file = new File(CONFIG_DIR, PARTITION_DEFINITION_FILE);
+        log.info("Loading database definition: {}", file.getAbsolutePath());
+
+        DatabaseDefinitionStore databaseDef = new DatabaseDefinitionStore(file);
+        Map<String, Set<DefaultControllerNode>> partitionMap;
+        try {
+            partitionMap = databaseDef.read();
+        } catch (IOException e) {
+            log.error("Failed to load database config {}", file);
+            throw new IllegalStateException("Failed to load database config", e);
+        }
+
+        String[] activeNodeUris = partitionMap.values()
+                    .stream()
+                    .reduce((s1, s2) -> Sets.union(s1, s2))
+                    .get()
+                    .stream()
+                    .map(this::nodeToUri)
+                    .toArray(String[]::new);
+
+        String localNodeUri = nodeToUri(clusterService.getLocalNode());
+
+        ClusterConfig clusterConfig = new ClusterConfig()
+            .withProtocol(new NettyTcpProtocol())
+            .withMembers(activeNodeUris)
+            .withLocalMember(localNodeUri);
+
+        PartitionedDatabaseConfig databaseConfig = new PartitionedDatabaseConfig();
+
+        partitionMap.forEach((name, nodes) -> {
+            Set<String> replicas = nodes.stream().map(this::nodeToUri).collect(Collectors.toSet());
+            DatabaseConfig partitionConfig = new DatabaseConfig()
+                            .withConsistency(Consistency.STRONG)
+                            .withLog(new FileLog(logDir))
+                            .withReplicas(replicas);
+            databaseConfig.addPartition(name, partitionConfig);
+        });
+
+        partitionedDatabase = PartitionedDatabaseManager.create("onos-store", clusterConfig, databaseConfig);
+
+        partitionedDatabase.open().whenComplete((db, error) -> {
+            if (error != null) {
+                log.warn("Failed to open database.", error);
+            } else {
+                log.info("Successfully opened database.");
+            }
+        });
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        partitionedDatabase.close().whenComplete((result, error) -> {
+            if (error != null) {
+                log.warn("Failed to cleanly close database.", error);
+            } else {
+                log.info("Successfully closed database.");
+            }
+        });
+        log.info("Stopped");
+    }
+
+    @Override
+    public <K, V> ConsistentMap<K , V> createConsistentMap(String name, StoreSerializer serializer) {
+        return new ConsistentMapImpl<K, V>(name, partitionedDatabase, serializer);
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabasePartitioner.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabasePartitioner.java
new file mode 100644
index 0000000..ef4ece7
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabasePartitioner.java
@@ -0,0 +1,30 @@
+package org.onosproject.store.consistent.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.Map;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.hash.Hashing;
+
+/**
+ * Partitioner for mapping table entries to individual database partitions.
+ * <p>
+ * By default a md5 hash of the hash key (key or table name) is used to pick a
+ * partition.
+ */
+public abstract class DatabasePartitioner implements Partitioner<String> {
+    // Database partitions sorted by their partition name.
+    protected final Database[] sortedPartitions;
+
+    public DatabasePartitioner(Map<String, Database> partitionMap) {
+        checkState(partitionMap != null && !partitionMap.isEmpty(), "Partition map cannot be null or empty");
+        sortedPartitions = ImmutableSortedMap.<String, Database>copyOf(partitionMap).values().toArray(new Database[]{});
+    }
+
+    protected int hash(String key) {
+        return Math.abs(Hashing.md5().newHasher().putBytes(key.getBytes(Charsets.UTF_8)).hash().asInt());
+    }
+
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseService.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseService.java
new file mode 100644
index 0000000..6f2b655
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseService.java
@@ -0,0 +1,18 @@
+package org.onosproject.store.consistent.impl;
+
+import org.onosproject.store.serializers.StoreSerializer;
+
+/**
+ * Database service.
+ */
+public interface DatabaseService {
+
+    /**
+     * Creates a ConsistentMap.
+     *
+     * @param name map name
+     * @param serializer serializer to use for serializing keys and values.
+     * @return consistent map.
+     */
+    <K, V> ConsistentMap<K , V> createConsistentMap(String name, StoreSerializer serializer);
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index d35aca2..8cf1703 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -9,8 +9,6 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -188,8 +186,7 @@
                                                         .values()
                                                         .stream()
                                                         .map(Database::open)
-                                                        .collect(Collectors.toList())
-                                                        .toArray(new CompletableFuture[partitions.size()]))
+                                                        .toArray(CompletableFuture[]::new))
                                  .thenApply(v -> this));
 
     }
@@ -200,8 +197,7 @@
                 .values()
                 .stream()
                 .map(database -> database.close())
-                .collect(Collectors.toList())
-                .toArray(new CompletableFuture[partitions.size()]));
+                .toArray(CompletableFuture[]::new));
         CompletableFuture<Void> closeCoordinator = coordinator.close();
         return closePartitions.thenCompose(v -> closeCoordinator);
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseConfig.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseConfig.java
index 6d375cc..a305a11 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseConfig.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseConfig.java
@@ -24,7 +24,7 @@
      * @param config partition config
      * @return this instance
      */
-    public PartitionedDatabaseConfig withPartition(String name, DatabaseConfig config) {
+    public PartitionedDatabaseConfig addPartition(String name, DatabaseConfig config) {
         partitions.put(name, config);
         return this;
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
index 44cd3a1..81f00de 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
@@ -10,6 +10,9 @@
 import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator;
 import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
 
+/**
+ * Manages a PartitionedDatabase.
+ */
 public interface PartitionedDatabaseManager {
     /**
      * Opens the database.
@@ -73,7 +76,7 @@
                         .withDefaultSerializer(copycatConfig.getDefaultSerializer().copy())
                         .withDefaultExecutor(copycatConfig.getDefaultExecutor()))));
         partitionedDatabase.setPartitioner(
-                new SimpleKeyHashPartitioner<>(partitionedDatabase.getRegisteredPartitions()));
+                new SimpleKeyHashPartitioner(partitionedDatabase.getRegisteredPartitions()));
         return partitionedDatabase;
     }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java
index 5410f9f..13a4ce9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java
@@ -1,31 +1,22 @@
 package org.onosproject.store.consistent.impl;
 
-import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-
 /**
- * A simple Partitioner that uses the key hashCode to map
- * key to a partition.
+ * A simple Partitioner for mapping keys to database partitions.
+ * <p>
+ * This class uses a md5 hash based hashing scheme for hashing the key to
+ * a partition.
  *
- * @param <K> key type.
  */
-public class SimpleKeyHashPartitioner<K> implements Partitioner<K> {
-
-    private final Map<String, Database> partitionMap;
-    private final List<String> sortedPartitionNames;
+public class SimpleKeyHashPartitioner extends DatabasePartitioner {
 
     public SimpleKeyHashPartitioner(Map<String, Database> partitionMap) {
-        this.partitionMap = ImmutableMap.copyOf(partitionMap);
-        sortedPartitionNames = Lists.newArrayList(this.partitionMap.keySet());
-        Collections.sort(sortedPartitionNames);
+        super(partitionMap);
     }
 
     @Override
-    public Database getPartition(String tableName, K key) {
-        return partitionMap.get(sortedPartitionNames.get(Math.abs(key.hashCode()) % partitionMap.size()));
+    public Database getPartition(String tableName, String key) {
+        return sortedPartitions[hash(key) % sortedPartitions.length];
     }
-}
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java
new file mode 100644
index 0000000..367e3f2
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java
@@ -0,0 +1,23 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Map;
+
+/**
+ * A simple Partitioner that uses the table name hash to
+ * pick a partition.
+ * <p>
+ * This class uses a md5 hash based hashing scheme for hashing the table name to
+ * a partition. This partitioner maps all keys for a table to the same database
+ * partition.
+ */
+public class SimpleTableHashPartitioner extends DatabasePartitioner {
+
+    public SimpleTableHashPartitioner(Map<String, Database> partitionMap) {
+        super(partitionMap);
+    }
+
+    @Override
+    public Database getPartition(String tableName, String key) {
+        return sortedPartitions[hash(tableName) % sortedPartitions.length];
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java
index 9d52a09..11df520 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java
@@ -1,5 +1,7 @@
 package org.onosproject.store.consistent.impl;
 
+import static com.google.common.base.Preconditions.*;
+
 import com.google.common.base.MoreObjects;
 
 /**
@@ -28,7 +30,7 @@
     private K key;
     private V value;
     private V currentValue;
-    private long currentVersion;
+    private long currentVersion = -1;
 
     /**
      * Returns the type of update operation.
@@ -91,6 +93,17 @@
     }
 
     /**
+     * Creates a new builder instance.
+     * @param <K> key type.
+     * @param <V> value type.
+     *
+     * @return builder.
+     */
+    public static <K, V> Builder<K, V> newBuilder() {
+        return new Builder<>();
+    }
+
+    /**
      * UpdatOperation builder.
      *
      * @param <K> key type.
@@ -100,52 +113,70 @@
 
         private UpdateOperation<K, V> operation = new UpdateOperation<>();
 
-        /**
-         * Creates a new builder instance.
-         * @param <K> key type.
-         * @param <V> value type.
-         *
-         * @return builder.
-         */
-        public static <K, V> Builder<K, V> builder() {
-            return new Builder<>();
-        }
-
-        private Builder() {
-        }
-
         public UpdateOperation<K, V> build() {
+            validateInputs();
             return operation;
         }
 
         public Builder<K, V> withType(Type type) {
-            operation.type = type;
+            operation.type = checkNotNull(type, "type cannot be null");
             return this;
         }
 
         public Builder<K, V> withTableName(String tableName) {
-            operation.tableName = tableName;
+            operation.tableName = checkNotNull(tableName, "tableName cannot be null");
             return this;
         }
 
         public Builder<K, V> withKey(K key) {
-            operation.key = key;
+            operation.key = checkNotNull(key, "key cannot be null");
             return this;
         }
 
         public Builder<K, V> withCurrentValue(V value) {
-            operation.currentValue = value;
+            operation.currentValue = checkNotNull(value, "currentValue cannot be null");
             return this;
         }
 
         public Builder<K, V> withValue(V value) {
-            operation.value = value;
+            operation.value = checkNotNull(value, "value cannot be null");
             return this;
         }
 
         public Builder<K, V> withCurrentVersion(long version) {
+            checkArgument(version >= 0, "version cannot be negative");
             operation.currentVersion = version;
             return this;
         }
+
+        private void validateInputs() {
+            checkNotNull(operation.type, "type must be specified");
+            checkNotNull(operation.tableName, "table name must be specified");
+            checkNotNull(operation.key, "key must be specified");
+            switch (operation.type) {
+            case PUT:
+            case PUT_IF_ABSENT:
+                checkNotNull(operation.value, "value must be specified.");
+                break;
+            case PUT_IF_VERSION_MATCH:
+                checkNotNull(operation.value, "value must be specified.");
+                checkState(operation.currentVersion >= 0, "current version must be specified");
+                break;
+            case PUT_IF_VALUE_MATCH:
+                checkNotNull(operation.value, "value must be specified.");
+                checkNotNull(operation.currentValue, "currentValue must be specified.");
+                break;
+            case REMOVE:
+                break;
+            case REMOVE_IF_VERSION_MATCH:
+                checkState(operation.currentVersion >= 0, "current version must be specified");
+                break;
+            case REMOVE_IF_VALUE_MATCH:
+                checkNotNull(operation.currentValue, "currentValue must be specified.");
+                break;
+            default:
+                throw new IllegalStateException("Unknown operation type");
+            }
+        }
     }
 }
diff --git a/utils/thirdparty/pom.xml b/utils/thirdparty/pom.xml
index 61adc1a..b79b7d6 100644
--- a/utils/thirdparty/pom.xml
+++ b/utils/thirdparty/pom.xml
@@ -100,7 +100,7 @@
             <filter>
               <artifact>org.onosproject:copycat*</artifact>
               <includes>
-                <include>net/kuujo/copycat/**</include>
+                <include>**</include>
               </includes>
             </filter>