WIP: Consistent map implementation.
Change-Id: I51b2d954b7a8ff2c51c425d9a8125937d4eaa6b0
Change-Id: Ib27799d4eb60fc4bfaa8d2f21a904365ff5437eb
Change-Id: I95c937600ceb8f282a482280217671c471f40b9c
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMap.java
new file mode 100644
index 0000000..4d7f33b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMap.java
@@ -0,0 +1,200 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.Map.Entry;
+
+/**
+ * A distributed, strongly consistent map.
+ * <p>
+ * This map offers strong read-after-update (where update == create/update/delete)
+ * consistency. All operations to the map are serialized and applied in a consistent
+ * manner.
+ * <p>
+ * The stronger consistency comes at the expense of availability in
+ * the event of a network partition. A network partition can be either due to
+ * a temporary disruption in network connectivity between participating nodes
+ * or due to a node being temporarily down.
+ * </p><p>
+ * All values stored in this map are versioned and the API supports optimistic
+ * concurrency by allowing conditional updates that take into consideration
+ * the version or value that was previously read.
+ * </p><p>
+ * The map also supports atomic batch updates (transactions). One can provide a list
+ * of updates to be applied atomically if and only if all the operations are guaranteed
+ * to succeed i.e. all their preconditions are met. For example, the precondition
+ * for a putIfAbsent API call is absence of a mapping for the key. Similarly, the
+ * precondition for a conditional replace operation is the presence of an expected
+ * version or value
+ * </p><p>
+ * This map does not allow null values. All methods can throw a ConsistentMapException
+ * (which extends RuntimeException) to indicate failures.
+ *
+ */
+public interface ConsistentMap<K, V> {
+
+ /**
+ * Returns the number of entries in the map.
+ *
+ * @return map size.
+ */
+ int size();
+
+ /**
+ * Returns true if the map is empty.
+ *
+ * @return true if map has no entries, false otherwise.
+ */
+ boolean isEmpty();
+
+ /**
+ * Returns true if this map contains a mapping for the specified key.
+ *
+ * @param key key
+ * @return true if map contains key, false otherwise.
+ */
+ boolean containsKey(K key);
+
+ /**
+ * Returns true if this map contains the specified value.
+ *
+ * @param value value
+ * @return true if map contains value, false otherwise.
+ */
+ boolean containsValue(V value);
+
+ /**
+ * Returns the value (and version) to which the specified key is mapped, or null if this
+ * map contains no mapping for the key.
+ *
+ * @param key the key whose associated value (and version) is to be returned
+ * @return the value (and version) to which the specified key is mapped, or null if
+ * this map contains no mapping for the key
+ */
+ Versioned<V> get(K key);
+
+ /**
+ * Associates the specified value with the specified key in this map (optional operation).
+ * If the map previously contained a mapping for the key, the old value is replaced by the
+ * specified value.
+ *
+ * @param key key with which the specified value is to be associated
+ * @param value value to be associated with the specified key
+ * @return the previous value (and version) associated with key, or null if there was
+ * no mapping for key.
+ */
+ Versioned<V> put(K key, V value);
+
+ /**
+ * Removes the mapping for a key from this map if it is present (optional operation).
+ *
+ * @param key key whose value is to be removed from the map
+ * @return the value (and version) to which this map previously associated the key,
+ * or null if the map contained no mapping for the key.
+ */
+ Versioned<V> remove(K key);
+
+ /**
+ * Removes all of the mappings from this map (optional operation).
+ * The map will be empty after this call returns.
+ */
+ void clear();
+
+ /**
+ * Returns a Set view of the keys contained in this map.
+ * This method differs from the behavior of java.util.Map.keySet() in that
+ * what is returned is a unmodifiable snapshot view of the keys in the ConsistentMap.
+ * Attempts to modify the returned set, whether direct or via its iterator,
+ * result in an UnsupportedOperationException.
+ *
+ * @return a set of the keys contained in this map
+ */
+ Set<K> keySet();
+
+ /**
+ * Returns the collection of values (and associated versions) contained in this map.
+ * This method differs from the behavior of java.util.Map.values() in that
+ * what is returned is a unmodifiable snapshot view of the values in the ConsistentMap.
+ * Attempts to modify the returned collection, whether direct or via its iterator,
+ * result in an UnsupportedOperationException.
+ *
+ * @return a collection of the values (and associated versions) contained in this map
+ */
+ Collection<Versioned<V>> values();
+
+ /**
+ * Returns the set of entries contained in this map.
+ * This method differs from the behavior of java.util.Map.entrySet() in that
+ * what is returned is a unmodifiable snapshot view of the entries in the ConsistentMap.
+ * Attempts to modify the returned set, whether direct or via its iterator,
+ * result in an UnsupportedOperationException.
+ *
+ * @return set of entries contained in this map.
+ */
+ Set<Entry<K, Versioned<V>>> entrySet();
+
+ /**
+ * If the specified key is not already associated with a value
+ * associates it with the given value and returns null, else returns the current value.
+ *
+ * @param key key with which the specified value is to be associated
+ * @param value value to be associated with the specified key
+ * @return the previous value associated with the specified key or null
+ * if key does not already mapped to a value.
+ */
+ Versioned<V> putIfAbsent(K key, V value);
+
+ /**
+ * Removes the entry for the specified key only if it is currently
+ * mapped to the specified value.
+ *
+ * @param key key with which the specified value is associated
+ * @param value value expected to be associated with the specified key
+ * @return true if the value was removed
+ */
+ boolean remove(K key, V value);
+
+ /**
+ * Removes the entry for the specified key only if its current
+ * version in the map is equal to the specified version.
+ *
+ * @param key key with which the specified version is associated
+ * @param version version expected to be associated with the specified key
+ * @return true if the value was removed
+ */
+ boolean remove(K key, long version);
+
+ /**
+ * Replaces the entry for the specified key only if currently mapped
+ * to the specified value.
+ *
+ * @param key key with which the specified value is associated
+ * @param oldValue value expected to be associated with the specified key
+ * @param newValue value to be associated with the specified key
+ * @return true if the value was replaced
+ */
+ boolean replace(K key, V oldValue, V newValue);
+
+ /**
+ * Replaces the entry for the specified key only if it is currently mapped to the
+ * specified version.
+ *
+ * @param key key key with which the specified value is associated
+ * @param oldVersion version expected to be associated with the specified key
+ * @param newValue value to be associated with the specified key
+ * @return true if the value was replaced
+ */
+ boolean replace(K key, long oldVersion, V newValue);
+
+ /**
+ * Atomically apply the specified list of updates to the map.
+ * If any of the updates cannot be applied due to a precondition
+ * violation, none of the updates will be applied and the state of
+ * the map remains unaltered.
+ *
+ * @param updates list of updates to apply atomically.
+ * @return true if the map was updated.
+ */
+ boolean batchUpdate(List<UpdateOperation<K, V>> updates);
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapException.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapException.java
new file mode 100644
index 0000000..d98e5a3
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapException.java
@@ -0,0 +1,26 @@
+package org.onosproject.store.consistent.impl;
+
+/**
+ * Top level exception for ConsistentMap failures.
+ */
+@SuppressWarnings("serial")
+public class ConsistentMapException extends RuntimeException {
+ public ConsistentMapException() {
+ }
+
+ public ConsistentMapException(Throwable t) {
+ super(t);
+ }
+
+ /**
+ * ConsistentMap operation timeout.
+ */
+ public static class Timeout extends ConsistentMapException {
+ }
+
+ /**
+ * ConsistentMap operation interrupted.
+ */
+ public static class Interrupted extends ConsistentMapException {
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java
new file mode 100644
index 0000000..1c7ef7f
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java
@@ -0,0 +1,234 @@
+package org.onosproject.store.consistent.impl;
+
+import static com.google.common.base.Preconditions.*;
+
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.Set;
+
+import org.onlab.util.HexString;
+import org.onosproject.store.serializers.StoreSerializer;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+/**
+ * ConsistentMap implementation that is backed by a Raft consensus
+ * based database.
+ *
+ * @param <K> type of key.
+ * @param <V> type of value.
+ */
+public class ConsistentMapImpl<K, V> implements ConsistentMap<K, V> {
+
+ private final String name;
+ private final DatabaseProxy<String, byte[]> proxy;
+ private final StoreSerializer serializer;
+
+ private static final int OPERATION_TIMEOUT_MILLIS = 1000;
+ private static final String ERROR_NULL_KEY = "Key cannot be null";
+ private static final String ERROR_NULL_VALUE = "Null values are not allowed";
+
+ private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder()
+ .softValues()
+ .build(new CacheLoader<K, String>() {
+
+ @Override
+ public String load(K key) {
+ return HexString.toHexString(serializer.encode(key));
+ }
+ });
+
+ protected K dK(String key) {
+ return serializer.decode(HexString.fromHexString(key));
+ }
+
+ ConsistentMapImpl(String name,
+ DatabaseProxy<String, byte[]> proxy,
+ StoreSerializer serializer) {
+ this.name = checkNotNull(name, "map name cannot be null");
+ this.proxy = checkNotNull(proxy, "database proxy cannot be null");
+ this.serializer = checkNotNull(serializer, "serializer cannot be null");
+ }
+
+ @Override
+ public int size() {
+ return complete(proxy.size(name));
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return complete(proxy.isEmpty(name));
+ }
+
+ @Override
+ public boolean containsKey(K key) {
+ checkNotNull(key, ERROR_NULL_KEY);
+ return complete(proxy.containsKey(name, keyCache.getUnchecked(key)));
+ }
+
+ @Override
+ public boolean containsValue(V value) {
+ checkNotNull(value, ERROR_NULL_VALUE);
+ return complete(proxy.containsValue(name, serializer.encode(value)));
+ }
+
+ @Override
+ public Versioned<V> get(K key) {
+ checkNotNull(key, ERROR_NULL_KEY);
+ Versioned<byte[]> value = complete(proxy.get(name, keyCache.getUnchecked(key)));
+ return new Versioned<>(serializer.decode(value.value()), value.version());
+ }
+
+ @Override
+ public Versioned<V> put(K key, V value) {
+ checkNotNull(key, ERROR_NULL_KEY);
+ checkNotNull(value, ERROR_NULL_VALUE);
+ Versioned<byte[]> previousValue = complete(proxy.get(name, keyCache.getUnchecked(key)));
+ return (previousValue != null) ?
+ new Versioned<>(serializer.decode(previousValue.value()), previousValue.version()) : null;
+
+ }
+
+ @Override
+ public Versioned<V> remove(K key) {
+ checkNotNull(key, ERROR_NULL_KEY);
+ Versioned<byte[]> value = complete(proxy.get(name, keyCache.getUnchecked(key)));
+ return (value != null) ? new Versioned<>(serializer.decode(value.value()), value.version()) : null;
+ }
+
+ @Override
+ public void clear() {
+ complete(proxy.clear(name));
+ }
+
+ @Override
+ public Set<K> keySet() {
+ return Collections.unmodifiableSet(complete(proxy.keySet(name))
+ .stream()
+ .map(this::dK)
+ .collect(Collectors.toSet()));
+ }
+
+ @Override
+ public Collection<Versioned<V>> values() {
+ return Collections.unmodifiableList(complete(proxy.values(name))
+ .stream()
+ .map(v -> new Versioned<V>(serializer.decode(v.value()), v.version()))
+ .collect(Collectors.toList()));
+ }
+
+ @Override
+ public Set<Entry<K, Versioned<V>>> entrySet() {
+ return Collections.unmodifiableSet(complete(proxy.entrySet(name))
+ .stream()
+ .map(this::fromRawEntry)
+ .collect(Collectors.toSet()));
+ }
+
+ @Override
+ public Versioned<V> putIfAbsent(K key, V value) {
+ checkNotNull(key, ERROR_NULL_KEY);
+ checkNotNull(value, ERROR_NULL_VALUE);
+ Versioned<byte[]> existingValue = complete(proxy.putIfAbsent(
+ name, keyCache.getUnchecked(key), serializer.encode(value)));
+ return (existingValue != null) ?
+ new Versioned<>(serializer.decode(existingValue.value()), existingValue.version()) : null;
+ }
+
+ @Override
+ public boolean remove(K key, V value) {
+ checkNotNull(key, ERROR_NULL_KEY);
+ checkNotNull(value, ERROR_NULL_VALUE);
+ return complete(proxy.remove(name, keyCache.getUnchecked(key), serializer.encode(value)));
+ }
+
+ @Override
+ public boolean remove(K key, long version) {
+ checkNotNull(key, ERROR_NULL_KEY);
+ return complete(proxy.remove(name, keyCache.getUnchecked(key), version));
+
+ }
+
+ @Override
+ public boolean replace(K key, V oldValue, V newValue) {
+ checkNotNull(key, ERROR_NULL_KEY);
+ checkNotNull(newValue, ERROR_NULL_VALUE);
+ byte[] existing = oldValue != null ? serializer.encode(oldValue) : null;
+ return complete(proxy.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue)));
+ }
+
+ @Override
+ public boolean replace(K key, long oldVersion, V newValue) {
+ checkNotNull(key, ERROR_NULL_KEY);
+ checkNotNull(newValue, ERROR_NULL_VALUE);
+ return complete(proxy.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue)));
+ }
+
+ @Override
+ public boolean batchUpdate(List<UpdateOperation<K, V>> updates) {
+ checkNotNull(updates, "updates cannot be null");
+ return complete(proxy.atomicBatchUpdate(updates
+ .stream()
+ .map(this::toRawUpdateOperation)
+ .collect(Collectors.toList())));
+ }
+
+ private static <T> T complete(CompletableFuture<T> future) {
+ try {
+ return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ConsistentMapException.Interrupted();
+ } catch (TimeoutException e) {
+ throw new ConsistentMapException.Timeout();
+ } catch (ExecutionException e) {
+ throw new ConsistentMapException(e.getCause());
+ }
+ }
+
+ private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
+ return new AbstractMap.SimpleEntry<>(
+ dK(e.getKey()),
+ new Versioned<>(
+ serializer.decode(e.getValue().value()),
+ e.getValue().version()));
+ }
+
+ private UpdateOperation<String, byte[]> toRawUpdateOperation(UpdateOperation<K, V> update) {
+
+ checkArgument(name.equals(update.tableName()), "Unexpected table name");
+
+ UpdateOperation.Builder<String, byte[]> rawUpdate = UpdateOperation.<String, byte[]>newBuilder();
+
+ rawUpdate = rawUpdate.withKey(keyCache.getUnchecked(update.key()))
+ .withCurrentVersion(update.currentVersion())
+ .withType(update.type());
+
+ rawUpdate = rawUpdate.withTableName(update.tableName());
+
+ if (update.value() != null) {
+ rawUpdate = rawUpdate.withValue(serializer.encode(update.value()));
+ } else {
+ checkState(update.type() == UpdateOperation.Type.REMOVE
+ || update.type() == UpdateOperation.Type.REMOVE_IF_VERSION_MATCH,
+ ERROR_NULL_VALUE);
+ }
+
+ if (update.currentValue() != null) {
+ rawUpdate = rawUpdate.withCurrentValue(serializer.encode(update.currentValue()));
+ }
+
+ return rawUpdate.build();
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseDefinitionStore.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseDefinitionStore.java
new file mode 100644
index 0000000..f2d5269
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseDefinitionStore.java
@@ -0,0 +1,125 @@
+package org.onosproject.store.consistent.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.onosproject.cluster.DefaultControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onlab.packet.IpAddress;
+import org.slf4j.Logger;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Maps;
+
+/**
+ * Allows for reading and writing partitioned database definition as a JSON file.
+ */
+public class DatabaseDefinitionStore {
+
+ private final Logger log = getLogger(getClass());
+
+ private final File definitionfile;
+
+ /**
+ * Creates a reader/writer of the database definition file.
+ *
+ * @param filePath location of the definition file
+ */
+ public DatabaseDefinitionStore(String filePath) {
+ definitionfile = new File(filePath);
+ }
+
+ /**
+ * Creates a reader/writer of the database definition file.
+ *
+ * @param filePath location of the definition file
+ */
+ public DatabaseDefinitionStore(File filePath) {
+ definitionfile = checkNotNull(filePath);
+ }
+
+ /**
+ * Returns the Map from database partition name to set of initial active member nodes.
+ *
+ * @return Map from partition name to set of active member nodes
+ * @throws IOException when I/O exception of some sort has occurred.
+ */
+ public Map<String, Set<DefaultControllerNode>> read() throws IOException {
+
+ final Map<String, Set<DefaultControllerNode>> partitions = Maps.newHashMap();
+
+ final ObjectMapper mapper = new ObjectMapper();
+ final ObjectNode tabletNodes = (ObjectNode) mapper.readTree(definitionfile);
+ final Iterator<Entry<String, JsonNode>> fields = tabletNodes.fields();
+ while (fields.hasNext()) {
+ final Entry<String, JsonNode> next = fields.next();
+ final Set<DefaultControllerNode> nodes = new HashSet<>();
+ final Iterator<JsonNode> elements = next.getValue().elements();
+ while (elements.hasNext()) {
+ ObjectNode nodeDef = (ObjectNode) elements.next();
+ nodes.add(new DefaultControllerNode(new NodeId(nodeDef.get("id").asText()),
+ IpAddress.valueOf(nodeDef.get("ip").asText()),
+ nodeDef.get("tcpPort").asInt(DatabaseManager.COPYCAT_TCP_PORT)));
+ }
+
+ partitions.put(next.getKey(), nodes);
+ }
+ return partitions;
+ }
+
+ /**
+ * Updates the Map from database partition name to set of member nodes.
+ *
+ * @param partitionName name of the database partition to update
+ * @param nodes set of initial member nodes
+ * @throws IOException when I/O exception of some sort has occurred.
+ */
+ public void write(String partitionName, Set<DefaultControllerNode> nodes) throws IOException {
+ checkNotNull(partitionName);
+ checkArgument(partitionName.isEmpty(), "Partition name cannot be empty");
+
+ // load current
+ Map<String, Set<DefaultControllerNode>> config;
+ try {
+ config = read();
+ } catch (IOException e) {
+ log.info("Reading partition config failed, assuming empty definition.");
+ config = new HashMap<>();
+ }
+ // update with specified
+ config.put(partitionName, nodes);
+
+ // write back to file
+ final ObjectMapper mapper = new ObjectMapper();
+ final ObjectNode partitionNodes = mapper.createObjectNode();
+ for (Entry<String, Set<DefaultControllerNode>> tablet : config.entrySet()) {
+ ArrayNode nodeDefs = mapper.createArrayNode();
+ partitionNodes.set(tablet.getKey(), nodeDefs);
+
+ for (DefaultControllerNode node : tablet.getValue()) {
+ ObjectNode nodeDef = mapper.createObjectNode();
+ nodeDef.put("id", node.id().toString())
+ .put("ip", node.ip().toString())
+ .put("tcpPort", node.tcpPort());
+ nodeDefs.add(nodeDef);
+ }
+ }
+ mapper.writeTree(new JsonFactory().createGenerator(definitionfile, JsonEncoding.UTF8),
+ partitionNodes);
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
new file mode 100644
index 0000000..0fcb895
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -0,0 +1,122 @@
+package org.onosproject.store.consistent.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import net.kuujo.copycat.cluster.ClusterConfig;
+import net.kuujo.copycat.log.FileLog;
+import net.kuujo.copycat.netty.NettyTcpProtocol;
+import net.kuujo.copycat.protocol.Consistency;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.DefaultControllerNode;
+import org.onosproject.store.serializers.StoreSerializer;
+import org.slf4j.Logger;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Database manager.
+ */
+@Component(immediate = true, enabled = true)
+@Service
+public class DatabaseManager implements DatabaseService {
+
+ private final Logger log = getLogger(getClass());
+ private PartitionedDatabase partitionedDatabase;
+ public static final int COPYCAT_TCP_PORT = 7238; // 7238 = RAFT
+ private static final String CONFIG_DIR = "../config";
+ private static final String PARTITION_DEFINITION_FILE = "tablets.json";
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ protected String nodeToUri(ControllerNode node) {
+ return "tcp://" + node.ip() + ":" + COPYCAT_TCP_PORT;
+ }
+
+ @Activate
+ public void activate() {
+
+ final String logDir = System.getProperty("karaf.data", "./data");
+
+ // load database configuration
+ File file = new File(CONFIG_DIR, PARTITION_DEFINITION_FILE);
+ log.info("Loading database definition: {}", file.getAbsolutePath());
+
+ DatabaseDefinitionStore databaseDef = new DatabaseDefinitionStore(file);
+ Map<String, Set<DefaultControllerNode>> partitionMap;
+ try {
+ partitionMap = databaseDef.read();
+ } catch (IOException e) {
+ log.error("Failed to load database config {}", file);
+ throw new IllegalStateException("Failed to load database config", e);
+ }
+
+ String[] activeNodeUris = partitionMap.values()
+ .stream()
+ .reduce((s1, s2) -> Sets.union(s1, s2))
+ .get()
+ .stream()
+ .map(this::nodeToUri)
+ .toArray(String[]::new);
+
+ String localNodeUri = nodeToUri(clusterService.getLocalNode());
+
+ ClusterConfig clusterConfig = new ClusterConfig()
+ .withProtocol(new NettyTcpProtocol())
+ .withMembers(activeNodeUris)
+ .withLocalMember(localNodeUri);
+
+ PartitionedDatabaseConfig databaseConfig = new PartitionedDatabaseConfig();
+
+ partitionMap.forEach((name, nodes) -> {
+ Set<String> replicas = nodes.stream().map(this::nodeToUri).collect(Collectors.toSet());
+ DatabaseConfig partitionConfig = new DatabaseConfig()
+ .withConsistency(Consistency.STRONG)
+ .withLog(new FileLog(logDir))
+ .withReplicas(replicas);
+ databaseConfig.addPartition(name, partitionConfig);
+ });
+
+ partitionedDatabase = PartitionedDatabaseManager.create("onos-store", clusterConfig, databaseConfig);
+
+ partitionedDatabase.open().whenComplete((db, error) -> {
+ if (error != null) {
+ log.warn("Failed to open database.", error);
+ } else {
+ log.info("Successfully opened database.");
+ }
+ });
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ partitionedDatabase.close().whenComplete((result, error) -> {
+ if (error != null) {
+ log.warn("Failed to cleanly close database.", error);
+ } else {
+ log.info("Successfully closed database.");
+ }
+ });
+ log.info("Stopped");
+ }
+
+ @Override
+ public <K, V> ConsistentMap<K , V> createConsistentMap(String name, StoreSerializer serializer) {
+ return new ConsistentMapImpl<K, V>(name, partitionedDatabase, serializer);
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabasePartitioner.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabasePartitioner.java
new file mode 100644
index 0000000..ef4ece7
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabasePartitioner.java
@@ -0,0 +1,30 @@
+package org.onosproject.store.consistent.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.Map;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.hash.Hashing;
+
+/**
+ * Partitioner for mapping table entries to individual database partitions.
+ * <p>
+ * By default a md5 hash of the hash key (key or table name) is used to pick a
+ * partition.
+ */
+public abstract class DatabasePartitioner implements Partitioner<String> {
+ // Database partitions sorted by their partition name.
+ protected final Database[] sortedPartitions;
+
+ public DatabasePartitioner(Map<String, Database> partitionMap) {
+ checkState(partitionMap != null && !partitionMap.isEmpty(), "Partition map cannot be null or empty");
+ sortedPartitions = ImmutableSortedMap.<String, Database>copyOf(partitionMap).values().toArray(new Database[]{});
+ }
+
+ protected int hash(String key) {
+ return Math.abs(Hashing.md5().newHasher().putBytes(key.getBytes(Charsets.UTF_8)).hash().asInt());
+ }
+
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseService.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseService.java
new file mode 100644
index 0000000..6f2b655
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseService.java
@@ -0,0 +1,18 @@
+package org.onosproject.store.consistent.impl;
+
+import org.onosproject.store.serializers.StoreSerializer;
+
+/**
+ * Database service.
+ */
+public interface DatabaseService {
+
+ /**
+ * Creates a ConsistentMap.
+ *
+ * @param name map name
+ * @param serializer serializer to use for serializing keys and values.
+ * @return consistent map.
+ */
+ <K, V> ConsistentMap<K , V> createConsistentMap(String name, StoreSerializer serializer);
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index d35aca2..8cf1703 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -9,8 +9,6 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -188,8 +186,7 @@
.values()
.stream()
.map(Database::open)
- .collect(Collectors.toList())
- .toArray(new CompletableFuture[partitions.size()]))
+ .toArray(CompletableFuture[]::new))
.thenApply(v -> this));
}
@@ -200,8 +197,7 @@
.values()
.stream()
.map(database -> database.close())
- .collect(Collectors.toList())
- .toArray(new CompletableFuture[partitions.size()]));
+ .toArray(CompletableFuture[]::new));
CompletableFuture<Void> closeCoordinator = coordinator.close();
return closePartitions.thenCompose(v -> closeCoordinator);
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseConfig.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseConfig.java
index 6d375cc..a305a11 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseConfig.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseConfig.java
@@ -24,7 +24,7 @@
* @param config partition config
* @return this instance
*/
- public PartitionedDatabaseConfig withPartition(String name, DatabaseConfig config) {
+ public PartitionedDatabaseConfig addPartition(String name, DatabaseConfig config) {
partitions.put(name, config);
return this;
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
index 44cd3a1..81f00de 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
@@ -10,6 +10,9 @@
import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator;
import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
+/**
+ * Manages a PartitionedDatabase.
+ */
public interface PartitionedDatabaseManager {
/**
* Opens the database.
@@ -73,7 +76,7 @@
.withDefaultSerializer(copycatConfig.getDefaultSerializer().copy())
.withDefaultExecutor(copycatConfig.getDefaultExecutor()))));
partitionedDatabase.setPartitioner(
- new SimpleKeyHashPartitioner<>(partitionedDatabase.getRegisteredPartitions()));
+ new SimpleKeyHashPartitioner(partitionedDatabase.getRegisteredPartitions()));
return partitionedDatabase;
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java
index 5410f9f..13a4ce9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java
@@ -1,31 +1,22 @@
package org.onosproject.store.consistent.impl;
-import java.util.Collections;
-import java.util.List;
import java.util.Map;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-
/**
- * A simple Partitioner that uses the key hashCode to map
- * key to a partition.
+ * A simple Partitioner for mapping keys to database partitions.
+ * <p>
+ * This class uses a md5 hash based hashing scheme for hashing the key to
+ * a partition.
*
- * @param <K> key type.
*/
-public class SimpleKeyHashPartitioner<K> implements Partitioner<K> {
-
- private final Map<String, Database> partitionMap;
- private final List<String> sortedPartitionNames;
+public class SimpleKeyHashPartitioner extends DatabasePartitioner {
public SimpleKeyHashPartitioner(Map<String, Database> partitionMap) {
- this.partitionMap = ImmutableMap.copyOf(partitionMap);
- sortedPartitionNames = Lists.newArrayList(this.partitionMap.keySet());
- Collections.sort(sortedPartitionNames);
+ super(partitionMap);
}
@Override
- public Database getPartition(String tableName, K key) {
- return partitionMap.get(sortedPartitionNames.get(Math.abs(key.hashCode()) % partitionMap.size()));
+ public Database getPartition(String tableName, String key) {
+ return sortedPartitions[hash(key) % sortedPartitions.length];
}
-}
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java
new file mode 100644
index 0000000..367e3f2
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java
@@ -0,0 +1,23 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Map;
+
+/**
+ * A simple Partitioner that uses the table name hash to
+ * pick a partition.
+ * <p>
+ * This class uses a md5 hash based hashing scheme for hashing the table name to
+ * a partition. This partitioner maps all keys for a table to the same database
+ * partition.
+ */
+public class SimpleTableHashPartitioner extends DatabasePartitioner {
+
+ public SimpleTableHashPartitioner(Map<String, Database> partitionMap) {
+ super(partitionMap);
+ }
+
+ @Override
+ public Database getPartition(String tableName, String key) {
+ return sortedPartitions[hash(tableName) % sortedPartitions.length];
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java
index 9d52a09..11df520 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java
@@ -1,5 +1,7 @@
package org.onosproject.store.consistent.impl;
+import static com.google.common.base.Preconditions.*;
+
import com.google.common.base.MoreObjects;
/**
@@ -28,7 +30,7 @@
private K key;
private V value;
private V currentValue;
- private long currentVersion;
+ private long currentVersion = -1;
/**
* Returns the type of update operation.
@@ -91,6 +93,17 @@
}
/**
+ * Creates a new builder instance.
+ * @param <K> key type.
+ * @param <V> value type.
+ *
+ * @return builder.
+ */
+ public static <K, V> Builder<K, V> newBuilder() {
+ return new Builder<>();
+ }
+
+ /**
* UpdatOperation builder.
*
* @param <K> key type.
@@ -100,52 +113,70 @@
private UpdateOperation<K, V> operation = new UpdateOperation<>();
- /**
- * Creates a new builder instance.
- * @param <K> key type.
- * @param <V> value type.
- *
- * @return builder.
- */
- public static <K, V> Builder<K, V> builder() {
- return new Builder<>();
- }
-
- private Builder() {
- }
-
public UpdateOperation<K, V> build() {
+ validateInputs();
return operation;
}
public Builder<K, V> withType(Type type) {
- operation.type = type;
+ operation.type = checkNotNull(type, "type cannot be null");
return this;
}
public Builder<K, V> withTableName(String tableName) {
- operation.tableName = tableName;
+ operation.tableName = checkNotNull(tableName, "tableName cannot be null");
return this;
}
public Builder<K, V> withKey(K key) {
- operation.key = key;
+ operation.key = checkNotNull(key, "key cannot be null");
return this;
}
public Builder<K, V> withCurrentValue(V value) {
- operation.currentValue = value;
+ operation.currentValue = checkNotNull(value, "currentValue cannot be null");
return this;
}
public Builder<K, V> withValue(V value) {
- operation.value = value;
+ operation.value = checkNotNull(value, "value cannot be null");
return this;
}
public Builder<K, V> withCurrentVersion(long version) {
+ checkArgument(version >= 0, "version cannot be negative");
operation.currentVersion = version;
return this;
}
+
+ private void validateInputs() {
+ checkNotNull(operation.type, "type must be specified");
+ checkNotNull(operation.tableName, "table name must be specified");
+ checkNotNull(operation.key, "key must be specified");
+ switch (operation.type) {
+ case PUT:
+ case PUT_IF_ABSENT:
+ checkNotNull(operation.value, "value must be specified.");
+ break;
+ case PUT_IF_VERSION_MATCH:
+ checkNotNull(operation.value, "value must be specified.");
+ checkState(operation.currentVersion >= 0, "current version must be specified");
+ break;
+ case PUT_IF_VALUE_MATCH:
+ checkNotNull(operation.value, "value must be specified.");
+ checkNotNull(operation.currentValue, "currentValue must be specified.");
+ break;
+ case REMOVE:
+ break;
+ case REMOVE_IF_VERSION_MATCH:
+ checkState(operation.currentVersion >= 0, "current version must be specified");
+ break;
+ case REMOVE_IF_VALUE_MATCH:
+ checkNotNull(operation.currentValue, "currentValue must be specified.");
+ break;
+ default:
+ throw new IllegalStateException("Unknown operation type");
+ }
+ }
}
}
diff --git a/utils/thirdparty/pom.xml b/utils/thirdparty/pom.xml
index 61adc1a..b79b7d6 100644
--- a/utils/thirdparty/pom.xml
+++ b/utils/thirdparty/pom.xml
@@ -100,7 +100,7 @@
<filter>
<artifact>org.onosproject:copycat*</artifact>
<includes>
- <include>net/kuujo/copycat/**</include>
+ <include>**</include>
</includes>
</filter>