| package net.onrc.onos.core.datastore.ramcloud; |
| |
| import java.io.File; |
| import java.nio.ByteBuffer; |
| import java.nio.ByteOrder; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.concurrent.ConcurrentHashMap; |
| |
| import net.onrc.onos.core.datastore.IKVClient; |
| import net.onrc.onos.core.datastore.IKVTable; |
| import net.onrc.onos.core.datastore.IKVTable.IKVEntry; |
| import net.onrc.onos.core.datastore.IKVTableID; |
| import net.onrc.onos.core.datastore.IMultiEntryOperation; |
| import net.onrc.onos.core.datastore.IMultiEntryOperation.STATUS; |
| import net.onrc.onos.core.datastore.ObjectDoesntExistException; |
| import net.onrc.onos.core.datastore.ObjectExistsException; |
| import net.onrc.onos.core.datastore.WrongVersionException; |
| import net.onrc.onos.core.datastore.internal.IModifiableMultiEntryOperation; |
| import net.onrc.onos.core.datastore.ramcloud.RCTable.Entry; |
| import net.onrc.onos.core.datastore.utils.ByteArrayUtil; |
| |
| import org.apache.commons.configuration.Configuration; |
| import org.apache.commons.configuration.ConfigurationException; |
| import org.apache.commons.configuration.PropertiesConfiguration; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import edu.stanford.ramcloud.JRamCloud; |
| import edu.stanford.ramcloud.JRamCloud.MultiReadObject; |
| import edu.stanford.ramcloud.JRamCloud.MultiWriteObject; |
| import edu.stanford.ramcloud.JRamCloud.MultiWriteRspObject; |
| import edu.stanford.ramcloud.JRamCloud.RejectRules; |
| import edu.stanford.ramcloud.JRamCloud.RejectRulesException; |
| import edu.stanford.ramcloud.JRamCloud.TableEnumerator2; |
| |
| /** |
| * RAMCloud implementation of datastore IKVClient. |
| */ |
| public class RCClient implements IKVClient { |
| |
| private static final Logger log = LoggerFactory.getLogger(RCClient.class); |
| |
| private static final String DEFAULT_LOCATOR = "zk:localhost:2181"; |
| private static final String DEFAULT_CLUSTERNAME = "ONOS-RC"; |
| |
| private static final String DB_CONFIG_FILE = "conf/ramcloud.conf"; |
| private static final Configuration CONFIG = getConfiguration(); |
| private static final String CLUSTER_NAME = getClusterName(CONFIG); |
| private static final String LOCATOR = getLocator(CONFIG); |
| |
| // FIXME These constants should be defined by JRamCloud |
| /** |
| * Constant defined in RAMCloud's Status.h. |
| */ |
| public static final int STATUS_OK = 0; |
| |
| /** |
| * Maximum number of Multi-Read operations which can be executed in |
| * one RPC call. |
| * |
| * There are multiple factors which determines this limit. |
| * - RAMCloud RPC size limit of 8MB. |
| * - JNI implementation store the RPC result on stack. |
| * (Increasing the stack-size limit will help relaxing this limit.) |
| */ |
| public static final int MAX_MULTI_READS = Math.max(1, Integer |
| .parseInt(System.getProperty("ramcloud.max_multi_reads", "400"))); |
| |
| /** |
| * Maximum number of Multi-Write operations which can be executed in |
| * one RPC call. |
| * |
| * There are multiple factors which determines this limit. |
| * - RAMCloud RPC size limit of 8MB. |
| * - JNI implementation store the RPC result on stack. |
| * (Increasing the stack-size limit will help relaxing this limit.) |
| */ |
| public static final int MAX_MULTI_WRITES = Math.max(1, Integer |
| .parseInt(System.getProperty("ramcloud.max_multi_writes", "800"))); |
| |
| private static final ThreadLocal<JRamCloud> TLS_RC_CLIENT = new ThreadLocal<JRamCloud>() { |
| @Override |
| protected JRamCloud initialValue() { |
| return new JRamCloud(LOCATOR, CLUSTER_NAME); |
| } |
| }; |
| |
| /** |
| * @return JRamCloud instance intended to be used only within the |
| * SameThread. |
| * @note Do not store the returned instance in a member variable, etc. which |
| * may be accessed later by another thread. |
| */ |
| static JRamCloud getJRamCloudClient() { |
| return TLS_RC_CLIENT.get(); |
| } |
| |
| // Currently RCClient is state-less |
| private static final RCClient THE_INSTANCE = new RCClient(); |
| |
| /** |
| * Default constructor. |
| */ |
| protected RCClient() { |
| log.info("locator: {}, cluster name: {}", LOCATOR, CLUSTER_NAME); |
| } |
| |
| /** |
| * Gets a DataStoreClient implemented on RAMCloud. |
| * |
| * @return RCClient |
| */ |
| public static RCClient getClient() { |
| return THE_INSTANCE; |
| } |
| |
| /** |
| * Gets the {@link Configuration} instance. |
| * |
| * @return Configuration |
| */ |
| private static final Configuration getConfiguration() { |
| final File configFile = new File( |
| System.getProperty("ramcloud.config.path", |
| DB_CONFIG_FILE)); |
| return getConfiguration(configFile); |
| } |
| |
| /** |
| * Gets the {@link Configuration} instance from properties file. |
| * |
| * @param configFile properties file |
| * @return Configuration |
| */ |
| private static final Configuration getConfiguration(final File configFile) { |
| if (configFile == null) { |
| throw new IllegalArgumentException("Need to specify a configuration file or storage directory"); |
| } |
| |
| if (!configFile.isFile()) { |
| throw new IllegalArgumentException("Location of configuration must be a file"); |
| } |
| |
| try { |
| PropertiesConfiguration conf = new PropertiesConfiguration(); |
| // stop parsing commas in property value |
| conf.setDelimiterParsingDisabled(true); |
| conf.load(configFile); |
| return conf; |
| } catch (ConfigurationException e) { |
| throw new IllegalArgumentException("Could not load configuration at: " + configFile, e); |
| } |
| } |
| |
| /** |
| * Gets the RAMCloud external storage locator from configuration file. |
| * |
| * @param configuration input |
| * @return RAMCloud external storage locator string |
| */ |
| private static String getLocator(final Configuration configuration) { |
| |
| final String locator = configuration.getString("ramcloud.locator"); |
| if (locator != null) { |
| return locator; |
| } |
| |
| // TODO Stop reading obsolete coordinatorIp, etc. once we're ready. |
| final String coordinatorIp = configuration.getString("ramcloud.coordinatorIp"); |
| if (coordinatorIp == null) { |
| return DEFAULT_LOCATOR; |
| } |
| |
| final String coordinatorPort = configuration.getString("ramcloud.coordinatorPort"); |
| if (coordinatorPort == null) { |
| return DEFAULT_LOCATOR; |
| } |
| |
| final String coordinatorURL = coordinatorIp + "," + coordinatorPort; |
| return coordinatorURL; |
| } |
| |
| /** |
| * Gets the RAMCloud clusterName from configuration file. |
| * |
| * @param configuration input |
| * @return RAMCloud clusterName |
| */ |
| private static String getClusterName(final Configuration configuration) { |
| return configuration.getString("ramcloud.clusterName", |
| DEFAULT_CLUSTERNAME); |
| } |
| |
| @Override |
| public IMultiEntryOperation createOp(IKVTableID tableId, byte[] key, byte[] value) { |
| return RCMultiEntryOperation.create(tableId, key, value); |
| } |
| |
| /** |
| * @param tableId RCTableID instance |
| */ |
| @Override |
| public long create(IKVTableID tableId, byte[] key, byte[] value) |
| throws ObjectExistsException { |
| |
| RCTableID rcTableId = (RCTableID) tableId; |
| JRamCloud rcClient = RCClient.getJRamCloudClient(); |
| |
| RejectRules rules = new RejectRules(); |
| rules.rejectIfExists(); |
| |
| try { |
| return rcClient.write(rcTableId.getTableID(), key, value, rules); |
| } catch (JRamCloud.ObjectExistsException e) { |
| throw new ObjectExistsException(rcTableId, key, e); |
| } catch (JRamCloud.RejectRulesException e) { |
| log.error("Unexpected RejectRulesException", e); |
| return JRamCloud.VERSION_NONEXISTENT; |
| } |
| } |
| |
| @Override |
| public IMultiEntryOperation forceCreateOp(IKVTableID tableId, byte[] key, byte[] value) { |
| return RCMultiEntryOperation.forceCreate(tableId, key, value); |
| } |
| |
| @Override |
| public long forceCreate(IKVTableID tableId, byte[] key, byte[] value) { |
| RCTableID rcTableId = (RCTableID) tableId; |
| JRamCloud rcClient = RCClient.getJRamCloudClient(); |
| |
| long updatedVersion = rcClient.write(rcTableId.getTableID(), key, value); |
| return updatedVersion; |
| } |
| |
| @Override |
| public IMultiEntryOperation readOp(IKVTableID tableId, byte[] key) { |
| return RCMultiEntryOperation.read(tableId, key); |
| } |
| |
| @Override |
| public IKVEntry read(IKVTableID tableId, byte[] key) |
| throws ObjectDoesntExistException { |
| |
| RCTableID rcTableId = (RCTableID) tableId; |
| JRamCloud rcClient = RCClient.getJRamCloudClient(); |
| |
| RejectRules rules = new RejectRules(); |
| rules.rejectIfDoesntExists(); |
| try { |
| JRamCloud.Object rcObj = rcClient.read(rcTableId.getTableID(), key, rules); |
| return new Entry(rcObj.key, rcObj.value, rcObj.version); |
| } catch (JRamCloud.ObjectDoesntExistException e) { |
| throw new ObjectDoesntExistException(rcTableId, key, e); |
| } catch (JRamCloud.RejectRulesException e) { |
| log.error("Unexpected RejectRulesException", e); |
| return null; |
| } |
| } |
| |
| @Override |
| public IMultiEntryOperation updateOp(IKVTableID tableId, byte[] key, byte[] value, long version) { |
| return RCMultiEntryOperation.update(tableId, key, value, version); |
| } |
| |
| @Override |
| public long update(IKVTableID tableId, byte[] key, byte[] value, |
| long version) throws ObjectDoesntExistException, |
| WrongVersionException { |
| |
| RCTableID rcTableId = (RCTableID) tableId; |
| JRamCloud rcClient = RCClient.getJRamCloudClient(); |
| |
| RejectRules rules = new RejectRules(); |
| rules.rejectIfDoesntExists(); |
| rules.rejectIfNeVersion(version); |
| |
| try { |
| return rcClient.write(rcTableId.getTableID(), key, value, rules); |
| } catch (JRamCloud.ObjectDoesntExistException e) { |
| throw new ObjectDoesntExistException(rcTableId, key, e); |
| } catch (JRamCloud.WrongVersionException e) { |
| throw new WrongVersionException(rcTableId, key, version, e); |
| } catch (JRamCloud.RejectRulesException e) { |
| log.error("Unexpected RejectRulesException", e); |
| return JRamCloud.VERSION_NONEXISTENT; |
| } |
| } |
| |
| |
| @Override |
| public long update(IKVTableID tableId, byte[] key, byte[] value) |
| throws ObjectDoesntExistException { |
| |
| RCTableID rcTableId = (RCTableID) tableId; |
| JRamCloud rcClient = RCClient.getJRamCloudClient(); |
| |
| RejectRules rules = new RejectRules(); |
| rules.rejectIfDoesntExists(); |
| |
| try { |
| return rcClient.write(rcTableId.getTableID(), key, value, rules); |
| } catch (JRamCloud.ObjectDoesntExistException e) { |
| throw new ObjectDoesntExistException(rcTableId, key, e); |
| } catch (JRamCloud.RejectRulesException e) { |
| log.error("Unexpected RejectRulesException", e); |
| return JRamCloud.VERSION_NONEXISTENT; |
| } |
| } |
| |
| @Override |
| public IMultiEntryOperation deleteOp(IKVTableID tableId, byte[] key, byte[] value, long version) { |
| return RCMultiEntryOperation.delete(tableId, key, value, version); |
| } |
| |
| @Override |
| public long delete(IKVTableID tableId, byte[] key, long version) |
| throws ObjectDoesntExistException, WrongVersionException { |
| |
| RCTableID rcTableId = (RCTableID) tableId; |
| JRamCloud rcClient = RCClient.getJRamCloudClient(); |
| |
| RejectRules rules = new RejectRules(); |
| rules.rejectIfDoesntExists(); |
| rules.rejectIfNeVersion(version); |
| |
| try { |
| return rcClient.remove(rcTableId.getTableID(), key, rules); |
| } catch (JRamCloud.ObjectDoesntExistException e) { |
| throw new ObjectDoesntExistException(rcTableId, key, e); |
| } catch (JRamCloud.WrongVersionException e) { |
| throw new WrongVersionException(rcTableId, key, version, e); |
| } catch (JRamCloud.RejectRulesException e) { |
| log.error("Unexpected RejectRulesException", e); |
| return JRamCloud.VERSION_NONEXISTENT; |
| } |
| } |
| |
| @Override |
| public IMultiEntryOperation forceDeleteOp(IKVTableID tableId, byte[] key) { |
| return RCMultiEntryOperation.forceDelete(tableId, key); |
| } |
| |
| @Override |
| public long forceDelete(IKVTableID tableId, byte[] key) { |
| RCTableID rcTableId = (RCTableID) tableId; |
| JRamCloud rcClient = RCClient.getJRamCloudClient(); |
| final long removedVersion = rcClient.remove(rcTableId.getTableID(), key); |
| return removedVersion; |
| } |
| |
| @Override |
| public Iterable<IKVEntry> getAllEntries(IKVTableID tableId) { |
| return new RCTableEntryIterable((RCTableID) tableId); |
| } |
| |
| static class RCTableEntryIterable implements Iterable<IKVEntry> { |
| private final RCTableID tableId; |
| |
| public RCTableEntryIterable(final RCTableID tableId) { |
| this.tableId = tableId; |
| } |
| |
| @Override |
| public Iterator<IKVEntry> iterator() { |
| return new RCClient.RCTableIterator(tableId); |
| } |
| } |
| |
| public static class RCTableIterator implements Iterator<IKVEntry> { |
| private final RCTableID tableId; |
| protected final TableEnumerator2 enumerator; |
| private JRamCloud.Object last; |
| |
| public RCTableIterator(final RCTableID tableId) { |
| this.tableId = tableId; |
| this.enumerator = getJRamCloudClient().new TableEnumerator2(tableId.getTableID()); |
| this.last = null; |
| } |
| |
| @Override |
| public boolean hasNext() { |
| return this.enumerator.hasNext(); |
| } |
| |
| @Override |
| public RCTable.Entry next() { |
| last = enumerator.next(); |
| return new RCTable.Entry(last.key, last.value, last.version); |
| } |
| |
| @Override |
| public void remove() { |
| if (last != null) { |
| getJRamCloudClient(); |
| JRamCloud rcClient = RCClient.getJRamCloudClient(); |
| |
| RejectRules rules = new RejectRules(); |
| rules.rejectIfNeVersion(last.version); |
| try { |
| rcClient.remove(tableId.getTableID(), last.key, rules); |
| } catch (RejectRulesException e) { |
| log.trace("remove failed", e); |
| } |
| last = null; |
| } |
| } |
| } |
| |
| @Override |
| public boolean multiRead(final Collection<IMultiEntryOperation> ops) { |
| |
| if (ops.size() <= MAX_MULTI_READS && ops instanceof ArrayList) { |
| @SuppressWarnings({ "unchecked", "rawtypes" }) |
| final ArrayList<RCMultiEntryOperation> arrays = (ArrayList) ops; |
| return multiReadInternal(arrays); |
| } |
| |
| boolean failExists = false; |
| |
| ArrayList<RCMultiEntryOperation> req = new ArrayList<>(); |
| Iterator<IMultiEntryOperation> it = ops.iterator(); |
| while (it.hasNext()) { |
| |
| req.add((RCMultiEntryOperation) it.next()); |
| |
| if (req.size() >= MAX_MULTI_READS) { |
| // dispatch multiRead |
| failExists |= multiReadInternal(req); |
| req.clear(); |
| } |
| } |
| |
| if (!req.isEmpty()) { |
| // dispatch multiRead |
| failExists |= multiReadInternal(req); |
| req.clear(); |
| } |
| |
| return failExists; |
| } |
| |
| @Override |
| public boolean multiWrite(final List<IMultiEntryOperation> ops) { |
| |
| if (ops.size() <= MAX_MULTI_WRITES && ops instanceof ArrayList) { |
| @SuppressWarnings({ "unchecked", "rawtypes" }) |
| final ArrayList<RCMultiEntryOperation> arrays = (ArrayList) ops; |
| return multiWriteInternal(arrays); |
| } |
| |
| boolean failExists = false; |
| |
| ArrayList<RCMultiEntryOperation> req = new ArrayList<>(); |
| Iterator<IMultiEntryOperation> it = ops.iterator(); |
| while (it.hasNext()) { |
| |
| req.add((RCMultiEntryOperation) it.next()); |
| |
| if (req.size() >= MAX_MULTI_WRITES) { |
| // dispatch multiWrite |
| failExists |= multiWriteInternal(req); |
| req.clear(); |
| } |
| } |
| |
| if (!req.isEmpty()) { |
| // dispatch multiWrite |
| failExists |= multiWriteInternal(req); |
| req.clear(); |
| } |
| |
| return failExists; |
| } |
| |
| @Override |
| public boolean multiDelete(final Collection<IMultiEntryOperation> ops) { |
| |
| // TODO implement multiRemove JNI, etc. if we need performance |
| |
| boolean failExists = false; |
| JRamCloud rcClient = getJRamCloudClient(); |
| |
| for (IMultiEntryOperation iop : ops) { |
| RCMultiEntryOperation op = (RCMultiEntryOperation) iop; |
| switch (op.getOperation()) { |
| case DELETE: |
| RejectRules rules = new RejectRules(); |
| rules.rejectIfDoesntExists(); |
| rules.rejectIfNeVersion(op.getVersion()); |
| |
| try { |
| final long removedVersion = rcClient.remove(op.tableId.getTableID(), op.entry.getKey(), rules); |
| op.entry.setVersion(removedVersion); |
| op.status = STATUS.SUCCESS; |
| } catch (JRamCloud.ObjectDoesntExistException | JRamCloud.WrongVersionException e) { |
| log.error("Failed to remove key:" + |
| ByteArrayUtil.toHexStringBuilder(op.entry.getKey(), "") + |
| " from tableID:" + op.tableId, e); |
| failExists = true; |
| op.status = STATUS.FAILED; |
| } catch (JRamCloud.RejectRulesException e) { |
| log.error("Failed to remove key:" + |
| ByteArrayUtil.toHexStringBuilder(op.entry.getKey(), "") + |
| " from tableID:" + op.tableId, e); |
| failExists = true; |
| op.status = STATUS.FAILED; |
| } |
| break; |
| |
| case FORCE_DELETE: |
| final long removedVersion = rcClient.remove(op.tableId.getTableID(), op.entry.getKey()); |
| if (removedVersion != VERSION_NONEXISTENT) { |
| op.entry.setVersion(removedVersion); |
| op.status = STATUS.SUCCESS; |
| } else { |
| log.error("Failed to remove key:{} from tableID:{}", |
| ByteArrayUtil.toHexStringBuilder(op.entry.getKey(), ""), |
| op.tableId); |
| failExists = true; |
| op.status = STATUS.FAILED; |
| } |
| break; |
| |
| default: |
| log.error("Invalid operation {} specified on multiDelete", op.getOperation()); |
| failExists = true; |
| op.status = STATUS.FAILED; |
| break; |
| } |
| } |
| return failExists; |
| } |
| |
| private boolean multiReadInternal(final ArrayList<RCMultiEntryOperation> ops) { |
| boolean failExists = false; |
| JRamCloud rcClient = RCClient.getJRamCloudClient(); |
| |
| final int reqs = ops.size(); |
| |
| MultiReadObject multiReadObjects = new MultiReadObject(reqs); |
| |
| // setup multi-read operation objects |
| for (int i = 0; i < reqs; ++i) { |
| IMultiEntryOperation op = ops.get(i); |
| multiReadObjects.setObject(i, ((RCTableID) op.getTableId()).getTableID(), op.getKey()); |
| } |
| |
| // execute |
| JRamCloud.Object[] results = rcClient.multiRead(multiReadObjects.tableId, |
| multiReadObjects.key, multiReadObjects.keyLength, reqs); |
| if (results.length != reqs) { |
| log.error("multiRead returned unexpected number of results. " + |
| "(requested:{}, returned:{})", reqs, results.length); |
| failExists = true; |
| } |
| |
| for (int i = 0; i < results.length; ++i) { |
| IModifiableMultiEntryOperation op = ops.get(i); |
| if (results[i] == null) { |
| // Logging as error gets too noisy when doing speculative read. |
| log.trace("MultiRead error {}, {}", op.getTableId(), op); |
| failExists = true; |
| op.setStatus(STATUS.FAILED); |
| continue; |
| } |
| assert (Arrays.equals(results[i].key, op.getKey())); |
| |
| op.setValue(results[i].value, results[i].version); |
| if (results[i].version == JRamCloud.VERSION_NONEXISTENT) { |
| failExists = true; |
| op.setStatus(STATUS.FAILED); |
| } else { |
| op.setStatus(STATUS.SUCCESS); |
| } |
| } |
| |
| return failExists; |
| } |
| |
| private boolean multiWriteInternal(final ArrayList<RCMultiEntryOperation> ops) { |
| boolean failExists = false; |
| JRamCloud rcClient = RCClient.getJRamCloudClient(); |
| |
| final int reqs = ops.size(); |
| |
| MultiWriteObject multiWriteObjects = new MultiWriteObject(reqs); |
| |
| for (int i = 0; i < reqs; ++i) { |
| |
| IModifiableMultiEntryOperation op = ops.get(i); |
| RejectRules rules = new RejectRules(); |
| |
| switch (op.getOperation()) { |
| case CREATE: |
| rules.rejectIfExists(); |
| break; |
| case FORCE_CREATE: |
| // no reject rule |
| break; |
| case UPDATE: |
| rules.rejectIfDoesntExists(); |
| rules.rejectIfNeVersion(op.getVersion()); |
| break; |
| |
| default: |
| log.error("Invalid operation {} specified on multiWriteInternal", op.getOperation()); |
| failExists = true; |
| op.setStatus(STATUS.FAILED); |
| return failExists; |
| } |
| multiWriteObjects.setObject(i, |
| ((RCTableID) op.getTableId()).getTableID(), op.getKey(), |
| op.getValue(), rules); |
| } |
| |
| MultiWriteRspObject[] results = rcClient.multiWrite(multiWriteObjects.tableId, |
| multiWriteObjects.key, multiWriteObjects.keyLength, |
| multiWriteObjects.value, multiWriteObjects.valueLength, ops.size(), |
| multiWriteObjects.rules); |
| if (results.length != reqs) { |
| log.error("multiWrite returned unexpected number of results. " + |
| "(requested:{}, returned:{})", reqs, results.length); |
| failExists = true; |
| } |
| |
| for (int i = 0; i < results.length; ++i) { |
| IModifiableMultiEntryOperation op = ops.get(i); |
| |
| if (results[i] != null |
| && results[i].getStatus() == RCClient.STATUS_OK) { |
| op.setStatus(STATUS.SUCCESS); |
| op.setVersion(results[i].getVersion()); |
| } else { |
| op.setStatus(STATUS.FAILED); |
| failExists = true; |
| } |
| } |
| |
| return failExists; |
| } |
| |
| private static final ConcurrentHashMap<String, RCTable> TABLES = |
| new ConcurrentHashMap<>(); |
| |
| @Override |
| public IKVTable getTable(final String tableName) { |
| RCTable table = TABLES.get(tableName); |
| if (table == null) { |
| RCTable newTable = new RCTable(tableName); |
| RCTable existingTable = TABLES |
| .putIfAbsent(tableName, newTable); |
| if (existingTable != null) { |
| return existingTable; |
| } else { |
| return newTable; |
| } |
| } |
| return table; |
| } |
| |
| @Override |
| public void dropTable(IKVTable table) { |
| JRamCloud rcClient = RCClient.getJRamCloudClient(); |
| rcClient.dropTable(table.getTableId().getTableName()); |
| TABLES.remove(table.getTableId().getTableName()); |
| } |
| |
| static final long VERSION_NONEXISTENT = JRamCloud.VERSION_NONEXISTENT; |
| |
| @Override |
| public long getVersionNonexistant() { |
| return VERSION_NONEXISTENT; |
| } |
| |
| @Override |
| public void createCounter(final IKVTableID tableId, final byte[] key, |
| final long initialValue) |
| throws ObjectExistsException { |
| |
| ByteBuffer valueBytes = ByteBuffer.allocate(8) |
| .order(ByteOrder.LITTLE_ENDIAN).putLong(initialValue); |
| valueBytes.flip(); |
| final long version = create(tableId, key, valueBytes.array()); |
| if (log.isTraceEnabled()) { |
| log.trace("Created counter {}-{}={}@{}", |
| tableId, ByteArrayUtil.toHexStringBuilder(key, ":"), |
| initialValue, version); |
| } |
| } |
| |
| @Override |
| public void setCounter(final IKVTableID tableId, final byte[] key, |
| final long value) { |
| |
| ByteBuffer valueBytes = ByteBuffer.allocate(8) |
| .order(ByteOrder.LITTLE_ENDIAN).putLong(value); |
| valueBytes.flip(); |
| |
| final long version = forceCreate(tableId, key, valueBytes.array()); |
| if (log.isTraceEnabled()) { |
| log.trace("set counter {}-{}={}@{}", |
| tableId, ByteArrayUtil.toHexStringBuilder(key, ":"), |
| value, version); |
| } |
| } |
| |
| @Override |
| public long incrementCounter(final IKVTableID tableId, final byte[] key, |
| final long incrementValue) { |
| |
| RCTableID rcTableId = (RCTableID) tableId; |
| JRamCloud rcClient = RCClient.getJRamCloudClient(); |
| |
| try { |
| return rcClient.increment(rcTableId.getTableID(), key, incrementValue); |
| } catch (JRamCloud.ObjectDoesntExistException e) { |
| log.warn("Counter {}-{} was not present", |
| tableId, |
| ByteArrayUtil.toHexStringBuilder(key, ":")); |
| try { |
| // creating counter initialized to 0 |
| createCounter(rcTableId, key, 0L); |
| } catch (ObjectExistsException e1) { |
| // someone concurrently created it |
| log.debug("Counter {}-{} seemed to be concurrently created.", |
| tableId, |
| ByteArrayUtil.toHexStringBuilder(key, ":")); |
| } |
| try { |
| return rcClient.increment(rcTableId.getTableID(), key, incrementValue); |
| } catch (edu.stanford.ramcloud.JRamCloud.ObjectDoesntExistException e1) { |
| log.error("Should never happen"); |
| throw new IllegalStateException("Created counter disappeared."); |
| } |
| } |
| } |
| |
| @Override |
| public void destroyCounter(final IKVTableID tableId, final byte[] key) { |
| |
| RCTableID rcTableId = (RCTableID) tableId; |
| JRamCloud rcClient = RCClient.getJRamCloudClient(); |
| |
| rcClient.remove(rcTableId.getTableID(), key); |
| } |
| |
| @Override |
| public long getCounter(IKVTableID tableId, byte[] key) |
| throws ObjectDoesntExistException { |
| |
| IKVEntry entry = read(tableId, key); |
| ByteBuffer counter = ByteBuffer.wrap(entry.getValue()).order(ByteOrder.LITTLE_ENDIAN); |
| return counter.getLong(); |
| } |
| } |