WIP: Partitioned Database based on Raft.
Removed the implementation based on previous Copycat API.
Change-Id: I6b9d67e943e17095f585ae2a2cb6304c248cd686
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/LeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/LeadershipManager.java
index fb19d4d..8f3068c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/LeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/LeadershipManager.java
@@ -39,7 +39,6 @@
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.Lock;
import org.onosproject.store.service.LockService;
-import org.onosproject.store.service.impl.DistributedLockManager;
import org.slf4j.Logger;
import java.util.Map;
@@ -62,9 +61,7 @@
private final Logger log = getLogger(getClass());
- // TODO: Remove this dependency
- private static final int TERM_DURATION_MS =
- DistributedLockManager.DEAD_LOCK_TIMEOUT_MS;
+ private static final int TERM_DURATION_MS = 2000;
// Time to wait before retrying leadership after
// a unexpected error.
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java
new file mode 100644
index 0000000..cc1936a
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java
@@ -0,0 +1,69 @@
+package org.onosproject.store.consistent.impl;
+
+
+import net.kuujo.copycat.cluster.ClusterConfig;
+import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
+import net.kuujo.copycat.cluster.internal.coordinator.CoordinatorConfig;
+import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator;
+import net.kuujo.copycat.resource.Resource;
+
+/**
+ * Database: a Copycat resource exposing the {@link DatabaseProxy} operations for String keys and byte[] values.
+ */
+public interface Database extends DatabaseProxy<String, byte[]>, Resource<Database> {
+
+ /**
+ * Creates a new database with the default cluster configuration.<p>
+ *
+ * The database will be constructed with the default cluster configuration, which searches the
+ * classpath for two resources - {@code cluster} and {@code cluster-defaults} - in that order.
+ * Options specified in {@code cluster.conf} will override those in {@code cluster-defaults.conf}.<p>
+ *
+ * Additionally, the database will be constructed with a database configuration that searches the
+ * classpath for five configuration resources - {@code {name}}, {@code database},
+ * {@code database-defaults}, {@code resource}, and {@code resource-defaults} - in that order. The first
+ * is a resource with the same name as the database resource. If the resource is namespaced - e.g.
+ * {@code databases.my-database.conf} - configurations are loaded by namespace too, e.g. {@code databases.conf}.
+ *
+ * @param name The database name.
+ * @return The database.
+ */
+ static Database create(String name) {
+ return create(name, new ClusterConfig(), new DatabaseConfig());
+ }
+
+ /**
+ * Creates a new database using the given cluster configuration and a default database configuration.<p>
+ *
+ * The database will be constructed with a database configuration that searches the classpath for
+ * five configuration resources - {@code {name}}, {@code database}, {@code database-defaults},
+ * {@code resource}, and {@code resource-defaults} - in that order. The first is a resource with the
+ * same name as the database resource. If the resource is namespaced - e.g.
+ * {@code databases.my-database.conf} - configurations are loaded by namespace too, e.g. {@code databases.conf}.
+ *
+ * @param name The database name.
+ * @param cluster The cluster configuration.
+ * @return The database.
+ */
+ static Database create(String name, ClusterConfig cluster) {
+ return create(name, cluster, new DatabaseConfig());
+ }
+
+ /**
+ * Creates a new database with its own cluster coordinator, opened as a startup task and closed as a shutdown task.
+ *
+ * @param name The database name.
+ * @param cluster The cluster configuration.
+ * @param config The database configuration.
+ *
+ * @return The database.
+ */
+ static Database create(String name, ClusterConfig cluster, DatabaseConfig config) {
+ ClusterCoordinator coordinator =
+ new DefaultClusterCoordinator(new CoordinatorConfig().withName(name).withClusterConfig(cluster));
+ return coordinator.<Database>getResource(name, config.resolve(cluster))
+ .addStartupTask(() -> coordinator.open().thenApply(v -> null))
+ .addShutdownTask(coordinator::close);
+ }
+
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseConfig.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseConfig.java
new file mode 100644
index 0000000..eda1e99
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseConfig.java
@@ -0,0 +1,108 @@
+package org.onosproject.store.consistent.impl;
+
+import com.typesafe.config.ConfigValueFactory;
+import net.kuujo.copycat.cluster.ClusterConfig;
+import net.kuujo.copycat.cluster.internal.coordinator.CoordinatedResourceConfig;
+import net.kuujo.copycat.protocol.Consistency;
+import net.kuujo.copycat.resource.ResourceConfig;
+import net.kuujo.copycat.state.StateLogConfig;
+import net.kuujo.copycat.util.internal.Assert;
+
+import java.util.Map;
+
+/**
+ * Database configuration: a resource configuration that additionally carries the database
+ * read consistency, stored under the {@code consistency} key.
+ */
+public class DatabaseConfig extends ResourceConfig<DatabaseConfig> {
+ private static final String DATABASE_CONSISTENCY = "consistency"; // config key of the read consistency
+
+ private static final String DEFAULT_CONFIGURATION = "database-defaults"; // passed to the superclass constructors
+ private static final String CONFIGURATION = "database"; // passed to the superclass constructors
+
+ public DatabaseConfig() {
+ super(CONFIGURATION, DEFAULT_CONFIGURATION);
+ }
+
+ public DatabaseConfig(Map<String, Object> config) {
+ super(config, CONFIGURATION, DEFAULT_CONFIGURATION);
+ }
+
+ public DatabaseConfig(String resource) {
+ super(resource, CONFIGURATION, DEFAULT_CONFIGURATION);
+ }
+
+ protected DatabaseConfig(DatabaseConfig config) { // copy constructor, used by copy()
+ super(config);
+ }
+
+ @Override
+ public DatabaseConfig copy() {
+ return new DatabaseConfig(this);
+ }
+
+ /**
+ * Sets the database read consistency from its name, as parsed by {@code Consistency.parse}.
+ *
+ * @param consistency The database read consistency.
+ * @throws java.lang.NullPointerException If the consistency is {@code null}
+ */
+ public void setConsistency(String consistency) {
+ this.config = config.withValue(DATABASE_CONSISTENCY,
+ ConfigValueFactory.fromAnyRef(
+ Consistency.parse(Assert.isNotNull(consistency, "consistency")).toString()));
+ }
+
+ /**
+ * Sets the database read consistency; it is stored in its {@code toString()} form.
+ *
+ * @param consistency The database read consistency.
+ * @throws java.lang.NullPointerException If the consistency is {@code null}
+ */
+ public void setConsistency(Consistency consistency) {
+ this.config = config.withValue(DATABASE_CONSISTENCY,
+ ConfigValueFactory.fromAnyRef(
+ Assert.isNotNull(consistency, "consistency").toString()));
+ }
+
+ /**
+ * Returns the database read consistency, parsed from the stored {@code consistency} value.
+ *
+ * @return The database read consistency.
+ */
+ public Consistency getConsistency() {
+ return Consistency.parse(config.getString(DATABASE_CONSISTENCY));
+ }
+
+ /**
+ * Sets the database read consistency, returning the configuration for method chaining.
+ *
+ * @param consistency The database read consistency.
+ * @return The database configuration.
+ * @throws java.lang.NullPointerException If the consistency is {@code null}
+ */
+ public DatabaseConfig withConsistency(String consistency) {
+ setConsistency(consistency);
+ return this;
+ }
+
+ /**
+ * Sets the database read consistency, returning the configuration for method chaining.
+ *
+ * @param consistency The database read consistency.
+ * @return The database configuration.
+ * @throws java.lang.NullPointerException If the consistency is {@code null}
+ */
+ public DatabaseConfig withConsistency(Consistency consistency) {
+ setConsistency(consistency);
+ return this;
+ }
+
+ @Override
+ public CoordinatedResourceConfig resolve(ClusterConfig cluster) {
+ return new StateLogConfig(toMap()) // resolve as a state log built from this configuration's map
+ .resolve(cluster)
+ .withResourceType(DefaultDatabase.class); // the resource is implemented by DefaultDatabase
+ }
+
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
new file mode 100644
index 0000000..f83e8f8
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
@@ -0,0 +1,168 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Database proxy: asynchronous key/value operations addressed by table name; every call returns a future.
+ */
+public interface DatabaseProxy<K, V> {
+
+ /**
+ * Gets the table size.
+ *
+ * @param tableName table name
+ * @return A completable future to be completed with the table size.
+ */
+ CompletableFuture<Integer> size(String tableName);
+
+ /**
+ * Checks whether the table is empty.
+ *
+ * @param tableName table name
+ * @return A completable future to be completed with {@code true} if the table is empty.
+ */
+ CompletableFuture<Boolean> isEmpty(String tableName);
+
+ /**
+ * Checks whether the table contains a key.
+ *
+ * @param tableName table name
+ * @param key The key to check.
+ * @return A completable future to be completed with {@code true} if the table contains the key.
+ */
+ CompletableFuture<Boolean> containsKey(String tableName, K key);
+
+ /**
+ * Checks whether the table contains a value.
+ *
+ * @param tableName table name
+ * @param value The value to check.
+ * @return A completable future to be completed with {@code true} if the table contains the value.
+ */
+ CompletableFuture<Boolean> containsValue(String tableName, V value);
+
+ /**
+ * Gets a value from the table.
+ *
+ * @param tableName table name
+ * @param key The key to get.
+ * @return A completable future to be completed with the versioned value for the key.
+ */
+ CompletableFuture<Versioned<V>> get(String tableName, K key);
+
+ /**
+ * Puts a value in the table.
+ *
+ * @param tableName table name
+ * @param key The key to set.
+ * @param value The value to set.
+ * @return A completable future to be completed with the result once complete.
+ */
+ CompletableFuture<Versioned<V>> put(String tableName, K key, V value);
+
+ /**
+ * Removes a value from the table.
+ *
+ * @param tableName table name
+ * @param key The key to remove.
+ * @return A completable future to be completed with the result once complete.
+ */
+ CompletableFuture<Versioned<V>> remove(String tableName, K key);
+
+ /**
+ * Clears the table.
+ *
+ * @param tableName table name
+ * @return A completable future to be completed once the table has been cleared.
+ */
+ CompletableFuture<Void> clear(String tableName);
+
+ /**
+ * Gets a set of keys in the table.
+ *
+ * @param tableName table name
+ * @return A completable future to be completed with the set of keys.
+ */
+ CompletableFuture<Set<K>> keySet(String tableName);
+
+ /**
+ * Gets a collection of values in the table.
+ *
+ * @param tableName table name
+ * @return A completable future to be completed with the collection of versioned values.
+ */
+ CompletableFuture<Collection<Versioned<V>>> values(String tableName);
+
+ /**
+ * Gets a set of entries in the table.
+ *
+ * @param tableName table name
+ * @return A completable future to be completed with the set of key to versioned value entries.
+ */
+ CompletableFuture<Set<Map.Entry<K, Versioned<V>>>> entrySet(String tableName);
+
+ /**
+ * Puts a value in the table if the given key does not exist.
+ *
+ * @param tableName table name
+ * @param key The key to set.
+ * @param value The value to set if the given key does not exist.
+ * @return A completable future to be completed with the result once complete.
+ */
+ CompletableFuture<Versioned<V>> putIfAbsent(String tableName, K key, V value);
+
+ /**
+ * Removes a key only if the existing value for that key matches the specified value.
+ *
+ * @param tableName table name
+ * @param key The key to remove.
+ * @param value The value to remove.
+ * @return A completable future to be completed with {@code true} if the key was removed.
+ */
+ CompletableFuture<Boolean> remove(String tableName, K key, V value);
+
+ /**
+ * Removes a key only if the existing version for that key matches the specified version.
+ *
+ * @param tableName table name
+ * @param key The key to remove.
+ * @param version The expected version.
+ * @return A completable future to be completed with {@code true} if the key was removed.
+ */
+ CompletableFuture<Boolean> remove(String tableName, K key, long version);
+
+ /**
+ * Replaces the entry for the specified key only if currently mapped to the specified value.
+ *
+ * @param tableName table name
+ * @param key The key to replace.
+ * @param oldValue The value to replace.
+ * @param newValue The value with which to replace the given key and value.
+ * @return A completable future to be completed with {@code true} if the entry was replaced.
+ */
+ CompletableFuture<Boolean> replace(String tableName, K key, V oldValue, V newValue);
+
+ /**
+ * Replaces the entry for the specified key only if currently mapped to the specified version.
+ *
+ * @param tableName table name
+ * @param key The key to update
+ * @param oldVersion existing version in the map for this replace to succeed.
+ * @param newValue The value with which to replace the given key and version.
+ * @return A completable future to be completed with {@code true} if the entry was replaced.
+ */
+ CompletableFuture<Boolean> replace(String tableName, K key, long oldVersion, V newValue);
+
+ /**
+ * Performs an atomic batch update operation, i.e. either all operations in the batch succeed or
+ * none do and no state changes are made.
+ *
+ * @param updates list of updates to apply atomically.
+ * @return A completable future to be completed with {@code true} if the batch was applied.
+ */
+ CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<K, V>> updates);
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
new file mode 100644
index 0000000..89a51e8
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
@@ -0,0 +1,77 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import net.kuujo.copycat.state.Command;
+import net.kuujo.copycat.state.Initializer;
+import net.kuujo.copycat.state.Query;
+import net.kuujo.copycat.state.StateContext;
+
+/**
+ * Database state: the table operations, each annotated as a {@code @Query} or a
+ * {@code @Command}, plus an {@code @Initializer} hook that receives the state context.
+ */
+public interface DatabaseState<K, V> {
+
+ /**
+ * Initializes the database state.
+ *
+ * @param context The database state context.
+ */
+ @Initializer
+ public void init(StateContext<DatabaseState<K, V>> context);
+
+ @Query
+ int size(String tableName);
+
+ @Query
+ boolean isEmpty(String tableName);
+
+ @Query
+ boolean containsKey(String tableName, K key);
+
+ @Query
+ boolean containsValue(String tableName, V value);
+
+ @Query
+ Versioned<V> get(String tableName, K key);
+
+ @Command
+ Versioned<V> put(String tableName, K key, V value);
+
+ @Command
+ Versioned<V> remove(String tableName, K key);
+
+ @Command
+ void clear(String tableName);
+
+ @Query
+ Set<K> keySet(String tableName);
+
+ @Query
+ Collection<Versioned<V>> values(String tableName);
+
+ @Query
+ Set<Entry<K, Versioned<V>>> entrySet(String tableName);
+
+ @Command
+ Versioned<V> putIfAbsent(String tableName, K key, V value);
+
+ @Command
+ boolean remove(String tableName, K key, V value);
+
+ @Command
+ boolean remove(String tableName, K key, long version);
+
+ @Command
+ boolean replace(String tableName, K key, V oldValue, V newValue);
+
+ @Command
+ boolean replace(String tableName, K key, long oldVersion, V newValue);
+
+ @Command // NOTE(review): DatabaseProxy calls this atomicBatchUpdate; confirm the proxy resolves that name to this method
+ boolean batchUpdate(List<UpdateOperation<K, V>> updates);
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
new file mode 100644
index 0000000..837f3b4
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -0,0 +1,146 @@
+package org.onosproject.store.consistent.impl;
+
+import net.kuujo.copycat.resource.internal.ResourceContext;
+import net.kuujo.copycat.state.StateMachine;
+import net.kuujo.copycat.resource.internal.AbstractResource;
+import net.kuujo.copycat.state.internal.DefaultStateMachine;
+import net.kuujo.copycat.util.concurrent.Futures;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+
+/**
+ * Default database: forwards every table operation to a proxy created from the underlying state machine.
+ */
+public class DefaultDatabase extends AbstractResource<Database> implements Database {
+    private final StateMachine<DatabaseState<String, byte[]>> stateMachine;
+    private volatile DatabaseProxy<String, byte[]> proxy; // null while closed; volatile: set from a future callback
+
+    @SuppressWarnings("unchecked")
+    public DefaultDatabase(ResourceContext context) {
+        super(context);
+        this.stateMachine = new DefaultStateMachine(context, DatabaseState.class, DefaultDatabaseState.class);
+    }
+
+    /**
+     * If the database is closed, returns a failed CompletableFuture. Otherwise, calls the given supplier to
+     * return the completed future result.
+     *
+     * @param supplier The supplier to call if the database is open.
+     * @param <T> The future result type.
+     * @return A completable future that if this database is closed is immediately failed.
+     */
+    protected <T> CompletableFuture<T> checkOpen(Supplier<CompletableFuture<T>> supplier) {
+        if (proxy == null) {
+            return Futures.exceptionalFuture(new IllegalStateException("Database closed"));
+        }
+        return supplier.get();
+    }
+
+    @Override
+    public CompletableFuture<Integer> size(String tableName) {
+        return checkOpen(() -> proxy.size(tableName));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> isEmpty(String tableName) {
+        return checkOpen(() -> proxy.isEmpty(tableName));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsKey(String tableName, String key) {
+        return checkOpen(() -> proxy.containsKey(tableName, key));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) {
+        return checkOpen(() -> proxy.containsValue(tableName, value));
+    }
+
+    @Override
+    public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) {
+        return checkOpen(() -> proxy.get(tableName, key));
+    }
+
+    @Override
+    public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
+        return checkOpen(() -> proxy.put(tableName, key, value));
+    }
+
+    @Override
+    public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
+        return checkOpen(() -> proxy.remove(tableName, key));
+    }
+
+    @Override
+    public CompletableFuture<Void> clear(String tableName) {
+        return checkOpen(() -> proxy.clear(tableName));
+    }
+
+    @Override
+    public CompletableFuture<Set<String>> keySet(String tableName) {
+        return checkOpen(() -> proxy.keySet(tableName));
+    }
+
+    @Override
+    public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) {
+        return checkOpen(() -> proxy.values(tableName));
+    }
+
+    @Override
+    public CompletableFuture<Set<Map.Entry<String, Versioned<byte[]>>>> entrySet(String tableName) {
+        return checkOpen(() -> proxy.entrySet(tableName));
+    }
+
+    @Override
+    public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
+        return checkOpen(() -> proxy.putIfAbsent(tableName, key, value));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
+        return checkOpen(() -> proxy.remove(tableName, key, value));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
+        return checkOpen(() -> proxy.remove(tableName, key, version));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
+        return checkOpen(() -> proxy.replace(tableName, key, oldValue, newValue));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
+        return checkOpen(() -> proxy.replace(tableName, key, oldVersion, newValue));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
+        return checkOpen(() -> proxy.atomicBatchUpdate(updates));
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public synchronized CompletableFuture<Database> open() {
+        // Run the startup tasks, open the state machine, then publish the proxy that marks the database open.
+        return runStartupTasks()
+                .thenCompose(v -> stateMachine.open())
+                .thenRun(() -> this.proxy = stateMachine.createProxy(DatabaseProxy.class))
+                // Complete with this database, as the declared return type promises, rather than with null.
+                .thenApply(v -> this);
+    }
+
+    @Override
+    public synchronized CompletableFuture<Void> close() {
+        proxy = null;
+        return stateMachine.close()
+                .thenCompose(v -> runShutdownTasks());
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
new file mode 100644
index 0000000..2b20f53
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -0,0 +1,217 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import net.kuujo.copycat.state.Initializer;
+import net.kuujo.copycat.state.StateContext;
+
+/**
+ * Default database state: named tables of versioned values sharing a single version counter.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class DefaultDatabaseState<K, V> implements DatabaseState<K, V> {
+
+    private Long nextVersion; // NOTE(review): only this field is incremented; the copy put in the context is not
+    private Map<String, Map<K, Versioned<V>>> tables;
+
+    @Initializer
+    @Override
+    public void init(StateContext<DatabaseState<K, V>> context) {
+        tables = context.get("tables");
+        if (tables == null) {
+            tables = new HashMap<>();
+            context.put("tables", tables);
+        }
+        nextVersion = context.get("nextVersion");
+        if (nextVersion == null) {
+            nextVersion = Long.valueOf(0L);
+            context.put("nextVersion", nextVersion);
+        }
+    }
+
+    /**
+     * Returns the map backing the named table, creating an empty one on first access.
+     * @param tableName table name
+     * @return the table's key to versioned value map
+     */
+    private Map<K, Versioned<V>> getTableMap(String tableName) {
+        return tables.computeIfAbsent(tableName, name -> new HashMap<>());
+    }
+
+    @Override
+    public int size(String tableName) {
+        return getTableMap(tableName).size();
+    }
+
+    @Override
+    public boolean isEmpty(String tableName) {
+        return getTableMap(tableName).isEmpty();
+    }
+
+    @Override
+    public boolean containsKey(String tableName, K key) {
+        return getTableMap(tableName).containsKey(key);
+    }
+
+    @Override
+    public boolean containsValue(String tableName, V value) {
+        return getTableMap(tableName).values().stream().anyMatch(v -> checkEquality(v.value(), value));
+    }
+
+    @Override
+    public Versioned<V> get(String tableName, K key) {
+        return getTableMap(tableName).get(key);
+    }
+
+    @Override
+    public Versioned<V> put(String tableName, K key, V value) {
+        return getTableMap(tableName).put(key, new Versioned<>(value, ++nextVersion));
+    }
+
+    @Override
+    public Versioned<V> remove(String tableName, K key) {
+        return getTableMap(tableName).remove(key);
+    }
+
+    @Override
+    public void clear(String tableName) {
+        getTableMap(tableName).clear();
+    }
+
+    @Override
+    public Set<K> keySet(String tableName) {
+        return getTableMap(tableName).keySet();
+    }
+
+    @Override
+    public Collection<Versioned<V>> values(String tableName) {
+        return getTableMap(tableName).values();
+    }
+
+    @Override
+    public Set<Entry<K, Versioned<V>>> entrySet(String tableName) {
+        return getTableMap(tableName).entrySet();
+    }
+
+    @Override
+    public Versioned<V> putIfAbsent(String tableName, K key, V value) {
+        Versioned<V> existingValue = getTableMap(tableName).get(key);
+        return existingValue != null ? existingValue : put(tableName, key, value);
+    }
+
+    @Override
+    public boolean remove(String tableName, K key, V value) {
+        Versioned<V> existing = getTableMap(tableName).get(key);
+        if (existing != null && checkEquality(existing.value(), value)) { // content comparison for byte[]
+            getTableMap(tableName).remove(key);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean remove(String tableName, K key, long version) {
+        Versioned<V> existing = getTableMap(tableName).get(key);
+        if (existing != null && existing.version() == version) {
+            remove(tableName, key);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean replace(String tableName, K key, V oldValue, V newValue) {
+        Versioned<V> existing = getTableMap(tableName).get(key);
+        if (existing != null && checkEquality(existing.value(), oldValue)) { // content comparison for byte[]
+            put(tableName, key, newValue);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean replace(String tableName, K key, long oldVersion, V newValue) {
+        Versioned<V> existing = getTableMap(tableName).get(key);
+        if (existing != null && existing.version() == oldVersion) {
+            put(tableName, key, newValue);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean batchUpdate(List<UpdateOperation<K, V>> updates) {
+        if (updates.stream().anyMatch(update -> !checkIfUpdateIsPossible(update))) {
+            return false; // at least one precondition fails, so nothing is applied
+        } else {
+            updates.stream().forEach(this::doUpdate);
+            return true;
+        }
+    }
+
+    private void doUpdate(UpdateOperation<K, V> update) {
+        String tableName = update.tableName();
+        K key = update.key();
+        switch (update.type()) {
+        case PUT:
+            put(tableName, key, update.value());
+            return;
+        case REMOVE:
+            remove(tableName, key);
+            return;
+        case PUT_IF_ABSENT:
+            putIfAbsent(tableName, key, update.value());
+            return;
+        case PUT_IF_VERSION_MATCH:
+            replace(tableName, key, update.currentVersion(), update.value()); // version-guarded overload
+            return;
+        case PUT_IF_VALUE_MATCH:
+            replace(tableName, key, update.currentValue(), update.value()); // value-guarded overload
+            return;
+        case REMOVE_IF_VERSION_MATCH:
+            remove(tableName, key, update.currentVersion());
+            return;
+        case REMOVE_IF_VALUE_MATCH:
+            remove(tableName, key, update.currentValue());
+            return;
+        default:
+            throw new IllegalStateException("Unsupported type: " + update.type());
+        }
+    }
+
+    private boolean checkIfUpdateIsPossible(UpdateOperation<K, V> update) {
+        Versioned<V> existingEntry = get(update.tableName(), update.key());
+        switch (update.type()) {
+        case PUT:
+        case REMOVE:
+            return true;
+        case PUT_IF_ABSENT:
+            return existingEntry == null;
+        case PUT_IF_VERSION_MATCH:
+            return existingEntry != null && existingEntry.version() == update.currentVersion();
+        case PUT_IF_VALUE_MATCH:
+            return existingEntry != null && checkEquality(existingEntry.value(), update.currentValue());
+        case REMOVE_IF_VERSION_MATCH:
+            return existingEntry == null || existingEntry.version() == update.currentVersion();
+        case REMOVE_IF_VALUE_MATCH:
+            return existingEntry == null || checkEquality(existingEntry.value(), update.currentValue());
+        default:
+            throw new IllegalStateException("Unsupported type: " + update.type());
+        }
+    }
+
+    private boolean checkEquality(V value1, V value2) {
+        if (value1 instanceof byte[] && value2 instanceof byte[]) {
+            return Arrays.equals((byte[]) value1, (byte[]) value2); // arrays only have identity equals()
+        }
+        return value1 == null ? value2 == null : value1.equals(value2);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
new file mode 100644
index 0000000..d35aca2
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -0,0 +1,208 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
+
+/**
+ * A database that partitions the keys across one or more database partitions.
+ */
+public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, PartitionedDatabaseManager {
+
+    private Partitioner<String> partitioner;
+    private final ClusterCoordinator coordinator;
+    private final Map<String, Database> partitions = Maps.newConcurrentMap();
+
+    protected PartitionedDatabase(ClusterCoordinator coordinator) {
+        this.coordinator = coordinator;
+    }
+
+    @Override
+    public void registerPartition(String name, Database partition) {
+        partitions.put(name, partition);
+    }
+
+    @Override
+    public Map<String, Database> getRegisteredPartitions() {
+        return ImmutableMap.copyOf(partitions);
+    }
+
+    @Override
+    public CompletableFuture<Integer> size(String tableName) {
+        AtomicInteger totalSize = new AtomicInteger(0);
+        return CompletableFuture.allOf(partitions
+                    .values()
+                    .stream()
+                    .map(p -> p.size(tableName).thenApply(totalSize::addAndGet))
+                    .toArray(CompletableFuture[]::new))
+                .thenApply(v -> totalSize.get());
+    }
+
+    @Override
+    public CompletableFuture<Boolean> isEmpty(String tableName) {
+        return size(tableName).thenApply(size -> size == 0);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsKey(String tableName, String key) {
+        return partitioner.getPartition(tableName, key).containsKey(tableName, key);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) {
+        AtomicBoolean containsValue = new AtomicBoolean(false);
+        return CompletableFuture.allOf(partitions
+                    .values()
+                    .stream()
+                    .map(p -> p.containsValue(tableName, value).thenApply(v -> containsValue.compareAndSet(false, v)))
+                    .toArray(CompletableFuture[]::new))
+                .thenApply(v -> containsValue.get());
+    }
+
+    @Override
+    public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) {
+        return partitioner.getPartition(tableName, key).get(tableName, key);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
+        return partitioner.getPartition(tableName, key).put(tableName, key, value);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
+        return partitioner.getPartition(tableName, key).remove(tableName, key);
+    }
+
+    @Override
+    public CompletableFuture<Void> clear(String tableName) {
+        return CompletableFuture.allOf(partitions
+                    .values()
+                    .stream()
+                    .map(p -> p.clear(tableName))
+                    .toArray(CompletableFuture[]::new));
+    }
+
+    @Override
+    public CompletableFuture<Set<String>> keySet(String tableName) {
+        Set<String> keySet = Sets.newConcurrentHashSet();
+        return CompletableFuture.allOf(partitions
+                    .values()
+                    .stream()
+                    .map(p -> p.keySet(tableName).thenApply(keySet::addAll))
+                    .toArray(CompletableFuture[]::new))
+                .thenApply(v -> keySet);
+    }
+
+    @Override
+    public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) {
+        List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
+        return CompletableFuture.allOf(partitions
+                    .values()
+                    .stream()
+                    .map(p -> p.values(tableName).thenApply(values::addAll)) // gather each partition's values
+                    .toArray(CompletableFuture[]::new))
+                .thenApply(v -> values);
+    }
+
+    @Override
+    public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet(String tableName) {
+        Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
+        return CompletableFuture.allOf(partitions
+                    .values()
+                    .stream()
+                    .map(p -> p.entrySet(tableName).thenApply(entrySet::addAll))
+                    .toArray(CompletableFuture[]::new))
+                .thenApply(v -> entrySet);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
+        return partitioner.getPartition(tableName, key).putIfAbsent(tableName, key, value);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
+        return partitioner.getPartition(tableName, key).remove(tableName, key, value);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
+        return partitioner.getPartition(tableName, key).remove(tableName, key, version);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
+        return partitioner.getPartition(tableName, key).replace(tableName, key, oldValue, newValue);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
+        return partitioner.getPartition(tableName, key).replace(tableName, key, oldVersion, newValue);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
+        // Group the updates by the partition that owns each (table, key) pair.
+        Map<Database, List<UpdateOperation<String, byte[]>>> perPartitionUpdates = Maps.newHashMap();
+        for (UpdateOperation<String, byte[]> update : updates) {
+            Database partition = partitioner.getPartition(update.tableName(), update.key());
+            perPartitionUpdates.computeIfAbsent(partition, p -> Lists.newArrayList()).add(update);
+        }
+        if (perPartitionUpdates.isEmpty()) {
+            // An empty batch has nothing to apply, so it trivially succeeds.
+            return CompletableFuture.completedFuture(true);
+        }
+        if (perPartitionUpdates.size() > 1) {
+            // TODO
+            throw new UnsupportedOperationException("Cross partition transactional updates are not supported.");
+        }
+        // All updates target a single partition; delegate the whole batch to it.
+        Entry<Database, List<UpdateOperation<String, byte[]>>> only =
+                perPartitionUpdates.entrySet().iterator().next();
+        return only.getKey().atomicBatchUpdate(only.getValue());
+    }
+
+    @Override
+    public void setPartitioner(Partitioner<String> partitioner) {
+        this.partitioner = partitioner;
+    }
+
+    @Override
+    public CompletableFuture<PartitionedDatabase> open() {
+        // Open the coordinator first, then every registered partition.
+        return coordinator.open().thenCompose(c -> CompletableFuture.allOf(partitions
+                        .values()
+                        .stream()
+                        .map(Database::open)
+                        .collect(Collectors.toList())
+                        .toArray(new CompletableFuture[0])) // sized by toArray, so no null padding
+                .thenApply(v -> this));
+    }
+
+    @Override
+    public CompletableFuture<Void> close() {
+        CompletableFuture<Void> closePartitions = CompletableFuture.allOf(partitions
+                .values()
+                .stream()
+                .map(database -> database.close())
+                .collect(Collectors.toList()).toArray(new CompletableFuture[0]));
+        // NOTE(review): coordinator.close() starts immediately, not after the partitions close - confirm intent.
+        CompletableFuture<Void> closeCoordinator = coordinator.close();
+        return closePartitions.thenCompose(v -> closeCoordinator);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseConfig.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseConfig.java
new file mode 100644
index 0000000..6d375cc
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseConfig.java
@@ -0,0 +1,31 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Partitioned database configuration.
+ */
+public class PartitionedDatabaseConfig {
+ private final Map<String, DatabaseConfig> partitions = new HashMap<>();
+
+ /**
+ * Returns the configuration for all partitions.
+ * @return partition name to configuration mapping.
+ */
+ public Map<String, DatabaseConfig> partitions() {
+ return Collections.unmodifiableMap(partitions);
+ }
+
+ /**
+ * Adds the specified partition name and configuration.
+ * @param name partition name.
+ * @param config partition config
+ * @return this instance
+ */
+ public PartitionedDatabaseConfig withPartition(String name, DatabaseConfig config) {
+ partitions.put(name, config);
+ return this;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
new file mode 100644
index 0000000..44cd3a1
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
@@ -0,0 +1,79 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+import net.kuujo.copycat.CopycatConfig;
+import net.kuujo.copycat.cluster.ClusterConfig;
+import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
+import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator;
+import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
+
+public interface PartitionedDatabaseManager {
+ /**
+ * Opens the database.
+ *
+ * @return A completable future to be completed with the result once complete.
+ */
+ CompletableFuture<PartitionedDatabase> open();
+
+ /**
+ * Closes the database.
+ *
+ * @return A completable future to be completed with the result once complete.
+ */
+ CompletableFuture<Void> close();
+
+ /**
+ * Sets the partitioner to use for mapping keys to partitions.
+ *
+ * @param partitioner partitioner
+ */
+ void setPartitioner(Partitioner<String> partitioner);
+
+ /**
+ * Registers a new partition.
+ *
+ * @param partitionName partition name.
+ * @param partition partition.
+ */
+ void registerPartition(String partitionName, Database partition);
+
+ /**
+ * Returns all the registered database partitions.
+ *
+ * @return mapping of all registered database partitions.
+ */
+ Map<String, Database> getRegisteredPartitions();
+
+
+ /**
+ * Creates a new partitioned database.
+ *
+ * @param name The database name.
+ * @param clusterConfig The cluster configuration.
+ * @param partitionedDatabaseConfig The database configuration.
+ *
+ * @return The database.
+ */
+ public static PartitionedDatabase create(
+ String name,
+ ClusterConfig clusterConfig,
+ PartitionedDatabaseConfig partitionedDatabaseConfig) {
+ CopycatConfig copycatConfig = new CopycatConfig()
+ .withName(name)
+ .withClusterConfig(clusterConfig)
+ .withDefaultExecutor(Executors.newSingleThreadExecutor(new NamedThreadFactory("copycat-coordinator-%d")));
+ ClusterCoordinator coordinator = new DefaultClusterCoordinator(copycatConfig.resolve());
+ PartitionedDatabase partitionedDatabase = new PartitionedDatabase(coordinator);
+ partitionedDatabaseConfig.partitions().forEach((partitionName, partitionConfig) ->
+ partitionedDatabase.registerPartition(partitionName ,
+ coordinator.getResource(partitionName, partitionConfig.resolve(clusterConfig)
+ .withDefaultSerializer(copycatConfig.getDefaultSerializer().copy())
+ .withDefaultExecutor(copycatConfig.getDefaultExecutor()))));
+ partitionedDatabase.setPartitioner(
+ new SimpleKeyHashPartitioner<>(partitionedDatabase.getRegisteredPartitions()));
+ return partitionedDatabase;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Partitioner.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Partitioner.java
new file mode 100644
index 0000000..df8d56e
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Partitioner.java
@@ -0,0 +1,17 @@
+package org.onosproject.store.consistent.impl;
+
+/**
+ * Partitioner is responsible for mapping keys to individual database partitions.
+ *
+ * @param <K> key type.
+ */
+public interface Partitioner<K> {
+
+ /**
+ * Returns the database partition.
+ * @param tableName table name
+ * @param key key
+ * @return Database partition
+ */
+ Database getPartition(String tableName, K key);
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java
new file mode 100644
index 0000000..5410f9f
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java
@@ -0,0 +1,31 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+/**
+ * A simple Partitioner that maps a key to a partition using the key's hashCode.
+ * Partition names are copied and sorted at construction; the hash indexes that sorted list.
+ *
+ * @param <K> key type.
+ */
+public class SimpleKeyHashPartitioner<K> implements Partitioner<K> {
+ private final Map<String, Database> partitionMap;
+ private final List<String> sortedPartitionNames;
+
+ public SimpleKeyHashPartitioner(Map<String, Database> partitionMap) {
+ this.partitionMap = ImmutableMap.copyOf(partitionMap);
+ sortedPartitionNames = Lists.newArrayList(this.partitionMap.keySet());
+ Collections.sort(sortedPartitionNames);
+ }
+
+ @Override
+ public Database getPartition(String tableName, K key) {
+ // abs() is applied after %: Math.abs(Integer.MIN_VALUE) is negative and would give a bad index.
+ return partitionMap.get(sortedPartitionNames.get(Math.abs(key.hashCode() % partitionMap.size())));
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java
new file mode 100644
index 0000000..9d52a09
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java
@@ -0,0 +1,151 @@
+package org.onosproject.store.consistent.impl;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Database update operation.
+ *
+ * @param <K> key type.
+ * @param <V> value type.
+ */
+public class UpdateOperation<K, V> {
+
+ /**
+ * Type of database update operation.
+ */
+ public static enum Type {
+ PUT,
+ PUT_IF_ABSENT,
+ PUT_IF_VERSION_MATCH,
+ PUT_IF_VALUE_MATCH,
+ REMOVE,
+ REMOVE_IF_VERSION_MATCH,
+ REMOVE_IF_VALUE_MATCH,
+ }
+
+ private Type type;
+ private String tableName;
+ private K key;
+ private V value;
+ private V currentValue;
+ private long currentVersion;
+
+ /**
+ * Returns the type of update operation.
+ * @return type of update.
+ */
+ public Type type() {
+ return type;
+ }
+
+ /**
+ * Returns the tableName being updated.
+ * @return table name.
+ */
+ public String tableName() {
+ return tableName;
+ }
+
+ /**
+ * Returns the item key being updated.
+ * @return item key
+ */
+ public K key() {
+ return key;
+ }
+
+ /**
+ * Returns the new value.
+ * @return item's target value.
+ */
+ public V value() {
+ return value;
+ }
+
+ /**
+ * Returns the expected current value in the database for the key.
+ * @return current value in database.
+ */
+ public V currentValue() {
+ return currentValue;
+ }
+
+ /**
+ * Returns the expected current version in the database for the key.
+ * @return expected version.
+ */
+ public long currentVersion() {
+ return currentVersion;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("type", type)
+ .add("tableName", tableName)
+ .add("key", key)
+ .add("value", value)
+ .add("currentValue", currentValue)
+ .add("currentVersion", currentVersion)
+ .toString();
+ }
+
+ /**
+ * UpdateOperation builder.
+ *
+ * @param <K> key type.
+ * @param <V> value type.
+ */
+ public static final class Builder<K, V> {
+
+ private UpdateOperation<K, V> operation = new UpdateOperation<>();
+
+ /**
+ * Creates a new builder instance.
+ * @param <K> key type.
+ * @param <V> value type.
+ *
+ * @return builder.
+ */
+ public static <K, V> Builder<K, V> builder() {
+ return new Builder<>();
+ }
+
+ private Builder() {
+ }
+
+ public UpdateOperation<K, V> build() {
+ return operation;
+ }
+
+ public Builder<K, V> withType(Type type) {
+ operation.type = type;
+ return this;
+ }
+
+ public Builder<K, V> withTableName(String tableName) {
+ operation.tableName = tableName;
+ return this;
+ }
+
+ public Builder<K, V> withKey(K key) {
+ operation.key = key;
+ return this;
+ }
+
+ public Builder<K, V> withCurrentValue(V value) {
+ operation.currentValue = value;
+ return this;
+ }
+
+ public Builder<K, V> withValue(V value) {
+ operation.value = value;
+ return this;
+ }
+
+ public Builder<K, V> withCurrentVersion(long version) {
+ operation.currentVersion = version;
+ return this;
+ }
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Versioned.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Versioned.java
new file mode 100644
index 0000000..6eb908d
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Versioned.java
@@ -0,0 +1,50 @@
+package org.onosproject.store.consistent.impl;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Versioned value.
+ *
+ * @param <V> value type.
+ */
+public class Versioned<V> {
+
+ private final V value;
+ private final long version;
+
+ /**
+ * Constructs a new versioned value.
+ * @param value value
+ * @param version version
+ */
+ public Versioned(V value, long version) {
+ this.value = value;
+ this.version = version;
+ }
+
+ /**
+ * Returns the value.
+ *
+ * @return value.
+ */
+ public V value() {
+ return value;
+ }
+
+ /**
+ * Returns the version.
+ *
+ * @return version
+ */
+ public long version() {
+ return version;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("value", value)
+ .add("version", version)
+ .toString();
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/ClusterMessagingProtocol.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/ClusterMessagingProtocol.java
deleted file mode 100644
index 516aa44..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/ClusterMessagingProtocol.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.Vector;
-
-import net.kuujo.copycat.cluster.TcpClusterConfig;
-import net.kuujo.copycat.cluster.TcpMember;
-import net.kuujo.copycat.event.LeaderElectEvent;
-import net.kuujo.copycat.internal.log.ConfigurationEntry;
-import net.kuujo.copycat.internal.log.CopycatEntry;
-import net.kuujo.copycat.internal.log.OperationEntry;
-import net.kuujo.copycat.internal.log.SnapshotEntry;
-import net.kuujo.copycat.protocol.PingRequest;
-import net.kuujo.copycat.protocol.PingResponse;
-import net.kuujo.copycat.protocol.PollRequest;
-import net.kuujo.copycat.protocol.PollResponse;
-import net.kuujo.copycat.protocol.Response.Status;
-import net.kuujo.copycat.protocol.SubmitRequest;
-import net.kuujo.copycat.protocol.SubmitResponse;
-import net.kuujo.copycat.protocol.SyncRequest;
-import net.kuujo.copycat.protocol.SyncResponse;
-import net.kuujo.copycat.spi.protocol.Protocol;
-import net.kuujo.copycat.spi.protocol.ProtocolClient;
-import net.kuujo.copycat.spi.protocol.ProtocolServer;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.serializers.KryoSerializer;
-import org.onosproject.store.serializers.StoreSerializer;
-import org.onosproject.store.service.impl.DatabaseStateMachine.State;
-import org.onosproject.store.service.impl.DatabaseStateMachine.TableMetadata;
-import org.onlab.util.KryoNamespace;
-import org.slf4j.Logger;
-
-/**
- * ONOS Cluster messaging based Copycat protocol.
- */
-@Component(immediate = false)
-@Service
-public class ClusterMessagingProtocol
- implements DatabaseProtocolService, Protocol<TcpMember> {
-
- private final Logger log = getLogger(getClass());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterCommunicationService clusterCommunicator;
-
- public static final MessageSubject COPYCAT_PING =
- new MessageSubject("copycat-raft-consensus-ping");
- public static final MessageSubject COPYCAT_SYNC =
- new MessageSubject("copycat-raft-consensus-sync");
- public static final MessageSubject COPYCAT_POLL =
- new MessageSubject("copycat-raft-consensus-poll");
- public static final MessageSubject COPYCAT_SUBMIT =
- new MessageSubject("copycat-raft-consensus-submit");
-
- static final int AFTER_COPYCAT = KryoNamespaces.BEGIN_USER_CUSTOM_ID + 50;
-
- static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
- .register(PingRequest.class)
- .register(PingResponse.class)
- .register(PollRequest.class)
- .register(PollResponse.class)
- .register(SyncRequest.class)
- .register(SyncResponse.class)
- .register(SubmitRequest.class)
- .register(SubmitResponse.class)
- .register(Status.class)
- .register(ConfigurationEntry.class)
- .register(SnapshotEntry.class)
- .register(CopycatEntry.class)
- .register(OperationEntry.class)
- .register(TcpClusterConfig.class)
- .register(TcpMember.class)
- .register(LeaderElectEvent.class)
- .register(Vector.class)
- .build();
-
- // serializer used for CopyCat Protocol
- public static final StoreSerializer DB_SERIALIZER = new KryoSerializer() {
- @Override
- protected void setupKryoPool() {
- serializerPool = KryoNamespace.newBuilder()
- .register(COPYCAT)
- .nextId(AFTER_COPYCAT)
- // for snapshot
- .register(State.class)
- .register(TableMetadata.class)
- // TODO: Move this out to API?
- .register(TableModificationEvent.class)
- .register(TableModificationEvent.Type.class)
- .build();
- }
- };
-
- @Activate
- public void activate() {
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- log.info("Stopped");
- }
-
- @Override
- public ProtocolServer createServer(TcpMember member) {
- return new ClusterMessagingProtocolServer(clusterCommunicator);
- }
-
- @Override
- public ProtocolClient createClient(TcpMember member) {
- return new ClusterMessagingProtocolClient(clusterService,
- clusterCommunicator,
- clusterService.getLocalNode(),
- member);
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/ClusterMessagingProtocolClient.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/ClusterMessagingProtocolClient.java
deleted file mode 100644
index 75c0969..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/ClusterMessagingProtocolClient.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import static com.google.common.base.Verify.verifyNotNull;
-import static org.onosproject.store.service.impl.ClusterMessagingProtocol.DB_SERIALIZER;
-import static org.onlab.util.Tools.namedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-import static java.util.concurrent.Executors.newCachedThreadPool;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import net.kuujo.copycat.cluster.TcpMember;
-import net.kuujo.copycat.protocol.PingRequest;
-import net.kuujo.copycat.protocol.PingResponse;
-import net.kuujo.copycat.protocol.PollRequest;
-import net.kuujo.copycat.protocol.PollResponse;
-import net.kuujo.copycat.protocol.SubmitRequest;
-import net.kuujo.copycat.protocol.SubmitResponse;
-import net.kuujo.copycat.protocol.SyncRequest;
-import net.kuujo.copycat.protocol.SyncResponse;
-import net.kuujo.copycat.spi.protocol.ProtocolClient;
-
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.slf4j.Logger;
-
-/**
- * ONOS Cluster messaging based Copycat protocol client.
- */
-public class ClusterMessagingProtocolClient implements ProtocolClient {
-
- private final Logger log = getLogger(getClass());
-
- public static final Duration RETRY_INTERVAL = Duration.ofMillis(2000);
-
- private final ClusterService clusterService;
- private final ClusterCommunicationService clusterCommunicator;
- private final ControllerNode localNode;
- private final TcpMember remoteMember;
-
- private ControllerNode remoteNode;
- private final AtomicBoolean connectionOK = new AtomicBoolean(true);
-
- private ExecutorService pool;
-
- public ClusterMessagingProtocolClient(
- ClusterService clusterService,
- ClusterCommunicationService clusterCommunicator,
- ControllerNode localNode,
- TcpMember remoteMember) {
-
- this.clusterService = clusterService;
- this.clusterCommunicator = clusterCommunicator;
- this.localNode = localNode;
- this.remoteMember = remoteMember;
- }
-
- @Override
- public CompletableFuture<PingResponse> ping(PingRequest request) {
- return requestReply(request);
- }
-
- @Override
- public CompletableFuture<SyncResponse> sync(SyncRequest request) {
- return requestReply(request);
- }
-
- @Override
- public CompletableFuture<PollResponse> poll(PollRequest request) {
- return requestReply(request);
- }
-
- @Override
- public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
- return requestReply(request);
- }
-
- @Override
- public synchronized CompletableFuture<Void> connect() {
- if (pool == null || pool.isShutdown()) {
- // TODO include remote name?
- pool = newCachedThreadPool(namedThreads("onos-copycat-netty-messaging-client-%d"));
- }
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public synchronized CompletableFuture<Void> close() {
- if (pool != null) {
- pool.shutdownNow();
- pool = null;
- }
- return CompletableFuture.completedFuture(null);
- }
-
- private <I> MessageSubject messageType(I input) {
- Class<?> clazz = input.getClass();
- if (clazz.equals(PollRequest.class)) {
- return ClusterMessagingProtocol.COPYCAT_POLL;
- } else if (clazz.equals(SyncRequest.class)) {
- return ClusterMessagingProtocol.COPYCAT_SYNC;
- } else if (clazz.equals(SubmitRequest.class)) {
- return ClusterMessagingProtocol.COPYCAT_SUBMIT;
- } else if (clazz.equals(PingRequest.class)) {
- return ClusterMessagingProtocol.COPYCAT_PING;
- } else {
- throw new IllegalArgumentException("Unknown class " + clazz.getName());
- }
- }
-
- private <I, O> CompletableFuture<O> requestReply(I request) {
- CompletableFuture<O> future = new CompletableFuture<>();
- if (pool == null) {
- log.info("Attempted to use closed client, connecting now. {}", request);
- connect();
- }
- pool.submit(new RPCTask<I, O>(request, future));
- return future;
- }
-
- private ControllerNode getControllerNode(TcpMember remoteMember) {
- final String host = remoteMember.host();
- final int port = remoteMember.port();
- for (ControllerNode node : clusterService.getNodes()) {
- if (node.ip().toString().equals(host) && node.tcpPort() == port) {
- return node;
- }
- }
- return null;
- }
-
- private class RPCTask<I, O> implements Runnable {
-
- private final I request;
- private final ClusterMessage message;
- private final CompletableFuture<O> future;
-
- public RPCTask(I request, CompletableFuture<O> future) {
- this.request = request;
- this.message =
- new ClusterMessage(
- localNode.id(),
- messageType(request),
- verifyNotNull(DB_SERIALIZER.encode(request)));
- this.future = future;
- }
-
- @Override
- public void run() {
- try {
- if (remoteNode == null) {
- remoteNode = getControllerNode(remoteMember);
- if (remoteNode == null) {
- throw new IOException("Remote node is offline!");
- }
- }
- byte[] response = clusterCommunicator
- .sendAndReceive(message, remoteNode.id())
- .get(RETRY_INTERVAL.toMillis(), TimeUnit.MILLISECONDS);
- if (!connectionOK.getAndSet(true)) {
- log.info("Connectivity to {} restored", remoteNode);
- }
- future.complete(verifyNotNull(DB_SERIALIZER.decode(response)));
-
- } catch (IOException | TimeoutException e) {
- if (connectionOK.getAndSet(false)) {
- log.warn("Detected connectivity issues with {}. Reason: {}", remoteNode, e.getMessage());
- }
- log.debug("RPCTask for {} failed.", request, e);
- future.completeExceptionally(e);
- } catch (ExecutionException e) {
- log.warn("RPCTask execution for {} failed: {}", request, e.getMessage());
- log.debug("RPCTask execution for {} failed.", request, e);
- future.completeExceptionally(e);
- } catch (InterruptedException e) {
- log.warn("RPCTask for {} was interrupted: {}", request, e.getMessage());
- log.debug("RPCTask for {} was interrupted.", request, e);
- future.completeExceptionally(e);
- Thread.currentThread().interrupt();
- } catch (Exception e) {
- log.warn("RPCTask for {} terribly failed.", request, e);
- future.completeExceptionally(e);
- }
- }
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/ClusterMessagingProtocolServer.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/ClusterMessagingProtocolServer.java
deleted file mode 100644
index aa56855..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/ClusterMessagingProtocolServer.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import static java.util.concurrent.Executors.newCachedThreadPool;
-import static org.onlab.util.Tools.namedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-import static org.onosproject.store.service.impl.ClusterMessagingProtocol.*;
-import static org.onosproject.store.service.impl.ClusterMessagingProtocol.DB_SERIALIZER;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.function.BiConsumer;
-
-import net.kuujo.copycat.protocol.PingRequest;
-import net.kuujo.copycat.protocol.PollRequest;
-import net.kuujo.copycat.protocol.RequestHandler;
-import net.kuujo.copycat.protocol.SubmitRequest;
-import net.kuujo.copycat.protocol.SyncRequest;
-import net.kuujo.copycat.spi.protocol.ProtocolServer;
-
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.slf4j.Logger;
-
-/**
- * ONOS Cluster messaging based Copycat protocol server.
- */
-public class ClusterMessagingProtocolServer implements ProtocolServer {
-
- private final Logger log = getLogger(getClass());
-
- private final ClusterCommunicationService clusterCommunicator;
-
- private volatile RequestHandler handler;
-
- private ExecutorService pool;
-
- public ClusterMessagingProtocolServer(ClusterCommunicationService clusterCommunicator) {
- this.clusterCommunicator = clusterCommunicator;
- }
-
- @Override
- public void requestHandler(RequestHandler handler) {
- this.handler = handler;
- }
-
- @Override
- public CompletableFuture<Void> listen() {
- if (pool == null || pool.isShutdown()) {
- pool = newCachedThreadPool(namedThreads("onos-copycat-netty-messaging-server-%d"));
- }
-
- clusterCommunicator.addSubscriber(COPYCAT_PING, new PingHandler());
- clusterCommunicator.addSubscriber(COPYCAT_SYNC, new SyncHandler());
- clusterCommunicator.addSubscriber(COPYCAT_POLL, new PollHandler());
- clusterCommunicator.addSubscriber(COPYCAT_SUBMIT, new SubmitHandler());
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public CompletableFuture<Void> close() {
- clusterCommunicator.removeSubscriber(COPYCAT_PING);
- clusterCommunicator.removeSubscriber(COPYCAT_SYNC);
- clusterCommunicator.removeSubscriber(COPYCAT_POLL);
- clusterCommunicator.removeSubscriber(COPYCAT_SUBMIT);
- if (pool != null) {
- pool.shutdownNow();
- pool = null;
- }
- return CompletableFuture.completedFuture(null);
- }
-
- private final class PingHandler extends CopycatMessageHandler<PingRequest> {
-
- @Override
- public void raftHandle(PingRequest request, ClusterMessage message) {
- pool.submit(new Runnable() {
-
- @Override
- public void run() {
- currentHandler().ping(request)
- .whenComplete(new PostExecutionTask<>(message));
- }
- });
- }
- }
-
- private final class SyncHandler extends CopycatMessageHandler<SyncRequest> {
-
- @Override
- public void raftHandle(SyncRequest request, ClusterMessage message) {
- pool.submit(new Runnable() {
-
- @Override
- public void run() {
- currentHandler().sync(request)
- .whenComplete(new PostExecutionTask<>(message));
- }
- });
- }
- }
-
- private final class PollHandler extends CopycatMessageHandler<PollRequest> {
-
- @Override
- public void raftHandle(PollRequest request, ClusterMessage message) {
- pool.submit(new Runnable() {
-
- @Override
- public void run() {
- currentHandler().poll(request)
- .whenComplete(new PostExecutionTask<>(message));
- }
- });
- }
- }
-
- private final class SubmitHandler extends CopycatMessageHandler<SubmitRequest> {
-
- @Override
- public void raftHandle(SubmitRequest request, ClusterMessage message) {
- pool.submit(new Runnable() {
-
- @Override
- public void run() {
- currentHandler().submit(request)
- .whenComplete(new PostExecutionTask<>(message));
- }
- });
- }
- }
-
- private abstract class CopycatMessageHandler<T> implements ClusterMessageHandler {
-
- public abstract void raftHandle(T request, ClusterMessage message);
-
- @Override
- public void handle(ClusterMessage message) {
- T request = DB_SERIALIZER.decode(message.payload());
- raftHandle(request, message);
- }
-
- RequestHandler currentHandler() {
- RequestHandler currentHandler = handler;
- if (currentHandler == null) {
- // there is a slight window of time during state transition,
- // where handler becomes null
- long sleepMs = 1;
- for (int i = 0; i < 10; ++i) {
- currentHandler = handler;
- if (currentHandler != null) {
- break;
- }
- try {
- sleepMs <<= 1;
- Thread.sleep(sleepMs);
- } catch (InterruptedException e) {
- log.error("Interrupted", e);
- return handler;
- }
- }
- if (currentHandler == null) {
- log.error("There was no handler registered!");
- return handler;
- }
- }
- return currentHandler;
- }
-
- final class PostExecutionTask<R> implements BiConsumer<R, Throwable> {
-
- private final ClusterMessage message;
-
- public PostExecutionTask(ClusterMessage message) {
- this.message = message;
- }
-
- @Override
- public void accept(R response, Throwable error) {
- if (error != null) {
- log.error("Processing {} failed.", message.subject(), error);
- } else {
- try {
- log.trace("responding to {}", message.subject());
- message.respond(DB_SERIALIZER.encode(response));
- } catch (Exception e) {
- log.error("Failed responding with {}", response.getClass().getName(), e);
- }
- }
- }
- }
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseClient.java
deleted file mode 100644
index 869b188..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseClient.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import net.kuujo.copycat.cluster.Member;
-import net.kuujo.copycat.cluster.TcpMember;
-import net.kuujo.copycat.event.LeaderElectEvent;
-import net.kuujo.copycat.protocol.Response.Status;
-import net.kuujo.copycat.protocol.SubmitRequest;
-import net.kuujo.copycat.protocol.SubmitResponse;
-import net.kuujo.copycat.spi.protocol.ProtocolClient;
-
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.service.BatchReadRequest;
-import org.onosproject.store.service.BatchWriteRequest;
-import org.onosproject.store.service.DatabaseException;
-import org.onosproject.store.service.ReadResult;
-import org.onosproject.store.service.VersionedValue;
-import org.onosproject.store.service.WriteResult;
-import org.slf4j.Logger;
-
-/**
- * Client for interacting with the Copycat Raft cluster.
- */
-public class DatabaseClient implements ClusterMessageHandler {
-
- private static final int RETRIES = 5;
-
- private static final int TIMEOUT_MS = 2000;
-
- private final Logger log = getLogger(getClass());
-
- private final DatabaseProtocolService protocol;
- private volatile ProtocolClient client = null;
- private volatile Member currentLeader = null;
- private volatile long currentLeaderTerm = 0;
-
- public DatabaseClient(DatabaseProtocolService protocol) {
- this.protocol = checkNotNull(protocol);
- }
-
- @Override
- public void handle(ClusterMessage message) {
- LeaderElectEvent event =
- ClusterMessagingProtocol.DB_SERIALIZER.decode(message.payload());
- TcpMember newLeader = event.leader();
- long newLeaderTerm = event.term();
- if (newLeader != null && !newLeader.equals(currentLeader) && newLeaderTerm > currentLeaderTerm) {
- log.info("New leader detected. Leader: {}, term: {}", newLeader, newLeaderTerm);
- ProtocolClient prevClient = client;
- ProtocolClient newClient = protocol.createClient(newLeader);
- newClient.connect();
- client = newClient;
- currentLeader = newLeader;
- currentLeaderTerm = newLeaderTerm;
-
- if (prevClient != null) {
- prevClient.close();
- }
- }
- }
-
- private String nextRequestId() {
- return UUID.randomUUID().toString();
- }
-
- public void waitForLeader() {
- if (currentLeader != null) {
- return;
- }
-
- log.info("No leader in cluster, waiting for election.");
-
- try {
- while (currentLeader == null) {
- Thread.sleep(200);
- }
- return;
- } catch (InterruptedException e) {
- log.error("Interrupted while waiting for Leader", e);
- Thread.currentThread().interrupt();
- }
- }
-
- private <T> T submit(String operationName, Object... args) {
- waitForLeader();
- if (currentLeader == null) {
- throw new DatabaseException("Raft cluster does not have a leader.");
- }
-
- SubmitRequest request =
- new SubmitRequest(nextRequestId(), operationName, Arrays.asList(args));
-
- CompletableFuture<SubmitResponse> submitResponse = client.submit(request);
-
- log.debug("Sent {} to {}", request, currentLeader);
-
- try {
- final SubmitResponse response = submitResponse.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
- if (response.status() != Status.OK) {
- throw new DatabaseException(response.error());
- }
- return (T) response.result();
- } catch (ExecutionException | InterruptedException e) {
- throw new DatabaseException(e);
- } catch (TimeoutException e) {
- throw new DatabaseException.Timeout(e);
- }
- }
-
- public boolean createTable(String tableName) {
- return submit("createTable", tableName);
- }
-
- public boolean createTable(String tableName, int ttlMillis) {
- return submit("createTable", tableName, ttlMillis);
- }
-
- public void dropTable(String tableName) {
- submit("dropTable", tableName);
- }
-
- public void dropAllTables() {
- submit("dropAllTables");
- }
-
- public Set<String> listTables() {
- return submit("listTables");
- }
-
- public List<ReadResult> batchRead(BatchReadRequest batchRequest) {
- return submit("read", batchRequest);
- }
-
- public List<WriteResult> batchWrite(BatchWriteRequest batchRequest) {
- return submit("write", batchRequest);
- }
-
- public Map<String, VersionedValue> getAll(String tableName) {
- return submit("getAll", tableName);
- }
-
- Member getCurrentLeader() {
- return currentLeader;
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseEntryExpirationTracker.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseEntryExpirationTracker.java
deleted file mode 100644
index 02743ac..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseEntryExpirationTracker.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.service.impl;
-
-import com.google.common.base.MoreObjects;
-import net.jodah.expiringmap.ExpiringMap;
-import net.jodah.expiringmap.ExpiringMap.ExpirationListener;
-import net.jodah.expiringmap.ExpiringMap.ExpirationPolicy;
-import net.kuujo.copycat.cluster.Member;
-import net.kuujo.copycat.event.EventHandler;
-import net.kuujo.copycat.event.LeaderElectEvent;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.service.DatabaseService;
-import org.onosproject.store.service.VersionedValue;
-import org.onosproject.store.service.impl.DatabaseStateMachine.State;
-import org.onosproject.store.service.impl.DatabaseStateMachine.TableMetadata;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.onlab.util.Tools.namedThreads;
-
-/**
- * Plugs into the database update stream and track the TTL of entries added to
- * the database. For tables with pre-configured finite TTL, this class has
- * mechanisms for expiring (deleting) old, expired entries from the database.
- */
-public class DatabaseEntryExpirationTracker implements
- DatabaseUpdateEventListener, EventHandler<LeaderElectEvent> {
-
- private static final ExecutorService THREAD_POOL =
- Executors.newCachedThreadPool(namedThreads("onos-db-stale-entry-expirer-%d"));
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private final DatabaseService databaseService;
- private final ClusterCommunicationService clusterCommunicator;
-
- private final Member localMember;
- private final ControllerNode localNode;
- private final AtomicBoolean isLocalMemberLeader = new AtomicBoolean(false);
-
- private final Map<String, Map<DatabaseRow, Long>> tableEntryExpirationMap = new HashMap<>();
-
- private final ExpirationListener<DatabaseRow, Long> expirationObserver = new ExpirationObserver();
-
- DatabaseEntryExpirationTracker(
- Member localMember,
- ControllerNode localNode,
- ClusterCommunicationService clusterCommunicator,
- DatabaseService databaseService) {
- this.localMember = localMember;
- this.localNode = localNode;
- this.clusterCommunicator = clusterCommunicator;
- this.databaseService = databaseService;
- }
-
- @Override
- public void tableModified(TableModificationEvent event) {
- log.debug("{}: Received {}", localNode.id(), event);
-
- if (!tableEntryExpirationMap.containsKey(event.tableName())) {
- return;
- }
-
- Map<DatabaseRow, Long> map = tableEntryExpirationMap.get(event.tableName());
- DatabaseRow row = new DatabaseRow(event.tableName(), event.key());
- Long eventVersion = event.value().version();
-
- switch (event.type()) {
- case ROW_DELETED:
- map.remove(row, eventVersion);
- if (isLocalMemberLeader.get()) {
- log.debug("Broadcasting {} to the entire cluster", event);
- clusterCommunicator.broadcastIncludeSelf(new ClusterMessage(
- localNode.id(), DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
- ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
- }
- break;
- case ROW_ADDED:
- case ROW_UPDATED:
- // To account for potential reordering of notifications,
- // check to make sure we are replacing an old version with a new version
- Long currentVersion = map.get(row);
- if (currentVersion == null || currentVersion < eventVersion) {
- map.put(row, eventVersion);
- }
- break;
- default:
- break;
- }
- }
-
- @Override
- public void tableCreated(TableMetadata metadata) {
- log.debug("Received a table created event {}", metadata);
- if (metadata.expireOldEntries()) {
- tableEntryExpirationMap.put(metadata.tableName(), ExpiringMap.builder()
- .expiration(metadata.ttlMillis(), TimeUnit.MILLISECONDS)
- .expirationListener(expirationObserver)
- .expirationPolicy(ExpirationPolicy.CREATED).build());
- }
- }
-
- @Override
- public void tableDeleted(String tableName) {
- log.debug("Received a table deleted event for table ({})", tableName);
- tableEntryExpirationMap.remove(tableName);
- }
-
- private class ExpirationObserver implements
- ExpirationListener<DatabaseRow, Long> {
- @Override
- public void expired(DatabaseRow row, Long version) {
- THREAD_POOL.submit(new ExpirationTask(row, version));
- }
- }
-
- private class ExpirationTask implements Runnable {
-
- private final DatabaseRow row;
- private final Long version;
-
- public ExpirationTask(DatabaseRow row, Long version) {
- this.row = row;
- this.version = version;
- }
-
- @Override
- public void run() {
- log.trace("Received an expiration event for {}, version: {}", row, version);
- Map<DatabaseRow, Long> map = tableEntryExpirationMap.get(row.tableName);
- try {
- if (isLocalMemberLeader.get()) {
- if (!databaseService.removeIfVersionMatches(row.tableName,
- row.key, version)) {
- log.info("Entry in database was updated right before its expiration.");
- } else {
- log.debug("Successfully expired old entry with key ({}) from table ({})",
- row.key, row.tableName);
- }
- } else {
- // Only the current leader will expire keys from database.
- // Everyone else function as standby just in case they need to take over
- if (map != null) {
- map.putIfAbsent(row, version);
- }
- }
-
- } catch (Exception e) {
- log.warn("Failed to delete entry from the database after ttl "
- + "expiration. Operation will be retried.", e);
- map.putIfAbsent(row, version);
- }
- }
- }
-
- @Override
- public void handle(LeaderElectEvent event) {
- isLocalMemberLeader.set(localMember.equals(event.leader()));
- if (isLocalMemberLeader.get()) {
- log.info("{} is now the leader of Raft cluster", localNode.id());
- }
- }
-
- /**
- * Wrapper class for a database row identifier.
- */
- private class DatabaseRow {
-
- String tableName;
- String key;
-
- public DatabaseRow(String tableName, String key) {
- this.tableName = tableName;
- this.key = key;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("tableName", tableName)
- .add("key", key)
- .toString();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (!(obj instanceof DatabaseRow)) {
- return false;
- }
- DatabaseRow that = (DatabaseRow) obj;
-
- return Objects.equals(this.tableName, that.tableName)
- && Objects.equals(this.key, that.key);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(tableName, key);
- }
- }
-
- @Override
- public void snapshotInstalled(State state) {
- if (!tableEntryExpirationMap.isEmpty()) {
- return;
- }
- log.debug("Received a snapshot installed notification");
- for (String tableName : state.getTableNames()) {
-
- TableMetadata metadata = state.getTableMetadata(tableName);
- if (!metadata.expireOldEntries()) {
- continue;
- }
-
- Map<DatabaseRow, Long> tableExpirationMap = ExpiringMap.builder()
- .expiration(metadata.ttlMillis(), TimeUnit.MILLISECONDS)
- .expirationListener(expirationObserver)
- .expirationPolicy(ExpirationPolicy.CREATED).build();
- for (Map.Entry<String, VersionedValue> entry : state.getTable(tableName).entrySet()) {
- tableExpirationMap.put(new DatabaseRow(tableName, entry.getKey()), entry.getValue().version());
- }
-
- tableEntryExpirationMap.put(tableName, tableExpirationMap);
- }
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseManager.java
deleted file mode 100644
index e5a742e..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseManager.java
+++ /dev/null
@@ -1,602 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import com.google.common.collect.ImmutableList;
-import net.kuujo.copycat.Copycat;
-import net.kuujo.copycat.CopycatConfig;
-import net.kuujo.copycat.cluster.ClusterConfig;
-import net.kuujo.copycat.cluster.Member;
-import net.kuujo.copycat.cluster.TcpCluster;
-import net.kuujo.copycat.cluster.TcpClusterConfig;
-import net.kuujo.copycat.cluster.TcpMember;
-import net.kuujo.copycat.event.EventHandler;
-import net.kuujo.copycat.event.LeaderElectEvent;
-import net.kuujo.copycat.log.Log;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.packet.IpAddress;
-import org.onosproject.cluster.ClusterEvent;
-import org.onosproject.cluster.ClusterEventListener;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.DefaultControllerNode;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.service.BatchReadRequest;
-import org.onosproject.store.service.BatchReadResult;
-import org.onosproject.store.service.BatchWriteRequest;
-import org.onosproject.store.service.BatchWriteResult;
-import org.onosproject.store.service.DatabaseAdminService;
-import org.onosproject.store.service.DatabaseException;
-import org.onosproject.store.service.DatabaseService;
-import org.onosproject.store.service.ReadResult;
-import org.onosproject.store.service.ReadStatus;
-import org.onosproject.store.service.VersionedValue;
-import org.onosproject.store.service.WriteResult;
-import org.onosproject.store.service.WriteStatus;
-import org.slf4j.Logger;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
-import static org.onlab.util.Tools.namedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Strongly consistent and durable state management service based on
- * Copycat implementation of Raft consensus protocol.
- */
-@Component(immediate = false)
-@Service
-public class DatabaseManager implements DatabaseService, DatabaseAdminService {
-
- private static final int RETRY_MS = 500;
-
- private static final int ACTIVATE_MAX_RETRIES = 100;
-
- private final Logger log = getLogger(getClass());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterCommunicationService clusterCommunicator;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected DatabaseProtocolService copycatMessagingProtocol;
-
- public static final String LOG_FILE_PREFIX = "raft/onos-copy-cat-log_";
-
- // Current working dir seems to be /opt/onos/apache-karaf-3.0.2
- // TODO: Set the path to /opt/onos/config
- private static final String CONFIG_DIR = "../config";
-
- private static final String DEFAULT_MEMBER_FILE = "tablets.json";
-
- private static final String DEFAULT_TABLET = "default";
-
- // TODO: make this configurable
- // initial member configuration file path
- private String initialMemberConfig = DEFAULT_MEMBER_FILE;
-
- public static final MessageSubject RAFT_LEADER_ELECTION_EVENT =
- new MessageSubject("raft-leader-election-event");
-
- private Copycat copycat;
- private DatabaseClient client;
-
- // guarded by synchronized block
- private ClusterConfig<TcpMember> clusterConfig;
-
- private CountDownLatch clusterEventLatch;
- private ClusterEventListener clusterEventListener;
-
- private Map<String, Set<DefaultControllerNode>> tabletMembers;
-
- private boolean autoAddMember = false;
-
- private ScheduledExecutorService executor;
-
- private volatile LeaderElectEvent myLeaderEvent = null;
-
- // TODO make this configurable
- private int maxLogSizeBytes = 128 * (1024 * 1024);
-
- // TODO make this configurable
- private long electionTimeoutMs = 5000; // CopyCat default: 2000
-
- @Activate
- public void activate() throws InterruptedException, ExecutionException {
-
- // KARAF_DATA
- // http://karaf.apache.org/manual/latest/users-guide/start-stop.html
- final String dataDir = System.getProperty("karaf.data", "./data");
-
- // load tablet configuration
- File file = new File(CONFIG_DIR, initialMemberConfig);
- log.info("Loading config: {}", file.getAbsolutePath());
- TabletDefinitionStore tabletDef = new TabletDefinitionStore(file);
- try {
- tabletMembers = tabletDef.read();
- } catch (IOException e) {
- log.error("Failed to load tablet config {}", file);
- throw new IllegalStateException("Failed to load tablet config", e);
- }
-
- // load default tablet configuration and start copycat
- clusterConfig = new TcpClusterConfig();
- Set<DefaultControllerNode> defaultMembers = tabletMembers.get(DEFAULT_TABLET);
- if (defaultMembers == null || defaultMembers.isEmpty()) {
- log.error("No members found in [{}] tablet configuration.",
- DEFAULT_TABLET);
- throw new IllegalStateException("No member found in tablet configuration");
-
- }
-
- final ControllerNode localNode = clusterService.getLocalNode();
- for (ControllerNode member : defaultMembers) {
- final TcpMember tcpMember = new TcpMember(member.ip().toString(),
- member.tcpPort());
- if (localNode.equals(member)) {
- clusterConfig.setLocalMember(tcpMember);
- } else {
- clusterConfig.addRemoteMember(tcpMember);
- }
- }
-
- if (clusterConfig.getLocalMember() != null) {
-
- // Wait for a minimum viable Raft cluster to boot up.
- waitForClusterQuorum();
-
- final TcpCluster cluster;
- synchronized (clusterConfig) {
- // Create the cluster.
- cluster = new TcpCluster(clusterConfig);
- }
- log.info("Starting cluster: {}", cluster);
-
- DatabaseEntryExpirationTracker expirationTracker =
- new DatabaseEntryExpirationTracker(
- clusterConfig.getLocalMember(),
- clusterService.getLocalNode(),
- clusterCommunicator,
- this);
-
- DatabaseStateMachine stateMachine = new DatabaseStateMachine();
- stateMachine.addEventListener(expirationTracker);
- Log consensusLog = new MapDBLog(dataDir + "/" + LOG_FILE_PREFIX + localNode.id(),
- ClusterMessagingProtocol.DB_SERIALIZER);
-
- CopycatConfig ccConfig = new CopycatConfig();
- ccConfig.setMaxLogSize(maxLogSizeBytes);
- ccConfig.setElectionTimeout(electionTimeoutMs);
-
- copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol, ccConfig);
- copycat.event(LeaderElectEvent.class).registerHandler(new RaftLeaderElectionMonitor());
- copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
- }
-
- client = new DatabaseClient(copycatMessagingProtocol);
- clusterCommunicator.addSubscriber(RAFT_LEADER_ELECTION_EVENT, client);
-
- // Starts copycat if this node is a participant
- // of the Raft cluster.
- if (copycat != null) {
- copycat.start().get();
-
- executor =
- newSingleThreadScheduledExecutor(namedThreads("onos-db-heartbeat-%d"));
- executor.scheduleWithFixedDelay(new LeaderAdvertiser(), 5, 2, TimeUnit.SECONDS);
-
- }
-
- client.waitForLeader();
-
- // Try and list the tables to verify database manager is
- // in a state where it can serve requests.
- tryTableListing();
-
- log.info("Started.");
- }
-
- @Deactivate
- public void deactivate() {
- if (executor != null) {
- executor.shutdownNow();
- }
- clusterService.removeListener(clusterEventListener);
- // TODO: ClusterCommunicationService must support more than one
- // handler per message subject.
- clusterCommunicator.removeSubscriber(RAFT_LEADER_ELECTION_EVENT);
- if (copycat != null) {
- copycat.stop();
- }
- log.info("Stopped.");
- }
-
- private void waitForClusterQuorum() {
- // note: from this point beyond, clusterConfig requires synchronization
- clusterEventLatch = new CountDownLatch(1);
- clusterEventListener = new InternalClusterEventListener();
- clusterService.addListener(clusterEventListener);
-
- final int raftClusterSize = clusterConfig.getMembers().size();
- final int raftClusterQuorumSize = (int) (Math.floor(raftClusterSize / 2)) + 1;
- if (clusterService.getNodes().size() < raftClusterQuorumSize) {
- // current cluster size smaller then expected
- try {
- final int waitTimeSec = 120;
- log.info("Waiting for a maximum of {}s for raft cluster quorum to boot up...", waitTimeSec);
- if (!clusterEventLatch.await(waitTimeSec, TimeUnit.SECONDS)) {
- log.info("Starting with {}/{} nodes cluster",
- clusterService.getNodes().size(),
- raftClusterSize);
- }
- } catch (InterruptedException e) {
- log.info("Interrupted waiting for raft quorum.", e);
- }
- }
- }
-
- private void tryTableListing() throws InterruptedException {
- int retries = 0;
- do {
- try {
- listTables();
- return;
- } catch (DatabaseException.Timeout e) {
- log.debug("Failed to listTables. Will retry...", e);
- } catch (DatabaseException e) {
- log.debug("Failed to listTables. Will retry later...", e);
- Thread.sleep(RETRY_MS);
- }
- if (retries == ACTIVATE_MAX_RETRIES) {
- log.error("Failed to listTables after multiple attempts. Giving up.");
- // Exiting hoping things will be fixed by the time
- // others start using the service
- return;
- }
- retries++;
- } while (true);
- }
-
- @Override
- public boolean createTable(String name) {
- return client.createTable(name);
- }
-
- @Override
- public boolean createTable(String name, int ttlMillis) {
- return client.createTable(name, ttlMillis);
- }
-
- @Override
- public void dropTable(String name) {
- client.dropTable(name);
- }
-
- @Override
- public void dropAllTables() {
- client.dropAllTables();
- }
-
- @Override
- public Set<String> listTables() {
- return client.listTables();
- }
-
- @Override
- public VersionedValue get(String tableName, String key) {
- BatchReadRequest batchRequest = new BatchReadRequest.Builder().get(tableName, key).build();
- ReadResult readResult = batchRead(batchRequest).getAsList().get(0);
- if (readResult.status().equals(ReadStatus.OK)) {
- return readResult.value();
- }
- throw new DatabaseException("get failed due to status: " + readResult.status());
- }
-
- @Override
- public Map<String, VersionedValue> getAll(String tableName) {
- return client.getAll(tableName);
- }
-
-
- @Override
- public BatchReadResult batchRead(BatchReadRequest batchRequest) {
- return new BatchReadResult(client.batchRead(batchRequest));
- }
-
- @Override
- public BatchWriteResult batchWrite(BatchWriteRequest batchRequest) {
- return new BatchWriteResult(client.batchWrite(batchRequest));
- }
-
- @Override
- public VersionedValue put(String tableName, String key, byte[] value) {
- BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().put(tableName, key, value).build();
- WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
- if (writeResult.status().equals(WriteStatus.OK)) {
- return writeResult.previousValue();
- }
- throw new DatabaseException("put failed due to status: " + writeResult.status());
- }
-
- @Override
- public boolean putIfAbsent(String tableName, String key, byte[] value) {
- BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
- .putIfAbsent(tableName, key, value).build();
- WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
- if (writeResult.status().equals(WriteStatus.OK)) {
- return true;
- } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
- return false;
- }
- throw new DatabaseException("putIfAbsent failed due to status: "
- + writeResult.status());
- }
-
- @Override
- public boolean putIfVersionMatches(String tableName, String key,
- byte[] value, long version) {
- BatchWriteRequest batchRequest =
- new BatchWriteRequest.Builder()
- .putIfVersionMatches(tableName, key, value, version).build();
- WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
- if (writeResult.status().equals(WriteStatus.OK)) {
- return true;
- } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
- return false;
- }
- throw new DatabaseException("putIfVersionMatches failed due to status: "
- + writeResult.status());
- }
-
- @Override
- public boolean putIfValueMatches(String tableName, String key,
- byte[] oldValue, byte[] newValue) {
- BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
- .putIfValueMatches(tableName, key, oldValue, newValue).build();
- WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
- if (writeResult.status().equals(WriteStatus.OK)) {
- return true;
- } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
- return false;
- }
- throw new DatabaseException("putIfValueMatches failed due to status: "
- + writeResult.status());
- }
-
- @Override
- public VersionedValue remove(String tableName, String key) {
- BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
- .remove(tableName, key).build();
- WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
- if (writeResult.status().equals(WriteStatus.OK)) {
- return writeResult.previousValue();
- }
- throw new DatabaseException("remove failed due to status: "
- + writeResult.status());
- }
-
- @Override
- public boolean removeIfVersionMatches(String tableName, String key,
- long version) {
- BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
- .removeIfVersionMatches(tableName, key, version).build();
- WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
- if (writeResult.status().equals(WriteStatus.OK)) {
- return true;
- } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
- return false;
- }
- throw new DatabaseException("removeIfVersionMatches failed due to status: "
- + writeResult.status());
- }
-
- @Override
- public boolean removeIfValueMatches(String tableName, String key,
- byte[] value) {
- BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
- .removeIfValueMatches(tableName, key, value).build();
- WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
- if (writeResult.status().equals(WriteStatus.OK)) {
- return true;
- } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
- return false;
- }
- throw new DatabaseException("removeIfValueMatches failed due to status: "
- + writeResult.status());
- }
-
- @Override
- public void addMember(final ControllerNode node) {
- final TcpMember tcpMember = new TcpMember(node.ip().toString(),
- node.tcpPort());
- log.info("{} was added to the cluster", tcpMember);
- synchronized (clusterConfig) {
- clusterConfig.addRemoteMember(tcpMember);
- }
- }
-
- @Override
- public Optional<ControllerNode> leader() {
- if (copycat != null) {
- if (copycat.isLeader()) {
- return Optional.of(clusterService.getLocalNode());
- }
- Member leader = copycat.cluster().remoteMember(copycat.leader());
- return Optional.ofNullable(getNodeIdFromMember(leader));
- }
- return Optional.ofNullable(getNodeIdFromMember(client.getCurrentLeader()));
- }
-
- private final class LeaderAdvertiser implements Runnable {
-
- @Override
- public void run() {
- try {
- LeaderElectEvent event = myLeaderEvent;
- if (event != null) {
- log.trace("Broadcasting RAFT_LEADER_ELECTION_EVENT: {}", event);
- // This node just became the leader.
- clusterCommunicator.broadcastIncludeSelf(
- new ClusterMessage(
- clusterService.getLocalNode().id(),
- RAFT_LEADER_ELECTION_EVENT,
- ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
- }
- } catch (Exception e) {
- log.debug("LeaderAdvertiser failed with exception", e);
- }
- }
-
- }
-
- private final class RaftLeaderElectionMonitor implements EventHandler<LeaderElectEvent> {
- @Override
- public void handle(LeaderElectEvent event) {
- log.debug("Received LeaderElectEvent: {}", event);
- if (clusterConfig.getLocalMember() != null && event.leader().equals(clusterConfig.getLocalMember())) {
- log.debug("Broadcasting RAFT_LEADER_ELECTION_EVENT");
- myLeaderEvent = event;
- // This node just became the leader.
- clusterCommunicator.broadcastIncludeSelf(
- new ClusterMessage(
- clusterService.getLocalNode().id(),
- RAFT_LEADER_ELECTION_EVENT,
- ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
- } else {
- if (myLeaderEvent != null) {
- log.debug("This node is no longer the Leader");
- }
- myLeaderEvent = null;
- }
- }
- }
-
- private final class InternalClusterEventListener
- implements ClusterEventListener {
-
- @Override
- public void event(ClusterEvent event) {
- // TODO: Not every node should be part of the consensus ring.
-
- final ControllerNode node = event.subject();
- final TcpMember tcpMember = new TcpMember(node.ip().toString(),
- node.tcpPort());
-
- switch (event.type()) {
- case INSTANCE_ACTIVATED:
- case INSTANCE_ADDED:
- if (autoAddMember) {
- synchronized (clusterConfig) {
- if (!clusterConfig.getMembers().contains(tcpMember)) {
- log.info("{} was automatically added to the cluster", tcpMember);
- clusterConfig.addRemoteMember(tcpMember);
- }
- }
- }
- break;
- case INSTANCE_DEACTIVATED:
- case INSTANCE_REMOVED:
- if (autoAddMember) {
- Set<DefaultControllerNode> members
- = tabletMembers.getOrDefault(DEFAULT_TABLET,
- Collections.emptySet());
- // remove only if not the initial members
- if (!members.contains(node)) {
- synchronized (clusterConfig) {
- if (clusterConfig.getMembers().contains(tcpMember)) {
- log.info("{} was automatically removed from the cluster", tcpMember);
- clusterConfig.removeRemoteMember(tcpMember);
- }
- }
- }
- }
- break;
- default:
- break;
- }
- if (copycat != null) {
- log.debug("Current cluster: {}", copycat.cluster());
- }
- clusterEventLatch.countDown();
- }
-
- }
-
- @Override
- public void removeMember(final ControllerNode node) {
- final TcpMember tcpMember = new TcpMember(node.ip().toString(),
- node.tcpPort());
- log.info("{} was removed from the cluster", tcpMember);
- synchronized (clusterConfig) {
- clusterConfig.removeRemoteMember(tcpMember);
- }
- }
-
- @Override
- public Collection<ControllerNode> listMembers() {
- if (copycat == null) {
- return ImmutableList.of();
- }
- Set<ControllerNode> members = new HashSet<>();
- for (Member member : copycat.cluster().members()) {
- ControllerNode node = getNodeIdFromMember(member);
- if (node == null) {
- log.info("No Node found for {}", member);
- continue;
- }
- members.add(node);
- }
- return members;
- }
-
- private ControllerNode getNodeIdFromMember(Member member) {
- if (member instanceof TcpMember) {
- final TcpMember tcpMember = (TcpMember) member;
- // TODO assuming tcpMember#host to be IP address,
- // but if not lookup DNS, etc. first
- IpAddress ip = IpAddress.valueOf(tcpMember.host());
- int tcpPort = tcpMember.port();
- for (ControllerNode node : clusterService.getNodes()) {
- if (node.ip().equals(ip) &&
- node.tcpPort() == tcpPort) {
- return node;
- }
- }
- }
- return null;
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseProtocolService.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseProtocolService.java
deleted file mode 100644
index 7d9aa0a..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseProtocolService.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import net.kuujo.copycat.cluster.TcpMember;
-import net.kuujo.copycat.spi.protocol.Protocol;
-
-// interface required for connecting DatabaseManager + ClusterMessagingProtocol
-// TODO: Consider changing ClusterMessagingProtocol to non-Service class
-public interface DatabaseProtocolService extends Protocol<TcpMember> {
-
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseStateMachine.java
deleted file mode 100644
index 150522c..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseStateMachine.java
+++ /dev/null
@@ -1,454 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import static org.onlab.util.Tools.namedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-import static org.onosproject.store.service.impl.ClusterMessagingProtocol.DB_SERIALIZER;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.zip.DeflaterOutputStream;
-import java.util.zip.InflaterInputStream;
-
-import net.kuujo.copycat.Command;
-import net.kuujo.copycat.Query;
-import net.kuujo.copycat.StateMachine;
-
-import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.service.BatchReadRequest;
-import org.onosproject.store.service.BatchWriteRequest;
-import org.onosproject.store.service.ReadRequest;
-import org.onosproject.store.service.ReadResult;
-import org.onosproject.store.service.ReadStatus;
-import org.onosproject.store.service.VersionedValue;
-import org.onosproject.store.service.WriteRequest;
-import org.onosproject.store.service.WriteResult;
-import org.onosproject.store.service.WriteStatus;
-import org.slf4j.Logger;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * StateMachine whose transitions are coordinated/replicated
- * by Raft consensus.
- * Each Raft cluster member has a instance of this state machine that is
- * independently updated in lock step once there is consensus
- * on the next transition.
- */
-public class DatabaseStateMachine implements StateMachine {
-
- private final Logger log = getLogger(getClass());
-
- private final ExecutorService updatesExecutor =
- Executors.newSingleThreadExecutor(namedThreads("onos-db-statemachine-updates"));
-
- // message subject for database update notifications.
- public static final MessageSubject DATABASE_UPDATE_EVENTS =
- new MessageSubject("database-update-events");
-
- private final Set<DatabaseUpdateEventListener> listeners = Sets.newIdentityHashSet();
-
- // durable internal state of the database.
- private State state = new State();
-
- // TODO make this configurable
- private boolean compressSnapshot = true;
-
- @Command
- public boolean createTable(String tableName) {
- TableMetadata metadata = new TableMetadata(tableName);
- return createTable(metadata);
- }
-
- @Command
- public boolean createTable(String tableName, Integer ttlMillis) {
- TableMetadata metadata = new TableMetadata(tableName, ttlMillis);
- return createTable(metadata);
- }
-
- private boolean createTable(TableMetadata metadata) {
- Map<String, VersionedValue> existingTable = state.getTable(metadata.tableName());
- if (existingTable != null) {
- return false;
- }
- state.createTable(metadata);
-
- updatesExecutor.submit(new Runnable() {
- @Override
- public void run() {
- for (DatabaseUpdateEventListener listener : listeners) {
- listener.tableCreated(metadata);
- }
- }
- });
-
- return true;
- }
-
- @Command
- public boolean dropTable(String tableName) {
- if (state.removeTable(tableName)) {
-
- updatesExecutor.submit(new Runnable() {
- @Override
- public void run() {
- for (DatabaseUpdateEventListener listener : listeners) {
- listener.tableDeleted(tableName);
- }
- }
- });
-
- return true;
- }
- return false;
- }
-
- @Command
- public boolean dropAllTables() {
- Set<String> tableNames = state.getTableNames();
- state.removeAllTables();
-
- updatesExecutor.submit(new Runnable() {
- @Override
- public void run() {
- for (DatabaseUpdateEventListener listener : listeners) {
- for (String tableName : tableNames) {
- listener.tableDeleted(tableName);
- }
- }
- }
- });
-
- return true;
- }
-
- @Query
- public Set<String> listTables() {
- return ImmutableSet.copyOf(state.getTableNames());
- }
-
- @Query
- public List<ReadResult> read(BatchReadRequest batchRequest) {
- List<ReadResult> results = new ArrayList<>(batchRequest.batchSize());
- for (ReadRequest request : batchRequest.getAsList()) {
- Map<String, VersionedValue> table = state.getTable(request.tableName());
- if (table == null) {
- results.add(new ReadResult(ReadStatus.NO_SUCH_TABLE, request.tableName(), request.key(), null));
- continue;
- }
- VersionedValue value = VersionedValue.copy(table.get(request.key()));
- results.add(new ReadResult(ReadStatus.OK, request.tableName(), request.key(), value));
- }
- return results;
- }
-
- @Query
- public Map<String, VersionedValue> getAll(String tableName) {
- return ImmutableMap.copyOf(state.getTable(tableName));
- }
-
-
- WriteStatus checkIfApplicable(WriteRequest request, VersionedValue value) {
-
- switch (request.type()) {
- case PUT:
- return WriteStatus.OK;
-
- case PUT_IF_ABSENT:
- if (value == null) {
- return WriteStatus.OK;
- }
- return WriteStatus.PRECONDITION_VIOLATION;
-
- case PUT_IF_VALUE:
- case REMOVE_IF_VALUE:
- if (value != null && Arrays.equals(value.value(), request.oldValue())) {
- return WriteStatus.OK;
- }
- return WriteStatus.PRECONDITION_VIOLATION;
-
- case PUT_IF_VERSION:
- case REMOVE_IF_VERSION:
- if (value != null && request.previousVersion() == value.version()) {
- return WriteStatus.OK;
- }
- return WriteStatus.PRECONDITION_VIOLATION;
-
- case REMOVE:
- return WriteStatus.OK;
-
- default:
- break;
- }
- log.error("Should never reach here {}", request);
- return WriteStatus.ABORTED;
- }
-
- @Command
- public List<WriteResult> write(BatchWriteRequest batchRequest) {
-
- // applicability check
- boolean abort = false;
- List<WriteResult> results = new ArrayList<>(batchRequest.batchSize());
-
- for (WriteRequest request : batchRequest.getAsList()) {
- Map<String, VersionedValue> table = state.getTable(request.tableName());
- if (table == null) {
- results.add(new WriteResult(WriteStatus.NO_SUCH_TABLE, null));
- abort = true;
- continue;
- }
- final VersionedValue value = table.get(request.key());
- WriteStatus result = checkIfApplicable(request, value);
- results.add(new WriteResult(result, value));
- if (result != WriteStatus.OK) {
- abort = true;
- }
- }
-
- if (abort) {
- for (int i = 0; i < results.size(); ++i) {
- if (results.get(i).status() == WriteStatus.OK) {
- results.set(i, new WriteResult(WriteStatus.ABORTED, null));
- }
- }
- return results;
- }
-
- List<TableModificationEvent> tableModificationEvents = Lists.newLinkedList();
-
- // apply changes
- for (WriteRequest request : batchRequest.getAsList()) {
- Map<String, VersionedValue> table = state.getTable(request.tableName());
-
- TableModificationEvent tableModificationEvent = null;
- // FIXME: If this method could be called by multiple thread,
- // synchronization scope is wrong.
- // Whole function including applicability check needs to be protected.
- // Confirm copycat's thread safety requirement for StateMachine
- // TODO: If we need isolation, we need to block reads also
- synchronized (table) {
- switch (request.type()) {
- case PUT:
- case PUT_IF_ABSENT:
- case PUT_IF_VALUE:
- case PUT_IF_VERSION:
- VersionedValue newValue = new VersionedValue(request.newValue(), state.nextVersion());
- VersionedValue previousValue = table.put(request.key(), newValue);
- WriteResult putResult = new WriteResult(WriteStatus.OK, previousValue);
- results.add(putResult);
- tableModificationEvent = (previousValue == null) ?
- TableModificationEvent.rowAdded(request.tableName(), request.key(), newValue) :
- TableModificationEvent.rowUpdated(request.tableName(), request.key(), newValue);
- break;
-
- case REMOVE:
- case REMOVE_IF_VALUE:
- case REMOVE_IF_VERSION:
- VersionedValue removedValue = table.remove(request.key());
- WriteResult removeResult = new WriteResult(WriteStatus.OK, removedValue);
- results.add(removeResult);
- if (removedValue != null) {
- tableModificationEvent =
- TableModificationEvent.rowDeleted(request.tableName(), request.key(), removedValue);
- }
- break;
-
- default:
- log.error("Invalid WriteRequest type {}", request.type());
- break;
- }
- }
-
- if (tableModificationEvent != null) {
- tableModificationEvents.add(tableModificationEvent);
- }
- }
-
- // notify listeners of table mod events.
-
- updatesExecutor.submit(new Runnable() {
- @Override
- public void run() {
- for (DatabaseUpdateEventListener listener : listeners) {
- for (TableModificationEvent tableModificationEvent : tableModificationEvents) {
- log.trace("Publishing table modification event: {}", tableModificationEvent);
- listener.tableModified(tableModificationEvent);
- }
- }
- }
- });
-
- return results;
- }
-
- public static class State {
-
- private final Map<String, TableMetadata> tableMetadata = Maps.newHashMap();
- private final Map<String, Map<String, VersionedValue>> tableData = Maps.newHashMap();
- private long versionCounter = 1;
-
- Map<String, VersionedValue> getTable(String tableName) {
- return tableData.get(tableName);
- }
-
- void createTable(TableMetadata metadata) {
- tableMetadata.put(metadata.tableName, metadata);
- tableData.put(metadata.tableName, Maps.newHashMap());
- }
-
- TableMetadata getTableMetadata(String tableName) {
- return tableMetadata.get(tableName);
- }
-
- long nextVersion() {
- return versionCounter++;
- }
-
- Set<String> getTableNames() {
- return ImmutableSet.copyOf(tableMetadata.keySet());
- }
-
-
- boolean removeTable(String tableName) {
- if (!tableMetadata.containsKey(tableName)) {
- return false;
- }
- tableMetadata.remove(tableName);
- tableData.remove(tableName);
- return true;
- }
-
- void removeAllTables() {
- tableMetadata.clear();
- tableData.clear();
- }
- }
-
- public static class TableMetadata {
- private final String tableName;
- private final boolean expireOldEntries;
- private final int ttlMillis;
-
- public TableMetadata(String tableName) {
- this.tableName = tableName;
- this.expireOldEntries = false;
- this.ttlMillis = Integer.MAX_VALUE;
-
- }
-
- public TableMetadata(String tableName, int ttlMillis) {
- this.tableName = tableName;
- this.expireOldEntries = true;
- this.ttlMillis = ttlMillis;
- }
-
- public String tableName() {
- return tableName;
- }
-
- public boolean expireOldEntries() {
- return expireOldEntries;
- }
-
- public int ttlMillis() {
- return ttlMillis;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("tableName", tableName)
- .add("expireOldEntries", expireOldEntries)
- .add("ttlMillis", ttlMillis)
- .toString();
- }
- }
-
- @Override
- public byte[] takeSnapshot() {
- try {
- if (compressSnapshot) {
- byte[] input = DB_SERIALIZER.encode(state);
- ByteArrayOutputStream comp = new ByteArrayOutputStream(input.length);
- DeflaterOutputStream compressor = new DeflaterOutputStream(comp);
- compressor.write(input, 0, input.length);
- compressor.close();
- return comp.toByteArray();
- } else {
- return DB_SERIALIZER.encode(state);
- }
- } catch (Exception e) {
- log.error("Failed to take snapshot", e);
- throw new SnapshotException(e);
- }
- }
-
- @Override
- public void installSnapshot(byte[] data) {
- try {
- if (compressSnapshot) {
- ByteArrayInputStream in = new ByteArrayInputStream(data);
- InflaterInputStream decompressor = new InflaterInputStream(in);
- this.state = DB_SERIALIZER.decode(decompressor);
- } else {
- this.state = DB_SERIALIZER.decode(data);
- }
-
- updatesExecutor.submit(new Runnable() {
- @Override
- public void run() {
- for (DatabaseUpdateEventListener listener : listeners) {
- listener.snapshotInstalled(state);
- }
- }
- });
-
- } catch (Exception e) {
- log.error("Failed to install from snapshot", e);
- throw new SnapshotException(e);
- }
- }
-
- /**
- * Adds specified DatabaseUpdateEventListener.
- * @param listener listener to add
- */
- public void addEventListener(DatabaseUpdateEventListener listener) {
- listeners.add(listener);
- }
-
- /**
- * Removes specified DatabaseUpdateEventListener.
- * @param listener listener to remove
- */
- public void removeEventListener(DatabaseUpdateEventListener listener) {
- listeners.remove(listener);
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseUpdateEventListener.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseUpdateEventListener.java
deleted file mode 100644
index 5dd8651..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DatabaseUpdateEventListener.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.service.impl;
-
-import org.onosproject.store.service.impl.DatabaseStateMachine.TableMetadata;
-
-/**
- * Interface of database update event listeners.
- */
-public interface DatabaseUpdateEventListener {
-
- /**
- * Notifies listeners of a table modified event.
- * @param event table modification event.
- */
- public void tableModified(TableModificationEvent event);
-
- /**
- * Notifies listeners of a table created event.
- * @param metadata metadata for the created table.
- */
- public void tableCreated(TableMetadata metadata);
-
- /**
- * Notifies listeners of a table deleted event.
- * @param tableName name of the table deleted
- */
- public void tableDeleted(String tableName);
-
- /**
- * Notifies listeners of a snapshot installation event.
- * @param snapshotState installed snapshot state.
- */
- public void snapshotInstalled(DatabaseStateMachine.State snapshotState);
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DistributedLock.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/DistributedLock.java
deleted file mode 100644
index 5717d74..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DistributedLock.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import static com.google.common.base.Verify.verify;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.joda.time.DateTime;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.store.service.DatabaseException;
-import org.onosproject.store.service.DatabaseService;
-import org.onosproject.store.service.Lock;
-import org.onosproject.store.service.VersionedValue;
-import org.slf4j.Logger;
-
-/**
- * A distributed lock implementation.
- */
-public class DistributedLock implements Lock {
-
- private final Logger log = getLogger(getClass());
-
- private final DistributedLockManager lockManager;
- private final DatabaseService databaseService;
- private final String path;
- private DateTime lockExpirationTime;
- private AtomicBoolean isLocked = new AtomicBoolean(false);
- private volatile long epoch = 0;
- private byte[] lockId;
-
- public DistributedLock(
- String path,
- DatabaseService databaseService,
- ClusterService clusterService,
- DistributedLockManager lockManager) {
-
- this.path = path;
- this.databaseService = databaseService;
- this.lockManager = lockManager;
- this.lockId =
- (UUID.randomUUID().toString() + "::" +
- clusterService.getLocalNode().id().toString()).
- getBytes(StandardCharsets.UTF_8);
- }
-
- @Override
- public String path() {
- return path;
- }
-
- @Override
- public void lock(int leaseDurationMillis) throws InterruptedException {
- try {
- lockAsync(leaseDurationMillis).get();
- } catch (ExecutionException e) {
- throw new DatabaseException(e);
- }
- }
-
- @Override
- public CompletableFuture<Void> lockAsync(int leaseDurationMillis) {
- try {
- if (isLocked() || tryLock(leaseDurationMillis)) {
- return CompletableFuture.<Void>completedFuture(null);
- }
- return lockManager.lockIfAvailable(this, leaseDurationMillis);
- } catch (DatabaseException e) {
- CompletableFuture<Void> lockFuture = new CompletableFuture<>();
- lockFuture.completeExceptionally(e);
- return lockFuture;
- }
- }
-
- @Override
- public boolean tryLock(int leaseDurationMillis) {
- if (databaseService.putIfAbsent(
- DistributedLockManager.ONOS_LOCK_TABLE_NAME,
- path,
- lockId)) {
- VersionedValue vv =
- databaseService.get(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path);
- verify(Arrays.equals(vv.value(), lockId));
- epoch = vv.version();
- isLocked.set(true);
- lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
- return true;
- }
- return false;
- }
-
- @Override
- public boolean tryLock(
- int waitTimeMillis,
- int leaseDurationMillis) throws InterruptedException {
- if (isLocked() || tryLock(leaseDurationMillis)) {
- return true;
- }
-
- CompletableFuture<Void> future =
- lockManager.lockIfAvailable(this, waitTimeMillis, leaseDurationMillis);
- try {
- future.get(waitTimeMillis, TimeUnit.MILLISECONDS);
- return true;
- } catch (ExecutionException e) {
- throw new DatabaseException(e);
- } catch (TimeoutException e) {
- log.debug("Timed out waiting to acquire lock for {}", path);
- return false;
- }
- }
-
- @Override
- public boolean isLocked() {
- if (isLocked.get()) {
- // We rely on local information to check
- // if the lock expired.
- // This should should make this call
- // light weight, while still retaining the
- // safety guarantees.
- if (DateTime.now().isAfter(lockExpirationTime)) {
- isLocked.set(false);
- return false;
- } else {
- return true;
- }
- }
- return false;
- }
-
- @Override
- public long epoch() {
- return epoch;
- }
-
- @Override
- public void unlock() {
- if (!isLocked()) {
- return;
- } else {
- if (databaseService.removeIfValueMatches(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path, lockId)) {
- isLocked.set(false);
- }
- }
- }
-
- @Override
- public boolean extendExpiration(int leaseDurationMillis) {
- if (!isLocked()) {
- log.warn("Ignoring request to extend expiration for lock {}."
- + " ExtendExpiration must be called for locks that are already acquired.", path);
- return false;
- }
-
- if (databaseService.putIfValueMatches(
- DistributedLockManager.ONOS_LOCK_TABLE_NAME,
- path,
- lockId,
- lockId)) {
- lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
- log.debug("Succeeded in extending lock {} expiration time to {}", lockExpirationTime);
- return true;
- } else {
- log.info("Failed to extend expiration for {}", path);
- return false;
- }
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DistributedLockManager.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/DistributedLockManager.java
deleted file mode 100644
index c131b6e..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/DistributedLockManager.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import static org.onlab.util.Tools.namedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.joda.time.DateTime;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.service.DatabaseAdminService;
-import org.onosproject.store.service.DatabaseException;
-import org.onosproject.store.service.DatabaseService;
-import org.onosproject.store.service.Lock;
-import org.onosproject.store.service.LockEventListener;
-import org.onosproject.store.service.LockService;
-import org.slf4j.Logger;
-
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Multimaps;
-
-@Component(immediate = false)
-@Service
-public class DistributedLockManager implements LockService {
-
- private static final ExecutorService THREAD_POOL =
- Executors.newCachedThreadPool(namedThreads("onos-lock-manager-%d"));
-
- private final Logger log = getLogger(getClass());
-
- public static final String ONOS_LOCK_TABLE_NAME = "onos-locks";
-
- public static final int DEAD_LOCK_TIMEOUT_MS = 5000;
-
- private final ListMultimap<String, LockRequest> locksToAcquire =
- Multimaps.synchronizedListMultimap(LinkedListMultimap.<String, LockRequest>create());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private ClusterCommunicationService clusterCommunicator;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private DatabaseAdminService databaseAdminService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private DatabaseService databaseService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private ClusterService clusterService;
-
- @Activate
- public void activate() {
- try {
- Set<String> tables = databaseAdminService.listTables();
-
- if (!tables.contains(ONOS_LOCK_TABLE_NAME)) {
- if (databaseAdminService.createTable(ONOS_LOCK_TABLE_NAME, DEAD_LOCK_TIMEOUT_MS)) {
- log.info("Created {} table.", ONOS_LOCK_TABLE_NAME);
- }
- }
- } catch (DatabaseException e) {
- log.error("DistributedLockManager#activate failed.", e);
- }
-
- clusterCommunicator.addSubscriber(
- DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
- new LockEventMessageListener());
-
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- clusterCommunicator.removeSubscriber(DatabaseStateMachine.DATABASE_UPDATE_EVENTS);
- locksToAcquire.clear();
- log.info("Stopped.");
- }
-
- @Override
- public Lock create(String path) {
- return new DistributedLock(path, databaseService, clusterService, this);
- }
-
- @Override
- public void addListener(LockEventListener listener) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void removeListener(LockEventListener listener) {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Attempts to acquire the lock as soon as it becomes available.
- * @param lock lock to acquire.
- * @param waitTimeMillis maximum time to wait before giving up.
- * @param leaseDurationMillis the duration for which to acquire the lock initially.
- * @return Future that can be blocked on until lock becomes available.
- */
- protected CompletableFuture<Void> lockIfAvailable(
- Lock lock,
- int waitTimeMillis,
- int leaseDurationMillis) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- LockRequest request = new LockRequest(
- lock,
- leaseDurationMillis,
- DateTime.now().plusMillis(waitTimeMillis),
- future);
- locksToAcquire.put(lock.path(), request);
- return future;
- }
-
- /**
- * Attempts to acquire the lock as soon as it becomes available.
- * @param lock lock to acquire.
- * @param leaseDurationMillis the duration for which to acquire the lock initially.
- * @return Future lease expiration date.
- */
- protected CompletableFuture<Void> lockIfAvailable(
- Lock lock,
- int leaseDurationMillis) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- LockRequest request = new LockRequest(
- lock,
- leaseDurationMillis,
- DateTime.now().plusYears(100),
- future);
- locksToAcquire.put(lock.path(), request);
- return future;
- }
-
- private class LockEventMessageListener implements ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
- TableModificationEvent event = ClusterMessagingProtocol.DB_SERIALIZER
- .decode(message.payload());
- if (event.tableName().equals(ONOS_LOCK_TABLE_NAME) &&
- event.type().equals(TableModificationEvent.Type.ROW_DELETED)) {
- THREAD_POOL.submit(new RetryLockTask(event.key()));
- }
- }
- }
-
- private class RetryLockTask implements Runnable {
-
- private final String path;
-
- public RetryLockTask(String path) {
- this.path = path;
- }
-
- @Override
- public void run() {
- if (!locksToAcquire.containsKey(path)) {
- return;
- }
-
- List<LockRequest> existingRequests = locksToAcquire.get(path);
- if (existingRequests == null || existingRequests.isEmpty()) {
- return;
- }
- log.info("Path {} is now available for locking. There are {} outstanding "
- + "requests for it.",
- path, existingRequests.size());
-
- synchronized (existingRequests) {
- Iterator<LockRequest> existingRequestIterator = existingRequests.iterator();
- while (existingRequestIterator.hasNext()) {
- LockRequest request = existingRequestIterator.next();
- if (DateTime.now().isAfter(request.requestExpirationTime())) {
- // request expired.
- existingRequestIterator.remove();
- } else {
- if (request.lock().tryLock(request.leaseDurationMillis())) {
- request.future().complete(null);
- existingRequestIterator.remove();
- }
- }
- }
- }
- }
- }
-
- private class LockRequest {
-
- private final Lock lock;
- private final DateTime requestExpirationTime;
- private final int leaseDurationMillis;
- private final CompletableFuture<Void> future;
-
- public LockRequest(
- Lock lock,
- int leaseDurationMillis,
- DateTime requestExpirationTime,
- CompletableFuture<Void> future) {
-
- this.lock = lock;
- this.requestExpirationTime = requestExpirationTime;
- this.leaseDurationMillis = leaseDurationMillis;
- this.future = future;
- }
-
- public Lock lock() {
- return lock;
- }
-
- public DateTime requestExpirationTime() {
- return requestExpirationTime;
- }
-
- public int leaseDurationMillis() {
- return leaseDurationMillis;
- }
-
- public CompletableFuture<Void> future() {
- return future;
- }
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/MapDBLog.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/MapDBLog.java
deleted file mode 100644
index 1859d99..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/MapDBLog.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.base.Verify.verifyNotNull;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentNavigableMap;
-
-import net.kuujo.copycat.log.Entry;
-import net.kuujo.copycat.log.Log;
-import net.kuujo.copycat.log.LogIndexOutOfBoundsException;
-
-import org.mapdb.Atomic;
-import org.mapdb.BTreeMap;
-import org.mapdb.DB;
-import org.mapdb.DBMaker;
-import org.mapdb.Serializer;
-import org.mapdb.TxBlock;
-import org.mapdb.TxMaker;
-import org.onosproject.store.serializers.StoreSerializer;
-import org.slf4j.Logger;
-
-/**
- * MapDB based log implementation.
- */
-public class MapDBLog implements Log {
-
- private final Logger log = getLogger(getClass());
-
- private final File dbFile;
- private TxMaker txMaker;
- private final StoreSerializer serializer;
- private static final String LOG_NAME = "log";
- private static final String SIZE_FIELD_NAME = "size";
-
- private int cacheSize = 256;
-
- public MapDBLog(String dbFileName, StoreSerializer serializer) {
- this.dbFile = new File(dbFileName);
- this.serializer = serializer;
- }
-
- @Override
- public void open() throws IOException {
- txMaker = DBMaker
- .newFileDB(dbFile)
- .mmapFileEnableIfSupported()
- .cacheSize(cacheSize)
- .makeTxMaker();
- log.info("Raft log file: {}", dbFile.getCanonicalPath());
- }
-
- @Override
- public void close() throws IOException {
- assertIsOpen();
- txMaker.close();
- txMaker = null;
- }
-
- @Override
- public boolean isOpen() {
- return txMaker != null;
- }
-
- protected void assertIsOpen() {
- checkState(isOpen(), "The log is not currently open.");
- }
-
- @Override
- public long appendEntry(Entry entry) {
- checkArgument(entry != null, "expecting non-null entry");
- return appendEntries(entry).get(0);
- }
-
- @Override
- public List<Long> appendEntries(Entry... entries) {
- checkArgument(entries != null, "expecting non-null entries");
- return appendEntries(Arrays.asList(entries));
- }
-
- @Override
- public synchronized List<Long> appendEntries(List<Entry> entries) {
- assertIsOpen();
- checkArgument(entries != null, "expecting non-null entries");
- final List<Long> indices = new ArrayList<>(entries.size());
-
- txMaker.execute(new TxBlock() {
- @Override
- public void tx(DB db) {
- BTreeMap<Long, byte[]> log = getLogMap(db);
- Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
- long nextIndex = log.isEmpty() ? 1 : log.lastKey() + 1;
- long addedBytes = 0;
- for (Entry entry : entries) {
- byte[] entryBytes = verifyNotNull(serializer.encode(entry),
- "Writing LogEntry %s failed", nextIndex);
- log.put(nextIndex, entryBytes);
- addedBytes += entryBytes.length;
- indices.add(nextIndex);
- nextIndex++;
- }
- size.addAndGet(addedBytes);
- }
- });
-
- return indices;
- }
-
- @Override
- public boolean containsEntry(long index) {
- assertIsOpen();
- DB db = txMaker.makeTx();
- try {
- BTreeMap<Long, byte[]> log = getLogMap(db);
- return log.containsKey(index);
- } finally {
- db.close();
- }
- }
-
- @Override
- public void delete() throws IOException {
- assertIsOpen();
- txMaker.execute(new TxBlock() {
- @Override
- public void tx(DB db) {
- BTreeMap<Long, byte[]> log = getLogMap(db);
- Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
- log.clear();
- size.set(0);
- }
- });
- }
-
- @Override
- public <T extends Entry> T firstEntry() {
- assertIsOpen();
- DB db = txMaker.makeTx();
- try {
- BTreeMap<Long, byte[]> log = getLogMap(db);
- return log.isEmpty() ? null : verifyNotNull(decodeEntry(log.firstEntry().getValue()));
- } finally {
- db.close();
- }
- }
-
- @Override
- public long firstIndex() {
- assertIsOpen();
- DB db = txMaker.makeTx();
- try {
- BTreeMap<Long, byte[]> log = getLogMap(db);
- return log.isEmpty() ? 0 : log.firstKey();
- } finally {
- db.close();
- }
- }
-
- private <T extends Entry> T decodeEntry(final byte[] bytes) {
- if (bytes == null) {
- return null;
- }
- return serializer.decode(bytes.clone());
- }
-
- @Override
- public <T extends Entry> List<T> getEntries(long from, long to) {
- assertIsOpen();
- DB db = txMaker.makeTx();
- try {
- BTreeMap<Long, byte[]> log = getLogMap(db);
- if (log.isEmpty()) {
- throw new LogIndexOutOfBoundsException("Log is empty");
- } else if (from < log.firstKey()) {
- throw new LogIndexOutOfBoundsException("From index out of bounds.");
- } else if (to > log.lastKey()) {
- throw new LogIndexOutOfBoundsException("To index out of bounds.");
- }
- List<T> entries = new ArrayList<>((int) (to - from + 1));
- for (long i = from; i <= to; i++) {
- T entry = verifyNotNull(decodeEntry(log.get(i)), "LogEntry %s was null", i);
- entries.add(entry);
- }
- return entries;
- } finally {
- db.close();
- }
- }
-
- @Override
- public <T extends Entry> T getEntry(long index) {
- assertIsOpen();
- DB db = txMaker.makeTx();
- try {
- BTreeMap<Long, byte[]> log = getLogMap(db);
- byte[] entryBytes = log.get(index);
- return entryBytes == null ? null : verifyNotNull(decodeEntry(entryBytes),
- "LogEntry %s was null", index);
- } finally {
- db.close();
- }
- }
-
- @Override
- public boolean isEmpty() {
- assertIsOpen();
- DB db = txMaker.makeTx();
- try {
- BTreeMap<Long, byte[]> log = getLogMap(db);
- return log.isEmpty();
- } finally {
- db.close();
- }
- }
-
- @Override
- public <T extends Entry> T lastEntry() {
- assertIsOpen();
- DB db = txMaker.makeTx();
- try {
- BTreeMap<Long, byte[]> log = getLogMap(db);
- return log.isEmpty() ? null : verifyNotNull(decodeEntry(log.lastEntry().getValue()));
- } finally {
- db.close();
- }
- }
-
- @Override
- public long lastIndex() {
- assertIsOpen();
- DB db = txMaker.makeTx();
- try {
- BTreeMap<Long, byte[]> log = getLogMap(db);
- return log.isEmpty() ? 0 : log.lastKey();
- } finally {
- db.close();
- }
- }
-
- @Override
- public void removeAfter(long index) {
- assertIsOpen();
- txMaker.execute(new TxBlock() {
- @Override
- public void tx(DB db) {
- BTreeMap<Long, byte[]> log = getLogMap(db);
- Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
- long removedBytes = 0;
- ConcurrentNavigableMap<Long, byte[]> tailMap = log.tailMap(index, false);
- Iterator<Map.Entry<Long, byte[]>> it = tailMap.entrySet().iterator();
- while (it.hasNext()) {
- Map.Entry<Long, byte[]> entry = it.next();
- removedBytes += entry.getValue().length;
- it.remove();
- }
- size.addAndGet(-removedBytes);
- }
- });
- }
-
- @Override
- public long size() {
- assertIsOpen();
- DB db = txMaker.makeTx();
- try {
- Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
- return size.get();
- } finally {
- db.close();
- }
- }
-
- @Override
- public void sync() throws IOException {
- assertIsOpen();
- }
-
- @Override
- public void compact(long index, Entry entry) throws IOException {
-
- assertIsOpen();
- txMaker.execute(new TxBlock() {
- @Override
- public void tx(DB db) {
- BTreeMap<Long, byte[]> log = getLogMap(db);
- Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
- ConcurrentNavigableMap<Long, byte[]> headMap = log.headMap(index);
- Iterator<Map.Entry<Long, byte[]>> it = headMap.entrySet().iterator();
-
- long deletedBytes = 0;
- while (it.hasNext()) {
- Map.Entry<Long, byte[]> e = it.next();
- deletedBytes += e.getValue().length;
- it.remove();
- }
- size.addAndGet(-deletedBytes);
- byte[] entryBytes = verifyNotNull(serializer.encode(entry));
- byte[] existingEntry = log.put(index, entryBytes);
- if (existingEntry != null) {
- size.addAndGet(entryBytes.length - existingEntry.length);
- } else {
- size.addAndGet(entryBytes.length);
- }
- db.compact();
- }
- });
- }
-
- private BTreeMap<Long, byte[]> getLogMap(DB db) {
- return db.createTreeMap(LOG_NAME)
- .valuesOutsideNodesEnable()
- .keySerializerWrap(Serializer.LONG)
- .valueSerializer(Serializer.BYTE_ARRAY)
- .makeOrGet();
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/SnapshotException.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/SnapshotException.java
deleted file mode 100644
index 6b1dc8c..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/SnapshotException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import org.onosproject.store.service.DatabaseException;
-
-/**
- * Exception that indicates a problem with the state machine snapshotting.
- */
-@SuppressWarnings("serial")
-public class SnapshotException extends DatabaseException {
- public SnapshotException(Throwable t) {
- super(t);
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/TableModificationEvent.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/TableModificationEvent.java
deleted file mode 100644
index 1802e51..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/TableModificationEvent.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import org.onosproject.store.service.VersionedValue;
-
-import com.google.common.base.MoreObjects;
-
-/**
- * A table modification event.
- */
-public final class TableModificationEvent {
-
- /**
- * Type of table modification event.
- */
- public enum Type {
- ROW_ADDED,
- ROW_DELETED,
- ROW_UPDATED
- }
-
- private final String tableName;
- private final String key;
- private final VersionedValue value;
- private final Type type;
-
- /**
- * Creates a new row deleted table modification event.
- * @param tableName table name.
- * @param key row key
- * @param value value associated with the key when it was deleted.
- * @return table modification event.
- */
- public static TableModificationEvent rowDeleted(String tableName, String key, VersionedValue value) {
- return new TableModificationEvent(tableName, key, value, Type.ROW_DELETED);
- }
-
- /**
- * Creates a new row added table modification event.
- * @param tableName table name.
- * @param key row key
- * @param value value associated with the key
- * @return table modification event.
- */
- public static TableModificationEvent rowAdded(String tableName, String key, VersionedValue value) {
- return new TableModificationEvent(tableName, key, value, Type.ROW_ADDED);
- }
-
- /**
- * Creates a new row updated table modification event.
- * @param tableName table name.
- * @param key row key
- * @param newValue value
- * @return table modification event.
- */
- public static TableModificationEvent rowUpdated(String tableName, String key, VersionedValue newValue) {
- return new TableModificationEvent(tableName, key, newValue, Type.ROW_UPDATED);
- }
-
- private TableModificationEvent(String tableName, String key, VersionedValue value, Type type) {
- this.tableName = tableName;
- this.key = key;
- this.value = value;
- this.type = type;
- }
-
- /**
- * Returns name of table this event is for.
- * @return table name
- */
- public String tableName() {
- return tableName;
- }
-
- /**
- * Returns the row key this event is for.
- * @return row key
- */
- public String key() {
- return key;
- }
-
- /**
- * Returns the value associated with the key. If the event for a deletion, this
- * method returns value that was deleted.
- * @return row value
- */
- public VersionedValue value() {
- return value;
- }
-
- /**
- * Returns the type of table modification event.
- * @return event type.
- */
- public Type type() {
- return type;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("type", type)
- .add("tableName", tableName)
- .add("key", key)
- .add("version", value.version())
- .toString();
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/TabletDefinitionStore.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/TabletDefinitionStore.java
deleted file mode 100644
index 3a2560a..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/TabletDefinitionStore.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.onosproject.cluster.DefaultControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onlab.packet.IpAddress;
-import org.slf4j.Logger;
-
-import com.fasterxml.jackson.core.JsonEncoding;
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-/**
- * Allows for reading and writing tablet definition as a JSON file.
- */
-public class TabletDefinitionStore {
-
- private final Logger log = getLogger(getClass());
-
- private final File file;
-
- /**
- * Creates a reader/writer of the tablet definition file.
- *
- * @param filePath location of the definition file
- */
- public TabletDefinitionStore(String filePath) {
- file = new File(filePath);
- }
-
- /**
- * Creates a reader/writer of the tablet definition file.
- *
- * @param filePath location of the definition file
- */
- public TabletDefinitionStore(File filePath) {
- file = checkNotNull(filePath);
- }
-
- /**
- * Returns the Map from tablet name to set of initial member nodes.
- *
- * @return Map from tablet name to set of initial member nodes
- * @throws IOException when I/O exception of some sort has occurred.
- */
- public Map<String, Set<DefaultControllerNode>> read() throws IOException {
-
- final Map<String, Set<DefaultControllerNode>> tablets = new HashMap<>();
-
- final ObjectMapper mapper = new ObjectMapper();
- final ObjectNode tabletNodes = (ObjectNode) mapper.readTree(file);
- final Iterator<Entry<String, JsonNode>> fields = tabletNodes.fields();
- while (fields.hasNext()) {
- final Entry<String, JsonNode> next = fields.next();
- final Set<DefaultControllerNode> nodes = new HashSet<>();
- final Iterator<JsonNode> elements = next.getValue().elements();
- while (elements.hasNext()) {
- ObjectNode nodeDef = (ObjectNode) elements.next();
- nodes.add(new DefaultControllerNode(new NodeId(nodeDef.get("id").asText()),
- IpAddress.valueOf(nodeDef.get("ip").asText()),
- nodeDef.get("tcpPort").asInt(9876)));
- }
-
- tablets.put(next.getKey(), nodes);
- }
- return tablets;
- }
-
- /**
- * Updates the Map from tablet name to set of member nodes.
- *
- * @param tabletName name of the tablet to update
- * @param nodes set of initial member nodes
- * @throws IOException when I/O exception of some sort has occurred.
- */
- public void write(String tabletName, Set<DefaultControllerNode> nodes) throws IOException {
- checkNotNull(tabletName);
- checkArgument(tabletName.isEmpty(), "Tablet name cannot be empty");
- // TODO should validate if tabletName is allowed in JSON
-
- // load current
- Map<String, Set<DefaultControllerNode>> config;
- try {
- config = read();
- } catch (IOException e) {
- log.info("Reading tablet config failed, assuming empty definition.");
- config = new HashMap<>();
- }
- // update with specified
- config.put(tabletName, nodes);
-
- // write back to file
- final ObjectMapper mapper = new ObjectMapper();
- final ObjectNode tabletNodes = mapper.createObjectNode();
- for (Entry<String, Set<DefaultControllerNode>> tablet : config.entrySet()) {
- ArrayNode nodeDefs = mapper.createArrayNode();
- tabletNodes.set(tablet.getKey(), nodeDefs);
-
- for (DefaultControllerNode node : tablet.getValue()) {
- ObjectNode nodeDef = mapper.createObjectNode();
- nodeDef.put("id", node.id().toString())
- .put("ip", node.ip().toString())
- .put("tcpPort", node.tcpPort());
- nodeDefs.add(nodeDef);
- }
- }
- mapper.writeTree(new JsonFactory().createGenerator(file, JsonEncoding.UTF8),
- tabletNodes);
- }
-
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/TcpClusterConfigSerializer.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/TcpClusterConfigSerializer.java
deleted file mode 100644
index 56961cf..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/TcpClusterConfigSerializer.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import java.util.Collection;
-
-import net.kuujo.copycat.cluster.TcpClusterConfig;
-import net.kuujo.copycat.cluster.TcpMember;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-public class TcpClusterConfigSerializer extends Serializer<TcpClusterConfig> {
-
- @Override
- public void write(Kryo kryo, Output output, TcpClusterConfig object) {
- kryo.writeClassAndObject(output, object.getLocalMember());
- kryo.writeClassAndObject(output, object.getRemoteMembers());
- }
-
- @Override
- public TcpClusterConfig read(Kryo kryo, Input input,
- Class<TcpClusterConfig> type) {
- TcpMember localMember = (TcpMember) kryo.readClassAndObject(input);
- @SuppressWarnings("unchecked")
- Collection<TcpMember> remoteMembers = (Collection<TcpMember>) kryo.readClassAndObject(input);
- return new TcpClusterConfig(localMember, remoteMembers);
- }
-
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/TcpMemberSerializer.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/TcpMemberSerializer.java
deleted file mode 100644
index eb4c308..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/TcpMemberSerializer.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service.impl;
-
-import net.kuujo.copycat.cluster.TcpMember;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-public class TcpMemberSerializer extends Serializer<TcpMember> {
-
- @Override
- public void write(Kryo kryo, Output output, TcpMember object) {
- output.writeString(object.host());
- output.writeInt(object.port());
- }
-
- @Override
- public TcpMember read(Kryo kryo, Input input, Class<TcpMember> type) {
- String host = input.readString();
- int port = input.readInt();
- return new TcpMember(host, port);
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/service/impl/package-info.java b/core/store/dist/src/main/java/org/onosproject/store/service/impl/package-info.java
deleted file mode 100644
index 61f80a6..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/service/impl/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Strongly consistent, fault-tolerant and durable state management
- * based on Raft consensus protocol.
- */
-package org.onosproject.store.service.impl;