blob: 5398eb861ca0264d55e0ee2eff0dcadafd0f5c9d [file] [log] [blame]
package net.onrc.onos.core.datastore.hazelcast;
import java.util.Collection;
import java.util.List;
import net.onrc.onos.core.datagrid.HazelcastDatagrid;
import net.onrc.onos.core.datastore.IKVClient;
import net.onrc.onos.core.datastore.IKVTable;
import net.onrc.onos.core.datastore.IKVTable.IKVEntry;
import net.onrc.onos.core.datastore.IKVTableID;
import net.onrc.onos.core.datastore.IMultiEntryOperation;
import net.onrc.onos.core.datastore.IMultiEntryOperation.OPERATION;
import net.onrc.onos.core.datastore.IMultiEntryOperation.STATUS;
import net.onrc.onos.core.datastore.ObjectDoesntExistException;
import net.onrc.onos.core.datastore.ObjectExistsException;
import net.onrc.onos.core.datastore.WrongVersionException;
import net.onrc.onos.core.datastore.hazelcast.HZTable.VersionedValue;
import net.onrc.onos.core.datastore.internal.IModifiableMultiEntryOperation;
import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IAtomicLong;
import com.hazelcast.core.IMap;
/**
* Hazelcast implementation of datastore IKVClient.
*/
public final class HZClient implements IKVClient {
private static final Logger log = LoggerFactory.getLogger(HZClient.class);
static final long VERSION_NONEXISTENT = 0L;
private static final String MAP_PREFIX = "datastore://";
private static final String BASE_CONFIG_FILENAME =
System.getProperty("net.onrc.onos.core.datastore.hazelcast.baseConfig", "conf/hazelcast.xml");
private final HazelcastInstance hazelcastInstance;
private static final HZClient THE_INSTANCE = new HZClient();
/**
* Get DataStoreClient implemented on Hazelcast.
*
* @return HZClient
*/
public static HZClient getClient() {
return THE_INSTANCE;
}
/**
* Default constructor.
* <p/>
* Get or create the Hazelcast Instance to use for datastore.
*/
private HZClient() {
Config config = HazelcastDatagrid.loadHazelcastConfig(BASE_CONFIG_FILENAME);
// Try to get the existing HZ instance in JVM if possible.
hazelcastInstance = Hazelcast.getOrCreateHazelcastInstance(config);
}
/**
* Gets the HazelcastInstance object.
*
* @return HazelcastInstance
*/
HazelcastInstance getHZInstance() {
return hazelcastInstance;
}
@Override
public IKVTable getTable(final String tableName) {
IMap<byte[], VersionedValue> map = hazelcastInstance.getMap(MAP_PREFIX + tableName);
return new HZTable(tableName, map);
}
@Override
public void dropTable(final IKVTable table) {
((HZTable) table).getBackendMap().clear();
}
@Override
public long create(final IKVTableID tableId, final byte[] key, final byte[] value)
throws ObjectExistsException {
IKVTable table = (IKVTable) tableId;
return table.create(key, value);
}
@Override
public long forceCreate(final IKVTableID tableId, final byte[] key, final byte[] value) {
IKVTable table = (IKVTable) tableId;
return table.forceCreate(key, value);
}
@Override
public IKVEntry read(final IKVTableID tableId, final byte[] key)
throws ObjectDoesntExistException {
IKVTable table = (IKVTable) tableId;
return table.read(key);
}
@Override
public long update(final IKVTableID tableId, final byte[] key, final byte[] value,
final long version) throws ObjectDoesntExistException,
WrongVersionException {
IKVTable table = (IKVTable) tableId;
return table.update(key, value, version);
}
@Override
public long update(final IKVTableID tableId, final byte[] key, final byte[] value)
throws ObjectDoesntExistException {
IKVTable table = (IKVTable) tableId;
return table.update(key, value);
}
@Override
public long delete(final IKVTableID tableId, final byte[] key, final long version)
throws ObjectDoesntExistException, WrongVersionException {
IKVTable table = (IKVTable) tableId;
return table.delete(key, version);
}
@Override
public long forceDelete(final IKVTableID tableId, final byte[] key) {
IKVTable table = (IKVTable) tableId;
return table.forceDelete(key);
}
@Override
public Iterable<IKVEntry> getAllEntries(final IKVTableID tableId) {
IKVTable table = (IKVTable) tableId;
return table.getAllEntries();
}
@Override
public IMultiEntryOperation createOp(final IKVTableID tableId, final byte[] key,
final byte[] value) {
return new HZMultiEntryOperation((HZTable) tableId, key, value, HZClient.VERSION_NONEXISTENT, OPERATION.CREATE);
}
@Override
public IMultiEntryOperation forceCreateOp(final IKVTableID tableId, final byte[] key,
final byte[] value) {
return new HZMultiEntryOperation((HZTable) tableId, key, value,
HZClient.VERSION_NONEXISTENT, OPERATION.FORCE_CREATE);
}
@Override
public IMultiEntryOperation readOp(final IKVTableID tableId, final byte[] key) {
return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.READ);
}
@Override
public IMultiEntryOperation updateOp(final IKVTableID tableId, final byte[] key,
final byte[] value, final long version) {
return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.UPDATE);
}
@Override
public IMultiEntryOperation deleteOp(final IKVTableID tableId, final byte[] key,
final byte[] value, final long version) {
return new HZMultiEntryOperation((HZTable) tableId, key, value, version, OPERATION.DELETE);
}
@Override
public IMultiEntryOperation forceDeleteOp(final IKVTableID tableId, final byte[] key) {
return new HZMultiEntryOperation((HZTable) tableId, key, OPERATION.FORCE_DELETE);
}
@Override
public boolean multiDelete(final Collection<IMultiEntryOperation> ops) {
boolean failExists = false;
for (IMultiEntryOperation op : ops) {
HZMultiEntryOperation mop = (HZMultiEntryOperation) op;
switch (mop.getOperation()) {
case DELETE:
try {
final long version = delete(mop.getTableId(), mop.getKey(), mop.getVersion());
mop.setVersion(version);
mop.setStatus(STATUS.SUCCESS);
} catch (ObjectDoesntExistException | WrongVersionException e) {
log.error(mop + " failed.", e);
mop.setStatus(STATUS.FAILED);
failExists = true;
}
break;
case FORCE_DELETE:
final long version = forceDelete(mop.getTableId(), mop.getKey());
mop.setVersion(version);
mop.setStatus(STATUS.SUCCESS);
break;
default:
throw new UnsupportedOperationException(mop.toString());
}
}
return failExists;
}
@Override
public boolean multiWrite(final List<IMultiEntryOperation> ops) {
// there may be room to batch to improve performance
boolean failExists = false;
for (IMultiEntryOperation op : ops) {
IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
switch (mop.getOperation()) {
case CREATE:
try {
long version = create(mop.getTableId(), mop.getKey(), mop.getValue());
mop.setVersion(version);
mop.setStatus(STATUS.SUCCESS);
} catch (ObjectExistsException e) {
log.error(mop + " failed.", e);
mop.setStatus(STATUS.FAILED);
failExists = true;
}
break;
case FORCE_CREATE: {
final long version = forceCreate(mop.getTableId(), mop.getKey(), mop.getValue());
mop.setVersion(version);
mop.setStatus(STATUS.SUCCESS);
break;
}
case UPDATE:
try {
long version = update(mop.getTableId(), mop.getKey(), mop.getValue(), mop.getVersion());
mop.setVersion(version);
mop.setStatus(STATUS.SUCCESS);
} catch (ObjectDoesntExistException | WrongVersionException e) {
log.error(mop + " failed.", e);
mop.setStatus(STATUS.FAILED);
failExists = true;
}
break;
default:
throw new UnsupportedOperationException(mop.toString());
}
}
return failExists;
}
@Override
public boolean multiRead(final Collection<IMultiEntryOperation> ops) {
boolean failExists = false;
for (IMultiEntryOperation op : ops) {
IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
HZTable table = (HZTable) op.getTableId();
((HZMultiEntryOperation) mop.getActualOperation()).setFuture(table.getBackendMap().getAsync(op.getKey()));
}
for (IMultiEntryOperation op : ops) {
IModifiableMultiEntryOperation mop = (IModifiableMultiEntryOperation) op;
if (!mop.hasSucceeded()) {
failExists = true;
}
}
return failExists;
}
@Override
public long getVersionNonexistant() {
return VERSION_NONEXISTENT;
}
private String getCounterName(final IKVTableID tableId, final byte[] key) {
StringBuilder buf = new StringBuilder(tableId.getTableName());
buf.append('@');
ByteArrayUtil.toHexStringBuilder(key, ":", buf);
return buf.toString();
}
private IAtomicLong getAtomicLong(final IKVTableID tableId, final byte[] key) {
// TODO we probably want to implement some sort of caching
return hazelcastInstance.getAtomicLong(getCounterName(tableId, key));
}
/**
* {@inheritDoc}
* <p />
* Warning: The counter is a different object from {@code key} entry on
* IKVTable with {@code tableId}. You cannot use table API to read/write
* counters.
*
* @param tableId Only getTableName() will be used.
* @param key tableId + key will be used as Counter name
*/
@Override
public void createCounter(final IKVTableID tableId,
final byte[] key, final long initialValue)
throws ObjectExistsException {
IAtomicLong counter = getAtomicLong(tableId, key);
// Assumption here is that AtomicLong is initialized to 0L
final boolean success = counter.compareAndSet(0L, initialValue);
if (!success) {
throw new ObjectExistsException("Atomic counter "
+ getCounterName(tableId, key)
+ " already exist with value:" + counter.get());
}
}
@Override
public void setCounter(final IKVTableID tableId,
final byte[] key, final long value) {
IAtomicLong counter = getAtomicLong(tableId, key);
counter.set(value);
}
@Override
public long incrementCounter(final IKVTableID tableId,
final byte[] key, final long incrementValue) {
IAtomicLong counter = getAtomicLong(tableId, key);
return counter.addAndGet(incrementValue);
}
@Override
public void destroyCounter(final IKVTableID tableId, final byte[] key) {
IAtomicLong counter = getAtomicLong(tableId, key);
counter.destroy();
}
@Override
public long getCounter(final IKVTableID tableId, final byte[] key)
throws ObjectDoesntExistException {
IAtomicLong counter = getAtomicLong(tableId, key);
return counter.get();
}
}