/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.primitives.resources.impl;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.atomix.protocols.raft.service.AbstractRaftService;
import io.atomix.protocols.raft.service.Commit;
import io.atomix.protocols.raft.service.RaftServiceExecutor;
import io.atomix.protocols.raft.session.RaftSession;
import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Match;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsKey;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsValue;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Get;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GetOrDefault;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Put;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Remove;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.RemoveValue;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.RemoveVersion;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Replace;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ReplaceValue;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ReplaceVersion;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionBegin;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionCommit;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepare;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepareAndCommit;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionRollback;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.TransactionLog;
import org.onosproject.store.service.Versioned;

import static com.google.common.base.Preconditions.checkState;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapEvents.CHANGE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ADD_LISTENER;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.BEGIN;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CLEAR;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CLOSE_ITERATOR;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.COMMIT;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_KEY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.CONTAINS_VALUE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ENTRY_SET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET_OR_DEFAULT;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IS_EMPTY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IteratorBatch;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.IteratorPosition;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.KEY_SET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.NEXT;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.OPEN_ITERATOR;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE_AND_COMMIT;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT_AND_GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT_IF_ABSENT;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_LISTENER;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_VALUE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_VERSION;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE_VALUE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE_VERSION;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ROLLBACK;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.SIZE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.VALUES;

/**
 * State Machine for {@link AtomixConsistentMap} resource.
 */
public class AtomixConsistentMapService extends AbstractRaftService {

    private static final int MAX_ITERATOR_BATCH_SIZE = 1024 * 32;

    private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
        .register(KryoNamespaces.BASIC)
        .register(AtomixConsistentMapOperations.NAMESPACE)
        .register(AtomixConsistentMapEvents.NAMESPACE)
        .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
        .register(TransactionScope.class)
        .register(TransactionLog.class)
        .register(TransactionId.class)
        .register(MapEntryValue.class)
        .register(MapEntryValue.Type.class)
        .register(new HashMap().keySet().getClass())
        .build());

    protected Map<Long, RaftSession> listeners = new LinkedHashMap<>();
    private Map<String, MapEntryValue> map;
    protected Set<String> preparedKeys = Sets.newHashSet();
    private Map<Long, IteratorContext> iterators = Maps.newHashMap();
    protected Map<TransactionId, TransactionScope> activeTransactions = Maps.newHashMap();
    protected long currentVersion;

    public AtomixConsistentMapService() {
        map = createMap();
    }

    protected Map<String, MapEntryValue> createMap() {
        return Maps.newConcurrentMap();
    }

    protected Map<String, MapEntryValue> entries() {
        return map;
    }

    protected Serializer serializer() {
        return SERIALIZER;
    }

    @Override
    public void snapshot(SnapshotWriter writer) {
        writer.writeObject(Sets.newHashSet(listeners.keySet()), serializer()::encode);
        writer.writeObject(preparedKeys, serializer()::encode);
        writer.writeObject(entries(), serializer()::encode);
        writer.writeObject(activeTransactions, serializer()::encode);
        writer.writeLong(currentVersion);

        Map<Long, Long> iterators = Maps.newHashMap();
        this.iterators.forEach((id, context) -> iterators.put(id, context.sessionId));
        writer.writeObject(iterators, serializer()::encode);
    }

    @Override
    public void install(SnapshotReader reader) {
        listeners = new LinkedHashMap<>();
        for (Long sessionId : reader.<Set<Long>>readObject(serializer()::decode)) {
            listeners.put(sessionId, sessions().getSession(sessionId));
        }
        preparedKeys = reader.readObject(serializer()::decode);
        map = reader.readObject(serializer()::decode);
        activeTransactions = reader.readObject(serializer()::decode);
        currentVersion = reader.readLong();

        Map<Long, Long> iterators = reader.readObject(serializer()::decode);
        this.iterators = Maps.newHashMap();
        iterators.forEach((id, session) ->
            this.iterators.put(id, new IteratorContext(session, entries().entrySet().iterator())));
    }

    @Override
    protected void configure(RaftServiceExecutor executor) {
        // Listeners
        executor.register(ADD_LISTENER, (Commit<Void> c) -> listen(c.session()));
        executor.register(REMOVE_LISTENER, (Commit<Void> c) -> unlisten(c.session()));
        // Queries
        executor.register(CONTAINS_KEY, serializer()::decode, this::containsKey, serializer()::encode);
        executor.register(CONTAINS_VALUE, serializer()::decode, this::containsValue, serializer()::encode);
        executor.register(ENTRY_SET, (Commit<Void> c) -> entrySet(), serializer()::encode);
        executor.register(GET, serializer()::decode, this::get, serializer()::encode);
        executor.register(GET_OR_DEFAULT, serializer()::decode, this::getOrDefault, serializer()::encode);
        executor.register(IS_EMPTY, (Commit<Void> c) -> isEmpty(), serializer()::encode);
        executor.register(KEY_SET, (Commit<Void> c) -> keySet(), serializer()::encode);
        executor.register(SIZE, (Commit<Void> c) -> size(), serializer()::encode);
        executor.register(VALUES, (Commit<Void> c) -> values(), serializer()::encode);
        // Commands
        executor.register(PUT, serializer()::decode, this::put, serializer()::encode);
        executor.register(PUT_IF_ABSENT, serializer()::decode, this::putIfAbsent, serializer()::encode);
        executor.register(PUT_AND_GET, serializer()::decode, this::putAndGet, serializer()::encode);
        executor.register(REMOVE, serializer()::decode, this::remove, serializer()::encode);
        executor.register(REMOVE_VALUE, serializer()::decode, this::removeValue, serializer()::encode);
        executor.register(REMOVE_VERSION, serializer()::decode, this::removeVersion, serializer()::encode);
        executor.register(REPLACE, serializer()::decode, this::replace, serializer()::encode);
        executor.register(REPLACE_VALUE, serializer()::decode, this::replaceValue, serializer()::encode);
        executor.register(REPLACE_VERSION, serializer()::decode, this::replaceVersion, serializer()::encode);
        executor.register(CLEAR, (Commit<Void> c) -> clear(), serializer()::encode);
        executor.register(BEGIN, serializer()::decode, this::begin, serializer()::encode);
        executor.register(PREPARE, serializer()::decode, this::prepare, serializer()::encode);
        executor.register(PREPARE_AND_COMMIT, serializer()::decode, this::prepareAndCommit, serializer()::encode);
        executor.register(COMMIT, serializer()::decode, this::commit, serializer()::encode);
        executor.register(ROLLBACK, serializer()::decode, this::rollback, serializer()::encode);
        executor.register(OPEN_ITERATOR, this::openIterator, serializer()::encode);
        executor.register(NEXT, serializer()::decode, this::next, serializer()::encode);
        executor.register(CLOSE_ITERATOR, serializer()::decode, this::closeIterator);
    }

    /**
     * Handles a contains key commit.
     *
     * @param commit containsKey commit
     * @return {@code true} if map contains key
     */
    protected boolean containsKey(Commit<? extends ContainsKey> commit) {
        MapEntryValue value = entries().get(commit.value().key());
        return value != null && value.type() != MapEntryValue.Type.TOMBSTONE;
    }

    /**
     * Handles a contains value commit.
     *
     * @param commit containsValue commit
     * @return {@code true} if map contains value
     */
    protected boolean containsValue(Commit<? extends ContainsValue> commit) {
        Match<byte[]> valueMatch = Match.ifValue(commit.value().value());
        return entries().values().stream()
            .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
            .anyMatch(value -> valueMatch.matches(value.value()));
    }

    /**
     * Handles a get commit.
     *
     * @param commit get commit
     * @return value mapped to key
     */
    protected Versioned<byte[]> get(Commit<? extends Get> commit) {
        return toVersioned(entries().get(commit.value().key()));
    }

    /**
     * Handles a get or default commit.
     *
     * @param commit get or default commit
     * @return value mapped to key
     */
    protected Versioned<byte[]> getOrDefault(Commit<? extends GetOrDefault> commit) {
        MapEntryValue value = entries().get(commit.value().key());
        if (value == null) {
            return new Versioned<>(commit.value().defaultValue(), 0);
        } else if (value.type() == MapEntryValue.Type.TOMBSTONE) {
            return new Versioned<>(commit.value().defaultValue(), value.version);
        } else {
            return new Versioned<>(value.value(), value.version);
        }
    }

    /**
     * Handles a size commit.
     *
     * @return number of entries in map
     */
    protected int size() {
        return (int) entries().values().stream()
            .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
            .count();
    }

    /**
     * Handles an is empty commit.
     *
     * @return {@code true} if map is empty
     */
    protected boolean isEmpty() {
        return entries().values().stream()
            .noneMatch(value -> value.type() != MapEntryValue.Type.TOMBSTONE);
    }

    /**
     * Handles a keySet commit.
     *
     * @return set of keys in map
     */
    protected Set<String> keySet() {
        return entries().entrySet().stream()
            .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet());
    }

    /**
     * Handles a values commit.
     *
     * @return collection of values in map
     */
    protected Collection<Versioned<byte[]>> values() {
        return entries().entrySet().stream()
            .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
            .map(entry -> toVersioned(entry.getValue()))
            .collect(Collectors.toList());
    }

    /**
     * Handles a entry set commit.
     *
     * @return set of map entries
     */
    protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet() {
        return entries().entrySet().stream()
            .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
            .map(e -> Maps.immutableEntry(e.getKey(), toVersioned(e.getValue())))
            .collect(Collectors.toSet());
    }

    /**
     * Returns a boolean indicating whether the given MapEntryValues are equal.
     *
     * @param oldValue the first value to compare
     * @param newValue the second value to compare
     * @return indicates whether the two values are equal
     */
    protected boolean valuesEqual(MapEntryValue oldValue, MapEntryValue newValue) {
        return (oldValue == null && newValue == null)
            || (oldValue != null && newValue != null && valuesEqual(oldValue.value(), newValue.value()));
    }

    /**
     * Returns a boolean indicating whether the given entry values are equal.
     *
     * @param oldValue the first value to compare
     * @param newValue the second value to compare
     * @return indicates whether the two values are equal
     */
    protected boolean valuesEqual(byte[] oldValue, byte[] newValue) {
        return (oldValue == null && newValue == null)
            || (oldValue != null && newValue != null && Arrays.equals(oldValue, newValue));
    }

    /**
     * Returns a boolean indicating whether the given MapEntryValue is null or a tombstone.
     *
     * @param value the value to check
     * @return indicates whether the given value is null or is a tombstone
     */
    protected boolean valueIsNull(MapEntryValue value) {
        return value == null || value.type() == MapEntryValue.Type.TOMBSTONE;
    }

    /**
     * Handles a put commit.
     *
     * @param commit put commit
     * @return map entry update result
     */
    protected MapEntryUpdateResult<String, byte[]> put(Commit<? extends Put> commit) {
        String key = commit.value().key();
        MapEntryValue oldValue = entries().get(key);
        MapEntryValue newValue = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());

        // If the value is null or a tombstone, this is an insert.
        // Otherwise, only update the value if it has changed to reduce the number of events.
        if (valueIsNull(oldValue)) {
            // If the key has been locked by a transaction, return a WRITE_LOCK error.
            if (preparedKeys.contains(key)) {
                return new MapEntryUpdateResult<>(
                    MapEntryUpdateResult.Status.WRITE_LOCK,
                    commit.index(),
                    key,
                    toVersioned(oldValue));
            }
            entries().put(commit.value().key(),
                new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
            Versioned<byte[]> result = toVersioned(oldValue);
            publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, toVersioned(newValue), result));
            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
        } else if (!valuesEqual(oldValue, newValue)) {
            // If the key has been locked by a transaction, return a WRITE_LOCK error.
            if (preparedKeys.contains(key)) {
                return new MapEntryUpdateResult<>(
                    MapEntryUpdateResult.Status.WRITE_LOCK,
                    commit.index(),
                    key,
                    toVersioned(oldValue));
            }
            entries().put(commit.value().key(),
                new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
            Versioned<byte[]> result = toVersioned(oldValue);
            publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result));
            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
        }
        // If the value hasn't changed, return a NOOP result.
        return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue));
    }

    /**
     * Handles a putIfAbsent commit.
     *
     * @param commit putIfAbsent commit
     * @return map entry update result
     */
    protected MapEntryUpdateResult<String, byte[]> putIfAbsent(Commit<? extends Put> commit) {
        String key = commit.value().key();
        MapEntryValue oldValue = entries().get(key);

        // If the value is null, this is an INSERT.
        if (valueIsNull(oldValue)) {
            // If the key has been locked by a transaction, return a WRITE_LOCK error.
            if (preparedKeys.contains(key)) {
                return new MapEntryUpdateResult<>(
                    MapEntryUpdateResult.Status.WRITE_LOCK,
                    commit.index(),
                    key,
                    toVersioned(oldValue));
            }
            MapEntryValue newValue = new MapEntryValue(
                MapEntryValue.Type.VALUE,
                commit.index(),
                commit.value().value());
            entries().put(commit.value().key(), newValue);
            Versioned<byte[]> result = toVersioned(newValue);
            publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, result, null));
            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, null);
        }
        return new MapEntryUpdateResult<>(
            MapEntryUpdateResult.Status.PRECONDITION_FAILED,
            commit.index(),
            key,
            toVersioned(oldValue));
    }

    /**
     * Handles a putAndGet commit.
     *
     * @param commit putAndGet commit
     * @return map entry update result
     */
    protected MapEntryUpdateResult<String, byte[]> putAndGet(Commit<? extends Put> commit) {
        String key = commit.value().key();
        MapEntryValue oldValue = entries().get(key);
        MapEntryValue newValue = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());

        // If the value is null or a tombstone, this is an insert.
        // Otherwise, only update the value if it has changed to reduce the number of events.
        if (valueIsNull(oldValue)) {
            // If the key has been locked by a transaction, return a WRITE_LOCK error.
            if (preparedKeys.contains(key)) {
                return new MapEntryUpdateResult<>(
                    MapEntryUpdateResult.Status.WRITE_LOCK,
                    commit.index(),
                    key,
                    toVersioned(oldValue));
            }
            entries().put(commit.value().key(), newValue);
            Versioned<byte[]> result = toVersioned(newValue);
            publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, result, null));
            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
        } else if (!valuesEqual(oldValue, newValue)) {
            // If the key has been locked by a transaction, return a WRITE_LOCK error.
            if (preparedKeys.contains(key)) {
                return new MapEntryUpdateResult<>(
                    MapEntryUpdateResult.Status.WRITE_LOCK,
                    commit.index(),
                    key,
                    toVersioned(oldValue));
            }
            entries().put(commit.value().key(), newValue);
            Versioned<byte[]> result = toVersioned(newValue);
            publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, result, toVersioned(oldValue)));
            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
        }
        return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue));
    }

    /**
     * Handles a remove commit.
     *
     * @param index     the commit index
     * @param key       the key to remove
     * @param predicate predicate to determine whether to remove the entry
     * @return map entry update result
     */
    private MapEntryUpdateResult<String, byte[]> removeIf(long index, String key, Predicate<MapEntryValue> predicate) {
        MapEntryValue value = entries().get(key);

        // If the value does not exist or doesn't match the predicate, return a PRECONDITION_FAILED error.
        if (valueIsNull(value) || !predicate.test(value)) {
            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.PRECONDITION_FAILED, index, key, null);
        }

        // If the key has been locked by a transaction, return a WRITE_LOCK error.
        if (preparedKeys.contains(key)) {
            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.WRITE_LOCK, index, key, null);
        }

        // If no transactions are active, remove the key. Otherwise, replace it with a tombstone.
        if (activeTransactions.isEmpty()) {
            entries().remove(key);
        } else {
            entries().put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, index, null));
        }

        Versioned<byte[]> result = toVersioned(value);
        publish(new MapEvent<>(MapEvent.Type.REMOVE, "", key, null, result));
        return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, index, key, result);
    }

    /**
     * Handles a remove commit.
     *
     * @param commit remove commit
     * @return map entry update result
     */
    protected MapEntryUpdateResult<String, byte[]> remove(Commit<? extends Remove> commit) {
        return removeIf(commit.index(), commit.value().key(), v -> true);
    }

    /**
     * Handles a removeValue commit.
     *
     * @param commit removeValue commit
     * @return map entry update result
     */
    protected MapEntryUpdateResult<String, byte[]> removeValue(Commit<? extends RemoveValue> commit) {
        return removeIf(commit.index(), commit.value().key(), v ->
            valuesEqual(v, new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value())));
    }

    /**
     * Handles a removeVersion commit.
     *
     * @param commit removeVersion commit
     * @return map entry update result
     */
    protected MapEntryUpdateResult<String, byte[]> removeVersion(Commit<? extends RemoveVersion> commit) {
        return removeIf(commit.index(), commit.value().key(), v -> v.version() == commit.value().version());
    }

    /**
     * Handles a replace commit.
     *
     * @param index     the commit index
     * @param key       the key to replace
     * @param newValue  the value with which to replace the key
     * @param predicate a predicate to determine whether to replace the key
     * @return map entry update result
     */
    private MapEntryUpdateResult<String, byte[]> replaceIf(
        long index, String key, MapEntryValue newValue, Predicate<MapEntryValue> predicate) {
        MapEntryValue oldValue = entries().get(key);

        // If the key is not set or the current value doesn't match the predicate, return a PRECONDITION_FAILED error.
        if (valueIsNull(oldValue) || !predicate.test(oldValue)) {
            return new MapEntryUpdateResult<>(
                MapEntryUpdateResult.Status.PRECONDITION_FAILED,
                index,
                key,
                toVersioned(oldValue));
        }

        // If the key has been locked by a transaction, return a WRITE_LOCK error.
        if (preparedKeys.contains(key)) {
            return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.WRITE_LOCK, index, key, null);
        }

        entries().put(key, newValue);
        Versioned<byte[]> result = toVersioned(oldValue);
        publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result));
        return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, index, key, result);
    }

    /**
     * Handles a replace commit.
     *
     * @param commit replace commit
     * @return map entry update result
     */
    protected MapEntryUpdateResult<String, byte[]> replace(Commit<? extends Replace> commit) {
        MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
        return replaceIf(commit.index(), commit.value().key(), value, v -> true);
    }

    /**
     * Handles a replaceValue commit.
     *
     * @param commit replaceValue commit
     * @return map entry update result
     */
    protected MapEntryUpdateResult<String, byte[]> replaceValue(Commit<? extends ReplaceValue> commit) {
        MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().newValue());
        return replaceIf(commit.index(), commit.value().key(), value,
            v -> valuesEqual(v.value(), commit.value().oldValue()));
    }

    /**
     * Handles a replaceVersion commit.
     *
     * @param commit replaceVersion commit
     * @return map entry update result
     */
    protected MapEntryUpdateResult<String, byte[]> replaceVersion(Commit<? extends ReplaceVersion> commit) {
        MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().newValue());
        return replaceIf(commit.index(), commit.value().key(), value,
            v -> v.version() == commit.value().oldVersion());
    }

    /**
     * Handles a clear commit.
     *
     * @return clear result
     */
    protected MapEntryUpdateResult.Status clear() {
        Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
        Map<String, MapEntryValue> entriesToAdd = new HashMap<>();
        while (iterator.hasNext()) {
            Map.Entry<String, MapEntryValue> entry = iterator.next();
            String key = entry.getKey();
            MapEntryValue value = entry.getValue();
            if (!valueIsNull(value)) {
                Versioned<byte[]> removedValue = new Versioned<>(value.value(), value.version());
                publish(new MapEvent<>(MapEvent.Type.REMOVE, "", key, null, removedValue));
                if (activeTransactions.isEmpty()) {
                    iterator.remove();
                } else {
                    entriesToAdd.put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, value.version, null));
                }
            }
        }
        entries().putAll(entriesToAdd);
        return MapEntryUpdateResult.Status.OK;
    }

    /**
     * Handles an open iterator commit.
     *
     * @param commit the open iterator commit
     * @return iterator identifier
     */
    protected long openIterator(Commit<Void> commit) {
        iterators.put(commit.index(), new IteratorContext(
            commit.session().sessionId().id(),
            entries().entrySet().iterator()));
        return commit.index();
    }

    /**
     * Handles an iterator next commit.
     *
     * @param commit the next commit
     * @return a list of entries to iterate
     */
    protected IteratorBatch next(Commit<IteratorPosition> commit) {
        final long iteratorId = commit.value().iteratorId();
        final int position = commit.value().position();

        IteratorContext context = iterators.get(iteratorId);
        if (context == null) {
            return null;
        }

        List<Map.Entry<String, Versioned<byte[]>>> entries = new ArrayList<>();
        int size = 0;
        while (context.iterator.hasNext()) {
            context.position++;
            if (context.position > position) {
                Map.Entry<String, MapEntryValue> entry = context.iterator.next();
                String key = entry.getKey();
                Versioned<byte[]> value = toVersioned(entry.getValue());
                size += key.length();
                size += value.value() != null ? value.value().length : 0;
                entries.add(Maps.immutableEntry(key, value));

                if (size >= MAX_ITERATOR_BATCH_SIZE) {
                    break;
                }
            }
        }

        if (entries.isEmpty()) {
            return null;
        }
        return new IteratorBatch(context.position, entries);
    }

    /**
     * Handles a close iterator commit.
     *
     * @param commit the close iterator commit
     */
    protected void closeIterator(Commit<Long> commit) {
        iterators.remove(commit.value());
    }

    /**
     * Handles a listen commit.
     *
     * @param session listen session
     */
    protected void listen(RaftSession session) {
        listeners.put(session.sessionId().id(), session);
    }

    /**
     * Handles an unlisten commit.
     *
     * @param session unlisten session
     */
    protected void unlisten(RaftSession session) {
        listeners.remove(session.sessionId().id());
    }

    /**
     * Handles a begin commit.
     *
     * @param commit transaction begin commit
     * @return transaction state version
     */
    protected long begin(Commit<? extends TransactionBegin> commit) {
        long version = commit.index();
        activeTransactions.put(commit.value().transactionId(), new TransactionScope(version));
        return version;
    }

    /**
     * Handles an prepare and commit commit.
     *
     * @param commit transaction prepare and commit commit
     * @return prepare result
     */
    protected PrepareResult prepareAndCommit(Commit<? extends TransactionPrepareAndCommit> commit) {
        TransactionId transactionId = commit.value().transactionLog().transactionId();
        PrepareResult prepareResult = prepare(commit);
        TransactionScope transactionScope = activeTransactions.remove(transactionId);
        if (prepareResult == PrepareResult.OK) {
            this.currentVersion = commit.index();
            transactionScope = transactionScope.prepared(commit);
            commitTransaction(transactionScope);
        }
        discardTombstones();
        return prepareResult;
    }

    /**
     * Handles an prepare commit.
     *
     * @param commit transaction prepare commit
     * @return prepare result
     */
    protected PrepareResult prepare(Commit<? extends TransactionPrepare> commit) {
        try {
            TransactionLog<MapUpdate<String, byte[]>> transactionLog = commit.value().transactionLog();

            // Iterate through records in the transaction log and perform isolation checks.
            for (MapUpdate<String, byte[]> record : transactionLog.records()) {
                String key = record.key();

                // If the record is a VERSION_MATCH then check that the record's version matches the current
                // version of the state machine.
                if (record.type() == MapUpdate.Type.VERSION_MATCH && key == null) {
                    if (record.version() > currentVersion) {
                        return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
                    } else {
                        continue;
                    }
                }

                // If the prepared keys already contains the key contained within the record, that indicates a
                // conflict with a concurrent transaction.
                if (preparedKeys.contains(key)) {
                    return PrepareResult.CONCURRENT_TRANSACTION;
                }

                // Read the existing value from the map.
                MapEntryValue existingValue = entries().get(key);

                // Note: if the existing value is null, that means the key has not changed during the transaction,
                // otherwise a tombstone would have been retained.
                if (existingValue == null) {
                    // If the value is null, ensure the version is equal to the transaction version.
                    if (record.version() != transactionLog.version()) {
                        return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
                    }
                } else {
                    // If the value is non-null, compare the current version with the record version.
                    if (existingValue.version() > record.version()) {
                        return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
                    }
                }
            }

            // No violations detected. Mark modified keys locked for transactions.
            transactionLog.records().forEach(record -> {
                if (record.type() != MapUpdate.Type.VERSION_MATCH) {
                    preparedKeys.add(record.key());
                }
            });

            // Update the transaction scope. If the transaction scope is not set on this node, that indicates the
            // coordinator is communicating with another node. Transactions assume that the client is communicating
            // with a single leader in order to limit the overhead of retaining tombstones.
            TransactionScope transactionScope = activeTransactions.get(transactionLog.transactionId());
            if (transactionScope == null) {
                activeTransactions.put(
                    transactionLog.transactionId(),
                    new TransactionScope(transactionLog.version(), commit.value().transactionLog()));
                return PrepareResult.PARTIAL_FAILURE;
            } else {
                activeTransactions.put(
                    transactionLog.transactionId(),
                    transactionScope.prepared(commit));
                return PrepareResult.OK;
            }
        } catch (Exception e) {
            logger().warn("Failure applying {}", commit, e);
            throw new IllegalStateException(e);
        }
    }

    /**
     * Handles an commit commit (ha!).
     *
     * @param commit transaction commit commit
     * @return commit result
     */
    protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
        TransactionId transactionId = commit.value().transactionId();
        TransactionScope transactionScope = activeTransactions.remove(transactionId);
        if (transactionScope == null) {
            return CommitResult.UNKNOWN_TRANSACTION_ID;
        }

        try {
            this.currentVersion = commit.index();
            return commitTransaction(transactionScope);
        } catch (Exception e) {
            logger().warn("Failure applying {}", commit, e);
            throw new IllegalStateException(e);
        } finally {
            discardTombstones();
        }
    }

    /**
     * Applies committed operations to the state machine.
     */
    private CommitResult commitTransaction(TransactionScope transactionScope) {
        TransactionLog<MapUpdate<String, byte[]>> transactionLog = transactionScope.transactionLog();
        boolean retainTombstones = !activeTransactions.isEmpty();

        List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
        for (MapUpdate<String, byte[]> record : transactionLog.records()) {
            if (record.type() == MapUpdate.Type.VERSION_MATCH) {
                continue;
            }

            String key = record.key();
            checkState(preparedKeys.remove(key), "key is not prepared");

            if (record.type() == MapUpdate.Type.LOCK) {
                continue;
            }

            MapEntryValue previousValue = entries().remove(key);
            MapEntryValue newValue = null;

            // If the record is not a delete, create a transactional commit.
            if (record.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
                newValue = new MapEntryValue(MapEntryValue.Type.VALUE, currentVersion, record.value());
            } else if (retainTombstones) {
                // For deletes, if tombstones need to be retained then create and store a tombstone commit.
                newValue = new MapEntryValue(MapEntryValue.Type.TOMBSTONE, currentVersion, null);
            }

            MapEvent<String, byte[]> event;
            if (newValue != null) {
                entries().put(key, newValue);
                if (!valueIsNull(newValue)) {
                    if (!valueIsNull(previousValue)) {
                        event = new MapEvent<>(
                            MapEvent.Type.UPDATE,
                            "",
                            key,
                            toVersioned(newValue),
                            toVersioned(previousValue));
                    } else {
                        event = new MapEvent<>(
                            MapEvent.Type.INSERT,
                            "",
                            key,
                            toVersioned(newValue),
                            null);
                    }
                } else {
                    event = new MapEvent<>(
                        MapEvent.Type.REMOVE,
                        "",
                        key,
                        null,
                        toVersioned(previousValue));
                }
            } else {
                event = new MapEvent<>(
                    MapEvent.Type.REMOVE,
                    "",
                    key,
                    null,
                    toVersioned(previousValue));
            }
            eventsToPublish.add(event);
        }
        publish(eventsToPublish);
        return CommitResult.OK;
    }

    /**
     * Handles an rollback commit (ha!).
     *
     * @param commit transaction rollback commit
     * @return rollback result
     */
    protected RollbackResult rollback(Commit<? extends TransactionRollback> commit) {
        TransactionId transactionId = commit.value().transactionId();
        TransactionScope transactionScope = activeTransactions.remove(transactionId);
        if (transactionScope == null) {
            return RollbackResult.UNKNOWN_TRANSACTION_ID;
        } else if (!transactionScope.isPrepared()) {
            discardTombstones();
            return RollbackResult.OK;
        } else {
            try {
                transactionScope.transactionLog().records()
                    .forEach(record -> {
                        if (record.type() != MapUpdate.Type.VERSION_MATCH) {
                            preparedKeys.remove(record.key());
                        }
                    });
                return RollbackResult.OK;
            } finally {
                discardTombstones();
            }
        }

    }

    /**
     * Discards tombstones no longer needed by active transactions.
     */
    private void discardTombstones() {
        if (activeTransactions.isEmpty()) {
            Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
            while (iterator.hasNext()) {
                MapEntryValue value = iterator.next().getValue();
                if (value.type() == MapEntryValue.Type.TOMBSTONE) {
                    iterator.remove();
                }
            }
        } else {
            long lowWaterMark = activeTransactions.values().stream()
                .mapToLong(TransactionScope::version)
                .min().getAsLong();
            Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
            while (iterator.hasNext()) {
                MapEntryValue value = iterator.next().getValue();
                if (value.type() == MapEntryValue.Type.TOMBSTONE && value.version < lowWaterMark) {
                    iterator.remove();
                }
            }
        }
    }

    /**
     * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
     *
     * @param value map entry value
     * @return versioned instance
     */
    protected Versioned<byte[]> toVersioned(MapEntryValue value) {
        return value != null && value.type() != MapEntryValue.Type.TOMBSTONE
            ? new Versioned<>(value.value(), value.version()) : null;
    }

    /**
     * Publishes an event to listeners.
     *
     * @param event event to publish
     */
    private void publish(MapEvent<String, byte[]> event) {
        publish(Lists.newArrayList(event));
    }

    /**
     * Publishes events to listeners.
     *
     * @param events list of map event to publish
     */
    private void publish(List<MapEvent<String, byte[]>> events) {
        listeners.values().forEach(session -> {
            session.publish(CHANGE, serializer()::encode, events);
        });
    }

    @Override
    public void onExpire(RaftSession session) {
        closeListener(session.sessionId().id());
    }

    @Override
    public void onClose(RaftSession session) {
        closeListener(session.sessionId().id());
    }

    private void closeListener(Long sessionId) {
        listeners.remove(sessionId);
    }

    /**
     * Interface implemented by map values.
     */
    protected static class MapEntryValue {
        protected final Type type;
        protected final long version;
        protected final byte[] value;

        MapEntryValue(Type type, long version, byte[] value) {
            this.type = type;
            this.version = version;
            this.value = value;
        }

        /**
         * Returns the value type.
         *
         * @return the value type
         */
        Type type() {
            return type;
        }

        /**
         * Returns the version of the value.
         *
         * @return version
         */
        long version() {
            return version;
        }

        /**
         * Returns the raw {@code byte[]}.
         *
         * @return raw value
         */
        byte[] value() {
            return value;
        }

        /**
         * Value type.
         */
        enum Type {
            VALUE,
            TOMBSTONE,
        }
    }

    /**
     * Map transaction scope.
     */
    protected static final class TransactionScope {
        private final long version;
        private final TransactionLog<MapUpdate<String, byte[]>> transactionLog;

        private TransactionScope(long version) {
            this(version, null);
        }

        private TransactionScope(long version, TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
            this.version = version;
            this.transactionLog = transactionLog;
        }

        /**
         * Returns the transaction version.
         *
         * @return the transaction version
         */
        long version() {
            return version;
        }

        /**
         * Returns whether this is a prepared transaction scope.
         *
         * @return whether this is a prepared transaction scope
         */
        boolean isPrepared() {
            return transactionLog != null;
        }

        /**
         * Returns the transaction commit log.
         *
         * @return the transaction commit log
         */
        TransactionLog<MapUpdate<String, byte[]>> transactionLog() {
            checkState(isPrepared());
            return transactionLog;
        }

        /**
         * Returns a new transaction scope with a prepare commit.
         *
         * @param commit the prepare commit
         * @return new transaction scope updated with the prepare commit
         */
        TransactionScope prepared(Commit<? extends TransactionPrepare> commit) {
            return new TransactionScope(version, commit.value().transactionLog());
        }
    }

    private static class IteratorContext {
        private final long sessionId;
        private final Iterator<Map.Entry<String, MapEntryValue>> iterator;
        private int position = 0;

        IteratorContext(long sessionId, Iterator<Map.Entry<String, MapEntryValue>> iterator) {
            this.sessionId = sessionId;
            this.iterator = iterator;
        }
    }
}