blob: 1c7ef7fd614df531fc43627753bf42ffb86eb7be [file] [log] [blame]
Madan Jampani09342702015-02-05 23:32:40 -08001package org.onosproject.store.consistent.impl;
2
3import static com.google.common.base.Preconditions.*;
4
5import java.util.AbstractMap;
6import java.util.Collection;
7import java.util.Collections;
8import java.util.List;
9import java.util.Map;
10import java.util.Map.Entry;
11import java.util.concurrent.CompletableFuture;
12import java.util.concurrent.ExecutionException;
13import java.util.concurrent.TimeUnit;
14import java.util.concurrent.TimeoutException;
15import java.util.stream.Collectors;
16import java.util.Set;
17
18import org.onlab.util.HexString;
19import org.onosproject.store.serializers.StoreSerializer;
20
21import com.google.common.cache.CacheBuilder;
22import com.google.common.cache.CacheLoader;
23import com.google.common.cache.LoadingCache;
24
25/**
26 * ConsistentMap implementation that is backed by a Raft consensus
27 * based database.
28 *
29 * @param <K> type of key.
30 * @param <V> type of value.
31 */
32public class ConsistentMapImpl<K, V> implements ConsistentMap<K, V> {
33
34 private final String name;
35 private final DatabaseProxy<String, byte[]> proxy;
36 private final StoreSerializer serializer;
37
38 private static final int OPERATION_TIMEOUT_MILLIS = 1000;
39 private static final String ERROR_NULL_KEY = "Key cannot be null";
40 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
41
42 private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder()
43 .softValues()
44 .build(new CacheLoader<K, String>() {
45
46 @Override
47 public String load(K key) {
48 return HexString.toHexString(serializer.encode(key));
49 }
50 });
51
52 protected K dK(String key) {
53 return serializer.decode(HexString.fromHexString(key));
54 }
55
56 ConsistentMapImpl(String name,
57 DatabaseProxy<String, byte[]> proxy,
58 StoreSerializer serializer) {
59 this.name = checkNotNull(name, "map name cannot be null");
60 this.proxy = checkNotNull(proxy, "database proxy cannot be null");
61 this.serializer = checkNotNull(serializer, "serializer cannot be null");
62 }
63
64 @Override
65 public int size() {
66 return complete(proxy.size(name));
67 }
68
69 @Override
70 public boolean isEmpty() {
71 return complete(proxy.isEmpty(name));
72 }
73
74 @Override
75 public boolean containsKey(K key) {
76 checkNotNull(key, ERROR_NULL_KEY);
77 return complete(proxy.containsKey(name, keyCache.getUnchecked(key)));
78 }
79
80 @Override
81 public boolean containsValue(V value) {
82 checkNotNull(value, ERROR_NULL_VALUE);
83 return complete(proxy.containsValue(name, serializer.encode(value)));
84 }
85
86 @Override
87 public Versioned<V> get(K key) {
88 checkNotNull(key, ERROR_NULL_KEY);
89 Versioned<byte[]> value = complete(proxy.get(name, keyCache.getUnchecked(key)));
90 return new Versioned<>(serializer.decode(value.value()), value.version());
91 }
92
93 @Override
94 public Versioned<V> put(K key, V value) {
95 checkNotNull(key, ERROR_NULL_KEY);
96 checkNotNull(value, ERROR_NULL_VALUE);
97 Versioned<byte[]> previousValue = complete(proxy.get(name, keyCache.getUnchecked(key)));
98 return (previousValue != null) ?
99 new Versioned<>(serializer.decode(previousValue.value()), previousValue.version()) : null;
100
101 }
102
103 @Override
104 public Versioned<V> remove(K key) {
105 checkNotNull(key, ERROR_NULL_KEY);
106 Versioned<byte[]> value = complete(proxy.get(name, keyCache.getUnchecked(key)));
107 return (value != null) ? new Versioned<>(serializer.decode(value.value()), value.version()) : null;
108 }
109
110 @Override
111 public void clear() {
112 complete(proxy.clear(name));
113 }
114
115 @Override
116 public Set<K> keySet() {
117 return Collections.unmodifiableSet(complete(proxy.keySet(name))
118 .stream()
119 .map(this::dK)
120 .collect(Collectors.toSet()));
121 }
122
123 @Override
124 public Collection<Versioned<V>> values() {
125 return Collections.unmodifiableList(complete(proxy.values(name))
126 .stream()
127 .map(v -> new Versioned<V>(serializer.decode(v.value()), v.version()))
128 .collect(Collectors.toList()));
129 }
130
131 @Override
132 public Set<Entry<K, Versioned<V>>> entrySet() {
133 return Collections.unmodifiableSet(complete(proxy.entrySet(name))
134 .stream()
135 .map(this::fromRawEntry)
136 .collect(Collectors.toSet()));
137 }
138
139 @Override
140 public Versioned<V> putIfAbsent(K key, V value) {
141 checkNotNull(key, ERROR_NULL_KEY);
142 checkNotNull(value, ERROR_NULL_VALUE);
143 Versioned<byte[]> existingValue = complete(proxy.putIfAbsent(
144 name, keyCache.getUnchecked(key), serializer.encode(value)));
145 return (existingValue != null) ?
146 new Versioned<>(serializer.decode(existingValue.value()), existingValue.version()) : null;
147 }
148
149 @Override
150 public boolean remove(K key, V value) {
151 checkNotNull(key, ERROR_NULL_KEY);
152 checkNotNull(value, ERROR_NULL_VALUE);
153 return complete(proxy.remove(name, keyCache.getUnchecked(key), serializer.encode(value)));
154 }
155
156 @Override
157 public boolean remove(K key, long version) {
158 checkNotNull(key, ERROR_NULL_KEY);
159 return complete(proxy.remove(name, keyCache.getUnchecked(key), version));
160
161 }
162
163 @Override
164 public boolean replace(K key, V oldValue, V newValue) {
165 checkNotNull(key, ERROR_NULL_KEY);
166 checkNotNull(newValue, ERROR_NULL_VALUE);
167 byte[] existing = oldValue != null ? serializer.encode(oldValue) : null;
168 return complete(proxy.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue)));
169 }
170
171 @Override
172 public boolean replace(K key, long oldVersion, V newValue) {
173 checkNotNull(key, ERROR_NULL_KEY);
174 checkNotNull(newValue, ERROR_NULL_VALUE);
175 return complete(proxy.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue)));
176 }
177
178 @Override
179 public boolean batchUpdate(List<UpdateOperation<K, V>> updates) {
180 checkNotNull(updates, "updates cannot be null");
181 return complete(proxy.atomicBatchUpdate(updates
182 .stream()
183 .map(this::toRawUpdateOperation)
184 .collect(Collectors.toList())));
185 }
186
187 private static <T> T complete(CompletableFuture<T> future) {
188 try {
189 return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
190 } catch (InterruptedException e) {
191 Thread.currentThread().interrupt();
192 throw new ConsistentMapException.Interrupted();
193 } catch (TimeoutException e) {
194 throw new ConsistentMapException.Timeout();
195 } catch (ExecutionException e) {
196 throw new ConsistentMapException(e.getCause());
197 }
198 }
199
200 private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
201 return new AbstractMap.SimpleEntry<>(
202 dK(e.getKey()),
203 new Versioned<>(
204 serializer.decode(e.getValue().value()),
205 e.getValue().version()));
206 }
207
208 private UpdateOperation<String, byte[]> toRawUpdateOperation(UpdateOperation<K, V> update) {
209
210 checkArgument(name.equals(update.tableName()), "Unexpected table name");
211
212 UpdateOperation.Builder<String, byte[]> rawUpdate = UpdateOperation.<String, byte[]>newBuilder();
213
214 rawUpdate = rawUpdate.withKey(keyCache.getUnchecked(update.key()))
215 .withCurrentVersion(update.currentVersion())
216 .withType(update.type());
217
218 rawUpdate = rawUpdate.withTableName(update.tableName());
219
220 if (update.value() != null) {
221 rawUpdate = rawUpdate.withValue(serializer.encode(update.value()));
222 } else {
223 checkState(update.type() == UpdateOperation.Type.REMOVE
224 || update.type() == UpdateOperation.Type.REMOVE_IF_VERSION_MATCH,
225 ERROR_NULL_VALUE);
226 }
227
228 if (update.currentValue() != null) {
229 rawUpdate = rawUpdate.withCurrentValue(serializer.encode(update.currentValue()));
230 }
231
232 return rawUpdate.build();
233 }
234}