blob: ce4f61760b41a30539927f9ca98fd9e686e6a9ee [file] [log] [blame]
package net.onrc.onos.core.datastore.utils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import net.onrc.onos.core.datastore.DataStoreClient;
import net.onrc.onos.core.datastore.IKVClient;
import net.onrc.onos.core.datastore.IKVTable;
import net.onrc.onos.core.datastore.IKVTable.IKVEntry;
import net.onrc.onos.core.datastore.IKVTableID;
import net.onrc.onos.core.datastore.IMultiEntryOperation;
import net.onrc.onos.core.datastore.IMultiObjectOperation;
import net.onrc.onos.core.datastore.ObjectDoesntExistException;
import net.onrc.onos.core.datastore.ObjectExistsException;
import net.onrc.onos.core.datastore.WrongVersionException;
import net.onrc.onos.core.datastore.internal.IModifiableMultiEntryOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
/**
* Class to represent an Object represented as a single K-V pair Value blob.
*
*/
public class KVObject {
private static final Logger log = LoggerFactory.getLogger(KVObject.class);
// Default Kryo serializer.
// each sub-class should prepare their own serializer, which has required
// objects registered for better performance.
private static final ThreadLocal<Kryo> defaultKryo = new ThreadLocal<Kryo>() {
@Override
protected Kryo initialValue() {
Kryo kryo = new Kryo();
// kryo.setRegistrationRequired(true);
// kryo.setReferences(false);
return kryo;
}
};
private final IKVTable table;
private final byte[] key;
/**
* serialized value version stored on data store or
* {@link IKVTable.VERSION_NONEXISTENT()} if is a new object.
*/
private long version;
/**
* Map to store user-defined properties
*/
private Map<Object, Object> propertyMap;
public KVObject(final IKVTable table, final byte[] key) {
this(table, key, null, table.VERSION_NONEXISTENT());
}
public KVObject(final IKVTable table, final byte[] key, final byte[] value, final long version) {
if (table == null) {
throw new IllegalArgumentException("table cannot be null");
}
if (key == null) {
throw new IllegalArgumentException("key cannot be null");
}
this.table = table;
this.key = key;
this.version = version;
this.propertyMap = new HashMap<Object, Object>();
if (value != null) {
deserialize(value);
}
}
protected static KVObject createFromKey(final byte[] key) {
// Equivalent of this method is expected to be implemented by SubClasses
throw new UnsupportedOperationException(
"createFromKey() is not expected to be called for RCObject");
}
public IKVTable getTable() {
return table;
}
public IKVTableID getTableId() {
return table.getTableId();
}
public byte[] getKey() {
return key;
}
public long getVersion() {
return version;
}
/**
* Return user-defined object properties.
*
* @note Will not trigger deserialization
* @return Will return null, if never been set, or was not deserialized
*/
protected Map<Object, Object> getPropertyMap() {
return this.propertyMap;
}
protected Map<Object, Object> replacePropertyMap(final Map<Object, Object> newMap) {
Map<Object, Object> oldMap = this.propertyMap;
this.propertyMap = newMap;
return oldMap;
}
/**
* Serialize object.
*
* sub-classes should override this method to customize serialization.
*
* @return serialized byte array
*/
public byte[] serialize() {
return serializePropertyMap(defaultKryo.get(), this.propertyMap);
}
protected byte[] serializePropertyMap(final Kryo kryo,
final Map<Object, Object> propMap) {
// value
byte[] rcTemp = new byte[1024 * 1024];
Output output = new Output(rcTemp);
kryo.writeObject(output, propMap);
return output.toBytes();
}
/**
* Deserialize using value and version stored in data store.
*
* @param bytes serialized bytes
* @param version version of this {@code bytes}
* @return true if success
*/
public boolean deserialize(final byte[] bytes, final long version) {
this.version = version;
return deserialize(bytes);
}
/**
* Deserialize object.
*
* sub-classes should override this method to customize deserialization.
*
* @param bytes serialized byte array
* @return true if success
*/
protected boolean deserialize(final byte[] bytes) {
deserializePropertyMap(defaultKryo.get(), bytes);
return true;
}
/**
* Deserialize and set {@link propertyMap}.
* @param kryo serializer to use
* @param bytes Kryo serialized Map object
* @return true if success
*/
protected boolean deserializePropertyMap(final Kryo kryo, final byte[] bytes) {
@SuppressWarnings("unchecked")
Map<Object, Object> map = deserializePropertyMap(kryo, bytes, HashMap.class);
if (map == null) {
map = new HashMap<>();
}
this.propertyMap = map;
return true;
}
protected <T extends Map<?, ?>> T deserializePropertyMap(final Kryo kryo,
final byte[] bytes, final Class<T> type) {
if (bytes == null || bytes.length == 0) {
return null;
}
Input input = new Input(bytes);
T map = kryo.readObject(input, type);
return map;
}
/**
* Create an Object in DataStore.
*
* Fails if the Object with same key already exists.
*
* @throws ObjectExistsException
*/
public void create() throws ObjectExistsException {
if (this.propertyMap == null) {
log.warn("No object map was set. Setting empty Map.");
replacePropertyMap(new HashMap<Object, Object>());
}
this.version = table.create(key, this.serialize());
}
public void forceCreate() {
if (this.propertyMap == null) {
log.warn("No object map was set. Setting empty Map.");
replacePropertyMap(new HashMap<Object, Object>());
}
this.version = table.forceCreate(key, this.serialize());
}
/**
* Read an Object from DataStore.
*
* Fails if the Object with the key does not exist.
*
* @throws ObjectDoesntExistException
*
*/
public void read() throws ObjectDoesntExistException {
IKVEntry e = table.read(key);
deserialize(e.getValue(), e.getVersion());
}
/**
* Update an existing Object in DataStore checking versions.
*
* Fails if the Object with key does not exists, or conditional failure.
*
* @throws WrongVersionException
* @throws ObjectDoesntExistException
*/
public void update() throws ObjectDoesntExistException,
WrongVersionException {
if (this.propertyMap == null) {
replacePropertyMap(new HashMap<Object, Object>());
}
this.version = table.update(key, this.serialize(), version);
}
/**
* Remove an existing Object in DataStore.
*
* Fails if the Object with key does not exists.
*
* @throws ObjectDoesntExistException
* @throws WrongVersionException
*/
public void delete() throws ObjectDoesntExistException,
WrongVersionException {
this.version = table.delete(key, this.version);
}
public void forceDelete() {
this.version = table.forceDelete(key);
}
public WriteOp forceCreateOp(IKVClient client) {
return new WriteOp(client.forceCreateOp(getTableId(), getKey(), serialize()), this);
}
public WriteOp createOp(IKVClient client) {
return new WriteOp(client.createOp(getTableId(), getKey(), serialize()), this);
}
// this might not be needed?
public WriteOp readOp(IKVClient client) {
return new WriteOp(client.readOp(getTableId(), getKey()), this);
}
public WriteOp updateOp(IKVClient client) {
return new WriteOp(client.updateOp(getTableId(), getKey(), serialize(), getVersion()), this);
}
public WriteOp deleteOp(IKVClient client) {
return new WriteOp(client.deleteOp(getTableId(), getKey(), serialize(), getVersion()), this);
}
public WriteOp forceDeleteOp(IKVClient client) {
return new WriteOp(client.forceDeleteOp(getTableId(), getKey()), this);
}
/**
* Multi-read RCObjects.
*
* If the blob value was read successfully, RCObject will deserialize them.
*
* @param objects
* RCObjects to read
* @return true if there exist a failed read.
*/
public static boolean multiRead(final List<? extends KVObject> objects) {
final IKVClient client = DataStoreClient.getClient();
final ArrayList<IMultiEntryOperation> readOps = new ArrayList<>(objects.size());
for (KVObject o : objects) {
readOps.add(o.readOp(client));
}
boolean failExists = client.multiRead(readOps);
for (int i = 0; i < readOps.size(); ++i) {
KVObject obj = objects.get(i);
IMultiEntryOperation entry = readOps.get(i);
if ( entry.hasSucceeded() ) {
if ( !obj.deserialize(entry.getValue(), entry.getVersion()) ) {
//deserialize return true on success
failExists = true;
log.error("MultiRead error, failed to deserialize {}, {}", obj.getTable(), obj);
}
} else {
log.error("MultiRead error, skipping {}, {}", obj.getTable(), obj);
obj.version = obj.getTable().VERSION_NONEXISTENT();
failExists = true;
}
}
return failExists;
}
/**
* TODO Extract common interface
*/
public static class WriteOp implements IMultiObjectOperation, IModifiableMultiEntryOperation {
private final IModifiableMultiEntryOperation base;
private final KVObject obj;
public WriteOp(IMultiEntryOperation base, final KVObject obj) {
this.base = (IModifiableMultiEntryOperation) base;
this.obj = obj;
// switch (base.getOperation()) {
// case CREATE:
// case FORCE_CREATE:
// case UPDATE:
// break;
// default:
// throw new UnsupportedOperationException("Unexpected OPERATION:"+base.getOperation());
// }
}
@Override
public KVObject getObject() {
return obj;
}
@Deprecated
public OPERATION getOp() {
return this.getOperation();
}
@Override
public boolean hasSucceeded() {
return base.hasSucceeded();
}
@Override
public STATUS getStatus() {
return base.getStatus();
}
@Override
public IKVTableID getTableId() {
return base.getTableId();
}
@Override
public byte[] getKey() {
return base.getKey();
}
@Override
public byte[] getValue() {
return base.getValue();
}
@Override
public long getVersion() {
return base.getVersion();
}
@Override
public OPERATION getOperation() {
return base.getOperation();
}
@Override
public void setStatus(STATUS status) {
base.setStatus(status);
}
@Override
public void setValue(byte[] value, long version) {
base.setValue(value, version);
}
@Override
public void setVersion(long version) {
base.setVersion(version);
this.obj.version = version;
}
@Override
public IModifiableMultiEntryOperation getActualOperation() {
return base;
}
}
public static boolean multiWrite(final List<WriteOp> objects) {
final IKVClient client = DataStoreClient.getClient();
final ArrayList<IMultiEntryOperation> writeOps = new ArrayList<>(objects.size());
for (WriteOp o : objects) {
writeOps.add(o);
}
return client.multiWrite(writeOps);
}
public abstract static class AbstractObjectIterator<E extends KVObject> implements
Iterator<E> {
protected Iterator<IKVEntry> enumerator;
public AbstractObjectIterator(final IKVTable table) {
this.enumerator = table.getAllEntries().iterator();
}
@Override
public boolean hasNext() {
return enumerator.hasNext();
}
// Implement something similar to below to realize Iterator
// @Override
// public E next() {
// IKVTable.IKVEntry o = enumerator.next();
// E obj = E.createFromKey(o.getKey());
// obj.deserialize(o.getValue(), o.getVersion());
// return obj;
// }
@Deprecated
@Override
public void remove() {
// TODO Not implemented, as I cannot find a use-case for it.
throw new UnsupportedOperationException("Not implemented yet");
}
}
}