blob: eb180713e1a537556a57cf8373b7d261a2e6bc22 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Madan Jampani09342702015-02-05 23:32:40 -080017package org.onosproject.store.consistent.impl;
18
19import static com.google.common.base.Preconditions.*;
20
Madan Jampani09342702015-02-05 23:32:40 -080021import java.util.Collection;
22import java.util.Collections;
23import java.util.List;
24import java.util.Map;
25import java.util.Map.Entry;
26import java.util.concurrent.CompletableFuture;
27import java.util.concurrent.ExecutionException;
28import java.util.concurrent.TimeUnit;
29import java.util.concurrent.TimeoutException;
30import java.util.stream.Collectors;
31import java.util.Set;
32
Madan Jampani393e0f02015-02-12 07:35:39 +053033import org.apache.commons.lang3.tuple.Pair;
Madan Jampani09342702015-02-05 23:32:40 -080034import org.onlab.util.HexString;
Madan Jampani393e0f02015-02-12 07:35:39 +053035import org.onosproject.store.service.ConsistentMap;
36import org.onosproject.store.service.ConsistentMapException;
37import org.onosproject.store.service.Serializer;
38import org.onosproject.store.service.UpdateOperation;
39import org.onosproject.store.service.Versioned;
Madan Jampani09342702015-02-05 23:32:40 -080040
41import com.google.common.cache.CacheBuilder;
42import com.google.common.cache.CacheLoader;
43import com.google.common.cache.LoadingCache;
44
45/**
46 * ConsistentMap implementation that is backed by a Raft consensus
47 * based database.
48 *
49 * @param <K> type of key.
50 * @param <V> type of value.
51 */
52public class ConsistentMapImpl<K, V> implements ConsistentMap<K, V> {
53
54 private final String name;
55 private final DatabaseProxy<String, byte[]> proxy;
Madan Jampani393e0f02015-02-12 07:35:39 +053056 private final Serializer serializer;
Madan Jampani09342702015-02-05 23:32:40 -080057
58 private static final int OPERATION_TIMEOUT_MILLIS = 1000;
59 private static final String ERROR_NULL_KEY = "Key cannot be null";
60 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
61
62 private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder()
63 .softValues()
64 .build(new CacheLoader<K, String>() {
65
66 @Override
67 public String load(K key) {
68 return HexString.toHexString(serializer.encode(key));
69 }
70 });
71
72 protected K dK(String key) {
73 return serializer.decode(HexString.fromHexString(key));
74 }
75
76 ConsistentMapImpl(String name,
77 DatabaseProxy<String, byte[]> proxy,
Madan Jampani393e0f02015-02-12 07:35:39 +053078 Serializer serializer) {
Madan Jampani09342702015-02-05 23:32:40 -080079 this.name = checkNotNull(name, "map name cannot be null");
80 this.proxy = checkNotNull(proxy, "database proxy cannot be null");
81 this.serializer = checkNotNull(serializer, "serializer cannot be null");
82 }
83
84 @Override
85 public int size() {
86 return complete(proxy.size(name));
87 }
88
89 @Override
90 public boolean isEmpty() {
91 return complete(proxy.isEmpty(name));
92 }
93
94 @Override
95 public boolean containsKey(K key) {
96 checkNotNull(key, ERROR_NULL_KEY);
97 return complete(proxy.containsKey(name, keyCache.getUnchecked(key)));
98 }
99
100 @Override
101 public boolean containsValue(V value) {
102 checkNotNull(value, ERROR_NULL_VALUE);
103 return complete(proxy.containsValue(name, serializer.encode(value)));
104 }
105
106 @Override
107 public Versioned<V> get(K key) {
108 checkNotNull(key, ERROR_NULL_KEY);
109 Versioned<byte[]> value = complete(proxy.get(name, keyCache.getUnchecked(key)));
Madan Jampani393e0f02015-02-12 07:35:39 +0530110 return (value != null) ? new Versioned<>(serializer.decode(value.value()), value.version()) : null;
Madan Jampani09342702015-02-05 23:32:40 -0800111 }
112
113 @Override
114 public Versioned<V> put(K key, V value) {
115 checkNotNull(key, ERROR_NULL_KEY);
116 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani393e0f02015-02-12 07:35:39 +0530117 Versioned<byte[]> previousValue =
118 complete(proxy.put(name, keyCache.getUnchecked(key), serializer.encode(value)));
Madan Jampani09342702015-02-05 23:32:40 -0800119 return (previousValue != null) ?
120 new Versioned<>(serializer.decode(previousValue.value()), previousValue.version()) : null;
121
122 }
123
124 @Override
125 public Versioned<V> remove(K key) {
126 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani393e0f02015-02-12 07:35:39 +0530127 Versioned<byte[]> value = complete(proxy.remove(name, keyCache.getUnchecked(key)));
Madan Jampani09342702015-02-05 23:32:40 -0800128 return (value != null) ? new Versioned<>(serializer.decode(value.value()), value.version()) : null;
129 }
130
131 @Override
132 public void clear() {
133 complete(proxy.clear(name));
134 }
135
136 @Override
137 public Set<K> keySet() {
138 return Collections.unmodifiableSet(complete(proxy.keySet(name))
139 .stream()
140 .map(this::dK)
141 .collect(Collectors.toSet()));
142 }
143
144 @Override
145 public Collection<Versioned<V>> values() {
146 return Collections.unmodifiableList(complete(proxy.values(name))
147 .stream()
148 .map(v -> new Versioned<V>(serializer.decode(v.value()), v.version()))
149 .collect(Collectors.toList()));
150 }
151
152 @Override
153 public Set<Entry<K, Versioned<V>>> entrySet() {
154 return Collections.unmodifiableSet(complete(proxy.entrySet(name))
155 .stream()
156 .map(this::fromRawEntry)
157 .collect(Collectors.toSet()));
158 }
159
160 @Override
161 public Versioned<V> putIfAbsent(K key, V value) {
162 checkNotNull(key, ERROR_NULL_KEY);
163 checkNotNull(value, ERROR_NULL_VALUE);
164 Versioned<byte[]> existingValue = complete(proxy.putIfAbsent(
165 name, keyCache.getUnchecked(key), serializer.encode(value)));
166 return (existingValue != null) ?
167 new Versioned<>(serializer.decode(existingValue.value()), existingValue.version()) : null;
168 }
169
170 @Override
171 public boolean remove(K key, V value) {
172 checkNotNull(key, ERROR_NULL_KEY);
173 checkNotNull(value, ERROR_NULL_VALUE);
174 return complete(proxy.remove(name, keyCache.getUnchecked(key), serializer.encode(value)));
175 }
176
177 @Override
178 public boolean remove(K key, long version) {
179 checkNotNull(key, ERROR_NULL_KEY);
180 return complete(proxy.remove(name, keyCache.getUnchecked(key), version));
181
182 }
183
184 @Override
185 public boolean replace(K key, V oldValue, V newValue) {
186 checkNotNull(key, ERROR_NULL_KEY);
187 checkNotNull(newValue, ERROR_NULL_VALUE);
188 byte[] existing = oldValue != null ? serializer.encode(oldValue) : null;
189 return complete(proxy.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue)));
190 }
191
192 @Override
193 public boolean replace(K key, long oldVersion, V newValue) {
194 checkNotNull(key, ERROR_NULL_KEY);
195 checkNotNull(newValue, ERROR_NULL_VALUE);
196 return complete(proxy.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue)));
197 }
198
199 @Override
200 public boolean batchUpdate(List<UpdateOperation<K, V>> updates) {
201 checkNotNull(updates, "updates cannot be null");
202 return complete(proxy.atomicBatchUpdate(updates
203 .stream()
204 .map(this::toRawUpdateOperation)
205 .collect(Collectors.toList())));
206 }
207
208 private static <T> T complete(CompletableFuture<T> future) {
209 try {
210 return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
211 } catch (InterruptedException e) {
212 Thread.currentThread().interrupt();
213 throw new ConsistentMapException.Interrupted();
214 } catch (TimeoutException e) {
215 throw new ConsistentMapException.Timeout();
216 } catch (ExecutionException e) {
217 throw new ConsistentMapException(e.getCause());
218 }
219 }
220
221 private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
Madan Jampani393e0f02015-02-12 07:35:39 +0530222 return Pair.of(
Madan Jampani09342702015-02-05 23:32:40 -0800223 dK(e.getKey()),
224 new Versioned<>(
225 serializer.decode(e.getValue().value()),
226 e.getValue().version()));
227 }
228
229 private UpdateOperation<String, byte[]> toRawUpdateOperation(UpdateOperation<K, V> update) {
230
231 checkArgument(name.equals(update.tableName()), "Unexpected table name");
232
233 UpdateOperation.Builder<String, byte[]> rawUpdate = UpdateOperation.<String, byte[]>newBuilder();
234
235 rawUpdate = rawUpdate.withKey(keyCache.getUnchecked(update.key()))
236 .withCurrentVersion(update.currentVersion())
237 .withType(update.type());
238
239 rawUpdate = rawUpdate.withTableName(update.tableName());
240
241 if (update.value() != null) {
242 rawUpdate = rawUpdate.withValue(serializer.encode(update.value()));
243 } else {
244 checkState(update.type() == UpdateOperation.Type.REMOVE
245 || update.type() == UpdateOperation.Type.REMOVE_IF_VERSION_MATCH,
246 ERROR_NULL_VALUE);
247 }
248
249 if (update.currentValue() != null) {
250 rawUpdate = rawUpdate.withCurrentValue(serializer.encode(update.currentValue()));
251 }
252
253 return rawUpdate.build();
254 }
255}