blob: fe5a2a9448ec157da587f8ef5439632766bafaa1 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
Jordan Halterman71635ae2017-07-28 10:35:43 -070018import java.util.Arrays;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070019import java.util.Collection;
20import java.util.HashMap;
21import java.util.Iterator;
22import java.util.LinkedHashMap;
23import java.util.List;
24import java.util.Map;
25import java.util.Set;
Jordan Halterman71635ae2017-07-28 10:35:43 -070026import java.util.function.Predicate;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070027import java.util.stream.Collectors;
28
29import com.google.common.base.Throwables;
30import com.google.common.collect.Lists;
31import com.google.common.collect.Maps;
32import com.google.common.collect.Sets;
33import io.atomix.protocols.raft.service.AbstractRaftService;
34import io.atomix.protocols.raft.service.Commit;
35import io.atomix.protocols.raft.service.RaftServiceExecutor;
36import io.atomix.protocols.raft.session.RaftSession;
37import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
38import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
39import org.onlab.util.KryoNamespace;
40import org.onlab.util.Match;
41import org.onosproject.store.primitives.MapUpdate;
42import org.onosproject.store.primitives.TransactionId;
43import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsKey;
44import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsValue;
45import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Get;
46import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GetOrDefault;
Jordan Halterman71635ae2017-07-28 10:35:43 -070047import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Put;
48import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Remove;
49import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.RemoveValue;
50import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.RemoveVersion;
51import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Replace;
52import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ReplaceValue;
53import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ReplaceVersion;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070054import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionBegin;
55import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionCommit;
56import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepare;
57import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepareAndCommit;
58import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionRollback;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070059import org.onosproject.store.serializers.KryoNamespaces;
60import org.onosproject.store.service.MapEvent;
61import org.onosproject.store.service.Serializer;
62import org.onosproject.store.service.TransactionLog;
63import org.onosproject.store.service.Versioned;
64
65import static com.google.common.base.Preconditions.checkState;
66import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapEvents.CHANGE;
67import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ADD_LISTENER;
68import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.BEGIN;
69import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CLEAR;
70import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.COMMIT;
71import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_KEY;
72import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_VALUE;
73import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ENTRY_SET;
74import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET;
75import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET_OR_DEFAULT;
76import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IS_EMPTY;
77import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.KEY_SET;
78import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE;
79import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE_AND_COMMIT;
Jordan Halterman71635ae2017-07-28 10:35:43 -070080import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT;
81import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT_AND_GET;
82import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT_IF_ABSENT;
83import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070084import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_LISTENER;
Jordan Halterman71635ae2017-07-28 10:35:43 -070085import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_VALUE;
86import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_VERSION;
87import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE;
88import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE_VALUE;
89import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE_VERSION;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070090import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ROLLBACK;
91import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.SIZE;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070092import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.VALUES;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070093
94/**
95 * State Machine for {@link AtomixConsistentMap} resource.
96 */
97public class AtomixConsistentMapService extends AbstractRaftService {
98
99 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
100 .register(KryoNamespaces.BASIC)
101 .register(AtomixConsistentMapOperations.NAMESPACE)
102 .register(AtomixConsistentMapEvents.NAMESPACE)
Jordan Halterman71635ae2017-07-28 10:35:43 -0700103 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700104 .register(TransactionScope.class)
105 .register(TransactionLog.class)
106 .register(TransactionId.class)
107 .register(MapEntryValue.class)
108 .register(MapEntryValue.Type.class)
109 .register(new HashMap().keySet().getClass())
110 .build());
111
Jordan Halterman71635ae2017-07-28 10:35:43 -0700112 protected Map<Long, RaftSession> listeners = new LinkedHashMap<>();
113 private Map<String, MapEntryValue> map;
114 protected Set<String> preparedKeys = Sets.newHashSet();
115 protected Map<TransactionId, TransactionScope> activeTransactions = Maps.newHashMap();
116 protected long currentVersion;
117
/**
 * Creates the service and initializes its backing map via {@link #createMap()}.
 */
public AtomixConsistentMapService() {
    this.map = createMap();
}
121
122 protected Map<String, MapEntryValue> createMap() {
123 return Maps.newHashMap();
124 }
125
126 protected Map<String, MapEntryValue> entries() {
127 return map;
128 }
129
130 protected Serializer serializer() {
131 return SERIALIZER;
132 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700133
134 @Override
135 public void snapshot(SnapshotWriter writer) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700136 writer.writeObject(Sets.newHashSet(listeners.keySet()), serializer()::encode);
137 writer.writeObject(preparedKeys, serializer()::encode);
138 writer.writeObject(entries(), serializer()::encode);
139 writer.writeObject(activeTransactions, serializer()::encode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700140 writer.writeLong(currentVersion);
141 }
142
143 @Override
144 public void install(SnapshotReader reader) {
145 listeners = new LinkedHashMap<>();
Jordan Halterman71635ae2017-07-28 10:35:43 -0700146 for (Long sessionId : reader.<Set<Long>>readObject(serializer()::decode)) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700147 listeners.put(sessionId, getSessions().getSession(sessionId));
148 }
Jordan Halterman71635ae2017-07-28 10:35:43 -0700149 preparedKeys = reader.readObject(serializer()::decode);
150 map = reader.readObject(serializer()::decode);
151 activeTransactions = reader.readObject(serializer()::decode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700152 currentVersion = reader.readLong();
153 }
154
155 @Override
156 protected void configure(RaftServiceExecutor executor) {
157 // Listeners
Jordan Halterman71635ae2017-07-28 10:35:43 -0700158 executor.register(ADD_LISTENER, (Commit<Void> c) -> listen(c.session()));
159 executor.register(REMOVE_LISTENER, (Commit<Void> c) -> unlisten(c.session()));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700160 // Queries
Jordan Halterman71635ae2017-07-28 10:35:43 -0700161 executor.register(CONTAINS_KEY, serializer()::decode, this::containsKey, serializer()::encode);
162 executor.register(CONTAINS_VALUE, serializer()::decode, this::containsValue, serializer()::encode);
163 executor.register(ENTRY_SET, (Commit<Void> c) -> entrySet(), serializer()::encode);
164 executor.register(GET, serializer()::decode, this::get, serializer()::encode);
165 executor.register(GET_OR_DEFAULT, serializer()::decode, this::getOrDefault, serializer()::encode);
166 executor.register(IS_EMPTY, (Commit<Void> c) -> isEmpty(), serializer()::encode);
167 executor.register(KEY_SET, (Commit<Void> c) -> keySet(), serializer()::encode);
168 executor.register(SIZE, (Commit<Void> c) -> size(), serializer()::encode);
169 executor.register(VALUES, (Commit<Void> c) -> values(), serializer()::encode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700170 // Commands
Jordan Halterman71635ae2017-07-28 10:35:43 -0700171 executor.register(PUT, serializer()::decode, this::put, serializer()::encode);
172 executor.register(PUT_IF_ABSENT, serializer()::decode, this::putIfAbsent, serializer()::encode);
173 executor.register(PUT_AND_GET, serializer()::decode, this::putAndGet, serializer()::encode);
174 executor.register(REMOVE, serializer()::decode, this::remove, serializer()::encode);
175 executor.register(REMOVE_VALUE, serializer()::decode, this::removeValue, serializer()::encode);
176 executor.register(REMOVE_VERSION, serializer()::decode, this::removeVersion, serializer()::encode);
177 executor.register(REPLACE, serializer()::decode, this::replace, serializer()::encode);
178 executor.register(REPLACE_VALUE, serializer()::decode, this::replaceValue, serializer()::encode);
179 executor.register(REPLACE_VERSION, serializer()::decode, this::replaceVersion, serializer()::encode);
180 executor.register(CLEAR, (Commit<Void> c) -> clear(), serializer()::encode);
181 executor.register(BEGIN, serializer()::decode, this::begin, serializer()::encode);
182 executor.register(PREPARE, serializer()::decode, this::prepare, serializer()::encode);
183 executor.register(PREPARE_AND_COMMIT, serializer()::decode, this::prepareAndCommit, serializer()::encode);
184 executor.register(COMMIT, serializer()::decode, this::commit, serializer()::encode);
185 executor.register(ROLLBACK, serializer()::decode, this::rollback, serializer()::encode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700186 }
187
188 /**
189 * Handles a contains key commit.
190 *
191 * @param commit containsKey commit
192 * @return {@code true} if map contains key
193 */
194 protected boolean containsKey(Commit<? extends ContainsKey> commit) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700195 MapEntryValue value = entries().get(commit.value().key());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700196 return value != null && value.type() != MapEntryValue.Type.TOMBSTONE;
197 }
198
199 /**
200 * Handles a contains value commit.
201 *
202 * @param commit containsValue commit
203 * @return {@code true} if map contains value
204 */
205 protected boolean containsValue(Commit<? extends ContainsValue> commit) {
206 Match<byte[]> valueMatch = Match.ifValue(commit.value().value());
Jordan Halterman71635ae2017-07-28 10:35:43 -0700207 return entries().values().stream()
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700208 .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
209 .anyMatch(value -> valueMatch.matches(value.value()));
210 }
211
212 /**
213 * Handles a get commit.
214 *
215 * @param commit get commit
216 * @return value mapped to key
217 */
218 protected Versioned<byte[]> get(Commit<? extends Get> commit) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700219 return toVersioned(entries().get(commit.value().key()));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700220 }
221
222 /**
223 * Handles a get or default commit.
224 *
225 * @param commit get or default commit
226 * @return value mapped to key
227 */
228 protected Versioned<byte[]> getOrDefault(Commit<? extends GetOrDefault> commit) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700229 MapEntryValue value = entries().get(commit.value().key());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700230 if (value == null) {
231 return new Versioned<>(commit.value().defaultValue(), 0);
232 } else if (value.type() == MapEntryValue.Type.TOMBSTONE) {
233 return new Versioned<>(commit.value().defaultValue(), value.version);
234 } else {
235 return new Versioned<>(value.value(), value.version);
236 }
237 }
238
239 /**
Jordan Halterman71635ae2017-07-28 10:35:43 -0700240 * Handles a size commit.
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700241 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700242 * @return number of entries in map
243 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700244 protected int size() {
245 return (int) entries().values().stream()
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700246 .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
247 .count();
248 }
249
250 /**
251 * Handles an is empty commit.
252 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700253 * @return {@code true} if map is empty
254 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700255 protected boolean isEmpty() {
256 return entries().values().stream()
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700257 .noneMatch(value -> value.type() != MapEntryValue.Type.TOMBSTONE);
258 }
259
260 /**
261 * Handles a keySet commit.
262 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700263 * @return set of keys in map
264 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700265 protected Set<String> keySet() {
266 return entries().entrySet().stream()
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700267 .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
268 .map(Map.Entry::getKey)
269 .collect(Collectors.toSet());
270 }
271
272 /**
273 * Handles a values commit.
274 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700275 * @return collection of values in map
276 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700277 protected Collection<Versioned<byte[]>> values() {
278 return entries().entrySet().stream()
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700279 .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
280 .map(entry -> toVersioned(entry.getValue()))
281 .collect(Collectors.toList());
282 }
283
284 /**
285 * Handles a entry set commit.
286 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700287 * @return set of map entries
288 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700289 protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet() {
290 return entries().entrySet().stream()
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700291 .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
292 .map(e -> Maps.immutableEntry(e.getKey(), toVersioned(e.getValue())))
293 .collect(Collectors.toSet());
294 }
295
296 /**
Jordan Halterman71635ae2017-07-28 10:35:43 -0700297 * Returns a boolean indicating whether the given MapEntryValues are equal.
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700298 *
Jordan Halterman71635ae2017-07-28 10:35:43 -0700299 * @param oldValue the first value to compare
300 * @param newValue the second value to compare
301 * @return indicates whether the two values are equal
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700302 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700303 protected boolean valuesEqual(MapEntryValue oldValue, MapEntryValue newValue) {
304 return (oldValue == null && newValue == null)
305 || (oldValue != null && newValue != null && valuesEqual(oldValue.value(), newValue.value()));
306 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700307
Jordan Halterman71635ae2017-07-28 10:35:43 -0700308 /**
309 * Returns a boolean indicating whether the given entry values are equal.
310 *
311 * @param oldValue the first value to compare
312 * @param newValue the second value to compare
313 * @return indicates whether the two values are equal
314 */
315 protected boolean valuesEqual(byte[] oldValue, byte[] newValue) {
316 return (oldValue == null && newValue == null)
317 || (oldValue != null && newValue != null && Arrays.equals(oldValue, newValue));
318 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700319
Jordan Halterman71635ae2017-07-28 10:35:43 -0700320 /**
321 * Returns a boolean indicating whether the given MapEntryValue is null or a tombstone.
322 *
323 * @param value the value to check
324 * @return indicates whether the given value is null or is a tombstone
325 */
326 protected boolean valueIsNull(MapEntryValue value) {
327 return value == null || value.type() == MapEntryValue.Type.TOMBSTONE;
328 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700329
Jordan Halterman71635ae2017-07-28 10:35:43 -0700330 /**
331 * Handles a put commit.
332 *
333 * @param commit put commit
334 * @return map entry update result
335 */
336 protected MapEntryUpdateResult<String, byte[]> put(Commit<? extends Put> commit) {
337 String key = commit.value().key();
338 MapEntryValue oldValue = entries().get(key);
339 MapEntryValue newValue = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
340 if (valueIsNull(oldValue)) {
341 if (preparedKeys.contains(key)) {
342 return new MapEntryUpdateResult<>(
343 MapEntryUpdateResult.Status.WRITE_LOCK,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700344 commit.index(),
Jordan Halterman71635ae2017-07-28 10:35:43 -0700345 key,
346 toVersioned(oldValue));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700347 }
Jordan Halterman71635ae2017-07-28 10:35:43 -0700348 entries().put(commit.value().key(),
349 new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
350 Versioned<byte[]> result = toVersioned(oldValue);
351 publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, toVersioned(newValue), result));
352 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
353 } else if (!valuesEqual(oldValue, newValue)) {
354 if (preparedKeys.contains(key)) {
355 return new MapEntryUpdateResult<>(
356 MapEntryUpdateResult.Status.WRITE_LOCK,
357 commit.index(),
358 key,
359 toVersioned(oldValue));
360 }
361 entries().put(commit.value().key(),
362 new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
363 Versioned<byte[]> result = toVersioned(oldValue);
364 publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result));
365 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700366 }
Jordan Halterman71635ae2017-07-28 10:35:43 -0700367 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue));
368 }
369
370 /**
371 * Handles a putIfAbsent commit.
372 *
373 * @param commit putIfAbsent commit
374 * @return map entry update result
375 */
376 protected MapEntryUpdateResult<String, byte[]> putIfAbsent(Commit<? extends Put> commit) {
377 String key = commit.value().key();
378 MapEntryValue oldValue = entries().get(key);
379 if (valueIsNull(oldValue)) {
380 if (preparedKeys.contains(key)) {
381 return new MapEntryUpdateResult<>(
382 MapEntryUpdateResult.Status.WRITE_LOCK,
383 commit.index(),
384 key,
385 toVersioned(oldValue));
386 }
387 MapEntryValue newValue = new MapEntryValue(
388 MapEntryValue.Type.VALUE,
389 commit.index(),
390 commit.value().value());
391 entries().put(commit.value().key(),
392 new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
393 Versioned<byte[]> result = toVersioned(newValue);
394 publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, result, null));
395 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, null);
396 }
397 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue));
398 }
399
400 /**
401 * Handles a putAndGet commit.
402 *
403 * @param commit putAndGet commit
404 * @return map entry update result
405 */
406 protected MapEntryUpdateResult<String, byte[]> putAndGet(Commit<? extends Put> commit) {
407 String key = commit.value().key();
408 MapEntryValue oldValue = entries().get(key);
409 MapEntryValue newValue = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
410 if (valueIsNull(oldValue)) {
411 if (preparedKeys.contains(key)) {
412 return new MapEntryUpdateResult<>(
413 MapEntryUpdateResult.Status.WRITE_LOCK,
414 commit.index(),
415 key,
416 toVersioned(oldValue));
417 }
418 entries().put(commit.value().key(), newValue);
419 Versioned<byte[]> result = toVersioned(newValue);
420 publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, result, null));
421 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
422 } else if (!valuesEqual(oldValue, newValue)) {
423 if (preparedKeys.contains(key)) {
424 return new MapEntryUpdateResult<>(
425 MapEntryUpdateResult.Status.WRITE_LOCK,
426 commit.index(),
427 key,
428 toVersioned(oldValue));
429 }
430 entries().put(commit.value().key(), newValue);
431 Versioned<byte[]> result = toVersioned(newValue);
432 publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, result, toVersioned(oldValue)));
433 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
434 }
435 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue));
436 }
437
438 /**
439 * Handles a remove commit.
440 *
441 * @param index the commit index
442 * @param key the key to remove
443 * @param predicate predicate to determine whether to remove the entry
444 * @return map entry update result
445 */
446 private MapEntryUpdateResult<String, byte[]> removeIf(long index, String key, Predicate<MapEntryValue> predicate) {
447 MapEntryValue value = entries().get(key);
448 if (value == null || !predicate.test(value)) {
449 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, index, key, null);
450 }
451 entries().remove(key);
452 if (!activeTransactions.isEmpty()) {
453 entries().put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, value.version, null));
454 }
455 Versioned<byte[]> result = toVersioned(value);
456 publish(Lists.newArrayList(new MapEvent<>(MapEvent.Type.REMOVE, "", key, null, result)));
457 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, index, key, result);
458 }
459
460 /**
461 * Handles a remove commit.
462 *
463 * @param commit remove commit
464 * @return map entry update result
465 */
466 protected MapEntryUpdateResult<String, byte[]> remove(Commit<? extends Remove> commit) {
467 return removeIf(commit.index(), commit.value().key(), v -> true);
468 }
469
470 /**
471 * Handles a removeValue commit.
472 *
473 * @param commit removeValue commit
474 * @return map entry update result
475 */
476 protected MapEntryUpdateResult<String, byte[]> removeValue(Commit<? extends RemoveValue> commit) {
477 return removeIf(commit.index(), commit.value().key(), v ->
478 valuesEqual(v, new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value())));
479 }
480
481 /**
482 * Handles a removeVersion commit.
483 *
484 * @param commit removeVersion commit
485 * @return map entry update result
486 */
487 protected MapEntryUpdateResult<String, byte[]> removeVersion(Commit<? extends RemoveVersion> commit) {
488 return removeIf(commit.index(), commit.value().key(), v -> v.version() == commit.value().version());
489 }
490
491 /**
492 * Handles a replace commit.
493 *
494 * @param index the commit index
495 * @param key the key to replace
496 * @param newValue the value with which to replace the key
497 * @param predicate a predicate to determine whether to replace the key
498 * @return map entry update result
499 */
500 private MapEntryUpdateResult<String, byte[]> replaceIf(
501 long index, String key, MapEntryValue newValue, Predicate<MapEntryValue> predicate) {
502 MapEntryValue oldValue = entries().get(key);
503 if (oldValue == null) {
504 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, index, key, null);
505 }
506 if (!predicate.test(oldValue)) {
507 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, index, key, toVersioned(oldValue));
508 }
509 entries().put(key, newValue);
510 Versioned<byte[]> result = toVersioned(oldValue);
511 publish(Lists.newArrayList(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result)));
512 return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, index, key, result);
513 }
514
515 /**
516 * Handles a replace commit.
517 *
518 * @param commit replace commit
519 * @return map entry update result
520 */
521 protected MapEntryUpdateResult<String, byte[]> replace(Commit<? extends Replace> commit) {
522 MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
523 return replaceIf(commit.index(), commit.value().key(), value, v -> true);
524 }
525
526 /**
527 * Handles a replaceValue commit.
528 *
529 * @param commit replaceValue commit
530 * @return map entry update result
531 */
532 protected MapEntryUpdateResult<String, byte[]> replaceValue(Commit<? extends ReplaceValue> commit) {
533 MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().newValue());
534 return replaceIf(commit.index(), commit.value().key(), value,
535 v -> valuesEqual(v.value(), commit.value().oldValue()));
536 }
537
538 /**
539 * Handles a replaceVersion commit.
540 *
541 * @param commit replaceVersion commit
542 * @return map entry update result
543 */
544 protected MapEntryUpdateResult<String, byte[]> replaceVersion(Commit<? extends ReplaceVersion> commit) {
545 MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().newValue());
546 return replaceIf(commit.index(), commit.value().key(), value,
547 v -> v.version() == commit.value().oldVersion());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700548 }
549
550 /**
551 * Handles a clear commit.
552 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700553 * @return clear result
554 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700555 protected MapEntryUpdateResult.Status clear() {
556 Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
557 Map<String, MapEntryValue> entriesToAdd = new HashMap<>();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700558 while (iterator.hasNext()) {
559 Map.Entry<String, MapEntryValue> entry = iterator.next();
560 String key = entry.getKey();
561 MapEntryValue value = entry.getValue();
Jordan Halterman71635ae2017-07-28 10:35:43 -0700562 if (!valueIsNull(value)) {
563 Versioned<byte[]> removedValue = new Versioned<>(value.value(), value.version());
564 publish(new MapEvent<>(MapEvent.Type.REMOVE, "", key, null, removedValue));
565 if (activeTransactions.isEmpty()) {
566 iterator.remove();
567 } else {
568 entriesToAdd.put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, value.version, null));
569 }
570 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700571 }
Jordan Halterman71635ae2017-07-28 10:35:43 -0700572 entries().putAll(entriesToAdd);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700573 return MapEntryUpdateResult.Status.OK;
574 }
575
576 /**
577 * Handles a listen commit.
578 *
Jordan Halterman71635ae2017-07-28 10:35:43 -0700579 * @param session listen session
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700580 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700581 protected void listen(RaftSession session) {
582 listeners.put(session.sessionId().id(), session);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700583 }
584
585 /**
586 * Handles an unlisten commit.
587 *
Jordan Halterman71635ae2017-07-28 10:35:43 -0700588 * @param session unlisten session
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700589 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700590 protected void unlisten(RaftSession session) {
591 listeners.remove(session.sessionId().id());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700592 }
593
594 /**
595 * Handles a begin commit.
596 *
597 * @param commit transaction begin commit
598 * @return transaction state version
599 */
600 protected long begin(Commit<? extends TransactionBegin> commit) {
601 long version = commit.index();
602 activeTransactions.put(commit.value().transactionId(), new TransactionScope(version));
603 return version;
604 }
605
606 /**
607 * Handles an prepare and commit commit.
608 *
609 * @param commit transaction prepare and commit commit
610 * @return prepare result
611 */
612 protected PrepareResult prepareAndCommit(Commit<? extends TransactionPrepareAndCommit> commit) {
613 TransactionId transactionId = commit.value().transactionLog().transactionId();
614 PrepareResult prepareResult = prepare(commit);
615 TransactionScope transactionScope = activeTransactions.remove(transactionId);
616 if (prepareResult == PrepareResult.OK) {
617 this.currentVersion = commit.index();
618 transactionScope = transactionScope.prepared(commit);
619 commitTransaction(transactionScope);
620 }
621 discardTombstones();
622 return prepareResult;
623 }
624
625 /**
626 * Handles an prepare commit.
627 *
628 * @param commit transaction prepare commit
629 * @return prepare result
630 */
631 protected PrepareResult prepare(Commit<? extends TransactionPrepare> commit) {
632 try {
633 TransactionLog<MapUpdate<String, byte[]>> transactionLog = commit.value().transactionLog();
634
635 // Iterate through records in the transaction log and perform isolation checks.
636 for (MapUpdate<String, byte[]> record : transactionLog.records()) {
637 String key = record.key();
638
639 // If the record is a VERSION_MATCH then check that the record's version matches the current
640 // version of the state machine.
641 if (record.type() == MapUpdate.Type.VERSION_MATCH && key == null) {
642 if (record.version() > currentVersion) {
643 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
644 } else {
645 continue;
646 }
647 }
648
649 // If the prepared keys already contains the key contained within the record, that indicates a
650 // conflict with a concurrent transaction.
651 if (preparedKeys.contains(key)) {
652 return PrepareResult.CONCURRENT_TRANSACTION;
653 }
654
655 // Read the existing value from the map.
Jordan Halterman71635ae2017-07-28 10:35:43 -0700656 MapEntryValue existingValue = entries().get(key);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700657
658 // Note: if the existing value is null, that means the key has not changed during the transaction,
659 // otherwise a tombstone would have been retained.
660 if (existingValue == null) {
661 // If the value is null, ensure the version is equal to the transaction version.
662 if (record.version() != transactionLog.version()) {
663 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
664 }
665 } else {
666 // If the value is non-null, compare the current version with the record version.
667 if (existingValue.version() > record.version()) {
668 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
669 }
670 }
671 }
672
673 // No violations detected. Mark modified keys locked for transactions.
674 transactionLog.records().forEach(record -> {
675 if (record.type() != MapUpdate.Type.VERSION_MATCH) {
676 preparedKeys.add(record.key());
677 }
678 });
679
680 // Update the transaction scope. If the transaction scope is not set on this node, that indicates the
681 // coordinator is communicating with another node. Transactions assume that the client is communicating
682 // with a single leader in order to limit the overhead of retaining tombstones.
683 TransactionScope transactionScope = activeTransactions.get(transactionLog.transactionId());
684 if (transactionScope == null) {
685 activeTransactions.put(
686 transactionLog.transactionId(),
687 new TransactionScope(transactionLog.version(), commit.value().transactionLog()));
688 return PrepareResult.PARTIAL_FAILURE;
689 } else {
690 activeTransactions.put(
691 transactionLog.transactionId(),
692 transactionScope.prepared(commit));
693 return PrepareResult.OK;
694 }
695 } catch (Exception e) {
696 getLogger().warn("Failure applying {}", commit, e);
697 throw Throwables.propagate(e);
698 }
699 }
700
701 /**
702 * Handles an commit commit (ha!).
703 *
704 * @param commit transaction commit commit
705 * @return commit result
706 */
707 protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
708 TransactionId transactionId = commit.value().transactionId();
709 TransactionScope transactionScope = activeTransactions.remove(transactionId);
710 if (transactionScope == null) {
711 return CommitResult.UNKNOWN_TRANSACTION_ID;
712 }
713
714 try {
715 this.currentVersion = commit.index();
716 return commitTransaction(transactionScope);
717 } catch (Exception e) {
718 getLogger().warn("Failure applying {}", commit, e);
719 throw Throwables.propagate(e);
720 } finally {
721 discardTombstones();
722 }
723 }
724
725 /**
726 * Applies committed operations to the state machine.
727 */
728 private CommitResult commitTransaction(TransactionScope transactionScope) {
729 TransactionLog<MapUpdate<String, byte[]>> transactionLog = transactionScope.transactionLog();
730 boolean retainTombstones = !activeTransactions.isEmpty();
731
732 List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
733 for (MapUpdate<String, byte[]> record : transactionLog.records()) {
734 if (record.type() == MapUpdate.Type.VERSION_MATCH) {
735 continue;
736 }
737
738 String key = record.key();
739 checkState(preparedKeys.remove(key), "key is not prepared");
740
741 if (record.type() == MapUpdate.Type.LOCK) {
742 continue;
743 }
744
Jordan Halterman71635ae2017-07-28 10:35:43 -0700745 MapEntryValue previousValue = entries().remove(key);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700746 MapEntryValue newValue = null;
747
748 // If the record is not a delete, create a transactional commit.
749 if (record.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
750 newValue = new MapEntryValue(MapEntryValue.Type.VALUE, currentVersion, record.value());
751 } else if (retainTombstones) {
752 // For deletes, if tombstones need to be retained then create and store a tombstone commit.
753 newValue = new MapEntryValue(MapEntryValue.Type.TOMBSTONE, currentVersion, null);
754 }
755
Jordan Halterman71635ae2017-07-28 10:35:43 -0700756 MapEvent<String, byte[]> event;
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700757 if (newValue != null) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700758 entries().put(key, newValue);
759 if (!valueIsNull(newValue)) {
760 if (!valueIsNull(previousValue)) {
761 event = new MapEvent<>(
762 MapEvent.Type.UPDATE,
763 "",
764 key,
765 toVersioned(newValue),
766 toVersioned(previousValue));
767 } else {
768 event = new MapEvent<>(
769 MapEvent.Type.INSERT,
770 "",
771 key,
772 toVersioned(newValue),
773 null);
774 }
775 } else {
776 event = new MapEvent<>(
777 MapEvent.Type.REMOVE,
778 "",
779 key,
780 null,
781 toVersioned(previousValue));
782 }
783 } else {
784 event = new MapEvent<>(
785 MapEvent.Type.REMOVE,
786 "",
787 key,
788 null,
789 toVersioned(previousValue));
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700790 }
Jordan Halterman71635ae2017-07-28 10:35:43 -0700791 eventsToPublish.add(event);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700792 }
793 publish(eventsToPublish);
794 return CommitResult.OK;
795 }
796
797 /**
798 * Handles an rollback commit (ha!).
799 *
800 * @param commit transaction rollback commit
801 * @return rollback result
802 */
803 protected RollbackResult rollback(Commit<? extends TransactionRollback> commit) {
804 TransactionId transactionId = commit.value().transactionId();
805 TransactionScope transactionScope = activeTransactions.remove(transactionId);
806 if (transactionScope == null) {
807 return RollbackResult.UNKNOWN_TRANSACTION_ID;
808 } else if (!transactionScope.isPrepared()) {
809 discardTombstones();
810 return RollbackResult.OK;
811 } else {
812 try {
813 transactionScope.transactionLog().records()
814 .forEach(record -> {
815 if (record.type() != MapUpdate.Type.VERSION_MATCH) {
816 preparedKeys.remove(record.key());
817 }
818 });
819 return RollbackResult.OK;
820 } finally {
821 discardTombstones();
822 }
823 }
824
825 }
826
827 /**
828 * Discards tombstones no longer needed by active transactions.
829 */
830 private void discardTombstones() {
831 if (activeTransactions.isEmpty()) {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700832 Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700833 while (iterator.hasNext()) {
834 MapEntryValue value = iterator.next().getValue();
835 if (value.type() == MapEntryValue.Type.TOMBSTONE) {
836 iterator.remove();
837 }
838 }
839 } else {
840 long lowWaterMark = activeTransactions.values().stream()
841 .mapToLong(TransactionScope::version)
842 .min().getAsLong();
Jordan Halterman71635ae2017-07-28 10:35:43 -0700843 Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700844 while (iterator.hasNext()) {
845 MapEntryValue value = iterator.next().getValue();
846 if (value.type() == MapEntryValue.Type.TOMBSTONE && value.version < lowWaterMark) {
847 iterator.remove();
848 }
849 }
850 }
851 }
852
853 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700854 * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
855 * @param value map entry value
856 * @return versioned instance
857 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700858 protected Versioned<byte[]> toVersioned(MapEntryValue value) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700859 return value != null && value.type() != MapEntryValue.Type.TOMBSTONE
860 ? new Versioned<>(value.value(), value.version()) : null;
861 }
862
863 /**
Jordan Halterman71635ae2017-07-28 10:35:43 -0700864 * Publishes an event to listeners.
865 *
866 * @param event event to publish
867 */
868 private void publish(MapEvent<String, byte[]> event) {
869 publish(Lists.newArrayList(event));
870 }
871
872 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700873 * Publishes events to listeners.
874 *
875 * @param events list of map event to publish
876 */
877 private void publish(List<MapEvent<String, byte[]>> events) {
878 listeners.values().forEach(session -> {
Jordan Halterman71635ae2017-07-28 10:35:43 -0700879 session.publish(CHANGE, serializer()::encode, events);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700880 });
881 }
882
883 @Override
884 public void onExpire(RaftSession session) {
885 closeListener(session.sessionId().id());
886 }
887
888 @Override
889 public void onClose(RaftSession session) {
890 closeListener(session.sessionId().id());
891 }
892
893 private void closeListener(Long sessionId) {
894 listeners.remove(sessionId);
895 }
896
897 /**
898 * Interface implemented by map values.
899 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700900 protected static class MapEntryValue {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700901 protected final Type type;
902 protected final long version;
903 protected final byte[] value;
904
905 MapEntryValue(Type type, long version, byte[] value) {
906 this.type = type;
907 this.version = version;
908 this.value = value;
909 }
910
911 /**
912 * Returns the value type.
913 *
914 * @return the value type
915 */
916 Type type() {
917 return type;
918 }
919
920 /**
921 * Returns the version of the value.
922 *
923 * @return version
924 */
925 long version() {
926 return version;
927 }
928
929 /**
930 * Returns the raw {@code byte[]}.
931 *
932 * @return raw value
933 */
934 byte[] value() {
935 return value;
936 }
937
938 /**
939 * Value type.
940 */
941 enum Type {
942 VALUE,
943 TOMBSTONE,
944 }
945 }
946
947 /**
948 * Map transaction scope.
949 */
Jordan Halterman71635ae2017-07-28 10:35:43 -0700950 protected static final class TransactionScope {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700951 private final long version;
952 private final TransactionLog<MapUpdate<String, byte[]>> transactionLog;
953
954 private TransactionScope(long version) {
955 this(version, null);
956 }
957
958 private TransactionScope(long version, TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
959 this.version = version;
960 this.transactionLog = transactionLog;
961 }
962
963 /**
964 * Returns the transaction version.
965 *
966 * @return the transaction version
967 */
968 long version() {
969 return version;
970 }
971
972 /**
973 * Returns whether this is a prepared transaction scope.
974 *
975 * @return whether this is a prepared transaction scope
976 */
977 boolean isPrepared() {
978 return transactionLog != null;
979 }
980
981 /**
982 * Returns the transaction commit log.
983 *
984 * @return the transaction commit log
985 */
986 TransactionLog<MapUpdate<String, byte[]>> transactionLog() {
987 checkState(isPrepared());
988 return transactionLog;
989 }
990
991 /**
992 * Returns a new transaction scope with a prepare commit.
993 *
994 * @param commit the prepare commit
995 * @return new transaction scope updated with the prepare commit
996 */
997 TransactionScope prepared(Commit<? extends TransactionPrepare> commit) {
998 return new TransactionScope(version, commit.value().transactionLog());
999 }
1000 }
1001}