blob: 09f612e04c5578efcf8cb7b338086088c37050a2 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.store.consistent.impl;
18
19import static com.google.common.base.Preconditions.*;
20
21import java.util.Collection;
22import java.util.Map;
23import java.util.Map.Entry;
24import java.util.concurrent.CompletableFuture;
25import java.util.stream.Collectors;
26import java.util.Set;
27
28import org.apache.commons.lang3.tuple.Pair;
29import org.onlab.util.HexString;
30import org.onosproject.store.service.AsyncConsistentMap;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070031import org.onosproject.store.service.ConsistentMapException;
Madan Jampani7c521002015-03-23 12:23:01 -070032import org.onosproject.store.service.Serializer;
33import org.onosproject.store.service.Versioned;
34
35import com.google.common.cache.CacheBuilder;
36import com.google.common.cache.CacheLoader;
37import com.google.common.cache.LoadingCache;
38
39/**
40 * AsyncConsistentMap implementation that is backed by a Raft consensus
41 * based database.
42 *
43 * @param <K> type of key.
44 * @param <V> type of value.
45 */
46public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> {
47
48 private final String name;
Madan Jampanif1b8e172015-03-23 11:42:02 -070049 private final Database database;
Madan Jampani7c521002015-03-23 12:23:01 -070050 private final Serializer serializer;
51
52 private static final String ERROR_NULL_KEY = "Key cannot be null";
53 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
54
55 private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder()
56 .softValues()
57 .build(new CacheLoader<K, String>() {
58
59 @Override
60 public String load(K key) {
61 return HexString.toHexString(serializer.encode(key));
62 }
63 });
64
65 protected K dK(String key) {
66 return serializer.decode(HexString.fromHexString(key));
67 }
68
69 public DefaultAsyncConsistentMap(String name,
Madan Jampanif1b8e172015-03-23 11:42:02 -070070 Database database,
Madan Jampani7c521002015-03-23 12:23:01 -070071 Serializer serializer) {
72 this.name = checkNotNull(name, "map name cannot be null");
Madan Jampanif1b8e172015-03-23 11:42:02 -070073 this.database = checkNotNull(database, "database cannot be null");
Madan Jampani7c521002015-03-23 12:23:01 -070074 this.serializer = checkNotNull(serializer, "serializer cannot be null");
75 }
76
77 @Override
78 public CompletableFuture<Integer> size() {
Madan Jampanif1b8e172015-03-23 11:42:02 -070079 return database.size(name);
Madan Jampani7c521002015-03-23 12:23:01 -070080 }
81
82 @Override
83 public CompletableFuture<Boolean> isEmpty() {
Madan Jampanif1b8e172015-03-23 11:42:02 -070084 return database.isEmpty(name);
Madan Jampani7c521002015-03-23 12:23:01 -070085 }
86
87 @Override
88 public CompletableFuture<Boolean> containsKey(K key) {
89 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampanif1b8e172015-03-23 11:42:02 -070090 return database.containsKey(name, keyCache.getUnchecked(key));
Madan Jampani7c521002015-03-23 12:23:01 -070091 }
92
93 @Override
94 public CompletableFuture<Boolean> containsValue(V value) {
95 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampanif1b8e172015-03-23 11:42:02 -070096 return database.containsValue(name, serializer.encode(value));
Madan Jampani7c521002015-03-23 12:23:01 -070097 }
98
99 @Override
100 public CompletableFuture<Versioned<V>> get(K key) {
101 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampanif1b8e172015-03-23 11:42:02 -0700102 return database.get(name, keyCache.getUnchecked(key))
Madan Jampani7c521002015-03-23 12:23:01 -0700103 .thenApply(v -> v != null
104 ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
105 }
106
107 @Override
108 public CompletableFuture<Versioned<V>> put(K key, V value) {
109 checkNotNull(key, ERROR_NULL_KEY);
110 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampanif1b8e172015-03-23 11:42:02 -0700111 return database.put(name, keyCache.getUnchecked(key), serializer.encode(value))
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700112 .thenApply(this::unwrapResult)
Madan Jampani7c521002015-03-23 12:23:01 -0700113 .thenApply(v -> v != null
114 ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
115 }
116
117 @Override
118 public CompletableFuture<Versioned<V>> remove(K key) {
119 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampanif1b8e172015-03-23 11:42:02 -0700120 return database.remove(name, keyCache.getUnchecked(key))
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700121 .thenApply(this::unwrapResult)
Madan Jampani7c521002015-03-23 12:23:01 -0700122 .thenApply(v -> v != null
123 ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
124 }
125
126 @Override
127 public CompletableFuture<Void> clear() {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700128 return database.clear(name).thenApply(this::unwrapResult);
Madan Jampani7c521002015-03-23 12:23:01 -0700129 }
130
131 @Override
132 public CompletableFuture<Set<K>> keySet() {
Madan Jampanif1b8e172015-03-23 11:42:02 -0700133 return database.keySet(name)
Madan Jampani7c521002015-03-23 12:23:01 -0700134 .thenApply(s -> s
135 .stream()
136 .map(this::dK)
137 .collect(Collectors.toSet()));
138 }
139
140 @Override
141 public CompletableFuture<Collection<Versioned<V>>> values() {
Madan Jampanif1b8e172015-03-23 11:42:02 -0700142 return database.values(name).thenApply(c -> c
Madan Jampani7c521002015-03-23 12:23:01 -0700143 .stream()
144 .map(v -> new Versioned<V>(serializer.decode(v.value()), v.version(), v.creationTime()))
145 .collect(Collectors.toList()));
146 }
147
148 @Override
149 public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() {
Madan Jampanif1b8e172015-03-23 11:42:02 -0700150 return database.entrySet(name).thenApply(s -> s
Madan Jampani7c521002015-03-23 12:23:01 -0700151 .stream()
152 .map(this::fromRawEntry)
153 .collect(Collectors.toSet()));
154 }
155
156 @Override
157 public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
158 checkNotNull(key, ERROR_NULL_KEY);
159 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700160 return database.putIfAbsent(name,
161 keyCache.getUnchecked(key),
162 serializer.encode(value))
163 .thenApply(this::unwrapResult)
164 .thenApply(v -> v != null ?
165 new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
Madan Jampani7c521002015-03-23 12:23:01 -0700166 }
167
168 @Override
169 public CompletableFuture<Boolean> remove(K key, V value) {
170 checkNotNull(key, ERROR_NULL_KEY);
171 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700172 return database.remove(name, keyCache.getUnchecked(key), serializer.encode(value))
173 .thenApply(this::unwrapResult);
Madan Jampani7c521002015-03-23 12:23:01 -0700174 }
175
176 @Override
177 public CompletableFuture<Boolean> remove(K key, long version) {
178 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700179 return database.remove(name, keyCache.getUnchecked(key), version)
180 .thenApply(this::unwrapResult);
Madan Jampani7c521002015-03-23 12:23:01 -0700181
182 }
183
184 @Override
185 public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
186 checkNotNull(key, ERROR_NULL_KEY);
187 checkNotNull(newValue, ERROR_NULL_VALUE);
188 byte[] existing = oldValue != null ? serializer.encode(oldValue) : null;
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700189 return database.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue))
190 .thenApply(this::unwrapResult);
Madan Jampani7c521002015-03-23 12:23:01 -0700191 }
192
193 @Override
194 public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
195 checkNotNull(key, ERROR_NULL_KEY);
196 checkNotNull(newValue, ERROR_NULL_VALUE);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700197 return database.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue))
198 .thenApply(this::unwrapResult);
Madan Jampani7c521002015-03-23 12:23:01 -0700199 }
200
201 private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
202 return Pair.of(
203 dK(e.getKey()),
204 new Versioned<>(
205 serializer.decode(e.getValue().value()),
206 e.getValue().version(),
207 e.getValue().creationTime()));
208 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700209
210 private <T> T unwrapResult(Result<T> result) {
211 if (result.status() == Result.Status.LOCKED) {
212 throw new ConsistentMapException.ConcurrentModification();
213 } else if (result.success()) {
214 return result.value();
215 } else {
216 throw new IllegalStateException("Must not be here");
217 }
218 }
Madan Jampani7c521002015-03-23 12:23:01 -0700219}