blob: c98c336f71dac64a5f147e8be6416013caa655f5 [file] [log] [blame]
Madan Jampani64689552015-02-17 10:00:27 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.consistent.impl;
18
19import java.util.Collection;
20import java.util.List;
21import java.util.Map;
22import java.util.Map.Entry;
23import java.util.stream.Collectors;
24import java.util.Set;
25
26import org.onlab.util.HexString;
27import org.onosproject.store.service.ConsistentMap;
28import org.onosproject.store.service.Serializer;
29import org.onosproject.store.service.TransactionContext;
30import org.onosproject.store.service.TransactionalMap;
31import org.onosproject.store.service.UpdateOperation;
32import org.onosproject.store.service.Versioned;
33
34import static com.google.common.base.Preconditions.*;
35
36import com.google.common.collect.Lists;
37import com.google.common.collect.Maps;
38import com.google.common.collect.Sets;
39
40/**
41 * Default Transactional Map implementation that provides a repeatable reads
42 * transaction isolation level.
43 *
44 * @param <K> key type
45 * @param <V> value type.
46 */
47public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V> {
48
49 private final TransactionContext txContext;
50 private static final String TX_CLOSED_ERROR = "Transaction is closed";
51 private final ConsistentMap<K, V> backingMap;
52 private final String name;
53 private final Serializer serializer;
54 private final Map<K, Versioned<V>> readCache = Maps.newConcurrentMap();
55 private final Map<K, V> writeCache = Maps.newConcurrentMap();
56 private final Set<K> deleteSet = Sets.newConcurrentHashSet();
57
58 public DefaultTransactionalMap(
59 String name,
60 ConsistentMap<K, V> backingMap,
61 TransactionContext txContext,
62 Serializer serializer) {
63 this.name = name;
64 this.backingMap = backingMap;
65 this.txContext = txContext;
66 this.serializer = serializer;
67 }
68
69 @Override
70 public V get(K key) {
71 checkState(txContext.isOpen(), TX_CLOSED_ERROR);
72 if (deleteSet.contains(key)) {
73 return null;
74 } else if (writeCache.containsKey(key)) {
75 return writeCache.get(key);
76 } else {
77 if (!readCache.containsKey(key)) {
78 readCache.put(key, backingMap.get(key));
79 }
80 Versioned<V> v = readCache.get(key);
81 return v != null ? v.value() : null;
82 }
83 }
84
85 @Override
86 public V put(K key, V value) {
87 checkState(txContext.isOpen(), TX_CLOSED_ERROR);
88 Versioned<V> original = readCache.get(key);
89 V recentUpdate = writeCache.put(key, value);
90 deleteSet.remove(key);
91 return recentUpdate == null ? (original != null ? original.value() : null) : recentUpdate;
92 }
93
94 @Override
95 public V remove(K key) {
96 checkState(txContext.isOpen(), TX_CLOSED_ERROR);
97 Versioned<V> original = readCache.get(key);
98 V recentUpdate = writeCache.remove(key);
99 deleteSet.add(key);
100 return recentUpdate == null ? (original != null ? original.value() : null) : recentUpdate;
101 }
102
103 @Override
104 public boolean remove(K key, V value) {
105 V currentValue = get(key);
106 if (value.equals(currentValue)) {
107 remove(key);
108 return true;
109 }
110 return false;
111 }
112
113 @Override
114 public boolean replace(K key, V oldValue, V newValue) {
115 V currentValue = get(key);
116 if (oldValue.equals(currentValue)) {
117 put(key, newValue);
118 return true;
119 }
120 return false;
121 }
122
123 @Override
124 public int size() {
125 // TODO
126 throw new UnsupportedOperationException();
127 }
128
129 @Override
130 public boolean isEmpty() {
131 return size() == 0;
132 }
133
134 @Override
135 public boolean containsKey(K key) {
136 return get(key) != null;
137 }
138
139 @Override
140 public boolean containsValue(V value) {
141 // TODO
142 throw new UnsupportedOperationException();
143 }
144
145 @Override
146 public void clear() {
147 // TODO
148 throw new UnsupportedOperationException();
149 }
150
151 @Override
152 public Set<K> keySet() {
153 // TODO
154 throw new UnsupportedOperationException();
155 }
156
157 @Override
158 public Collection<V> values() {
159 // TODO
160 throw new UnsupportedOperationException();
161 }
162
163 @Override
164 public Set<Entry<K, V>> entrySet() {
165 // TODO
166 throw new UnsupportedOperationException();
167 }
168
169 @Override
170 public V putIfAbsent(K key, V value) {
171 V currentValue = get(key);
172 if (currentValue == null) {
173 put(key, value);
174 return null;
175 }
176 return currentValue;
177 }
178
179 protected List<UpdateOperation<String, byte[]>> prepareDatabaseUpdates() {
180 List<UpdateOperation<K, V>> updates = Lists.newLinkedList();
181 deleteSet.forEach(key -> {
182 Versioned<V> original = readCache.get(key);
183 if (original != null) {
184 updates.add(UpdateOperation.<K, V>newBuilder()
185 .withTableName(name)
186 .withType(UpdateOperation.Type.REMOVE_IF_VERSION_MATCH)
187 .withKey(key)
188 .withCurrentVersion(original.version())
189 .build());
190 }
191 });
192 writeCache.forEach((key, value) -> {
193 Versioned<V> original = readCache.get(key);
194 if (original == null) {
195 updates.add(UpdateOperation.<K, V>newBuilder()
196 .withTableName(name)
197 .withType(UpdateOperation.Type.PUT_IF_ABSENT)
198 .withKey(key)
199 .withValue(value)
200 .build());
201 } else {
202 updates.add(UpdateOperation.<K, V>newBuilder()
203 .withTableName(name)
204 .withType(UpdateOperation.Type.PUT_IF_VERSION_MATCH)
205 .withKey(key)
206 .withCurrentVersion(original.version())
207 .withValue(value)
208 .build());
209 }
210 });
211 return updates.stream().map(this::toRawUpdateOperation).collect(Collectors.toList());
212 }
213
214 private UpdateOperation<String, byte[]> toRawUpdateOperation(UpdateOperation<K, V> update) {
215
216 UpdateOperation.Builder<String, byte[]> rawUpdate = UpdateOperation.<String, byte[]>newBuilder();
217
218 rawUpdate = rawUpdate.withKey(HexString.toHexString(serializer.encode(update.key())))
219 .withCurrentVersion(update.currentVersion())
220 .withType(update.type());
221
222 rawUpdate = rawUpdate.withTableName(update.tableName());
223
224 if (update.value() != null) {
225 rawUpdate = rawUpdate.withValue(serializer.encode(update.value()));
226 }
227
228 if (update.currentValue() != null) {
229 rawUpdate = rawUpdate.withCurrentValue(serializer.encode(update.currentValue()));
230 }
231
232 return rawUpdate.build();
233 }
234
235 /**
236 * Discards all changes made to this transactional map.
237 */
238 protected void rollback() {
239 readCache.clear();
240 writeCache.clear();
241 deleteSet.clear();
242 }
243}