blob: 469cf11ba1646faef4b162ce3d30dd9ebce03647 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Madan Jampanif4c88502016-01-21 12:35:36 -080017package org.onosproject.store.primitives.impl;
Madan Jampani64689552015-02-17 10:00:27 -080018
Madan Jampani64689552015-02-17 10:00:27 -080019import java.util.List;
20import java.util.Map;
Madan Jampani64689552015-02-17 10:00:27 -080021import java.util.Set;
Madan Jampani74da78b2016-02-09 21:18:36 -080022import java.util.concurrent.CompletableFuture;
HIGUCHI Yuta16e9d282016-04-12 14:09:30 -070023import java.util.stream.Collectors;
24import java.util.stream.Stream;
25import org.apache.commons.lang3.tuple.Pair;
Madan Jampani64689552015-02-17 10:00:27 -080026import org.onlab.util.HexString;
Madan Jampani74da78b2016-02-09 21:18:36 -080027import org.onosproject.store.primitives.MapUpdate;
28import org.onosproject.store.service.AsyncConsistentMap;
Madan Jampani64689552015-02-17 10:00:27 -080029import org.onosproject.store.service.ConsistentMap;
Madan Jampani74da78b2016-02-09 21:18:36 -080030import org.onosproject.store.service.MapTransaction;
Madan Jampani64689552015-02-17 10:00:27 -080031import org.onosproject.store.service.Serializer;
32import org.onosproject.store.service.TransactionContext;
33import org.onosproject.store.service.TransactionalMap;
Madan Jampani64689552015-02-17 10:00:27 -080034import org.onosproject.store.service.Versioned;
35
36import static com.google.common.base.Preconditions.*;
37
HIGUCHI Yutadc4394c2016-01-29 15:35:10 -080038import com.google.common.base.MoreObjects;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070039import com.google.common.base.Objects;
40import com.google.common.cache.CacheBuilder;
41import com.google.common.cache.CacheLoader;
42import com.google.common.cache.LoadingCache;
Madan Jampani64689552015-02-17 10:00:27 -080043import com.google.common.collect.Lists;
44import com.google.common.collect.Maps;
45import com.google.common.collect.Sets;
46
47/**
48 * Default Transactional Map implementation that provides a repeatable reads
49 * transaction isolation level.
50 *
51 * @param <K> key type
52 * @param <V> value type.
53 */
Madan Jampani74da78b2016-02-09 21:18:36 -080054public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V>, TransactionParticipant {
Madan Jampani64689552015-02-17 10:00:27 -080055
56 private final TransactionContext txContext;
57 private static final String TX_CLOSED_ERROR = "Transaction is closed";
Madan Jampani74da78b2016-02-09 21:18:36 -080058 private final AsyncConsistentMap<K, V> backingMap;
59 private final ConsistentMap<K, V> backingConsitentMap;
Madan Jampani64689552015-02-17 10:00:27 -080060 private final String name;
61 private final Serializer serializer;
62 private final Map<K, Versioned<V>> readCache = Maps.newConcurrentMap();
63 private final Map<K, V> writeCache = Maps.newConcurrentMap();
64 private final Set<K> deleteSet = Sets.newConcurrentHashSet();
65
Madan Jampanibff6d8f2015-03-31 16:53:47 -070066 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
67 private static final String ERROR_NULL_KEY = "Null key is not allowed";
68
69 private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder()
70 .softValues()
71 .build(new CacheLoader<K, String>() {
72
73 @Override
74 public String load(K key) {
75 return HexString.toHexString(serializer.encode(key));
76 }
77 });
78
79 protected K dK(String key) {
80 return serializer.decode(HexString.fromHexString(key));
81 }
82
Madan Jampani64689552015-02-17 10:00:27 -080083 public DefaultTransactionalMap(
84 String name,
Madan Jampani74da78b2016-02-09 21:18:36 -080085 AsyncConsistentMap<K, V> backingMap,
Madan Jampani64689552015-02-17 10:00:27 -080086 TransactionContext txContext,
87 Serializer serializer) {
88 this.name = name;
89 this.backingMap = backingMap;
Madan Jampani74da78b2016-02-09 21:18:36 -080090 this.backingConsitentMap = backingMap.asConsistentMap();
Madan Jampani64689552015-02-17 10:00:27 -080091 this.txContext = txContext;
92 this.serializer = serializer;
93 }
94
95 @Override
96 public V get(K key) {
97 checkState(txContext.isOpen(), TX_CLOSED_ERROR);
Madan Jampanibff6d8f2015-03-31 16:53:47 -070098 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani64689552015-02-17 10:00:27 -080099 if (deleteSet.contains(key)) {
100 return null;
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700101 }
102 V latest = writeCache.get(key);
103 if (latest != null) {
104 return latest;
Madan Jampani64689552015-02-17 10:00:27 -0800105 } else {
Madan Jampani74da78b2016-02-09 21:18:36 -0800106 Versioned<V> v = readCache.computeIfAbsent(key, k -> backingConsitentMap.get(k));
Madan Jampani64689552015-02-17 10:00:27 -0800107 return v != null ? v.value() : null;
108 }
109 }
110
111 @Override
112 public V put(K key, V value) {
113 checkState(txContext.isOpen(), TX_CLOSED_ERROR);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700114 checkNotNull(value, ERROR_NULL_VALUE);
115
116 V latest = get(key);
117 writeCache.put(key, value);
Madan Jampani64689552015-02-17 10:00:27 -0800118 deleteSet.remove(key);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700119 return latest;
Madan Jampani64689552015-02-17 10:00:27 -0800120 }
121
122 @Override
123 public V remove(K key) {
124 checkState(txContext.isOpen(), TX_CLOSED_ERROR);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700125 V latest = get(key);
126 if (latest != null) {
127 writeCache.remove(key);
128 deleteSet.add(key);
129 }
130 return latest;
Madan Jampani64689552015-02-17 10:00:27 -0800131 }
132
133 @Override
134 public boolean remove(K key, V value) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700135 checkState(txContext.isOpen(), TX_CLOSED_ERROR);
136 checkNotNull(value, ERROR_NULL_VALUE);
137 V latest = get(key);
138 if (Objects.equal(value, latest)) {
Madan Jampani64689552015-02-17 10:00:27 -0800139 remove(key);
140 return true;
141 }
142 return false;
143 }
144
145 @Override
146 public boolean replace(K key, V oldValue, V newValue) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700147 checkState(txContext.isOpen(), TX_CLOSED_ERROR);
148 checkNotNull(oldValue, ERROR_NULL_VALUE);
149 checkNotNull(newValue, ERROR_NULL_VALUE);
150 V latest = get(key);
151 if (Objects.equal(oldValue, latest)) {
Madan Jampani64689552015-02-17 10:00:27 -0800152 put(key, newValue);
153 return true;
154 }
155 return false;
156 }
157
158 @Override
Madan Jampani64689552015-02-17 10:00:27 -0800159 public V putIfAbsent(K key, V value) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700160 checkState(txContext.isOpen(), TX_CLOSED_ERROR);
161 checkNotNull(value, ERROR_NULL_VALUE);
162 V latest = get(key);
163 if (latest == null) {
Madan Jampani64689552015-02-17 10:00:27 -0800164 put(key, value);
Madan Jampani64689552015-02-17 10:00:27 -0800165 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700166 return latest;
Madan Jampani64689552015-02-17 10:00:27 -0800167 }
168
Madan Jampani74da78b2016-02-09 21:18:36 -0800169 @Override
170 public CompletableFuture<Boolean> prepare() {
171 return backingMap.prepare(new MapTransaction<>(txContext.transactionId(), updates()));
172 }
173
174 @Override
175 public CompletableFuture<Void> commit() {
176 return backingMap.commit(txContext.transactionId());
177 }
178
179 @Override
180 public CompletableFuture<Void> rollback() {
181 return backingMap.rollback(txContext.transactionId());
182 }
183
184 @Override
Madan Jampani542d9e22016-04-05 15:39:55 -0700185 public CompletableFuture<Boolean> prepareAndCommit() {
186 return backingMap.prepareAndCommit(new MapTransaction<>(txContext.transactionId(), updates()));
187 }
188
189 @Override
190 public int totalUpdates() {
191 return updates().size();
Madan Jampani74da78b2016-02-09 21:18:36 -0800192 }
193
HIGUCHI Yuta16e9d282016-04-12 14:09:30 -0700194 @Override
195 public boolean hasPendingUpdates() {
196 return updatesStream().findAny().isPresent();
Madan Jampani74da78b2016-02-09 21:18:36 -0800197 }
198
HIGUCHI Yuta16e9d282016-04-12 14:09:30 -0700199 protected Stream<MapUpdate<K, V>> updatesStream() {
200 return Stream.concat(
201 // 1st stream: delete ops
202 deleteSet.stream()
203 .map(key -> Pair.of(key, readCache.get(key)))
204 .filter(e -> e.getValue() != null)
205 .map(e -> MapUpdate.<K, V>newBuilder()
206 .withMapName(name)
207 .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
208 .withKey(e.getKey())
209 .withCurrentVersion(e.getValue().version())
210 .build()),
211 // 2nd stream: write ops
212 writeCache.entrySet().stream()
213 .map(e -> {
214 Versioned<V> original = readCache.get(e.getKey());
215 if (original == null) {
216 return MapUpdate.<K, V>newBuilder()
217 .withMapName(name)
218 .withType(MapUpdate.Type.PUT_IF_ABSENT)
219 .withKey(e.getKey())
220 .withValue(e.getValue())
221 .build();
222 } else {
223 return MapUpdate.<K, V>newBuilder()
224 .withMapName(name)
225 .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
226 .withKey(e.getKey())
227 .withCurrentVersion(original.version())
228 .withValue(e.getValue())
229 .build();
230 }
231 }));
232 }
233
234 protected List<MapUpdate<K, V>> updates() {
235 return updatesStream().collect(Collectors.toList());
236 }
Madan Jampani74da78b2016-02-09 21:18:36 -0800237
Madan Jampanicadd70b2016-02-08 13:45:43 -0800238 protected List<MapUpdate<String, byte[]>> toMapUpdates() {
239 List<MapUpdate<String, byte[]>> updates = Lists.newLinkedList();
Madan Jampani64689552015-02-17 10:00:27 -0800240 deleteSet.forEach(key -> {
241 Versioned<V> original = readCache.get(key);
242 if (original != null) {
Madan Jampanicadd70b2016-02-08 13:45:43 -0800243 updates.add(MapUpdate.<String, byte[]>newBuilder()
Madan Jampani7804c992015-07-20 13:20:19 -0700244 .withMapName(name)
Madan Jampanicadd70b2016-02-08 13:45:43 -0800245 .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700246 .withKey(keyCache.getUnchecked(key))
Madan Jampani64689552015-02-17 10:00:27 -0800247 .withCurrentVersion(original.version())
248 .build());
249 }
250 });
251 writeCache.forEach((key, value) -> {
252 Versioned<V> original = readCache.get(key);
253 if (original == null) {
Madan Jampanicadd70b2016-02-08 13:45:43 -0800254 updates.add(MapUpdate.<String, byte[]>newBuilder()
Madan Jampani7804c992015-07-20 13:20:19 -0700255 .withMapName(name)
Madan Jampanicadd70b2016-02-08 13:45:43 -0800256 .withType(MapUpdate.Type.PUT_IF_ABSENT)
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700257 .withKey(keyCache.getUnchecked(key))
258 .withValue(serializer.encode(value))
Madan Jampani64689552015-02-17 10:00:27 -0800259 .build());
260 } else {
Madan Jampanicadd70b2016-02-08 13:45:43 -0800261 updates.add(MapUpdate.<String, byte[]>newBuilder()
Madan Jampani7804c992015-07-20 13:20:19 -0700262 .withMapName(name)
Madan Jampanicadd70b2016-02-08 13:45:43 -0800263 .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700264 .withKey(keyCache.getUnchecked(key))
Madan Jampani64689552015-02-17 10:00:27 -0800265 .withCurrentVersion(original.version())
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700266 .withValue(serializer.encode(value))
Madan Jampani64689552015-02-17 10:00:27 -0800267 .build());
268 }
269 });
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700270 return updates;
Madan Jampani64689552015-02-17 10:00:27 -0800271 }
272
HIGUCHI Yutadc4394c2016-01-29 15:35:10 -0800273 @Override
274 public String toString() {
275 return MoreObjects.toStringHelper(this)
276 .add("backingMap", backingMap)
Madan Jampani74da78b2016-02-09 21:18:36 -0800277 .add("updates", updates())
HIGUCHI Yutadc4394c2016-01-29 15:35:10 -0800278 .toString();
279 }
280
Madan Jampani64689552015-02-17 10:00:27 -0800281 /**
282 * Discards all changes made to this transactional map.
283 */
Madan Jampani74da78b2016-02-09 21:18:36 -0800284 protected void abort() {
Madan Jampani64689552015-02-17 10:00:27 -0800285 readCache.clear();
286 writeCache.clear();
287 deleteSet.clear();
288 }
289}