/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.onosproject.store.primitives.resources.impl;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.Snapshottable;
import io.atomix.copycat.server.StateMachineExecutor;
import io.atomix.copycat.server.session.ServerSession;
import io.atomix.copycat.server.session.SessionListener;
import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
import io.atomix.resource.ResourceStateMachine;
import org.onlab.util.Match;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.Versioned;

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.CeilingEntry;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.CeilingKey;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Clear;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.ContainsKey;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.ContainsValue;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.EntrySet;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FirstEntry;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FirstKey;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FloorEntry;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FloorKey;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Get;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.HigherEntry;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.HigherKey;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.IsEmpty;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.KeySet;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.LastEntry;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.LastKey;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Listen;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.LowerEntry;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.LowerKey;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.PollFirstEntry;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.PollLastEntry;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Size;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.SubMap;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Unlisten;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.UpdateAndGet;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Values;
import static org.onosproject.store.primitives.resources.impl.MapEntryUpdateResult.*;

/**
 * State machine corresponding to {@link AtomixConsistentTreeMap} backed by a
 * {@link TreeMap}.
 */
public class AtomixConsistentTreeMapState extends ResourceStateMachine implements SessionListener, Snapshottable {

    private final Map<Long, Commit<? extends Listen>> listeners =
            Maps.newHashMap();
    private TreeMap<String, TreeMapEntryValue> tree = Maps.newTreeMap();
    private final Set<String> preparedKeys = Sets.newHashSet();
    private AtomicLong versionCounter = new AtomicLong(0);

    private Function<Commit<SubMap>, NavigableMap<String, TreeMapEntryValue>> subMapFunction = this::subMap;
    private Function<Commit<FirstKey>, String> firstKeyFunction = this::firstKey;
    private Function<Commit<LastKey>, String> lastKeyFunction = this::lastKey;
    private Function<Commit<HigherEntry>, Map.Entry<String, Versioned<byte[]>>> higherEntryFunction =
            this::higherEntry;
    private Function<Commit<FirstEntry>, Map.Entry<String, Versioned<byte[]>>> firstEntryFunction =
            this::firstEntry;
    private Function<Commit<LastEntry>, Map.Entry<String, Versioned<byte[]>>> lastEntryFunction =
            this::lastEntry;
    private Function<Commit<PollFirstEntry>, Map.Entry<String, Versioned<byte[]>>> pollFirstEntryFunction =
            this::pollFirstEntry;
    private Function<Commit<PollLastEntry>, Map.Entry<String, Versioned<byte[]>>> pollLastEntryFunction =
            this::pollLastEntry;
    private Function<Commit<LowerEntry>, Map.Entry<String, Versioned<byte[]>>> lowerEntryFunction =
            this::lowerEntry;
    private Function<Commit<LowerKey>, String> lowerKeyFunction = this::lowerKey;
    private Function<Commit<FloorEntry>, Map.Entry<String, Versioned<byte[]>>> floorEntryFunction =
            this::floorEntry;
    private Function<Commit<CeilingEntry>, Map.Entry<String, Versioned<byte[]>>> ceilingEntryFunction =
            this::ceilingEntry;
    private Function<Commit<FloorKey>, String> floorKeyFunction = this::floorKey;
    private Function<Commit<CeilingKey>, String> ceilingKeyFunction = this::ceilingKey;
    private Function<Commit<HigherKey>, String> higherKeyFunction = this::higherKey;

    public AtomixConsistentTreeMapState(Properties properties) {
        super(properties);
    }

    @Override
    public void snapshot(SnapshotWriter writer) {
        writer.writeLong(versionCounter.get());
    }

    @Override
    public void install(SnapshotReader reader) {
        versionCounter = new AtomicLong(reader.readLong());
    }

    @Override
    public void configure(StateMachineExecutor executor) {
        // Listeners
        executor.register(Listen.class, this::listen);
        executor.register(Unlisten.class, this::unlisten);
        // Queries
        executor.register(ContainsKey.class, this::containsKey);
        executor.register(ContainsValue.class, this::containsValue);
        executor.register(EntrySet.class, this::entrySet);
        executor.register(Get.class, this::get);
        executor.register(IsEmpty.class, this::isEmpty);
        executor.register(KeySet.class, this::keySet);
        executor.register(Size.class, this::size);
        executor.register(Values.class, this::values);
        executor.register(SubMap.class, subMapFunction);
        executor.register(FirstKey.class, firstKeyFunction);
        executor.register(LastKey.class, lastKeyFunction);
        executor.register(FirstEntry.class, firstEntryFunction);
        executor.register(LastEntry.class, lastEntryFunction);
        executor.register(PollFirstEntry.class, pollFirstEntryFunction);
        executor.register(PollLastEntry.class, pollLastEntryFunction);
        executor.register(LowerEntry.class, lowerEntryFunction);
        executor.register(LowerKey.class, lowerKeyFunction);
        executor.register(FloorEntry.class, floorEntryFunction);
        executor.register(FloorKey.class, floorKeyFunction);
        executor.register(CeilingEntry.class, ceilingEntryFunction);
        executor.register(CeilingKey.class, ceilingKeyFunction);
        executor.register(HigherEntry.class, higherEntryFunction);
        executor.register(HigherKey.class, higherKeyFunction);

        // Commands
        executor.register(UpdateAndGet.class, this::updateAndGet);
        executor.register(Clear.class, this::clear);
    }

    @Override
    public void delete() {
        listeners.values().forEach(Commit::close);
        listeners.clear();
        tree.values().forEach(TreeMapEntryValue::discard);
        tree.clear();
    }

    protected boolean containsKey(Commit<? extends ContainsKey> commit) {
        try {
            return toVersioned(tree.get((commit.operation().key()))) != null;
        } finally {
            commit.close();
        }
    }

    protected boolean containsValue(Commit<? extends ContainsValue> commit) {
        try {
            Match<byte[]> valueMatch = Match
                    .ifValue(commit.operation().value());
            return tree.values().stream().anyMatch(
                    value -> valueMatch.matches(value.value()));
        } finally {
            commit.close();
        }
    }

    protected Versioned<byte[]> get(Commit<? extends Get> commit) {
        try {
            return toVersioned(tree.get(commit.operation().key()));
        } finally {
            commit.close();
        }
    }

    protected int size(Commit<? extends Size> commit) {
        try {
            return tree.size();
        } finally {
            commit.close();
        }
    }

    protected boolean isEmpty(Commit<? extends IsEmpty> commit) {
        try {
            return tree.isEmpty();
        } finally {
            commit.close();
        }
    }

    protected Set<String> keySet(Commit<? extends KeySet> commit) {
        try {
            return tree.keySet().stream().collect(Collectors.toSet());
        } finally {
            commit.close();
        }
    }

    protected Collection<Versioned<byte[]>> values(
            Commit<? extends Values> commit) {
        try {
            return tree.values().stream().map(this::toVersioned)
                    .collect(Collectors.toList());
        } finally {
            commit.close();
        }
    }

    protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(
            Commit<? extends EntrySet> commit) {
        try {
            return tree
                    .entrySet()
                    .stream()
                    .map(e -> Maps.immutableEntry(e.getKey(),
                                                  toVersioned(e.getValue())))
                    .collect(Collectors.toSet());
        } finally {
            commit.close();
        }
    }

    protected MapEntryUpdateResult<String, byte[]> updateAndGet(
            Commit<? extends UpdateAndGet> commit) {
        Status updateStatus = validate(commit.operation());
        String key = commit.operation().key();
        TreeMapEntryValue oldCommitValue = tree.get(commit.operation().key());
        Versioned<byte[]> oldTreeValue = toVersioned(oldCommitValue);

        if (updateStatus != Status.OK) {
            commit.close();
            return new MapEntryUpdateResult<>(updateStatus, "", key,
                                                  oldTreeValue, oldTreeValue);
        }

        byte[] newValue = commit.operation().value();
        long newVersion = versionCounter.incrementAndGet();
        Versioned<byte[]> newTreeValue = newValue == null ? null
                : new Versioned<byte[]>(newValue, newVersion);

        MapEvent.Type updateType = newValue == null ? MapEvent.Type.REMOVE
                : oldCommitValue == null ? MapEvent.Type.INSERT :
                MapEvent.Type.UPDATE;
        if (updateType == MapEvent.Type.REMOVE ||
                updateType == MapEvent.Type.UPDATE) {
            tree.remove(key);
            oldCommitValue.discard();
        }
        if (updateType == MapEvent.Type.INSERT ||
                updateType == MapEvent.Type.UPDATE) {
            tree.put(key, new NonTransactionalCommit(newVersion, commit));
        } else {
            commit.close();
        }
        publish(Lists.newArrayList(new MapEvent<>("", key, newTreeValue,
                                                  oldTreeValue)));
        return new MapEntryUpdateResult<>(updateStatus, "", key, oldTreeValue,
                                          newTreeValue);
    }

    protected Status clear(
            Commit<? extends Clear> commit) {
        try {
            Iterator<Map.Entry<String, TreeMapEntryValue>> iterator = tree
                    .entrySet()
                    .iterator();
            while (iterator.hasNext()) {
                Map.Entry<String, TreeMapEntryValue> entry = iterator.next();
                String key = entry.getKey();
                TreeMapEntryValue value = entry.getValue();
                Versioned<byte[]> removedValue =
                        new Versioned<byte[]>(value.value(),
                                              value.version());
                publish(Lists.newArrayList(new MapEvent<>("", key, null,
                                                          removedValue)));
                value.discard();
                iterator.remove();
            }
            return Status.OK;
        } finally {
            commit.close();
        }
    }

    protected void listen(
            Commit<? extends Listen> commit) {
        Long sessionId = commit.session().id();
        listeners.put(sessionId, commit);
        commit.session()
                .onStateChange(
                        state -> {
                            if (state == ServerSession.State.CLOSED
                                    || state == ServerSession.State.EXPIRED) {
                                Commit<? extends Listen> listener =
                                        listeners.remove(sessionId);
                                if (listener != null) {
                                    listener.close();
                                }
                            }
                        });
    }

    protected void unlisten(
            Commit<? extends Unlisten> commit) {
        try {
            Commit<? extends AtomixConsistentTreeMapCommands.Listen> listener =
                    listeners.remove(commit.session());
            if (listener != null) {
                listener.close();
            }
        } finally {
            commit.close();
        }
    }

    private Status validate(UpdateAndGet update) {
        TreeMapEntryValue existingValue = tree.get(update.key());
        if (existingValue == null && update.value() == null) {
            return Status.NOOP;
        }
        if (preparedKeys.contains(update.key())) {
            return Status.WRITE_LOCK;
        }
        byte[] existingRawValue = existingValue == null ? null :
                existingValue.value();
        Long existingVersion = existingValue == null ? null :
                existingValue.version();
        return update.valueMatch().matches(existingRawValue)
                && update.versionMatch().matches(existingVersion) ?
                Status.OK
                : Status.PRECONDITION_FAILED;
    }

    protected NavigableMap<String, TreeMapEntryValue> subMap(
            Commit<? extends SubMap> commit) {
        //Do not support this until lazy communication is possible.  At present
        // it transmits up to the entire map.
        try {
            SubMap<String, TreeMapEntryValue> subMap = commit.operation();
            return tree.subMap(subMap.fromKey(), subMap.isInclusiveFrom(),
                               subMap.toKey(), subMap.isInclusiveTo());
        } finally {
            commit.close();
        }
    }

    protected String firstKey(Commit<? extends FirstKey> commit) {
        try {
            if (tree.isEmpty()) {
                return null;
            }
            return tree.firstKey();
        } finally {
            commit.close();
        }
    }

    protected String lastKey(Commit<? extends LastKey> commit) {
        try {
            return tree.isEmpty() ? null : tree.lastKey();
        } finally {
            commit.close();
        }
    }

    protected Map.Entry<String, Versioned<byte[]>> higherEntry(
            Commit<? extends HigherEntry> commit) {
        try {
            if (tree.isEmpty()) {
                return null;
            }
            return toVersionedEntry(
                    tree.higherEntry(commit.operation().key()));
        } finally {
            commit.close();
        }
    }

    protected Map.Entry<String, Versioned<byte[]>> firstEntry(
            Commit<? extends FirstEntry> commit) {
        try {
            if (tree.isEmpty()) {
                return null;
            }
            return toVersionedEntry(tree.firstEntry());
        } finally {
            commit.close();
        }
    }

    protected Map.Entry<String, Versioned<byte[]>> lastEntry(
            Commit<? extends LastEntry> commit) {
        try {
            if (tree.isEmpty()) {
                return null;
            }
            return toVersionedEntry(tree.lastEntry());
        } finally {
            commit.close();
        }
    }

    protected Map.Entry<String, Versioned<byte[]>> pollFirstEntry(
            Commit<? extends PollFirstEntry> commit) {
        try {
            return toVersionedEntry(tree.pollFirstEntry());
        } finally {
            commit.close();
        }
    }

    protected Map.Entry<String, Versioned<byte[]>> pollLastEntry(
            Commit<? extends PollLastEntry> commit) {
        try {
            return toVersionedEntry(tree.pollLastEntry());
        } finally {
            commit.close();
        }
    }

    protected Map.Entry<String, Versioned<byte[]>> lowerEntry(
            Commit<? extends LowerEntry> commit) {
        try {
            return toVersionedEntry(tree.lowerEntry(commit.operation().key()));
        } finally {
            commit.close();
        }
    }

    protected String lowerKey(Commit<? extends LowerKey> commit) {
        try {
            return tree.lowerKey(commit.operation().key());
        } finally {
            commit.close();
        }
    }

    protected Map.Entry<String, Versioned<byte[]>> floorEntry(
            Commit<? extends FloorEntry> commit) {
        try {
            return toVersionedEntry(tree.floorEntry(commit.operation().key()));
        } finally {
            commit.close();
        }
    }

    protected String floorKey(Commit<? extends FloorKey> commit) {
        try {
            return tree.floorKey(commit.operation().key());
        } finally {
            commit.close();
        }
    }

    protected Map.Entry<String, Versioned<byte[]>> ceilingEntry(
            Commit<CeilingEntry> commit) {
        try {
            return toVersionedEntry(
                    tree.ceilingEntry(commit.operation().key()));
        } finally {
            commit.close();
        }
    }

    protected String ceilingKey(Commit<CeilingKey> commit) {
        try {
            return tree.ceilingKey(commit.operation().key());
        } finally {
            commit.close();
        }
    }

    protected String higherKey(Commit<HigherKey> commit) {
        try {
            return tree.higherKey(commit.operation().key());
        } finally {
            commit.close();
        }
    }

    private Versioned<byte[]> toVersioned(TreeMapEntryValue value) {
        return value == null ? null :
                new Versioned<byte[]>(value.value(), value.version());
    }

    private Map.Entry<String, Versioned<byte[]>> toVersionedEntry(
            Map.Entry<String, TreeMapEntryValue> entry) {
        //FIXME is this the best type of entry to return?
        return entry == null ? null : new SimpleImmutableEntry<>(
                entry.getKey(), toVersioned(entry.getValue()));
    }

    private void publish(List<MapEvent<String, byte[]>> events) {
        listeners.values().forEach(commit -> commit.session()
                .publish(AtomixConsistentTreeMap.CHANGE_SUBJECT, events));
    }

    @Override
    public void register(ServerSession session) {
    }

    @Override
    public void unregister(ServerSession session) {
        closeListener(session.id());
    }

    @Override
    public void expire(ServerSession session) {
        closeListener(session.id());
    }

    @Override
    public void close(ServerSession session) {
        closeListener(session.id());
    }

    private void closeListener(Long sessionId) {
        Commit<? extends Listen> commit = listeners.remove(sessionId);
        if (commit != null) {
            commit.close();
        }
    }

    private interface TreeMapEntryValue {

        byte[] value();

        long version();

        void discard();
    }

    private class NonTransactionalCommit implements TreeMapEntryValue {
        private final long version;
        private final Commit<? extends UpdateAndGet> commit;

        public NonTransactionalCommit(long version,
                                      Commit<? extends UpdateAndGet> commit) {
            this.version = version;
            this.commit = commit;
        }

        @Override
        public byte[] value() {
            return commit.operation().value();
        }

        @Override
        public long version() {
            return version;
        }

        @Override
        public void discard() {
            commit.close();
        }
    }
}
