blob: 9a9141e89111d2872f9c35de26d809deac6a06c3 [file] [log] [blame]
Jonathan Hart6df90172014-04-03 10:13:11 -07001package net.onrc.onos.core.datastore.ramcloud;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -07002
3import java.io.File;
Yuta HIGUCHId47eac32014-04-07 13:44:47 -07004import java.nio.ByteBuffer;
5import java.nio.ByteOrder;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -07006import java.util.ArrayList;
7import java.util.Arrays;
8import java.util.Collection;
9import java.util.Iterator;
10import java.util.List;
11import java.util.concurrent.ConcurrentHashMap;
12
Jonathan Hart6df90172014-04-03 10:13:11 -070013import net.onrc.onos.core.datastore.IKVClient;
14import net.onrc.onos.core.datastore.IKVTable;
Jonathan Harta99ec672014-04-03 11:30:34 -070015import net.onrc.onos.core.datastore.IKVTable.IKVEntry;
Jonathan Hart6df90172014-04-03 10:13:11 -070016import net.onrc.onos.core.datastore.IKVTableID;
17import net.onrc.onos.core.datastore.IMultiEntryOperation;
Jonathan Harta99ec672014-04-03 11:30:34 -070018import net.onrc.onos.core.datastore.IMultiEntryOperation.STATUS;
Jonathan Hart6df90172014-04-03 10:13:11 -070019import net.onrc.onos.core.datastore.ObjectDoesntExistException;
20import net.onrc.onos.core.datastore.ObjectExistsException;
21import net.onrc.onos.core.datastore.WrongVersionException;
Jonathan Hart6df90172014-04-03 10:13:11 -070022import net.onrc.onos.core.datastore.internal.IModifiableMultiEntryOperation;
23import net.onrc.onos.core.datastore.ramcloud.RCTable.Entry;
24import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070025
26import org.apache.commons.configuration.Configuration;
27import org.apache.commons.configuration.ConfigurationException;
28import org.apache.commons.configuration.PropertiesConfiguration;
29import org.slf4j.Logger;
30import org.slf4j.LoggerFactory;
31
32import edu.stanford.ramcloud.JRamCloud;
33import edu.stanford.ramcloud.JRamCloud.MultiReadObject;
34import edu.stanford.ramcloud.JRamCloud.MultiWriteObject;
35import edu.stanford.ramcloud.JRamCloud.MultiWriteRspObject;
36import edu.stanford.ramcloud.JRamCloud.RejectRules;
37import edu.stanford.ramcloud.JRamCloud.RejectRulesException;
38import edu.stanford.ramcloud.JRamCloud.TableEnumerator2;
39
40public class RCClient implements IKVClient {
41 private static final Logger log = LoggerFactory.getLogger(RCClient.class);
42
43 private static final String DB_CONFIG_FILE = "conf/ramcloud.conf";
Ray Milkey5c9f2db2014-04-09 10:31:21 -070044 public static final Configuration CONFIG = getConfiguration();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070045
46 // Value taken from RAMCloud's Status.h
47 // FIXME These constants should be defined by JRamCloud
48 public static final int STATUS_OK = 0;
49
50 // FIXME come up with a proper way to retrieve configuration
51 public static final int MAX_MULTI_READS = Math.max(1, Integer
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070052 .valueOf(System.getProperty("ramcloud.max_multi_reads", "400")));
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070053
54 public static final int MAX_MULTI_WRITES = Math.max(1, Integer
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070055 .valueOf(System.getProperty("ramcloud.max_multi_writes", "800")));
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070056
Ray Milkey5c9f2db2014-04-09 10:31:21 -070057 private static final ThreadLocal<JRamCloud> TLS_RC_CLIENT = new ThreadLocal<JRamCloud>() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070058 @Override
59 protected JRamCloud initialValue() {
Ray Milkey5c9f2db2014-04-09 10:31:21 -070060 return new JRamCloud(getCoordinatorUrl(CONFIG));
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070061 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070062 };
63
64 /**
65 * @return JRamCloud instance intended to be used only within the
Ray Milkey269ffb92014-04-03 14:43:30 -070066 * SameThread.
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070067 * @note Do not store the returned instance in a member variable, etc. which
Ray Milkey269ffb92014-04-03 14:43:30 -070068 * may be accessed later by another thread.
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070069 */
70 static JRamCloud getJRamCloudClient() {
Ray Milkey5c9f2db2014-04-09 10:31:21 -070071 return TLS_RC_CLIENT.get();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070072 }
73
74 // Currently RCClient is state-less
Ray Milkey5c9f2db2014-04-09 10:31:21 -070075 private static final RCClient THE_INSTANCE = new RCClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070076
77 public static RCClient getClient() {
Ray Milkey5c9f2db2014-04-09 10:31:21 -070078 return THE_INSTANCE;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070079 }
80
81 public static final Configuration getConfiguration() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070082 final File configFile = new File(System.getProperty("ramcloud.config.path", DB_CONFIG_FILE));
83 return getConfiguration(configFile);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070084 }
85
86 public static final Configuration getConfiguration(final File configFile) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070087 if (configFile == null) {
88 throw new IllegalArgumentException("Need to specify a configuration file or storage directory");
89 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070090
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070091 if (!configFile.isFile()) {
92 throw new IllegalArgumentException("Location of configuration must be a file");
93 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070094
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070095 try {
96 return new PropertiesConfiguration(configFile);
97 } catch (ConfigurationException e) {
98 throw new IllegalArgumentException("Could not load configuration at: " + configFile, e);
99 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700100 }
101
102 public static String getCoordinatorUrl(final Configuration configuration) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700103 final String coordinatorIp = configuration.getString("ramcloud.coordinatorIp", "fast+udp:host=127.0.0.1");
104 final String coordinatorPort = configuration.getString("ramcloud.coordinatorPort", "port=12246");
105 final String coordinatorURL = coordinatorIp + "," + coordinatorPort;
106 return coordinatorURL;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700107 }
108
109 @Override
110 public IMultiEntryOperation createOp(IKVTableID tableId, byte[] key, byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700111 return RCMultiEntryOperation.create(tableId, key, value);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700112 }
113
114 /**
115 * @param tableId RCTableID instance
116 */
117 @Override
118 public long create(IKVTableID tableId, byte[] key, byte[] value)
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700119 throws ObjectExistsException {
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700120
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700121 RCTableID rcTableId = (RCTableID) tableId;
122 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700123
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700124 RejectRules rules = new RejectRules();
125 rules.rejectIfExists();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700126
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700127 try {
128 return rcClient.write(rcTableId.getTableID(), key, value, rules);
129 } catch (JRamCloud.ObjectExistsException e) {
130 throw new ObjectExistsException(rcTableId, key, e);
131 } catch (JRamCloud.RejectRulesException e) {
132 log.error("Unexpected RejectRulesException", e);
133 return JRamCloud.VERSION_NONEXISTENT;
134 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700135 }
136
137 @Override
138 public IMultiEntryOperation forceCreateOp(IKVTableID tableId, byte[] key, byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700139 return RCMultiEntryOperation.forceCreate(tableId, key, value);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700140 }
141
142 @Override
143 public long forceCreate(IKVTableID tableId, byte[] key, byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700144 RCTableID rcTableId = (RCTableID) tableId;
145 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700146
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700147 long updatedVersion = rcClient.write(rcTableId.getTableID(), key, value);
148 return updatedVersion;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700149 }
150
151 @Override
152 public IMultiEntryOperation readOp(IKVTableID tableId, byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700153 return RCMultiEntryOperation.read(tableId, key);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700154 }
155
156 @Override
157 public IKVEntry read(IKVTableID tableId, byte[] key)
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700158 throws ObjectDoesntExistException {
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700159
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700160 RCTableID rcTableId = (RCTableID) tableId;
161 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700162
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700163 RejectRules rules = new RejectRules();
164 rules.rejectIfDoesntExists();
165 try {
166 JRamCloud.Object rcObj = rcClient.read(rcTableId.getTableID(), key, rules);
167 return new Entry(rcObj.key, rcObj.value, rcObj.version);
168 } catch (JRamCloud.ObjectDoesntExistException e) {
169 throw new ObjectDoesntExistException(rcTableId, key, e);
170 } catch (JRamCloud.RejectRulesException e) {
171 log.error("Unexpected RejectRulesException", e);
172 return null;
173 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700174 }
175
176 @Override
Ray Milkey269ffb92014-04-03 14:43:30 -0700177 public IMultiEntryOperation updateOp(IKVTableID tableId, byte[] key, byte[] value, long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700178 return RCMultiEntryOperation.update(tableId, key, value, version);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700179 }
180
181 @Override
182 public long update(IKVTableID tableId, byte[] key, byte[] value,
Ray Milkey269ffb92014-04-03 14:43:30 -0700183 long version) throws ObjectDoesntExistException,
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700184 WrongVersionException {
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700185
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700186 RCTableID rcTableId = (RCTableID) tableId;
187 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700188
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700189 RejectRules rules = new RejectRules();
190 rules.rejectIfDoesntExists();
191 rules.rejectIfNeVersion(version);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700192
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700193 try {
194 return rcClient.write(rcTableId.getTableID(), key, value, rules);
195 } catch (JRamCloud.ObjectDoesntExistException e) {
196 throw new ObjectDoesntExistException(rcTableId, key, e);
197 } catch (JRamCloud.WrongVersionException e) {
198 throw new WrongVersionException(rcTableId, key, version, e);
199 } catch (JRamCloud.RejectRulesException e) {
200 log.error("Unexpected RejectRulesException", e);
201 return JRamCloud.VERSION_NONEXISTENT;
202 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700203 }
204
205
206 @Override
207 public long update(IKVTableID tableId, byte[] key, byte[] value)
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700208 throws ObjectDoesntExistException {
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700209
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700210 RCTableID rcTableId = (RCTableID) tableId;
211 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700212
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700213 RejectRules rules = new RejectRules();
214 rules.rejectIfDoesntExists();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700215
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700216 try {
217 return rcClient.write(rcTableId.getTableID(), key, value, rules);
218 } catch (JRamCloud.ObjectDoesntExistException e) {
219 throw new ObjectDoesntExistException(rcTableId, key, e);
220 } catch (JRamCloud.RejectRulesException e) {
221 log.error("Unexpected RejectRulesException", e);
222 return JRamCloud.VERSION_NONEXISTENT;
223 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700224 }
225
226 @Override
Ray Milkey269ffb92014-04-03 14:43:30 -0700227 public IMultiEntryOperation deleteOp(IKVTableID tableId, byte[] key, byte[] value, long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700228 return RCMultiEntryOperation.delete(tableId, key, value, version);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700229 }
230
231 @Override
232 public long delete(IKVTableID tableId, byte[] key, long version)
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700233 throws ObjectDoesntExistException, WrongVersionException {
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700234
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700235 RCTableID rcTableId = (RCTableID) tableId;
236 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700237
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700238 RejectRules rules = new RejectRules();
239 rules.rejectIfDoesntExists();
240 rules.rejectIfNeVersion(version);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700241
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700242 try {
243 return rcClient.remove(rcTableId.getTableID(), key, rules);
244 } catch (JRamCloud.ObjectDoesntExistException e) {
245 throw new ObjectDoesntExistException(rcTableId, key, e);
246 } catch (JRamCloud.WrongVersionException e) {
247 throw new WrongVersionException(rcTableId, key, version, e);
248 } catch (JRamCloud.RejectRulesException e) {
249 log.error("Unexpected RejectRulesException", e);
250 return JRamCloud.VERSION_NONEXISTENT;
251 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700252 }
253
254 @Override
255 public IMultiEntryOperation forceDeleteOp(IKVTableID tableId, byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700256 return RCMultiEntryOperation.forceDelete(tableId, key);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700257 }
258
259 @Override
260 public long forceDelete(IKVTableID tableId, byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700261 RCTableID rcTableId = (RCTableID) tableId;
262 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700263 final long removedVersion = rcClient.remove(rcTableId.getTableID(), key);
264 return removedVersion;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700265 }
266
267 @Override
268 public Iterable<IKVEntry> getAllEntries(IKVTableID tableId) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700269 return new RCTableEntryIterable((RCTableID) tableId);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700270 }
271
272 static class RCTableEntryIterable implements Iterable<IKVEntry> {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700273 private final RCTableID tableId;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700274
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700275 public RCTableEntryIterable(final RCTableID tableId) {
276 this.tableId = tableId;
277 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700278
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700279 @Override
280 public Iterator<IKVEntry> iterator() {
281 return new RCClient.RCTableIterator(tableId);
282 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700283 }
284
285 public static class RCTableIterator implements Iterator<IKVEntry> {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700286 private final RCTableID tableId;
287 protected final TableEnumerator2 enumerator;
288 private JRamCloud.Object last;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700289
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700290 public RCTableIterator(final RCTableID tableId) {
291 this.tableId = tableId;
292 this.enumerator = getJRamCloudClient().new TableEnumerator2(tableId.getTableID());
293 this.last = null;
294 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700295
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700296 @Override
297 public boolean hasNext() {
298 return this.enumerator.hasNext();
299 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700300
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700301 @Override
302 public RCTable.Entry next() {
303 last = enumerator.next();
304 return new RCTable.Entry(last.key, last.value, last.version);
305 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700306
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700307 @Override
308 public void remove() {
309 if (last != null) {
310 getJRamCloudClient();
311 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700312
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700313 RejectRules rules = new RejectRules();
314 rules.rejectIfNeVersion(last.version);
315 try {
316 rcClient.remove(tableId.getTableID(), last.key, rules);
317 } catch (RejectRulesException e) {
318 log.trace("remove failed", e);
319 }
320 last = null;
321 }
322 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700323 }
324
325 @Override
326 public boolean multiRead(final Collection<IMultiEntryOperation> ops) {
327
Ray Milkey269ffb92014-04-03 14:43:30 -0700328 if (ops.size() <= MAX_MULTI_READS && ops instanceof ArrayList) {
Ray Milkey7f1567c2014-04-08 13:53:32 -0700329 @SuppressWarnings({ "unchecked", "rawtypes" })
Ray Milkey269ffb92014-04-03 14:43:30 -0700330 final ArrayList<RCMultiEntryOperation> arrays = (ArrayList) ops;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700331 return multiReadInternal(arrays);
332 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700333
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700334 boolean failExists = false;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700335
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700336 ArrayList<RCMultiEntryOperation> req = new ArrayList<>();
337 Iterator<IMultiEntryOperation> it = ops.iterator();
338 while (it.hasNext()) {
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700339
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700340 req.add((RCMultiEntryOperation) it.next());
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700341
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700342 if (req.size() >= MAX_MULTI_READS) {
343 // dispatch multiRead
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700344 failExists |= multiReadInternal(req);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700345 req.clear();
346 }
347 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700348
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700349 if (!req.isEmpty()) {
350 // dispatch multiRead
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700351 failExists |= multiReadInternal(req);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700352 req.clear();
353 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700354
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700355 return failExists;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700356 }
357
358 @Override
359 public boolean multiWrite(final List<IMultiEntryOperation> ops) {
360
Ray Milkey269ffb92014-04-03 14:43:30 -0700361 if (ops.size() <= MAX_MULTI_WRITES && ops instanceof ArrayList) {
Ray Milkey7f1567c2014-04-08 13:53:32 -0700362 @SuppressWarnings({ "unchecked", "rawtypes" })
Ray Milkey269ffb92014-04-03 14:43:30 -0700363 final ArrayList<RCMultiEntryOperation> arrays = (ArrayList) ops;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700364 return multiWriteInternal(arrays);
365 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700366
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700367 boolean failExists = false;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700368
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700369 ArrayList<RCMultiEntryOperation> req = new ArrayList<>();
370 Iterator<IMultiEntryOperation> it = ops.iterator();
371 while (it.hasNext()) {
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700372
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700373 req.add((RCMultiEntryOperation) it.next());
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700374
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700375 if (req.size() >= MAX_MULTI_WRITES) {
376 // dispatch multiWrite
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700377 failExists |= multiWriteInternal(req);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700378 req.clear();
379 }
380 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700381
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700382 if (!req.isEmpty()) {
383 // dispatch multiWrite
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700384 failExists |= multiWriteInternal(req);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700385 req.clear();
386 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700387
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700388 return failExists;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700389 }
390
391 @Override
392 public boolean multiDelete(final Collection<IMultiEntryOperation> ops) {
393
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700394 // TODO implement multiRemove JNI, etc. if we need performance
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700395
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700396 boolean failExists = false;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700397 JRamCloud rcClient = getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700398
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700399 for (IMultiEntryOperation iop : ops) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700400 RCMultiEntryOperation op = (RCMultiEntryOperation) iop;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700401 switch (op.getOperation()) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700402 case DELETE:
403 RejectRules rules = new RejectRules();
404 rules.rejectIfDoesntExists();
405 rules.rejectIfNeVersion(op.getVersion());
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700406
Ray Milkey269ffb92014-04-03 14:43:30 -0700407 try {
408 final long removedVersion = rcClient.remove(op.tableId.getTableID(), op.entry.getKey(), rules);
409 op.entry.setVersion(removedVersion);
410 op.status = STATUS.SUCCESS;
411 } catch (JRamCloud.ObjectDoesntExistException | JRamCloud.WrongVersionException e) {
412 log.error("Failed to remove key:" + ByteArrayUtil.toHexStringBuffer(op.entry.getKey(), "") + " from tableID:" + op.tableId, e);
413 failExists = true;
414 op.status = STATUS.FAILED;
415 } catch (JRamCloud.RejectRulesException e) {
416 log.error("Failed to remove key:" + ByteArrayUtil.toHexStringBuffer(op.entry.getKey(), "") + " from tableID:" + op.tableId, e);
417 failExists = true;
418 op.status = STATUS.FAILED;
419 }
420 break;
421
422 case FORCE_DELETE:
423 final long removedVersion = rcClient.remove(op.tableId.getTableID(), op.entry.getKey());
424 if (removedVersion != VERSION_NONEXISTENT) {
425 op.entry.setVersion(removedVersion);
426 op.status = STATUS.SUCCESS;
427 } else {
428 log.error("Failed to remove key:{} from tableID:{}", ByteArrayUtil.toHexStringBuffer(op.entry.getKey(), ""), op.tableId);
429 failExists = true;
430 op.status = STATUS.FAILED;
431 }
432 break;
433
434 default:
435 log.error("Invalid operation {} specified on multiDelete", op.getOperation());
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700436 failExists = true;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700437 op.status = STATUS.FAILED;
Ray Milkey269ffb92014-04-03 14:43:30 -0700438 break;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700439 }
440 }
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700441 return failExists;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700442 }
443
444 private boolean multiReadInternal(final ArrayList<RCMultiEntryOperation> ops) {
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700445 boolean failExists = false;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700446 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700447
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700448 final int reqs = ops.size();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700449
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700450 MultiReadObject multiReadObjects = new MultiReadObject(reqs);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700451
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700452 // setup multi-read operation objects
453 for (int i = 0; i < reqs; ++i) {
454 IMultiEntryOperation op = ops.get(i);
Ray Milkey269ffb92014-04-03 14:43:30 -0700455 multiReadObjects.setObject(i, ((RCTableID) op.getTableId()).getTableID(), op.getKey());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700456 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700457
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700458 // execute
459 JRamCloud.Object[] results = rcClient.multiRead(multiReadObjects.tableId, multiReadObjects.key, multiReadObjects.keyLength, reqs);
460 if (results.length != reqs) {
461 log.error("multiRead returned unexpected number of results. (requested:{}, returned:{})", reqs, results.length);
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700462 failExists = true;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700463 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700464
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700465 for (int i = 0; i < results.length; ++i) {
466 IModifiableMultiEntryOperation op = ops.get(i);
467 if (results[i] == null) {
468 log.error("MultiRead error, skipping {}, {}", op.getTableId(), op);
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700469 failExists = true;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700470 op.setStatus(STATUS.FAILED);
471 continue;
472 }
473 assert (Arrays.equals(results[i].key, op.getKey()));
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700474
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700475 op.setValue(results[i].value, results[i].version);
476 if (results[i].version == JRamCloud.VERSION_NONEXISTENT) {
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700477 failExists = true;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700478 op.setStatus(STATUS.FAILED);
479 } else {
480 op.setStatus(STATUS.SUCCESS);
481 }
482 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700483
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700484 return failExists;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700485 }
486
487 private boolean multiWriteInternal(final ArrayList<RCMultiEntryOperation> ops) {
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700488 boolean failExists = false;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700489 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700490
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700491 final int reqs = ops.size();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700492
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700493 MultiWriteObject multiWriteObjects = new MultiWriteObject(reqs);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700494
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700495 for (int i = 0; i < reqs; ++i) {
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700496
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700497 IModifiableMultiEntryOperation op = ops.get(i);
498 RejectRules rules = new RejectRules();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700499
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700500 switch (op.getOperation()) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700501 case CREATE:
502 rules.rejectIfExists();
503 break;
504 case FORCE_CREATE:
505 // no reject rule
506 break;
507 case UPDATE:
508 rules.rejectIfDoesntExists();
509 rules.rejectIfNeVersion(op.getVersion());
510 break;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700511
Ray Milkey269ffb92014-04-03 14:43:30 -0700512 default:
513 log.error("Invalid operation {} specified on multiWriteInternal", op.getOperation());
514 failExists = true;
515 op.setStatus(STATUS.FAILED);
516 return failExists;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700517 }
Ray Milkey269ffb92014-04-03 14:43:30 -0700518 multiWriteObjects.setObject(i, ((RCTableID) op.getTableId()).getTableID(), op.getKey(), op.getValue(), rules);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700519 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700520
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700521 MultiWriteRspObject[] results = rcClient.multiWrite(multiWriteObjects.tableId, multiWriteObjects.key, multiWriteObjects.keyLength, multiWriteObjects.value, multiWriteObjects.valueLength, ops.size(), multiWriteObjects.rules);
522 if (results.length != reqs) {
523 log.error("multiWrite returned unexpected number of results. (requested:{}, returned:{})", reqs, results.length);
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700524 failExists = true;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700525 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700526
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700527 for (int i = 0; i < results.length; ++i) {
528 IModifiableMultiEntryOperation op = ops.get(i);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700529
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700530 if (results[i] != null
531 && results[i].getStatus() == RCClient.STATUS_OK) {
532 op.setStatus(STATUS.SUCCESS);
533 op.setVersion(results[i].getVersion());
534 } else {
535 op.setStatus(STATUS.FAILED);
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700536 failExists = true;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700537 }
538 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700539
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700540 return failExists;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700541 }
542
Ray Milkey5c9f2db2014-04-09 10:31:21 -0700543 private static final ConcurrentHashMap<String, RCTable> TABLES = new ConcurrentHashMap<>();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700544
545 @Override
546 public IKVTable getTable(final String tableName) {
Ray Milkey5c9f2db2014-04-09 10:31:21 -0700547 RCTable table = TABLES.get(tableName);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700548 if (table == null) {
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700549 RCTable newTable = new RCTable(tableName);
Ray Milkey5c9f2db2014-04-09 10:31:21 -0700550 RCTable existingTable = TABLES
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700551 .putIfAbsent(tableName, newTable);
552 if (existingTable != null) {
553 return existingTable;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700554 } else {
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700555 return newTable;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700556 }
557 }
558 return table;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700559 }
560
561 @Override
562 public void dropTable(IKVTable table) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700563 JRamCloud rcClient = RCClient.getJRamCloudClient();
564 rcClient.dropTable(table.getTableId().getTableName());
Ray Milkey5c9f2db2014-04-09 10:31:21 -0700565 TABLES.remove(table.getTableId().getTableName());
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700566 }
567
568 static final long VERSION_NONEXISTENT = JRamCloud.VERSION_NONEXISTENT;
569
570 @Override
Ray Milkey7531a342014-04-11 15:08:12 -0700571 public long getVersionNonexistant() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700572 return VERSION_NONEXISTENT;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700573 }
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700574
575 @Override
576 public void createCounter(final IKVTableID tableId, final byte[] key,
577 final long initialValue)
578 throws ObjectExistsException {
579
580 ByteBuffer valueBytes = ByteBuffer.allocate(8)
581 .order(ByteOrder.LITTLE_ENDIAN).putLong(initialValue);
582 valueBytes.flip();
583 final long version = create(tableId, key, valueBytes.array());
584 if (log.isTraceEnabled()) {
585 log.trace("Created counter {}-{}={}@{}",
586 tableId, ByteArrayUtil.toHexStringBuffer(key, ":"),
587 initialValue, version);
588 }
589 }
590
591 @Override
592 public void setCounter(final IKVTableID tableId, final byte[] key,
593 final long value) {
594
595 ByteBuffer valueBytes = ByteBuffer.allocate(8)
596 .order(ByteOrder.LITTLE_ENDIAN).putLong(value);
597 valueBytes.flip();
598
599 final long version = forceCreate(tableId, key, valueBytes.array());
600 if (log.isTraceEnabled()) {
601 log.trace("set counter {}-{}={}@{}",
602 tableId, ByteArrayUtil.toHexStringBuffer(key, ":"),
603 value, version);
604 }
605 }
606
607 @Override
608 public long incrementCounter(final IKVTableID tableId, final byte[] key,
609 final long incrementValue) {
610
611 RCTableID rcTableId = (RCTableID) tableId;
612 JRamCloud rcClient = RCClient.getJRamCloudClient();
613
614 try {
615 return rcClient.increment(rcTableId.getTableID(), key, incrementValue);
616 } catch (JRamCloud.ObjectDoesntExistException e) {
617 log.warn("Counter {}-{} was not present",
618 tableId,
619 ByteArrayUtil.toHexStringBuffer(key, ":"));
620 try {
621 // creating counter initialized to 0
622 createCounter(rcTableId, key, 0L);
623 } catch (ObjectExistsException e1) {
624 // someone concurrently created it
625 log.debug("Counter {}-{} seemed to be concurrently created.",
626 tableId,
627 ByteArrayUtil.toHexStringBuffer(key, ":"));
628 }
629 try {
630 return rcClient.increment(rcTableId.getTableID(), key, incrementValue);
631 } catch (edu.stanford.ramcloud.JRamCloud.ObjectDoesntExistException e1) {
632 log.error("Should never happen");
633 throw new IllegalStateException("Created counter disappeared.");
634 }
635 }
636 }
637
638 @Override
639 public void destroyCounter(final IKVTableID tableId, final byte[] key) {
640
641 RCTableID rcTableId = (RCTableID) tableId;
642 JRamCloud rcClient = RCClient.getJRamCloudClient();
643
644 rcClient.remove(rcTableId.getTableID(), key);
645 }
646
647 @Override
648 public long getCounter(IKVTableID tableId, byte[] key)
649 throws ObjectDoesntExistException {
650
651 IKVEntry entry = read(tableId, key);
652 ByteBuffer counter = ByteBuffer.wrap(entry.getValue()).order(ByteOrder.LITTLE_ENDIAN);
653 return counter.getLong();
654 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700655}