blob: a5b665677626c74072535f8bf759b772160a698b [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Madan Jampani09342702015-02-05 23:32:40 -080017package org.onosproject.store.consistent.impl;
18
19import static com.google.common.base.Preconditions.*;
20
Madan Jampani09342702015-02-05 23:32:40 -080021import java.util.Collection;
22import java.util.Collections;
Madan Jampani09342702015-02-05 23:32:40 -080023import java.util.Map;
24import java.util.Map.Entry;
25import java.util.concurrent.CompletableFuture;
26import java.util.concurrent.ExecutionException;
27import java.util.concurrent.TimeUnit;
28import java.util.concurrent.TimeoutException;
29import java.util.stream.Collectors;
30import java.util.Set;
31
Madan Jampani393e0f02015-02-12 07:35:39 +053032import org.apache.commons.lang3.tuple.Pair;
Madan Jampani09342702015-02-05 23:32:40 -080033import org.onlab.util.HexString;
Madan Jampani393e0f02015-02-12 07:35:39 +053034import org.onosproject.store.service.ConsistentMap;
35import org.onosproject.store.service.ConsistentMapException;
36import org.onosproject.store.service.Serializer;
Madan Jampani393e0f02015-02-12 07:35:39 +053037import org.onosproject.store.service.Versioned;
Madan Jampani09342702015-02-05 23:32:40 -080038
39import com.google.common.cache.CacheBuilder;
40import com.google.common.cache.CacheLoader;
41import com.google.common.cache.LoadingCache;
42
43/**
44 * ConsistentMap implementation that is backed by a Raft consensus
45 * based database.
46 *
47 * @param <K> type of key.
48 * @param <V> type of value.
49 */
50public class ConsistentMapImpl<K, V> implements ConsistentMap<K, V> {
51
52 private final String name;
53 private final DatabaseProxy<String, byte[]> proxy;
Madan Jampani393e0f02015-02-12 07:35:39 +053054 private final Serializer serializer;
Madan Jampani09342702015-02-05 23:32:40 -080055
56 private static final int OPERATION_TIMEOUT_MILLIS = 1000;
57 private static final String ERROR_NULL_KEY = "Key cannot be null";
58 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
59
60 private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder()
61 .softValues()
62 .build(new CacheLoader<K, String>() {
63
64 @Override
65 public String load(K key) {
66 return HexString.toHexString(serializer.encode(key));
67 }
68 });
69
70 protected K dK(String key) {
71 return serializer.decode(HexString.fromHexString(key));
72 }
73
Madan Jampani64689552015-02-17 10:00:27 -080074 public ConsistentMapImpl(String name,
Madan Jampani09342702015-02-05 23:32:40 -080075 DatabaseProxy<String, byte[]> proxy,
Madan Jampani393e0f02015-02-12 07:35:39 +053076 Serializer serializer) {
Madan Jampani09342702015-02-05 23:32:40 -080077 this.name = checkNotNull(name, "map name cannot be null");
78 this.proxy = checkNotNull(proxy, "database proxy cannot be null");
79 this.serializer = checkNotNull(serializer, "serializer cannot be null");
80 }
81
82 @Override
83 public int size() {
84 return complete(proxy.size(name));
85 }
86
87 @Override
88 public boolean isEmpty() {
89 return complete(proxy.isEmpty(name));
90 }
91
92 @Override
93 public boolean containsKey(K key) {
94 checkNotNull(key, ERROR_NULL_KEY);
95 return complete(proxy.containsKey(name, keyCache.getUnchecked(key)));
96 }
97
98 @Override
99 public boolean containsValue(V value) {
100 checkNotNull(value, ERROR_NULL_VALUE);
101 return complete(proxy.containsValue(name, serializer.encode(value)));
102 }
103
104 @Override
105 public Versioned<V> get(K key) {
106 checkNotNull(key, ERROR_NULL_KEY);
107 Versioned<byte[]> value = complete(proxy.get(name, keyCache.getUnchecked(key)));
Madan Jampani393e0f02015-02-12 07:35:39 +0530108 return (value != null) ? new Versioned<>(serializer.decode(value.value()), value.version()) : null;
Madan Jampani09342702015-02-05 23:32:40 -0800109 }
110
111 @Override
112 public Versioned<V> put(K key, V value) {
113 checkNotNull(key, ERROR_NULL_KEY);
114 checkNotNull(value, ERROR_NULL_VALUE);
Madan Jampani393e0f02015-02-12 07:35:39 +0530115 Versioned<byte[]> previousValue =
116 complete(proxy.put(name, keyCache.getUnchecked(key), serializer.encode(value)));
Madan Jampani09342702015-02-05 23:32:40 -0800117 return (previousValue != null) ?
118 new Versioned<>(serializer.decode(previousValue.value()), previousValue.version()) : null;
119
120 }
121
122 @Override
123 public Versioned<V> remove(K key) {
124 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani393e0f02015-02-12 07:35:39 +0530125 Versioned<byte[]> value = complete(proxy.remove(name, keyCache.getUnchecked(key)));
Madan Jampani09342702015-02-05 23:32:40 -0800126 return (value != null) ? new Versioned<>(serializer.decode(value.value()), value.version()) : null;
127 }
128
129 @Override
130 public void clear() {
131 complete(proxy.clear(name));
132 }
133
134 @Override
135 public Set<K> keySet() {
136 return Collections.unmodifiableSet(complete(proxy.keySet(name))
137 .stream()
138 .map(this::dK)
139 .collect(Collectors.toSet()));
140 }
141
142 @Override
143 public Collection<Versioned<V>> values() {
144 return Collections.unmodifiableList(complete(proxy.values(name))
145 .stream()
146 .map(v -> new Versioned<V>(serializer.decode(v.value()), v.version()))
147 .collect(Collectors.toList()));
148 }
149
150 @Override
151 public Set<Entry<K, Versioned<V>>> entrySet() {
152 return Collections.unmodifiableSet(complete(proxy.entrySet(name))
153 .stream()
154 .map(this::fromRawEntry)
155 .collect(Collectors.toSet()));
156 }
157
158 @Override
159 public Versioned<V> putIfAbsent(K key, V value) {
160 checkNotNull(key, ERROR_NULL_KEY);
161 checkNotNull(value, ERROR_NULL_VALUE);
162 Versioned<byte[]> existingValue = complete(proxy.putIfAbsent(
163 name, keyCache.getUnchecked(key), serializer.encode(value)));
164 return (existingValue != null) ?
165 new Versioned<>(serializer.decode(existingValue.value()), existingValue.version()) : null;
166 }
167
168 @Override
169 public boolean remove(K key, V value) {
170 checkNotNull(key, ERROR_NULL_KEY);
171 checkNotNull(value, ERROR_NULL_VALUE);
172 return complete(proxy.remove(name, keyCache.getUnchecked(key), serializer.encode(value)));
173 }
174
175 @Override
176 public boolean remove(K key, long version) {
177 checkNotNull(key, ERROR_NULL_KEY);
178 return complete(proxy.remove(name, keyCache.getUnchecked(key), version));
179
180 }
181
182 @Override
183 public boolean replace(K key, V oldValue, V newValue) {
184 checkNotNull(key, ERROR_NULL_KEY);
185 checkNotNull(newValue, ERROR_NULL_VALUE);
186 byte[] existing = oldValue != null ? serializer.encode(oldValue) : null;
187 return complete(proxy.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue)));
188 }
189
190 @Override
191 public boolean replace(K key, long oldVersion, V newValue) {
192 checkNotNull(key, ERROR_NULL_KEY);
193 checkNotNull(newValue, ERROR_NULL_VALUE);
194 return complete(proxy.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue)));
195 }
196
Madan Jampani09342702015-02-05 23:32:40 -0800197 private static <T> T complete(CompletableFuture<T> future) {
198 try {
199 return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
200 } catch (InterruptedException e) {
201 Thread.currentThread().interrupt();
202 throw new ConsistentMapException.Interrupted();
203 } catch (TimeoutException e) {
204 throw new ConsistentMapException.Timeout();
205 } catch (ExecutionException e) {
206 throw new ConsistentMapException(e.getCause());
207 }
208 }
209
210 private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
Madan Jampani393e0f02015-02-12 07:35:39 +0530211 return Pair.of(
Madan Jampani09342702015-02-05 23:32:40 -0800212 dK(e.getKey()),
213 new Versioned<>(
214 serializer.decode(e.getValue().value()),
215 e.getValue().version()));
216 }
Madan Jampani09342702015-02-05 23:32:40 -0800217}