blob: 569cf953d6b8a23bcee5e941f6a8a92bce6411cd [file] [log] [blame]
Madan Jampani09342702015-02-05 23:32:40 -08001package org.onosproject.store.consistent.impl;
2
3import static com.google.common.base.Preconditions.*;
4
Madan Jampani09342702015-02-05 23:32:40 -08005import java.util.Collection;
6import java.util.Collections;
7import java.util.List;
8import java.util.Map;
9import java.util.Map.Entry;
10import java.util.concurrent.CompletableFuture;
11import java.util.concurrent.ExecutionException;
12import java.util.concurrent.TimeUnit;
13import java.util.concurrent.TimeoutException;
14import java.util.stream.Collectors;
15import java.util.Set;
16
Madan Jampani393e0f02015-02-12 07:35:39 +053017import org.apache.commons.lang3.tuple.Pair;
Madan Jampani09342702015-02-05 23:32:40 -080018import org.onlab.util.HexString;
Madan Jampani393e0f02015-02-12 07:35:39 +053019import org.onosproject.store.service.ConsistentMap;
20import org.onosproject.store.service.ConsistentMapException;
21import org.onosproject.store.service.Serializer;
22import org.onosproject.store.service.UpdateOperation;
23import org.onosproject.store.service.Versioned;
Madan Jampani09342702015-02-05 23:32:40 -080024
25import com.google.common.cache.CacheBuilder;
26import com.google.common.cache.CacheLoader;
27import com.google.common.cache.LoadingCache;
28
29/**
30 * ConsistentMap implementation that is backed by a Raft consensus
31 * based database.
32 *
33 * @param <K> type of key.
34 * @param <V> type of value.
35 */
36public class ConsistentMapImpl<K, V> implements ConsistentMap<K, V> {
37
38 private final String name;
39 private final DatabaseProxy<String, byte[]> proxy;
Madan Jampani393e0f02015-02-12 07:35:39 +053040 private final Serializer serializer;
Madan Jampani09342702015-02-05 23:32:40 -080041
42 private static final int OPERATION_TIMEOUT_MILLIS = 1000;
43 private static final String ERROR_NULL_KEY = "Key cannot be null";
44 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
45
46 private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder()
47 .softValues()
48 .build(new CacheLoader<K, String>() {
49
50 @Override
51 public String load(K key) {
52 return HexString.toHexString(serializer.encode(key));
53 }
54 });
55
56 protected K dK(String key) {
57 return serializer.decode(HexString.fromHexString(key));
58 }
59
60 ConsistentMapImpl(String name,
61 DatabaseProxy<String, byte[]> proxy,
Madan Jampani393e0f02015-02-12 07:35:39 +053062 Serializer serializer) {
Madan Jampani09342702015-02-05 23:32:40 -080063 this.name = checkNotNull(name, "map name cannot be null");
64 this.proxy = checkNotNull(proxy, "database proxy cannot be null");
65 this.serializer = checkNotNull(serializer, "serializer cannot be null");
66 }
67
68 @Override
69 public int size() {
70 return complete(proxy.size(name));
71 }
72
73 @Override
74 public boolean isEmpty() {
75 return complete(proxy.isEmpty(name));
76 }
77
78 @Override
79 public boolean containsKey(K key) {
80 checkNotNull(key, ERROR_NULL_KEY);
81 return complete(proxy.containsKey(name, keyCache.getUnchecked(key)));
82 }
83
84 @Override
85 public boolean containsValue(V value) {
86 checkNotNull(value, ERROR_NULL_VALUE);
87 return complete(proxy.containsValue(name, serializer.encode(value)));
88 }
89
90 @Override
91 public Versioned<V> get(K key) {
92 checkNotNull(key, ERROR_NULL_KEY);
93 Versioned<byte[]> value = complete(proxy.get(name, keyCache.getUnchecked(key)));
Madan Jampani393e0f02015-02-12 07:35:39 +053094 return (value != null) ? new Versioned<>(serializer.decode(value.value()), value.version()) : null;
Madan Jampani09342702015-02-05 23:32:40 -080095 }
96
97 @Override
98 public Versioned<V> put(K key, V value) {
99 checkNotNull(key, ERROR_NULL_KEY);
100 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani393e0f02015-02-12 07:35:39 +0530101 Versioned<byte[]> previousValue =
102 complete(proxy.put(name, keyCache.getUnchecked(key), serializer.encode(value)));
Madan Jampani09342702015-02-05 23:32:40 -0800103 return (previousValue != null) ?
104 new Versioned<>(serializer.decode(previousValue.value()), previousValue.version()) : null;
105
106 }
107
108 @Override
109 public Versioned<V> remove(K key) {
110 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani393e0f02015-02-12 07:35:39 +0530111 Versioned<byte[]> value = complete(proxy.remove(name, keyCache.getUnchecked(key)));
Madan Jampani09342702015-02-05 23:32:40 -0800112 return (value != null) ? new Versioned<>(serializer.decode(value.value()), value.version()) : null;
113 }
114
115 @Override
116 public void clear() {
117 complete(proxy.clear(name));
118 }
119
120 @Override
121 public Set<K> keySet() {
122 return Collections.unmodifiableSet(complete(proxy.keySet(name))
123 .stream()
124 .map(this::dK)
125 .collect(Collectors.toSet()));
126 }
127
128 @Override
129 public Collection<Versioned<V>> values() {
130 return Collections.unmodifiableList(complete(proxy.values(name))
131 .stream()
132 .map(v -> new Versioned<V>(serializer.decode(v.value()), v.version()))
133 .collect(Collectors.toList()));
134 }
135
136 @Override
137 public Set<Entry<K, Versioned<V>>> entrySet() {
138 return Collections.unmodifiableSet(complete(proxy.entrySet(name))
139 .stream()
140 .map(this::fromRawEntry)
141 .collect(Collectors.toSet()));
142 }
143
144 @Override
145 public Versioned<V> putIfAbsent(K key, V value) {
146 checkNotNull(key, ERROR_NULL_KEY);
147 checkNotNull(value, ERROR_NULL_VALUE);
148 Versioned<byte[]> existingValue = complete(proxy.putIfAbsent(
149 name, keyCache.getUnchecked(key), serializer.encode(value)));
150 return (existingValue != null) ?
151 new Versioned<>(serializer.decode(existingValue.value()), existingValue.version()) : null;
152 }
153
154 @Override
155 public boolean remove(K key, V value) {
156 checkNotNull(key, ERROR_NULL_KEY);
157 checkNotNull(value, ERROR_NULL_VALUE);
158 return complete(proxy.remove(name, keyCache.getUnchecked(key), serializer.encode(value)));
159 }
160
161 @Override
162 public boolean remove(K key, long version) {
163 checkNotNull(key, ERROR_NULL_KEY);
164 return complete(proxy.remove(name, keyCache.getUnchecked(key), version));
165
166 }
167
168 @Override
169 public boolean replace(K key, V oldValue, V newValue) {
170 checkNotNull(key, ERROR_NULL_KEY);
171 checkNotNull(newValue, ERROR_NULL_VALUE);
172 byte[] existing = oldValue != null ? serializer.encode(oldValue) : null;
173 return complete(proxy.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue)));
174 }
175
176 @Override
177 public boolean replace(K key, long oldVersion, V newValue) {
178 checkNotNull(key, ERROR_NULL_KEY);
179 checkNotNull(newValue, ERROR_NULL_VALUE);
180 return complete(proxy.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue)));
181 }
182
183 @Override
184 public boolean batchUpdate(List<UpdateOperation<K, V>> updates) {
185 checkNotNull(updates, "updates cannot be null");
186 return complete(proxy.atomicBatchUpdate(updates
187 .stream()
188 .map(this::toRawUpdateOperation)
189 .collect(Collectors.toList())));
190 }
191
192 private static <T> T complete(CompletableFuture<T> future) {
193 try {
194 return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
195 } catch (InterruptedException e) {
196 Thread.currentThread().interrupt();
197 throw new ConsistentMapException.Interrupted();
198 } catch (TimeoutException e) {
199 throw new ConsistentMapException.Timeout();
200 } catch (ExecutionException e) {
201 throw new ConsistentMapException(e.getCause());
202 }
203 }
204
205 private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
Madan Jampani393e0f02015-02-12 07:35:39 +0530206 return Pair.of(
Madan Jampani09342702015-02-05 23:32:40 -0800207 dK(e.getKey()),
208 new Versioned<>(
209 serializer.decode(e.getValue().value()),
210 e.getValue().version()));
211 }
212
213 private UpdateOperation<String, byte[]> toRawUpdateOperation(UpdateOperation<K, V> update) {
214
215 checkArgument(name.equals(update.tableName()), "Unexpected table name");
216
217 UpdateOperation.Builder<String, byte[]> rawUpdate = UpdateOperation.<String, byte[]>newBuilder();
218
219 rawUpdate = rawUpdate.withKey(keyCache.getUnchecked(update.key()))
220 .withCurrentVersion(update.currentVersion())
221 .withType(update.type());
222
223 rawUpdate = rawUpdate.withTableName(update.tableName());
224
225 if (update.value() != null) {
226 rawUpdate = rawUpdate.withValue(serializer.encode(update.value()));
227 } else {
228 checkState(update.type() == UpdateOperation.Type.REMOVE
229 || update.type() == UpdateOperation.Type.REMOVE_IF_VERSION_MATCH,
230 ERROR_NULL_VALUE);
231 }
232
233 if (update.currentValue() != null) {
234 rawUpdate = rawUpdate.withCurrentValue(serializer.encode(update.currentValue()));
235 }
236
237 return rawUpdate.build();
238 }
239}