Refactoring datastore package

Preparation to make datastore replacable
- Extract datastore interfaces
- Add multi Read/Write/Remove
- Add a method to walk over RCTable
- Refactor serialize/deserialize RCObject
- Localize dependency to JRAMCloud
  - Separate RAMCloud specific code into ramcloud package
  - Remove dependency to RAMCloud exception classes
  - Remove RC prefix from non RAMCloud specific code
- Cosmetics and update sample/test code

- reflect Naoki's comment
- more cosmetic fixes
  - reordered OPERATION enums

- removed no longer used code
- follow pmd, etc. where easily possible

Change-Id: I6f9153d705600447acf48a64f713c654c9f26713
diff --git a/src/main/java/net/onrc/onos/datastore/utils/ByteArrayComparator.java b/src/main/java/net/onrc/onos/datastore/utils/ByteArrayComparator.java
index 08290a3..56b088f 100644
--- a/src/main/java/net/onrc/onos/datastore/utils/ByteArrayComparator.java
+++ b/src/main/java/net/onrc/onos/datastore/utils/ByteArrayComparator.java
@@ -3,12 +3,20 @@
 import java.nio.ByteBuffer;
 import java.util.Comparator;
 
+/**
+ * Comparator which will compares the content of byte[].
+ *
+ * Expected to be used with TreeMap, etc. when you want to use byte[] as a key.
+ */
 public final class ByteArrayComparator implements Comparator<byte[]> {
 
+    /**
+     * Instance which can be used, if you want to avoid instantiation per Map.
+     */
     public static final ByteArrayComparator BYTEARRAY_COMPARATOR = new ByteArrayComparator();
 
     @Override
-    public int compare(byte[] o1, byte[] o2) {
+    public int compare(final byte[] o1, final byte[] o2) {
 	final ByteBuffer b1 = ByteBuffer.wrap(o1);
 	final ByteBuffer b2 = ByteBuffer.wrap(o2);
 	return b1.compareTo(b2);
diff --git a/src/main/java/net/onrc/onos/datastore/utils/ByteArrayUtil.java b/src/main/java/net/onrc/onos/datastore/utils/ByteArrayUtil.java
index 96f8743..c13d7ea 100644
--- a/src/main/java/net/onrc/onos/datastore/utils/ByteArrayUtil.java
+++ b/src/main/java/net/onrc/onos/datastore/utils/ByteArrayUtil.java
@@ -4,13 +4,35 @@
 
 public class ByteArrayUtil {
 
+    // Suppresses default constructor, ensuring non-instantiability.
+    private ByteArrayUtil() {}
+
+    /**
+     * Returns a StringBuffer with each byte in {@code bytes}
+     * converted to a String with {@link Integer#toHexString(int)},
+     * separated by {@code sep}.
+     *
+     * @param bytes byte array to convert
+     * @param sep separator between each bytes
+     * @return {@code bytes} converted to a StringBuffer
+     */
     public static StringBuffer toHexStringBuffer(final byte[] bytes,
 	    final String sep) {
 	return toHexStringBuffer(bytes, sep, new StringBuffer());
     }
 
+    /**
+     * Returns a StringBuffer with each byte in {@code bytes}
+     * converted to a String with {@link Integer#toHexString(int)},
+     * separated by {@code sep}.
+     *
+     * @param bytes byte array to convert
+     * @param sep separator between each bytes
+     * @param buf StringBuffer to append to.
+     * @return {@code buf}
+     */
     public static StringBuffer toHexStringBuffer(final byte[] bytes,
-	    final String sep, StringBuffer buf) {
+	    final String sep, final StringBuffer buf) {
 	if (bytes == null) {
 	    return buf;
 	}
diff --git a/src/main/java/net/onrc/onos/datastore/utils/KVObject.java b/src/main/java/net/onrc/onos/datastore/utils/KVObject.java
new file mode 100644
index 0000000..2aac0e3
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datastore/utils/KVObject.java
@@ -0,0 +1,470 @@
+package net.onrc.onos.datastore.utils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import net.onrc.onos.datastore.DataStoreClient;
+import net.onrc.onos.datastore.IKVClient;
+import net.onrc.onos.datastore.IKVTable;
+import net.onrc.onos.datastore.IKVTable.IKVEntry;
+import net.onrc.onos.datastore.IKVTableID;
+import net.onrc.onos.datastore.IMultiEntryOperation;
+import net.onrc.onos.datastore.IMultiObjectOperation;
+import net.onrc.onos.datastore.ObjectDoesntExistException;
+import net.onrc.onos.datastore.ObjectExistsException;
+import net.onrc.onos.datastore.WrongVersionException;
+import net.onrc.onos.datastore.internal.IModifiableMultiEntryOperation;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Class to represent an Object represented as a single K-V pair Value blob.
+ *
+ */
+public class KVObject {
+    private static final Logger log = LoggerFactory.getLogger(KVObject.class);
+
+    // Default Kryo serializer.
+    // each sub-class should prepare their own serializer, which has required
+    // objects registered for better performance.
+    private static final ThreadLocal<Kryo> defaultKryo = new ThreadLocal<Kryo>() {
+	@Override
+	protected Kryo initialValue() {
+	    Kryo kryo = new Kryo();
+	    // kryo.setRegistrationRequired(true);
+	    // kryo.setReferences(false);
+	    return kryo;
+	}
+    };
+
+    private final IKVTable table;
+    private final byte[] key;
+
+    /**
+     * serialized value version stored on data store or
+     * {@link IKVTable.VERSION_NONEXISTENT()} if is a new object.
+     */
+    private long version;
+
+    /**
+     *  Map to store user-defined properties
+     */
+    private Map<Object, Object> propertyMap;
+
+    public KVObject(final IKVTable table, final byte[] key) {
+	this(table, key, null, table.VERSION_NONEXISTENT());
+    }
+
+    public KVObject(final IKVTable table, final byte[] key, final byte[] value, final long version) {
+	if (table == null) {
+	    throw new IllegalArgumentException("table cannot be null");
+	}
+	if (key == null) {
+	    throw new IllegalArgumentException("key cannot be null");
+	}
+	this.table = table;
+	this.key = key;
+	this.version = version;
+	this.propertyMap = new HashMap<Object, Object>();
+
+	if (value != null) {
+	    deserialize(value);
+	}
+    }
+
+    protected static KVObject createFromKey(final byte[] key) {
+	// Equivalent of this method is expected to be implemented by SubClasses
+	throw new UnsupportedOperationException(
+		"createFromKey() is not expected to be called for RCObject");
+    }
+
+    public IKVTable getTable() {
+	return table;
+    }
+
+    public IKVTableID getTableId() {
+	return table.getTableId();
+    }
+
+    public byte[] getKey() {
+	return key;
+    }
+
+    public long getVersion() {
+	return version;
+    }
+
+    /**
+     * Return user-defined object properties.
+     *
+     * @note Will not trigger deserialization
+     * @return Will return null, if never been set, or was not deserialized
+     */
+    protected Map<Object, Object> getPropertyMap() {
+	return this.propertyMap;
+    }
+
+    protected Map<Object, Object> replacePropertyMap(final Map<Object, Object> newMap) {
+	Map<Object, Object> oldMap = this.propertyMap;
+	this.propertyMap = newMap;
+	return oldMap;
+    }
+
+    /**
+     * Serialize object.
+     *
+     * sub-classes should override this method to customize serialization.
+     *
+     * @return serialized byte array
+     */
+    public byte[] serialize() {
+	return serializePropertyMap(defaultKryo.get(), this.propertyMap);
+    }
+
+    protected byte[] serializePropertyMap(final Kryo kryo,
+	    final Map<Object, Object> propMap) {
+
+
+	// value
+	byte[] rcTemp = new byte[1024 * 1024];
+	Output output = new Output(rcTemp);
+	kryo.writeObject(output, propMap);
+	return output.toBytes();
+    }
+
+
+    /**
+     * Deserialize using value and version stored in data store.
+     *
+     * @param bytes serialized bytes
+     * @param version version of this {@code bytes}
+     * @return true if success
+     */
+    public boolean deserialize(final byte[] bytes, final long version) {
+	this.version = version;
+	return deserialize(bytes);
+    }
+
+    /**
+     * Deserialize object.
+     *
+     * sub-classes should override this method to customize deserialization.
+     *
+     * @param bytes serialized byte array
+     * @return true if success
+     */
+    protected boolean deserialize(final byte[] bytes) {
+	deserializePropertyMap(defaultKryo.get(), bytes);
+	return true;
+    }
+
+    /**
+     * Deserialize and set {@link propertyMap}.
+     * @param kryo serializer to use
+     * @param bytes Kryo serialized Map object
+     * @return true if success
+     */
+    protected boolean deserializePropertyMap(final Kryo kryo, final byte[] bytes) {
+	@SuppressWarnings("unchecked")
+	Map<Object, Object> map = deserializePropertyMap(kryo, bytes, HashMap.class);
+	if (map == null) {
+	    map = new HashMap<>();
+	}
+	this.propertyMap = map;
+	return true;
+    }
+
+    protected <T extends Map<?, ?>> T deserializePropertyMap(final Kryo kryo,
+	    final byte[] bytes, final Class<T> type) {
+
+	if (bytes == null || bytes.length == 0) {
+	    return null;
+	}
+
+	Input input = new Input(bytes);
+	T map = kryo.readObject(input, type);
+
+	return map;
+    }
+
+
+    /**
+     * Create an Object in DataStore.
+     *
+     * Fails if the Object with same key already exists.
+     *
+     * @throws ObjectExistsException
+     */
+    public void create() throws ObjectExistsException {
+
+	if (this.propertyMap == null) {
+	    log.warn("No object map was set. Setting empty Map.");
+	    replacePropertyMap(new HashMap<Object, Object>());
+	}
+
+	this.version = table.create(key, this.serialize());
+    }
+
+    public void forceCreate() {
+
+	if (this.propertyMap == null) {
+	    log.warn("No object map was set. Setting empty Map.");
+	    replacePropertyMap(new HashMap<Object, Object>());
+	}
+
+	this.version = table.forceCreate(key, this.serialize());
+    }
+
+    /**
+     * Read an Object from DataStore.
+     *
+     * Fails if the Object with the key does not exist.
+     *
+     * @throws ObjectDoesntExistException
+     *
+     */
+    public void read() throws ObjectDoesntExistException {
+	IKVEntry e = table.read(key);
+	deserialize(e.getValue(), e.getVersion());
+    }
+
+    /**
+     * Update an existing Object in DataStore checking versions.
+     *
+     * Fails if the Object with key does not exists, or conditional failure.
+     *
+     * @throws WrongVersionException
+     * @throws ObjectDoesntExistException
+     */
+    public void update() throws ObjectDoesntExistException,
+    WrongVersionException {
+	if (this.propertyMap == null) {
+	    replacePropertyMap(new HashMap<Object, Object>());
+	}
+
+	this.version = table.update(key, this.serialize(), version);
+    }
+
+    /**
+     * Remove an existing Object in DataStore.
+     *
+     * Fails if the Object with key does not exists.
+     *
+     * @throws ObjectDoesntExistException
+     * @throws WrongVersionException
+     */
+    public void delete() throws ObjectDoesntExistException,
+    WrongVersionException {
+	this.version = table.delete(key, this.version);
+    }
+
+    public void forceDelete() {
+	this.version = table.forceDelete(key);
+    }
+
+    public WriteOp forceCreateOp(IKVClient client) {
+	return new WriteOp(client.forceCreateOp(getTableId(), getKey(), serialize()), this);
+    }
+
+    public WriteOp createOp(IKVClient client) {
+	return new WriteOp(client.createOp(getTableId(), getKey(), serialize()), this);
+    }
+
+    // this might not be needed?
+    public WriteOp readOp(IKVClient client) {
+	return new WriteOp(client.readOp(getTableId(), getKey()), this);
+    }
+
+    public WriteOp updateOp(IKVClient client) {
+	return new WriteOp(client.updateOp(getTableId(), getKey(), serialize(), getVersion()), this);
+    }
+
+    public WriteOp deleteOp(IKVClient client) {
+	return new WriteOp(client.deleteOp(getTableId(), getKey(), serialize(), getVersion()), this);
+    }
+
+    public WriteOp forceDeleteOp(IKVClient client) {
+	return new WriteOp(client.forceDeleteOp(getTableId(), getKey()), this);
+    }
+
+    /**
+     * Multi-read RCObjects.
+     *
+     * If the blob value was read successfully, RCObject will deserialize them.
+     *
+     * @param objects
+     *            RCObjects to read
+     * @return true if there exist a failed read.
+     */
+    public static boolean multiRead(final List<? extends KVObject> objects) {
+
+	final IKVClient client = DataStoreClient.getClient();
+
+	final ArrayList<IMultiEntryOperation> readOps = new ArrayList<>(objects.size());
+	for (KVObject o : objects) {
+	    readOps.add(o.readOp(client));
+	}
+
+	boolean failExists = client.multiRead(readOps);
+
+	for (int i = 0; i < readOps.size(); ++i) {
+	    KVObject obj = objects.get(i);
+	    IMultiEntryOperation entry = readOps.get(i);
+	    if ( entry.hasSucceeded() ) {
+		if ( !obj.deserialize(entry.getValue(), entry.getVersion()) ) {
+		    //deserialize return true on success
+		    failExists = true;
+		    log.error("MultiRead error, failed to deserialize {}, {}", obj.getTable(), obj);
+		}
+	    } else {
+		log.error("MultiRead error, skipping {}, {}", obj.getTable(), obj);
+		obj.version = obj.getTable().VERSION_NONEXISTENT();
+		failExists = true;
+	    }
+	}
+
+	return failExists;
+    }
+
+    /**
+     * TODO Extract common interface
+     */
+    public static class WriteOp implements IMultiObjectOperation, IModifiableMultiEntryOperation {
+
+	private final IModifiableMultiEntryOperation base;
+	private final KVObject obj;
+
+	public WriteOp(IMultiEntryOperation base, final KVObject obj) {
+	    this.base = (IModifiableMultiEntryOperation) base;
+	    this.obj = obj;
+
+	    //	    switch (base.getOperation()) {
+	    //	    case CREATE:
+	    //	    case FORCE_CREATE:
+	    //	    case UPDATE:
+	    //		break;
+	    //	    default:
+	    //		throw new UnsupportedOperationException("Unexpected OPERATION:"+base.getOperation());
+	    //	    }
+	}
+
+	@Override
+	public KVObject getObject() {
+	    return obj;
+	}
+
+	@Deprecated
+	public OPERATION getOp() {
+	    return this.getOperation();
+	}
+
+	@Override
+	public boolean hasSucceeded() {
+	    return base.hasSucceeded();
+	}
+
+	@Override
+	public STATUS getStatus() {
+	    return base.getStatus();
+	}
+
+	@Override
+	public IKVTableID getTableId() {
+	    return base.getTableId();
+	}
+
+	@Override
+	public byte[] getKey() {
+	    return base.getKey();
+	}
+
+	@Override
+	public byte[] getValue() {
+	    return base.getValue();
+	}
+
+	@Override
+	public long getVersion() {
+	    return base.getVersion();
+	}
+
+	@Override
+	public OPERATION getOperation() {
+	    return base.getOperation();
+	}
+
+	@Override
+	public void setStatus(STATUS status) {
+	    base.setStatus(status);
+	}
+
+	@Override
+	public void setValue(byte[] value, long version) {
+	    base.setValue(value, version);
+	}
+
+	@Override
+	public void setVersion(long version) {
+	    base.setVersion(version);
+	    this.obj.version = version;
+	}
+
+	@Override
+	public IModifiableMultiEntryOperation getActualOperation() {
+	    return base;
+	}
+    }
+
+    public static boolean multiWrite(final List<WriteOp> objects) {
+
+	final IKVClient client = DataStoreClient.getClient();
+
+	final ArrayList<IMultiEntryOperation> writeOps = new ArrayList<>(objects.size());
+	for (WriteOp o : objects) {
+	    writeOps.add(o);
+	}
+
+	return client.multiWrite(writeOps);
+    }
+
+    public abstract static class AbstractObjectIterator<E extends KVObject> implements
+    Iterator<E> {
+
+	protected Iterator<IKVEntry> enumerator;
+
+	public AbstractObjectIterator(final IKVTable table) {
+	    this.enumerator = table.getAllEntries().iterator();
+	}
+
+	@Override
+	public boolean hasNext() {
+	    return enumerator.hasNext();
+	}
+
+	// Implement something similar to below to realize Iterator
+	//	@Override
+	//	public E next() {
+	//	    IKVTable.IKVEntry o = enumerator.next();
+	//	    E obj = E.createFromKey(o.getKey());
+	//	    obj.deserialize(o.getValue(), o.getVersion());
+	//	    return obj;
+	//	}
+
+	@Deprecated
+	@Override
+	public void remove() {
+	    // TODO Not implemented, as I cannot find a use-case for it.
+	    throw new UnsupportedOperationException("Not implemented yet");
+	}
+
+    }
+
+}