[ONOS-6594] Upgrade to Atomix 2.0.0

Change-Id: I6534bca1c8570b4e017f682953b876da29146675
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
new file mode 100644
index 0000000..6d8c1b0
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
@@ -0,0 +1,743 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import io.atomix.protocols.raft.service.AbstractRaftService;
+import io.atomix.protocols.raft.service.Commit;
+import io.atomix.protocols.raft.service.RaftServiceExecutor;
+import io.atomix.protocols.raft.session.RaftSession;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.Match;
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsKey;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsValue;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Get;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GetOrDefault;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionBegin;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionCommit;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepare;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepareAndCommit;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionRollback;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.UpdateAndGet;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Versioned;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapEvents.CHANGE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ADD_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.BEGIN;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CLEAR;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.COMMIT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_KEY;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_VALUE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ENTRY_SET;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET_OR_DEFAULT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IS_EMPTY;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.KEY_SET;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE_AND_COMMIT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ROLLBACK;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.SIZE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.UPDATE_AND_GET;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.VALUES;
+import static org.onosproject.store.service.MapEvent.Type.INSERT;
+import static org.onosproject.store.service.MapEvent.Type.REMOVE;
+import static org.onosproject.store.service.MapEvent.Type.UPDATE;
+
+/**
+ * State Machine for {@link AtomixConsistentMap} resource.
+ */
+public class AtomixConsistentMapService extends AbstractRaftService {
+
+    private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
+            .register(KryoNamespaces.BASIC)
+            .register(AtomixConsistentMapOperations.NAMESPACE)
+            .register(AtomixConsistentMapEvents.NAMESPACE)
+            .nextId(KryoNamespace.FLOATING_ID)
+            .register(TransactionScope.class)
+            .register(TransactionLog.class)
+            .register(TransactionId.class)
+            .register(MapEntryValue.class)
+            .register(MapEntryValue.Type.class)
+            .register(new HashMap().keySet().getClass())
+            .build());
+
+    private Map<Long, RaftSession> listeners = new LinkedHashMap<>();
+    private Map<String, MapEntryValue> mapEntries = new HashMap<>();
+    private Set<String> preparedKeys = Sets.newHashSet();
+    private Map<TransactionId, TransactionScope> activeTransactions = Maps.newHashMap();
+    private long currentVersion;
+
+    @Override
+    public void snapshot(SnapshotWriter writer) {
+        writer.writeObject(Sets.newHashSet(listeners.keySet()), SERIALIZER::encode);
+        writer.writeObject(preparedKeys, SERIALIZER::encode);
+        writer.writeObject(mapEntries, SERIALIZER::encode);
+        writer.writeObject(activeTransactions, SERIALIZER::encode);
+        writer.writeLong(currentVersion);
+    }
+
+    @Override
+    public void install(SnapshotReader reader) {
+        listeners = new LinkedHashMap<>();
+        for (Long sessionId : reader.<Set<Long>>readObject(SERIALIZER::decode)) {
+            listeners.put(sessionId, getSessions().getSession(sessionId));
+        }
+        preparedKeys = reader.readObject(SERIALIZER::decode);
+        mapEntries = reader.readObject(SERIALIZER::decode);
+        activeTransactions = reader.readObject(SERIALIZER::decode);
+        currentVersion = reader.readLong();
+    }
+
+    @Override
+    protected void configure(RaftServiceExecutor executor) {
+        // Listeners
+        executor.register(ADD_LISTENER, this::listen);
+        executor.register(REMOVE_LISTENER, this::unlisten);
+        // Queries
+        executor.register(CONTAINS_KEY, SERIALIZER::decode, this::containsKey, SERIALIZER::encode);
+        executor.register(CONTAINS_VALUE, SERIALIZER::decode, this::containsValue, SERIALIZER::encode);
+        executor.register(ENTRY_SET, this::entrySet, SERIALIZER::encode);
+        executor.register(GET, SERIALIZER::decode, this::get, SERIALIZER::encode);
+        executor.register(GET_OR_DEFAULT, SERIALIZER::decode, this::getOrDefault, SERIALIZER::encode);
+        executor.register(IS_EMPTY, this::isEmpty, SERIALIZER::encode);
+        executor.register(KEY_SET, this::keySet, SERIALIZER::encode);
+        executor.register(SIZE, this::size, SERIALIZER::encode);
+        executor.register(VALUES, this::values, SERIALIZER::encode);
+        // Commands
+        executor.register(UPDATE_AND_GET, SERIALIZER::decode, this::updateAndGet, SERIALIZER::encode);
+        executor.register(CLEAR, this::clear, SERIALIZER::encode);
+        executor.register(BEGIN, SERIALIZER::decode, this::begin, SERIALIZER::encode);
+        executor.register(PREPARE, SERIALIZER::decode, this::prepare, SERIALIZER::encode);
+        executor.register(PREPARE_AND_COMMIT, SERIALIZER::decode, this::prepareAndCommit, SERIALIZER::encode);
+        executor.register(COMMIT, SERIALIZER::decode, this::commit, SERIALIZER::encode);
+        executor.register(ROLLBACK, SERIALIZER::decode, this::rollback, SERIALIZER::encode);
+    }
+
+    /**
+     * Handles a contains key commit.
+     *
+     * @param commit containsKey commit
+     * @return {@code true} if map contains key
+     */
+    protected boolean containsKey(Commit<? extends ContainsKey> commit) {
+        MapEntryValue value = mapEntries.get(commit.value().key());
+        return value != null && value.type() != MapEntryValue.Type.TOMBSTONE;
+    }
+
+    /**
+     * Handles a contains value commit.
+     *
+     * @param commit containsValue commit
+     * @return {@code true} if map contains value
+     */
+    protected boolean containsValue(Commit<? extends ContainsValue> commit) {
+        Match<byte[]> valueMatch = Match.ifValue(commit.value().value());
+        return mapEntries.values().stream()
+                .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
+                .anyMatch(value -> valueMatch.matches(value.value()));
+    }
+
+    /**
+     * Handles a get commit.
+     *
+     * @param commit get commit
+     * @return value mapped to key
+     */
+    protected Versioned<byte[]> get(Commit<? extends Get> commit) {
+        return toVersioned(mapEntries.get(commit.value().key()));
+    }
+
+    /**
+     * Handles a get or default commit.
+     *
+     * @param commit get or default commit
+     * @return value mapped to key
+     */
+    protected Versioned<byte[]> getOrDefault(Commit<? extends GetOrDefault> commit) {
+        MapEntryValue value = mapEntries.get(commit.value().key());
+        if (value == null) {
+            return new Versioned<>(commit.value().defaultValue(), 0);
+        } else if (value.type() == MapEntryValue.Type.TOMBSTONE) {
+            return new Versioned<>(commit.value().defaultValue(), value.version);
+        } else {
+            return new Versioned<>(value.value(), value.version);
+        }
+    }
+
+    /**
+     * Handles a count commit.
+     *
+     * @param commit size commit
+     * @return number of entries in map
+     */
+    protected int size(Commit<Void> commit) {
+        return (int) mapEntries.values().stream()
+                .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
+                .count();
+    }
+
+    /**
+     * Handles an is empty commit.
+     *
+     * @param commit isEmpty commit
+     * @return {@code true} if map is empty
+     */
+    protected boolean isEmpty(Commit<Void> commit) {
+        return mapEntries.values().stream()
+                .noneMatch(value -> value.type() != MapEntryValue.Type.TOMBSTONE);
+    }
+
+    /**
+     * Handles a keySet commit.
+     *
+     * @param commit keySet commit
+     * @return set of keys in map
+     */
+    protected Set<String> keySet(Commit<Void> commit) {
+        return mapEntries.entrySet().stream()
+                .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
+                .map(Map.Entry::getKey)
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Handles a values commit.
+     *
+     * @param commit values commit
+     * @return collection of values in map
+     */
+    protected Collection<Versioned<byte[]>> values(Commit<Void> commit) {
+        return mapEntries.entrySet().stream()
+                .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
+                .map(entry -> toVersioned(entry.getValue()))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Handles a entry set commit.
+     *
+     * @param commit entrySet commit
+     * @return set of map entries
+     */
+    protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(Commit<Void> commit) {
+        return mapEntries.entrySet().stream()
+                .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
+                .map(e -> Maps.immutableEntry(e.getKey(), toVersioned(e.getValue())))
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Handles a update and get commit.
+     *
+     * @param commit updateAndGet commit
+     * @return update result
+     */
+    protected MapEntryUpdateResult<String, byte[]> updateAndGet(Commit<? extends UpdateAndGet> commit) {
+        try {
+            MapEntryUpdateResult.Status updateStatus = validate(commit.value());
+            String key = commit.value().key();
+            MapEntryValue oldCommitValue = mapEntries.get(commit.value().key());
+            Versioned<byte[]> oldMapValue = toVersioned(oldCommitValue);
+
+            if (updateStatus != MapEntryUpdateResult.Status.OK) {
+                return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue, oldMapValue);
+            }
+
+            byte[] newValue = commit.value().value();
+            currentVersion = commit.index();
+            Versioned<byte[]> newMapValue = newValue == null ? null
+                    : new Versioned<>(newValue, currentVersion);
+
+            MapEvent.Type updateType = newValue == null ? REMOVE
+                    : oldCommitValue == null ? INSERT : UPDATE;
+
+            // If a value existed in the map, remove and discard the value to ensure disk can be freed.
+            if (updateType == REMOVE || updateType == UPDATE) {
+                mapEntries.remove(key);
+            }
+
+            // If this is an insert/update commit, add the commit to the map entries.
+            if (updateType == INSERT || updateType == UPDATE) {
+                mapEntries.put(key, new MapEntryValue(
+                        MapEntryValue.Type.VALUE,
+                        commit.index(),
+                        commit.value().value()));
+            } else if (!activeTransactions.isEmpty()) {
+                // If this is a delete but transactions are currently running, ensure tombstones are retained
+                // for version checks.
+                mapEntries.put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, commit.index(), null));
+            }
+
+            publish(Lists.newArrayList(new MapEvent<>("", key, newMapValue, oldMapValue)));
+            return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue, newMapValue);
+        } catch (Exception e) {
+            getLogger().error("State machine operation failed", e);
+            throw Throwables.propagate(e);
+        }
+    }
+
+    /**
+     * Handles a clear commit.
+     *
+     * @param commit clear commit
+     * @return clear result
+     */
+    protected MapEntryUpdateResult.Status clear(Commit<Void> commit) {
+        Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries
+                .entrySet().iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<String, MapEntryValue> entry = iterator.next();
+            String key = entry.getKey();
+            MapEntryValue value = entry.getValue();
+            Versioned<byte[]> removedValue = new Versioned<>(value.value(),
+                    value.version());
+            publish(Lists.newArrayList(new MapEvent<>("", key, null, removedValue)));
+            iterator.remove();
+        }
+        return MapEntryUpdateResult.Status.OK;
+    }
+
+    /**
+     * Handles a listen commit.
+     *
+     * @param commit listen commit
+     */
+    protected void listen(Commit<Void> commit) {
+        listeners.put(commit.session().sessionId().id(), commit.session());
+    }
+
+    /**
+     * Handles an unlisten commit.
+     *
+     * @param commit unlisten commit
+     */
+    protected void unlisten(Commit<Void> commit) {
+        listeners.remove(commit.session().sessionId().id());
+    }
+
+    /**
+     * Handles a begin commit.
+     *
+     * @param commit transaction begin commit
+     * @return transaction state version
+     */
+    protected long begin(Commit<? extends TransactionBegin> commit) {
+        long version = commit.index();
+        activeTransactions.put(commit.value().transactionId(), new TransactionScope(version));
+        return version;
+    }
+
+    /**
+     * Handles an prepare and commit commit.
+     *
+     * @param commit transaction prepare and commit commit
+     * @return prepare result
+     */
+    protected PrepareResult prepareAndCommit(Commit<? extends TransactionPrepareAndCommit> commit) {
+        TransactionId transactionId = commit.value().transactionLog().transactionId();
+        PrepareResult prepareResult = prepare(commit);
+        TransactionScope transactionScope = activeTransactions.remove(transactionId);
+        if (prepareResult == PrepareResult.OK) {
+            this.currentVersion = commit.index();
+            transactionScope = transactionScope.prepared(commit);
+            commitTransaction(transactionScope);
+        }
+        discardTombstones();
+        return prepareResult;
+    }
+
+    /**
+     * Handles an prepare commit.
+     *
+     * @param commit transaction prepare commit
+     * @return prepare result
+     */
+    protected PrepareResult prepare(Commit<? extends TransactionPrepare> commit) {
+        try {
+            TransactionLog<MapUpdate<String, byte[]>> transactionLog = commit.value().transactionLog();
+
+            // Iterate through records in the transaction log and perform isolation checks.
+            for (MapUpdate<String, byte[]> record : transactionLog.records()) {
+                String key = record.key();
+
+                // If the record is a VERSION_MATCH then check that the record's version matches the current
+                // version of the state machine.
+                if (record.type() == MapUpdate.Type.VERSION_MATCH && key == null) {
+                    if (record.version() > currentVersion) {
+                        return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
+                    } else {
+                        continue;
+                    }
+                }
+
+                // If the prepared keys already contains the key contained within the record, that indicates a
+                // conflict with a concurrent transaction.
+                if (preparedKeys.contains(key)) {
+                    return PrepareResult.CONCURRENT_TRANSACTION;
+                }
+
+                // Read the existing value from the map.
+                MapEntryValue existingValue = mapEntries.get(key);
+
+                // Note: if the existing value is null, that means the key has not changed during the transaction,
+                // otherwise a tombstone would have been retained.
+                if (existingValue == null) {
+                    // If the value is null, ensure the version is equal to the transaction version.
+                    if (record.version() != transactionLog.version()) {
+                        return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
+                    }
+                } else {
+                    // If the value is non-null, compare the current version with the record version.
+                    if (existingValue.version() > record.version()) {
+                        return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
+                    }
+                }
+            }
+
+            // No violations detected. Mark modified keys locked for transactions.
+            transactionLog.records().forEach(record -> {
+                if (record.type() != MapUpdate.Type.VERSION_MATCH) {
+                    preparedKeys.add(record.key());
+                }
+            });
+
+            // Update the transaction scope. If the transaction scope is not set on this node, that indicates the
+            // coordinator is communicating with another node. Transactions assume that the client is communicating
+            // with a single leader in order to limit the overhead of retaining tombstones.
+            TransactionScope transactionScope = activeTransactions.get(transactionLog.transactionId());
+            if (transactionScope == null) {
+                activeTransactions.put(
+                        transactionLog.transactionId(),
+                        new TransactionScope(transactionLog.version(), commit.value().transactionLog()));
+                return PrepareResult.PARTIAL_FAILURE;
+            } else {
+                activeTransactions.put(
+                        transactionLog.transactionId(),
+                        transactionScope.prepared(commit));
+                return PrepareResult.OK;
+            }
+        } catch (Exception e) {
+            getLogger().warn("Failure applying {}", commit, e);
+            throw Throwables.propagate(e);
+        }
+    }
+
+    /**
+     * Handles an commit commit (ha!).
+     *
+     * @param commit transaction commit commit
+     * @return commit result
+     */
+    protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
+        TransactionId transactionId = commit.value().transactionId();
+        TransactionScope transactionScope = activeTransactions.remove(transactionId);
+        if (transactionScope == null) {
+            return CommitResult.UNKNOWN_TRANSACTION_ID;
+        }
+
+        try {
+            this.currentVersion = commit.index();
+            return commitTransaction(transactionScope);
+        } catch (Exception e) {
+            getLogger().warn("Failure applying {}", commit, e);
+            throw Throwables.propagate(e);
+        } finally {
+            discardTombstones();
+        }
+    }
+
+    /**
+     * Applies committed operations to the state machine.
+     */
+    private CommitResult commitTransaction(TransactionScope transactionScope) {
+        TransactionLog<MapUpdate<String, byte[]>> transactionLog = transactionScope.transactionLog();
+        boolean retainTombstones = !activeTransactions.isEmpty();
+
+        List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
+        for (MapUpdate<String, byte[]> record : transactionLog.records()) {
+            if (record.type() == MapUpdate.Type.VERSION_MATCH) {
+                continue;
+            }
+
+            String key = record.key();
+            checkState(preparedKeys.remove(key), "key is not prepared");
+
+            if (record.type() == MapUpdate.Type.LOCK) {
+                continue;
+            }
+
+            MapEntryValue previousValue = mapEntries.remove(key);
+            MapEntryValue newValue = null;
+
+            // If the record is not a delete, create a transactional commit.
+            if (record.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
+                newValue = new MapEntryValue(MapEntryValue.Type.VALUE, currentVersion, record.value());
+            } else if (retainTombstones) {
+                // For deletes, if tombstones need to be retained then create and store a tombstone commit.
+                newValue = new MapEntryValue(MapEntryValue.Type.TOMBSTONE, currentVersion, null);
+            }
+
+            eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
+
+            if (newValue != null) {
+                mapEntries.put(key, newValue);
+            }
+        }
+        publish(eventsToPublish);
+        return CommitResult.OK;
+    }
+
+    /**
+     * Handles an rollback commit (ha!).
+     *
+     * @param commit transaction rollback commit
+     * @return rollback result
+     */
+    protected RollbackResult rollback(Commit<? extends TransactionRollback> commit) {
+        TransactionId transactionId = commit.value().transactionId();
+        TransactionScope transactionScope = activeTransactions.remove(transactionId);
+        if (transactionScope == null) {
+            return RollbackResult.UNKNOWN_TRANSACTION_ID;
+        } else if (!transactionScope.isPrepared()) {
+            discardTombstones();
+            return RollbackResult.OK;
+        } else {
+            try {
+                transactionScope.transactionLog().records()
+                        .forEach(record -> {
+                            if (record.type() != MapUpdate.Type.VERSION_MATCH) {
+                                preparedKeys.remove(record.key());
+                            }
+                        });
+                return RollbackResult.OK;
+            } finally {
+                discardTombstones();
+            }
+        }
+
+    }
+
+    /**
+     * Discards tombstones no longer needed by active transactions.
+     */
+    private void discardTombstones() {
+        if (activeTransactions.isEmpty()) {
+            Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries.entrySet().iterator();
+            while (iterator.hasNext()) {
+                MapEntryValue value = iterator.next().getValue();
+                if (value.type() == MapEntryValue.Type.TOMBSTONE) {
+                    iterator.remove();
+                }
+            }
+        } else {
+            long lowWaterMark = activeTransactions.values().stream()
+                    .mapToLong(TransactionScope::version)
+                    .min().getAsLong();
+            Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries.entrySet().iterator();
+            while (iterator.hasNext()) {
+                MapEntryValue value = iterator.next().getValue();
+                if (value.type() == MapEntryValue.Type.TOMBSTONE && value.version < lowWaterMark) {
+                    iterator.remove();
+                }
+            }
+        }
+    }
+
+    /**
+     * Computes the update status that would result if the specified update were to applied to
+     * the state machine.
+     *
+     * @param update update
+     * @return status
+     */
+    private MapEntryUpdateResult.Status validate(UpdateAndGet update) {
+        MapEntryValue existingValue = mapEntries.get(update.key());
+        boolean isEmpty = existingValue == null || existingValue.type() == MapEntryValue.Type.TOMBSTONE;
+        if (isEmpty && update.value() == null) {
+            return MapEntryUpdateResult.Status.NOOP;
+        }
+        if (preparedKeys.contains(update.key())) {
+            return MapEntryUpdateResult.Status.WRITE_LOCK;
+        }
+        byte[] existingRawValue = isEmpty ? null : existingValue.value();
+        Long existingVersion = isEmpty ? null : existingValue.version();
+        return update.valueMatch().matches(existingRawValue)
+                && update.versionMatch().matches(existingVersion) ? MapEntryUpdateResult.Status.OK
+                : MapEntryUpdateResult.Status.PRECONDITION_FAILED;
+    }
+
+    /**
+     * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
+     * @param value map entry value
+     * @return versioned instance
+     */
+    private Versioned<byte[]> toVersioned(MapEntryValue value) {
+        return value != null && value.type() != MapEntryValue.Type.TOMBSTONE
+                ? new Versioned<>(value.value(), value.version()) : null;
+    }
+
+    /**
+     * Publishes events to listeners.
+     *
+     * @param events list of map event to publish
+     */
+    private void publish(List<MapEvent<String, byte[]>> events) {
+        listeners.values().forEach(session -> {
+            session.publish(CHANGE, SERIALIZER::encode, events);
+        });
+    }
+
+    @Override
+    public void onExpire(RaftSession session) {
+        closeListener(session.sessionId().id());
+    }
+
+    @Override
+    public void onClose(RaftSession session) {
+        closeListener(session.sessionId().id());
+    }
+
+    private void closeListener(Long sessionId) {
+        listeners.remove(sessionId);
+    }
+
+    /**
+     * Interface implemented by map values.
+     */
+    private static class MapEntryValue {
+        protected final Type type;
+        protected final long version;
+        protected final byte[] value;
+
+        MapEntryValue(Type type, long version, byte[] value) {
+            this.type = type;
+            this.version = version;
+            this.value = value;
+        }
+
+        /**
+         * Returns the value type.
+         *
+         * @return the value type
+         */
+        Type type() {
+            return type;
+        }
+
+        /**
+         * Returns the version of the value.
+         *
+         * @return version
+         */
+        long version() {
+            return version;
+        }
+
+        /**
+         * Returns the raw {@code byte[]}.
+         *
+         * @return raw value
+         */
+        byte[] value() {
+            return value;
+        }
+
+        /**
+         * Value type.
+         */
+        enum Type {
+            VALUE,
+            TOMBSTONE,
+        }
+    }
+
+    /**
+     * Map transaction scope.
+     */
+    private static final class TransactionScope {
+        private final long version;
+        private final TransactionLog<MapUpdate<String, byte[]>> transactionLog;
+
+        private TransactionScope(long version) {
+            this(version, null);
+        }
+
+        private TransactionScope(long version, TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
+            this.version = version;
+            this.transactionLog = transactionLog;
+        }
+
+        /**
+         * Returns the transaction version.
+         *
+         * @return the transaction version
+         */
+        long version() {
+            return version;
+        }
+
+        /**
+         * Returns whether this is a prepared transaction scope.
+         *
+         * @return whether this is a prepared transaction scope
+         */
+        boolean isPrepared() {
+            return transactionLog != null;
+        }
+
+        /**
+         * Returns the transaction commit log.
+         *
+         * @return the transaction commit log
+         */
+        TransactionLog<MapUpdate<String, byte[]>> transactionLog() {
+            checkState(isPrepared());
+            return transactionLog;
+        }
+
+        /**
+         * Returns a new transaction scope with a prepare commit.
+         *
+         * @param commit the prepare commit
+         * @return new transaction scope updated with the prepare commit
+         */
+        TransactionScope prepared(Commit<? extends TransactionPrepare> commit) {
+            return new TransactionScope(version, commit.value().transactionLog());
+        }
+    }
+}
\ No newline at end of file