blob: f42857fcf64fa0e5fa4f77b12ff367438ccb7cf7 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.store.consistent.impl;
18
19import static com.google.common.base.Preconditions.*;
Madan Jampani50589ac2015-06-08 11:38:46 -070020import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani7c521002015-03-23 12:23:01 -070021
22import java.util.Collection;
23import java.util.Map;
24import java.util.Map.Entry;
Madan Jampani346d4f52015-05-04 11:09:39 -070025import java.util.Objects;
Madan Jampani7c521002015-03-23 12:23:01 -070026import java.util.concurrent.CompletableFuture;
Madan Jampani50589ac2015-06-08 11:38:46 -070027import java.util.concurrent.CopyOnWriteArraySet;
Madan Jampani346d4f52015-05-04 11:09:39 -070028import java.util.concurrent.atomic.AtomicReference;
29import java.util.function.BiFunction;
Madan Jampani50589ac2015-06-08 11:38:46 -070030import java.util.function.Consumer;
Madan Jampani346d4f52015-05-04 11:09:39 -070031import java.util.function.Function;
32import java.util.function.Predicate;
Madan Jampani7c521002015-03-23 12:23:01 -070033import java.util.stream.Collectors;
34import java.util.Set;
35
Madan Jampani7c521002015-03-23 12:23:01 -070036import org.onlab.util.HexString;
Madan Jampani346d4f52015-05-04 11:09:39 -070037import org.onlab.util.Tools;
Madan Jampanie8af1cc2015-06-23 14:23:31 -070038import org.onosproject.core.ApplicationId;
Madan Jampani7c521002015-03-23 12:23:01 -070039import org.onosproject.store.service.AsyncConsistentMap;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070040import org.onosproject.store.service.ConsistentMapException;
Madan Jampani50589ac2015-06-08 11:38:46 -070041import org.onosproject.store.service.MapEvent;
42import org.onosproject.store.service.MapEventListener;
Madan Jampani7c521002015-03-23 12:23:01 -070043import org.onosproject.store.service.Serializer;
44import org.onosproject.store.service.Versioned;
Madan Jampani50589ac2015-06-08 11:38:46 -070045import org.slf4j.Logger;
Madan Jampani7c521002015-03-23 12:23:01 -070046
47import com.google.common.cache.CacheBuilder;
48import com.google.common.cache.CacheLoader;
49import com.google.common.cache.LoadingCache;
Madan Jampani7804c992015-07-20 13:20:19 -070050import com.google.common.collect.Maps;
Madan Jampani7c521002015-03-23 12:23:01 -070051
52/**
53 * AsyncConsistentMap implementation that is backed by a Raft consensus
54 * based database.
55 *
56 * @param <K> type of key.
57 * @param <V> type of value.
58 */
59public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> {
60
61 private final String name;
Madan Jampanie8af1cc2015-06-23 14:23:31 -070062 private final ApplicationId applicationId;
Madan Jampanif1b8e172015-03-23 11:42:02 -070063 private final Database database;
Madan Jampani7c521002015-03-23 12:23:01 -070064 private final Serializer serializer;
Madan Jampani02b7fb82015-05-01 13:01:20 -070065 private final boolean readOnly;
Madan Jampanie8af1cc2015-06-23 14:23:31 -070066 private final boolean purgeOnUninstall;
Madan Jampani50589ac2015-06-08 11:38:46 -070067 private final Consumer<MapEvent<K, V>> eventPublisher;
68
69 private final Set<MapEventListener<K, V>> listeners = new CopyOnWriteArraySet<>();
70
71 private final Logger log = getLogger(getClass());
Madan Jampani7c521002015-03-23 12:23:01 -070072
73 private static final String ERROR_NULL_KEY = "Key cannot be null";
74 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
75
76 private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder()
77 .softValues()
78 .build(new CacheLoader<K, String>() {
79
80 @Override
81 public String load(K key) {
82 return HexString.toHexString(serializer.encode(key));
83 }
84 });
85
86 protected K dK(String key) {
87 return serializer.decode(HexString.fromHexString(key));
88 }
89
90 public DefaultAsyncConsistentMap(String name,
Madan Jampanie8af1cc2015-06-23 14:23:31 -070091 ApplicationId applicationId,
Madan Jampanif1b8e172015-03-23 11:42:02 -070092 Database database,
Madan Jampani02b7fb82015-05-01 13:01:20 -070093 Serializer serializer,
Madan Jampani50589ac2015-06-08 11:38:46 -070094 boolean readOnly,
Madan Jampanie8af1cc2015-06-23 14:23:31 -070095 boolean purgeOnUninstall,
Madan Jampani50589ac2015-06-08 11:38:46 -070096 Consumer<MapEvent<K, V>> eventPublisher) {
Madan Jampani7c521002015-03-23 12:23:01 -070097 this.name = checkNotNull(name, "map name cannot be null");
Madan Jampanie8af1cc2015-06-23 14:23:31 -070098 this.applicationId = applicationId;
Madan Jampanif1b8e172015-03-23 11:42:02 -070099 this.database = checkNotNull(database, "database cannot be null");
Madan Jampani7c521002015-03-23 12:23:01 -0700100 this.serializer = checkNotNull(serializer, "serializer cannot be null");
Madan Jampani02b7fb82015-05-01 13:01:20 -0700101 this.readOnly = readOnly;
Madan Jampanie8af1cc2015-06-23 14:23:31 -0700102 this.purgeOnUninstall = purgeOnUninstall;
Madan Jampani50589ac2015-06-08 11:38:46 -0700103 this.eventPublisher = eventPublisher;
104 }
105
106 /**
107 * Returns this map name.
108 * @return map name
109 */
110 public String name() {
111 return name;
112 }
113
114 /**
115 * Returns the serializer for map entries.
116 * @return map entry serializer
117 */
118 public Serializer serializer() {
119 return serializer;
Madan Jampani7c521002015-03-23 12:23:01 -0700120 }
121
Madan Jampanie8af1cc2015-06-23 14:23:31 -0700122 /**
123 * Returns the applicationId owning this map.
124 * @return application Id
125 */
126 public ApplicationId applicationId() {
127 return applicationId;
128 }
129
130 /**
131 * Returns whether the map entries should be purged when the application
132 * owning it is uninstalled.
133 * @return true is map needs to cleared on app uninstall; false otherwise
134 */
135 public boolean purgeOnUninstall() {
136 return purgeOnUninstall;
137 }
138
Madan Jampani7c521002015-03-23 12:23:01 -0700139 @Override
140 public CompletableFuture<Integer> size() {
Madan Jampani7804c992015-07-20 13:20:19 -0700141 return database.mapSize(name);
Madan Jampani7c521002015-03-23 12:23:01 -0700142 }
143
144 @Override
145 public CompletableFuture<Boolean> isEmpty() {
Madan Jampani7804c992015-07-20 13:20:19 -0700146 return database.mapIsEmpty(name);
Madan Jampani7c521002015-03-23 12:23:01 -0700147 }
148
149 @Override
150 public CompletableFuture<Boolean> containsKey(K key) {
151 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani7804c992015-07-20 13:20:19 -0700152 return database.mapContainsKey(name, keyCache.getUnchecked(key));
Madan Jampani7c521002015-03-23 12:23:01 -0700153 }
154
155 @Override
156 public CompletableFuture<Boolean> containsValue(V value) {
157 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani7804c992015-07-20 13:20:19 -0700158 return database.mapContainsValue(name, serializer.encode(value));
Madan Jampani7c521002015-03-23 12:23:01 -0700159 }
160
161 @Override
162 public CompletableFuture<Versioned<V>> get(K key) {
163 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani7804c992015-07-20 13:20:19 -0700164 return database.mapGet(name, keyCache.getUnchecked(key))
165 .thenApply(v -> v != null ? v.map(serializer::decode) : null);
Madan Jampani7c521002015-03-23 12:23:01 -0700166 }
167
168 @Override
Madan Jampani346d4f52015-05-04 11:09:39 -0700169 public CompletableFuture<Versioned<V>> computeIfAbsent(K key,
170 Function<? super K, ? extends V> mappingFunction) {
Madan Jampani7804c992015-07-20 13:20:19 -0700171 checkNotNull(key, ERROR_NULL_KEY);
172 checkNotNull(mappingFunction, "Mapping function cannot be null");
173 return updateAndGet(key, Match.ifNull(), Match.any(), mappingFunction.apply(key)).thenApply(v -> v.newValue());
Madan Jampani346d4f52015-05-04 11:09:39 -0700174 }
175
176 @Override
177 public CompletableFuture<Versioned<V>> computeIfPresent(K key,
178 BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
179 return computeIf(key, Objects::nonNull, remappingFunction);
180 }
181
182 @Override
183 public CompletableFuture<Versioned<V>> compute(K key,
184 BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
185 return computeIf(key, v -> true, remappingFunction);
186 }
187
188 @Override
189 public CompletableFuture<Versioned<V>> computeIf(K key,
190 Predicate<? super V> condition,
191 BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
192 checkNotNull(key, ERROR_NULL_KEY);
193 checkNotNull(condition, "predicate function cannot be null");
194 checkNotNull(remappingFunction, "Remapping function cannot be null");
195 return get(key).thenCompose(r1 -> {
196 V existingValue = r1 == null ? null : r1.value();
197 // if the condition evaluates to false, return existing value.
198 if (!condition.test(existingValue)) {
199 return CompletableFuture.completedFuture(r1);
200 }
201
202 AtomicReference<V> computedValue = new AtomicReference<>();
203 // if remappingFunction throws an exception, return the exception.
204 try {
205 computedValue.set(remappingFunction.apply(key, existingValue));
206 } catch (Exception e) {
207 return Tools.exceptionalFuture(e);
208 }
Madan Jampani7804c992015-07-20 13:20:19 -0700209 if (computedValue.get() == null && r1 == null) {
210 return CompletableFuture.completedFuture(null);
Madan Jampani346d4f52015-05-04 11:09:39 -0700211 }
Madan Jampani7804c992015-07-20 13:20:19 -0700212 Match<V> valueMatcher = r1 == null ? Match.ifNull() : Match.any();
213 Match<Long> versionMatcher = r1 == null ? Match.any() : Match.ifValue(r1.version());
214 return updateAndGet(key, valueMatcher, versionMatcher, computedValue.get())
215 .thenApply(v -> {
216 if (v.updated()) {
217 return v.newValue();
218 } else {
219 throw new ConsistentMapException.ConcurrentModification();
220 }
221 });
222 });
Madan Jampani346d4f52015-05-04 11:09:39 -0700223 }
224
225 @Override
Madan Jampani7c521002015-03-23 12:23:01 -0700226 public CompletableFuture<Versioned<V>> put(K key, V value) {
227 checkNotNull(key, ERROR_NULL_KEY);
228 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani7804c992015-07-20 13:20:19 -0700229 return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.oldValue());
Madan Jampani346d4f52015-05-04 11:09:39 -0700230 }
231
232 @Override
233 public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
234 checkNotNull(key, ERROR_NULL_KEY);
235 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani7804c992015-07-20 13:20:19 -0700236 return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.newValue());
Madan Jampani7c521002015-03-23 12:23:01 -0700237 }
238
239 @Override
240 public CompletableFuture<Versioned<V>> remove(K key) {
241 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani7804c992015-07-20 13:20:19 -0700242 return updateAndGet(key, Match.any(), Match.any(), null).thenApply(v -> v.oldValue());
Madan Jampani7c521002015-03-23 12:23:01 -0700243 }
244
245 @Override
246 public CompletableFuture<Void> clear() {
Madan Jampani02b7fb82015-05-01 13:01:20 -0700247 checkIfUnmodifiable();
Madan Jampani7804c992015-07-20 13:20:19 -0700248 return database.mapClear(name).thenApply(this::unwrapResult);
Madan Jampani7c521002015-03-23 12:23:01 -0700249 }
250
251 @Override
252 public CompletableFuture<Set<K>> keySet() {
Madan Jampani7804c992015-07-20 13:20:19 -0700253 return database.mapKeySet(name)
Madan Jampani7c521002015-03-23 12:23:01 -0700254 .thenApply(s -> s
255 .stream()
256 .map(this::dK)
257 .collect(Collectors.toSet()));
258 }
259
260 @Override
261 public CompletableFuture<Collection<Versioned<V>>> values() {
Madan Jampani7804c992015-07-20 13:20:19 -0700262 return database.mapValues(name).thenApply(c -> c
Madan Jampani7c521002015-03-23 12:23:01 -0700263 .stream()
Madan Jampani7804c992015-07-20 13:20:19 -0700264 .map(v -> v.<V>map(serializer::decode))
Madan Jampani7c521002015-03-23 12:23:01 -0700265 .collect(Collectors.toList()));
266 }
267
268 @Override
269 public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() {
Madan Jampani7804c992015-07-20 13:20:19 -0700270 return database.mapEntrySet(name).thenApply(s -> s
Madan Jampani7c521002015-03-23 12:23:01 -0700271 .stream()
Madan Jampani7804c992015-07-20 13:20:19 -0700272 .map(this::mapRawEntry)
Madan Jampani7c521002015-03-23 12:23:01 -0700273 .collect(Collectors.toSet()));
274 }
275
276 @Override
277 public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
278 checkNotNull(key, ERROR_NULL_KEY);
279 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampaniae2d0d72015-07-21 16:42:59 -0700280 return updateAndGet(key, Match.ifNull(), Match.any(), value).thenApply(v -> v.oldValue());
Madan Jampani7c521002015-03-23 12:23:01 -0700281 }
282
283 @Override
284 public CompletableFuture<Boolean> remove(K key, V value) {
285 checkNotNull(key, ERROR_NULL_KEY);
286 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani7804c992015-07-20 13:20:19 -0700287 return updateAndGet(key, Match.ifValue(value), Match.any(), null).thenApply(v -> v.updated());
Madan Jampani7c521002015-03-23 12:23:01 -0700288 }
289
290 @Override
291 public CompletableFuture<Boolean> remove(K key, long version) {
292 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani7804c992015-07-20 13:20:19 -0700293 return updateAndGet(key, Match.any(), Match.ifValue(version), null).thenApply(v -> v.updated());
Madan Jampani7c521002015-03-23 12:23:01 -0700294 }
295
296 @Override
297 public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
298 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani7804c992015-07-20 13:20:19 -0700299 checkNotNull(oldValue, ERROR_NULL_VALUE);
Madan Jampani7c521002015-03-23 12:23:01 -0700300 checkNotNull(newValue, ERROR_NULL_VALUE);
Madan Jampani7804c992015-07-20 13:20:19 -0700301 return updateAndGet(key, Match.ifValue(oldValue), Match.any(), newValue).thenApply(v -> v.updated());
Madan Jampani7c521002015-03-23 12:23:01 -0700302 }
303
304 @Override
305 public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
Madan Jampani7804c992015-07-20 13:20:19 -0700306 return updateAndGet(key, Match.any(), Match.ifValue(oldVersion), newValue).thenApply(v -> v.updated());
Madan Jampani346d4f52015-05-04 11:09:39 -0700307 }
308
Madan Jampani7804c992015-07-20 13:20:19 -0700309 private Map.Entry<K, Versioned<V>> mapRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
310 return Maps.immutableEntry(dK(e.getKey()), e.getValue().<V>map(serializer::decode));
311 }
312
313 private CompletableFuture<UpdateResult<K, V>> updateAndGet(K key,
314 Match<V> oldValueMatch,
315 Match<Long> oldVersionMatch,
316 V value) {
Madan Jampani02b7fb82015-05-01 13:01:20 -0700317 checkIfUnmodifiable();
Madan Jampani7804c992015-07-20 13:20:19 -0700318 return database.mapUpdate(name,
319 keyCache.getUnchecked(key),
320 oldValueMatch.map(serializer::encode),
321 oldVersionMatch,
322 value == null ? null : serializer.encode(value))
323 .thenApply(this::unwrapResult)
324 .thenApply(r -> r.<K, V>map(this::dK, serializer::decode))
325 .whenComplete((r, e) -> notifyListeners(r != null ? r.toMapEvent() : null));
Madan Jampani7c521002015-03-23 12:23:01 -0700326 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700327
328 private <T> T unwrapResult(Result<T> result) {
329 if (result.status() == Result.Status.LOCKED) {
330 throw new ConsistentMapException.ConcurrentModification();
331 } else if (result.success()) {
332 return result.value();
333 } else {
334 throw new IllegalStateException("Must not be here");
335 }
336 }
Madan Jampani02b7fb82015-05-01 13:01:20 -0700337
338 private void checkIfUnmodifiable() {
339 if (readOnly) {
340 throw new UnsupportedOperationException();
341 }
342 }
Madan Jampani50589ac2015-06-08 11:38:46 -0700343
344 @Override
345 public void addListener(MapEventListener<K, V> listener) {
346 listeners.add(listener);
347 }
348
349 @Override
350 public void removeListener(MapEventListener<K, V> listener) {
351 listeners.remove(listener);
352 }
353
354 protected void notifyListeners(MapEvent<K, V> event) {
355 try {
356 if (event != null) {
357 notifyLocalListeners(event);
358 notifyRemoteListeners(event);
359 }
360 } catch (Exception e) {
361 log.warn("Failure notifying listeners about {}", event, e);
362 }
363 }
364
365 protected void notifyLocalListeners(MapEvent<K, V> event) {
366 listeners.forEach(listener -> listener.event(event));
367 }
368
369 protected void notifyRemoteListeners(MapEvent<K, V> event) {
370 if (eventPublisher != null) {
371 eventPublisher.accept(event);
372 }
373 }
Madan Jampani7c521002015-03-23 12:23:01 -0700374}