blob: d138c9b194ff69bfdebbd02d9e65fa1ba19843e8 [file] [log] [blame]
Madan Jampani64689552015-02-17 10:00:27 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Madan Jampani64689552015-02-17 10:00:27 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Madan Jampanif4c88502016-01-21 12:35:36 -080017package org.onosproject.store.primitives.impl;
Madan Jampani64689552015-02-17 10:00:27 -080018
Madan Jampani64689552015-02-17 10:00:27 -080019import java.util.List;
20import java.util.Map;
Madan Jampani64689552015-02-17 10:00:27 -080021import java.util.Set;
Madan Jampani74da78b2016-02-09 21:18:36 -080022import java.util.concurrent.CompletableFuture;
HIGUCHI Yuta16e9d282016-04-12 14:09:30 -070023import java.util.stream.Collectors;
24import java.util.stream.Stream;
25import org.apache.commons.lang3.tuple.Pair;
Madan Jampani64689552015-02-17 10:00:27 -080026import org.onlab.util.HexString;
Madan Jampani74da78b2016-02-09 21:18:36 -080027import org.onosproject.store.primitives.MapUpdate;
28import org.onosproject.store.service.AsyncConsistentMap;
Madan Jampani64689552015-02-17 10:00:27 -080029import org.onosproject.store.service.ConsistentMap;
Madan Jampani74da78b2016-02-09 21:18:36 -080030import org.onosproject.store.service.MapTransaction;
Madan Jampani64689552015-02-17 10:00:27 -080031import org.onosproject.store.service.Serializer;
32import org.onosproject.store.service.TransactionContext;
33import org.onosproject.store.service.TransactionalMap;
Madan Jampani64689552015-02-17 10:00:27 -080034import org.onosproject.store.service.Versioned;
35
36import static com.google.common.base.Preconditions.*;
37
HIGUCHI Yutadc4394c2016-01-29 15:35:10 -080038import com.google.common.base.MoreObjects;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070039import com.google.common.base.Objects;
40import com.google.common.cache.CacheBuilder;
41import com.google.common.cache.CacheLoader;
42import com.google.common.cache.LoadingCache;
Madan Jampani64689552015-02-17 10:00:27 -080043import com.google.common.collect.Lists;
44import com.google.common.collect.Maps;
45import com.google.common.collect.Sets;
46
47/**
48 * Default Transactional Map implementation that provides a repeatable reads
49 * transaction isolation level.
50 *
51 * @param <K> key type
52 * @param <V> value type.
53 */
Madan Jampani74da78b2016-02-09 21:18:36 -080054public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V>, TransactionParticipant {
Madan Jampani64689552015-02-17 10:00:27 -080055
56 private final TransactionContext txContext;
57 private static final String TX_CLOSED_ERROR = "Transaction is closed";
Madan Jampani74da78b2016-02-09 21:18:36 -080058 private final AsyncConsistentMap<K, V> backingMap;
Sho SHIMIZU2425fcf2016-06-16 09:05:03 -070059 private final ConsistentMap<K, V> backingConsistentMap;
Madan Jampani64689552015-02-17 10:00:27 -080060 private final String name;
61 private final Serializer serializer;
62 private final Map<K, Versioned<V>> readCache = Maps.newConcurrentMap();
63 private final Map<K, V> writeCache = Maps.newConcurrentMap();
64 private final Set<K> deleteSet = Sets.newConcurrentHashSet();
65
Madan Jampanibff6d8f2015-03-31 16:53:47 -070066 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
67 private static final String ERROR_NULL_KEY = "Null key is not allowed";
68
69 private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder()
70 .softValues()
71 .build(new CacheLoader<K, String>() {
72
73 @Override
74 public String load(K key) {
75 return HexString.toHexString(serializer.encode(key));
76 }
77 });
78
79 protected K dK(String key) {
80 return serializer.decode(HexString.fromHexString(key));
81 }
82
Madan Jampani64689552015-02-17 10:00:27 -080083 public DefaultTransactionalMap(
84 String name,
Madan Jampani74da78b2016-02-09 21:18:36 -080085 AsyncConsistentMap<K, V> backingMap,
Madan Jampani64689552015-02-17 10:00:27 -080086 TransactionContext txContext,
87 Serializer serializer) {
88 this.name = name;
89 this.backingMap = backingMap;
Sho SHIMIZU2425fcf2016-06-16 09:05:03 -070090 this.backingConsistentMap = backingMap.asConsistentMap();
Madan Jampani64689552015-02-17 10:00:27 -080091 this.txContext = txContext;
92 this.serializer = serializer;
93 }
94
95 @Override
96 public V get(K key) {
97 checkState(txContext.isOpen(), TX_CLOSED_ERROR);
Madan Jampanibff6d8f2015-03-31 16:53:47 -070098 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani64689552015-02-17 10:00:27 -080099 if (deleteSet.contains(key)) {
100 return null;
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700101 }
102 V latest = writeCache.get(key);
103 if (latest != null) {
104 return latest;
Madan Jampani64689552015-02-17 10:00:27 -0800105 } else {
Sho SHIMIZU2425fcf2016-06-16 09:05:03 -0700106 Versioned<V> v = readCache.computeIfAbsent(key, k -> backingConsistentMap.get(k));
Madan Jampani64689552015-02-17 10:00:27 -0800107 return v != null ? v.value() : null;
108 }
109 }
110
111 @Override
Sho SHIMIZU637ae5d2016-06-10 11:08:13 -0700112 public boolean containsKey(K key) {
113 return get(key) != null;
114 }
115
116 @Override
Madan Jampani64689552015-02-17 10:00:27 -0800117 public V put(K key, V value) {
118 checkState(txContext.isOpen(), TX_CLOSED_ERROR);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700119 checkNotNull(value, ERROR_NULL_VALUE);
120
121 V latest = get(key);
122 writeCache.put(key, value);
Madan Jampani64689552015-02-17 10:00:27 -0800123 deleteSet.remove(key);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700124 return latest;
Madan Jampani64689552015-02-17 10:00:27 -0800125 }
126
127 @Override
128 public V remove(K key) {
129 checkState(txContext.isOpen(), TX_CLOSED_ERROR);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700130 V latest = get(key);
131 if (latest != null) {
132 writeCache.remove(key);
133 deleteSet.add(key);
134 }
135 return latest;
Madan Jampani64689552015-02-17 10:00:27 -0800136 }
137
138 @Override
139 public boolean remove(K key, V value) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700140 checkState(txContext.isOpen(), TX_CLOSED_ERROR);
141 checkNotNull(value, ERROR_NULL_VALUE);
142 V latest = get(key);
143 if (Objects.equal(value, latest)) {
Madan Jampani64689552015-02-17 10:00:27 -0800144 remove(key);
145 return true;
146 }
147 return false;
148 }
149
150 @Override
151 public boolean replace(K key, V oldValue, V newValue) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700152 checkState(txContext.isOpen(), TX_CLOSED_ERROR);
153 checkNotNull(oldValue, ERROR_NULL_VALUE);
154 checkNotNull(newValue, ERROR_NULL_VALUE);
155 V latest = get(key);
156 if (Objects.equal(oldValue, latest)) {
Madan Jampani64689552015-02-17 10:00:27 -0800157 put(key, newValue);
158 return true;
159 }
160 return false;
161 }
162
163 @Override
Madan Jampani64689552015-02-17 10:00:27 -0800164 public V putIfAbsent(K key, V value) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700165 checkState(txContext.isOpen(), TX_CLOSED_ERROR);
166 checkNotNull(value, ERROR_NULL_VALUE);
167 V latest = get(key);
168 if (latest == null) {
Madan Jampani64689552015-02-17 10:00:27 -0800169 put(key, value);
Madan Jampani64689552015-02-17 10:00:27 -0800170 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700171 return latest;
Madan Jampani64689552015-02-17 10:00:27 -0800172 }
173
Madan Jampani74da78b2016-02-09 21:18:36 -0800174 @Override
175 public CompletableFuture<Boolean> prepare() {
176 return backingMap.prepare(new MapTransaction<>(txContext.transactionId(), updates()));
177 }
178
179 @Override
180 public CompletableFuture<Void> commit() {
181 return backingMap.commit(txContext.transactionId());
182 }
183
184 @Override
185 public CompletableFuture<Void> rollback() {
186 return backingMap.rollback(txContext.transactionId());
187 }
188
189 @Override
Madan Jampani542d9e22016-04-05 15:39:55 -0700190 public CompletableFuture<Boolean> prepareAndCommit() {
191 return backingMap.prepareAndCommit(new MapTransaction<>(txContext.transactionId(), updates()));
192 }
193
194 @Override
195 public int totalUpdates() {
196 return updates().size();
Madan Jampani74da78b2016-02-09 21:18:36 -0800197 }
198
HIGUCHI Yuta16e9d282016-04-12 14:09:30 -0700199 @Override
200 public boolean hasPendingUpdates() {
201 return updatesStream().findAny().isPresent();
Madan Jampani74da78b2016-02-09 21:18:36 -0800202 }
203
HIGUCHI Yuta16e9d282016-04-12 14:09:30 -0700204 protected Stream<MapUpdate<K, V>> updatesStream() {
205 return Stream.concat(
206 // 1st stream: delete ops
207 deleteSet.stream()
208 .map(key -> Pair.of(key, readCache.get(key)))
209 .filter(e -> e.getValue() != null)
210 .map(e -> MapUpdate.<K, V>newBuilder()
211 .withMapName(name)
212 .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
213 .withKey(e.getKey())
214 .withCurrentVersion(e.getValue().version())
215 .build()),
216 // 2nd stream: write ops
217 writeCache.entrySet().stream()
218 .map(e -> {
219 Versioned<V> original = readCache.get(e.getKey());
220 if (original == null) {
221 return MapUpdate.<K, V>newBuilder()
222 .withMapName(name)
223 .withType(MapUpdate.Type.PUT_IF_ABSENT)
224 .withKey(e.getKey())
225 .withValue(e.getValue())
226 .build();
227 } else {
228 return MapUpdate.<K, V>newBuilder()
229 .withMapName(name)
230 .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
231 .withKey(e.getKey())
232 .withCurrentVersion(original.version())
233 .withValue(e.getValue())
234 .build();
235 }
236 }));
237 }
238
239 protected List<MapUpdate<K, V>> updates() {
240 return updatesStream().collect(Collectors.toList());
241 }
Madan Jampani74da78b2016-02-09 21:18:36 -0800242
Madan Jampanicadd70b2016-02-08 13:45:43 -0800243 protected List<MapUpdate<String, byte[]>> toMapUpdates() {
244 List<MapUpdate<String, byte[]>> updates = Lists.newLinkedList();
Madan Jampani64689552015-02-17 10:00:27 -0800245 deleteSet.forEach(key -> {
246 Versioned<V> original = readCache.get(key);
247 if (original != null) {
Madan Jampanicadd70b2016-02-08 13:45:43 -0800248 updates.add(MapUpdate.<String, byte[]>newBuilder()
Madan Jampani7804c992015-07-20 13:20:19 -0700249 .withMapName(name)
Madan Jampanicadd70b2016-02-08 13:45:43 -0800250 .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700251 .withKey(keyCache.getUnchecked(key))
Madan Jampani64689552015-02-17 10:00:27 -0800252 .withCurrentVersion(original.version())
253 .build());
254 }
255 });
256 writeCache.forEach((key, value) -> {
257 Versioned<V> original = readCache.get(key);
258 if (original == null) {
Madan Jampanicadd70b2016-02-08 13:45:43 -0800259 updates.add(MapUpdate.<String, byte[]>newBuilder()
Madan Jampani7804c992015-07-20 13:20:19 -0700260 .withMapName(name)
Madan Jampanicadd70b2016-02-08 13:45:43 -0800261 .withType(MapUpdate.Type.PUT_IF_ABSENT)
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700262 .withKey(keyCache.getUnchecked(key))
263 .withValue(serializer.encode(value))
Madan Jampani64689552015-02-17 10:00:27 -0800264 .build());
265 } else {
Madan Jampanicadd70b2016-02-08 13:45:43 -0800266 updates.add(MapUpdate.<String, byte[]>newBuilder()
Madan Jampani7804c992015-07-20 13:20:19 -0700267 .withMapName(name)
Madan Jampanicadd70b2016-02-08 13:45:43 -0800268 .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700269 .withKey(keyCache.getUnchecked(key))
Madan Jampani64689552015-02-17 10:00:27 -0800270 .withCurrentVersion(original.version())
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700271 .withValue(serializer.encode(value))
Madan Jampani64689552015-02-17 10:00:27 -0800272 .build());
273 }
274 });
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700275 return updates;
Madan Jampani64689552015-02-17 10:00:27 -0800276 }
277
HIGUCHI Yutadc4394c2016-01-29 15:35:10 -0800278 @Override
279 public String toString() {
280 return MoreObjects.toStringHelper(this)
281 .add("backingMap", backingMap)
Madan Jampani74da78b2016-02-09 21:18:36 -0800282 .add("updates", updates())
HIGUCHI Yutadc4394c2016-01-29 15:35:10 -0800283 .toString();
284 }
285
Madan Jampani64689552015-02-17 10:00:27 -0800286 /**
287 * Discards all changes made to this transactional map.
288 */
Madan Jampani74da78b2016-02-09 21:18:36 -0800289 protected void abort() {
Madan Jampani64689552015-02-17 10:00:27 -0800290 readCache.clear();
291 writeCache.clear();
292 deleteSet.clear();
293 }
Sho SHIMIZU2425fcf2016-06-16 09:05:03 -0700294}