blob: a25fb79e4f703e7bab72a10e9d8b01b4beb5ad50 [file] [log] [blame]
Madan Jampani10073672016-01-21 19:13:59 -08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Madan Jampani10073672016-01-21 19:13:59 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampani10073672016-01-21 19:13:59 -080016package org.onosproject.store.primitives.impl;
17
Madan Jampani1d3b6172016-04-28 13:22:57 -070018import static org.slf4j.LoggerFactory.getLogger;
19
Jordan Halterman74d76b12018-06-27 13:38:09 -070020import java.util.Map;
Jordan Halterman5ecdb342017-07-22 12:33:14 -070021import java.util.Objects;
Madan Jampani10073672016-01-21 19:13:59 -080022import java.util.concurrent.CompletableFuture;
Jordan Halterman74d76b12018-06-27 13:38:09 -070023import java.util.concurrent.ConcurrentHashMap;
24import java.util.concurrent.Executor;
Madan Jampani10073672016-01-21 19:13:59 -080025import java.util.function.BiFunction;
Madan Jampani1d3b6172016-04-28 13:22:57 -070026import java.util.function.Consumer;
Madan Jampani10073672016-01-21 19:13:59 -080027import java.util.function.Predicate;
28
Jordan Halterman74d76b12018-06-27 13:38:09 -070029import com.google.common.util.concurrent.MoreExecutors;
Madan Jampani10073672016-01-21 19:13:59 -080030import org.onosproject.store.service.AsyncConsistentMap;
Madan Jampani62f15332016-02-01 13:18:39 -080031import org.onosproject.store.service.MapEventListener;
Madan Jampani10073672016-01-21 19:13:59 -080032import org.onosproject.store.service.Versioned;
Madan Jampani1d3b6172016-04-28 13:22:57 -070033import org.slf4j.Logger;
Madan Jampani10073672016-01-21 19:13:59 -080034
35import com.google.common.cache.CacheBuilder;
36import com.google.common.cache.CacheLoader;
37import com.google.common.cache.LoadingCache;
38
Madan Jampani1d3b6172016-04-28 13:22:57 -070039import static org.onosproject.store.service.DistributedPrimitive.Status.INACTIVE;
40import static org.onosproject.store.service.DistributedPrimitive.Status.SUSPENDED;
41
Madan Jampani10073672016-01-21 19:13:59 -080042/**
43 * {@code AsyncConsistentMap} that caches entries on read.
44 * <p>
45 * The cache entries are automatically invalidated when updates are detected either locally or
46 * remotely.
47 * <p> This implementation only attempts to serve cached entries for {@link AsyncConsistentMap#get get}
Jordan Halterman5ecdb342017-07-22 12:33:14 -070048 * {@link AsyncConsistentMap#getOrDefault(Object, Object) getOrDefault}, and
49 * {@link AsyncConsistentMap#containsKey(Object) containsKey} calls. All other calls skip the cache
50 * and directly go the backing map.
Madan Jampani10073672016-01-21 19:13:59 -080051 *
52 * @param <K> key type
53 * @param <V> value type
54 */
55public class CachingAsyncConsistentMap<K, V> extends DelegatingAsyncConsistentMap<K, V> {
Madan Jampani1d3b6172016-04-28 13:22:57 -070056 private static final int DEFAULT_CACHE_SIZE = 10000;
57 private final Logger log = getLogger(getClass());
Madan Jampani10073672016-01-21 19:13:59 -080058
Jordan Halterman74d76b12018-06-27 13:38:09 -070059 private final Map<MapEventListener<K, V>, Executor> mapEventListeners = new ConcurrentHashMap<>();
Madan Jampani1d3b6172016-04-28 13:22:57 -070060 private final LoadingCache<K, CompletableFuture<Versioned<V>>> cache;
Jordan Haltermanb2243072017-05-09 12:04:04 -070061 private final AsyncConsistentMap<K, V> backingMap;
Madan Jampanie88086f2016-06-16 16:48:14 -070062 private final MapEventListener<K, V> cacheUpdater;
Madan Jampani1d3b6172016-04-28 13:22:57 -070063 private final Consumer<Status> statusListener;
Madan Jampani62f15332016-02-01 13:18:39 -080064
sangyun-hancce07c52016-04-06 11:07:59 +090065 /**
66 * Default constructor.
67 *
68 * @param backingMap a distributed, strongly consistent map for backing
69 */
Madan Jampani10073672016-01-21 19:13:59 -080070 public CachingAsyncConsistentMap(AsyncConsistentMap<K, V> backingMap) {
Madan Jampani1d3b6172016-04-28 13:22:57 -070071 this(backingMap, DEFAULT_CACHE_SIZE);
Madan Jampani62f15332016-02-01 13:18:39 -080072 }
73
sangyun-hancce07c52016-04-06 11:07:59 +090074 /**
Madan Jampani1d3b6172016-04-28 13:22:57 -070075 * Constructor to configure cache size.
sangyun-hancce07c52016-04-06 11:07:59 +090076 *
77 * @param backingMap a distributed, strongly consistent map for backing
78 * @param cacheSize the maximum size of the cache
79 */
80 public CachingAsyncConsistentMap(AsyncConsistentMap<K, V> backingMap, int cacheSize) {
81 super(backingMap);
Jordan Haltermanb2243072017-05-09 12:04:04 -070082 this.backingMap = backingMap;
Madan Jampani1d3b6172016-04-28 13:22:57 -070083 cache = CacheBuilder.newBuilder()
84 .maximumSize(cacheSize)
85 .build(CacheLoader.from(CachingAsyncConsistentMap.super::get));
Madan Jampanie88086f2016-06-16 16:48:14 -070086 cacheUpdater = event -> {
87 Versioned<V> newValue = event.newValue();
88 if (newValue == null) {
89 cache.invalidate(event.key());
90 } else {
91 cache.put(event.key(), CompletableFuture.completedFuture(newValue));
92 }
Jordan Halterman74d76b12018-06-27 13:38:09 -070093 mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event)));
Madan Jampanie88086f2016-06-16 16:48:14 -070094 };
Madan Jampani1d3b6172016-04-28 13:22:57 -070095 statusListener = status -> {
96 log.debug("{} status changed to {}", this.name(), status);
97 // If the status of the underlying map is SUSPENDED or INACTIVE
98 // we can no longer guarantee that the cache will be in sync.
99 if (status == SUSPENDED || status == INACTIVE) {
100 cache.invalidateAll();
101 }
102 };
Jordan Halterman74d76b12018-06-27 13:38:09 -0700103 super.addListener(cacheUpdater, MoreExecutors.directExecutor());
Madan Jampani1d3b6172016-04-28 13:22:57 -0700104 super.addStatusChangeListener(statusListener);
sangyun-hancce07c52016-04-06 11:07:59 +0900105 }
106
Madan Jampani62f15332016-02-01 13:18:39 -0800107 @Override
108 public CompletableFuture<Void> destroy() {
Madan Jampani1d3b6172016-04-28 13:22:57 -0700109 super.removeStatusChangeListener(statusListener);
Madan Jampanie88086f2016-06-16 16:48:14 -0700110 return super.destroy().thenCompose(v -> removeListener(cacheUpdater));
Madan Jampani10073672016-01-21 19:13:59 -0800111 }
112
113 @Override
114 public CompletableFuture<Versioned<V>> get(K key) {
Madan Jampani77012442016-06-02 07:47:42 -0700115 return cache.getUnchecked(key)
116 .whenComplete((r, e) -> {
117 if (e != null) {
118 cache.invalidate(key);
119 }
120 });
Madan Jampani10073672016-01-21 19:13:59 -0800121 }
122
123 @Override
Jordan Haltermanb2243072017-05-09 12:04:04 -0700124 public CompletableFuture<Versioned<V>> getOrDefault(K key, V defaultValue) {
125 return cache.getUnchecked(key).thenCompose(r -> {
126 if (r == null) {
127 CompletableFuture<Versioned<V>> versioned = backingMap.getOrDefault(key, defaultValue);
128 cache.put(key, versioned);
129 return versioned;
130 } else {
131 return CompletableFuture.completedFuture(r);
132 }
133 }).whenComplete((r, e) -> {
134 if (e != null) {
135 cache.invalidate(key);
136 }
137 });
138 }
139
140 @Override
Madan Jampani10073672016-01-21 19:13:59 -0800141 public CompletableFuture<Versioned<V>> computeIf(K key,
142 Predicate<? super V> condition,
143 BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
144 return super.computeIf(key, condition, remappingFunction)
sangyun-hancce07c52016-04-06 11:07:59 +0900145 .whenComplete((r, e) -> cache.invalidate(key));
Madan Jampani10073672016-01-21 19:13:59 -0800146 }
147
148 @Override
149 public CompletableFuture<Versioned<V>> put(K key, V value) {
150 return super.put(key, value)
sangyun-hancce07c52016-04-06 11:07:59 +0900151 .whenComplete((r, e) -> cache.invalidate(key));
Madan Jampani10073672016-01-21 19:13:59 -0800152 }
153
154 @Override
155 public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
Simon Hunt5829c342016-03-07 17:01:43 -0800156 return super.putAndGet(key, value)
sangyun-hancce07c52016-04-06 11:07:59 +0900157 .whenComplete((r, e) -> cache.invalidate(key));
Madan Jampani10073672016-01-21 19:13:59 -0800158 }
159
160 @Override
Jordan Halterman5ecdb342017-07-22 12:33:14 -0700161 public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
162 return super.putIfAbsent(key, value)
163 .whenComplete((r, e) -> cache.invalidate(key));
164 }
165
166 @Override
Madan Jampani10073672016-01-21 19:13:59 -0800167 public CompletableFuture<Versioned<V>> remove(K key) {
168 return super.remove(key)
sangyun-hancce07c52016-04-06 11:07:59 +0900169 .whenComplete((r, e) -> cache.invalidate(key));
Madan Jampani10073672016-01-21 19:13:59 -0800170 }
171
172 @Override
Jordan Halterman5ecdb342017-07-22 12:33:14 -0700173 public CompletableFuture<Boolean> containsKey(K key) {
174 return cache.getUnchecked(key).thenApply(Objects::nonNull)
175 .whenComplete((r, e) -> {
176 if (e != null) {
177 cache.invalidate(key);
178 }
179 });
180 }
181
182 @Override
Madan Jampani10073672016-01-21 19:13:59 -0800183 public CompletableFuture<Void> clear() {
184 return super.clear()
sangyun-hancce07c52016-04-06 11:07:59 +0900185 .whenComplete((r, e) -> cache.invalidateAll());
Madan Jampani10073672016-01-21 19:13:59 -0800186 }
187
188 @Override
189 public CompletableFuture<Boolean> remove(K key, V value) {
190 return super.remove(key, value)
Madan Jampani77012442016-06-02 07:47:42 -0700191 .whenComplete((r, e) -> {
192 if (r) {
193 cache.invalidate(key);
194 }
195 });
Madan Jampani10073672016-01-21 19:13:59 -0800196 }
197
198 @Override
199 public CompletableFuture<Boolean> remove(K key, long version) {
200 return super.remove(key, version)
201 .whenComplete((r, e) -> {
202 if (r) {
203 cache.invalidate(key);
204 }
205 });
206 }
207
208 @Override
209 public CompletableFuture<Versioned<V>> replace(K key, V value) {
210 return super.replace(key, value)
Madan Jampani77012442016-06-02 07:47:42 -0700211 .whenComplete((r, e) -> cache.invalidate(key));
Madan Jampani10073672016-01-21 19:13:59 -0800212 }
213
214 @Override
215 public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
216 return super.replace(key, oldValue, newValue)
217 .whenComplete((r, e) -> {
218 if (r) {
219 cache.invalidate(key);
220 }
221 });
222 }
223
224 @Override
225 public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
226 return super.replace(key, oldVersion, newValue)
227 .whenComplete((r, e) -> {
228 if (r) {
229 cache.invalidate(key);
230 }
231 });
232 }
Jordan Halterman74d76b12018-06-27 13:38:09 -0700233
234 @Override
235 public CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
236 mapEventListeners.put(listener, executor);
237 return CompletableFuture.completedFuture(null);
238 }
239
240 @Override
241 public CompletableFuture<Void> removeListener(MapEventListener<K, V> listener) {
242 mapEventListeners.remove(listener);
243 return CompletableFuture.completedFuture(null);
244 }
Madan Jampani10073672016-01-21 19:13:59 -0800245}