/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
Jordan Halterman71635ae2017-07-28 10:35:43 -070018import java.util.Arrays;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070019import java.util.Collection;
20import java.util.HashMap;
21import java.util.Iterator;
22import java.util.LinkedHashMap;
23import java.util.List;
24import java.util.Map;
25import java.util.Set;
Jordan Halterman71635ae2017-07-28 10:35:43 -070026import java.util.function.Predicate;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070027import java.util.stream.Collectors;
28
29import com.google.common.base.Throwables;
30import com.google.common.collect.Lists;
31import com.google.common.collect.Maps;
32import com.google.common.collect.Sets;
33import io.atomix.protocols.raft.service.AbstractRaftService;
34import io.atomix.protocols.raft.service.Commit;
35import io.atomix.protocols.raft.service.RaftServiceExecutor;
36import io.atomix.protocols.raft.session.RaftSession;
37import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
38import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
39import org.onlab.util.KryoNamespace;
40import org.onlab.util.Match;
41import org.onosproject.store.primitives.MapUpdate;
42import org.onosproject.store.primitives.TransactionId;
43import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsKey;
44import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsValue;
45import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Get;
46import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GetOrDefault;
Jordan Halterman71635ae2017-07-28 10:35:43 -070047import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Put;
48import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Remove;
49import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.RemoveValue;
50import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.RemoveVersion;
51import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Replace;
52import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ReplaceValue;
53import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ReplaceVersion;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070054import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionBegin;
55import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionCommit;
56import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepare;
57import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepareAndCommit;
58import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionRollback;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070059import org.onosproject.store.serializers.KryoNamespaces;
60import org.onosproject.store.service.MapEvent;
61import org.onosproject.store.service.Serializer;
62import org.onosproject.store.service.TransactionLog;
63import org.onosproject.store.service.Versioned;
64
65import static com.google.common.base.Preconditions.checkState;
66import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapEvents.CHANGE;
67import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ADD_LISTENER;
68import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.BEGIN;
69import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CLEAR;
70import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.COMMIT;
71import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_KEY;
72import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_VALUE;
73import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ENTRY_SET;
74import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET;
75import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET_OR_DEFAULT;
76import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IS_EMPTY;
77import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.KEY_SET;
78import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE;
79import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE_AND_COMMIT;
Jordan Halterman71635ae2017-07-28 10:35:43 -070080import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT;
81import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT_AND_GET;
82import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT_IF_ABSENT;
83import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070084import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_LISTENER;
Jordan Halterman71635ae2017-07-28 10:35:43 -070085import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_VALUE;
86import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_VERSION;
87import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE;
88import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE_VALUE;
89import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE_VERSION;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070090import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ROLLBACK;
91import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.SIZE;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070092import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.VALUES;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070093
94/**
95 * State Machine for {@link AtomixConsistentMap} resource.
96 */
97public class AtomixConsistentMapService extends AbstractRaftService {
98
99 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
100 .register(KryoNamespaces.BASIC)
101 .register(AtomixConsistentMapOperations.NAMESPACE)
102 .register(AtomixConsistentMapEvents.NAMESPACE)
Jordan Halterman71635ae2017-07-28 10:35:43 -0700103 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700104 .register(TransactionScope.class)
105 .register(TransactionLog.class)
106 .register(TransactionId.class)
107 .register(MapEntryValue.class)
108 .register(MapEntryValue.Type.class)
109 .register(new HashMap().keySet().getClass())
110 .build());
111
Jordan Halterman71635ae2017-07-28 10:35:43 -0700112 protected Map<Long, RaftSession> listeners = new LinkedHashMap<>();
113 private Map<String, MapEntryValue> map;
114 protected Set<String> preparedKeys = Sets.newHashSet();
115 protected Map<TransactionId, TransactionScope> activeTransactions = Maps.newHashMap();
116 protected long currentVersion;
117
/**
 * Creates the service and obtains its backing map from {@link #createMap()}.
 */
public AtomixConsistentMapService() {
    this.map = createMap();
}
121
122 protected Map<String, MapEntryValue> createMap() {
123 return Maps.newHashMap();
124 }
125
126 protected Map<String, MapEntryValue> entries() {
127 return map;
128 }
129
130 protected Serializer serializer() {
131 return SERIALIZER;
132 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700133
134 @Override
135 public void snapshot(SnapshotWriter writer) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700136 writer.writeObject(Sets.newHashSet(listeners.keySet()), serializer()::encode);
137 writer.writeObject(preparedKeys, serializer()::encode);
138 writer.writeObject(entries(), serializer()::encode);
139 writer.writeObject(activeTransactions, serializer()::encode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700140 writer.writeLong(currentVersion);
141 }
142
143 @Override
144 public void install(SnapshotReader reader) {
145 listeners = new LinkedHashMap<>();
Jordan Halterman71635ae2017-07-28 10:35:43 -0700146 for (Long sessionId : reader.<Set<Long>>readObject(serializer()::decode)) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700147 listeners.put(sessionId, getSessions().getSession(sessionId));
148 }
Jordan Halterman71635ae2017-07-28 10:35:43 -0700149 preparedKeys = reader.readObject(serializer()::decode);
150 map = reader.readObject(serializer()::decode);
151 activeTransactions = reader.readObject(serializer()::decode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700152 currentVersion = reader.readLong();
153 }
154
155 @Override
156 protected void configure(RaftServiceExecutor executor) {
157 // Listeners
Jordan Halterman71635ae2017-07-28 10:35:43 -0700158 executor.register(ADD_LISTENER, (Commit<Void> c) -> listen(c.session()));
159 executor.register(REMOVE_LISTENER, (Commit<Void> c) -> unlisten(c.session()));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700160 // Queries
Jordan Halterman71635ae2017-07-28 10:35:43 -0700161 executor.register(CONTAINS_KEY, serializer()::decode, this::containsKey, serializer()::encode);
162 executor.register(CONTAINS_VALUE, serializer()::decode, this::containsValue, serializer()::encode);
163 executor.register(ENTRY_SET, (Commit<Void> c) -> entrySet(), serializer()::encode);
164 executor.register(GET, serializer()::decode, this::get, serializer()::encode);
165 executor.register(GET_OR_DEFAULT, serializer()::decode, this::getOrDefault, serializer()::encode);
166 executor.register(IS_EMPTY, (Commit<Void> c) -> isEmpty(), serializer()::encode);
167 executor.register(KEY_SET, (Commit<Void> c) -> keySet(), serializer()::encode);
168 executor.register(SIZE, (Commit<Void> c) -> size(), serializer()::encode);
169 executor.register(VALUES, (Commit<Void> c) -> values(), serializer()::encode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700170 // Commands
Jordan Halterman71635ae2017-07-28 10:35:43 -0700171 executor.register(PUT, serializer()::decode, this::put, serializer()::encode);
172 executor.register(PUT_IF_ABSENT, serializer()::decode, this::putIfAbsent, serializer()::encode);
173 executor.register(PUT_AND_GET, serializer()::decode, this::putAndGet, serializer()::encode);
174 executor.register(REMOVE, serializer()::decode, this::remove, serializer()::encode);
175 executor.register(REMOVE_VALUE, serializer()::decode, this::removeValue, serializer()::encode);
176 executor.register(REMOVE_VERSION, serializer()::decode, this::removeVersion, serializer()::encode);
177 executor.register(REPLACE, serializer()::decode, this::replace, serializer()::encode);
178 executor.register(REPLACE_VALUE, serializer()::decode, this::replaceValue, serializer()::encode);
179 executor.register(REPLACE_VERSION, serializer()::decode, this::replaceVersion, serializer()::encode);
180 executor.register(CLEAR, (Commit<Void> c) -> clear(), serializer()::encode);
181 executor.register(BEGIN, serializer()::decode, this::begin, serializer()::encode);
182 executor.register(PREPARE, serializer()::decode, this::prepare, serializer()::encode);
183 executor.register(PREPARE_AND_COMMIT, serializer()::decode, this::prepareAndCommit, serializer()::encode);
184 executor.register(COMMIT, serializer()::decode, this::commit, serializer()::encode);
185 executor.register(ROLLBACK, serializer()::decode, this::rollback, serializer()::encode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700186 }
187
188 /**
189 * Handles a contains key commit.
190 *
191 * @param commit containsKey commit
192 * @return {@code true} if map contains key
193 */
194 protected boolean containsKey(Commit<? extends ContainsKey> commit) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700195 MapEntryValue value = entries().get(commit.value().key());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700196 return value != null && value.type() != MapEntryValue.Type.TOMBSTONE;
197 }
198
199 /**
200 * Handles a contains value commit.
201 *
202 * @param commit containsValue commit
203 * @return {@code true} if map contains value
204 */
205 protected boolean containsValue(Commit<? extends ContainsValue> commit) {
206 Match<byte[]> valueMatch = Match.ifValue(commit.value().value());
Jordan Halterman71635ae2017-07-28 10:35:43 -0700207 return entries().values().stream()
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700208 .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
209 .anyMatch(value -> valueMatch.matches(value.value()));
210 }
211
212 /**
213 * Handles a get commit.
214 *
215 * @param commit get commit
216 * @return value mapped to key
217 */
218 protected Versioned<byte[]> get(Commit<? extends Get> commit) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700219 return toVersioned(entries().get(commit.value().key()));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700220 }
221
222 /**
223 * Handles a get or default commit.
224 *
225 * @param commit get or default commit
226 * @return value mapped to key
227 */
228 protected Versioned<byte[]> getOrDefault(Commit<? extends GetOrDefault> commit) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700229 MapEntryValue value = entries().get(commit.value().key());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700230 if (value == null) {
231 return new Versioned<>(commit.value().defaultValue(), 0);
232 } else if (value.type() == MapEntryValue.Type.TOMBSTONE) {
233 return new Versioned<>(commit.value().defaultValue(), value.version);
234 } else {
235 return new Versioned<>(value.value(), value.version);
236 }
237 }
238
239 /**
Jordan Halterman71635ae2017-07-28 10:35:43 -0700240 * Handles a size commit.
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700241 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700242 * @return number of entries in map
243 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700244 protected int size() {
245 return (int) entries().values().stream()
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700246 .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
247 .count();
248 }
249
250 /**
251 * Handles an is empty commit.
252 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700253 * @return {@code true} if map is empty
254 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700255 protected boolean isEmpty() {
256 return entries().values().stream()
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700257 .noneMatch(value -> value.type() != MapEntryValue.Type.TOMBSTONE);
258 }
259
260 /**
261 * Handles a keySet commit.
262 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700263 * @return set of keys in map
264 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700265 protected Set<String> keySet() {
266 return entries().entrySet().stream()
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700267 .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
268 .map(Map.Entry::getKey)
269 .collect(Collectors.toSet());
270 }
271
272 /**
273 * Handles a values commit.
274 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700275 * @return collection of values in map
276 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700277 protected Collection<Versioned<byte[]>> values() {
278 return entries().entrySet().stream()
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700279 .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
280 .map(entry -> toVersioned(entry.getValue()))
281 .collect(Collectors.toList());
282 }
283
284 /**
285 * Handles a entry set commit.
286 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700287 * @return set of map entries
288 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700289 protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet() {
290 return entries().entrySet().stream()
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700291 .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
292 .map(e -> Maps.immutableEntry(e.getKey(), toVersioned(e.getValue())))
293 .collect(Collectors.toSet());
294 }
295
296 /**
Jordan Halterman71635ae2017-07-28 10:35:43 -0700297 * Returns a boolean indicating whether the given MapEntryValues are equal.
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700298 *
Jordan Halterman71635ae2017-07-28 10:35:43 -0700299 * @param oldValue the first value to compare
300 * @param newValue the second value to compare
301 * @return indicates whether the two values are equal
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700302 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700303 protected boolean valuesEqual(MapEntryValue oldValue, MapEntryValue newValue) {
304 return (oldValue == null && newValue == null)
305 || (oldValue != null && newValue != null && valuesEqual(oldValue.value(), newValue.value()));
306 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700307
Jordan Halterman71635ae2017-07-28 10:35:43 -0700308 /**
309 * Returns a boolean indicating whether the given entry values are equal.
310 *
311 * @param oldValue the first value to compare
312 * @param newValue the second value to compare
313 * @return indicates whether the two values are equal
314 */
315 protected boolean valuesEqual(byte[] oldValue, byte[] newValue) {
316 return (oldValue == null && newValue == null)
317 || (oldValue != null && newValue != null && Arrays.equals(oldValue, newValue));
318 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700319
Jordan Halterman71635ae2017-07-28 10:35:43 -0700320 /**
321 * Returns a boolean indicating whether the given MapEntryValue is null or a tombstone.
322 *
323 * @param value the value to check
324 * @return indicates whether the given value is null or is a tombstone
325 */
326 protected boolean valueIsNull(MapEntryValue value) {
327 return value == null || value.type() == MapEntryValue.Type.TOMBSTONE;
328 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700329
Jordan Halterman71635ae2017-07-28 10:35:43 -0700330 /**
331 * Handles a put commit.
332 *
333 * @param commit put commit
334 * @return map entry update result
335 */
336 protected MapEntryUpdateResult<String, byte[]> put(Commit<? extends Put> commit) {
337 String key = commit.value().key();
338 MapEntryValue oldValue = entries().get(key);
339 MapEntryValue newValue = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
Jordan Halterman70df7672017-08-03 16:25:19 -0700340
341 // If the value is null or a tombstone, this is an insert.
342 // Otherwise, only update the value if it has changed to reduce the number of events.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700343 if (valueIsNull(oldValue)) {
Jordan Halterman70df7672017-08-03 16:25:19 -0700344 // If the key has been locked by a transaction, return a WRITE_LOCK error.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700345 if (preparedKeys.contains(key)) {
346 return new MapEntryUpdateResult<>(
347 MapEntryUpdateResult.Status.WRITE_LOCK,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700348 commit.index(),
Jordan Halterman71635ae2017-07-28 10:35:43 -0700349 key,
350 toVersioned(oldValue));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700351 }
Jordan Halterman71635ae2017-07-28 10:35:43 -0700352 entries().put(commit.value().key(),
353 new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
354 Versioned<byte[]> result = toVersioned(oldValue);
355 publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, toVersioned(newValue), result));
356 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
357 } else if (!valuesEqual(oldValue, newValue)) {
Jordan Halterman70df7672017-08-03 16:25:19 -0700358 // If the key has been locked by a transaction, return a WRITE_LOCK error.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700359 if (preparedKeys.contains(key)) {
360 return new MapEntryUpdateResult<>(
361 MapEntryUpdateResult.Status.WRITE_LOCK,
362 commit.index(),
363 key,
364 toVersioned(oldValue));
365 }
366 entries().put(commit.value().key(),
367 new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
368 Versioned<byte[]> result = toVersioned(oldValue);
369 publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result));
370 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700371 }
Jordan Halterman70df7672017-08-03 16:25:19 -0700372 // If the value hasn't changed, return a NOOP result.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700373 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue));
374 }
375
376 /**
377 * Handles a putIfAbsent commit.
378 *
379 * @param commit putIfAbsent commit
380 * @return map entry update result
381 */
382 protected MapEntryUpdateResult<String, byte[]> putIfAbsent(Commit<? extends Put> commit) {
383 String key = commit.value().key();
384 MapEntryValue oldValue = entries().get(key);
Jordan Halterman70df7672017-08-03 16:25:19 -0700385
386 // If the value is null, this is an INSERT.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700387 if (valueIsNull(oldValue)) {
Jordan Halterman70df7672017-08-03 16:25:19 -0700388 // If the key has been locked by a transaction, return a WRITE_LOCK error.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700389 if (preparedKeys.contains(key)) {
390 return new MapEntryUpdateResult<>(
391 MapEntryUpdateResult.Status.WRITE_LOCK,
392 commit.index(),
393 key,
394 toVersioned(oldValue));
395 }
396 MapEntryValue newValue = new MapEntryValue(
397 MapEntryValue.Type.VALUE,
398 commit.index(),
399 commit.value().value());
Jordan Halterman70df7672017-08-03 16:25:19 -0700400 entries().put(commit.value().key(), newValue);
Jordan Halterman71635ae2017-07-28 10:35:43 -0700401 Versioned<byte[]> result = toVersioned(newValue);
402 publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, result, null));
403 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, null);
404 }
Jordan Halterman70df7672017-08-03 16:25:19 -0700405 return new MapEntryUpdateResult<>(
406 MapEntryUpdateResult.Status.PRECONDITION_FAILED,
407 commit.index(),
408 key,
409 toVersioned(oldValue));
Jordan Halterman71635ae2017-07-28 10:35:43 -0700410 }
411
412 /**
413 * Handles a putAndGet commit.
414 *
415 * @param commit putAndGet commit
416 * @return map entry update result
417 */
418 protected MapEntryUpdateResult<String, byte[]> putAndGet(Commit<? extends Put> commit) {
419 String key = commit.value().key();
420 MapEntryValue oldValue = entries().get(key);
421 MapEntryValue newValue = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
Jordan Halterman70df7672017-08-03 16:25:19 -0700422
423 // If the value is null or a tombstone, this is an insert.
424 // Otherwise, only update the value if it has changed to reduce the number of events.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700425 if (valueIsNull(oldValue)) {
Jordan Halterman70df7672017-08-03 16:25:19 -0700426 // If the key has been locked by a transaction, return a WRITE_LOCK error.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700427 if (preparedKeys.contains(key)) {
428 return new MapEntryUpdateResult<>(
429 MapEntryUpdateResult.Status.WRITE_LOCK,
430 commit.index(),
431 key,
432 toVersioned(oldValue));
433 }
434 entries().put(commit.value().key(), newValue);
435 Versioned<byte[]> result = toVersioned(newValue);
436 publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, result, null));
437 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
438 } else if (!valuesEqual(oldValue, newValue)) {
Jordan Halterman70df7672017-08-03 16:25:19 -0700439 // If the key has been locked by a transaction, return a WRITE_LOCK error.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700440 if (preparedKeys.contains(key)) {
441 return new MapEntryUpdateResult<>(
442 MapEntryUpdateResult.Status.WRITE_LOCK,
443 commit.index(),
444 key,
445 toVersioned(oldValue));
446 }
447 entries().put(commit.value().key(), newValue);
448 Versioned<byte[]> result = toVersioned(newValue);
449 publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, result, toVersioned(oldValue)));
450 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
451 }
452 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue));
453 }
454
455 /**
456 * Handles a remove commit.
457 *
458 * @param index the commit index
459 * @param key the key to remove
460 * @param predicate predicate to determine whether to remove the entry
461 * @return map entry update result
462 */
463 private MapEntryUpdateResult<String, byte[]> removeIf(long index, String key, Predicate<MapEntryValue> predicate) {
464 MapEntryValue value = entries().get(key);
Jordan Halterman70df7672017-08-03 16:25:19 -0700465
466 // If the value does not exist or doesn't match the predicate, return a PRECONDITION_FAILED error.
467 if (valueIsNull(value) || !predicate.test(value)) {
468 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.PRECONDITION_FAILED, index, key, null);
Jordan Halterman71635ae2017-07-28 10:35:43 -0700469 }
Jordan Halterman70df7672017-08-03 16:25:19 -0700470
471 // If the key has been locked by a transaction, return a WRITE_LOCK error.
472 if (preparedKeys.contains(key)) {
473 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.WRITE_LOCK, index, key, null);
Jordan Halterman71635ae2017-07-28 10:35:43 -0700474 }
Jordan Halterman70df7672017-08-03 16:25:19 -0700475
476 // If no transactions are active, remove the key. Otherwise, replace it with a tombstone.
477 if (activeTransactions.isEmpty()) {
478 entries().remove(key);
479 } else {
480 entries().put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, index, null));
481 }
482
Jordan Halterman71635ae2017-07-28 10:35:43 -0700483 Versioned<byte[]> result = toVersioned(value);
Jordan Halterman70df7672017-08-03 16:25:19 -0700484 publish(new MapEvent<>(MapEvent.Type.REMOVE, "", key, null, result));
Jordan Halterman71635ae2017-07-28 10:35:43 -0700485 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, index, key, result);
486 }
487
488 /**
489 * Handles a remove commit.
490 *
491 * @param commit remove commit
492 * @return map entry update result
493 */
494 protected MapEntryUpdateResult<String, byte[]> remove(Commit<? extends Remove> commit) {
495 return removeIf(commit.index(), commit.value().key(), v -> true);
496 }
497
498 /**
499 * Handles a removeValue commit.
500 *
501 * @param commit removeValue commit
502 * @return map entry update result
503 */
504 protected MapEntryUpdateResult<String, byte[]> removeValue(Commit<? extends RemoveValue> commit) {
505 return removeIf(commit.index(), commit.value().key(), v ->
506 valuesEqual(v, new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value())));
507 }
508
509 /**
510 * Handles a removeVersion commit.
511 *
512 * @param commit removeVersion commit
513 * @return map entry update result
514 */
515 protected MapEntryUpdateResult<String, byte[]> removeVersion(Commit<? extends RemoveVersion> commit) {
516 return removeIf(commit.index(), commit.value().key(), v -> v.version() == commit.value().version());
517 }
518
519 /**
520 * Handles a replace commit.
521 *
522 * @param index the commit index
523 * @param key the key to replace
524 * @param newValue the value with which to replace the key
525 * @param predicate a predicate to determine whether to replace the key
526 * @return map entry update result
527 */
528 private MapEntryUpdateResult<String, byte[]> replaceIf(
529 long index, String key, MapEntryValue newValue, Predicate<MapEntryValue> predicate) {
530 MapEntryValue oldValue = entries().get(key);
Jordan Halterman70df7672017-08-03 16:25:19 -0700531
532 // If the key is not set or the current value doesn't match the predicate, return a PRECONDITION_FAILED error.
533 if (valueIsNull(oldValue) || !predicate.test(oldValue)) {
534 return new MapEntryUpdateResult<>(
535 MapEntryUpdateResult.Status.PRECONDITION_FAILED,
536 index,
537 key,
538 toVersioned(oldValue));
Jordan Halterman71635ae2017-07-28 10:35:43 -0700539 }
Jordan Halterman70df7672017-08-03 16:25:19 -0700540
541 // If the key has been locked by a transaction, return a WRITE_LOCK error.
542 if (preparedKeys.contains(key)) {
543 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.WRITE_LOCK, index, key, null);
Jordan Halterman71635ae2017-07-28 10:35:43 -0700544 }
Jordan Halterman70df7672017-08-03 16:25:19 -0700545
Jordan Halterman71635ae2017-07-28 10:35:43 -0700546 entries().put(key, newValue);
547 Versioned<byte[]> result = toVersioned(oldValue);
Jordan Halterman70df7672017-08-03 16:25:19 -0700548 publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result));
Jordan Halterman71635ae2017-07-28 10:35:43 -0700549 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, index, key, result);
550 }
551
552 /**
553 * Handles a replace commit.
554 *
555 * @param commit replace commit
556 * @return map entry update result
557 */
558 protected MapEntryUpdateResult<String, byte[]> replace(Commit<? extends Replace> commit) {
559 MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
560 return replaceIf(commit.index(), commit.value().key(), value, v -> true);
561 }
562
563 /**
564 * Handles a replaceValue commit.
565 *
566 * @param commit replaceValue commit
567 * @return map entry update result
568 */
569 protected MapEntryUpdateResult<String, byte[]> replaceValue(Commit<? extends ReplaceValue> commit) {
570 MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().newValue());
571 return replaceIf(commit.index(), commit.value().key(), value,
572 v -> valuesEqual(v.value(), commit.value().oldValue()));
573 }
574
575 /**
576 * Handles a replaceVersion commit.
577 *
578 * @param commit replaceVersion commit
579 * @return map entry update result
580 */
581 protected MapEntryUpdateResult<String, byte[]> replaceVersion(Commit<? extends ReplaceVersion> commit) {
582 MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().newValue());
583 return replaceIf(commit.index(), commit.value().key(), value,
584 v -> v.version() == commit.value().oldVersion());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700585 }
586
587 /**
588 * Handles a clear commit.
589 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700590 * @return clear result
591 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700592 protected MapEntryUpdateResult.Status clear() {
593 Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
594 Map<String, MapEntryValue> entriesToAdd = new HashMap<>();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700595 while (iterator.hasNext()) {
596 Map.Entry<String, MapEntryValue> entry = iterator.next();
597 String key = entry.getKey();
598 MapEntryValue value = entry.getValue();
Jordan Halterman71635ae2017-07-28 10:35:43 -0700599 if (!valueIsNull(value)) {
600 Versioned<byte[]> removedValue = new Versioned<>(value.value(), value.version());
601 publish(new MapEvent<>(MapEvent.Type.REMOVE, "", key, null, removedValue));
602 if (activeTransactions.isEmpty()) {
603 iterator.remove();
604 } else {
605 entriesToAdd.put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, value.version, null));
606 }
607 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700608 }
Jordan Halterman71635ae2017-07-28 10:35:43 -0700609 entries().putAll(entriesToAdd);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700610 return MapEntryUpdateResult.Status.OK;
611 }
612
613 /**
614 * Handles a listen commit.
615 *
Jordan Halterman71635ae2017-07-28 10:35:43 -0700616 * @param session listen session
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700617 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700618 protected void listen(RaftSession session) {
619 listeners.put(session.sessionId().id(), session);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700620 }
621
622 /**
623 * Handles an unlisten commit.
624 *
Jordan Halterman71635ae2017-07-28 10:35:43 -0700625 * @param session unlisten session
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700626 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700627 protected void unlisten(RaftSession session) {
628 listeners.remove(session.sessionId().id());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700629 }
630
631 /**
632 * Handles a begin commit.
633 *
634 * @param commit transaction begin commit
635 * @return transaction state version
636 */
637 protected long begin(Commit<? extends TransactionBegin> commit) {
638 long version = commit.index();
639 activeTransactions.put(commit.value().transactionId(), new TransactionScope(version));
640 return version;
641 }
642
643 /**
644 * Handles an prepare and commit commit.
645 *
646 * @param commit transaction prepare and commit commit
647 * @return prepare result
648 */
649 protected PrepareResult prepareAndCommit(Commit<? extends TransactionPrepareAndCommit> commit) {
650 TransactionId transactionId = commit.value().transactionLog().transactionId();
651 PrepareResult prepareResult = prepare(commit);
652 TransactionScope transactionScope = activeTransactions.remove(transactionId);
653 if (prepareResult == PrepareResult.OK) {
654 this.currentVersion = commit.index();
655 transactionScope = transactionScope.prepared(commit);
656 commitTransaction(transactionScope);
657 }
658 discardTombstones();
659 return prepareResult;
660 }
661
662 /**
663 * Handles an prepare commit.
664 *
665 * @param commit transaction prepare commit
666 * @return prepare result
667 */
668 protected PrepareResult prepare(Commit<? extends TransactionPrepare> commit) {
669 try {
670 TransactionLog<MapUpdate<String, byte[]>> transactionLog = commit.value().transactionLog();
671
672 // Iterate through records in the transaction log and perform isolation checks.
673 for (MapUpdate<String, byte[]> record : transactionLog.records()) {
674 String key = record.key();
675
676 // If the record is a VERSION_MATCH then check that the record's version matches the current
677 // version of the state machine.
678 if (record.type() == MapUpdate.Type.VERSION_MATCH && key == null) {
679 if (record.version() > currentVersion) {
680 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
681 } else {
682 continue;
683 }
684 }
685
686 // If the prepared keys already contains the key contained within the record, that indicates a
687 // conflict with a concurrent transaction.
688 if (preparedKeys.contains(key)) {
689 return PrepareResult.CONCURRENT_TRANSACTION;
690 }
691
692 // Read the existing value from the map.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700693 MapEntryValue existingValue = entries().get(key);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700694
695 // Note: if the existing value is null, that means the key has not changed during the transaction,
696 // otherwise a tombstone would have been retained.
697 if (existingValue == null) {
698 // If the value is null, ensure the version is equal to the transaction version.
699 if (record.version() != transactionLog.version()) {
700 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
701 }
702 } else {
703 // If the value is non-null, compare the current version with the record version.
704 if (existingValue.version() > record.version()) {
705 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
706 }
707 }
708 }
709
710 // No violations detected. Mark modified keys locked for transactions.
711 transactionLog.records().forEach(record -> {
712 if (record.type() != MapUpdate.Type.VERSION_MATCH) {
713 preparedKeys.add(record.key());
714 }
715 });
716
717 // Update the transaction scope. If the transaction scope is not set on this node, that indicates the
718 // coordinator is communicating with another node. Transactions assume that the client is communicating
719 // with a single leader in order to limit the overhead of retaining tombstones.
720 TransactionScope transactionScope = activeTransactions.get(transactionLog.transactionId());
721 if (transactionScope == null) {
722 activeTransactions.put(
723 transactionLog.transactionId(),
724 new TransactionScope(transactionLog.version(), commit.value().transactionLog()));
725 return PrepareResult.PARTIAL_FAILURE;
726 } else {
727 activeTransactions.put(
728 transactionLog.transactionId(),
729 transactionScope.prepared(commit));
730 return PrepareResult.OK;
731 }
732 } catch (Exception e) {
733 getLogger().warn("Failure applying {}", commit, e);
734 throw Throwables.propagate(e);
735 }
736 }
737
738 /**
739 * Handles an commit commit (ha!).
740 *
741 * @param commit transaction commit commit
742 * @return commit result
743 */
744 protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
745 TransactionId transactionId = commit.value().transactionId();
746 TransactionScope transactionScope = activeTransactions.remove(transactionId);
747 if (transactionScope == null) {
748 return CommitResult.UNKNOWN_TRANSACTION_ID;
749 }
750
751 try {
752 this.currentVersion = commit.index();
753 return commitTransaction(transactionScope);
754 } catch (Exception e) {
755 getLogger().warn("Failure applying {}", commit, e);
756 throw Throwables.propagate(e);
757 } finally {
758 discardTombstones();
759 }
760 }
761
762 /**
763 * Applies committed operations to the state machine.
764 */
765 private CommitResult commitTransaction(TransactionScope transactionScope) {
766 TransactionLog<MapUpdate<String, byte[]>> transactionLog = transactionScope.transactionLog();
767 boolean retainTombstones = !activeTransactions.isEmpty();
768
769 List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
770 for (MapUpdate<String, byte[]> record : transactionLog.records()) {
771 if (record.type() == MapUpdate.Type.VERSION_MATCH) {
772 continue;
773 }
774
775 String key = record.key();
776 checkState(preparedKeys.remove(key), "key is not prepared");
777
778 if (record.type() == MapUpdate.Type.LOCK) {
779 continue;
780 }
781
Jordan Halterman71635ae2017-07-28 10:35:43 -0700782 MapEntryValue previousValue = entries().remove(key);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700783 MapEntryValue newValue = null;
784
785 // If the record is not a delete, create a transactional commit.
786 if (record.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
787 newValue = new MapEntryValue(MapEntryValue.Type.VALUE, currentVersion, record.value());
788 } else if (retainTombstones) {
789 // For deletes, if tombstones need to be retained then create and store a tombstone commit.
790 newValue = new MapEntryValue(MapEntryValue.Type.TOMBSTONE, currentVersion, null);
791 }
792
Jordan Halterman71635ae2017-07-28 10:35:43 -0700793 MapEvent<String, byte[]> event;
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700794 if (newValue != null) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700795 entries().put(key, newValue);
796 if (!valueIsNull(newValue)) {
797 if (!valueIsNull(previousValue)) {
798 event = new MapEvent<>(
799 MapEvent.Type.UPDATE,
800 "",
801 key,
802 toVersioned(newValue),
803 toVersioned(previousValue));
804 } else {
805 event = new MapEvent<>(
806 MapEvent.Type.INSERT,
807 "",
808 key,
809 toVersioned(newValue),
810 null);
811 }
812 } else {
813 event = new MapEvent<>(
814 MapEvent.Type.REMOVE,
815 "",
816 key,
817 null,
818 toVersioned(previousValue));
819 }
820 } else {
821 event = new MapEvent<>(
822 MapEvent.Type.REMOVE,
823 "",
824 key,
825 null,
826 toVersioned(previousValue));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700827 }
Jordan Halterman71635ae2017-07-28 10:35:43 -0700828 eventsToPublish.add(event);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700829 }
830 publish(eventsToPublish);
831 return CommitResult.OK;
832 }
833
834 /**
835 * Handles an rollback commit (ha!).
836 *
837 * @param commit transaction rollback commit
838 * @return rollback result
839 */
840 protected RollbackResult rollback(Commit<? extends TransactionRollback> commit) {
841 TransactionId transactionId = commit.value().transactionId();
842 TransactionScope transactionScope = activeTransactions.remove(transactionId);
843 if (transactionScope == null) {
844 return RollbackResult.UNKNOWN_TRANSACTION_ID;
845 } else if (!transactionScope.isPrepared()) {
846 discardTombstones();
847 return RollbackResult.OK;
848 } else {
849 try {
850 transactionScope.transactionLog().records()
851 .forEach(record -> {
852 if (record.type() != MapUpdate.Type.VERSION_MATCH) {
853 preparedKeys.remove(record.key());
854 }
855 });
856 return RollbackResult.OK;
857 } finally {
858 discardTombstones();
859 }
860 }
861
862 }
863
864 /**
865 * Discards tombstones no longer needed by active transactions.
866 */
867 private void discardTombstones() {
868 if (activeTransactions.isEmpty()) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700869 Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700870 while (iterator.hasNext()) {
871 MapEntryValue value = iterator.next().getValue();
872 if (value.type() == MapEntryValue.Type.TOMBSTONE) {
873 iterator.remove();
874 }
875 }
876 } else {
877 long lowWaterMark = activeTransactions.values().stream()
878 .mapToLong(TransactionScope::version)
879 .min().getAsLong();
Jordan Halterman71635ae2017-07-28 10:35:43 -0700880 Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700881 while (iterator.hasNext()) {
882 MapEntryValue value = iterator.next().getValue();
883 if (value.type() == MapEntryValue.Type.TOMBSTONE && value.version < lowWaterMark) {
884 iterator.remove();
885 }
886 }
887 }
888 }
889
890 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700891 * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
892 * @param value map entry value
893 * @return versioned instance
894 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700895 protected Versioned<byte[]> toVersioned(MapEntryValue value) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700896 return value != null && value.type() != MapEntryValue.Type.TOMBSTONE
897 ? new Versioned<>(value.value(), value.version()) : null;
898 }
899
900 /**
Jordan Halterman71635ae2017-07-28 10:35:43 -0700901 * Publishes an event to listeners.
902 *
903 * @param event event to publish
904 */
905 private void publish(MapEvent<String, byte[]> event) {
906 publish(Lists.newArrayList(event));
907 }
908
909 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700910 * Publishes events to listeners.
911 *
912 * @param events list of map event to publish
913 */
914 private void publish(List<MapEvent<String, byte[]>> events) {
915 listeners.values().forEach(session -> {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700916 session.publish(CHANGE, serializer()::encode, events);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700917 });
918 }
919
920 @Override
921 public void onExpire(RaftSession session) {
922 closeListener(session.sessionId().id());
923 }
924
925 @Override
926 public void onClose(RaftSession session) {
927 closeListener(session.sessionId().id());
928 }
929
930 private void closeListener(Long sessionId) {
931 listeners.remove(sessionId);
932 }
933
934 /**
935 * Interface implemented by map values.
936 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700937 protected static class MapEntryValue {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700938 protected final Type type;
939 protected final long version;
940 protected final byte[] value;
941
942 MapEntryValue(Type type, long version, byte[] value) {
943 this.type = type;
944 this.version = version;
945 this.value = value;
946 }
947
948 /**
949 * Returns the value type.
950 *
951 * @return the value type
952 */
953 Type type() {
954 return type;
955 }
956
957 /**
958 * Returns the version of the value.
959 *
960 * @return version
961 */
962 long version() {
963 return version;
964 }
965
966 /**
967 * Returns the raw {@code byte[]}.
968 *
969 * @return raw value
970 */
971 byte[] value() {
972 return value;
973 }
974
975 /**
976 * Value type.
977 */
978 enum Type {
979 VALUE,
980 TOMBSTONE,
981 }
982 }
983
984 /**
985 * Map transaction scope.
986 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700987 protected static final class TransactionScope {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700988 private final long version;
989 private final TransactionLog<MapUpdate<String, byte[]>> transactionLog;
990
991 private TransactionScope(long version) {
992 this(version, null);
993 }
994
995 private TransactionScope(long version, TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
996 this.version = version;
997 this.transactionLog = transactionLog;
998 }
999
1000 /**
1001 * Returns the transaction version.
1002 *
1003 * @return the transaction version
1004 */
1005 long version() {
1006 return version;
1007 }
1008
1009 /**
1010 * Returns whether this is a prepared transaction scope.
1011 *
1012 * @return whether this is a prepared transaction scope
1013 */
1014 boolean isPrepared() {
1015 return transactionLog != null;
1016 }
1017
1018 /**
1019 * Returns the transaction commit log.
1020 *
1021 * @return the transaction commit log
1022 */
1023 TransactionLog<MapUpdate<String, byte[]>> transactionLog() {
1024 checkState(isPrepared());
1025 return transactionLog;
1026 }
1027
1028 /**
1029 * Returns a new transaction scope with a prepare commit.
1030 *
1031 * @param commit the prepare commit
1032 * @return new transaction scope updated with the prepare commit
1033 */
1034 TransactionScope prepared(Commit<? extends TransactionPrepare> commit) {
1035 return new TransactionScope(version, commit.value().transactionLog());
1036 }
1037 }
1038}