blob: 0fcf3b5a0204dda5ae8a9c51653045fbe9f15311 [file] [log] [blame]
Jordan Halterman2bf177c2017-06-29 01:49:08 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Jordan Halterman2bf177c2017-06-29 01:49:08 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
Jordan Haltermandae11602018-07-03 00:00:47 -070018import java.util.ArrayList;
Jordan Halterman71635ae2017-07-28 10:35:43 -070019import java.util.Arrays;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070020import java.util.Collection;
21import java.util.HashMap;
22import java.util.Iterator;
23import java.util.LinkedHashMap;
24import java.util.List;
25import java.util.Map;
26import java.util.Set;
Jordan Halterman71635ae2017-07-28 10:35:43 -070027import java.util.function.Predicate;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070028import java.util.stream.Collectors;
29
Jordan Halterman2bf177c2017-06-29 01:49:08 -070030import com.google.common.collect.Lists;
31import com.google.common.collect.Maps;
32import com.google.common.collect.Sets;
33import io.atomix.protocols.raft.service.AbstractRaftService;
34import io.atomix.protocols.raft.service.Commit;
35import io.atomix.protocols.raft.service.RaftServiceExecutor;
36import io.atomix.protocols.raft.session.RaftSession;
37import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
38import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
39import org.onlab.util.KryoNamespace;
40import org.onlab.util.Match;
41import org.onosproject.store.primitives.MapUpdate;
42import org.onosproject.store.primitives.TransactionId;
43import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsKey;
44import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsValue;
45import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Get;
46import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GetOrDefault;
Jordan Halterman71635ae2017-07-28 10:35:43 -070047import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Put;
48import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Remove;
49import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.RemoveValue;
50import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.RemoveVersion;
51import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Replace;
52import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ReplaceValue;
53import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ReplaceVersion;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070054import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionBegin;
55import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionCommit;
56import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepare;
57import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepareAndCommit;
58import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionRollback;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070059import org.onosproject.store.serializers.KryoNamespaces;
60import org.onosproject.store.service.MapEvent;
61import org.onosproject.store.service.Serializer;
62import org.onosproject.store.service.TransactionLog;
63import org.onosproject.store.service.Versioned;
64
65import static com.google.common.base.Preconditions.checkState;
66import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapEvents.CHANGE;
67import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ADD_LISTENER;
68import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.BEGIN;
69import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CLEAR;
Jordan Haltermandae11602018-07-03 00:00:47 -070070import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CLOSE_ITERATOR;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070071import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.COMMIT;
72import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_KEY;
73import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_VALUE;
74import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ENTRY_SET;
75import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET;
76import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET_OR_DEFAULT;
77import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IS_EMPTY;
Jordan Haltermandae11602018-07-03 00:00:47 -070078import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IteratorBatch;
79import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IteratorPosition;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070080import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.KEY_SET;
Jordan Haltermandae11602018-07-03 00:00:47 -070081import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.NEXT;
82import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.OPEN_ITERATOR;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070083import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE;
84import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE_AND_COMMIT;
Jordan Halterman71635ae2017-07-28 10:35:43 -070085import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT;
86import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT_AND_GET;
87import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT_IF_ABSENT;
88import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070089import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_LISTENER;
Jordan Halterman71635ae2017-07-28 10:35:43 -070090import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_VALUE;
91import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_VERSION;
92import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE;
93import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE_VALUE;
94import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE_VERSION;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070095import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ROLLBACK;
96import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.SIZE;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070097import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.VALUES;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070098
99/**
100 * State Machine for {@link AtomixConsistentMap} resource.
101 */
102public class AtomixConsistentMapService extends AbstractRaftService {
103
Jordan Haltermandae11602018-07-03 00:00:47 -0700104 private static final int MAX_ITERATOR_BATCH_SIZE = 1024 * 32;
105
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700106 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
Jordan Haltermandae11602018-07-03 00:00:47 -0700107 .register(KryoNamespaces.BASIC)
108 .register(AtomixConsistentMapOperations.NAMESPACE)
109 .register(AtomixConsistentMapEvents.NAMESPACE)
110 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
111 .register(TransactionScope.class)
112 .register(TransactionLog.class)
113 .register(TransactionId.class)
114 .register(MapEntryValue.class)
115 .register(MapEntryValue.Type.class)
116 .register(new HashMap().keySet().getClass())
117 .build());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700118
Jordan Halterman71635ae2017-07-28 10:35:43 -0700119 protected Map<Long, RaftSession> listeners = new LinkedHashMap<>();
120 private Map<String, MapEntryValue> map;
121 protected Set<String> preparedKeys = Sets.newHashSet();
Jordan Haltermandae11602018-07-03 00:00:47 -0700122 private Map<Long, IteratorContext> iterators = Maps.newHashMap();
Jordan Halterman71635ae2017-07-28 10:35:43 -0700123 protected Map<TransactionId, TransactionScope> activeTransactions = Maps.newHashMap();
124 protected long currentVersion;
125
/**
 * Creates the service and initializes the backing map through {@link #createMap()}.
 */
public AtomixConsistentMapService() {
    this.map = createMap();
}
129
130 protected Map<String, MapEntryValue> createMap() {
Jordan Haltermandae11602018-07-03 00:00:47 -0700131 return Maps.newConcurrentMap();
Jordan Halterman71635ae2017-07-28 10:35:43 -0700132 }
133
134 protected Map<String, MapEntryValue> entries() {
135 return map;
136 }
137
138 protected Serializer serializer() {
139 return SERIALIZER;
140 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700141
142 @Override
143 public void snapshot(SnapshotWriter writer) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700144 writer.writeObject(Sets.newHashSet(listeners.keySet()), serializer()::encode);
145 writer.writeObject(preparedKeys, serializer()::encode);
146 writer.writeObject(entries(), serializer()::encode);
147 writer.writeObject(activeTransactions, serializer()::encode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700148 writer.writeLong(currentVersion);
Jordan Haltermandae11602018-07-03 00:00:47 -0700149
150 Map<Long, Long> iterators = Maps.newHashMap();
151 this.iterators.forEach((id, context) -> iterators.put(id, context.sessionId));
152 writer.writeObject(iterators, serializer()::encode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700153 }
154
155 @Override
156 public void install(SnapshotReader reader) {
157 listeners = new LinkedHashMap<>();
Jordan Halterman71635ae2017-07-28 10:35:43 -0700158 for (Long sessionId : reader.<Set<Long>>readObject(serializer()::decode)) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700159 listeners.put(sessionId, sessions().getSession(sessionId));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700160 }
Jordan Halterman71635ae2017-07-28 10:35:43 -0700161 preparedKeys = reader.readObject(serializer()::decode);
162 map = reader.readObject(serializer()::decode);
163 activeTransactions = reader.readObject(serializer()::decode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700164 currentVersion = reader.readLong();
Jordan Haltermandae11602018-07-03 00:00:47 -0700165
166 Map<Long, Long> iterators = reader.readObject(serializer()::decode);
167 this.iterators = Maps.newHashMap();
168 iterators.forEach((id, session) ->
169 this.iterators.put(id, new IteratorContext(session, entries().entrySet().iterator())));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700170 }
171
172 @Override
173 protected void configure(RaftServiceExecutor executor) {
174 // Listeners
Jordan Halterman71635ae2017-07-28 10:35:43 -0700175 executor.register(ADD_LISTENER, (Commit<Void> c) -> listen(c.session()));
176 executor.register(REMOVE_LISTENER, (Commit<Void> c) -> unlisten(c.session()));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700177 // Queries
Jordan Halterman71635ae2017-07-28 10:35:43 -0700178 executor.register(CONTAINS_KEY, serializer()::decode, this::containsKey, serializer()::encode);
179 executor.register(CONTAINS_VALUE, serializer()::decode, this::containsValue, serializer()::encode);
180 executor.register(ENTRY_SET, (Commit<Void> c) -> entrySet(), serializer()::encode);
181 executor.register(GET, serializer()::decode, this::get, serializer()::encode);
182 executor.register(GET_OR_DEFAULT, serializer()::decode, this::getOrDefault, serializer()::encode);
183 executor.register(IS_EMPTY, (Commit<Void> c) -> isEmpty(), serializer()::encode);
184 executor.register(KEY_SET, (Commit<Void> c) -> keySet(), serializer()::encode);
185 executor.register(SIZE, (Commit<Void> c) -> size(), serializer()::encode);
186 executor.register(VALUES, (Commit<Void> c) -> values(), serializer()::encode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700187 // Commands
Jordan Halterman71635ae2017-07-28 10:35:43 -0700188 executor.register(PUT, serializer()::decode, this::put, serializer()::encode);
189 executor.register(PUT_IF_ABSENT, serializer()::decode, this::putIfAbsent, serializer()::encode);
190 executor.register(PUT_AND_GET, serializer()::decode, this::putAndGet, serializer()::encode);
191 executor.register(REMOVE, serializer()::decode, this::remove, serializer()::encode);
192 executor.register(REMOVE_VALUE, serializer()::decode, this::removeValue, serializer()::encode);
193 executor.register(REMOVE_VERSION, serializer()::decode, this::removeVersion, serializer()::encode);
194 executor.register(REPLACE, serializer()::decode, this::replace, serializer()::encode);
195 executor.register(REPLACE_VALUE, serializer()::decode, this::replaceValue, serializer()::encode);
196 executor.register(REPLACE_VERSION, serializer()::decode, this::replaceVersion, serializer()::encode);
197 executor.register(CLEAR, (Commit<Void> c) -> clear(), serializer()::encode);
198 executor.register(BEGIN, serializer()::decode, this::begin, serializer()::encode);
199 executor.register(PREPARE, serializer()::decode, this::prepare, serializer()::encode);
200 executor.register(PREPARE_AND_COMMIT, serializer()::decode, this::prepareAndCommit, serializer()::encode);
201 executor.register(COMMIT, serializer()::decode, this::commit, serializer()::encode);
202 executor.register(ROLLBACK, serializer()::decode, this::rollback, serializer()::encode);
Jordan Haltermandae11602018-07-03 00:00:47 -0700203 executor.register(OPEN_ITERATOR, this::openIterator, serializer()::encode);
204 executor.register(NEXT, serializer()::decode, this::next, serializer()::encode);
205 executor.register(CLOSE_ITERATOR, serializer()::decode, this::closeIterator);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700206 }
207
208 /**
209 * Handles a contains key commit.
210 *
211 * @param commit containsKey commit
212 * @return {@code true} if map contains key
213 */
214 protected boolean containsKey(Commit<? extends ContainsKey> commit) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700215 MapEntryValue value = entries().get(commit.value().key());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700216 return value != null && value.type() != MapEntryValue.Type.TOMBSTONE;
217 }
218
219 /**
220 * Handles a contains value commit.
221 *
222 * @param commit containsValue commit
223 * @return {@code true} if map contains value
224 */
225 protected boolean containsValue(Commit<? extends ContainsValue> commit) {
226 Match<byte[]> valueMatch = Match.ifValue(commit.value().value());
Jordan Halterman71635ae2017-07-28 10:35:43 -0700227 return entries().values().stream()
Jordan Haltermandae11602018-07-03 00:00:47 -0700228 .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
229 .anyMatch(value -> valueMatch.matches(value.value()));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700230 }
231
232 /**
233 * Handles a get commit.
234 *
235 * @param commit get commit
236 * @return value mapped to key
237 */
238 protected Versioned<byte[]> get(Commit<? extends Get> commit) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700239 return toVersioned(entries().get(commit.value().key()));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700240 }
241
242 /**
243 * Handles a get or default commit.
244 *
245 * @param commit get or default commit
246 * @return value mapped to key
247 */
248 protected Versioned<byte[]> getOrDefault(Commit<? extends GetOrDefault> commit) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700249 MapEntryValue value = entries().get(commit.value().key());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700250 if (value == null) {
251 return new Versioned<>(commit.value().defaultValue(), 0);
252 } else if (value.type() == MapEntryValue.Type.TOMBSTONE) {
253 return new Versioned<>(commit.value().defaultValue(), value.version);
254 } else {
255 return new Versioned<>(value.value(), value.version);
256 }
257 }
258
259 /**
Jordan Halterman71635ae2017-07-28 10:35:43 -0700260 * Handles a size commit.
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700261 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700262 * @return number of entries in map
263 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700264 protected int size() {
265 return (int) entries().values().stream()
Jordan Haltermandae11602018-07-03 00:00:47 -0700266 .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
267 .count();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700268 }
269
270 /**
271 * Handles an is empty commit.
272 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700273 * @return {@code true} if map is empty
274 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700275 protected boolean isEmpty() {
276 return entries().values().stream()
Jordan Haltermandae11602018-07-03 00:00:47 -0700277 .noneMatch(value -> value.type() != MapEntryValue.Type.TOMBSTONE);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700278 }
279
280 /**
281 * Handles a keySet commit.
282 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700283 * @return set of keys in map
284 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700285 protected Set<String> keySet() {
286 return entries().entrySet().stream()
Jordan Haltermandae11602018-07-03 00:00:47 -0700287 .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
288 .map(Map.Entry::getKey)
289 .collect(Collectors.toSet());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700290 }
291
292 /**
293 * Handles a values commit.
294 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700295 * @return collection of values in map
296 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700297 protected Collection<Versioned<byte[]>> values() {
298 return entries().entrySet().stream()
Jordan Haltermandae11602018-07-03 00:00:47 -0700299 .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
300 .map(entry -> toVersioned(entry.getValue()))
301 .collect(Collectors.toList());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700302 }
303
304 /**
305 * Handles a entry set commit.
306 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700307 * @return set of map entries
308 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700309 protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet() {
310 return entries().entrySet().stream()
Jordan Haltermandae11602018-07-03 00:00:47 -0700311 .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
312 .map(e -> Maps.immutableEntry(e.getKey(), toVersioned(e.getValue())))
313 .collect(Collectors.toSet());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700314 }
315
316 /**
Jordan Halterman71635ae2017-07-28 10:35:43 -0700317 * Returns a boolean indicating whether the given MapEntryValues are equal.
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700318 *
Jordan Halterman71635ae2017-07-28 10:35:43 -0700319 * @param oldValue the first value to compare
320 * @param newValue the second value to compare
321 * @return indicates whether the two values are equal
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700322 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700323 protected boolean valuesEqual(MapEntryValue oldValue, MapEntryValue newValue) {
324 return (oldValue == null && newValue == null)
Jordan Haltermandae11602018-07-03 00:00:47 -0700325 || (oldValue != null && newValue != null && valuesEqual(oldValue.value(), newValue.value()));
Jordan Halterman71635ae2017-07-28 10:35:43 -0700326 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700327
Jordan Halterman71635ae2017-07-28 10:35:43 -0700328 /**
329 * Returns a boolean indicating whether the given entry values are equal.
330 *
331 * @param oldValue the first value to compare
332 * @param newValue the second value to compare
333 * @return indicates whether the two values are equal
334 */
335 protected boolean valuesEqual(byte[] oldValue, byte[] newValue) {
336 return (oldValue == null && newValue == null)
Jordan Haltermandae11602018-07-03 00:00:47 -0700337 || (oldValue != null && newValue != null && Arrays.equals(oldValue, newValue));
Jordan Halterman71635ae2017-07-28 10:35:43 -0700338 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700339
Jordan Halterman71635ae2017-07-28 10:35:43 -0700340 /**
341 * Returns a boolean indicating whether the given MapEntryValue is null or a tombstone.
342 *
343 * @param value the value to check
344 * @return indicates whether the given value is null or is a tombstone
345 */
346 protected boolean valueIsNull(MapEntryValue value) {
347 return value == null || value.type() == MapEntryValue.Type.TOMBSTONE;
348 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700349
Jordan Halterman71635ae2017-07-28 10:35:43 -0700350 /**
351 * Handles a put commit.
352 *
353 * @param commit put commit
354 * @return map entry update result
355 */
356 protected MapEntryUpdateResult<String, byte[]> put(Commit<? extends Put> commit) {
357 String key = commit.value().key();
358 MapEntryValue oldValue = entries().get(key);
359 MapEntryValue newValue = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
Jordan Halterman70df7672017-08-03 16:25:19 -0700360
361 // If the value is null or a tombstone, this is an insert.
362 // Otherwise, only update the value if it has changed to reduce the number of events.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700363 if (valueIsNull(oldValue)) {
Jordan Halterman70df7672017-08-03 16:25:19 -0700364 // If the key has been locked by a transaction, return a WRITE_LOCK error.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700365 if (preparedKeys.contains(key)) {
366 return new MapEntryUpdateResult<>(
Jordan Haltermandae11602018-07-03 00:00:47 -0700367 MapEntryUpdateResult.Status.WRITE_LOCK,
368 commit.index(),
369 key,
370 toVersioned(oldValue));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700371 }
Jordan Halterman71635ae2017-07-28 10:35:43 -0700372 entries().put(commit.value().key(),
Jordan Haltermandae11602018-07-03 00:00:47 -0700373 new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
Jordan Halterman71635ae2017-07-28 10:35:43 -0700374 Versioned<byte[]> result = toVersioned(oldValue);
375 publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, toVersioned(newValue), result));
376 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
377 } else if (!valuesEqual(oldValue, newValue)) {
Jordan Halterman70df7672017-08-03 16:25:19 -0700378 // If the key has been locked by a transaction, return a WRITE_LOCK error.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700379 if (preparedKeys.contains(key)) {
380 return new MapEntryUpdateResult<>(
Jordan Haltermandae11602018-07-03 00:00:47 -0700381 MapEntryUpdateResult.Status.WRITE_LOCK,
382 commit.index(),
383 key,
384 toVersioned(oldValue));
Jordan Halterman71635ae2017-07-28 10:35:43 -0700385 }
386 entries().put(commit.value().key(),
Jordan Haltermandae11602018-07-03 00:00:47 -0700387 new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
Jordan Halterman71635ae2017-07-28 10:35:43 -0700388 Versioned<byte[]> result = toVersioned(oldValue);
389 publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result));
390 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700391 }
Jordan Halterman70df7672017-08-03 16:25:19 -0700392 // If the value hasn't changed, return a NOOP result.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700393 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue));
394 }
395
396 /**
397 * Handles a putIfAbsent commit.
398 *
399 * @param commit putIfAbsent commit
400 * @return map entry update result
401 */
402 protected MapEntryUpdateResult<String, byte[]> putIfAbsent(Commit<? extends Put> commit) {
403 String key = commit.value().key();
404 MapEntryValue oldValue = entries().get(key);
Jordan Halterman70df7672017-08-03 16:25:19 -0700405
406 // If the value is null, this is an INSERT.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700407 if (valueIsNull(oldValue)) {
Jordan Halterman70df7672017-08-03 16:25:19 -0700408 // If the key has been locked by a transaction, return a WRITE_LOCK error.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700409 if (preparedKeys.contains(key)) {
410 return new MapEntryUpdateResult<>(
Jordan Haltermandae11602018-07-03 00:00:47 -0700411 MapEntryUpdateResult.Status.WRITE_LOCK,
412 commit.index(),
413 key,
414 toVersioned(oldValue));
Jordan Halterman71635ae2017-07-28 10:35:43 -0700415 }
416 MapEntryValue newValue = new MapEntryValue(
Jordan Haltermandae11602018-07-03 00:00:47 -0700417 MapEntryValue.Type.VALUE,
418 commit.index(),
419 commit.value().value());
Jordan Halterman70df7672017-08-03 16:25:19 -0700420 entries().put(commit.value().key(), newValue);
Jordan Halterman71635ae2017-07-28 10:35:43 -0700421 Versioned<byte[]> result = toVersioned(newValue);
422 publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, result, null));
423 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, null);
424 }
Jordan Halterman70df7672017-08-03 16:25:19 -0700425 return new MapEntryUpdateResult<>(
Jordan Haltermandae11602018-07-03 00:00:47 -0700426 MapEntryUpdateResult.Status.PRECONDITION_FAILED,
427 commit.index(),
428 key,
429 toVersioned(oldValue));
Jordan Halterman71635ae2017-07-28 10:35:43 -0700430 }
431
432 /**
433 * Handles a putAndGet commit.
434 *
435 * @param commit putAndGet commit
436 * @return map entry update result
437 */
438 protected MapEntryUpdateResult<String, byte[]> putAndGet(Commit<? extends Put> commit) {
439 String key = commit.value().key();
440 MapEntryValue oldValue = entries().get(key);
441 MapEntryValue newValue = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
Jordan Halterman70df7672017-08-03 16:25:19 -0700442
443 // If the value is null or a tombstone, this is an insert.
444 // Otherwise, only update the value if it has changed to reduce the number of events.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700445 if (valueIsNull(oldValue)) {
Jordan Halterman70df7672017-08-03 16:25:19 -0700446 // If the key has been locked by a transaction, return a WRITE_LOCK error.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700447 if (preparedKeys.contains(key)) {
448 return new MapEntryUpdateResult<>(
Jordan Haltermandae11602018-07-03 00:00:47 -0700449 MapEntryUpdateResult.Status.WRITE_LOCK,
450 commit.index(),
451 key,
452 toVersioned(oldValue));
Jordan Halterman71635ae2017-07-28 10:35:43 -0700453 }
454 entries().put(commit.value().key(), newValue);
455 Versioned<byte[]> result = toVersioned(newValue);
456 publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, result, null));
457 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
458 } else if (!valuesEqual(oldValue, newValue)) {
Jordan Halterman70df7672017-08-03 16:25:19 -0700459 // If the key has been locked by a transaction, return a WRITE_LOCK error.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700460 if (preparedKeys.contains(key)) {
461 return new MapEntryUpdateResult<>(
Jordan Haltermandae11602018-07-03 00:00:47 -0700462 MapEntryUpdateResult.Status.WRITE_LOCK,
463 commit.index(),
464 key,
465 toVersioned(oldValue));
Jordan Halterman71635ae2017-07-28 10:35:43 -0700466 }
467 entries().put(commit.value().key(), newValue);
468 Versioned<byte[]> result = toVersioned(newValue);
469 publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, result, toVersioned(oldValue)));
470 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
471 }
472 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue));
473 }
474
475 /**
476 * Handles a remove commit.
477 *
Jordan Haltermandae11602018-07-03 00:00:47 -0700478 * @param index the commit index
479 * @param key the key to remove
Jordan Halterman71635ae2017-07-28 10:35:43 -0700480 * @param predicate predicate to determine whether to remove the entry
481 * @return map entry update result
482 */
483 private MapEntryUpdateResult<String, byte[]> removeIf(long index, String key, Predicate<MapEntryValue> predicate) {
484 MapEntryValue value = entries().get(key);
Jordan Halterman70df7672017-08-03 16:25:19 -0700485
486 // If the value does not exist or doesn't match the predicate, return a PRECONDITION_FAILED error.
487 if (valueIsNull(value) || !predicate.test(value)) {
488 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.PRECONDITION_FAILED, index, key, null);
Jordan Halterman71635ae2017-07-28 10:35:43 -0700489 }
Jordan Halterman70df7672017-08-03 16:25:19 -0700490
491 // If the key has been locked by a transaction, return a WRITE_LOCK error.
492 if (preparedKeys.contains(key)) {
493 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.WRITE_LOCK, index, key, null);
Jordan Halterman71635ae2017-07-28 10:35:43 -0700494 }
Jordan Halterman70df7672017-08-03 16:25:19 -0700495
496 // If no transactions are active, remove the key. Otherwise, replace it with a tombstone.
497 if (activeTransactions.isEmpty()) {
498 entries().remove(key);
499 } else {
500 entries().put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, index, null));
501 }
502
Jordan Halterman71635ae2017-07-28 10:35:43 -0700503 Versioned<byte[]> result = toVersioned(value);
Jordan Halterman70df7672017-08-03 16:25:19 -0700504 publish(new MapEvent<>(MapEvent.Type.REMOVE, "", key, null, result));
Jordan Halterman71635ae2017-07-28 10:35:43 -0700505 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, index, key, result);
506 }
507
508 /**
509 * Handles a remove commit.
510 *
511 * @param commit remove commit
512 * @return map entry update result
513 */
514 protected MapEntryUpdateResult<String, byte[]> remove(Commit<? extends Remove> commit) {
515 return removeIf(commit.index(), commit.value().key(), v -> true);
516 }
517
518 /**
519 * Handles a removeValue commit.
520 *
521 * @param commit removeValue commit
522 * @return map entry update result
523 */
524 protected MapEntryUpdateResult<String, byte[]> removeValue(Commit<? extends RemoveValue> commit) {
525 return removeIf(commit.index(), commit.value().key(), v ->
Jordan Haltermandae11602018-07-03 00:00:47 -0700526 valuesEqual(v, new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value())));
Jordan Halterman71635ae2017-07-28 10:35:43 -0700527 }
528
529 /**
530 * Handles a removeVersion commit.
531 *
532 * @param commit removeVersion commit
533 * @return map entry update result
534 */
535 protected MapEntryUpdateResult<String, byte[]> removeVersion(Commit<? extends RemoveVersion> commit) {
536 return removeIf(commit.index(), commit.value().key(), v -> v.version() == commit.value().version());
537 }
538
539 /**
540 * Handles a replace commit.
541 *
Jordan Haltermandae11602018-07-03 00:00:47 -0700542 * @param index the commit index
543 * @param key the key to replace
544 * @param newValue the value with which to replace the key
Jordan Halterman71635ae2017-07-28 10:35:43 -0700545 * @param predicate a predicate to determine whether to replace the key
546 * @return map entry update result
547 */
548 private MapEntryUpdateResult<String, byte[]> replaceIf(
Jordan Haltermandae11602018-07-03 00:00:47 -0700549 long index, String key, MapEntryValue newValue, Predicate<MapEntryValue> predicate) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700550 MapEntryValue oldValue = entries().get(key);
Jordan Halterman70df7672017-08-03 16:25:19 -0700551
552 // If the key is not set or the current value doesn't match the predicate, return a PRECONDITION_FAILED error.
553 if (valueIsNull(oldValue) || !predicate.test(oldValue)) {
554 return new MapEntryUpdateResult<>(
Jordan Haltermandae11602018-07-03 00:00:47 -0700555 MapEntryUpdateResult.Status.PRECONDITION_FAILED,
556 index,
557 key,
558 toVersioned(oldValue));
Jordan Halterman71635ae2017-07-28 10:35:43 -0700559 }
Jordan Halterman70df7672017-08-03 16:25:19 -0700560
561 // If the key has been locked by a transaction, return a WRITE_LOCK error.
562 if (preparedKeys.contains(key)) {
563 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.WRITE_LOCK, index, key, null);
Jordan Halterman71635ae2017-07-28 10:35:43 -0700564 }
Jordan Halterman70df7672017-08-03 16:25:19 -0700565
Jordan Halterman71635ae2017-07-28 10:35:43 -0700566 entries().put(key, newValue);
567 Versioned<byte[]> result = toVersioned(oldValue);
Jordan Halterman70df7672017-08-03 16:25:19 -0700568 publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result));
Jordan Halterman71635ae2017-07-28 10:35:43 -0700569 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, index, key, result);
570 }
571
572 /**
573 * Handles a replace commit.
574 *
575 * @param commit replace commit
576 * @return map entry update result
577 */
578 protected MapEntryUpdateResult<String, byte[]> replace(Commit<? extends Replace> commit) {
579 MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
580 return replaceIf(commit.index(), commit.value().key(), value, v -> true);
581 }
582
583 /**
584 * Handles a replaceValue commit.
585 *
586 * @param commit replaceValue commit
587 * @return map entry update result
588 */
589 protected MapEntryUpdateResult<String, byte[]> replaceValue(Commit<? extends ReplaceValue> commit) {
590 MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().newValue());
591 return replaceIf(commit.index(), commit.value().key(), value,
Jordan Haltermandae11602018-07-03 00:00:47 -0700592 v -> valuesEqual(v.value(), commit.value().oldValue()));
Jordan Halterman71635ae2017-07-28 10:35:43 -0700593 }
594
595 /**
596 * Handles a replaceVersion commit.
597 *
598 * @param commit replaceVersion commit
599 * @return map entry update result
600 */
601 protected MapEntryUpdateResult<String, byte[]> replaceVersion(Commit<? extends ReplaceVersion> commit) {
602 MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().newValue());
603 return replaceIf(commit.index(), commit.value().key(), value,
Jordan Haltermandae11602018-07-03 00:00:47 -0700604 v -> v.version() == commit.value().oldVersion());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700605 }
606
607 /**
608 * Handles a clear commit.
609 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700610 * @return clear result
611 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700612 protected MapEntryUpdateResult.Status clear() {
613 Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
614 Map<String, MapEntryValue> entriesToAdd = new HashMap<>();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700615 while (iterator.hasNext()) {
616 Map.Entry<String, MapEntryValue> entry = iterator.next();
617 String key = entry.getKey();
618 MapEntryValue value = entry.getValue();
Jordan Halterman71635ae2017-07-28 10:35:43 -0700619 if (!valueIsNull(value)) {
620 Versioned<byte[]> removedValue = new Versioned<>(value.value(), value.version());
621 publish(new MapEvent<>(MapEvent.Type.REMOVE, "", key, null, removedValue));
622 if (activeTransactions.isEmpty()) {
623 iterator.remove();
624 } else {
625 entriesToAdd.put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, value.version, null));
626 }
627 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700628 }
Jordan Halterman71635ae2017-07-28 10:35:43 -0700629 entries().putAll(entriesToAdd);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700630 return MapEntryUpdateResult.Status.OK;
631 }
632
633 /**
Jordan Haltermandae11602018-07-03 00:00:47 -0700634 * Handles an open iterator commit.
635 *
636 * @param commit the open iterator commit
637 * @return iterator identifier
638 */
639 protected long openIterator(Commit<Void> commit) {
640 iterators.put(commit.index(), new IteratorContext(
641 commit.session().sessionId().id(),
642 entries().entrySet().iterator()));
643 return commit.index();
644 }
645
646 /**
647 * Handles an iterator next commit.
648 *
649 * @param commit the next commit
650 * @return a list of entries to iterate
651 */
652 protected IteratorBatch next(Commit<IteratorPosition> commit) {
653 final long iteratorId = commit.value().iteratorId();
654 final int position = commit.value().position();
655
656 IteratorContext context = iterators.get(iteratorId);
657 if (context == null) {
658 return null;
659 }
660
661 List<Map.Entry<String, Versioned<byte[]>>> entries = new ArrayList<>();
662 int size = 0;
663 while (context.iterator.hasNext()) {
664 context.position++;
665 if (context.position > position) {
666 Map.Entry<String, MapEntryValue> entry = context.iterator.next();
667 String key = entry.getKey();
668 Versioned<byte[]> value = toVersioned(entry.getValue());
669 size += key.length();
670 size += value.value() != null ? value.value().length : 0;
671 entries.add(Maps.immutableEntry(key, value));
672
673 if (size >= MAX_ITERATOR_BATCH_SIZE) {
674 break;
675 }
676 }
677 }
678
679 if (entries.isEmpty()) {
680 return null;
681 }
682 return new IteratorBatch(context.position, entries);
683 }
684
685 /**
686 * Handles a close iterator commit.
687 *
688 * @param commit the close iterator commit
689 */
690 protected void closeIterator(Commit<Long> commit) {
691 iterators.remove(commit.value());
692 }
693
694 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700695 * Handles a listen commit.
696 *
Jordan Halterman71635ae2017-07-28 10:35:43 -0700697 * @param session listen session
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700698 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700699 protected void listen(RaftSession session) {
700 listeners.put(session.sessionId().id(), session);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700701 }
702
703 /**
704 * Handles an unlisten commit.
705 *
Jordan Halterman71635ae2017-07-28 10:35:43 -0700706 * @param session unlisten session
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700707 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700708 protected void unlisten(RaftSession session) {
709 listeners.remove(session.sessionId().id());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700710 }
711
712 /**
713 * Handles a begin commit.
714 *
715 * @param commit transaction begin commit
716 * @return transaction state version
717 */
718 protected long begin(Commit<? extends TransactionBegin> commit) {
719 long version = commit.index();
720 activeTransactions.put(commit.value().transactionId(), new TransactionScope(version));
721 return version;
722 }
723
724 /**
725 * Handles an prepare and commit commit.
726 *
727 * @param commit transaction prepare and commit commit
728 * @return prepare result
729 */
730 protected PrepareResult prepareAndCommit(Commit<? extends TransactionPrepareAndCommit> commit) {
731 TransactionId transactionId = commit.value().transactionLog().transactionId();
732 PrepareResult prepareResult = prepare(commit);
733 TransactionScope transactionScope = activeTransactions.remove(transactionId);
734 if (prepareResult == PrepareResult.OK) {
735 this.currentVersion = commit.index();
736 transactionScope = transactionScope.prepared(commit);
737 commitTransaction(transactionScope);
738 }
739 discardTombstones();
740 return prepareResult;
741 }
742
743 /**
744 * Handles an prepare commit.
745 *
746 * @param commit transaction prepare commit
747 * @return prepare result
748 */
749 protected PrepareResult prepare(Commit<? extends TransactionPrepare> commit) {
750 try {
751 TransactionLog<MapUpdate<String, byte[]>> transactionLog = commit.value().transactionLog();
752
753 // Iterate through records in the transaction log and perform isolation checks.
754 for (MapUpdate<String, byte[]> record : transactionLog.records()) {
755 String key = record.key();
756
757 // If the record is a VERSION_MATCH then check that the record's version matches the current
758 // version of the state machine.
759 if (record.type() == MapUpdate.Type.VERSION_MATCH && key == null) {
760 if (record.version() > currentVersion) {
761 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
762 } else {
763 continue;
764 }
765 }
766
767 // If the prepared keys already contains the key contained within the record, that indicates a
768 // conflict with a concurrent transaction.
769 if (preparedKeys.contains(key)) {
770 return PrepareResult.CONCURRENT_TRANSACTION;
771 }
772
773 // Read the existing value from the map.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700774 MapEntryValue existingValue = entries().get(key);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700775
776 // Note: if the existing value is null, that means the key has not changed during the transaction,
777 // otherwise a tombstone would have been retained.
778 if (existingValue == null) {
779 // If the value is null, ensure the version is equal to the transaction version.
780 if (record.version() != transactionLog.version()) {
781 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
782 }
783 } else {
784 // If the value is non-null, compare the current version with the record version.
785 if (existingValue.version() > record.version()) {
786 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
787 }
788 }
789 }
790
791 // No violations detected. Mark modified keys locked for transactions.
792 transactionLog.records().forEach(record -> {
793 if (record.type() != MapUpdate.Type.VERSION_MATCH) {
794 preparedKeys.add(record.key());
795 }
796 });
797
798 // Update the transaction scope. If the transaction scope is not set on this node, that indicates the
799 // coordinator is communicating with another node. Transactions assume that the client is communicating
800 // with a single leader in order to limit the overhead of retaining tombstones.
801 TransactionScope transactionScope = activeTransactions.get(transactionLog.transactionId());
802 if (transactionScope == null) {
803 activeTransactions.put(
Jordan Haltermandae11602018-07-03 00:00:47 -0700804 transactionLog.transactionId(),
805 new TransactionScope(transactionLog.version(), commit.value().transactionLog()));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700806 return PrepareResult.PARTIAL_FAILURE;
807 } else {
808 activeTransactions.put(
Jordan Haltermandae11602018-07-03 00:00:47 -0700809 transactionLog.transactionId(),
810 transactionScope.prepared(commit));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700811 return PrepareResult.OK;
812 }
813 } catch (Exception e) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700814 logger().warn("Failure applying {}", commit, e);
Ray Milkey6a51cb92018-03-06 09:03:03 -0800815 throw new IllegalStateException(e);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700816 }
817 }
818
819 /**
820 * Handles an commit commit (ha!).
821 *
822 * @param commit transaction commit commit
823 * @return commit result
824 */
825 protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
826 TransactionId transactionId = commit.value().transactionId();
827 TransactionScope transactionScope = activeTransactions.remove(transactionId);
828 if (transactionScope == null) {
829 return CommitResult.UNKNOWN_TRANSACTION_ID;
830 }
831
832 try {
833 this.currentVersion = commit.index();
834 return commitTransaction(transactionScope);
835 } catch (Exception e) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700836 logger().warn("Failure applying {}", commit, e);
Ray Milkey6a51cb92018-03-06 09:03:03 -0800837 throw new IllegalStateException(e);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700838 } finally {
839 discardTombstones();
840 }
841 }
842
843 /**
844 * Applies committed operations to the state machine.
845 */
846 private CommitResult commitTransaction(TransactionScope transactionScope) {
847 TransactionLog<MapUpdate<String, byte[]>> transactionLog = transactionScope.transactionLog();
848 boolean retainTombstones = !activeTransactions.isEmpty();
849
850 List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
851 for (MapUpdate<String, byte[]> record : transactionLog.records()) {
852 if (record.type() == MapUpdate.Type.VERSION_MATCH) {
853 continue;
854 }
855
856 String key = record.key();
857 checkState(preparedKeys.remove(key), "key is not prepared");
858
859 if (record.type() == MapUpdate.Type.LOCK) {
860 continue;
861 }
862
Jordan Halterman71635ae2017-07-28 10:35:43 -0700863 MapEntryValue previousValue = entries().remove(key);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700864 MapEntryValue newValue = null;
865
866 // If the record is not a delete, create a transactional commit.
867 if (record.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
868 newValue = new MapEntryValue(MapEntryValue.Type.VALUE, currentVersion, record.value());
869 } else if (retainTombstones) {
870 // For deletes, if tombstones need to be retained then create and store a tombstone commit.
871 newValue = new MapEntryValue(MapEntryValue.Type.TOMBSTONE, currentVersion, null);
872 }
873
Jordan Halterman71635ae2017-07-28 10:35:43 -0700874 MapEvent<String, byte[]> event;
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700875 if (newValue != null) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700876 entries().put(key, newValue);
877 if (!valueIsNull(newValue)) {
878 if (!valueIsNull(previousValue)) {
879 event = new MapEvent<>(
Jordan Haltermandae11602018-07-03 00:00:47 -0700880 MapEvent.Type.UPDATE,
881 "",
882 key,
883 toVersioned(newValue),
884 toVersioned(previousValue));
Jordan Halterman71635ae2017-07-28 10:35:43 -0700885 } else {
886 event = new MapEvent<>(
Jordan Haltermandae11602018-07-03 00:00:47 -0700887 MapEvent.Type.INSERT,
888 "",
889 key,
890 toVersioned(newValue),
891 null);
Jordan Halterman71635ae2017-07-28 10:35:43 -0700892 }
893 } else {
894 event = new MapEvent<>(
Jordan Halterman71635ae2017-07-28 10:35:43 -0700895 MapEvent.Type.REMOVE,
896 "",
897 key,
898 null,
899 toVersioned(previousValue));
Jordan Haltermandae11602018-07-03 00:00:47 -0700900 }
901 } else {
902 event = new MapEvent<>(
903 MapEvent.Type.REMOVE,
904 "",
905 key,
906 null,
907 toVersioned(previousValue));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700908 }
Jordan Halterman71635ae2017-07-28 10:35:43 -0700909 eventsToPublish.add(event);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700910 }
911 publish(eventsToPublish);
912 return CommitResult.OK;
913 }
914
915 /**
916 * Handles an rollback commit (ha!).
917 *
918 * @param commit transaction rollback commit
919 * @return rollback result
920 */
921 protected RollbackResult rollback(Commit<? extends TransactionRollback> commit) {
922 TransactionId transactionId = commit.value().transactionId();
923 TransactionScope transactionScope = activeTransactions.remove(transactionId);
924 if (transactionScope == null) {
925 return RollbackResult.UNKNOWN_TRANSACTION_ID;
926 } else if (!transactionScope.isPrepared()) {
927 discardTombstones();
928 return RollbackResult.OK;
929 } else {
930 try {
931 transactionScope.transactionLog().records()
Jordan Haltermandae11602018-07-03 00:00:47 -0700932 .forEach(record -> {
933 if (record.type() != MapUpdate.Type.VERSION_MATCH) {
934 preparedKeys.remove(record.key());
935 }
936 });
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700937 return RollbackResult.OK;
938 } finally {
939 discardTombstones();
940 }
941 }
942
943 }
944
945 /**
946 * Discards tombstones no longer needed by active transactions.
947 */
948 private void discardTombstones() {
949 if (activeTransactions.isEmpty()) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700950 Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700951 while (iterator.hasNext()) {
952 MapEntryValue value = iterator.next().getValue();
953 if (value.type() == MapEntryValue.Type.TOMBSTONE) {
954 iterator.remove();
955 }
956 }
957 } else {
958 long lowWaterMark = activeTransactions.values().stream()
Jordan Haltermandae11602018-07-03 00:00:47 -0700959 .mapToLong(TransactionScope::version)
960 .min().getAsLong();
Jordan Halterman71635ae2017-07-28 10:35:43 -0700961 Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700962 while (iterator.hasNext()) {
963 MapEntryValue value = iterator.next().getValue();
964 if (value.type() == MapEntryValue.Type.TOMBSTONE && value.version < lowWaterMark) {
965 iterator.remove();
966 }
967 }
968 }
969 }
970
971 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700972 * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
Jordan Haltermandae11602018-07-03 00:00:47 -0700973 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700974 * @param value map entry value
975 * @return versioned instance
976 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700977 protected Versioned<byte[]> toVersioned(MapEntryValue value) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700978 return value != null && value.type() != MapEntryValue.Type.TOMBSTONE
Jordan Haltermandae11602018-07-03 00:00:47 -0700979 ? new Versioned<>(value.value(), value.version()) : null;
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700980 }
981
982 /**
Jordan Halterman71635ae2017-07-28 10:35:43 -0700983 * Publishes an event to listeners.
984 *
985 * @param event event to publish
986 */
987 private void publish(MapEvent<String, byte[]> event) {
988 publish(Lists.newArrayList(event));
989 }
990
991 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700992 * Publishes events to listeners.
993 *
994 * @param events list of map event to publish
995 */
996 private void publish(List<MapEvent<String, byte[]>> events) {
997 listeners.values().forEach(session -> {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700998 session.publish(CHANGE, serializer()::encode, events);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700999 });
1000 }
1001
1002 @Override
1003 public void onExpire(RaftSession session) {
1004 closeListener(session.sessionId().id());
1005 }
1006
1007 @Override
1008 public void onClose(RaftSession session) {
1009 closeListener(session.sessionId().id());
1010 }
1011
1012 private void closeListener(Long sessionId) {
1013 listeners.remove(sessionId);
1014 }
1015
1016 /**
1017 * Interface implemented by map values.
1018 */
Jordan Halterman71635ae2017-07-28 10:35:43 -07001019 protected static class MapEntryValue {
Jordan Halterman2bf177c2017-06-29 01:49:08 -07001020 protected final Type type;
1021 protected final long version;
1022 protected final byte[] value;
1023
1024 MapEntryValue(Type type, long version, byte[] value) {
1025 this.type = type;
1026 this.version = version;
1027 this.value = value;
1028 }
1029
1030 /**
1031 * Returns the value type.
1032 *
1033 * @return the value type
1034 */
1035 Type type() {
1036 return type;
1037 }
1038
1039 /**
1040 * Returns the version of the value.
1041 *
1042 * @return version
1043 */
1044 long version() {
1045 return version;
1046 }
1047
1048 /**
1049 * Returns the raw {@code byte[]}.
1050 *
1051 * @return raw value
1052 */
1053 byte[] value() {
1054 return value;
1055 }
1056
1057 /**
1058 * Value type.
1059 */
1060 enum Type {
1061 VALUE,
1062 TOMBSTONE,
1063 }
1064 }
1065
1066 /**
1067 * Map transaction scope.
1068 */
Jordan Halterman71635ae2017-07-28 10:35:43 -07001069 protected static final class TransactionScope {
Jordan Halterman2bf177c2017-06-29 01:49:08 -07001070 private final long version;
1071 private final TransactionLog<MapUpdate<String, byte[]>> transactionLog;
1072
1073 private TransactionScope(long version) {
1074 this(version, null);
1075 }
1076
1077 private TransactionScope(long version, TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
1078 this.version = version;
1079 this.transactionLog = transactionLog;
1080 }
1081
1082 /**
1083 * Returns the transaction version.
1084 *
1085 * @return the transaction version
1086 */
1087 long version() {
1088 return version;
1089 }
1090
1091 /**
1092 * Returns whether this is a prepared transaction scope.
1093 *
1094 * @return whether this is a prepared transaction scope
1095 */
1096 boolean isPrepared() {
1097 return transactionLog != null;
1098 }
1099
1100 /**
1101 * Returns the transaction commit log.
1102 *
1103 * @return the transaction commit log
1104 */
1105 TransactionLog<MapUpdate<String, byte[]>> transactionLog() {
1106 checkState(isPrepared());
1107 return transactionLog;
1108 }
1109
1110 /**
1111 * Returns a new transaction scope with a prepare commit.
1112 *
1113 * @param commit the prepare commit
1114 * @return new transaction scope updated with the prepare commit
1115 */
1116 TransactionScope prepared(Commit<? extends TransactionPrepare> commit) {
1117 return new TransactionScope(version, commit.value().transactionLog());
1118 }
1119 }
Jordan Haltermandae11602018-07-03 00:00:47 -07001120
1121 private static class IteratorContext {
1122 private final long sessionId;
1123 private final Iterator<Map.Entry<String, MapEntryValue>> iterator;
1124 private int position = 0;
1125
1126 IteratorContext(long sessionId, Iterator<Map.Entry<String, MapEntryValue>> iterator) {
1127 this.sessionId = sessionId;
1128 this.iterator = iterator;
1129 }
1130 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -07001131}