blob: 0ceb5664b93dc8f8a2a4f71bf6a56c5e68611af4 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Madan Jampani09342702015-02-05 23:32:40 -080017package org.onosproject.store.consistent.impl;
18
19import static com.google.common.base.Preconditions.*;
20
Madan Jampani09342702015-02-05 23:32:40 -080021import java.util.Collection;
22import java.util.Collections;
Madan Jampani09342702015-02-05 23:32:40 -080023import java.util.Map;
24import java.util.Map.Entry;
25import java.util.concurrent.CompletableFuture;
26import java.util.concurrent.ExecutionException;
27import java.util.concurrent.TimeUnit;
28import java.util.concurrent.TimeoutException;
29import java.util.stream.Collectors;
30import java.util.Set;
31
Madan Jampani393e0f02015-02-12 07:35:39 +053032import org.apache.commons.lang3.tuple.Pair;
Madan Jampani09342702015-02-05 23:32:40 -080033import org.onlab.util.HexString;
Madan Jampani393e0f02015-02-12 07:35:39 +053034import org.onosproject.store.service.ConsistentMap;
35import org.onosproject.store.service.ConsistentMapException;
36import org.onosproject.store.service.Serializer;
Madan Jampani393e0f02015-02-12 07:35:39 +053037import org.onosproject.store.service.Versioned;
Madan Jampani09342702015-02-05 23:32:40 -080038
39import com.google.common.cache.CacheBuilder;
40import com.google.common.cache.CacheLoader;
41import com.google.common.cache.LoadingCache;
42
43/**
44 * ConsistentMap implementation that is backed by a Raft consensus
45 * based database.
46 *
47 * @param <K> type of key.
48 * @param <V> type of value.
49 */
50public class ConsistentMapImpl<K, V> implements ConsistentMap<K, V> {
51
52 private final String name;
53 private final DatabaseProxy<String, byte[]> proxy;
Madan Jampani393e0f02015-02-12 07:35:39 +053054 private final Serializer serializer;
Madan Jampani09342702015-02-05 23:32:40 -080055
Madan Jampanid14166a2015-02-24 17:37:51 -080056 private static final int OPERATION_TIMEOUT_MILLIS = 5000;
Madan Jampani09342702015-02-05 23:32:40 -080057 private static final String ERROR_NULL_KEY = "Key cannot be null";
58 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
59
60 private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder()
61 .softValues()
62 .build(new CacheLoader<K, String>() {
63
64 @Override
65 public String load(K key) {
66 return HexString.toHexString(serializer.encode(key));
67 }
68 });
69
70 protected K dK(String key) {
71 return serializer.decode(HexString.fromHexString(key));
72 }
73
Madan Jampani64689552015-02-17 10:00:27 -080074 public ConsistentMapImpl(String name,
Madan Jampani09342702015-02-05 23:32:40 -080075 DatabaseProxy<String, byte[]> proxy,
Madan Jampani393e0f02015-02-12 07:35:39 +053076 Serializer serializer) {
Madan Jampani09342702015-02-05 23:32:40 -080077 this.name = checkNotNull(name, "map name cannot be null");
78 this.proxy = checkNotNull(proxy, "database proxy cannot be null");
79 this.serializer = checkNotNull(serializer, "serializer cannot be null");
80 }
81
82 @Override
83 public int size() {
84 return complete(proxy.size(name));
85 }
86
87 @Override
88 public boolean isEmpty() {
89 return complete(proxy.isEmpty(name));
90 }
91
92 @Override
93 public boolean containsKey(K key) {
94 checkNotNull(key, ERROR_NULL_KEY);
95 return complete(proxy.containsKey(name, keyCache.getUnchecked(key)));
96 }
97
98 @Override
99 public boolean containsValue(V value) {
100 checkNotNull(value, ERROR_NULL_VALUE);
101 return complete(proxy.containsValue(name, serializer.encode(value)));
102 }
103
104 @Override
105 public Versioned<V> get(K key) {
106 checkNotNull(key, ERROR_NULL_KEY);
107 Versioned<byte[]> value = complete(proxy.get(name, keyCache.getUnchecked(key)));
Madan Jampani30a57f82015-03-02 12:19:41 -0800108 if (value == null) {
109 return null;
110 }
111 return new Versioned<>(
112 serializer.decode(value.value()),
113 value.version(),
114 value.creationTime());
Madan Jampani09342702015-02-05 23:32:40 -0800115 }
116
117 @Override
118 public Versioned<V> put(K key, V value) {
119 checkNotNull(key, ERROR_NULL_KEY);
120 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani393e0f02015-02-12 07:35:39 +0530121 Versioned<byte[]> previousValue =
122 complete(proxy.put(name, keyCache.getUnchecked(key), serializer.encode(value)));
Madan Jampani30a57f82015-03-02 12:19:41 -0800123 if (previousValue == null) {
124 return null;
125 }
126 return new Versioned<>(
127 serializer.decode(previousValue.value()),
128 previousValue.version(),
129 previousValue.creationTime());
Madan Jampani09342702015-02-05 23:32:40 -0800130 }
131
132 @Override
133 public Versioned<V> remove(K key) {
134 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani393e0f02015-02-12 07:35:39 +0530135 Versioned<byte[]> value = complete(proxy.remove(name, keyCache.getUnchecked(key)));
Madan Jampani30a57f82015-03-02 12:19:41 -0800136 if (value == null) {
137 return null;
138 }
139 return new Versioned<>(
140 serializer.decode(value.value()),
141 value.version(),
142 value.creationTime());
Madan Jampani09342702015-02-05 23:32:40 -0800143 }
144
145 @Override
146 public void clear() {
147 complete(proxy.clear(name));
148 }
149
150 @Override
151 public Set<K> keySet() {
152 return Collections.unmodifiableSet(complete(proxy.keySet(name))
153 .stream()
154 .map(this::dK)
155 .collect(Collectors.toSet()));
156 }
157
158 @Override
159 public Collection<Versioned<V>> values() {
160 return Collections.unmodifiableList(complete(proxy.values(name))
161 .stream()
Madan Jampani30a57f82015-03-02 12:19:41 -0800162 .map(v -> new Versioned<V>(serializer.decode(v.value()), v.version(), v.creationTime()))
Madan Jampani09342702015-02-05 23:32:40 -0800163 .collect(Collectors.toList()));
164 }
165
166 @Override
167 public Set<Entry<K, Versioned<V>>> entrySet() {
168 return Collections.unmodifiableSet(complete(proxy.entrySet(name))
169 .stream()
170 .map(this::fromRawEntry)
171 .collect(Collectors.toSet()));
172 }
173
174 @Override
175 public Versioned<V> putIfAbsent(K key, V value) {
176 checkNotNull(key, ERROR_NULL_KEY);
177 checkNotNull(value, ERROR_NULL_VALUE);
178 Versioned<byte[]> existingValue = complete(proxy.putIfAbsent(
179 name, keyCache.getUnchecked(key), serializer.encode(value)));
Madan Jampani30a57f82015-03-02 12:19:41 -0800180 if (existingValue == null) {
181 return null;
182 }
183 return new Versioned<>(
184 serializer.decode(existingValue.value()),
185 existingValue.version(),
186 existingValue.creationTime());
Madan Jampani09342702015-02-05 23:32:40 -0800187 }
188
189 @Override
190 public boolean remove(K key, V value) {
191 checkNotNull(key, ERROR_NULL_KEY);
192 checkNotNull(value, ERROR_NULL_VALUE);
193 return complete(proxy.remove(name, keyCache.getUnchecked(key), serializer.encode(value)));
194 }
195
196 @Override
197 public boolean remove(K key, long version) {
198 checkNotNull(key, ERROR_NULL_KEY);
199 return complete(proxy.remove(name, keyCache.getUnchecked(key), version));
200
201 }
202
203 @Override
204 public boolean replace(K key, V oldValue, V newValue) {
205 checkNotNull(key, ERROR_NULL_KEY);
206 checkNotNull(newValue, ERROR_NULL_VALUE);
207 byte[] existing = oldValue != null ? serializer.encode(oldValue) : null;
208 return complete(proxy.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue)));
209 }
210
211 @Override
212 public boolean replace(K key, long oldVersion, V newValue) {
213 checkNotNull(key, ERROR_NULL_KEY);
214 checkNotNull(newValue, ERROR_NULL_VALUE);
215 return complete(proxy.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue)));
216 }
217
Madan Jampani09342702015-02-05 23:32:40 -0800218 private static <T> T complete(CompletableFuture<T> future) {
219 try {
220 return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
221 } catch (InterruptedException e) {
222 Thread.currentThread().interrupt();
223 throw new ConsistentMapException.Interrupted();
224 } catch (TimeoutException e) {
225 throw new ConsistentMapException.Timeout();
226 } catch (ExecutionException e) {
227 throw new ConsistentMapException(e.getCause());
228 }
229 }
230
231 private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
Madan Jampani393e0f02015-02-12 07:35:39 +0530232 return Pair.of(
Madan Jampani09342702015-02-05 23:32:40 -0800233 dK(e.getKey()),
234 new Versioned<>(
235 serializer.decode(e.getValue().value()),
Madan Jampani30a57f82015-03-02 12:19:41 -0800236 e.getValue().version(),
237 e.getValue().creationTime()));
Madan Jampani09342702015-02-05 23:32:40 -0800238 }
Madan Jampani09342702015-02-05 23:32:40 -0800239}