blob: 6d8c1b01f94f981cfd824a0a633419a6b8fb8062 [file] [log] [blame]
Jordan Halterman2bf177c2017-06-29 01:49:08 -07001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
18import java.util.Collection;
19import java.util.HashMap;
20import java.util.Iterator;
21import java.util.LinkedHashMap;
22import java.util.List;
23import java.util.Map;
24import java.util.Set;
25import java.util.stream.Collectors;
26
27import com.google.common.base.Throwables;
28import com.google.common.collect.Lists;
29import com.google.common.collect.Maps;
30import com.google.common.collect.Sets;
31import io.atomix.protocols.raft.service.AbstractRaftService;
32import io.atomix.protocols.raft.service.Commit;
33import io.atomix.protocols.raft.service.RaftServiceExecutor;
34import io.atomix.protocols.raft.session.RaftSession;
35import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
36import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
37import org.onlab.util.KryoNamespace;
38import org.onlab.util.Match;
39import org.onosproject.store.primitives.MapUpdate;
40import org.onosproject.store.primitives.TransactionId;
41import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsKey;
42import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsValue;
43import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Get;
44import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GetOrDefault;
45import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionBegin;
46import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionCommit;
47import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepare;
48import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepareAndCommit;
49import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionRollback;
50import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.UpdateAndGet;
51import org.onosproject.store.serializers.KryoNamespaces;
52import org.onosproject.store.service.MapEvent;
53import org.onosproject.store.service.Serializer;
54import org.onosproject.store.service.TransactionLog;
55import org.onosproject.store.service.Versioned;
56
57import static com.google.common.base.Preconditions.checkState;
58import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapEvents.CHANGE;
59import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ADD_LISTENER;
60import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.BEGIN;
61import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CLEAR;
62import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.COMMIT;
63import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_KEY;
64import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_VALUE;
65import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ENTRY_SET;
66import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET;
67import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET_OR_DEFAULT;
68import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IS_EMPTY;
69import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.KEY_SET;
70import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE;
71import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE_AND_COMMIT;
72import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_LISTENER;
73import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ROLLBACK;
74import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.SIZE;
75import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.UPDATE_AND_GET;
76import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.VALUES;
77import static org.onosproject.store.service.MapEvent.Type.INSERT;
78import static org.onosproject.store.service.MapEvent.Type.REMOVE;
79import static org.onosproject.store.service.MapEvent.Type.UPDATE;
80
81/**
82 * State Machine for {@link AtomixConsistentMap} resource.
83 */
84public class AtomixConsistentMapService extends AbstractRaftService {
85
86 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
87 .register(KryoNamespaces.BASIC)
88 .register(AtomixConsistentMapOperations.NAMESPACE)
89 .register(AtomixConsistentMapEvents.NAMESPACE)
90 .nextId(KryoNamespace.FLOATING_ID)
91 .register(TransactionScope.class)
92 .register(TransactionLog.class)
93 .register(TransactionId.class)
94 .register(MapEntryValue.class)
95 .register(MapEntryValue.Type.class)
96 .register(new HashMap().keySet().getClass())
97 .build());
98
99 private Map<Long, RaftSession> listeners = new LinkedHashMap<>();
100 private Map<String, MapEntryValue> mapEntries = new HashMap<>();
101 private Set<String> preparedKeys = Sets.newHashSet();
102 private Map<TransactionId, TransactionScope> activeTransactions = Maps.newHashMap();
103 private long currentVersion;
104
105 @Override
106 public void snapshot(SnapshotWriter writer) {
107 writer.writeObject(Sets.newHashSet(listeners.keySet()), SERIALIZER::encode);
108 writer.writeObject(preparedKeys, SERIALIZER::encode);
109 writer.writeObject(mapEntries, SERIALIZER::encode);
110 writer.writeObject(activeTransactions, SERIALIZER::encode);
111 writer.writeLong(currentVersion);
112 }
113
114 @Override
115 public void install(SnapshotReader reader) {
116 listeners = new LinkedHashMap<>();
117 for (Long sessionId : reader.<Set<Long>>readObject(SERIALIZER::decode)) {
118 listeners.put(sessionId, getSessions().getSession(sessionId));
119 }
120 preparedKeys = reader.readObject(SERIALIZER::decode);
121 mapEntries = reader.readObject(SERIALIZER::decode);
122 activeTransactions = reader.readObject(SERIALIZER::decode);
123 currentVersion = reader.readLong();
124 }
125
126 @Override
127 protected void configure(RaftServiceExecutor executor) {
128 // Listeners
129 executor.register(ADD_LISTENER, this::listen);
130 executor.register(REMOVE_LISTENER, this::unlisten);
131 // Queries
132 executor.register(CONTAINS_KEY, SERIALIZER::decode, this::containsKey, SERIALIZER::encode);
133 executor.register(CONTAINS_VALUE, SERIALIZER::decode, this::containsValue, SERIALIZER::encode);
134 executor.register(ENTRY_SET, this::entrySet, SERIALIZER::encode);
135 executor.register(GET, SERIALIZER::decode, this::get, SERIALIZER::encode);
136 executor.register(GET_OR_DEFAULT, SERIALIZER::decode, this::getOrDefault, SERIALIZER::encode);
137 executor.register(IS_EMPTY, this::isEmpty, SERIALIZER::encode);
138 executor.register(KEY_SET, this::keySet, SERIALIZER::encode);
139 executor.register(SIZE, this::size, SERIALIZER::encode);
140 executor.register(VALUES, this::values, SERIALIZER::encode);
141 // Commands
142 executor.register(UPDATE_AND_GET, SERIALIZER::decode, this::updateAndGet, SERIALIZER::encode);
143 executor.register(CLEAR, this::clear, SERIALIZER::encode);
144 executor.register(BEGIN, SERIALIZER::decode, this::begin, SERIALIZER::encode);
145 executor.register(PREPARE, SERIALIZER::decode, this::prepare, SERIALIZER::encode);
146 executor.register(PREPARE_AND_COMMIT, SERIALIZER::decode, this::prepareAndCommit, SERIALIZER::encode);
147 executor.register(COMMIT, SERIALIZER::decode, this::commit, SERIALIZER::encode);
148 executor.register(ROLLBACK, SERIALIZER::decode, this::rollback, SERIALIZER::encode);
149 }
150
151 /**
152 * Handles a contains key commit.
153 *
154 * @param commit containsKey commit
155 * @return {@code true} if map contains key
156 */
157 protected boolean containsKey(Commit<? extends ContainsKey> commit) {
158 MapEntryValue value = mapEntries.get(commit.value().key());
159 return value != null && value.type() != MapEntryValue.Type.TOMBSTONE;
160 }
161
162 /**
163 * Handles a contains value commit.
164 *
165 * @param commit containsValue commit
166 * @return {@code true} if map contains value
167 */
168 protected boolean containsValue(Commit<? extends ContainsValue> commit) {
169 Match<byte[]> valueMatch = Match.ifValue(commit.value().value());
170 return mapEntries.values().stream()
171 .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
172 .anyMatch(value -> valueMatch.matches(value.value()));
173 }
174
175 /**
176 * Handles a get commit.
177 *
178 * @param commit get commit
179 * @return value mapped to key
180 */
181 protected Versioned<byte[]> get(Commit<? extends Get> commit) {
182 return toVersioned(mapEntries.get(commit.value().key()));
183 }
184
185 /**
186 * Handles a get or default commit.
187 *
188 * @param commit get or default commit
189 * @return value mapped to key
190 */
191 protected Versioned<byte[]> getOrDefault(Commit<? extends GetOrDefault> commit) {
192 MapEntryValue value = mapEntries.get(commit.value().key());
193 if (value == null) {
194 return new Versioned<>(commit.value().defaultValue(), 0);
195 } else if (value.type() == MapEntryValue.Type.TOMBSTONE) {
196 return new Versioned<>(commit.value().defaultValue(), value.version);
197 } else {
198 return new Versioned<>(value.value(), value.version);
199 }
200 }
201
202 /**
203 * Handles a count commit.
204 *
205 * @param commit size commit
206 * @return number of entries in map
207 */
208 protected int size(Commit<Void> commit) {
209 return (int) mapEntries.values().stream()
210 .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
211 .count();
212 }
213
214 /**
215 * Handles an is empty commit.
216 *
217 * @param commit isEmpty commit
218 * @return {@code true} if map is empty
219 */
220 protected boolean isEmpty(Commit<Void> commit) {
221 return mapEntries.values().stream()
222 .noneMatch(value -> value.type() != MapEntryValue.Type.TOMBSTONE);
223 }
224
225 /**
226 * Handles a keySet commit.
227 *
228 * @param commit keySet commit
229 * @return set of keys in map
230 */
231 protected Set<String> keySet(Commit<Void> commit) {
232 return mapEntries.entrySet().stream()
233 .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
234 .map(Map.Entry::getKey)
235 .collect(Collectors.toSet());
236 }
237
238 /**
239 * Handles a values commit.
240 *
241 * @param commit values commit
242 * @return collection of values in map
243 */
244 protected Collection<Versioned<byte[]>> values(Commit<Void> commit) {
245 return mapEntries.entrySet().stream()
246 .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
247 .map(entry -> toVersioned(entry.getValue()))
248 .collect(Collectors.toList());
249 }
250
251 /**
252 * Handles a entry set commit.
253 *
254 * @param commit entrySet commit
255 * @return set of map entries
256 */
257 protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(Commit<Void> commit) {
258 return mapEntries.entrySet().stream()
259 .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
260 .map(e -> Maps.immutableEntry(e.getKey(), toVersioned(e.getValue())))
261 .collect(Collectors.toSet());
262 }
263
264 /**
265 * Handles a update and get commit.
266 *
267 * @param commit updateAndGet commit
268 * @return update result
269 */
270 protected MapEntryUpdateResult<String, byte[]> updateAndGet(Commit<? extends UpdateAndGet> commit) {
271 try {
272 MapEntryUpdateResult.Status updateStatus = validate(commit.value());
273 String key = commit.value().key();
274 MapEntryValue oldCommitValue = mapEntries.get(commit.value().key());
275 Versioned<byte[]> oldMapValue = toVersioned(oldCommitValue);
276
277 if (updateStatus != MapEntryUpdateResult.Status.OK) {
278 return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue, oldMapValue);
279 }
280
281 byte[] newValue = commit.value().value();
282 currentVersion = commit.index();
283 Versioned<byte[]> newMapValue = newValue == null ? null
284 : new Versioned<>(newValue, currentVersion);
285
286 MapEvent.Type updateType = newValue == null ? REMOVE
287 : oldCommitValue == null ? INSERT : UPDATE;
288
289 // If a value existed in the map, remove and discard the value to ensure disk can be freed.
290 if (updateType == REMOVE || updateType == UPDATE) {
291 mapEntries.remove(key);
292 }
293
294 // If this is an insert/update commit, add the commit to the map entries.
295 if (updateType == INSERT || updateType == UPDATE) {
296 mapEntries.put(key, new MapEntryValue(
297 MapEntryValue.Type.VALUE,
298 commit.index(),
299 commit.value().value()));
300 } else if (!activeTransactions.isEmpty()) {
301 // If this is a delete but transactions are currently running, ensure tombstones are retained
302 // for version checks.
303 mapEntries.put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, commit.index(), null));
304 }
305
306 publish(Lists.newArrayList(new MapEvent<>("", key, newMapValue, oldMapValue)));
307 return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue, newMapValue);
308 } catch (Exception e) {
309 getLogger().error("State machine operation failed", e);
310 throw Throwables.propagate(e);
311 }
312 }
313
314 /**
315 * Handles a clear commit.
316 *
317 * @param commit clear commit
318 * @return clear result
319 */
320 protected MapEntryUpdateResult.Status clear(Commit<Void> commit) {
321 Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries
322 .entrySet().iterator();
323 while (iterator.hasNext()) {
324 Map.Entry<String, MapEntryValue> entry = iterator.next();
325 String key = entry.getKey();
326 MapEntryValue value = entry.getValue();
327 Versioned<byte[]> removedValue = new Versioned<>(value.value(),
328 value.version());
329 publish(Lists.newArrayList(new MapEvent<>("", key, null, removedValue)));
330 iterator.remove();
331 }
332 return MapEntryUpdateResult.Status.OK;
333 }
334
335 /**
336 * Handles a listen commit.
337 *
338 * @param commit listen commit
339 */
340 protected void listen(Commit<Void> commit) {
341 listeners.put(commit.session().sessionId().id(), commit.session());
342 }
343
344 /**
345 * Handles an unlisten commit.
346 *
347 * @param commit unlisten commit
348 */
349 protected void unlisten(Commit<Void> commit) {
350 listeners.remove(commit.session().sessionId().id());
351 }
352
353 /**
354 * Handles a begin commit.
355 *
356 * @param commit transaction begin commit
357 * @return transaction state version
358 */
359 protected long begin(Commit<? extends TransactionBegin> commit) {
360 long version = commit.index();
361 activeTransactions.put(commit.value().transactionId(), new TransactionScope(version));
362 return version;
363 }
364
365 /**
366 * Handles an prepare and commit commit.
367 *
368 * @param commit transaction prepare and commit commit
369 * @return prepare result
370 */
371 protected PrepareResult prepareAndCommit(Commit<? extends TransactionPrepareAndCommit> commit) {
372 TransactionId transactionId = commit.value().transactionLog().transactionId();
373 PrepareResult prepareResult = prepare(commit);
374 TransactionScope transactionScope = activeTransactions.remove(transactionId);
375 if (prepareResult == PrepareResult.OK) {
376 this.currentVersion = commit.index();
377 transactionScope = transactionScope.prepared(commit);
378 commitTransaction(transactionScope);
379 }
380 discardTombstones();
381 return prepareResult;
382 }
383
384 /**
385 * Handles an prepare commit.
386 *
387 * @param commit transaction prepare commit
388 * @return prepare result
389 */
390 protected PrepareResult prepare(Commit<? extends TransactionPrepare> commit) {
391 try {
392 TransactionLog<MapUpdate<String, byte[]>> transactionLog = commit.value().transactionLog();
393
394 // Iterate through records in the transaction log and perform isolation checks.
395 for (MapUpdate<String, byte[]> record : transactionLog.records()) {
396 String key = record.key();
397
398 // If the record is a VERSION_MATCH then check that the record's version matches the current
399 // version of the state machine.
400 if (record.type() == MapUpdate.Type.VERSION_MATCH && key == null) {
401 if (record.version() > currentVersion) {
402 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
403 } else {
404 continue;
405 }
406 }
407
408 // If the prepared keys already contains the key contained within the record, that indicates a
409 // conflict with a concurrent transaction.
410 if (preparedKeys.contains(key)) {
411 return PrepareResult.CONCURRENT_TRANSACTION;
412 }
413
414 // Read the existing value from the map.
415 MapEntryValue existingValue = mapEntries.get(key);
416
417 // Note: if the existing value is null, that means the key has not changed during the transaction,
418 // otherwise a tombstone would have been retained.
419 if (existingValue == null) {
420 // If the value is null, ensure the version is equal to the transaction version.
421 if (record.version() != transactionLog.version()) {
422 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
423 }
424 } else {
425 // If the value is non-null, compare the current version with the record version.
426 if (existingValue.version() > record.version()) {
427 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
428 }
429 }
430 }
431
432 // No violations detected. Mark modified keys locked for transactions.
433 transactionLog.records().forEach(record -> {
434 if (record.type() != MapUpdate.Type.VERSION_MATCH) {
435 preparedKeys.add(record.key());
436 }
437 });
438
439 // Update the transaction scope. If the transaction scope is not set on this node, that indicates the
440 // coordinator is communicating with another node. Transactions assume that the client is communicating
441 // with a single leader in order to limit the overhead of retaining tombstones.
442 TransactionScope transactionScope = activeTransactions.get(transactionLog.transactionId());
443 if (transactionScope == null) {
444 activeTransactions.put(
445 transactionLog.transactionId(),
446 new TransactionScope(transactionLog.version(), commit.value().transactionLog()));
447 return PrepareResult.PARTIAL_FAILURE;
448 } else {
449 activeTransactions.put(
450 transactionLog.transactionId(),
451 transactionScope.prepared(commit));
452 return PrepareResult.OK;
453 }
454 } catch (Exception e) {
455 getLogger().warn("Failure applying {}", commit, e);
456 throw Throwables.propagate(e);
457 }
458 }
459
460 /**
461 * Handles an commit commit (ha!).
462 *
463 * @param commit transaction commit commit
464 * @return commit result
465 */
466 protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
467 TransactionId transactionId = commit.value().transactionId();
468 TransactionScope transactionScope = activeTransactions.remove(transactionId);
469 if (transactionScope == null) {
470 return CommitResult.UNKNOWN_TRANSACTION_ID;
471 }
472
473 try {
474 this.currentVersion = commit.index();
475 return commitTransaction(transactionScope);
476 } catch (Exception e) {
477 getLogger().warn("Failure applying {}", commit, e);
478 throw Throwables.propagate(e);
479 } finally {
480 discardTombstones();
481 }
482 }
483
484 /**
485 * Applies committed operations to the state machine.
486 */
487 private CommitResult commitTransaction(TransactionScope transactionScope) {
488 TransactionLog<MapUpdate<String, byte[]>> transactionLog = transactionScope.transactionLog();
489 boolean retainTombstones = !activeTransactions.isEmpty();
490
491 List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
492 for (MapUpdate<String, byte[]> record : transactionLog.records()) {
493 if (record.type() == MapUpdate.Type.VERSION_MATCH) {
494 continue;
495 }
496
497 String key = record.key();
498 checkState(preparedKeys.remove(key), "key is not prepared");
499
500 if (record.type() == MapUpdate.Type.LOCK) {
501 continue;
502 }
503
504 MapEntryValue previousValue = mapEntries.remove(key);
505 MapEntryValue newValue = null;
506
507 // If the record is not a delete, create a transactional commit.
508 if (record.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
509 newValue = new MapEntryValue(MapEntryValue.Type.VALUE, currentVersion, record.value());
510 } else if (retainTombstones) {
511 // For deletes, if tombstones need to be retained then create and store a tombstone commit.
512 newValue = new MapEntryValue(MapEntryValue.Type.TOMBSTONE, currentVersion, null);
513 }
514
515 eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
516
517 if (newValue != null) {
518 mapEntries.put(key, newValue);
519 }
520 }
521 publish(eventsToPublish);
522 return CommitResult.OK;
523 }
524
525 /**
526 * Handles an rollback commit (ha!).
527 *
528 * @param commit transaction rollback commit
529 * @return rollback result
530 */
531 protected RollbackResult rollback(Commit<? extends TransactionRollback> commit) {
532 TransactionId transactionId = commit.value().transactionId();
533 TransactionScope transactionScope = activeTransactions.remove(transactionId);
534 if (transactionScope == null) {
535 return RollbackResult.UNKNOWN_TRANSACTION_ID;
536 } else if (!transactionScope.isPrepared()) {
537 discardTombstones();
538 return RollbackResult.OK;
539 } else {
540 try {
541 transactionScope.transactionLog().records()
542 .forEach(record -> {
543 if (record.type() != MapUpdate.Type.VERSION_MATCH) {
544 preparedKeys.remove(record.key());
545 }
546 });
547 return RollbackResult.OK;
548 } finally {
549 discardTombstones();
550 }
551 }
552
553 }
554
555 /**
556 * Discards tombstones no longer needed by active transactions.
557 */
558 private void discardTombstones() {
559 if (activeTransactions.isEmpty()) {
560 Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries.entrySet().iterator();
561 while (iterator.hasNext()) {
562 MapEntryValue value = iterator.next().getValue();
563 if (value.type() == MapEntryValue.Type.TOMBSTONE) {
564 iterator.remove();
565 }
566 }
567 } else {
568 long lowWaterMark = activeTransactions.values().stream()
569 .mapToLong(TransactionScope::version)
570 .min().getAsLong();
571 Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries.entrySet().iterator();
572 while (iterator.hasNext()) {
573 MapEntryValue value = iterator.next().getValue();
574 if (value.type() == MapEntryValue.Type.TOMBSTONE && value.version < lowWaterMark) {
575 iterator.remove();
576 }
577 }
578 }
579 }
580
581 /**
582 * Computes the update status that would result if the specified update were to applied to
583 * the state machine.
584 *
585 * @param update update
586 * @return status
587 */
588 private MapEntryUpdateResult.Status validate(UpdateAndGet update) {
589 MapEntryValue existingValue = mapEntries.get(update.key());
590 boolean isEmpty = existingValue == null || existingValue.type() == MapEntryValue.Type.TOMBSTONE;
591 if (isEmpty && update.value() == null) {
592 return MapEntryUpdateResult.Status.NOOP;
593 }
594 if (preparedKeys.contains(update.key())) {
595 return MapEntryUpdateResult.Status.WRITE_LOCK;
596 }
597 byte[] existingRawValue = isEmpty ? null : existingValue.value();
598 Long existingVersion = isEmpty ? null : existingValue.version();
599 return update.valueMatch().matches(existingRawValue)
600 && update.versionMatch().matches(existingVersion) ? MapEntryUpdateResult.Status.OK
601 : MapEntryUpdateResult.Status.PRECONDITION_FAILED;
602 }
603
604 /**
605 * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
606 * @param value map entry value
607 * @return versioned instance
608 */
609 private Versioned<byte[]> toVersioned(MapEntryValue value) {
610 return value != null && value.type() != MapEntryValue.Type.TOMBSTONE
611 ? new Versioned<>(value.value(), value.version()) : null;
612 }
613
614 /**
615 * Publishes events to listeners.
616 *
617 * @param events list of map event to publish
618 */
619 private void publish(List<MapEvent<String, byte[]>> events) {
620 listeners.values().forEach(session -> {
621 session.publish(CHANGE, SERIALIZER::encode, events);
622 });
623 }
624
625 @Override
626 public void onExpire(RaftSession session) {
627 closeListener(session.sessionId().id());
628 }
629
630 @Override
631 public void onClose(RaftSession session) {
632 closeListener(session.sessionId().id());
633 }
634
635 private void closeListener(Long sessionId) {
636 listeners.remove(sessionId);
637 }
638
639 /**
640 * Interface implemented by map values.
641 */
642 private static class MapEntryValue {
643 protected final Type type;
644 protected final long version;
645 protected final byte[] value;
646
647 MapEntryValue(Type type, long version, byte[] value) {
648 this.type = type;
649 this.version = version;
650 this.value = value;
651 }
652
653 /**
654 * Returns the value type.
655 *
656 * @return the value type
657 */
658 Type type() {
659 return type;
660 }
661
662 /**
663 * Returns the version of the value.
664 *
665 * @return version
666 */
667 long version() {
668 return version;
669 }
670
671 /**
672 * Returns the raw {@code byte[]}.
673 *
674 * @return raw value
675 */
676 byte[] value() {
677 return value;
678 }
679
680 /**
681 * Value type.
682 */
683 enum Type {
684 VALUE,
685 TOMBSTONE,
686 }
687 }
688
689 /**
690 * Map transaction scope.
691 */
692 private static final class TransactionScope {
693 private final long version;
694 private final TransactionLog<MapUpdate<String, byte[]>> transactionLog;
695
696 private TransactionScope(long version) {
697 this(version, null);
698 }
699
700 private TransactionScope(long version, TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
701 this.version = version;
702 this.transactionLog = transactionLog;
703 }
704
705 /**
706 * Returns the transaction version.
707 *
708 * @return the transaction version
709 */
710 long version() {
711 return version;
712 }
713
714 /**
715 * Returns whether this is a prepared transaction scope.
716 *
717 * @return whether this is a prepared transaction scope
718 */
719 boolean isPrepared() {
720 return transactionLog != null;
721 }
722
723 /**
724 * Returns the transaction commit log.
725 *
726 * @return the transaction commit log
727 */
728 TransactionLog<MapUpdate<String, byte[]>> transactionLog() {
729 checkState(isPrepared());
730 return transactionLog;
731 }
732
733 /**
734 * Returns a new transaction scope with a prepare commit.
735 *
736 * @param commit the prepare commit
737 * @return new transaction scope updated with the prepare commit
738 */
739 TransactionScope prepared(Commit<? extends TransactionPrepare> commit) {
740 return new TransactionScope(version, commit.value().transactionLog());
741 }
742 }
743}