blob: 492204e0937c4950f00dffeea5802426f1d1cb69 [file] [log] [blame]
package net.onrc.onos.core.datastore.hazelcast;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import net.onrc.onos.core.datastore.DataStoreClient;
import net.onrc.onos.core.datastore.IKVTable;
import net.onrc.onos.core.datastore.IKVTableID;
import net.onrc.onos.core.datastore.ObjectDoesntExistException;
import net.onrc.onos.core.datastore.ObjectExistsException;
import net.onrc.onos.core.datastore.WrongVersionException;
import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
import net.onrc.onos.core.util.serializers.HazelcastSerializationConstants;
import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.hazelcast.core.IMap;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
public class HZTable implements IKVTable, IKVTableID {
@SuppressWarnings("unused")
private static final Logger log = LoggerFactory.getLogger(HZTable.class);
// not sure how strict this should be managed
private static final AtomicLong INITIAL_VERSION = new AtomicLong(HZClient.VERSION_NONEXISTENT);
/**
* generate a new initial version for an entry.
*
* @return initial value
*/
protected static long getInitialVersion() {
long version = INITIAL_VERSION.incrementAndGet();
if (version == HZClient.VERSION_NONEXISTENT) {
// used up whole 64bit space?
version = INITIAL_VERSION.incrementAndGet();
}
return version;
}
/**
* increment version, avoiding versionNonexistant.
*
* @param version
* @return the next version number
*/
protected static long getNextVersion(final long version) {
long nextVersion = version + 1;
if (nextVersion == HZClient.VERSION_NONEXISTENT) {
++nextVersion;
}
return nextVersion;
}
static class VersionedValue implements IdentifiedDataSerializable {
private static final long serialVersionUID = -3149375966890712708L;
private byte[] value;
private long version;
protected VersionedValue() {
value = new byte[0];
version = HZClient.VERSION_NONEXISTENT;
}
public VersionedValue(final byte[] value, final long version) {
setValue(value);
this.version = version;
}
public byte[] getValue() {
return value;
}
public long getVersion() {
return version;
}
public void setValue(final byte[] value) {
checkArgument(value == null ||
value.length <= DataStoreClient.MAX_VALUE_BYTES,
"Value must be smaller than 1MB");
this.value = ArrayUtils.clone(value);
}
public void setNextVersion() {
this.version = getNextVersion(this.version);
}
@Override
public void writeData(final ObjectDataOutput out) throws IOException {
out.writeLong(version);
out.writeInt(value.length);
if (value.length > 0) {
out.write(value);
}
}
@Override
public void readData(final ObjectDataInput in) throws IOException {
version = in.readLong();
final int valueLen = in.readInt();
value = new byte[valueLen];
in.readFully(value);
}
@Override
public int getFactoryId() {
return HazelcastSerializationConstants.VERSIONED_VALUE_SERIALIZABLE_FACTORY_ID;
}
@Override
public int getId() {
return HazelcastSerializationConstants.VERSIONED_VALUE_TYPE_ID;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (int) (version ^ (version >>> 32));
result = prime * result + Arrays.hashCode(value);
return result;
}
@Override
public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
VersionedValue other = (VersionedValue) obj;
if (version != other.version) {
return false;
}
if (!Arrays.equals(value, other.value)) {
return false;
}
return true;
}
}
// TODO Refactor and extract common parts
public static class Entry implements IKVEntry {
final byte[] key;
byte[] value;
long version;
public Entry(final byte[] key, final byte[] value, final long version) {
checkNotNull(key);
checkArgument(
key.length <= DataStoreClient.MAX_KEY_BYTES,
"Key must be smaller than 64KB");
this.key = key.clone();
this.setValue(value);
this.setVersion(version);
}
public Entry(final byte[] key) {
this(key, null, HZClient.VERSION_NONEXISTENT);
}
@Override
public byte[] getKey() {
return key.clone();
}
@Override
public byte[] getValue() {
return ArrayUtils.clone(value);
}
@Override
public long getVersion() {
return version;
}
void setValue(final byte[] value) {
this.value = ArrayUtils.clone(value);
}
void setVersion(final long version) {
this.version = version;
}
@Override
public String toString() {
return "[Entry key=" + ByteArrayUtil.toHexStringBuilder(key, ":") + ", value="
+ ByteArrayUtil.toHexStringBuilder(value, ":") + ", version=" + version + "]";
}
}
private final String mapName;
private final IMap<byte[], VersionedValue> map;
public HZTable(final String mapName, final IMap<byte[], VersionedValue> map) {
this.mapName = mapName;
this.map = map;
}
@Override
public String getTableName() {
return mapName;
}
@Override
public IKVTableID getTableId() {
return this;
}
@Override
public long create(final byte[] key, final byte[] value) throws ObjectExistsException {
final long version = getInitialVersion();
VersionedValue existing = map.putIfAbsent(key, new VersionedValue(value, version));
if (existing != null) {
throw new ObjectExistsException(this, key);
}
return version;
}
@Override
public long forceCreate(final byte[] key, final byte[] value) {
final long version = getInitialVersion();
map.set(key, new VersionedValue(value, version));
return version;
}
@Override
public IKVEntry read(final byte[] key) throws ObjectDoesntExistException {
final VersionedValue value = map.get(key);
if (value == null) {
throw new ObjectDoesntExistException(this, key);
}
return new Entry(key, value.getValue(), value.getVersion());
}
@Override
public long update(final byte[] key, final byte[] value, final long version)
throws ObjectDoesntExistException, WrongVersionException {
try {
map.lock(key);
final VersionedValue oldValue = map.get(key);
if (oldValue == null) {
throw new ObjectDoesntExistException(this, key);
}
if (oldValue.getVersion() != version) {
throw new WrongVersionException(this, key, version, oldValue.getVersion());
}
final long nextVersion = getNextVersion(version);
map.set(key, new VersionedValue(value, nextVersion));
return nextVersion;
} finally {
map.unlock(key);
}
}
@Override
public long update(final byte[] key, final byte[] value)
throws ObjectDoesntExistException {
try {
map.lock(key);
final VersionedValue valueInMap = map.get(key);
if (valueInMap == null) {
throw new ObjectDoesntExistException(this, key);
}
valueInMap.setValue(value);
valueInMap.setNextVersion();
map.set(key, valueInMap);
return valueInMap.getVersion();
} finally {
map.unlock(key);
}
}
@Override
public long delete(final byte[] key, final long version)
throws ObjectDoesntExistException, WrongVersionException {
try {
map.lock(key);
final VersionedValue oldValue = map.get(key);
if (oldValue == null) {
throw new ObjectDoesntExistException(this, key);
}
if (oldValue.getVersion() != version) {
throw new WrongVersionException(this, key, version, oldValue.getVersion());
}
map.delete(key);
return oldValue.getVersion();
} finally {
map.unlock(key);
}
}
@Override
public long forceDelete(final byte[] key) {
final VersionedValue valueInMap = map.remove(key);
if (valueInMap == null) {
return HZClient.VERSION_NONEXISTENT;
}
return valueInMap.getVersion();
}
@Override
public Iterable<IKVEntry> getAllEntries() {
final Set<IMap.Entry<byte[], VersionedValue>> entries = map.entrySet();
List<IKVEntry> entryList = new ArrayList<IKVTable.IKVEntry>(entries.size());
for (IMap.Entry<byte[], VersionedValue> entry : entries) {
entryList.add(new Entry(entry.getKey(), entry.getValue().getValue(), entry.getValue().getVersion()));
}
return entryList;
}
@Override
public String toString() {
return "[HZTable " + mapName + "]";
}
IMap<byte[], VersionedValue> getBackendMap() {
return this.map;
}
@Override
public long getVersionNonexistant() {
return HZClient.VERSION_NONEXISTENT;
}
}