WIP: Partitioned Database based on Raft.
Removed the implementation based on the previous Copycat API.

Change-Id: I6b9d67e943e17095f585ae2a2cb6304c248cd686
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/LeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/LeadershipManager.java
index fb19d4d..8f3068c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/LeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/LeadershipManager.java
@@ -39,7 +39,6 @@
 import org.onosproject.store.serializers.KryoSerializer;
 import org.onosproject.store.service.Lock;
 import org.onosproject.store.service.LockService;
-import org.onosproject.store.service.impl.DistributedLockManager;
 import org.slf4j.Logger;
 
 import java.util.Map;
@@ -62,9 +61,7 @@
 
     private final Logger log = getLogger(getClass());
 
-    // TODO: Remove this dependency
-    private static final int TERM_DURATION_MS =
-            DistributedLockManager.DEAD_LOCK_TIMEOUT_MS;
+    private static final int TERM_DURATION_MS = 2000;
 
     // Time to wait before retrying leadership after
     // a unexpected error.
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java
new file mode 100644
index 0000000..cc1936a
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java
@@ -0,0 +1,69 @@
+package org.onosproject.store.consistent.impl;
+
+
+import net.kuujo.copycat.cluster.ClusterConfig;
+import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
+import net.kuujo.copycat.cluster.internal.coordinator.CoordinatorConfig;
+import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator;
+import net.kuujo.copycat.resource.Resource;
+
+/**
+ * Database.
+ */
+public interface Database extends DatabaseProxy<String, byte[]>, Resource<Database> {
+
+  /**
+   * Creates a new database with the default cluster configuration.<p>
+   *
+   * The database will be constructed with the default cluster configuration. The default cluster configuration
+   * searches for two resources on the classpath - {@code cluster} and {@code cluster-defaults} - in that order.
+   * Configuration options specified in {@code cluster.conf} will override those in {@code cluster-defaults.conf}.<p>
+   *
+   * Additionally, the database will be constructed with a database configuration that searches the classpath for
+   * five configuration files - {@code {name}}, {@code database}, {@code database-defaults}, {@code resource}, and
+   * {@code resource-defaults} - in that order. The first resource is a configuration resource with the same name
+   * as the database resource. If the resource is namespaced - e.g. `databases.my-database.conf` - then resource
+   * configurations will be loaded according to namespaces as well; for example, `databases.conf`.
+   *
+   * @param name The database name.
+   * @return The database.
+   */
+  static Database create(String name) {
+    return create(name, new ClusterConfig(), new DatabaseConfig());
+  }
+
+  /**
+   * Creates a new database.<p>
+   *
+   * The database will be constructed with a database configuration that searches the classpath for
+   * five configuration files - {@code {name}}, {@code database}, {@code database-defaults}, {@code resource}, and
+   * {@code resource-defaults} - in that order. The first resource is a configuration resource with the same name
+   * as the database resource. If the resource is namespaced - e.g. `databases.my-database.conf` - then resource
+   * configurations will be loaded according to namespaces as well; for example, `databases.conf`.
+   *
+   * @param name The database name.
+   * @param cluster The cluster configuration.
+   * @return The database.
+   */
+  static Database create(String name, ClusterConfig cluster) {
+    return create(name, cluster, new DatabaseConfig());
+  }
+
+  /**
+   * Creates a new database.
+   *
+   * @param name The database name.
+   * @param cluster The cluster configuration.
+   * @param config The database configuration.
+   *
+   * @return The database.
+   */
+  static Database create(String name, ClusterConfig cluster, DatabaseConfig config) {
+    ClusterCoordinator coordinator =
+            new DefaultClusterCoordinator(new CoordinatorConfig().withName(name).withClusterConfig(cluster));
+    return coordinator.<Database>getResource(name, config.resolve(cluster))
+      .addStartupTask(() -> coordinator.open().thenApply(v -> null))
+      .addShutdownTask(coordinator::close);
+  }
+
+}
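A minimal usage sketch of the factory above, assuming the caller lives in the same package and a suitable cluster configuration is available on the classpath; the database, table, key and payload names are illustrative only, not part of this change:

package org.onosproject.store.consistent.impl;

import java.nio.charset.StandardCharsets;

import net.kuujo.copycat.cluster.ClusterConfig;

public final class DatabaseUsageSketch {
    public static void main(String[] args) {
        // Create a single (non-partitioned) database, open it, and write one entry.
        Database db = Database.create("example-db", new ClusterConfig(), new DatabaseConfig());
        db.open()
          .thenCompose(d -> d.put("example-table", "key-1",
                                  "value-1".getBytes(StandardCharsets.UTF_8)))
          .thenAccept(previous -> System.out.println("previous value: " + previous))
          .join();
    }
}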
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseConfig.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseConfig.java
new file mode 100644
index 0000000..eda1e99
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseConfig.java
@@ -0,0 +1,108 @@
+package org.onosproject.store.consistent.impl;
+
+import com.typesafe.config.ConfigValueFactory;
+import net.kuujo.copycat.cluster.ClusterConfig;
+import net.kuujo.copycat.cluster.internal.coordinator.CoordinatedResourceConfig;
+import net.kuujo.copycat.protocol.Consistency;
+import net.kuujo.copycat.resource.ResourceConfig;
+import net.kuujo.copycat.state.StateLogConfig;
+import net.kuujo.copycat.util.internal.Assert;
+
+import java.util.Map;
+
+/**
+ * Database configuration.
+ *
+ */
+public class DatabaseConfig extends ResourceConfig<DatabaseConfig> {
+  private static final String DATABASE_CONSISTENCY = "consistency";
+
+  private static final String DEFAULT_CONFIGURATION = "database-defaults";
+  private static final String CONFIGURATION = "database";
+
+  public DatabaseConfig() {
+    super(CONFIGURATION, DEFAULT_CONFIGURATION);
+  }
+
+  public DatabaseConfig(Map<String, Object> config) {
+    super(config, CONFIGURATION, DEFAULT_CONFIGURATION);
+  }
+
+  public DatabaseConfig(String resource) {
+    super(resource, CONFIGURATION, DEFAULT_CONFIGURATION);
+  }
+
+  protected DatabaseConfig(DatabaseConfig config) {
+    super(config);
+  }
+
+  @Override
+  public DatabaseConfig copy() {
+    return new DatabaseConfig(this);
+  }
+
+  /**
+   * Sets the database read consistency.
+   *
+   * @param consistency The database read consistency.
+   * @throws java.lang.NullPointerException If the consistency is {@code null}
+   */
+  public void setConsistency(String consistency) {
+    this.config = config.withValue(DATABASE_CONSISTENCY,
+            ConfigValueFactory.fromAnyRef(
+                    Consistency.parse(Assert.isNotNull(consistency, "consistency")).toString()));
+  }
+
+  /**
+   * Sets the database read consistency.
+   *
+   * @param consistency The database read consistency.
+   * @throws java.lang.NullPointerException If the consistency is {@code null}
+   */
+  public void setConsistency(Consistency consistency) {
+    this.config = config.withValue(DATABASE_CONSISTENCY,
+            ConfigValueFactory.fromAnyRef(
+                    Assert.isNotNull(consistency, "consistency").toString()));
+  }
+
+  /**
+   * Returns the database read consistency.
+   *
+   * @return The database read consistency.
+   */
+  public Consistency getConsistency() {
+    return Consistency.parse(config.getString(DATABASE_CONSISTENCY));
+  }
+
+  /**
+   * Sets the database read consistency, returning the configuration for method chaining.
+   *
+   * @param consistency The database read consistency.
+   * @return The database configuration.
+   * @throws java.lang.NullPointerException If the consistency is {@code null}
+   */
+  public DatabaseConfig withConsistency(String consistency) {
+    setConsistency(consistency);
+    return this;
+  }
+
+  /**
+   * Sets the database read consistency, returning the configuration for method chaining.
+   *
+   * @param consistency The database read consistency.
+   * @return The database configuration.
+   * @throws java.lang.NullPointerException If the consistency is {@code null}
+   */
+  public DatabaseConfig withConsistency(Consistency consistency) {
+    setConsistency(consistency);
+    return this;
+  }
+
+  @Override
+  public CoordinatedResourceConfig resolve(ClusterConfig cluster) {
+    return new StateLogConfig(toMap())
+      .resolve(cluster)
+      .withResourceType(DefaultDatabase.class);
+  }
+
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
new file mode 100644
index 0000000..f83e8f8
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
@@ -0,0 +1,168 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Database proxy.
+ */
+public interface DatabaseProxy<K, V> {
+
+  /**
+   * Gets the table size.
+   *
+   * @param tableName table name
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Integer> size(String tableName);
+
+  /**
+   * Checks whether the table is empty.
+   *
+   * @param tableName table name
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Boolean> isEmpty(String tableName);
+
+  /**
+   * Checks whether the table contains a key.
+   *
+   * @param tableName table name
+   * @param key The key to check.
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Boolean> containsKey(String tableName, K key);
+
+  /**
+   * Checks whether the table contains a value.
+   *
+   * @param tableName table name
+   * @param value The value to check.
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Boolean> containsValue(String tableName, V value);
+
+  /**
+   * Gets a value from the table.
+   *
+   * @param tableName table name
+   * @param key The key to get.
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Versioned<V>> get(String tableName, K key);
+
+  /**
+   * Puts a value in the table.
+   *
+   * @param tableName table name
+   * @param key The key to set.
+   * @param value The value to set.
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Versioned<V>> put(String tableName, K key, V value);
+
+  /**
+   * Removes a value from the table.
+   *
+   * @param tableName table name
+   * @param key The key to remove.
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Versioned<V>> remove(String tableName, K key);
+
+  /**
+   * Clears the table.
+   *
+   * @param tableName table name
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Void> clear(String tableName);
+
+  /**
+   * Gets a set of keys in the table.
+   *
+   * @param tableName table name
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Set<K>> keySet(String tableName);
+
+  /**
+   * Gets a collection of values in the table.
+   *
+   * @param tableName table name
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Collection<Versioned<V>>> values(String tableName);
+
+  /**
+   * Gets a set of entries in the table.
+   *
+   * @param tableName table name
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Set<Map.Entry<K, Versioned<V>>>> entrySet(String tableName);
+
+  /**
+   * Puts a value in the table if the given key does not exist.
+   *
+   * @param tableName table name
+   * @param key The key to set.
+   * @param value The value to set if the given key does not exist.
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Versioned<V>> putIfAbsent(String tableName, K key, V value);
+
+  /**
+   * Removes the key if the existing value for that key matches the specified value.
+   *
+   * @param tableName table name
+   * @param key The key to remove.
+   * @param value The value to remove.
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Boolean> remove(String tableName, K key, V value);
+
+  /**
+   * Removes the key if the existing version for that key matches the specified version.
+   *
+   * @param tableName table name
+   * @param key The key to remove.
+   * @param version The expected version.
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Boolean> remove(String tableName, K key, long version);
+
+  /**
+   * Replaces the entry for the specified key only if currently mapped to the specified value.
+   *
+   * @param tableName table name
+   * @param key The key to replace.
+   * @param oldValue The value to replace.
+   * @param newValue The value with which to replace the given key and value.
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Boolean> replace(String tableName, K key, V oldValue, V newValue);
+
+  /**
+   * Replaces the entry for the specified key only if currently mapped to the specified version.
+   *
+   * @param tableName table name
+   * @param key The key to update
+   * @param oldVersion existing version in the map for this replace to succeed.
+   * @param newValue The value with which to replace the given key and version.
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Boolean> replace(String tableName, K key, long oldVersion, V newValue);
+
+  /**
+   * Performs an atomic batch update operation, i.e. either all operations in the batch succeed or
+   * none do and no state changes are made.
+   *
+   * @param updates list of updates to apply atomically.
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<K, V>> updates);
+}
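Because reads return a Versioned value and writes can be conditioned on a version, the proxy supports optimistic concurrency. A sketch of that pattern using a hypothetical one-byte counter; the helper class and names below are illustrative and not part of this change:

package org.onosproject.store.consistent.impl;

import java.util.concurrent.CompletableFuture;

public final class OptimisticUpdateSketch {

    /**
     * Increments a one-byte counter with a compare-and-set: the write only succeeds
     * if no other writer changed the entry between the get and the replace.
     */
    public static CompletableFuture<Boolean> increment(
            DatabaseProxy<String, byte[]> proxy, String table, String key) {
        return proxy.get(table, key).thenCompose(current -> {
            if (current == null) {
                // putIfAbsent returns the previous value; null means our insert won.
                return proxy.putIfAbsent(table, key, new byte[] {1})
                            .thenApply(previous -> previous == null);
            }
            byte[] next = new byte[] {(byte) (current.value()[0] + 1)};
            return proxy.replace(table, key, current.version(), next);
        });
    }
}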
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
new file mode 100644
index 0000000..89a51e8
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
@@ -0,0 +1,77 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import net.kuujo.copycat.state.Command;
+import net.kuujo.copycat.state.Initializer;
+import net.kuujo.copycat.state.Query;
+import net.kuujo.copycat.state.StateContext;
+
+/**
+ * Database state: the replicated state machine backing a {@link Database}. Methods annotated
+ * with {@code @Command} mutate state and are replicated through the Raft log, while methods
+ * annotated with {@code @Query} are read-only.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public interface DatabaseState<K, V> {
+
+  /**
+   * Initializes the database state.
+   *
+   * @param context The map state context.
+   */
+  @Initializer
+  public void init(StateContext<DatabaseState<K, V>> context);
+
+  @Query
+  int size(String tableName);
+
+  @Query
+  boolean isEmpty(String tableName);
+
+  @Query
+  boolean containsKey(String tableName, K key);
+
+  @Query
+  boolean containsValue(String tableName, V value);
+
+  @Query
+  Versioned<V> get(String tableName, K key);
+
+  @Command
+  Versioned<V> put(String tableName, K key, V value);
+
+  @Command
+  Versioned<V> remove(String tableName, K key);
+
+  @Command
+  void clear(String tableName);
+
+  @Query
+  Set<K> keySet(String tableName);
+
+  @Query
+  Collection<Versioned<V>> values(String tableName);
+
+  @Query
+  Set<Entry<K, Versioned<V>>> entrySet(String tableName);
+
+  @Command
+  Versioned<V> putIfAbsent(String tableName, K key, V value);
+
+  @Command
+  boolean remove(String tableName, K key, V value);
+
+  @Command
+  boolean remove(String tableName, K key, long version);
+
+  @Command
+  boolean replace(String tableName, K key, V oldValue, V newValue);
+
+  @Command
+  boolean replace(String tableName, K key, long oldVersion, V newValue);
+
+  @Command
+  boolean batchUpdate(List<UpdateOperation<K, V>> updates);
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
new file mode 100644
index 0000000..837f3b4
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -0,0 +1,146 @@
+package org.onosproject.store.consistent.impl;
+
+import net.kuujo.copycat.resource.internal.ResourceContext;
+import net.kuujo.copycat.state.StateMachine;
+import net.kuujo.copycat.resource.internal.AbstractResource;
+import net.kuujo.copycat.state.internal.DefaultStateMachine;
+import net.kuujo.copycat.util.concurrent.Futures;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+
+/**
+ * Default database.
+ */
+public class DefaultDatabase extends AbstractResource<Database> implements Database {
+  private final StateMachine<DatabaseState<String, byte[]>> stateMachine;
+  private DatabaseProxy<String, byte[]> proxy;
+
+  @SuppressWarnings("unchecked")
+  public DefaultDatabase(ResourceContext context) {
+    super(context);
+    this.stateMachine = new DefaultStateMachine(context, DatabaseState.class, DefaultDatabaseState.class);
+  }
+
+  /**
+   * If the database is closed, returns a failed CompletableFuture. Otherwise, calls the given supplier
+   * and returns its future result.
+   *
+   * @param supplier The supplier to call if the database is open.
+   * @param <T> The future result type.
+   * @return A completable future that is immediately failed if this database is closed.
+   */
+  protected <T> CompletableFuture<T> checkOpen(Supplier<CompletableFuture<T>> supplier) {
+    if (proxy == null) {
+      return Futures.exceptionalFuture(new IllegalStateException("Database closed"));
+    }
+    return supplier.get();
+  }
+
+  @Override
+  public CompletableFuture<Integer> size(String tableName) {
+    return checkOpen(() -> proxy.size(tableName));
+  }
+
+  @Override
+  public CompletableFuture<Boolean> isEmpty(String tableName) {
+    return checkOpen(() -> proxy.isEmpty(tableName));
+  }
+
+  @Override
+  public CompletableFuture<Boolean> containsKey(String tableName, String key) {
+    return checkOpen(() -> proxy.containsKey(tableName, key));
+  }
+
+  @Override
+  public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) {
+    return checkOpen(() -> proxy.containsValue(tableName, value));
+  }
+
+  @Override
+  public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) {
+    return checkOpen(() -> proxy.get(tableName, key));
+  }
+
+  @Override
+  public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
+    return checkOpen(() -> proxy.put(tableName, key, value));
+  }
+
+  @Override
+  public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
+    return checkOpen(() -> proxy.remove(tableName, key));
+  }
+
+  @Override
+  public CompletableFuture<Void> clear(String tableName) {
+    return checkOpen(() -> proxy.clear(tableName));
+  }
+
+  @Override
+  public CompletableFuture<Set<String>> keySet(String tableName) {
+    return checkOpen(() -> proxy.keySet(tableName));
+  }
+
+  @Override
+  public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) {
+    return checkOpen(() -> proxy.values(tableName));
+  }
+
+  @Override
+  public CompletableFuture<Set<Map.Entry<String, Versioned<byte[]>>>> entrySet(String tableName) {
+    return checkOpen(() -> proxy.entrySet(tableName));
+  }
+
+  @Override
+  public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
+    return checkOpen(() -> proxy.putIfAbsent(tableName, key, value));
+  }
+
+  @Override
+  public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
+    return checkOpen(() -> proxy.remove(tableName, key, value));
+  }
+
+  @Override
+  public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
+    return checkOpen(() -> proxy.remove(tableName, key, version));
+  }
+
+  @Override
+  public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
+    return checkOpen(() -> proxy.replace(tableName, key, oldValue, newValue));
+  }
+
+  @Override
+  public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
+    return checkOpen(() -> proxy.replace(tableName, key, oldVersion, newValue));
+  }
+
+  @Override
+  public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
+      return checkOpen(() -> proxy.atomicBatchUpdate(updates));
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public synchronized CompletableFuture<Database> open() {
+    return runStartupTasks()
+      .thenCompose(v -> stateMachine.open())
+      .thenRun(() -> {
+        this.proxy = stateMachine.createProxy(DatabaseProxy.class);
+      })
+      .thenApply(v -> this);
+  }
+
+  @Override
+  public synchronized CompletableFuture<Void> close() {
+    proxy = null;
+    return stateMachine.close()
+      .thenCompose(v -> runShutdownTasks());
+  }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
new file mode 100644
index 0000000..2b20f53
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -0,0 +1,217 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import net.kuujo.copycat.state.Initializer;
+import net.kuujo.copycat.state.StateContext;
+
+/**
+ * Default database state.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class DefaultDatabaseState<K, V> implements DatabaseState<K, V> {
+
+    private Long nextVersion;
+    private Map<String, Map<K, Versioned<V>>> tables;
+
+    @Initializer
+    @Override
+    public void init(StateContext<DatabaseState<K, V>> context) {
+        tables = context.get("tables");
+        if (tables == null) {
+            tables = new HashMap<>();
+            context.put("tables", tables);
+        }
+        nextVersion = context.get("nextVersion");
+        if (nextVersion == null) {
+            nextVersion = Long.valueOf(0);
+            context.put("nextVersion", nextVersion);
+        }
+    }
+
+    private Map<K, Versioned<V>> getTableMap(String tableName) {
+        Map<K, Versioned<V>> table = tables.get(tableName);
+        if (table == null) {
+            table = new HashMap<>();
+            tables.put(tableName, table);
+        }
+        return table;
+    }
+
+    @Override
+    public int size(String tableName) {
+        return getTableMap(tableName).size();
+    }
+
+    @Override
+    public boolean isEmpty(String tableName) {
+        return getTableMap(tableName).isEmpty();
+    }
+
+    @Override
+    public boolean containsKey(String tableName, K key) {
+        return getTableMap(tableName).containsKey(key);
+    }
+
+    @Override
+    public boolean containsValue(String tableName, V value) {
+        return getTableMap(tableName).values().stream().anyMatch(v -> checkEquality(v.value(), value));
+    }
+
+    @Override
+    public Versioned<V> get(String tableName, K key) {
+        return getTableMap(tableName).get(key);
+    }
+
+    @Override
+    public Versioned<V> put(String tableName, K key, V value) {
+        return getTableMap(tableName).put(key, new Versioned<>(value, ++nextVersion));
+    }
+
+    @Override
+    public Versioned<V> remove(String tableName, K key) {
+        return getTableMap(tableName).remove(key);
+    }
+
+    @Override
+    public void clear(String tableName) {
+        getTableMap(tableName).clear();
+    }
+
+    @Override
+    public Set<K> keySet(String tableName) {
+        return getTableMap(tableName).keySet();
+    }
+
+    @Override
+    public Collection<Versioned<V>> values(String tableName) {
+        return getTableMap(tableName).values();
+    }
+
+    @Override
+    public Set<Entry<K, Versioned<V>>> entrySet(String tableName) {
+        return getTableMap(tableName).entrySet();
+    }
+
+    @Override
+    public Versioned<V> putIfAbsent(String tableName, K key, V value) {
+        Versioned<V> existingValue = getTableMap(tableName).get(key);
+        return existingValue != null ? existingValue : put(tableName, key, value);
+    }
+
+    @Override
+    public boolean remove(String tableName, K key, V value) {
+        Versioned<V> existing = getTableMap(tableName).get(key);
+        if (existing != null && existing.value().equals(value)) {
+            getTableMap(tableName).remove(key);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean remove(String tableName, K key, long version) {
+        Versioned<V> existing = getTableMap(tableName).get(key);
+        if (existing != null && existing.version() == version) {
+            remove(tableName, key);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean replace(String tableName, K key, V oldValue, V newValue) {
+        Versioned<V> existing = getTableMap(tableName).get(key);
+        if (existing != null && existing.value().equals(oldValue)) {
+            put(tableName, key, newValue);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean replace(String tableName, K key, long oldVersion, V newValue) {
+        Versioned<V> existing = getTableMap(tableName).get(key);
+        if (existing != null && existing.version() == oldVersion) {
+            put(tableName, key, newValue);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean batchUpdate(List<UpdateOperation<K, V>> updates) {
+        if (updates.stream().anyMatch(update -> !checkIfUpdateIsPossible(update))) {
+            return false;
+        } else {
+            updates.stream().forEach(this::doUpdate);
+            return true;
+        }
+    }
+
+    private void doUpdate(UpdateOperation<K, V> update) {
+        String tableName = update.tableName();
+        K key = update.key();
+        switch (update.type()) {
+        case PUT:
+            put(tableName, key, update.value());
+            return;
+        case REMOVE:
+            remove(tableName, key);
+            return;
+        case PUT_IF_ABSENT:
+            putIfAbsent(tableName, key, update.value());
+            return;
+        case PUT_IF_VERSION_MATCH:
+            replace(tableName, key, update.currentVersion(), update.value());
+            return;
+        case PUT_IF_VALUE_MATCH:
+            replace(tableName, key, update.currentValue(), update.value());
+            return;
+        case REMOVE_IF_VERSION_MATCH:
+            remove(tableName, key, update.currentVersion());
+            return;
+        case REMOVE_IF_VALUE_MATCH:
+            remove(tableName, key, update.currentValue());
+            return;
+        default:
+            throw new IllegalStateException("Unsupported type: " + update.type());
+        }
+    }
+
+    private boolean checkIfUpdateIsPossible(UpdateOperation<K, V> update) {
+        Versioned<V> existingEntry = get(update.tableName(), update.key());
+        switch (update.type()) {
+        case PUT:
+        case REMOVE:
+            return true;
+        case PUT_IF_ABSENT:
+            return existingEntry == null;
+        case PUT_IF_VERSION_MATCH:
+            return existingEntry != null && existingEntry.version() == update.currentVersion();
+        case PUT_IF_VALUE_MATCH:
+            return existingEntry != null && existingEntry.value().equals(update.currentValue());
+        case REMOVE_IF_VERSION_MATCH:
+            return existingEntry == null || existingEntry.version() == update.currentVersion();
+        case REMOVE_IF_VALUE_MATCH:
+            return existingEntry == null || existingEntry.value().equals(update.currentValue());
+        default:
+            throw new IllegalStateException("Unsupported type: " + update.type());
+        }
+    }
+
+    private boolean checkEquality(V value1, V value2) {
+        if (value1 instanceof byte[]) {
+            return Arrays.equals((byte[]) value1, (byte[]) value2);
+        }
+        return value1.equals(value2);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
new file mode 100644
index 0000000..d35aca2
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -0,0 +1,208 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
+
+/**
+ * A database that partitions the keys across one or more database partitions.
+ */
+public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, PartitionedDatabaseManager {
+
+    private Partitioner<String> partitioner;
+    private final ClusterCoordinator coordinator;
+    private final Map<String, Database> partitions = Maps.newConcurrentMap();
+
+    protected PartitionedDatabase(ClusterCoordinator coordinator) {
+        this.coordinator = coordinator;
+    }
+
+    @Override
+    public void registerPartition(String name, Database partition) {
+        partitions.put(name, partition);
+    }
+
+    @Override
+    public Map<String, Database> getRegisteredPartitions() {
+        return ImmutableMap.copyOf(partitions);
+    }
+
+    @Override
+    public CompletableFuture<Integer> size(String tableName) {
+        AtomicInteger totalSize = new AtomicInteger(0);
+        return CompletableFuture.allOf(partitions
+                    .values()
+                    .stream()
+                    .map(p -> p.size(tableName).thenApply(totalSize::addAndGet))
+                    .toArray(CompletableFuture[]::new))
+                .thenApply(v -> totalSize.get());
+    }
+
+    @Override
+    public CompletableFuture<Boolean> isEmpty(String tableName) {
+        return size(tableName).thenApply(size -> size == 0);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsKey(String tableName, String key) {
+        return partitioner.getPartition(tableName, key).containsKey(tableName, key);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) {
+        AtomicBoolean containsValue = new AtomicBoolean(false);
+        return CompletableFuture.allOf(partitions
+                    .values()
+                    .stream()
+                    .map(p -> p.containsValue(tableName, value).thenApply(v -> containsValue.compareAndSet(false, v)))
+                    .toArray(CompletableFuture[]::new))
+                .thenApply(v -> containsValue.get());
+    }
+
+    @Override
+    public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) {
+        return partitioner.getPartition(tableName, key).get(tableName, key);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
+        return partitioner.getPartition(tableName, key).put(tableName, key, value);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
+        return partitioner.getPartition(tableName, key).remove(tableName, key);
+    }
+
+    @Override
+    public CompletableFuture<Void> clear(String tableName) {
+        return CompletableFuture.allOf(partitions
+                    .values()
+                    .stream()
+                    .map(p -> p.clear(tableName))
+                    .toArray(CompletableFuture[]::new));
+    }
+
+    @Override
+    public CompletableFuture<Set<String>> keySet(String tableName) {
+        Set<String> keySet = Sets.newConcurrentHashSet();
+        return CompletableFuture.allOf(partitions
+                    .values()
+                    .stream()
+                    .map(p -> p.keySet(tableName).thenApply(keySet::addAll))
+                    .toArray(CompletableFuture[]::new))
+                .thenApply(v -> keySet);
+    }
+
+    @Override
+    public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) {
+        List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
+        return CompletableFuture.allOf(partitions
+                    .values()
+                    .stream()
+                    .map(p -> p.values(tableName).thenApply(values::addAll))
+                    .toArray(CompletableFuture[]::new))
+                .thenApply(v -> values);
+    }
+
+    @Override
+    public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet(String tableName) {
+        Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
+        return CompletableFuture.allOf(partitions
+                    .values()
+                    .stream()
+                    .map(p -> p.entrySet(tableName).thenApply(entrySet::addAll))
+                    .toArray(CompletableFuture[]::new))
+                .thenApply(v -> entrySet);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
+        return partitioner.getPartition(tableName, key).putIfAbsent(tableName, key, value);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
+        return partitioner.getPartition(tableName, key).remove(tableName, key, value);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
+        return partitioner.getPartition(tableName, key).remove(tableName, key, version);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
+        return partitioner.getPartition(tableName, key).replace(tableName, key, oldValue, newValue);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
+        return partitioner.getPartition(tableName, key).replace(tableName, key, oldVersion, newValue);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
+        Map<Database, List<UpdateOperation<String, byte[]>>> perPartitionUpdates = Maps.newHashMap();
+        for (UpdateOperation<String, byte[]> update : updates) {
+            Database partition = partitioner.getPartition(update.tableName(), update.key());
+            List<UpdateOperation<String, byte[]>> partitionUpdates = perPartitionUpdates.get(partition);
+            if (partitionUpdates == null) {
+                partitionUpdates = Lists.newArrayList();
+                perPartitionUpdates.put(partition, partitionUpdates);
+            }
+            partitionUpdates.add(update);
+        }
+        if (perPartitionUpdates.size() > 1) {
+            // TODO
+            throw new UnsupportedOperationException("Cross partition transactional updates are not supported.");
+        } else {
+            Entry<Database, List<UpdateOperation<String, byte[]>>> only =
+                    perPartitionUpdates.entrySet().iterator().next();
+            return only.getKey().atomicBatchUpdate(only.getValue());
+        }
+    }
+
+    @Override
+    public void setPartitioner(Partitioner<String> partitioner) {
+        this.partitioner = partitioner;
+    }
+
+    @Override
+    public CompletableFuture<PartitionedDatabase> open() {
+        return coordinator.open().thenCompose(c -> CompletableFuture.allOf(partitions
+                                                        .values()
+                                                        .stream()
+                                                        .map(Database::open)
+                                                        .collect(Collectors.toList())
+                                                        .toArray(new CompletableFuture[partitions.size()]))
+                                 .thenApply(v -> this));
+
+    }
+
+    @Override
+    public CompletableFuture<Void> close() {
+        CompletableFuture<Void> closePartitions = CompletableFuture.allOf(partitions
+                .values()
+                .stream()
+                .map(database -> database.close())
+                .collect(Collectors.toList())
+                .toArray(new CompletableFuture[partitions.size()]));
+        CompletableFuture<Void> closeCoordinator = coordinator.close();
+        return closePartitions.thenCompose(v -> closeCoordinator);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseConfig.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseConfig.java
new file mode 100644
index 0000000..6d375cc
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseConfig.java
@@ -0,0 +1,31 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Partitioned database configuration.
+ */
+public class PartitionedDatabaseConfig {
+    private final Map<String, DatabaseConfig> partitions = new HashMap<>();
+
+    /**
+     * Returns the configuration for all partitions.
+     * @return mapping of partition names to their configurations.
+     */
+    public Map<String, DatabaseConfig> partitions() {
+        return Collections.unmodifiableMap(partitions);
+    }
+
+    /**
+     * Adds the specified partition name and configuration.
+     * @param name partition name.
+     * @param config partition config
+     * @return this instance
+     */
+    public PartitionedDatabaseConfig withPartition(String name, DatabaseConfig config) {
+        partitions.put(name, config);
+        return this;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
new file mode 100644
index 0000000..44cd3a1
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
@@ -0,0 +1,79 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+import net.kuujo.copycat.CopycatConfig;
+import net.kuujo.copycat.cluster.ClusterConfig;
+import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
+import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator;
+import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
+
+/**
+ * Manages the partitions, partitioner and lifecycle of a {@link PartitionedDatabase}.
+ */
+public interface PartitionedDatabaseManager {
+    /**
+     * Opens the database.
+     *
+     * @return A completable future to be completed with the result once complete.
+     */
+    CompletableFuture<PartitionedDatabase> open();
+
+    /**
+     * Closes the database.
+     *
+     * @return A completable future to be completed with the result once complete.
+     */
+    CompletableFuture<Void> close();
+
+    /**
+     * Sets the partitioner to use for mapping keys to partitions.
+     *
+     * @param partitioner partitioner
+     */
+    void setPartitioner(Partitioner<String> partitioner);
+
+    /**
+     * Registers a new partition.
+     *
+     * @param partitionName partition name.
+     * @param partition partition.
+     */
+    void registerPartition(String partitionName, Database partition);
+
+    /**
+     * Returns all the registered database partitions.
+     *
+     * @return mapping of all registered database partitions.
+     */
+    Map<String, Database> getRegisteredPartitions();
+
+
+    /**
+     * Creates a new partitioned database.
+     *
+     * @param name The database name.
+     * @param clusterConfig The cluster configuration.
+     * @param partitionedDatabaseConfig The database configuration.
+     *
+     * @return The database.
+     */
+    public static PartitionedDatabase create(
+            String name,
+            ClusterConfig clusterConfig,
+            PartitionedDatabaseConfig partitionedDatabaseConfig) {
+        CopycatConfig copycatConfig = new CopycatConfig()
+            .withName(name)
+            .withClusterConfig(clusterConfig)
+            .withDefaultExecutor(Executors.newSingleThreadExecutor(new NamedThreadFactory("copycat-coordinator-%d")));
+        ClusterCoordinator coordinator = new DefaultClusterCoordinator(copycatConfig.resolve());
+        PartitionedDatabase partitionedDatabase = new PartitionedDatabase(coordinator);
+        partitionedDatabaseConfig.partitions().forEach((partitionName, partitionConfig) ->
+            partitionedDatabase.registerPartition(partitionName,
+                    coordinator.getResource(partitionName, partitionConfig.resolve(clusterConfig)
+                        .withDefaultSerializer(copycatConfig.getDefaultSerializer().copy())
+                        .withDefaultExecutor(copycatConfig.getDefaultExecutor()))));
+        partitionedDatabase.setPartitioner(
+                new SimpleKeyHashPartitioner<>(partitionedDatabase.getRegisteredPartitions()));
+        return partitionedDatabase;
+    }
+}
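A sketch of how these pieces fit together: build a cluster configuration, declare the partitions, then create and open the partitioned database. The member URIs, partition names and keys below are illustrative, and the ClusterConfig calls assume Copycat's URI-based member configuration; the real wiring will come from the ONOS cluster configuration.

package org.onosproject.store.consistent.impl;

import net.kuujo.copycat.cluster.ClusterConfig;

public final class PartitionedDatabaseSketch {
    public static void main(String[] args) {
        // Illustrative member URIs; a deployment would derive these from cluster metadata.
        ClusterConfig clusterConfig = new ClusterConfig()
                .withLocalMember("tcp://10.0.0.1:5000")
                .withMembers("tcp://10.0.0.1:5000", "tcp://10.0.0.2:5000", "tcp://10.0.0.3:5000");

        PartitionedDatabaseConfig databaseConfig = new PartitionedDatabaseConfig()
                .withPartition("p1", new DatabaseConfig())
                .withPartition("p2", new DatabaseConfig());

        PartitionedDatabase database =
                PartitionedDatabaseManager.create("onos-store", clusterConfig, databaseConfig);

        // Keys are routed to partitions by the SimpleKeyHashPartitioner installed in create().
        database.open()
                .thenCompose(db -> db.put("example-table", "key-1", new byte[] {42}))
                .join();
    }
}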
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Partitioner.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Partitioner.java
new file mode 100644
index 0000000..df8d56e
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Partitioner.java
@@ -0,0 +1,17 @@
+package org.onosproject.store.consistent.impl;
+
+/**
+ * Partitioner is responsible for mapping keys to individual database partitions.
+ *
+ * @param <K> key type.
+ */
+public interface Partitioner<K> {
+
+    /**
+     * Returns the database partition.
+     * @param tableName table name
+     * @param key key
+     * @return Database partition
+     */
+    Database getPartition(String tableName, K key);
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java
new file mode 100644
index 0000000..5410f9f
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java
@@ -0,0 +1,31 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+/**
+ * A simple Partitioner that uses the key's hashCode to map
+ * the key to a partition.
+ *
+ * @param <K> key type.
+ */
+public class SimpleKeyHashPartitioner<K> implements Partitioner<K> {
+
+    private final Map<String, Database> partitionMap;
+    private final List<String> sortedPartitionNames;
+
+    public SimpleKeyHashPartitioner(Map<String, Database> partitionMap) {
+        this.partitionMap = ImmutableMap.copyOf(partitionMap);
+        sortedPartitionNames = Lists.newArrayList(this.partitionMap.keySet());
+        Collections.sort(sortedPartitionNames);
+    }
+
+    @Override
+    public Database getPartition(String tableName, K key) {
+        // floorMod keeps the index non-negative even when hashCode() returns Integer.MIN_VALUE.
+        int index = Math.floorMod(key.hashCode(), sortedPartitionNames.size());
+        return partitionMap.get(sortedPartitionNames.get(index));
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java
new file mode 100644
index 0000000..9d52a09
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java
@@ -0,0 +1,151 @@
+package org.onosproject.store.consistent.impl;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Database update operation.
+ *
+ * @param <K> key type.
+ * @param <V> value type.
+ */
+public class UpdateOperation<K, V> {
+
+    /**
+     * Type of database update operation.
+     */
+    public static enum Type {
+        PUT,
+        PUT_IF_ABSENT,
+        PUT_IF_VERSION_MATCH,
+        PUT_IF_VALUE_MATCH,
+        REMOVE,
+        REMOVE_IF_VERSION_MATCH,
+        REMOVE_IF_VALUE_MATCH,
+    }
+
+    private Type type;
+    private String tableName;
+    private K key;
+    private V value;
+    private V currentValue;
+    private long currentVersion;
+
+    /**
+     * Returns the type of update operation.
+     * @return type of update.
+     */
+    public Type type() {
+        return type;
+    }
+
+    /**
+     * Returns the tableName being updated.
+     * @return table name.
+     */
+    public String tableName() {
+        return tableName;
+    }
+
+    /**
+     * Returns the item key being updated.
+     * @return item key
+     */
+    public K key() {
+        return key;
+    }
+
+    /**
+     * Returns the new value.
+     * @return item's target value.
+     */
+    public V value() {
+        return value;
+    }
+
+    /**
+     * Returns the expected current value in the database value for the key.
+     * @return current value in database.
+     */
+    public V currentValue() {
+        return currentValue;
+    }
+
+    /**
+     * Returns the expected current version in the database for the key.
+     * @return expected version.
+     */
+    public long currentVersion() {
+        return currentVersion;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("type", type)
+            .add("tableName", tableName)
+            .add("key", key)
+            .add("value", value)
+            .add("currentValue", currentValue)
+            .add("currentVersion", currentVersion)
+            .toString();
+    }
+
+    /**
+     * UpdateOperation builder.
+     *
+     * @param <K> key type.
+     * @param <V> value type.
+     */
+    public static final class Builder<K, V> {
+
+        private UpdateOperation<K, V> operation = new UpdateOperation<>();
+
+        /**
+         * Creates a new builder instance.
+         * @param <K> key type.
+         * @param <V> value type.
+         *
+         * @return builder.
+         */
+        public static <K, V> Builder<K, V> builder() {
+            return new Builder<>();
+        }
+
+        private Builder() {
+        }
+
+        public UpdateOperation<K, V> build() {
+            return operation;
+        }
+
+        public Builder<K, V> withType(Type type) {
+            operation.type = type;
+            return this;
+        }
+
+        public Builder<K, V> withTableName(String tableName) {
+            operation.tableName = tableName;
+            return this;
+        }
+
+        public Builder<K, V> withKey(K key) {
+            operation.key = key;
+            return this;
+        }
+
+        public Builder<K, V> withCurrentValue(V value) {
+            operation.currentValue = value;
+            return this;
+        }
+
+        public Builder<K, V> withValue(V value) {
+            operation.value = value;
+            return this;
+        }
+
+        public Builder<K, V> withCurrentVersion(long version) {
+            operation.currentVersion = version;
+            return this;
+        }
+    }
+}
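A sketch combining the builder with DatabaseProxy#atomicBatchUpdate to move a value between two keys. It assumes both keys land on the same partition, since PartitionedDatabase currently rejects cross-partition batches, and the helper class and names are illustrative only:

package org.onosproject.store.consistent.impl;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public final class BatchUpdateSketch {

    /**
     * Atomically moves {@code current} from {@code fromKey} to {@code toKey}: the remove only
     * applies if the source version is unchanged, the put only applies if the target is absent,
     * and either both operations take effect or neither does.
     */
    public static CompletableFuture<Boolean> move(
            DatabaseProxy<String, byte[]> proxy, String table,
            String fromKey, Versioned<byte[]> current, String toKey) {
        List<UpdateOperation<String, byte[]>> updates = Arrays.asList(
                UpdateOperation.Builder.<String, byte[]>builder()
                        .withType(UpdateOperation.Type.REMOVE_IF_VERSION_MATCH)
                        .withTableName(table)
                        .withKey(fromKey)
                        .withCurrentVersion(current.version())
                        .build(),
                UpdateOperation.Builder.<String, byte[]>builder()
                        .withType(UpdateOperation.Type.PUT_IF_ABSENT)
                        .withTableName(table)
                        .withKey(toKey)
                        .withValue(current.value())
                        .build());
        return proxy.atomicBatchUpdate(updates);
    }
}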
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Versioned.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Versioned.java
new file mode 100644
index 0000000..6eb908d
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Versioned.java
@@ -0,0 +1,50 @@
+package org.onosproject.store.consistent.impl;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Versioned value.
+ *
+ * @param <V> value type.
+ */
+public class Versioned<V> {
+
+    private final V value;
+    private final long version;
+
+    /**
+     * Constructs a new versioned value.
+     * @param value value
+     * @param version version
+     */
+    public Versioned(V value, long version) {
+        this.value = value;
+        this.version = version;
+    }
+
+    /**
+     * Returns the value.
+     *
+     * @return value.
+     */
+    public V value() {
+        return value;
+    }
+
+    /**
+     * Returns the version.
+     *
+     * @return version
+     */
+    public long version() {
+        return version;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("value", value)
+            .add("version", version)
+            .toString();
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/ClusterMessagingProtocol.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/ClusterMessagingProtocol.java
deleted file mode 100644
index 516aa44..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/ClusterMessagingProtocol.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.Vector;
-
-import net.kuujo.copycat.cluster.TcpClusterConfig;
-import net.kuujo.copycat.cluster.TcpMember;
-import net.kuujo.copycat.event.LeaderElectEvent;
-import net.kuujo.copycat.internal.log.ConfigurationEntry;
-import net.kuujo.copycat.internal.log.CopycatEntry;
-import net.kuujo.copycat.internal.log.OperationEntry;
-import net.kuujo.copycat.internal.log.SnapshotEntry;
-import net.kuujo.copycat.protocol.PingRequest;
-import net.kuujo.copycat.protocol.PingResponse;
-import net.kuujo.copycat.protocol.PollRequest;
-import net.kuujo.copycat.protocol.PollResponse;
-import net.kuujo.copycat.protocol.Response.Status;
-import net.kuujo.copycat.protocol.SubmitRequest;
-import net.kuujo.copycat.protocol.SubmitResponse;
-import net.kuujo.copycat.protocol.SyncRequest;
-import net.kuujo.copycat.protocol.SyncResponse;
-import net.kuujo.copycat.spi.protocol.Protocol;
-import net.kuujo.copycat.spi.protocol.ProtocolClient;
-import net.kuujo.copycat.spi.protocol.ProtocolServer;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.serializers.KryoSerializer;
-import org.onosproject.store.serializers.StoreSerializer;
-import org.onosproject.store.service.impl.DatabaseStateMachine.State;
-import org.onosproject.store.service.impl.DatabaseStateMachine.TableMetadata;
-import org.onlab.util.KryoNamespace;
-import org.slf4j.Logger;
-
-/**
- * ONOS Cluster messaging based Copycat protocol.
- */
-@Component(immediate = false)
-@Service
-public class ClusterMessagingProtocol
-    implements DatabaseProtocolService, Protocol<TcpMember> {
-
-    private final Logger log = getLogger(getClass());
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ClusterService clusterService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ClusterCommunicationService clusterCommunicator;
-
-    public static final MessageSubject COPYCAT_PING =
-            new MessageSubject("copycat-raft-consensus-ping");
-    public static final MessageSubject COPYCAT_SYNC =
-            new MessageSubject("copycat-raft-consensus-sync");
-    public static final MessageSubject COPYCAT_POLL =
-            new MessageSubject("copycat-raft-consensus-poll");
-    public static final MessageSubject COPYCAT_SUBMIT =
-            new MessageSubject("copycat-raft-consensus-submit");
-
-    static final int AFTER_COPYCAT = KryoNamespaces.BEGIN_USER_CUSTOM_ID + 50;
-
-    static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
-            .register(KryoNamespaces.API)
-            .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
-            .register(PingRequest.class)
-            .register(PingResponse.class)
-            .register(PollRequest.class)
-            .register(PollResponse.class)
-            .register(SyncRequest.class)
-            .register(SyncResponse.class)
-            .register(SubmitRequest.class)
-            .register(SubmitResponse.class)
-            .register(Status.class)
-            .register(ConfigurationEntry.class)
-            .register(SnapshotEntry.class)
-            .register(CopycatEntry.class)
-            .register(OperationEntry.class)
-            .register(TcpClusterConfig.class)
-            .register(TcpMember.class)
-            .register(LeaderElectEvent.class)
-            .register(Vector.class)
-            .build();
-
-    // serializer used for CopyCat Protocol
-    public static final StoreSerializer DB_SERIALIZER = new KryoSerializer() {
-        @Override
-        protected void setupKryoPool() {
-            serializerPool = KryoNamespace.newBuilder()
-                    .register(COPYCAT)
-                    .nextId(AFTER_COPYCAT)
-                     // for snapshot
-                    .register(State.class)
-                    .register(TableMetadata.class)
-                    // TODO: Move this out to API?
-                    .register(TableModificationEvent.class)
-                    .register(TableModificationEvent.Type.class)
-                    .build();
-        }
-    };
-
-    @Activate
-    public void activate() {
-        log.info("Started");
-    }
-
-    @Deactivate
-    public void deactivate() {
-        log.info("Stopped");
-    }
-
-    @Override
-    public ProtocolServer createServer(TcpMember member) {
-        return new ClusterMessagingProtocolServer(clusterCommunicator);
-    }
-
-    @Override
-    public ProtocolClient createClient(TcpMember member) {
-        return new ClusterMessagingProtocolClient(clusterService,
-                                                  clusterCommunicator,
-                                                  clusterService.getLocalNode(),
-                                                  member);
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/ClusterMessagingProtocolClient.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/ClusterMessagingProtocolClient.java
deleted file mode 100644
index 75c0969..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/ClusterMessagingProtocolClient.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import static com.google.common.base.Verify.verifyNotNull;
-import static org.onosproject.store.service.impl.ClusterMessagingProtocol.DB_SERIALIZER;
-import static org.onlab.util.Tools.namedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-import static java.util.concurrent.Executors.newCachedThreadPool;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import net.kuujo.copycat.cluster.TcpMember;
-import net.kuujo.copycat.protocol.PingRequest;
-import net.kuujo.copycat.protocol.PingResponse;
-import net.kuujo.copycat.protocol.PollRequest;
-import net.kuujo.copycat.protocol.PollResponse;
-import net.kuujo.copycat.protocol.SubmitRequest;
-import net.kuujo.copycat.protocol.SubmitResponse;
-import net.kuujo.copycat.protocol.SyncRequest;
-import net.kuujo.copycat.protocol.SyncResponse;
-import net.kuujo.copycat.spi.protocol.ProtocolClient;
-
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.slf4j.Logger;
-
-/**
- * Copycat protocol client based on ONOS cluster messaging.
- */
-public class ClusterMessagingProtocolClient implements ProtocolClient {
-
-    private final Logger log = getLogger(getClass());
-
-    public static final Duration RETRY_INTERVAL = Duration.ofMillis(2000);
-
-    private final ClusterService clusterService;
-    private final ClusterCommunicationService clusterCommunicator;
-    private final ControllerNode localNode;
-    private final TcpMember remoteMember;
-
-    private ControllerNode remoteNode;
-    private final AtomicBoolean connectionOK = new AtomicBoolean(true);
-
-    private ExecutorService pool;
-
-    public ClusterMessagingProtocolClient(
-            ClusterService clusterService,
-            ClusterCommunicationService clusterCommunicator,
-            ControllerNode localNode,
-            TcpMember remoteMember) {
-
-        this.clusterService = clusterService;
-        this.clusterCommunicator = clusterCommunicator;
-        this.localNode = localNode;
-        this.remoteMember = remoteMember;
-    }
-
-    @Override
-    public CompletableFuture<PingResponse> ping(PingRequest request) {
-        return requestReply(request);
-    }
-
-    @Override
-    public CompletableFuture<SyncResponse> sync(SyncRequest request) {
-        return requestReply(request);
-    }
-
-    @Override
-    public CompletableFuture<PollResponse> poll(PollRequest request) {
-        return requestReply(request);
-    }
-
-    @Override
-    public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
-        return requestReply(request);
-    }
-
-    @Override
-    public synchronized CompletableFuture<Void> connect() {
-        if (pool == null || pool.isShutdown()) {
-            // TODO include remote name?
-            pool = newCachedThreadPool(namedThreads("onos-copycat-netty-messaging-client-%d"));
-        }
-        return CompletableFuture.completedFuture(null);
-    }
-
-    @Override
-    public synchronized CompletableFuture<Void> close() {
-        if (pool != null) {
-            pool.shutdownNow();
-            pool = null;
-        }
-        return CompletableFuture.completedFuture(null);
-    }
-
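-    // Maps a Copycat protocol request to the cluster message subject it is sent on.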
-    private <I> MessageSubject messageType(I input) {
-        Class<?> clazz = input.getClass();
-        if (clazz.equals(PollRequest.class)) {
-            return ClusterMessagingProtocol.COPYCAT_POLL;
-        } else if (clazz.equals(SyncRequest.class)) {
-            return ClusterMessagingProtocol.COPYCAT_SYNC;
-        } else if (clazz.equals(SubmitRequest.class)) {
-            return ClusterMessagingProtocol.COPYCAT_SUBMIT;
-        } else if (clazz.equals(PingRequest.class)) {
-            return ClusterMessagingProtocol.COPYCAT_PING;
-        } else {
-            throw new IllegalArgumentException("Unknown class " + clazz.getName());
-        }
-    }
-
-    private <I, O> CompletableFuture<O> requestReply(I request) {
-        CompletableFuture<O> future = new CompletableFuture<>();
-        if (pool == null) {
-            log.info("Attempted to use closed client, connecting now. {}", request);
-            connect();
-        }
-        pool.submit(new RPCTask<I, O>(request, future));
-        return future;
-    }
-
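-    // Resolves the ControllerNode whose IP address and TCP port match the given TcpMember.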
-    private ControllerNode getControllerNode(TcpMember remoteMember) {
-        final String host = remoteMember.host();
-        final int port = remoteMember.port();
-        for (ControllerNode node : clusterService.getNodes()) {
-            if (node.ip().toString().equals(host) && node.tcpPort() == port) {
-                return node;
-            }
-        }
-        return null;
-    }
-
-    private class RPCTask<I, O> implements Runnable {
-
-        private final I request;
-        private final ClusterMessage message;
-        private final CompletableFuture<O> future;
-
-        public RPCTask(I request, CompletableFuture<O> future) {
-            this.request = request;
-            this.message =
-                    new ClusterMessage(
-                            localNode.id(),
-                            messageType(request),
-                            verifyNotNull(DB_SERIALIZER.encode(request)));
-            this.future = future;
-        }
-
-        @Override
-        public void run() {
-            try {
-                if (remoteNode == null) {
-                    remoteNode = getControllerNode(remoteMember);
-                    if (remoteNode == null) {
-                        throw new IOException("Remote node is offline!");
-                    }
-                }
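-                // Send the request to the remote node and block until a response
-                // arrives or RETRY_INTERVAL elapses.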
-                byte[] response = clusterCommunicator
-                    .sendAndReceive(message, remoteNode.id())
-                    .get(RETRY_INTERVAL.toMillis(), TimeUnit.MILLISECONDS);
-                if (!connectionOK.getAndSet(true)) {
-                    log.info("Connectivity to {} restored", remoteNode);
-                }
-                future.complete(verifyNotNull(DB_SERIALIZER.decode(response)));
-
-            } catch (IOException | TimeoutException e) {
-                if (connectionOK.getAndSet(false)) {
-                    log.warn("Detected connectivity issues with {}. Reason: {}", remoteNode, e.getMessage());
-                }
-                log.debug("RPCTask for {} failed.", request, e);
-                future.completeExceptionally(e);
-            } catch (ExecutionException e) {
-                log.warn("RPCTask execution for {} failed: {}", request, e.getMessage());
-                log.debug("RPCTask execution for {} failed.", request, e);
-                future.completeExceptionally(e);
-            } catch (InterruptedException e) {
-                log.warn("RPCTask for {} was interrupted: {}", request, e.getMessage());
-                log.debug("RPCTask for {} was interrupted.", request, e);
-                future.completeExceptionally(e);
-                Thread.currentThread().interrupt();
-            } catch (Exception e) {
-                log.warn("RPCTask for {} terribly failed.", request, e);
-                future.completeExceptionally(e);
-            }
-        }
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/ClusterMessagingProtocolServer.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/ClusterMessagingProtocolServer.java
deleted file mode 100644
index aa56855..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/ClusterMessagingProtocolServer.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import static java.util.concurrent.Executors.newCachedThreadPool;
-import static org.onlab.util.Tools.namedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-import static org.onosproject.store.service.impl.ClusterMessagingProtocol.*;
-import static org.onosproject.store.service.impl.ClusterMessagingProtocol.DB_SERIALIZER;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.function.BiConsumer;
-
-import net.kuujo.copycat.protocol.PingRequest;
-import net.kuujo.copycat.protocol.PollRequest;
-import net.kuujo.copycat.protocol.RequestHandler;
-import net.kuujo.copycat.protocol.SubmitRequest;
-import net.kuujo.copycat.protocol.SyncRequest;
-import net.kuujo.copycat.spi.protocol.ProtocolServer;
-
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.slf4j.Logger;
-
-/**
- * Copycat protocol server based on ONOS cluster messaging.
- */
-public class ClusterMessagingProtocolServer implements ProtocolServer {
-
-    private final Logger log = getLogger(getClass());
-
-    private final ClusterCommunicationService clusterCommunicator;
-
-    private volatile RequestHandler handler;
-
-    private ExecutorService pool;
-
-    public ClusterMessagingProtocolServer(ClusterCommunicationService clusterCommunicator) {
-        this.clusterCommunicator = clusterCommunicator;
-    }
-
-    @Override
-    public void requestHandler(RequestHandler handler) {
-        this.handler = handler;
-    }
-
-    @Override
-    public CompletableFuture<Void> listen() {
-        if (pool == null || pool.isShutdown()) {
-            pool = newCachedThreadPool(namedThreads("onos-copycat-netty-messaging-server-%d"));
-        }
-
-        clusterCommunicator.addSubscriber(COPYCAT_PING, new PingHandler());
-        clusterCommunicator.addSubscriber(COPYCAT_SYNC, new SyncHandler());
-        clusterCommunicator.addSubscriber(COPYCAT_POLL, new PollHandler());
-        clusterCommunicator.addSubscriber(COPYCAT_SUBMIT, new SubmitHandler());
-        return CompletableFuture.completedFuture(null);
-    }
-
-    @Override
-    public CompletableFuture<Void> close() {
-        clusterCommunicator.removeSubscriber(COPYCAT_PING);
-        clusterCommunicator.removeSubscriber(COPYCAT_SYNC);
-        clusterCommunicator.removeSubscriber(COPYCAT_POLL);
-        clusterCommunicator.removeSubscriber(COPYCAT_SUBMIT);
-        if (pool != null) {
-            pool.shutdownNow();
-            pool = null;
-        }
-        return CompletableFuture.completedFuture(null);
-    }
-
-    private final class PingHandler extends CopycatMessageHandler<PingRequest> {
-
-        @Override
-        public void raftHandle(PingRequest request, ClusterMessage message) {
-            pool.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    currentHandler().ping(request)
-                        .whenComplete(new PostExecutionTask<>(message));
-                }
-            });
-        }
-    }
-
-    private final class SyncHandler extends CopycatMessageHandler<SyncRequest> {
-
-        @Override
-        public void raftHandle(SyncRequest request, ClusterMessage message) {
-            pool.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    currentHandler().sync(request)
-                        .whenComplete(new PostExecutionTask<>(message));
-                }
-            });
-        }
-    }
-
-    private final class PollHandler extends CopycatMessageHandler<PollRequest> {
-
-        @Override
-        public void raftHandle(PollRequest request, ClusterMessage message) {
-            pool.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    currentHandler().poll(request)
-                        .whenComplete(new PostExecutionTask<>(message));
-                }
-            });
-        }
-    }
-
-    private final class SubmitHandler extends CopycatMessageHandler<SubmitRequest> {
-
-        @Override
-        public void raftHandle(SubmitRequest request, ClusterMessage message) {
-            pool.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                    currentHandler().submit(request)
-                        .whenComplete(new PostExecutionTask<>(message));
-                }
-            });
-        }
-    }
-
-    private abstract class CopycatMessageHandler<T> implements ClusterMessageHandler {
-
-        public abstract void raftHandle(T request, ClusterMessage message);
-
-        @Override
-        public void handle(ClusterMessage message) {
-            T request = DB_SERIALIZER.decode(message.payload());
-            raftHandle(request, message);
-        }
-
-        RequestHandler currentHandler() {
-            RequestHandler currentHandler = handler;
-            if (currentHandler == null) {
-                // There is a slight window of time during state transitions
-                // where the handler becomes null.
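-                // Poll with exponential backoff (roughly two seconds in total)
-                // until a handler is registered.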
-                long sleepMs = 1;
-                for (int i = 0; i < 10; ++i) {
-                    currentHandler = handler;
-                    if (currentHandler != null) {
-                        break;
-                    }
-                    try {
-                        sleepMs <<= 1;
-                        Thread.sleep(sleepMs);
-                    } catch (InterruptedException e) {
-                        log.error("Interrupted", e);
-                        return handler;
-                    }
-                }
-                if (currentHandler == null) {
-                    log.error("There was no handler registered!");
-                    return handler;
-                }
-            }
-            return currentHandler;
-        }
-
-        final class PostExecutionTask<R> implements BiConsumer<R, Throwable> {
-
-            private final ClusterMessage message;
-
-            public PostExecutionTask(ClusterMessage message) {
-                this.message = message;
-            }
-
-            @Override
-            public void accept(R response, Throwable error) {
-                if (error != null) {
-                    log.error("Processing {} failed.", message.subject(),  error);
-                } else {
-                    try {
-                        log.trace("responding to {}", message.subject());
-                        message.respond(DB_SERIALIZER.encode(response));
-                    } catch (Exception e) {
-                        log.error("Failed responding with {}", response.getClass().getName(), e);
-                    }
-                }
-            }
-        }
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseClient.java
deleted file mode 100644
index 869b188..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseClient.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import net.kuujo.copycat.cluster.Member;
-import net.kuujo.copycat.cluster.TcpMember;
-import net.kuujo.copycat.event.LeaderElectEvent;
-import net.kuujo.copycat.protocol.Response.Status;
-import net.kuujo.copycat.protocol.SubmitRequest;
-import net.kuujo.copycat.protocol.SubmitResponse;
-import net.kuujo.copycat.spi.protocol.ProtocolClient;
-
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.service.BatchReadRequest;
-import org.onosproject.store.service.BatchWriteRequest;
-import org.onosproject.store.service.DatabaseException;
-import org.onosproject.store.service.ReadResult;
-import org.onosproject.store.service.VersionedValue;
-import org.onosproject.store.service.WriteResult;
-import org.slf4j.Logger;
-
-/**
- * Client for interacting with the Copycat Raft cluster.
- */
-public class DatabaseClient implements ClusterMessageHandler {
-
-    private static final int RETRIES = 5;
-
-    private static final int TIMEOUT_MS = 2000;
-
-    private final Logger log = getLogger(getClass());
-
-    private final DatabaseProtocolService protocol;
-    private volatile ProtocolClient client = null;
-    private volatile Member currentLeader = null;
-    private volatile long currentLeaderTerm = 0;
-
-    public DatabaseClient(DatabaseProtocolService protocol) {
-        this.protocol = checkNotNull(protocol);
-    }
-
-    @Override
-    public void handle(ClusterMessage message) {
-        LeaderElectEvent event =
-                ClusterMessagingProtocol.DB_SERIALIZER.decode(message.payload());
-        TcpMember newLeader = event.leader();
-        long newLeaderTerm = event.term();
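-        // Only switch clients when a different leader with a strictly newer term is observed.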
-        if (newLeader != null && !newLeader.equals(currentLeader) && newLeaderTerm > currentLeaderTerm) {
-            log.info("New leader detected. Leader: {}, term: {}", newLeader, newLeaderTerm);
-            ProtocolClient prevClient = client;
-            ProtocolClient newClient = protocol.createClient(newLeader);
-            newClient.connect();
-            client = newClient;
-            currentLeader = newLeader;
-            currentLeaderTerm = newLeaderTerm;
-
-            if (prevClient != null) {
-                prevClient.close();
-            }
-        }
-    }
-
-    private String nextRequestId() {
-        return UUID.randomUUID().toString();
-    }
-
-    public void waitForLeader() {
-        if (currentLeader != null) {
-            return;
-        }
-
-        log.info("No leader in cluster, waiting for election.");
-
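-        // Poll every 200 ms until a leader election event is observed.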
-        try {
-            while (currentLeader == null) {
-                Thread.sleep(200);
-            }
-            return;
-        } catch (InterruptedException e) {
-            log.error("Interrupted while waiting for Leader", e);
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    private <T> T submit(String operationName, Object... args) {
-        waitForLeader();
-        if (currentLeader == null) {
-            throw new DatabaseException("Raft cluster does not have a leader.");
-        }
-
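-        // Wrap the operation and its arguments in a Copycat SubmitRequest
-        // and forward it to the current leader.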
-        SubmitRequest request =
-                new SubmitRequest(nextRequestId(), operationName, Arrays.asList(args));
-
-        CompletableFuture<SubmitResponse> submitResponse = client.submit(request);
-
-        log.debug("Sent {} to {}", request, currentLeader);
-
-        try {
-            final SubmitResponse response = submitResponse.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
-            if (response.status() != Status.OK) {
-                throw new DatabaseException(response.error());
-            }
-            return (T) response.result();
-        } catch (ExecutionException | InterruptedException e) {
-            throw new DatabaseException(e);
-        } catch (TimeoutException e) {
-            throw new DatabaseException.Timeout(e);
-        }
-    }
-
-    public boolean createTable(String tableName) {
-        return submit("createTable", tableName);
-    }
-
-    public boolean createTable(String tableName, int ttlMillis) {
-        return submit("createTable", tableName, ttlMillis);
-    }
-
-    public void dropTable(String tableName) {
-        submit("dropTable", tableName);
-    }
-
-    public void dropAllTables() {
-        submit("dropAllTables");
-    }
-
-    public Set<String> listTables() {
-        return submit("listTables");
-    }
-
-    public List<ReadResult> batchRead(BatchReadRequest batchRequest) {
-        return submit("read", batchRequest);
-    }
-
-    public List<WriteResult> batchWrite(BatchWriteRequest batchRequest) {
-        return submit("write", batchRequest);
-    }
-
-    public Map<String, VersionedValue> getAll(String tableName) {
-        return submit("getAll", tableName);
-    }
-
-    Member getCurrentLeader() {
-        return currentLeader;
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseEntryExpirationTracker.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseEntryExpirationTracker.java
deleted file mode 100644
index 02743ac..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseEntryExpirationTracker.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.service.impl;
-
-import com.google.common.base.MoreObjects;
-import net.jodah.expiringmap.ExpiringMap;
-import net.jodah.expiringmap.ExpiringMap.ExpirationListener;
-import net.jodah.expiringmap.ExpiringMap.ExpirationPolicy;
-import net.kuujo.copycat.cluster.Member;
-import net.kuujo.copycat.event.EventHandler;
-import net.kuujo.copycat.event.LeaderElectEvent;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.service.DatabaseService;
-import org.onosproject.store.service.VersionedValue;
-import org.onosproject.store.service.impl.DatabaseStateMachine.State;
-import org.onosproject.store.service.impl.DatabaseStateMachine.TableMetadata;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.onlab.util.Tools.namedThreads;
-
-/**
- * Plugs into the database update stream and tracks the TTL of entries added to
- * the database. For tables with a pre-configured finite TTL, this class provides
- * mechanisms for expiring (deleting) stale entries from the database.
- */
-public class DatabaseEntryExpirationTracker implements
-        DatabaseUpdateEventListener, EventHandler<LeaderElectEvent> {
-
-    private static final ExecutorService THREAD_POOL =
-            Executors.newCachedThreadPool(namedThreads("onos-db-stale-entry-expirer-%d"));
-
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
-    private final DatabaseService databaseService;
-    private final ClusterCommunicationService clusterCommunicator;
-
-    private final Member localMember;
-    private final ControllerNode localNode;
-    private final AtomicBoolean isLocalMemberLeader = new AtomicBoolean(false);
-
-    private final Map<String, Map<DatabaseRow, Long>> tableEntryExpirationMap = new HashMap<>();
-
-    private final ExpirationListener<DatabaseRow, Long> expirationObserver = new ExpirationObserver();
-
-    DatabaseEntryExpirationTracker(
-            Member localMember,
-            ControllerNode localNode,
-            ClusterCommunicationService clusterCommunicator,
-            DatabaseService databaseService) {
-        this.localMember = localMember;
-        this.localNode = localNode;
-        this.clusterCommunicator = clusterCommunicator;
-        this.databaseService = databaseService;
-    }
-
-    @Override
-    public void tableModified(TableModificationEvent event) {
-        log.debug("{}: Received {}", localNode.id(), event);
-
-        if (!tableEntryExpirationMap.containsKey(event.tableName())) {
-            return;
-        }
-
-        Map<DatabaseRow, Long> map = tableEntryExpirationMap.get(event.tableName());
-        DatabaseRow row = new DatabaseRow(event.tableName(), event.key());
-        Long eventVersion = event.value().version();
-
-        switch (event.type()) {
-        case ROW_DELETED:
-            map.remove(row, eventVersion);
-            if (isLocalMemberLeader.get()) {
-                log.debug("Broadcasting {} to the entire cluster", event);
-                clusterCommunicator.broadcastIncludeSelf(new ClusterMessage(
-                        localNode.id(), DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
-                        ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
-            }
-            break;
-        case ROW_ADDED:
-        case ROW_UPDATED:
-            // To account for potential reordering of notifications,
-            // check to make sure we are replacing an old version with a new version
-            Long currentVersion = map.get(row);
-            if (currentVersion == null || currentVersion < eventVersion) {
-                map.put(row, eventVersion);
-            }
-            break;
-        default:
-            break;
-        }
-    }
-
-    @Override
-    public void tableCreated(TableMetadata metadata) {
-        log.debug("Received a table created event {}", metadata);
-        if (metadata.expireOldEntries()) {
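-            // Track entries of this table in an ExpiringMap whose entries
-            // expire after the table's configured TTL.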
-            tableEntryExpirationMap.put(metadata.tableName(), ExpiringMap.builder()
-                    .expiration(metadata.ttlMillis(), TimeUnit.MILLISECONDS)
-                    .expirationListener(expirationObserver)
-                    .expirationPolicy(ExpirationPolicy.CREATED).build());
-        }
-    }
-
-    @Override
-    public void tableDeleted(String tableName) {
-        log.debug("Received a table deleted event for table ({})", tableName);
-        tableEntryExpirationMap.remove(tableName);
-    }
-
-    private class ExpirationObserver implements
-            ExpirationListener<DatabaseRow, Long> {
-        @Override
-        public void expired(DatabaseRow row, Long version) {
-            THREAD_POOL.submit(new ExpirationTask(row, version));
-        }
-    }
-
-    private class ExpirationTask implements Runnable {
-
-        private final DatabaseRow row;
-        private final Long version;
-
-        public ExpirationTask(DatabaseRow row, Long version) {
-            this.row = row;
-            this.version = version;
-        }
-
-        @Override
-        public void run() {
-            log.trace("Received an expiration event for {}, version: {}", row, version);
-            Map<DatabaseRow, Long> map = tableEntryExpirationMap.get(row.tableName);
-            try {
-                if (isLocalMemberLeader.get()) {
-                    if (!databaseService.removeIfVersionMatches(row.tableName,
-                            row.key, version)) {
-                        log.info("Entry in database was updated right before its expiration.");
-                    } else {
-                        log.debug("Successfully expired old entry with key ({}) from table ({})",
-                                row.key, row.tableName);
-                    }
-                } else {
-                    // Only the current leader expires keys from the database.
-                    // All other nodes act as standbys in case they need to take over.
-                    if (map != null) {
-                        map.putIfAbsent(row, version);
-                    }
-                }
-
-            } catch (Exception e) {
-                log.warn("Failed to delete entry from the database after ttl "
-                        + "expiration. Operation will be retried.", e);
-                map.putIfAbsent(row, version);
-            }
-        }
-    }
-
-    @Override
-    public void handle(LeaderElectEvent event) {
-        isLocalMemberLeader.set(localMember.equals(event.leader()));
-        if (isLocalMemberLeader.get()) {
-            log.info("{} is now the leader of Raft cluster", localNode.id());
-        }
-    }
-
-    /**
-     * Wrapper class for a database row identifier.
-     */
-    private class DatabaseRow {
-
-        String tableName;
-        String key;
-
-        public DatabaseRow(String tableName, String key) {
-            this.tableName = tableName;
-            this.key = key;
-        }
-
-        @Override
-        public String toString() {
-            return MoreObjects.toStringHelper(getClass())
-                .add("tableName", tableName)
-                .add("key", key)
-                .toString();
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (this == obj) {
-                return true;
-            }
-            if (!(obj instanceof DatabaseRow)) {
-                return false;
-            }
-            DatabaseRow that = (DatabaseRow) obj;
-
-            return Objects.equals(this.tableName, that.tableName)
-                    && Objects.equals(this.key, that.key);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(tableName, key);
-        }
-    }
-
-    @Override
-    public void snapshotInstalled(State state) {
-        if (!tableEntryExpirationMap.isEmpty()) {
-            return;
-        }
-        log.debug("Received a snapshot installed notification");
-        for (String tableName : state.getTableNames()) {
-
-            TableMetadata metadata = state.getTableMetadata(tableName);
-            if (!metadata.expireOldEntries()) {
-                continue;
-            }
-
-            Map<DatabaseRow, Long> tableExpirationMap = ExpiringMap.builder()
-                    .expiration(metadata.ttlMillis(), TimeUnit.MILLISECONDS)
-                    .expirationListener(expirationObserver)
-                    .expirationPolicy(ExpirationPolicy.CREATED).build();
-            for (Map.Entry<String, VersionedValue> entry : state.getTable(tableName).entrySet()) {
-                tableExpirationMap.put(new DatabaseRow(tableName, entry.getKey()), entry.getValue().version());
-            }
-
-            tableEntryExpirationMap.put(tableName, tableExpirationMap);
-        }
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseManager.java
deleted file mode 100644
index e5a742e..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseManager.java
+++ /dev/null
@@ -1,602 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import com.google.common.collect.ImmutableList;
-import net.kuujo.copycat.Copycat;
-import net.kuujo.copycat.CopycatConfig;
-import net.kuujo.copycat.cluster.ClusterConfig;
-import net.kuujo.copycat.cluster.Member;
-import net.kuujo.copycat.cluster.TcpCluster;
-import net.kuujo.copycat.cluster.TcpClusterConfig;
-import net.kuujo.copycat.cluster.TcpMember;
-import net.kuujo.copycat.event.EventHandler;
-import net.kuujo.copycat.event.LeaderElectEvent;
-import net.kuujo.copycat.log.Log;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.packet.IpAddress;
-import org.onosproject.cluster.ClusterEvent;
-import org.onosproject.cluster.ClusterEventListener;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.DefaultControllerNode;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.service.BatchReadRequest;
-import org.onosproject.store.service.BatchReadResult;
-import org.onosproject.store.service.BatchWriteRequest;
-import org.onosproject.store.service.BatchWriteResult;
-import org.onosproject.store.service.DatabaseAdminService;
-import org.onosproject.store.service.DatabaseException;
-import org.onosproject.store.service.DatabaseService;
-import org.onosproject.store.service.ReadResult;
-import org.onosproject.store.service.ReadStatus;
-import org.onosproject.store.service.VersionedValue;
-import org.onosproject.store.service.WriteResult;
-import org.onosproject.store.service.WriteStatus;
-import org.slf4j.Logger;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
-import static org.onlab.util.Tools.namedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Strongly consistent and durable state management service based on the
- * Copycat implementation of the Raft consensus protocol.
- */
-@Component(immediate = false)
-@Service
-public class DatabaseManager implements DatabaseService, DatabaseAdminService {
-
-    private static final int RETRY_MS = 500;
-
-    private static final int ACTIVATE_MAX_RETRIES = 100;
-
-    private final Logger log = getLogger(getClass());
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ClusterService clusterService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ClusterCommunicationService clusterCommunicator;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected DatabaseProtocolService copycatMessagingProtocol;
-
-    public static final String LOG_FILE_PREFIX = "raft/onos-copy-cat-log_";
-
-    // Current working dir seems to be /opt/onos/apache-karaf-3.0.2
-    // TODO: Set the path to /opt/onos/config
-    private static final String CONFIG_DIR = "../config";
-
-    private static final String DEFAULT_MEMBER_FILE = "tablets.json";
-
-    private static final String DEFAULT_TABLET = "default";
-
-    // TODO: make this configurable
-    // initial member configuration file path
-    private String initialMemberConfig = DEFAULT_MEMBER_FILE;
-
-    public static final MessageSubject RAFT_LEADER_ELECTION_EVENT =
-            new MessageSubject("raft-leader-election-event");
-
-    private Copycat copycat;
-    private DatabaseClient client;
-
-    // guarded by synchronized block
-    private ClusterConfig<TcpMember> clusterConfig;
-
-    private CountDownLatch clusterEventLatch;
-    private ClusterEventListener clusterEventListener;
-
-    private Map<String, Set<DefaultControllerNode>> tabletMembers;
-
-    private boolean autoAddMember = false;
-
-    private ScheduledExecutorService executor;
-
-    private volatile LeaderElectEvent myLeaderEvent = null;
-
-    // TODO make this configurable
-    private int maxLogSizeBytes = 128 * (1024 * 1024);
-
-    // TODO make this configurable
-    private long electionTimeoutMs = 5000; // CopyCat default: 2000
-
-    @Activate
-    public void activate() throws InterruptedException, ExecutionException {
-
-        // KARAF_DATA
-        //  http://karaf.apache.org/manual/latest/users-guide/start-stop.html
-        final String dataDir = System.getProperty("karaf.data", "./data");
-
-        // load tablet configuration
-        File file = new File(CONFIG_DIR, initialMemberConfig);
-        log.info("Loading config: {}", file.getAbsolutePath());
-        TabletDefinitionStore tabletDef = new TabletDefinitionStore(file);
-        try {
-            tabletMembers = tabletDef.read();
-        } catch (IOException e) {
-            log.error("Failed to load tablet config {}", file);
-            throw new IllegalStateException("Failed to load tablet config", e);
-        }
-
-        // load default tablet configuration and start copycat
-        clusterConfig = new TcpClusterConfig();
-        Set<DefaultControllerNode> defaultMembers = tabletMembers.get(DEFAULT_TABLET);
-        if (defaultMembers == null || defaultMembers.isEmpty()) {
-            log.error("No members found in [{}] tablet configuration.",
-                      DEFAULT_TABLET);
-            throw new IllegalStateException("No member found in tablet configuration");
-
-        }
-
-        final ControllerNode localNode = clusterService.getLocalNode();
-        for (ControllerNode member : defaultMembers) {
-            final TcpMember tcpMember = new TcpMember(member.ip().toString(),
-                                                      member.tcpPort());
-            if (localNode.equals(member)) {
-                clusterConfig.setLocalMember(tcpMember);
-            } else {
-                clusterConfig.addRemoteMember(tcpMember);
-            }
-        }
-
-        if (clusterConfig.getLocalMember() != null) {
-
-            // Wait for a minimum viable Raft cluster to boot up.
-            waitForClusterQuorum();
-
-            final TcpCluster cluster;
-            synchronized (clusterConfig) {
-                // Create the cluster.
-                cluster = new TcpCluster(clusterConfig);
-            }
-            log.info("Starting cluster: {}", cluster);
-
-            DatabaseEntryExpirationTracker expirationTracker =
-                    new DatabaseEntryExpirationTracker(
-                            clusterConfig.getLocalMember(),
-                            clusterService.getLocalNode(),
-                            clusterCommunicator,
-                            this);
-
-            DatabaseStateMachine stateMachine = new DatabaseStateMachine();
-            stateMachine.addEventListener(expirationTracker);
-            Log consensusLog = new MapDBLog(dataDir + "/" + LOG_FILE_PREFIX + localNode.id(),
-                    ClusterMessagingProtocol.DB_SERIALIZER);
-
-            CopycatConfig ccConfig = new CopycatConfig();
-            ccConfig.setMaxLogSize(maxLogSizeBytes);
-            ccConfig.setElectionTimeout(electionTimeoutMs);
-
-            copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol, ccConfig);
-            copycat.event(LeaderElectEvent.class).registerHandler(new RaftLeaderElectionMonitor());
-            copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
-        }
-
-        client = new DatabaseClient(copycatMessagingProtocol);
-        clusterCommunicator.addSubscriber(RAFT_LEADER_ELECTION_EVENT, client);
-
-        // Starts copycat if this node is a participant
-        // of the Raft cluster.
-        if (copycat != null) {
-            copycat.start().get();
-
-            executor =
-                    newSingleThreadScheduledExecutor(namedThreads("onos-db-heartbeat-%d"));
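-            // Periodically re-broadcast the local leader-elect event while this node is the leader.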
-            executor.scheduleWithFixedDelay(new LeaderAdvertiser(), 5, 2, TimeUnit.SECONDS);
-
-        }
-
-        client.waitForLeader();
-
-        // Try and list the tables to verify database manager is
-        // in a state where it can serve requests.
-        tryTableListing();
-
-        log.info("Started.");
-    }
-
-    @Deactivate
-    public void deactivate() {
-        if (executor != null) {
-            executor.shutdownNow();
-        }
-        clusterService.removeListener(clusterEventListener);
-        // TODO: ClusterCommunicationService must support more than one
-        // handler per message subject.
-        clusterCommunicator.removeSubscriber(RAFT_LEADER_ELECTION_EVENT);
-        if (copycat != null) {
-            copycat.stop();
-        }
-        log.info("Stopped.");
-    }
-
-    private void waitForClusterQuorum() {
-        // note: from this point beyond, clusterConfig requires synchronization
-        clusterEventLatch = new CountDownLatch(1);
-        clusterEventListener = new InternalClusterEventListener();
-        clusterService.addListener(clusterEventListener);
-
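-        // Quorum is a simple majority of the configured Raft members.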
-        final int raftClusterSize = clusterConfig.getMembers().size();
-        final int raftClusterQuorumSize = (raftClusterSize / 2) + 1;
-        if (clusterService.getNodes().size() < raftClusterQuorumSize) {
-            // current cluster size is smaller than expected
-            try {
-                final int waitTimeSec = 120;
-                log.info("Waiting for a maximum of {}s for raft cluster quorum to boot up...", waitTimeSec);
-                if (!clusterEventLatch.await(waitTimeSec, TimeUnit.SECONDS)) {
-                    log.info("Starting with {}/{} nodes cluster",
-                             clusterService.getNodes().size(),
-                             raftClusterSize);
-                }
-            } catch (InterruptedException e) {
-                log.info("Interrupted waiting for raft quorum.", e);
-            }
-        }
-    }
-
-    private void tryTableListing() throws InterruptedException {
-        int retries = 0;
-        do {
-            try {
-                listTables();
-                return;
-            } catch (DatabaseException.Timeout e) {
-                log.debug("Failed to listTables. Will retry...", e);
-            } catch (DatabaseException e) {
-                log.debug("Failed to listTables. Will retry later...", e);
-                Thread.sleep(RETRY_MS);
-            }
-            if (retries == ACTIVATE_MAX_RETRIES) {
-                log.error("Failed to listTables after multiple attempts. Giving up.");
-                // Exit, hoping things will be fixed by the time
-                // others start using the service.
-                return;
-            }
-            retries++;
-        } while (true);
-    }
-
-    @Override
-    public boolean createTable(String name) {
-        return client.createTable(name);
-    }
-
-    @Override
-    public boolean createTable(String name, int ttlMillis) {
-        return client.createTable(name, ttlMillis);
-    }
-
-    @Override
-    public void dropTable(String name) {
-        client.dropTable(name);
-    }
-
-    @Override
-    public void dropAllTables() {
-        client.dropAllTables();
-    }
-
-    @Override
-    public Set<String> listTables() {
-        return client.listTables();
-    }
-
-    @Override
-    public VersionedValue get(String tableName, String key) {
-        BatchReadRequest batchRequest = new BatchReadRequest.Builder().get(tableName, key).build();
-        ReadResult readResult = batchRead(batchRequest).getAsList().get(0);
-        if (readResult.status().equals(ReadStatus.OK)) {
-            return readResult.value();
-        }
-        throw new DatabaseException("get failed due to status: " + readResult.status());
-    }
-
-    @Override
-    public Map<String, VersionedValue> getAll(String tableName) {
-        return client.getAll(tableName);
-    }
-
-
-    @Override
-    public BatchReadResult batchRead(BatchReadRequest batchRequest) {
-        return new BatchReadResult(client.batchRead(batchRequest));
-    }
-
-    @Override
-    public BatchWriteResult batchWrite(BatchWriteRequest batchRequest) {
-        return new BatchWriteResult(client.batchWrite(batchRequest));
-    }
-
-    @Override
-    public VersionedValue put(String tableName, String key, byte[] value) {
-        BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().put(tableName, key, value).build();
-        WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
-        if (writeResult.status().equals(WriteStatus.OK)) {
-            return writeResult.previousValue();
-        }
-        throw new DatabaseException("put failed due to status: " + writeResult.status());
-    }
-
-    @Override
-    public boolean putIfAbsent(String tableName, String key, byte[] value) {
-        BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
-                    .putIfAbsent(tableName, key, value).build();
-        WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
-        if (writeResult.status().equals(WriteStatus.OK)) {
-            return true;
-        } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
-            return false;
-        }
-        throw new DatabaseException("putIfAbsent failed due to status: "
-                    + writeResult.status());
-    }
-
-    @Override
-    public boolean putIfVersionMatches(String tableName, String key,
-            byte[] value, long version) {
-        BatchWriteRequest batchRequest =
-                new BatchWriteRequest.Builder()
-                    .putIfVersionMatches(tableName, key, value, version).build();
-        WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
-        if (writeResult.status().equals(WriteStatus.OK)) {
-            return true;
-        } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
-            return false;
-        }
-        throw new DatabaseException("putIfVersionMatches failed due to status: "
-                    + writeResult.status());
-    }
-
-    @Override
-    public boolean putIfValueMatches(String tableName, String key,
-            byte[] oldValue, byte[] newValue) {
-        BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
-                    .putIfValueMatches(tableName, key, oldValue, newValue).build();
-        WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
-        if (writeResult.status().equals(WriteStatus.OK)) {
-            return true;
-        } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
-            return false;
-        }
-        throw new DatabaseException("putIfValueMatches failed due to status: "
-                    + writeResult.status());
-    }
-
-    @Override
-    public VersionedValue remove(String tableName, String key) {
-        BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
-                    .remove(tableName, key).build();
-        WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
-        if (writeResult.status().equals(WriteStatus.OK)) {
-            return writeResult.previousValue();
-        }
-        throw new DatabaseException("remove failed due to status: "
-                    + writeResult.status());
-    }
-
-    @Override
-    public boolean removeIfVersionMatches(String tableName, String key,
-            long version) {
-        BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
-                    .removeIfVersionMatches(tableName, key, version).build();
-        WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
-        if (writeResult.status().equals(WriteStatus.OK)) {
-            return true;
-        } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
-            return false;
-        }
-        throw new DatabaseException("removeIfVersionMatches failed due to status: "
-                    + writeResult.status());
-    }
-
-    @Override
-    public boolean removeIfValueMatches(String tableName, String key,
-            byte[] value) {
-        BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
-                    .removeIfValueMatches(tableName, key, value).build();
-        WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
-        if (writeResult.status().equals(WriteStatus.OK)) {
-            return true;
-        } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
-            return false;
-        }
-        throw new DatabaseException("removeIfValueMatches failed due to status: "
-                    + writeResult.status());
-    }
-
-    @Override
-    public void addMember(final ControllerNode node) {
-        final TcpMember tcpMember = new TcpMember(node.ip().toString(),
-                                                  node.tcpPort());
-        log.info("{} was added to the cluster", tcpMember);
-        synchronized (clusterConfig) {
-            clusterConfig.addRemoteMember(tcpMember);
-        }
-    }
-
-    @Override
-    public Optional<ControllerNode> leader() {
-        if (copycat != null) {
-            if (copycat.isLeader()) {
-                return Optional.of(clusterService.getLocalNode());
-            }
-            Member leader = copycat.cluster().remoteMember(copycat.leader());
-            return Optional.ofNullable(getNodeIdFromMember(leader));
-        }
-        return Optional.ofNullable(getNodeIdFromMember(client.getCurrentLeader()));
-    }
-
-    private final class LeaderAdvertiser implements Runnable {
-
-        @Override
-        public void run() {
-            try {
-                LeaderElectEvent event = myLeaderEvent;
-                if (event != null) {
-                    log.trace("Broadcasting RAFT_LEADER_ELECTION_EVENT: {}", event);
-                    // This node just became the leader.
-                    clusterCommunicator.broadcastIncludeSelf(
-                            new ClusterMessage(
-                                    clusterService.getLocalNode().id(),
-                                    RAFT_LEADER_ELECTION_EVENT,
-                                    ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
-                }
-            } catch (Exception e) {
-                log.debug("LeaderAdvertiser failed with exception", e);
-            }
-        }
-
-    }
-
-    private final class RaftLeaderElectionMonitor implements EventHandler<LeaderElectEvent> {
-        @Override
-        public void handle(LeaderElectEvent event) {
-            log.debug("Received LeaderElectEvent: {}", event);
-            if (clusterConfig.getLocalMember() != null && event.leader().equals(clusterConfig.getLocalMember())) {
-                log.debug("Broadcasting RAFT_LEADER_ELECTION_EVENT");
-                myLeaderEvent = event;
-                // This node just became the leader.
-                clusterCommunicator.broadcastIncludeSelf(
-                        new ClusterMessage(
-                                clusterService.getLocalNode().id(),
-                                RAFT_LEADER_ELECTION_EVENT,
-                                ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
-            } else {
-                if (myLeaderEvent != null) {
-                    log.debug("This node is no longer the Leader");
-                }
-                myLeaderEvent = null;
-            }
-        }
-    }
-
-    private final class InternalClusterEventListener
-            implements ClusterEventListener {
-
-        @Override
-        public void event(ClusterEvent event) {
-            // TODO: Not every node should be part of the consensus ring.
-
-            final ControllerNode node = event.subject();
-            final TcpMember tcpMember = new TcpMember(node.ip().toString(),
-                    node.tcpPort());
-
-            switch (event.type()) {
-            case INSTANCE_ACTIVATED:
-            case INSTANCE_ADDED:
-                if (autoAddMember) {
-                    synchronized (clusterConfig) {
-                        if (!clusterConfig.getMembers().contains(tcpMember)) {
-                            log.info("{} was automatically added to the cluster", tcpMember);
-                            clusterConfig.addRemoteMember(tcpMember);
-                        }
-                    }
-                }
-                break;
-            case INSTANCE_DEACTIVATED:
-            case INSTANCE_REMOVED:
-                if (autoAddMember) {
-                    Set<DefaultControllerNode> members =
-                            tabletMembers.getOrDefault(DEFAULT_TABLET,
-                                                       Collections.emptySet());
-                    // remove only if not the initial members
-                    if (!members.contains(node)) {
-                        synchronized (clusterConfig) {
-                            if (clusterConfig.getMembers().contains(tcpMember)) {
-                                log.info("{} was automatically removed from the cluster", tcpMember);
-                                clusterConfig.removeRemoteMember(tcpMember);
-                            }
-                        }
-                    }
-                }
-                break;
-            default:
-                break;
-            }
-            if (copycat != null) {
-                log.debug("Current cluster: {}", copycat.cluster());
-            }
-            clusterEventLatch.countDown();
-        }
-
-    }
-
-    @Override
-    public void removeMember(final ControllerNode node) {
-        final TcpMember tcpMember = new TcpMember(node.ip().toString(),
-                                                  node.tcpPort());
-        log.info("{} was removed from the cluster", tcpMember);
-        synchronized (clusterConfig) {
-            clusterConfig.removeRemoteMember(tcpMember);
-        }
-    }
-
-    @Override
-    public Collection<ControllerNode> listMembers() {
-        if (copycat == null) {
-            return ImmutableList.of();
-        }
-        Set<ControllerNode> members = new HashSet<>();
-        for (Member member : copycat.cluster().members()) {
-            ControllerNode node = getNodeIdFromMember(member);
-            if (node == null) {
-                log.info("No Node found for {}", member);
-                continue;
-            }
-            members.add(node);
-        }
-        return members;
-    }
-
-    private ControllerNode getNodeIdFromMember(Member member) {
-        if (member instanceof TcpMember) {
-            final TcpMember tcpMember = (TcpMember) member;
-            // TODO: assuming tcpMember#host is an IP address;
-            // if not, resolve it via DNS, etc. first
-            IpAddress ip = IpAddress.valueOf(tcpMember.host());
-            int tcpPort = tcpMember.port();
-            for (ControllerNode node : clusterService.getNodes()) {
-                if (node.ip().equals(ip) &&
-                    node.tcpPort() == tcpPort) {
-                    return node;
-                }
-            }
-        }
-        return null;
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseProtocolService.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseProtocolService.java
deleted file mode 100644
index 7d9aa0a..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseProtocolService.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import net.kuujo.copycat.cluster.TcpMember;
-import net.kuujo.copycat.spi.protocol.Protocol;
-
-// interface required for connecting DatabaseManager + ClusterMessagingProtocol
-// TODO: Consider changing ClusterMessagingProtocol to non-Service class
-public interface DatabaseProtocolService extends Protocol<TcpMember> {
-
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseStateMachine.java
deleted file mode 100644
index 150522c..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseStateMachine.java
+++ /dev/null
@@ -1,454 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import static org.onlab.util.Tools.namedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-import static org.onosproject.store.service.impl.ClusterMessagingProtocol.DB_SERIALIZER;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.zip.DeflaterOutputStream;
-import java.util.zip.InflaterInputStream;
-
-import net.kuujo.copycat.Command;
-import net.kuujo.copycat.Query;
-import net.kuujo.copycat.StateMachine;
-
-import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.service.BatchReadRequest;
-import org.onosproject.store.service.BatchWriteRequest;
-import org.onosproject.store.service.ReadRequest;
-import org.onosproject.store.service.ReadResult;
-import org.onosproject.store.service.ReadStatus;
-import org.onosproject.store.service.VersionedValue;
-import org.onosproject.store.service.WriteRequest;
-import org.onosproject.store.service.WriteResult;
-import org.onosproject.store.service.WriteStatus;
-import org.slf4j.Logger;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * StateMachine whose transitions are coordinated/replicated
- * by Raft consensus.
- * Each Raft cluster member has an instance of this state machine that is
- * independently updated in lock step once there is consensus
- * on the next transition.
- */
-public class DatabaseStateMachine implements StateMachine {
-
-    private final Logger log = getLogger(getClass());
-
-    private final ExecutorService updatesExecutor =
-            Executors.newSingleThreadExecutor(namedThreads("onos-db-statemachine-updates"));
-
-    // message subject for database update notifications.
-    public static final MessageSubject DATABASE_UPDATE_EVENTS =
-            new MessageSubject("database-update-events");
-
-    private final Set<DatabaseUpdateEventListener> listeners = Sets.newIdentityHashSet();
-
-    // durable internal state of the database.
-    private State state = new State();
-
-    // TODO make this configurable
-    private boolean compressSnapshot = true;
-
-    @Command
-    public boolean createTable(String tableName) {
-        TableMetadata metadata = new TableMetadata(tableName);
-        return createTable(metadata);
-    }
-
-    @Command
-    public boolean createTable(String tableName, Integer ttlMillis) {
-        TableMetadata metadata = new TableMetadata(tableName, ttlMillis);
-        return createTable(metadata);
-    }
-
-    private boolean createTable(TableMetadata metadata) {
-        Map<String, VersionedValue> existingTable = state.getTable(metadata.tableName());
-        if (existingTable != null) {
-            return false;
-        }
-        state.createTable(metadata);
-
-        updatesExecutor.submit(new Runnable() {
-            @Override
-            public void run() {
-                for (DatabaseUpdateEventListener listener : listeners) {
-                    listener.tableCreated(metadata);
-                }
-            }
-        });
-
-        return true;
-    }
-
-    @Command
-    public boolean dropTable(String tableName) {
-        if (state.removeTable(tableName)) {
-
-            updatesExecutor.submit(new Runnable() {
-                @Override
-                public void run() {
-                    for (DatabaseUpdateEventListener listener : listeners) {
-                        listener.tableDeleted(tableName);
-                    }
-                }
-            });
-
-            return true;
-        }
-        return false;
-    }
-
-    @Command
-    public boolean dropAllTables() {
-        Set<String> tableNames = state.getTableNames();
-        state.removeAllTables();
-
-        updatesExecutor.submit(new Runnable() {
-            @Override
-            public void run() {
-                for (DatabaseUpdateEventListener listener : listeners) {
-                    for (String tableName : tableNames) {
-                        listener.tableDeleted(tableName);
-                    }
-                }
-            }
-        });
-
-        return true;
-    }
-
-    @Query
-    public Set<String> listTables() {
-        return ImmutableSet.copyOf(state.getTableNames());
-    }
-
-    @Query
-    public List<ReadResult> read(BatchReadRequest batchRequest) {
-        List<ReadResult> results = new ArrayList<>(batchRequest.batchSize());
-        for (ReadRequest request : batchRequest.getAsList()) {
-            Map<String, VersionedValue> table = state.getTable(request.tableName());
-            if (table == null) {
-                results.add(new ReadResult(ReadStatus.NO_SUCH_TABLE, request.tableName(), request.key(), null));
-                continue;
-            }
-            VersionedValue value = VersionedValue.copy(table.get(request.key()));
-            results.add(new ReadResult(ReadStatus.OK, request.tableName(), request.key(), value));
-        }
-        return results;
-    }
-
-    @Query
-    public Map<String, VersionedValue> getAll(String tableName) {
-        return ImmutableMap.copyOf(state.getTable(tableName));
-    }
-
-
-    WriteStatus checkIfApplicable(WriteRequest request, VersionedValue value) {
-
-        switch (request.type()) {
-        case PUT:
-            return WriteStatus.OK;
-
-        case PUT_IF_ABSENT:
-            if (value == null) {
-                return WriteStatus.OK;
-            }
-            return WriteStatus.PRECONDITION_VIOLATION;
-
-        case PUT_IF_VALUE:
-        case REMOVE_IF_VALUE:
-            if (value != null && Arrays.equals(value.value(), request.oldValue())) {
-                return WriteStatus.OK;
-            }
-            return WriteStatus.PRECONDITION_VIOLATION;
-
-        case PUT_IF_VERSION:
-        case REMOVE_IF_VERSION:
-            if (value != null && request.previousVersion() == value.version()) {
-                return WriteStatus.OK;
-            }
-            return WriteStatus.PRECONDITION_VIOLATION;
-
-        case REMOVE:
-            return WriteStatus.OK;
-
-        default:
-            break;
-        }
-        log.error("Should never reach here {}", request);
-        return WriteStatus.ABORTED;
-    }
-
-    @Command
-    public List<WriteResult> write(BatchWriteRequest batchRequest) {
-
-        // applicability check
-        boolean abort = false;
-        List<WriteResult> results = new ArrayList<>(batchRequest.batchSize());
-
-        for (WriteRequest request : batchRequest.getAsList()) {
-            Map<String, VersionedValue> table = state.getTable(request.tableName());
-            if (table == null) {
-                results.add(new WriteResult(WriteStatus.NO_SUCH_TABLE, null));
-                abort = true;
-                continue;
-            }
-            final VersionedValue value = table.get(request.key());
-            WriteStatus result = checkIfApplicable(request, value);
-            results.add(new WriteResult(result, value));
-            if (result != WriteStatus.OK) {
-                abort = true;
-            }
-        }
-
-        if (abort) {
-            for (int i = 0; i < results.size(); ++i) {
-                if (results.get(i).status() == WriteStatus.OK) {
-                    results.set(i, new WriteResult(WriteStatus.ABORTED, null));
-                }
-            }
-            return results;
-        }
-
-        List<TableModificationEvent> tableModificationEvents = Lists.newLinkedList();
-
-        // All preconditions passed; clear the precondition-check results so the
-        // returned list only contains the results of the applied writes.
-        results.clear();
-
-        // apply changes
-        for (WriteRequest request : batchRequest.getAsList()) {
-            Map<String, VersionedValue> table = state.getTable(request.tableName());
-
-            TableModificationEvent tableModificationEvent = null;
-            // FIXME: If this method can be called by multiple threads,
-            // the synchronization scope is wrong.
-            // The whole function, including the applicability check, needs to be protected.
-            // Confirm copycat's thread safety requirement for StateMachine
-            // TODO: If we need isolation, we need to block reads also
-            synchronized (table) {
-                switch (request.type()) {
-                case PUT:
-                case PUT_IF_ABSENT:
-                case PUT_IF_VALUE:
-                case PUT_IF_VERSION:
-                    VersionedValue newValue = new VersionedValue(request.newValue(), state.nextVersion());
-                    VersionedValue previousValue = table.put(request.key(), newValue);
-                    WriteResult putResult = new WriteResult(WriteStatus.OK, previousValue);
-                    results.add(putResult);
-                    tableModificationEvent = (previousValue == null) ?
-                            TableModificationEvent.rowAdded(request.tableName(), request.key(), newValue) :
-                            TableModificationEvent.rowUpdated(request.tableName(), request.key(), newValue);
-                    break;
-
-                case REMOVE:
-                case REMOVE_IF_VALUE:
-                case REMOVE_IF_VERSION:
-                    VersionedValue removedValue = table.remove(request.key());
-                    WriteResult removeResult = new WriteResult(WriteStatus.OK, removedValue);
-                    results.add(removeResult);
-                    if (removedValue != null) {
-                        tableModificationEvent =
-                                TableModificationEvent.rowDeleted(request.tableName(), request.key(), removedValue);
-                    }
-                    break;
-
-                default:
-                    log.error("Invalid WriteRequest type {}", request.type());
-                    break;
-                }
-            }
-
-            if (tableModificationEvent != null) {
-                tableModificationEvents.add(tableModificationEvent);
-            }
-        }
-
-        // notify listeners of table mod events.
-
-        updatesExecutor.submit(new Runnable() {
-            @Override
-            public void run() {
-                for (DatabaseUpdateEventListener listener : listeners) {
-                    for (TableModificationEvent tableModificationEvent : tableModificationEvents) {
-                        log.trace("Publishing table modification event: {}", tableModificationEvent);
-                        listener.tableModified(tableModificationEvent);
-                    }
-                }
-            }
-        });
-
-        return results;
-    }
-
-    public static class State {
-
-        private final Map<String, TableMetadata> tableMetadata = Maps.newHashMap();
-        private final Map<String, Map<String, VersionedValue>> tableData = Maps.newHashMap();
-        private long versionCounter = 1;
-
-        Map<String, VersionedValue> getTable(String tableName) {
-            return tableData.get(tableName);
-        }
-
-        void createTable(TableMetadata metadata) {
-            tableMetadata.put(metadata.tableName, metadata);
-            tableData.put(metadata.tableName, Maps.newHashMap());
-        }
-
-        TableMetadata getTableMetadata(String tableName) {
-            return tableMetadata.get(tableName);
-        }
-
-        long nextVersion() {
-            return versionCounter++;
-        }
-
-        Set<String> getTableNames() {
-            return ImmutableSet.copyOf(tableMetadata.keySet());
-        }
-
-
-        boolean removeTable(String tableName) {
-            if (!tableMetadata.containsKey(tableName)) {
-                return false;
-            }
-            tableMetadata.remove(tableName);
-            tableData.remove(tableName);
-            return true;
-        }
-
-        void removeAllTables() {
-            tableMetadata.clear();
-            tableData.clear();
-        }
-    }
-
-    public static class TableMetadata {
-        private final String tableName;
-        private final boolean expireOldEntries;
-        private final int ttlMillis;
-
-        public TableMetadata(String tableName) {
-            this.tableName = tableName;
-            this.expireOldEntries = false;
-            this.ttlMillis = Integer.MAX_VALUE;
-
-        }
-
-        public TableMetadata(String tableName, int ttlMillis) {
-            this.tableName = tableName;
-            this.expireOldEntries = true;
-            this.ttlMillis = ttlMillis;
-        }
-
-        public String tableName() {
-            return tableName;
-        }
-
-        public boolean expireOldEntries() {
-            return expireOldEntries;
-        }
-
-        public int ttlMillis() {
-            return ttlMillis;
-        }
-
-        @Override
-        public String toString() {
-            return MoreObjects.toStringHelper(getClass())
-                    .add("tableName", tableName)
-                    .add("expireOldEntries", expireOldEntries)
-                    .add("ttlMillis", ttlMillis)
-                    .toString();
-        }
-    }
-
-    @Override
-    public byte[] takeSnapshot() {
-        try {
-            if (compressSnapshot) {
-                byte[] input = DB_SERIALIZER.encode(state);
-                ByteArrayOutputStream comp = new ByteArrayOutputStream(input.length);
-                DeflaterOutputStream compressor = new DeflaterOutputStream(comp);
-                compressor.write(input, 0, input.length);
-                compressor.close();
-                return comp.toByteArray();
-            } else {
-                return DB_SERIALIZER.encode(state);
-            }
-        } catch (Exception e) {
-            log.error("Failed to take snapshot", e);
-            throw new SnapshotException(e);
-        }
-    }
-
-    @Override
-    public void installSnapshot(byte[] data) {
-        try {
-            if (compressSnapshot) {
-                ByteArrayInputStream in = new ByteArrayInputStream(data);
-                InflaterInputStream decompressor = new InflaterInputStream(in);
-                this.state = DB_SERIALIZER.decode(decompressor);
-            } else {
-                this.state = DB_SERIALIZER.decode(data);
-            }
-
-            updatesExecutor.submit(new Runnable() {
-                @Override
-                public void run() {
-                    for (DatabaseUpdateEventListener listener : listeners) {
-                        listener.snapshotInstalled(state);
-                    }
-                }
-            });
-
-        } catch (Exception e) {
-            log.error("Failed to install from snapshot", e);
-            throw new SnapshotException(e);
-        }
-    }
-
-    /**
-     * Adds specified DatabaseUpdateEventListener.
-     * @param listener listener to add
-     */
-    public void addEventListener(DatabaseUpdateEventListener listener) {
-        listeners.add(listener);
-    }
-
-    /**
-     * Removes specified DatabaseUpdateEventListener.
-     * @param listener listener to remove
-     */
-    public void removeEventListener(DatabaseUpdateEventListener listener) {
-        listeners.remove(listener);
-    }
-}
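
To make the removed write path easier to follow, here is a minimal, self-contained sketch of the two-phase (check preconditions, then apply) batch-write semantics implemented above. The stand-in types and values are illustrative only; they are not the ONOS BatchWriteRequest/WriteResult classes.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative sketch of the all-or-nothing batch-write semantics above (simplified types). */
final class BatchWriteSketch {

    enum Status { OK, PRECONDITION_VIOLATION, ABORTED }

    static final class Write {
        final String key;
        final String newValue;
        final String expectedValue; // null means unconditional put

        Write(String key, String newValue, String expectedValue) {
            this.key = key;
            this.newValue = newValue;
            this.expectedValue = expectedValue;
        }
    }

    static List<Status> write(Map<String, String> table, List<Write> batch) {
        // Phase 1: evaluate every precondition without mutating state.
        List<Status> results = new ArrayList<>();
        boolean abort = false;
        for (Write w : batch) {
            boolean ok = w.expectedValue == null || w.expectedValue.equals(table.get(w.key));
            results.add(ok ? Status.OK : Status.PRECONDITION_VIOLATION);
            abort |= !ok;
        }
        // Any failure aborts the whole batch: downgrade the OK entries and apply nothing.
        if (abort) {
            for (int i = 0; i < results.size(); i++) {
                if (results.get(i) == Status.OK) {
                    results.set(i, Status.ABORTED);
                }
            }
            return results;
        }
        // Phase 2: all preconditions passed, so apply every write.
        for (Write w : batch) {
            table.put(w.key, w.newValue);
        }
        return results;
    }

    public static void main(String[] args) {
        Map<String, String> table = new HashMap<>();
        table.put("a", "1");
        // The second write's precondition fails, so neither write is applied.
        System.out.println(write(table, Arrays.asList(
                new Write("a", "2", "1"),
                new Write("b", "3", "missing"))));
        System.out.println(table); // prints {a=1}
    }
}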
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseUpdateEventListener.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseUpdateEventListener.java
deleted file mode 100644
index 5dd8651..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseUpdateEventListener.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.service.impl;
-
-import org.onosproject.store.service.impl.DatabaseStateMachine.TableMetadata;
-
-/**
- * Interface of database update event listeners.
- */
-public interface DatabaseUpdateEventListener {
-
-    /**
-     * Notifies listeners of a table modified event.
-     * @param event table modification event.
-     */
-    public void tableModified(TableModificationEvent event);
-
-    /**
-     * Notifies listeners of a table created event.
-     * @param metadata metadata for the created table.
-     */
-    public void tableCreated(TableMetadata metadata);
-
-    /**
-     * Notifies listeners of a table deleted event.
-     * @param tableName name of the table deleted
-     */
-    public void tableDeleted(String tableName);
-
-    /**
-     * Notifies listeners of a snapshot installation event.
-     * @param snapshotState installed snapshot state.
-     */
-    public void snapshotInstalled(DatabaseStateMachine.State snapshotState);
-}
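
A hypothetical implementation of the removed listener interface could have looked like the following (the class name and println output are illustrative); it would have been registered via DatabaseStateMachine#addEventListener.

package org.onosproject.store.service.impl;

/** Hypothetical listener that simply logs each database update (illustrative only). */
public class LoggingDatabaseUpdateListener implements DatabaseUpdateEventListener {

    @Override
    public void tableModified(TableModificationEvent event) {
        System.out.println("table modified: " + event);
    }

    @Override
    public void tableCreated(DatabaseStateMachine.TableMetadata metadata) {
        System.out.println("table created: " + metadata);
    }

    @Override
    public void tableDeleted(String tableName) {
        System.out.println("table deleted: " + tableName);
    }

    @Override
    public void snapshotInstalled(DatabaseStateMachine.State snapshotState) {
        System.out.println("snapshot installed");
    }
}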
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DistributedLock.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/DistributedLock.java
deleted file mode 100644
index 5717d74..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DistributedLock.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import static com.google.common.base.Verify.verify;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.joda.time.DateTime;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.store.service.DatabaseException;
-import org.onosproject.store.service.DatabaseService;
-import org.onosproject.store.service.Lock;
-import org.onosproject.store.service.VersionedValue;
-import org.slf4j.Logger;
-
-/**
- * A distributed lock implementation.
- */
-public class DistributedLock implements Lock {
-
-    private final Logger log = getLogger(getClass());
-
-    private final DistributedLockManager lockManager;
-    private final DatabaseService databaseService;
-    private final String path;
-    private DateTime lockExpirationTime;
-    private AtomicBoolean isLocked = new AtomicBoolean(false);
-    private volatile long epoch = 0;
-    private byte[] lockId;
-
-    public DistributedLock(
-            String path,
-            DatabaseService databaseService,
-            ClusterService clusterService,
-            DistributedLockManager lockManager) {
-
-        this.path = path;
-        this.databaseService = databaseService;
-        this.lockManager = lockManager;
-        this.lockId =
-                (UUID.randomUUID().toString() + "::" +
-                        clusterService.getLocalNode().id().toString()).
-                        getBytes(StandardCharsets.UTF_8);
-    }
-
-    @Override
-    public String path() {
-        return path;
-    }
-
-    @Override
-    public void lock(int leaseDurationMillis) throws InterruptedException {
-        try {
-            lockAsync(leaseDurationMillis).get();
-        } catch (ExecutionException e) {
-            throw new DatabaseException(e);
-        }
-    }
-
-    @Override
-    public CompletableFuture<Void> lockAsync(int leaseDurationMillis) {
-        try {
-            if (isLocked() || tryLock(leaseDurationMillis)) {
-                return CompletableFuture.<Void>completedFuture(null);
-            }
-            return lockManager.lockIfAvailable(this, leaseDurationMillis);
-        } catch (DatabaseException e) {
-            CompletableFuture<Void> lockFuture = new CompletableFuture<>();
-            lockFuture.completeExceptionally(e);
-            return lockFuture;
-        }
-    }
-
-    @Override
-    public boolean tryLock(int leaseDurationMillis) {
-        if (databaseService.putIfAbsent(
-                DistributedLockManager.ONOS_LOCK_TABLE_NAME,
-                path,
-                lockId)) {
-            VersionedValue vv =
-                    databaseService.get(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path);
-            verify(Arrays.equals(vv.value(), lockId));
-            epoch = vv.version();
-            isLocked.set(true);
-            lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
-            return true;
-        }
-        return false;
-    }
-
-    @Override
-    public boolean tryLock(
-            int waitTimeMillis,
-            int leaseDurationMillis) throws InterruptedException {
-        if (isLocked() || tryLock(leaseDurationMillis)) {
-            return true;
-        }
-
-        CompletableFuture<Void> future =
-                lockManager.lockIfAvailable(this, waitTimeMillis, leaseDurationMillis);
-        try {
-            future.get(waitTimeMillis, TimeUnit.MILLISECONDS);
-            return true;
-        } catch (ExecutionException e) {
-            throw new DatabaseException(e);
-        } catch (TimeoutException e) {
-            log.debug("Timed out waiting to acquire lock for {}", path);
-            return false;
-        }
-    }
-
-    @Override
-    public boolean isLocked() {
-        if (isLocked.get()) {
-            // We rely on local information to check
-            // if the lock expired.
-            // This should make this call lightweight,
-            // while still retaining the safety guarantees.
-            if (DateTime.now().isAfter(lockExpirationTime)) {
-                isLocked.set(false);
-                return false;
-            } else {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    @Override
-    public long epoch() {
-        return epoch;
-    }
-
-    @Override
-    public void unlock() {
-        if (!isLocked()) {
-            return;
-        } else {
-            if (databaseService.removeIfValueMatches(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path, lockId)) {
-                isLocked.set(false);
-            }
-        }
-    }
-
-    @Override
-    public boolean extendExpiration(int leaseDurationMillis) {
-        if (!isLocked()) {
-            log.warn("Ignoring request to extend expiration for lock {}."
-                    + " extendExpiration must only be called for locks that are already acquired.", path);
-            return false;
-        }
-
-        if (databaseService.putIfValueMatches(
-                DistributedLockManager.ONOS_LOCK_TABLE_NAME,
-                path,
-                lockId,
-                lockId)) {
-            lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
-            log.debug("Succeeded in extending lock {} expiration time to {}", path, lockExpirationTime);
-            return true;
-        } else {
-            log.info("Failed to extend expiration for {}", path);
-            return false;
-        }
-    }
-}
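
For reference, a sketch of how a caller might have used the removed Lock/LockService API. The path, timeout, and lease values are illustrative, and the LockService reference is assumed to be injected elsewhere.

import org.onosproject.store.service.Lock;
import org.onosproject.store.service.LockService;

/** Hypothetical caller of the removed Lock API; the path and timeout values are illustrative. */
public final class LockUsageSketch {

    public static void runExclusively(LockService lockService, Runnable task)
            throws InterruptedException {
        Lock lock = lockService.create("/sketch/critical-section");
        // Wait up to 10 s for the lock; once acquired, hold it under a 5 s lease.
        if (!lock.tryLock(10000, 5000)) {
            return; // could not acquire in time; the caller decides how to react
        }
        try {
            task.run();
            // A long-running task would periodically call lock.extendExpiration(5000)
            // to renew the lease before it expires.
        } finally {
            lock.unlock();
        }
    }
}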
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DistributedLockManager.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/DistributedLockManager.java
deleted file mode 100644
index c131b6e..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DistributedLockManager.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import static org.onlab.util.Tools.namedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.joda.time.DateTime;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.service.DatabaseAdminService;
-import org.onosproject.store.service.DatabaseException;
-import org.onosproject.store.service.DatabaseService;
-import org.onosproject.store.service.Lock;
-import org.onosproject.store.service.LockEventListener;
-import org.onosproject.store.service.LockService;
-import org.slf4j.Logger;
-
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Multimaps;
-
-@Component(immediate = false)
-@Service
-public class DistributedLockManager implements LockService {
-
-    private static final ExecutorService THREAD_POOL =
-            Executors.newCachedThreadPool(namedThreads("onos-lock-manager-%d"));
-
-    private final Logger log = getLogger(getClass());
-
-    public static final String ONOS_LOCK_TABLE_NAME = "onos-locks";
-
-    public static final int DEAD_LOCK_TIMEOUT_MS = 5000;
-
-    private final ListMultimap<String, LockRequest> locksToAcquire =
-                Multimaps.synchronizedListMultimap(LinkedListMultimap.<String, LockRequest>create());
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private ClusterCommunicationService clusterCommunicator;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private DatabaseAdminService databaseAdminService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private DatabaseService databaseService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private ClusterService clusterService;
-
-    @Activate
-    public void activate() {
-        try {
-            Set<String> tables = databaseAdminService.listTables();
-
-            if (!tables.contains(ONOS_LOCK_TABLE_NAME)) {
-                if (databaseAdminService.createTable(ONOS_LOCK_TABLE_NAME, DEAD_LOCK_TIMEOUT_MS)) {
-                    log.info("Created {} table.", ONOS_LOCK_TABLE_NAME);
-                }
-            }
-        } catch (DatabaseException e) {
-            log.error("DistributedLockManager#activate failed.", e);
-        }
-
-        clusterCommunicator.addSubscriber(
-                DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
-                new LockEventMessageListener());
-
-        log.info("Started");
-    }
-
-    @Deactivate
-    public void deactivate() {
-        clusterCommunicator.removeSubscriber(DatabaseStateMachine.DATABASE_UPDATE_EVENTS);
-        locksToAcquire.clear();
-        log.info("Stopped.");
-    }
-
-    @Override
-    public Lock create(String path) {
-        return new DistributedLock(path, databaseService, clusterService, this);
-    }
-
-    @Override
-    public void addListener(LockEventListener listener) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void removeListener(LockEventListener listener) {
-        throw new UnsupportedOperationException();
-    }
-
-    /**
-     * Attempts to acquire the lock as soon as it becomes available.
-     * @param lock lock to acquire.
-     * @param waitTimeMillis maximum time to wait before giving up.
-     * @param leaseDurationMillis the duration for which to acquire the lock initially.
-     * @return Future that can be blocked on until lock becomes available.
-     */
-    protected CompletableFuture<Void> lockIfAvailable(
-            Lock lock,
-            int waitTimeMillis,
-            int leaseDurationMillis) {
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        LockRequest request = new LockRequest(
-                lock,
-                leaseDurationMillis,
-                DateTime.now().plusMillis(waitTimeMillis),
-                future);
-        locksToAcquire.put(lock.path(), request);
-        return future;
-    }
-
-    /**
-     * Attempts to acquire the lock as soon as it becomes available,
-     * waiting indefinitely.
-     * @param lock lock to acquire.
-     * @param leaseDurationMillis the duration for which to acquire the lock initially.
-     * @return Future that can be blocked on until the lock becomes available.
-     */
-    protected CompletableFuture<Void> lockIfAvailable(
-            Lock lock,
-            int leaseDurationMillis) {
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        LockRequest request = new LockRequest(
-                lock,
-                leaseDurationMillis,
-                DateTime.now().plusYears(100),
-                future);
-        locksToAcquire.put(lock.path(), request);
-        return future;
-    }
-
-    private class LockEventMessageListener implements ClusterMessageHandler {
-        @Override
-        public void handle(ClusterMessage message) {
-            TableModificationEvent event = ClusterMessagingProtocol.DB_SERIALIZER
-                    .decode(message.payload());
-            if (event.tableName().equals(ONOS_LOCK_TABLE_NAME) &&
-                    event.type().equals(TableModificationEvent.Type.ROW_DELETED)) {
-                THREAD_POOL.submit(new RetryLockTask(event.key()));
-            }
-        }
-    }
-
-    private class RetryLockTask implements Runnable {
-
-        private final String path;
-
-        public RetryLockTask(String path) {
-            this.path = path;
-        }
-
-        @Override
-        public void run() {
-            if (!locksToAcquire.containsKey(path)) {
-                return;
-            }
-
-            List<LockRequest> existingRequests = locksToAcquire.get(path);
-            if (existingRequests == null || existingRequests.isEmpty()) {
-                return;
-            }
-            log.info("Path {} is now available for locking. There are {} outstanding "
-                    + "requests for it.",
-                    path, existingRequests.size());
-
-            synchronized (existingRequests) {
-                Iterator<LockRequest> existingRequestIterator = existingRequests.iterator();
-                while (existingRequestIterator.hasNext()) {
-                    LockRequest request = existingRequestIterator.next();
-                    if (DateTime.now().isAfter(request.requestExpirationTime())) {
-                        // request expired.
-                        existingRequestIterator.remove();
-                    } else {
-                        if (request.lock().tryLock(request.leaseDurationMillis())) {
-                            request.future().complete(null);
-                            existingRequestIterator.remove();
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    private class LockRequest {
-
-        private final Lock lock;
-        private final DateTime requestExpirationTime;
-        private final int leaseDurationMillis;
-        private final CompletableFuture<Void> future;
-
-        public LockRequest(
-                Lock lock,
-                int leaseDurationMillis,
-                DateTime requestExpirationTime,
-                CompletableFuture<Void> future) {
-
-            this.lock = lock;
-            this.requestExpirationTime = requestExpirationTime;
-            this.leaseDurationMillis = leaseDurationMillis;
-            this.future = future;
-        }
-
-        public Lock lock() {
-            return lock;
-        }
-
-        public DateTime requestExpirationTime() {
-            return requestExpirationTime;
-        }
-
-        public int leaseDurationMillis() {
-            return leaseDurationMillis;
-        }
-
-        public CompletableFuture<Void> future() {
-            return future;
-        }
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/MapDBLog.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/MapDBLog.java
deleted file mode 100644
index 1859d99..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/MapDBLog.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.base.Verify.verifyNotNull;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentNavigableMap;
-
-import net.kuujo.copycat.log.Entry;
-import net.kuujo.copycat.log.Log;
-import net.kuujo.copycat.log.LogIndexOutOfBoundsException;
-
-import org.mapdb.Atomic;
-import org.mapdb.BTreeMap;
-import org.mapdb.DB;
-import org.mapdb.DBMaker;
-import org.mapdb.Serializer;
-import org.mapdb.TxBlock;
-import org.mapdb.TxMaker;
-import org.onosproject.store.serializers.StoreSerializer;
-import org.slf4j.Logger;
-
-/**
- * MapDB based log implementation.
- */
-public class MapDBLog implements Log {
-
-    private final Logger log = getLogger(getClass());
-
-    private final File dbFile;
-    private TxMaker txMaker;
-    private final StoreSerializer serializer;
-    private static final String LOG_NAME = "log";
-    private static final String SIZE_FIELD_NAME = "size";
-
-    private int cacheSize = 256;
-
-    public MapDBLog(String dbFileName, StoreSerializer serializer) {
-        this.dbFile = new File(dbFileName);
-        this.serializer = serializer;
-    }
-
-    @Override
-    public void open() throws IOException {
-        txMaker = DBMaker
-                .newFileDB(dbFile)
-                .mmapFileEnableIfSupported()
-                .cacheSize(cacheSize)
-                .makeTxMaker();
-        log.info("Raft log file: {}", dbFile.getCanonicalPath());
-    }
-
-    @Override
-    public void close() throws IOException {
-        assertIsOpen();
-        txMaker.close();
-        txMaker = null;
-    }
-
-    @Override
-    public boolean isOpen() {
-        return txMaker != null;
-    }
-
-    protected void assertIsOpen() {
-        checkState(isOpen(), "The log is not currently open.");
-    }
-
-    @Override
-    public long appendEntry(Entry entry) {
-        checkArgument(entry != null, "expecting non-null entry");
-        return appendEntries(entry).get(0);
-    }
-
-    @Override
-    public List<Long> appendEntries(Entry... entries) {
-        checkArgument(entries != null, "expecting non-null entries");
-        return appendEntries(Arrays.asList(entries));
-    }
-
-    @Override
-    public synchronized List<Long> appendEntries(List<Entry> entries) {
-        assertIsOpen();
-        checkArgument(entries != null, "expecting non-null entries");
-        final List<Long> indices = new ArrayList<>(entries.size());
-
-        txMaker.execute(new TxBlock() {
-            @Override
-            public void tx(DB db) {
-                BTreeMap<Long, byte[]> log = getLogMap(db);
-                Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
-                long nextIndex = log.isEmpty() ? 1 : log.lastKey() + 1;
-                long addedBytes = 0;
-                for (Entry entry : entries) {
-                    byte[] entryBytes = verifyNotNull(serializer.encode(entry),
-                                                      "Writing LogEntry %s failed", nextIndex);
-                    log.put(nextIndex, entryBytes);
-                    addedBytes += entryBytes.length;
-                    indices.add(nextIndex);
-                    nextIndex++;
-                }
-                size.addAndGet(addedBytes);
-            }
-        });
-
-        return indices;
-    }
-
-    @Override
-    public boolean containsEntry(long index) {
-        assertIsOpen();
-        DB db = txMaker.makeTx();
-        try {
-            BTreeMap<Long, byte[]> log = getLogMap(db);
-            return log.containsKey(index);
-        } finally {
-            db.close();
-        }
-    }
-
-    @Override
-    public void delete() throws IOException {
-        assertIsOpen();
-        txMaker.execute(new TxBlock() {
-            @Override
-            public void tx(DB db) {
-                BTreeMap<Long, byte[]> log = getLogMap(db);
-                Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
-                log.clear();
-                size.set(0);
-            }
-        });
-    }
-
-    @Override
-    public <T extends Entry> T firstEntry() {
-        assertIsOpen();
-        DB db = txMaker.makeTx();
-        try {
-            BTreeMap<Long, byte[]> log = getLogMap(db);
-            return log.isEmpty() ? null : verifyNotNull(decodeEntry(log.firstEntry().getValue()));
-        } finally {
-            db.close();
-        }
-    }
-
-    @Override
-    public long firstIndex() {
-        assertIsOpen();
-        DB db = txMaker.makeTx();
-        try {
-            BTreeMap<Long, byte[]> log = getLogMap(db);
-            return log.isEmpty() ? 0 : log.firstKey();
-        } finally {
-            db.close();
-        }
-    }
-
-    private <T extends Entry> T decodeEntry(final byte[] bytes) {
-        if (bytes == null) {
-            return null;
-        }
-        return serializer.decode(bytes.clone());
-    }
-
-    @Override
-    public <T extends Entry> List<T> getEntries(long from, long to) {
-        assertIsOpen();
-        DB db = txMaker.makeTx();
-        try {
-            BTreeMap<Long, byte[]> log = getLogMap(db);
-            if (log.isEmpty()) {
-                throw new LogIndexOutOfBoundsException("Log is empty");
-            } else if (from < log.firstKey()) {
-                throw new LogIndexOutOfBoundsException("From index out of bounds.");
-            } else if (to > log.lastKey()) {
-                throw new LogIndexOutOfBoundsException("To index out of bounds.");
-            }
-            List<T> entries = new ArrayList<>((int) (to - from + 1));
-            for (long i = from; i <= to; i++) {
-                T entry = verifyNotNull(decodeEntry(log.get(i)), "LogEntry %s was null", i);
-                entries.add(entry);
-            }
-            return entries;
-        } finally {
-            db.close();
-        }
-    }
-
-    @Override
-    public <T extends Entry> T getEntry(long index) {
-        assertIsOpen();
-        DB db = txMaker.makeTx();
-        try {
-            BTreeMap<Long, byte[]> log = getLogMap(db);
-            byte[] entryBytes = log.get(index);
-            return entryBytes == null ? null : verifyNotNull(decodeEntry(entryBytes),
-                                                             "LogEntry %s was null", index);
-        } finally {
-            db.close();
-        }
-    }
-
-    @Override
-    public boolean isEmpty() {
-        assertIsOpen();
-        DB db = txMaker.makeTx();
-        try {
-            BTreeMap<Long, byte[]> log = getLogMap(db);
-            return log.isEmpty();
-        } finally {
-            db.close();
-        }
-    }
-
-    @Override
-    public <T extends Entry> T lastEntry() {
-        assertIsOpen();
-        DB db = txMaker.makeTx();
-        try {
-            BTreeMap<Long, byte[]> log = getLogMap(db);
-            return log.isEmpty() ? null : verifyNotNull(decodeEntry(log.lastEntry().getValue()));
-        } finally {
-            db.close();
-        }
-    }
-
-    @Override
-    public long lastIndex() {
-        assertIsOpen();
-        DB db = txMaker.makeTx();
-        try {
-            BTreeMap<Long, byte[]> log = getLogMap(db);
-            return log.isEmpty() ? 0 : log.lastKey();
-        } finally {
-            db.close();
-        }
-    }
-
-    @Override
-    public void removeAfter(long index) {
-        assertIsOpen();
-        txMaker.execute(new TxBlock() {
-            @Override
-            public void tx(DB db) {
-                BTreeMap<Long, byte[]> log = getLogMap(db);
-                Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
-                long removedBytes = 0;
-                ConcurrentNavigableMap<Long, byte[]> tailMap = log.tailMap(index, false);
-                Iterator<Map.Entry<Long, byte[]>> it = tailMap.entrySet().iterator();
-                while (it.hasNext()) {
-                    Map.Entry<Long, byte[]> entry = it.next();
-                    removedBytes += entry.getValue().length;
-                    it.remove();
-                }
-                size.addAndGet(-removedBytes);
-            }
-        });
-    }
-
-    @Override
-    public long size() {
-        assertIsOpen();
-        DB db = txMaker.makeTx();
-        try {
-            Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
-            return size.get();
-        } finally {
-            db.close();
-        }
-    }
-
-    @Override
-    public void sync() throws IOException {
-        // Every mutation above already runs in a committed MapDB transaction,
-        // so there is nothing additional to flush here.
-        assertIsOpen();
-    }
-
-    @Override
-    public void compact(long index, Entry entry) throws IOException {
-
-        assertIsOpen();
-        txMaker.execute(new TxBlock() {
-            @Override
-            public void tx(DB db) {
-                BTreeMap<Long, byte[]> log = getLogMap(db);
-                Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
-                ConcurrentNavigableMap<Long, byte[]> headMap = log.headMap(index);
-                Iterator<Map.Entry<Long, byte[]>> it = headMap.entrySet().iterator();
-
-                long deletedBytes = 0;
-                while (it.hasNext()) {
-                    Map.Entry<Long, byte[]> e = it.next();
-                    deletedBytes += e.getValue().length;
-                    it.remove();
-                }
-                size.addAndGet(-deletedBytes);
-                byte[] entryBytes = verifyNotNull(serializer.encode(entry));
-                byte[] existingEntry = log.put(index, entryBytes);
-                if (existingEntry != null) {
-                    size.addAndGet(entryBytes.length - existingEntry.length);
-                } else {
-                    size.addAndGet(entryBytes.length);
-                }
-                db.compact();
-            }
-        });
-    }
-
-    private BTreeMap<Long, byte[]> getLogMap(DB db) {
-        return db.createTreeMap(LOG_NAME)
-                    .valuesOutsideNodesEnable()
-                    .keySerializerWrap(Serializer.LONG)
-                    .valueSerializer(Serializer.BYTE_ARRAY)
-                    .makeOrGet();
-    }
-}
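
A sketch of the MapDB transaction pattern the removed log relied on: mutations run inside TxMaker#execute and commit atomically, while reads take a short-lived transaction that must be closed. The file path and map names are illustrative; only API calls already used above appear here.

import java.io.File;

import org.mapdb.Atomic;
import org.mapdb.BTreeMap;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.Serializer;
import org.mapdb.TxBlock;
import org.mapdb.TxMaker;

/** Sketch of the MapDB transaction pattern used above; the file path and map names are illustrative. */
public final class MapDBPatternSketch {

    public static void main(String[] args) {
        TxMaker txMaker = DBMaker
                .newFileDB(new File("/tmp/sketch-log"))
                .mmapFileEnableIfSupported()
                .makeTxMaker();

        // Mutations run inside execute(): the TxBlock commits atomically or rolls back on failure.
        txMaker.execute(new TxBlock() {
            @Override
            public void tx(DB db) {
                BTreeMap<Long, byte[]> log = db.createTreeMap("log")
                        .keySerializerWrap(Serializer.LONG)
                        .valueSerializer(Serializer.BYTE_ARRAY)
                        .makeOrGet();
                Atomic.Long size = db.getAtomicLong("size");
                long nextIndex = log.isEmpty() ? 1 : log.lastKey() + 1;
                byte[] entry = new byte[] {1, 2, 3};
                log.put(nextIndex, entry);
                size.addAndGet(entry.length);
            }
        });

        // Reads take a short-lived transaction that must be closed to release its resources.
        DB db = txMaker.makeTx();
        try {
            BTreeMap<Long, byte[]> log = db.createTreeMap("log")
                    .keySerializerWrap(Serializer.LONG)
                    .valueSerializer(Serializer.BYTE_ARRAY)
                    .makeOrGet();
            System.out.println("log entries: " + log.size());
        } finally {
            db.close();
        }
        txMaker.close();
    }
}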
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/SnapshotException.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/SnapshotException.java
deleted file mode 100644
index 6b1dc8c..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/SnapshotException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import org.onosproject.store.service.DatabaseException;
-
-/**
- * Exception that indicates a problem with the state machine snapshotting.
- */
-@SuppressWarnings("serial")
-public class SnapshotException extends DatabaseException {
-    public SnapshotException(Throwable t) {
-        super(t);
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/TableModificationEvent.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/TableModificationEvent.java
deleted file mode 100644
index 1802e51..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/TableModificationEvent.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import org.onosproject.store.service.VersionedValue;
-
-import com.google.common.base.MoreObjects;
-
-/**
- * A table modification event.
- */
-public final class TableModificationEvent {
-
-    /**
-     * Type of table modification event.
-     */
-    public enum Type {
-        ROW_ADDED,
-        ROW_DELETED,
-        ROW_UPDATED
-    }
-
-    private final String tableName;
-    private final String key;
-    private final VersionedValue value;
-    private final Type type;
-
-    /**
-     * Creates a new row deleted table modification event.
-     * @param tableName table name.
-     * @param key row key
-     * @param value value associated with the key when it was deleted.
-     * @return table modification event.
-     */
-    public static TableModificationEvent rowDeleted(String tableName, String key, VersionedValue value) {
-        return new TableModificationEvent(tableName, key, value, Type.ROW_DELETED);
-    }
-
-    /**
-     * Creates a new row added table modification event.
-     * @param tableName table name.
-     * @param key row key
-     * @param value value associated with the key
-     * @return table modification event.
-     */
-    public static TableModificationEvent rowAdded(String tableName, String key, VersionedValue value) {
-        return new TableModificationEvent(tableName, key, value, Type.ROW_ADDED);
-    }
-
-    /**
-     * Creates a new row updated table modification event.
-     * @param tableName table name.
-     * @param key row key
-     * @param newValue value
-     * @return table modification event.
-     */
-    public static TableModificationEvent rowUpdated(String tableName, String key, VersionedValue newValue) {
-        return new TableModificationEvent(tableName, key, newValue, Type.ROW_UPDATED);
-    }
-
-    private TableModificationEvent(String tableName, String key, VersionedValue value, Type type) {
-        this.tableName = tableName;
-        this.key = key;
-        this.value = value;
-        this.type = type;
-    }
-
-    /**
-     * Returns name of table this event is for.
-     * @return table name
-     */
-    public String tableName() {
-        return tableName;
-    }
-
-    /**
-     * Returns the row key this event is for.
-     * @return row key
-     */
-    public String key() {
-        return key;
-    }
-
-    /**
-     * Returns the value associated with the key. If the event is for a deletion, this
-     * method returns the value that was deleted.
-     * @return row value
-     */
-    public VersionedValue value() {
-        return value;
-    }
-
-    /**
-     * Returns the type of table modification event.
-     * @return event type.
-     */
-    public Type type() {
-        return type;
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(getClass())
-                .add("type", type)
-                .add("tableName", tableName)
-                .add("key", key)
-                .add("version", value.version())
-                .toString();
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/TabletDefinitionStore.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/TabletDefinitionStore.java
deleted file mode 100644
index 3a2560a..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/TabletDefinitionStore.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.onosproject.cluster.DefaultControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onlab.packet.IpAddress;
-import org.slf4j.Logger;
-
-import com.fasterxml.jackson.core.JsonEncoding;
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-/**
- * Allows reading and writing the tablet definition as a JSON file.
- */
-public class TabletDefinitionStore {
-
-    private final Logger log = getLogger(getClass());
-
-    private final File file;
-
-    /**
-     * Creates a reader/writer of the tablet definition file.
-     *
-     * @param filePath location of the definition file
-     */
-    public TabletDefinitionStore(String filePath) {
-        file = new File(filePath);
-    }
-
-    /**
-     * Creates a reader/writer of the tablet definition file.
-     *
-     * @param filePath location of the definition file
-     */
-    public TabletDefinitionStore(File filePath) {
-        file = checkNotNull(filePath);
-    }
-
-    /**
-     * Returns the Map from tablet name to set of initial member nodes.
-     *
-     * @return Map from tablet name to set of initial member nodes
-     * @throws IOException when I/O exception of some sort has occurred.
-     */
-    public Map<String, Set<DefaultControllerNode>> read() throws IOException {
-
-        final Map<String, Set<DefaultControllerNode>> tablets = new HashMap<>();
-
-        final ObjectMapper mapper = new ObjectMapper();
-        final ObjectNode tabletNodes = (ObjectNode) mapper.readTree(file);
-        final Iterator<Entry<String, JsonNode>> fields = tabletNodes.fields();
-        while (fields.hasNext()) {
-            final Entry<String, JsonNode> next = fields.next();
-            final Set<DefaultControllerNode> nodes = new HashSet<>();
-            final Iterator<JsonNode> elements = next.getValue().elements();
-            while (elements.hasNext()) {
-                ObjectNode nodeDef = (ObjectNode) elements.next();
-                nodes.add(new DefaultControllerNode(new NodeId(nodeDef.get("id").asText()),
-                                                    IpAddress.valueOf(nodeDef.get("ip").asText()),
-                                                    nodeDef.get("tcpPort").asInt(9876)));
-            }
-
-            tablets.put(next.getKey(), nodes);
-        }
-        return tablets;
-    }
-
-    /**
-     * Updates the Map from tablet name to set of member nodes.
-     *
-     * @param tabletName name of the tablet to update
-     * @param nodes set of initial member nodes
-     * @throws IOException if an I/O error occurs while writing the definition file.
-     */
-    public void write(String tabletName, Set<DefaultControllerNode> nodes) throws IOException {
-        checkNotNull(tabletName);
-        checkArgument(!tabletName.isEmpty(), "Tablet name cannot be empty");
-        // TODO should validate if tabletName is allowed in JSON
-
-        // load current
-        Map<String, Set<DefaultControllerNode>> config;
-        try {
-            config = read();
-        } catch (IOException e) {
-            log.info("Reading tablet config failed, assuming empty definition.");
-            config = new HashMap<>();
-        }
-        // update with specified
-        config.put(tabletName, nodes);
-
-        // write back to file
-        final ObjectMapper mapper = new ObjectMapper();
-        final ObjectNode tabletNodes = mapper.createObjectNode();
-        for (Entry<String, Set<DefaultControllerNode>> tablet : config.entrySet()) {
-            ArrayNode nodeDefs = mapper.createArrayNode();
-            tabletNodes.set(tablet.getKey(), nodeDefs);
-
-            for (DefaultControllerNode node : tablet.getValue()) {
-                ObjectNode nodeDef = mapper.createObjectNode();
-                nodeDef.put("id", node.id().toString())
-                       .put("ip", node.ip().toString())
-                       .put("tcpPort", node.tcpPort());
-                nodeDefs.add(nodeDef);
-            }
-        }
-        mapper.writeTree(new JsonFactory().createGenerator(file, JsonEncoding.UTF8),
-                         tabletNodes);
-    }
-
-}
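
The JSON layout this store reads and writes is only implicit in the code above, so here is a minimal usage sketch of the removed TabletDefinitionStore. The file path, tablet name, node id and address are illustrative assumptions, not values from this change.

package org.onosproject.store.service.impl;

import java.io.IOException;
import java.util.Map;
import java.util.Set;

import org.onlab.packet.IpAddress;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;

import com.google.common.collect.ImmutableSet;

public final class TabletDefinitionStoreExample {
    public static void main(String[] args) throws IOException {
        // Illustrative path; any writable location works.
        TabletDefinitionStore store = new TabletDefinitionStore("/tmp/tablets.json");

        // Illustrative member; constructor arguments mirror those used in read() above.
        Set<DefaultControllerNode> members = ImmutableSet.of(
                new DefaultControllerNode(new NodeId("node1"),
                                          IpAddress.valueOf("10.0.0.1"), 9876));

        // Persists {"default":[{"id":"node1","ip":"10.0.0.1","tcpPort":9876}]}
        store.write("default", members);

        // Reads the definition back as tablet name -> set of member nodes.
        for (Map.Entry<String, Set<DefaultControllerNode>> tablet : store.read().entrySet()) {
            System.out.println(tablet.getKey() + " -> " + tablet.getValue());
        }
    }
}
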
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/TcpClusterConfigSerializer.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/TcpClusterConfigSerializer.java
deleted file mode 100644
index 56961cf..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/TcpClusterConfigSerializer.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import java.util.Collection;
-
-import net.kuujo.copycat.cluster.TcpClusterConfig;
-import net.kuujo.copycat.cluster.TcpMember;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-public class TcpClusterConfigSerializer extends Serializer<TcpClusterConfig> {
-
-    @Override
-    public void write(Kryo kryo, Output output, TcpClusterConfig object) {
-        kryo.writeClassAndObject(output, object.getLocalMember());
-        kryo.writeClassAndObject(output, object.getRemoteMembers());
-    }
-
-    @Override
-    public TcpClusterConfig read(Kryo kryo, Input input,
-                                 Class<TcpClusterConfig> type) {
-        TcpMember localMember = (TcpMember) kryo.readClassAndObject(input);
-        @SuppressWarnings("unchecked")
-        Collection<TcpMember> remoteMembers = (Collection<TcpMember>) kryo.readClassAndObject(input);
-        return new TcpClusterConfig(localMember, remoteMembers);
-    }
-
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/TcpMemberSerializer.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/TcpMemberSerializer.java
deleted file mode 100644
index eb4c308..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/TcpMemberSerializer.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import net.kuujo.copycat.cluster.TcpMember;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-public class TcpMemberSerializer extends Serializer<TcpMember> {
-
-    @Override
-    public void write(Kryo kryo, Output output, TcpMember object) {
-        output.writeString(object.host());
-        output.writeInt(object.port());
-    }
-
-    @Override
-    public TcpMember read(Kryo kryo, Input input, Class<TcpMember> type) {
-        String host = input.readString();
-        int port = input.readInt();
-        return new TcpMember(host, port);
-    }
-}
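
For context, a standalone sketch of how these two removed serializers would be registered with a plain Kryo instance and used to round-trip a TcpMember. In the removed code the registration happened through the store's serializer setup, so the class name and wiring here are illustrative assumptions only.

package org.onosproject.store.service.impl;

import java.io.ByteArrayOutputStream;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import net.kuujo.copycat.cluster.TcpClusterConfig;
import net.kuujo.copycat.cluster.TcpMember;

public final class SerializerRoundTripExample {
    public static void main(String[] args) {
        Kryo kryo = new Kryo();
        // Register the custom serializers for the copycat cluster types.
        kryo.register(TcpMember.class, new TcpMemberSerializer());
        kryo.register(TcpClusterConfig.class, new TcpClusterConfigSerializer());

        // Illustrative member; host and port are arbitrary.
        TcpMember member = new TcpMember("10.0.0.1", 9876);

        // Serialize the member through the custom serializer...
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        Output output = new Output(bytes);
        kryo.writeObject(output, member);
        output.close();

        // ...and read it back.
        TcpMember copy = kryo.readObject(new Input(bytes.toByteArray()), TcpMember.class);
        System.out.println(copy.host() + ":" + copy.port());
    }
}
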
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/package-info.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/package-info.java
deleted file mode 100644
index 61f80a6..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Strongly consistent, fault-tolerant and durable state management
- * based on Raft consensus protocol.
- */
-package org.onosproject.store.service.impl;
diff --git a/core/store/dist/src/test/java/org/onosproject/store/service/impl/MapDBLogTest.java b/core/store/dist/src/test/java/org/onosproject/store/service/impl/MapDBLogTest.java
deleted file mode 100644
index 64401465..0000000
--- a/core/store/dist/src/test/java/org/onosproject/store/service/impl/MapDBLogTest.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.List;
-
-import net.kuujo.copycat.internal.log.OperationEntry;
-import net.kuujo.copycat.log.Entry;
-import net.kuujo.copycat.log.Log;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.onosproject.store.serializers.StoreSerializer;
-
-import com.google.common.testing.EqualsTester;
-
-/**
- * Test the MapDBLog implementation.
- */
-public class MapDBLogTest {
-
-    private static final StoreSerializer SERIALIZER = ClusterMessagingProtocol.DB_SERIALIZER;
-    private static final Entry TEST_ENTRY1 = new OperationEntry(1, "test1");
-    private static final Entry TEST_ENTRY2 = new OperationEntry(2, "test12");
-    private static final Entry TEST_ENTRY3 = new OperationEntry(3, "test123");
-    private static final Entry TEST_ENTRY4 = new OperationEntry(4, "test1234");
-
-    private static final Entry TEST_SNAPSHOT_ENTRY = new OperationEntry(5, "snapshot");
-
-    private static final long TEST_ENTRY1_SIZE = SERIALIZER.encode(TEST_ENTRY1).length;
-    private static final long TEST_ENTRY2_SIZE = SERIALIZER.encode(TEST_ENTRY2).length;
-    private static final long TEST_ENTRY3_SIZE = SERIALIZER.encode(TEST_ENTRY3).length;
-    private static final long TEST_ENTRY4_SIZE = SERIALIZER.encode(TEST_ENTRY4).length;
-
-    private static final long TEST_SNAPSHOT_ENTRY_SIZE = SERIALIZER.encode(TEST_SNAPSHOT_ENTRY).length;
-
-    private String dbFileName;
-
-
-    @Before
-    public void setUp() throws Exception {
-        File logFile = File.createTempFile("mapdbTest", null);
-        dbFileName = logFile.getAbsolutePath();
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        Files.deleteIfExists(new File(dbFileName).toPath());
-        Files.deleteIfExists(new File(dbFileName + ".t").toPath());
-        Files.deleteIfExists(new File(dbFileName + ".p").toPath());
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testAssertOpen() {
-        Log log = new MapDBLog(dbFileName, SERIALIZER);
-        log.size();
-    }
-
-    @Test
-    public void testAppendEntry() throws IOException {
-        Log log = new MapDBLog(dbFileName, SERIALIZER);
-        log.open();
-        log.appendEntry(TEST_ENTRY1);
-        OperationEntry first = log.firstEntry();
-        OperationEntry last = log.lastEntry();
-        new EqualsTester()
-            .addEqualityGroup(first, last, TEST_ENTRY1)
-            .testEquals();
-        Assert.assertEquals(TEST_ENTRY1_SIZE, log.size());
-        Assert.assertEquals(1, log.firstIndex());
-        Assert.assertEquals(1, log.lastIndex());
-    }
-
-    @Test
-    public void testAppendEntries() throws IOException {
-        Log log = new MapDBLog(dbFileName, SERIALIZER);
-        log.open();
-        log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3);
-        OperationEntry first = log.firstEntry();
-        OperationEntry last = log.lastEntry();
-        new EqualsTester()
-            .addEqualityGroup(first, TEST_ENTRY1)
-            .addEqualityGroup(last, TEST_ENTRY3)
-            .testEquals();
-        Assert.assertEquals(TEST_ENTRY1_SIZE + TEST_ENTRY2_SIZE + TEST_ENTRY3_SIZE, log.size());
-        Assert.assertEquals(1, log.firstIndex());
-        Assert.assertEquals(3, log.lastIndex());
-        Assert.assertTrue(log.containsEntry(1));
-        Assert.assertTrue(log.containsEntry(2));
-    }
-
-    @Test
-    public void testDelete() throws IOException {
-        Log log = new MapDBLog(dbFileName, SERIALIZER);
-        log.open();
-        log.appendEntries(TEST_ENTRY1, TEST_ENTRY2);
-        log.delete();
-        Assert.assertEquals(0, log.size());
-        Assert.assertTrue(log.isEmpty());
-        Assert.assertEquals(0, log.firstIndex());
-        Assert.assertNull(log.firstEntry());
-        Assert.assertEquals(0, log.lastIndex());
-        Assert.assertNull(log.lastEntry());
-    }
-
-    @Test
-    public void testGetEntries() throws IOException {
-        Log log = new MapDBLog(dbFileName, SERIALIZER);
-        log.open();
-        log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
-        Assert.assertEquals(
-                TEST_ENTRY1_SIZE +
-                TEST_ENTRY2_SIZE +
-                TEST_ENTRY3_SIZE +
-                TEST_ENTRY4_SIZE, log.size());
-
-        List<Entry> entries = log.getEntries(2, 3);
-        new EqualsTester()
-            .addEqualityGroup(log.getEntry(4), TEST_ENTRY4)
-            .addEqualityGroup(entries.get(0), TEST_ENTRY2)
-            .addEqualityGroup(entries.get(1), TEST_ENTRY3)
-            .testEquals();
-    }
-
-    @Test
-    public void testRemoveAfter() throws IOException {
-        Log log = new MapDBLog(dbFileName, SERIALIZER);
-        log.open();
-        log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
-        log.removeAfter(1);
-        Assert.assertEquals(TEST_ENTRY1_SIZE, log.size());
-        new EqualsTester()
-            .addEqualityGroup(log.firstEntry(), log.lastEntry(), TEST_ENTRY1)
-            .testEquals();
-    }
-
-    @Test
-    public void testAddAfterRemove() throws IOException {
-        Log log = new MapDBLog(dbFileName, SERIALIZER);
-        log.open();
-        log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
-        log.removeAfter(1);
-        log.appendEntry(TEST_ENTRY4);
-        Assert.assertEquals(TEST_ENTRY1_SIZE + TEST_ENTRY4_SIZE, log.size());
-        new EqualsTester()
-            .addEqualityGroup(log.firstEntry(), TEST_ENTRY1)
-            .addEqualityGroup(log.lastEntry(), TEST_ENTRY4)
-            .addEqualityGroup(log.size(), TEST_ENTRY1_SIZE + TEST_ENTRY4_SIZE)
-            .testEquals();
-    }
-
-    @Test
-    public void testClose() throws IOException {
-        Log log = new MapDBLog(dbFileName, SERIALIZER);
-        Assert.assertFalse(log.isOpen());
-        log.open();
-        Assert.assertTrue(log.isOpen());
-        log.close();
-        Assert.assertFalse(log.isOpen());
-    }
-
-    @Test
-    public void testReopen() throws IOException {
-        Log log = new MapDBLog(dbFileName, SERIALIZER);
-        log.open();
-        log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
-        log.close();
-        log.open();
-
-        new EqualsTester()
-            .addEqualityGroup(log.firstEntry(), TEST_ENTRY1)
-            .addEqualityGroup(log.getEntry(2), TEST_ENTRY2)
-            .addEqualityGroup(log.lastEntry(), TEST_ENTRY4)
-            .addEqualityGroup(log.size(),
-                    TEST_ENTRY1_SIZE +
-                    TEST_ENTRY2_SIZE +
-                    TEST_ENTRY3_SIZE +
-                    TEST_ENTRY4_SIZE)
-            .testEquals();
-    }
-
-    @Test
-    public void testCompact() throws IOException {
-        Log log = new MapDBLog(dbFileName, SERIALIZER);
-        log.open();
-        log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
-        log.compact(3, TEST_SNAPSHOT_ENTRY);
-        new EqualsTester()
-            .addEqualityGroup(log.firstEntry(), TEST_SNAPSHOT_ENTRY)
-            .addEqualityGroup(log.lastEntry(), TEST_ENTRY4)
-            .addEqualityGroup(log.size(),
-                    TEST_SNAPSHOT_ENTRY_SIZE +
-                    TEST_ENTRY4_SIZE)
-            .testEquals();
-    }
-}