package net.onrc.onos.datastore;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import net.onrc.onos.datastore.RCTable.Entry;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import edu.stanford.ramcloud.JRamCloud;
import edu.stanford.ramcloud.JRamCloud.ObjectDoesntExistException;
import edu.stanford.ramcloud.JRamCloud.ObjectExistsException;
import edu.stanford.ramcloud.JRamCloud.WrongVersionException;

/**
 * Class to represent an Object represented as a single K-V pair Value blob.
 *
 */
public class RCObject {
    private static final Logger log = LoggerFactory.getLogger(RCObject.class);
    /**
     * Version number which represents that the object doesnot exist, or hase
     * never read the DB before.
     */
    public static final long VERSION_NONEXISTENT = 0L;

    // Each Object should prepare their own serializer, which has required
    // objects registered.
    private static final ThreadLocal<Kryo> defaultKryo = new ThreadLocal<Kryo>() {
	@Override
	protected Kryo initialValue() {
	    Kryo kryo = new Kryo();
	    // kryo.setRegistrationRequired(true);
	    // TODO TreeMap or just Map
	    // kryo.register(TreeMap.class);
	    kryo.setReferences(false);
	    return kryo;
	}
    };

    private final RCTable table;
    private final byte[] key;
    private byte[] value;
    private long version;

    private Map<Object, Object> propertyMap;

    public RCObject(RCTable table, byte[] key) {
	this(table, key, null);
    }

    public RCObject(RCTable table, byte[] key, byte[] value) {
	if (table == null) {
	    throw new IllegalArgumentException("table cannot be null");
	}
	if (key == null) {
	    throw new IllegalArgumentException("key cannot be null");
	}
	this.table = table;
	this.key = key;
	this.value = value;
	this.version = VERSION_NONEXISTENT;
	this.propertyMap = new HashMap<Object, Object>();

	if (this.value != null) {
	    deserializeObjectFromValue();
	}
    }

    public RCTable getTable() {
	return table;
    }

    public long getTableId() {
	return table.getTableId();
    }

    public byte[] getKey() {
	return key;
    }

    public long getVersion() {
	return version;
    }

    /**
     * Return serialized Value.
     *
     * @note will not trigger serialization
     * @return Will return null, if never been read, or was not serialized
     */
    public byte[] getSerializedValue() {
	return value;
    }

    /**
     * Return Object as a Map.
     *
     * @note Will not trigger deserialization
     * @return Will return null, if never been set, or was not deserialized
     */
    protected Map<Object, Object> getObjectMap() {
	return this.propertyMap;
    }

    protected Map<Object, Object> setObjectMap(Map<Object, Object> new_map) {
	Map<Object, Object> old_map = this.propertyMap;
	this.propertyMap = new_map;
	return old_map;
    }

    public void serializeAndSetValue() {
	serializeAndSetValue(defaultKryo.get(), this.propertyMap);
    }

    protected void serializeAndSetValue(Kryo kryo, Map<Object, Object> javaObject) {

	// value
	byte[] rcTemp = new byte[1024 * 1024];
	Output output = new Output(rcTemp);
	kryo.writeObject(output, javaObject);
	this.value = output.toBytes();
    }

    /**
     * Deserialize
     *
     * @return
     */
    public Map<Object, Object> deserializeObjectFromValue() {
	return deserializeObjectFromValue(defaultKryo.get());
    }

    protected HashMap<Object, Object> deserializeObjectFromValue(Kryo kryo) {
	return deserializeObjectFromValue(kryo, HashMap.class);
    }

    protected <T extends Map> T deserializeObjectFromValue(Kryo kryo, Class<T> type) {
	if (this.value == null)
	    return null;

	Input input = new Input(this.value);
	T map = kryo.readObject(input, type);
	this.propertyMap = map;

	return map;
    }

    /**
     * Create an Object in DataStore.
     *
     * Fails if the Object with same key already exists.
     *
     * @note create an Empty Object if no object has never been set.
     * @throws ObjectExistsException
     */
    public void create() throws ObjectExistsException {

	if (this.propertyMap == null) {
	    log.warn("No object map was set. Setting empty Map.");
	    setObjectMap(new HashMap<Object, Object>());
	}
	serializeAndSetValue();

	this.version = table.create(key, value);
    }

    /**
     * Read an Object from DataStore.
     *
     * Fails if the Object with the key does not exist.
     *
     * @throws ObjectDoesntExistException
     *
     */
    public void read() throws ObjectDoesntExistException {
	Entry e = table.read(key);
	this.value = e.value;
	this.version = e.version;

	// TODO should we deserialize immediately?
	deserializeObjectFromValue();
    }

    /**
     * Update an existing Object in DataStore checking versions.
     *
     * Fails if the Object with key does not exists, or conditional failure.
     *
     * @throws WrongVersionException
     * @throws ObjectDoesntExistException
     */
    public void update() throws ObjectDoesntExistException,
	    WrongVersionException {
	if (this.propertyMap == null) {
	    setObjectMap(new HashMap<Object, Object>());
	}
	serializeAndSetValue();

	this.version = table.update(key, value, version);
    }

    /**
     * Remove an existing Object in DataStore.
     *
     * Fails if the Object with key does not exists.
     *
     * @throws ObjectDoesntExistException
     */
    public void delete() throws ObjectDoesntExistException {
	this.version = table.delete(key);
    }

    /**
     * Multi-read RCObjects.
     *
     * If the blob value was read successfully, RCObject will deserialize them.
     *
     * @param objects
     *            RCObjects to read
     * @return true if there exist an failed read.
     */
    public static boolean multiRead(Collection<RCObject> objects) {
	boolean fail_exists = false;

	ArrayList<RCObject> req = new ArrayList<>();
	Iterator<RCObject> it = objects.iterator();
	while (it.hasNext()) {

	    req.add(it.next());

	    if (req.size() >= RCClient.MAX_MULTI_READS) {
		// dispatch multiRead
		fail_exists |= multiReadInternal(req);
		req.clear();
	    }
	}

	if (!req.isEmpty()) {
	    // dispatch multiRead
	    fail_exists |= multiReadInternal(req);
	    req.clear();
	}

	return fail_exists;
    }

    private static boolean multiReadInternal(ArrayList<RCObject> req) {
	boolean fail_exists = false;
	JRamCloud rcClient = RCClient.getClient();

	final int reqs = req.size();
	JRamCloud.multiReadObject mrObjs[] = new JRamCloud.multiReadObject[reqs];

	// setup multi-read operation
	for (int i = 0; i < reqs; ++i) {
	    RCObject obj = req.get(i);
	    mrObjs[i] = new JRamCloud.multiReadObject(obj.getTableId(),
		    obj.getKey());
	}

	// execute
	JRamCloud.Object results[] = rcClient.multiRead(mrObjs);
	assert (results.length <= req.size());

	// reflect changes to RCObject
	for (int i = 0; i < results.length; ++i) {
	    RCObject obj = req.get(i);
	    if (results[i] == null) {
		log.error("MultiRead error, skipping {}, {}", obj.getTable(),
		        obj);
		fail_exists = true;
		continue;
	    }
	    assert (Arrays.equals(results[i].key, obj.getKey()));

	    obj.value = results[i].value;
	    obj.version = results[i].version;
	    if (obj.version == VERSION_NONEXISTENT) {
		fail_exists = true;
	    } else {
		obj.deserializeObjectFromValue();
	    }
	}

	return fail_exists;
    }

    /**
     * Get All of it's kind?
     */
    public static Collection<? extends RCObject> getAllObjects() {
	// TODO implement
	throw new UnsupportedOperationException("Not implemented yet");
	//Collection<? extends RCObject> list = new ArrayList<>();
	//return list;
    }

}
