WIP: Consistent map implementation.
Change-Id: I51b2d954b7a8ff2c51c425d9a8125937d4eaa6b0
Change-Id: Ib27799d4eb60fc4bfaa8d2f21a904365ff5437eb
Change-Id: I95c937600ceb8f282a482280217671c471f40b9c
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java
new file mode 100644
index 0000000..1c7ef7f
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java
@@ -0,0 +1,234 @@
+package org.onosproject.store.consistent.impl;
+
+import static com.google.common.base.Preconditions.*;
+
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.Set;
+
+import org.onlab.util.HexString;
+import org.onosproject.store.serializers.StoreSerializer;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+/**
+ * ConsistentMap implementation that is backed by a Raft consensus
+ * based database.
+ *
+ * @param <K> type of key.
+ * @param <V> type of value.
+ */
+public class ConsistentMapImpl<K, V> implements ConsistentMap<K, V> {
+
+ private final String name;
+ private final DatabaseProxy<String, byte[]> proxy;
+ private final StoreSerializer serializer;
+
+ private static final int OPERATION_TIMEOUT_MILLIS = 1000;
+ private static final String ERROR_NULL_KEY = "Key cannot be null";
+ private static final String ERROR_NULL_VALUE = "Null values are not allowed";
+
+ private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder()
+ .softValues()
+ .build(new CacheLoader<K, String>() {
+
+ @Override
+ public String load(K key) {
+ return HexString.toHexString(serializer.encode(key));
+ }
+ });
+
+ protected K dK(String key) {
+ return serializer.decode(HexString.fromHexString(key));
+ }
+
+ ConsistentMapImpl(String name,
+ DatabaseProxy<String, byte[]> proxy,
+ StoreSerializer serializer) {
+ this.name = checkNotNull(name, "map name cannot be null");
+ this.proxy = checkNotNull(proxy, "database proxy cannot be null");
+ this.serializer = checkNotNull(serializer, "serializer cannot be null");
+ }
+
+ @Override
+ public int size() {
+ return complete(proxy.size(name));
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return complete(proxy.isEmpty(name));
+ }
+
+ @Override
+ public boolean containsKey(K key) {
+ checkNotNull(key, ERROR_NULL_KEY);
+ return complete(proxy.containsKey(name, keyCache.getUnchecked(key)));
+ }
+
+ @Override
+ public boolean containsValue(V value) {
+ checkNotNull(value, ERROR_NULL_VALUE);
+ return complete(proxy.containsValue(name, serializer.encode(value)));
+ }
+
+ @Override
+ public Versioned<V> get(K key) {
+ checkNotNull(key, ERROR_NULL_KEY);
+ Versioned<byte[]> value = complete(proxy.get(name, keyCache.getUnchecked(key)));
+ return new Versioned<>(serializer.decode(value.value()), value.version());
+ }
+
+ @Override
+ public Versioned<V> put(K key, V value) {
+ checkNotNull(key, ERROR_NULL_KEY);
+ checkNotNull(value, ERROR_NULL_VALUE);
+ Versioned<byte[]> previousValue = complete(proxy.get(name, keyCache.getUnchecked(key)));
+ return (previousValue != null) ?
+ new Versioned<>(serializer.decode(previousValue.value()), previousValue.version()) : null;
+
+ }
+
+ @Override
+ public Versioned<V> remove(K key) {
+ checkNotNull(key, ERROR_NULL_KEY);
+ Versioned<byte[]> value = complete(proxy.get(name, keyCache.getUnchecked(key)));
+ return (value != null) ? new Versioned<>(serializer.decode(value.value()), value.version()) : null;
+ }
+
+ @Override
+ public void clear() {
+ complete(proxy.clear(name));
+ }
+
+ @Override
+ public Set<K> keySet() {
+ return Collections.unmodifiableSet(complete(proxy.keySet(name))
+ .stream()
+ .map(this::dK)
+ .collect(Collectors.toSet()));
+ }
+
+ @Override
+ public Collection<Versioned<V>> values() {
+ return Collections.unmodifiableList(complete(proxy.values(name))
+ .stream()
+ .map(v -> new Versioned<V>(serializer.decode(v.value()), v.version()))
+ .collect(Collectors.toList()));
+ }
+
+ @Override
+ public Set<Entry<K, Versioned<V>>> entrySet() {
+ return Collections.unmodifiableSet(complete(proxy.entrySet(name))
+ .stream()
+ .map(this::fromRawEntry)
+ .collect(Collectors.toSet()));
+ }
+
+ @Override
+ public Versioned<V> putIfAbsent(K key, V value) {
+ checkNotNull(key, ERROR_NULL_KEY);
+ checkNotNull(value, ERROR_NULL_VALUE);
+ Versioned<byte[]> existingValue = complete(proxy.putIfAbsent(
+ name, keyCache.getUnchecked(key), serializer.encode(value)));
+ return (existingValue != null) ?
+ new Versioned<>(serializer.decode(existingValue.value()), existingValue.version()) : null;
+ }
+
+ @Override
+ public boolean remove(K key, V value) {
+ checkNotNull(key, ERROR_NULL_KEY);
+ checkNotNull(value, ERROR_NULL_VALUE);
+ return complete(proxy.remove(name, keyCache.getUnchecked(key), serializer.encode(value)));
+ }
+
+ @Override
+ public boolean remove(K key, long version) {
+ checkNotNull(key, ERROR_NULL_KEY);
+ return complete(proxy.remove(name, keyCache.getUnchecked(key), version));
+
+ }
+
+ @Override
+ public boolean replace(K key, V oldValue, V newValue) {
+ checkNotNull(key, ERROR_NULL_KEY);
+ checkNotNull(newValue, ERROR_NULL_VALUE);
+ byte[] existing = oldValue != null ? serializer.encode(oldValue) : null;
+ return complete(proxy.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue)));
+ }
+
+ @Override
+ public boolean replace(K key, long oldVersion, V newValue) {
+ checkNotNull(key, ERROR_NULL_KEY);
+ checkNotNull(newValue, ERROR_NULL_VALUE);
+ return complete(proxy.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue)));
+ }
+
+ @Override
+ public boolean batchUpdate(List<UpdateOperation<K, V>> updates) {
+ checkNotNull(updates, "updates cannot be null");
+ return complete(proxy.atomicBatchUpdate(updates
+ .stream()
+ .map(this::toRawUpdateOperation)
+ .collect(Collectors.toList())));
+ }
+
+ private static <T> T complete(CompletableFuture<T> future) {
+ try {
+ return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ConsistentMapException.Interrupted();
+ } catch (TimeoutException e) {
+ throw new ConsistentMapException.Timeout();
+ } catch (ExecutionException e) {
+ throw new ConsistentMapException(e.getCause());
+ }
+ }
+
+ private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
+ return new AbstractMap.SimpleEntry<>(
+ dK(e.getKey()),
+ new Versioned<>(
+ serializer.decode(e.getValue().value()),
+ e.getValue().version()));
+ }
+
+ private UpdateOperation<String, byte[]> toRawUpdateOperation(UpdateOperation<K, V> update) {
+
+ checkArgument(name.equals(update.tableName()), "Unexpected table name");
+
+ UpdateOperation.Builder<String, byte[]> rawUpdate = UpdateOperation.<String, byte[]>newBuilder();
+
+ rawUpdate = rawUpdate.withKey(keyCache.getUnchecked(update.key()))
+ .withCurrentVersion(update.currentVersion())
+ .withType(update.type());
+
+ rawUpdate = rawUpdate.withTableName(update.tableName());
+
+ if (update.value() != null) {
+ rawUpdate = rawUpdate.withValue(serializer.encode(update.value()));
+ } else {
+ checkState(update.type() == UpdateOperation.Type.REMOVE
+ || update.type() == UpdateOperation.Type.REMOVE_IF_VERSION_MATCH,
+ ERROR_NULL_VALUE);
+ }
+
+ if (update.currentValue() != null) {
+ rawUpdate = rawUpdate.withCurrentValue(serializer.encode(update.currentValue()));
+ }
+
+ return rawUpdate.build();
+ }
+}
\ No newline at end of file