blob: d71f965a52c5be6820a00e2efaf2fb522371f932 [file] [log] [blame]
Jordan Halterman8d8da592017-08-28 14:45:19 -07001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.impl;
17
18import java.util.concurrent.CompletableFuture;
19import java.util.function.Consumer;
20
21import com.google.common.cache.CacheBuilder;
22import com.google.common.cache.CacheLoader;
23import com.google.common.cache.LoadingCache;
24import org.onosproject.store.service.AsyncDocumentTree;
25import org.onosproject.store.service.DocumentPath;
26import org.onosproject.store.service.DocumentTreeListener;
27import org.onosproject.store.service.Versioned;
28import org.slf4j.Logger;
29
30import static org.onosproject.store.service.DistributedPrimitive.Status.INACTIVE;
31import static org.onosproject.store.service.DistributedPrimitive.Status.SUSPENDED;
32import static org.slf4j.LoggerFactory.getLogger;
33
34/**
35 * Caching asynchronous document tree.
36 */
37public class CachingAsyncDocumentTree<V> extends DelegatingAsyncDocumentTree<V> implements AsyncDocumentTree<V> {
38 private static final int DEFAULT_CACHE_SIZE = 10000;
39 private final Logger log = getLogger(getClass());
40
41 private final LoadingCache<DocumentPath, CompletableFuture<Versioned<V>>> cache;
42 private final DocumentTreeListener<V> cacheUpdater;
43 private final Consumer<Status> statusListener;
44
45 /**
46 * Default constructor.
47 *
48 * @param backingTree a distributed, strongly consistent map for backing
49 */
50 public CachingAsyncDocumentTree(AsyncDocumentTree<V> backingTree) {
51 this(backingTree, DEFAULT_CACHE_SIZE);
52 }
53
54 /**
55 * Constructor to configure cache size.
56 *
57 * @param backingTree a distributed, strongly consistent map for backing
58 * @param cacheSize the maximum size of the cache
59 */
60 public CachingAsyncDocumentTree(AsyncDocumentTree<V> backingTree, int cacheSize) {
61 super(backingTree);
62 cache = CacheBuilder.newBuilder()
63 .maximumSize(cacheSize)
64 .build(CacheLoader.from(CachingAsyncDocumentTree.super::get));
65 cacheUpdater = event -> {
66 if (!event.newValue().isPresent()) {
67 cache.invalidate(event.path());
68 } else {
69 cache.put(event.path(), CompletableFuture.completedFuture(event.newValue().get()));
70 }
71 };
72 statusListener = status -> {
73 log.debug("{} status changed to {}", this.name(), status);
74 // If the status of the underlying map is SUSPENDED or INACTIVE
75 // we can no longer guarantee that the cache will be in sync.
76 if (status == SUSPENDED || status == INACTIVE) {
77 cache.invalidateAll();
78 }
79 };
80 super.addListener(cacheUpdater);
81 super.addStatusChangeListener(statusListener);
82 }
83
84 @Override
85 public CompletableFuture<Versioned<V>> get(DocumentPath path) {
86 return cache.getUnchecked(path);
87 }
88
89 @Override
90 public CompletableFuture<Versioned<V>> set(DocumentPath path, V value) {
91 return super.set(path, value)
92 .whenComplete((r, e) -> cache.invalidate(path));
93 }
94
95 @Override
96 public CompletableFuture<Boolean> create(DocumentPath path, V value) {
97 return super.create(path, value)
98 .whenComplete((r, e) -> cache.invalidate(path));
99 }
100
101 @Override
102 public CompletableFuture<Boolean> createRecursive(DocumentPath path, V value) {
103 return super.createRecursive(path, value)
104 .whenComplete((r, e) -> cache.invalidate(path));
105 }
106
107 @Override
108 public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, long version) {
109 return super.replace(path, newValue, version)
110 .whenComplete((r, e) -> {
111 if (r) {
112 cache.invalidate(path);
113 }
114 });
115 }
116
117 @Override
118 public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, V currentValue) {
119 return super.replace(path, newValue, currentValue)
120 .whenComplete((r, e) -> {
121 if (r) {
122 cache.invalidate(path);
123 }
124 });
125 }
126
127 @Override
128 public CompletableFuture<Versioned<V>> removeNode(DocumentPath path) {
129 return super.removeNode(path)
130 .whenComplete((r, e) -> cache.invalidate(path));
131 }
132}