blob: 3fb6f4a21422b547004a0f6edbe651ed4c57ca96 [file] [log] [blame]
Madan Jampani7c521002015-03-23 12:23:01 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.consistent.impl;
18
19import static com.google.common.base.Preconditions.*;
20
21import java.util.Collection;
22import java.util.Map;
23import java.util.Map.Entry;
Madan Jampani346d4f52015-05-04 11:09:39 -070024import java.util.Objects;
25import java.util.Optional;
Madan Jampani7c521002015-03-23 12:23:01 -070026import java.util.concurrent.CompletableFuture;
Madan Jampani346d4f52015-05-04 11:09:39 -070027import java.util.concurrent.atomic.AtomicReference;
28import java.util.function.BiFunction;
29import java.util.function.Function;
30import java.util.function.Predicate;
Madan Jampani7c521002015-03-23 12:23:01 -070031import java.util.stream.Collectors;
32import java.util.Set;
33
34import org.apache.commons.lang3.tuple.Pair;
35import org.onlab.util.HexString;
Madan Jampani346d4f52015-05-04 11:09:39 -070036import org.onlab.util.Tools;
Madan Jampani7c521002015-03-23 12:23:01 -070037import org.onosproject.store.service.AsyncConsistentMap;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070038import org.onosproject.store.service.ConsistentMapException;
Madan Jampani7c521002015-03-23 12:23:01 -070039import org.onosproject.store.service.Serializer;
40import org.onosproject.store.service.Versioned;
41
42import com.google.common.cache.CacheBuilder;
43import com.google.common.cache.CacheLoader;
44import com.google.common.cache.LoadingCache;
45
46/**
47 * AsyncConsistentMap implementation that is backed by a Raft consensus
48 * based database.
49 *
50 * @param <K> type of key.
51 * @param <V> type of value.
52 */
53public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> {
54
55 private final String name;
Madan Jampanif1b8e172015-03-23 11:42:02 -070056 private final Database database;
Madan Jampani7c521002015-03-23 12:23:01 -070057 private final Serializer serializer;
Madan Jampani02b7fb82015-05-01 13:01:20 -070058 private final boolean readOnly;
Madan Jampani7c521002015-03-23 12:23:01 -070059
60 private static final String ERROR_NULL_KEY = "Key cannot be null";
61 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
62
63 private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder()
64 .softValues()
65 .build(new CacheLoader<K, String>() {
66
67 @Override
68 public String load(K key) {
69 return HexString.toHexString(serializer.encode(key));
70 }
71 });
72
73 protected K dK(String key) {
74 return serializer.decode(HexString.fromHexString(key));
75 }
76
77 public DefaultAsyncConsistentMap(String name,
Madan Jampanif1b8e172015-03-23 11:42:02 -070078 Database database,
Madan Jampani02b7fb82015-05-01 13:01:20 -070079 Serializer serializer,
80 boolean readOnly) {
Madan Jampani7c521002015-03-23 12:23:01 -070081 this.name = checkNotNull(name, "map name cannot be null");
Madan Jampanif1b8e172015-03-23 11:42:02 -070082 this.database = checkNotNull(database, "database cannot be null");
Madan Jampani7c521002015-03-23 12:23:01 -070083 this.serializer = checkNotNull(serializer, "serializer cannot be null");
Madan Jampani02b7fb82015-05-01 13:01:20 -070084 this.readOnly = readOnly;
Madan Jampani7c521002015-03-23 12:23:01 -070085 }
86
87 @Override
88 public CompletableFuture<Integer> size() {
Madan Jampanif1b8e172015-03-23 11:42:02 -070089 return database.size(name);
Madan Jampani7c521002015-03-23 12:23:01 -070090 }
91
92 @Override
93 public CompletableFuture<Boolean> isEmpty() {
Madan Jampanif1b8e172015-03-23 11:42:02 -070094 return database.isEmpty(name);
Madan Jampani7c521002015-03-23 12:23:01 -070095 }
96
97 @Override
98 public CompletableFuture<Boolean> containsKey(K key) {
99 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampanif1b8e172015-03-23 11:42:02 -0700100 return database.containsKey(name, keyCache.getUnchecked(key));
Madan Jampani7c521002015-03-23 12:23:01 -0700101 }
102
103 @Override
104 public CompletableFuture<Boolean> containsValue(V value) {
105 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampanif1b8e172015-03-23 11:42:02 -0700106 return database.containsValue(name, serializer.encode(value));
Madan Jampani7c521002015-03-23 12:23:01 -0700107 }
108
109 @Override
110 public CompletableFuture<Versioned<V>> get(K key) {
111 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampanif1b8e172015-03-23 11:42:02 -0700112 return database.get(name, keyCache.getUnchecked(key))
Madan Jampani7c521002015-03-23 12:23:01 -0700113 .thenApply(v -> v != null
114 ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
115 }
116
117 @Override
Madan Jampani346d4f52015-05-04 11:09:39 -0700118 public CompletableFuture<Versioned<V>> computeIfAbsent(K key,
119 Function<? super K, ? extends V> mappingFunction) {
120 return computeIf(key, Objects::isNull, (k, v) -> mappingFunction.apply(k));
121 }
122
123 @Override
124 public CompletableFuture<Versioned<V>> computeIfPresent(K key,
125 BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
126 return computeIf(key, Objects::nonNull, remappingFunction);
127 }
128
129 @Override
130 public CompletableFuture<Versioned<V>> compute(K key,
131 BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
132 return computeIf(key, v -> true, remappingFunction);
133 }
134
135 @Override
136 public CompletableFuture<Versioned<V>> computeIf(K key,
137 Predicate<? super V> condition,
138 BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
139 checkNotNull(key, ERROR_NULL_KEY);
140 checkNotNull(condition, "predicate function cannot be null");
141 checkNotNull(remappingFunction, "Remapping function cannot be null");
142 return get(key).thenCompose(r1 -> {
143 V existingValue = r1 == null ? null : r1.value();
144 // if the condition evaluates to false, return existing value.
145 if (!condition.test(existingValue)) {
146 return CompletableFuture.completedFuture(r1);
147 }
148
149 AtomicReference<V> computedValue = new AtomicReference<>();
150 // if remappingFunction throws an exception, return the exception.
151 try {
152 computedValue.set(remappingFunction.apply(key, existingValue));
153 } catch (Exception e) {
154 return Tools.exceptionalFuture(e);
155 }
156
157 // if the computed value is null, remove current value if one exists.
158 // throw an exception if concurrent modification is detected.
159 if (computedValue.get() == null) {
160 if (r1 != null) {
161 return remove(key, r1.version()).thenApply(result -> {
162 if (result) {
163 return null;
164 } else {
165 throw new ConsistentMapException.ConcurrentModification();
166 }
167 });
168 } else {
169 return CompletableFuture.completedFuture(null);
170 }
171 } else {
172 // replace current value; throw an exception if concurrent modification is detected
173 if (r1 != null) {
174 return replaceAndGet(key, r1.version(), computedValue.get())
175 .thenApply(v -> {
176 if (v.isPresent()) {
177 return v.get();
178 } else {
179 throw new ConsistentMapException.ConcurrentModification();
180 }
181 });
182 } else {
183 return putIfAbsentAndGet(key, computedValue.get()).thenApply(result -> {
184 if (!result.isPresent()) {
185 throw new ConsistentMapException.ConcurrentModification();
186 } else {
187 return result.get();
188 }
189 });
190 }
191 }
192 });
193 }
194
195 @Override
Madan Jampani7c521002015-03-23 12:23:01 -0700196 public CompletableFuture<Versioned<V>> put(K key, V value) {
197 checkNotNull(key, ERROR_NULL_KEY);
198 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani02b7fb82015-05-01 13:01:20 -0700199 checkIfUnmodifiable();
Madan Jampanif1b8e172015-03-23 11:42:02 -0700200 return database.put(name, keyCache.getUnchecked(key), serializer.encode(value))
Madan Jampani346d4f52015-05-04 11:09:39 -0700201 .thenApply(this::unwrapResult)
202 .thenApply(v -> v != null
203 ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
204 }
205
206 @Override
207 public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
208 checkNotNull(key, ERROR_NULL_KEY);
209 checkNotNull(value, ERROR_NULL_VALUE);
210 checkIfUnmodifiable();
211 return database.putAndGet(name, keyCache.getUnchecked(key), serializer.encode(value))
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700212 .thenApply(this::unwrapResult)
Madan Jampani346d4f52015-05-04 11:09:39 -0700213 .thenApply(v -> {
214 Versioned<byte[]> rawNewValue = v.newValue();
215 return new Versioned<>(serializer.decode(rawNewValue.value()),
216 rawNewValue.version(),
217 rawNewValue.creationTime());
218 });
219 }
220
221 @Override
222 public CompletableFuture<Optional<Versioned<V>>> putIfAbsentAndGet(K key, V value) {
223 checkNotNull(key, ERROR_NULL_KEY);
224 checkNotNull(value, ERROR_NULL_VALUE);
225 checkIfUnmodifiable();
226 return database.putIfAbsentAndGet(name, keyCache.getUnchecked(key), serializer.encode(value))
227 .thenApply(this::unwrapResult)
228 .thenApply(v -> {
229 if (v.updated()) {
230 Versioned<byte[]> rawNewValue = v.newValue();
231 return Optional.of(new Versioned<>(serializer.decode(rawNewValue.value()),
232 rawNewValue.version(),
233 rawNewValue.creationTime()));
234 } else {
235 return Optional.empty();
236 }
237 });
Madan Jampani7c521002015-03-23 12:23:01 -0700238 }
239
240 @Override
241 public CompletableFuture<Versioned<V>> remove(K key) {
242 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani02b7fb82015-05-01 13:01:20 -0700243 checkIfUnmodifiable();
Madan Jampanif1b8e172015-03-23 11:42:02 -0700244 return database.remove(name, keyCache.getUnchecked(key))
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700245 .thenApply(this::unwrapResult)
Madan Jampani7c521002015-03-23 12:23:01 -0700246 .thenApply(v -> v != null
247 ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
248 }
249
250 @Override
251 public CompletableFuture<Void> clear() {
Madan Jampani02b7fb82015-05-01 13:01:20 -0700252 checkIfUnmodifiable();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700253 return database.clear(name).thenApply(this::unwrapResult);
Madan Jampani7c521002015-03-23 12:23:01 -0700254 }
255
256 @Override
257 public CompletableFuture<Set<K>> keySet() {
Madan Jampanif1b8e172015-03-23 11:42:02 -0700258 return database.keySet(name)
Madan Jampani7c521002015-03-23 12:23:01 -0700259 .thenApply(s -> s
260 .stream()
261 .map(this::dK)
262 .collect(Collectors.toSet()));
263 }
264
265 @Override
266 public CompletableFuture<Collection<Versioned<V>>> values() {
Madan Jampanif1b8e172015-03-23 11:42:02 -0700267 return database.values(name).thenApply(c -> c
Madan Jampani7c521002015-03-23 12:23:01 -0700268 .stream()
269 .map(v -> new Versioned<V>(serializer.decode(v.value()), v.version(), v.creationTime()))
270 .collect(Collectors.toList()));
271 }
272
273 @Override
274 public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() {
Madan Jampanif1b8e172015-03-23 11:42:02 -0700275 return database.entrySet(name).thenApply(s -> s
Madan Jampani7c521002015-03-23 12:23:01 -0700276 .stream()
277 .map(this::fromRawEntry)
278 .collect(Collectors.toSet()));
279 }
280
281 @Override
282 public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
283 checkNotNull(key, ERROR_NULL_KEY);
284 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani02b7fb82015-05-01 13:01:20 -0700285 checkIfUnmodifiable();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700286 return database.putIfAbsent(name,
287 keyCache.getUnchecked(key),
288 serializer.encode(value))
Madan Jampani346d4f52015-05-04 11:09:39 -0700289 .thenApply(this::unwrapResult)
290 .thenApply(v -> v != null ?
291 new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
Madan Jampani7c521002015-03-23 12:23:01 -0700292 }
293
294 @Override
295 public CompletableFuture<Boolean> remove(K key, V value) {
296 checkNotNull(key, ERROR_NULL_KEY);
297 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani02b7fb82015-05-01 13:01:20 -0700298 checkIfUnmodifiable();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700299 return database.remove(name, keyCache.getUnchecked(key), serializer.encode(value))
Madan Jampani346d4f52015-05-04 11:09:39 -0700300 .thenApply(this::unwrapResult);
Madan Jampani7c521002015-03-23 12:23:01 -0700301 }
302
303 @Override
304 public CompletableFuture<Boolean> remove(K key, long version) {
305 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani02b7fb82015-05-01 13:01:20 -0700306 checkIfUnmodifiable();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700307 return database.remove(name, keyCache.getUnchecked(key), version)
Madan Jampani346d4f52015-05-04 11:09:39 -0700308 .thenApply(this::unwrapResult);
Madan Jampani7c521002015-03-23 12:23:01 -0700309
310 }
311
312 @Override
313 public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
314 checkNotNull(key, ERROR_NULL_KEY);
315 checkNotNull(newValue, ERROR_NULL_VALUE);
Madan Jampani02b7fb82015-05-01 13:01:20 -0700316 checkIfUnmodifiable();
Madan Jampani7c521002015-03-23 12:23:01 -0700317 byte[] existing = oldValue != null ? serializer.encode(oldValue) : null;
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700318 return database.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue))
Madan Jampani346d4f52015-05-04 11:09:39 -0700319 .thenApply(this::unwrapResult);
Madan Jampani7c521002015-03-23 12:23:01 -0700320 }
321
322 @Override
323 public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
Madan Jampani346d4f52015-05-04 11:09:39 -0700324 return replaceAndGet(key, oldVersion, newValue).thenApply(Optional::isPresent);
325 }
326
327 @Override
328 public CompletableFuture<Optional<Versioned<V>>> replaceAndGet(K key, long oldVersion, V newValue) {
Madan Jampani7c521002015-03-23 12:23:01 -0700329 checkNotNull(key, ERROR_NULL_KEY);
330 checkNotNull(newValue, ERROR_NULL_VALUE);
Madan Jampani02b7fb82015-05-01 13:01:20 -0700331 checkIfUnmodifiable();
Madan Jampani346d4f52015-05-04 11:09:39 -0700332 return database.replaceAndGet(name,
333 keyCache.getUnchecked(key),
334 oldVersion,
335 serializer.encode(newValue))
336 .thenApply(this::unwrapResult)
337 .thenApply(v -> {
338 if (v.updated()) {
339 Versioned<byte[]> rawNewValue = v.newValue();
340 return Optional.of(new Versioned<>(serializer.decode(rawNewValue.value()),
341 rawNewValue.version(),
342 rawNewValue.creationTime()));
343 } else {
344 return Optional.empty();
345 }
346 });
Madan Jampani7c521002015-03-23 12:23:01 -0700347 }
348
349 private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
350 return Pair.of(
351 dK(e.getKey()),
352 new Versioned<>(
353 serializer.decode(e.getValue().value()),
354 e.getValue().version(),
355 e.getValue().creationTime()));
356 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700357
358 private <T> T unwrapResult(Result<T> result) {
359 if (result.status() == Result.Status.LOCKED) {
360 throw new ConsistentMapException.ConcurrentModification();
361 } else if (result.success()) {
362 return result.value();
363 } else {
364 throw new IllegalStateException("Must not be here");
365 }
366 }
Madan Jampani02b7fb82015-05-01 13:01:20 -0700367
368 private void checkIfUnmodifiable() {
369 if (readOnly) {
370 throw new UnsupportedOperationException();
371 }
372 }
Madan Jampani7c521002015-03-23 12:23:01 -0700373}