blob: 7ff7e7f2d0d2bc19cba0490170a24b0987b39e68 [file] [log] [blame]
Jonathan Hart6df90172014-04-03 10:13:11 -07001package net.onrc.onos.core.datastore.ramcloud;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -07002
3import java.io.File;
4import java.util.ArrayList;
5import java.util.Arrays;
6import java.util.Collection;
7import java.util.Iterator;
8import java.util.List;
9import java.util.concurrent.ConcurrentHashMap;
10
Jonathan Hart6df90172014-04-03 10:13:11 -070011import net.onrc.onos.core.datastore.IKVClient;
12import net.onrc.onos.core.datastore.IKVTable;
Jonathan Harta99ec672014-04-03 11:30:34 -070013import net.onrc.onos.core.datastore.IKVTable.IKVEntry;
Jonathan Hart6df90172014-04-03 10:13:11 -070014import net.onrc.onos.core.datastore.IKVTableID;
15import net.onrc.onos.core.datastore.IMultiEntryOperation;
Jonathan Harta99ec672014-04-03 11:30:34 -070016import net.onrc.onos.core.datastore.IMultiEntryOperation.STATUS;
Jonathan Hart6df90172014-04-03 10:13:11 -070017import net.onrc.onos.core.datastore.ObjectDoesntExistException;
18import net.onrc.onos.core.datastore.ObjectExistsException;
19import net.onrc.onos.core.datastore.WrongVersionException;
Jonathan Hart6df90172014-04-03 10:13:11 -070020import net.onrc.onos.core.datastore.internal.IModifiableMultiEntryOperation;
21import net.onrc.onos.core.datastore.ramcloud.RCTable.Entry;
22import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070023
24import org.apache.commons.configuration.Configuration;
25import org.apache.commons.configuration.ConfigurationException;
26import org.apache.commons.configuration.PropertiesConfiguration;
27import org.slf4j.Logger;
28import org.slf4j.LoggerFactory;
29
30import edu.stanford.ramcloud.JRamCloud;
31import edu.stanford.ramcloud.JRamCloud.MultiReadObject;
32import edu.stanford.ramcloud.JRamCloud.MultiWriteObject;
33import edu.stanford.ramcloud.JRamCloud.MultiWriteRspObject;
34import edu.stanford.ramcloud.JRamCloud.RejectRules;
35import edu.stanford.ramcloud.JRamCloud.RejectRulesException;
36import edu.stanford.ramcloud.JRamCloud.TableEnumerator2;
37
38public class RCClient implements IKVClient {
39 private static final Logger log = LoggerFactory.getLogger(RCClient.class);
40
41 private static final String DB_CONFIG_FILE = "conf/ramcloud.conf";
42 public static final Configuration config = getConfiguration();
43
44 // Value taken from RAMCloud's Status.h
45 // FIXME These constants should be defined by JRamCloud
46 public static final int STATUS_OK = 0;
47
48 // FIXME come up with a proper way to retrieve configuration
49 public static final int MAX_MULTI_READS = Math.max(1, Integer
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070050 .valueOf(System.getProperty("ramcloud.max_multi_reads", "400")));
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070051
52 public static final int MAX_MULTI_WRITES = Math.max(1, Integer
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070053 .valueOf(System.getProperty("ramcloud.max_multi_writes", "800")));
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070054
55 private static final ThreadLocal<JRamCloud> tlsRCClient = new ThreadLocal<JRamCloud>() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070056 @Override
57 protected JRamCloud initialValue() {
58 return new JRamCloud(getCoordinatorUrl(config));
59 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070060 };
61
62 /**
63 * @return JRamCloud instance intended to be used only within the
64 * SameThread.
65 * @note Do not store the returned instance in a member variable, etc. which
66 * may be accessed later by another thread.
67 */
68 static JRamCloud getJRamCloudClient() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070069 return tlsRCClient.get();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070070 }
71
72 // Currently RCClient is state-less
73 private static final RCClient theInstance= new RCClient();
74
75 public static RCClient getClient() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070076 return theInstance;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070077 }
78
79 public static final Configuration getConfiguration() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070080 final File configFile = new File(System.getProperty("ramcloud.config.path", DB_CONFIG_FILE));
81 return getConfiguration(configFile);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070082 }
83
84 public static final Configuration getConfiguration(final File configFile) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070085 if (configFile == null) {
86 throw new IllegalArgumentException("Need to specify a configuration file or storage directory");
87 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070088
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070089 if (!configFile.isFile()) {
90 throw new IllegalArgumentException("Location of configuration must be a file");
91 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070092
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070093 try {
94 return new PropertiesConfiguration(configFile);
95 } catch (ConfigurationException e) {
96 throw new IllegalArgumentException("Could not load configuration at: " + configFile, e);
97 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070098 }
99
100 public static String getCoordinatorUrl(final Configuration configuration) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700101 final String coordinatorIp = configuration.getString("ramcloud.coordinatorIp", "fast+udp:host=127.0.0.1");
102 final String coordinatorPort = configuration.getString("ramcloud.coordinatorPort", "port=12246");
103 final String coordinatorURL = coordinatorIp + "," + coordinatorPort;
104 return coordinatorURL;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700105 }
106
107 @Override
108 public IMultiEntryOperation createOp(IKVTableID tableId, byte[] key, byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700109 return RCMultiEntryOperation.create(tableId, key, value);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700110 }
111
112 /**
113 * @param tableId RCTableID instance
114 */
115 @Override
116 public long create(IKVTableID tableId, byte[] key, byte[] value)
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700117 throws ObjectExistsException {
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700118
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700119 RCTableID rcTableId = (RCTableID) tableId;
120 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700121
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700122 RejectRules rules = new RejectRules();
123 rules.rejectIfExists();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700124
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700125 try {
126 return rcClient.write(rcTableId.getTableID(), key, value, rules);
127 } catch (JRamCloud.ObjectExistsException e) {
128 throw new ObjectExistsException(rcTableId, key, e);
129 } catch (JRamCloud.RejectRulesException e) {
130 log.error("Unexpected RejectRulesException", e);
131 return JRamCloud.VERSION_NONEXISTENT;
132 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700133 }
134
135 @Override
136 public IMultiEntryOperation forceCreateOp(IKVTableID tableId, byte[] key, byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700137 return RCMultiEntryOperation.forceCreate(tableId, key, value);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700138 }
139
140 @Override
141 public long forceCreate(IKVTableID tableId, byte[] key, byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700142 RCTableID rcTableId = (RCTableID) tableId;
143 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700144
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700145 long updatedVersion = rcClient.write(rcTableId.getTableID(), key, value);
146 return updatedVersion;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700147 }
148
149 @Override
150 public IMultiEntryOperation readOp(IKVTableID tableId, byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700151 return RCMultiEntryOperation.read(tableId, key);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700152 }
153
154 @Override
155 public IKVEntry read(IKVTableID tableId, byte[] key)
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700156 throws ObjectDoesntExistException {
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700157
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700158 RCTableID rcTableId = (RCTableID) tableId;
159 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700160
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700161 RejectRules rules = new RejectRules();
162 rules.rejectIfDoesntExists();
163 try {
164 JRamCloud.Object rcObj = rcClient.read(rcTableId.getTableID(), key, rules);
165 return new Entry(rcObj.key, rcObj.value, rcObj.version);
166 } catch (JRamCloud.ObjectDoesntExistException e) {
167 throw new ObjectDoesntExistException(rcTableId, key, e);
168 } catch (JRamCloud.RejectRulesException e) {
169 log.error("Unexpected RejectRulesException", e);
170 return null;
171 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700172 }
173
174 @Override
175 public IMultiEntryOperation updateOp(IKVTableID tableId, byte[] key, byte[] value,long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700176 return RCMultiEntryOperation.update(tableId, key, value, version);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700177 }
178
179 @Override
180 public long update(IKVTableID tableId, byte[] key, byte[] value,
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700181 long version) throws ObjectDoesntExistException,
182 WrongVersionException {
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700183
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700184 RCTableID rcTableId = (RCTableID) tableId;
185 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700186
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700187 RejectRules rules = new RejectRules();
188 rules.rejectIfDoesntExists();
189 rules.rejectIfNeVersion(version);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700190
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700191 try {
192 return rcClient.write(rcTableId.getTableID(), key, value, rules);
193 } catch (JRamCloud.ObjectDoesntExistException e) {
194 throw new ObjectDoesntExistException(rcTableId, key, e);
195 } catch (JRamCloud.WrongVersionException e) {
196 throw new WrongVersionException(rcTableId, key, version, e);
197 } catch (JRamCloud.RejectRulesException e) {
198 log.error("Unexpected RejectRulesException", e);
199 return JRamCloud.VERSION_NONEXISTENT;
200 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700201 }
202
203
204 @Override
205 public long update(IKVTableID tableId, byte[] key, byte[] value)
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700206 throws ObjectDoesntExistException {
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700207
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700208 RCTableID rcTableId = (RCTableID) tableId;
209 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700210
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700211 RejectRules rules = new RejectRules();
212 rules.rejectIfDoesntExists();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700213
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700214 try {
215 return rcClient.write(rcTableId.getTableID(), key, value, rules);
216 } catch (JRamCloud.ObjectDoesntExistException e) {
217 throw new ObjectDoesntExistException(rcTableId, key, e);
218 } catch (JRamCloud.RejectRulesException e) {
219 log.error("Unexpected RejectRulesException", e);
220 return JRamCloud.VERSION_NONEXISTENT;
221 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700222 }
223
224 @Override
225 public IMultiEntryOperation deleteOp(IKVTableID tableId, byte[] key, byte[] value,long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700226 return RCMultiEntryOperation.delete(tableId, key, value, version);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700227 }
228
229 @Override
230 public long delete(IKVTableID tableId, byte[] key, long version)
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700231 throws ObjectDoesntExistException, WrongVersionException {
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700232
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700233 RCTableID rcTableId = (RCTableID) tableId;
234 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700235
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700236 RejectRules rules = new RejectRules();
237 rules.rejectIfDoesntExists();
238 rules.rejectIfNeVersion(version);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700239
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700240 try {
241 return rcClient.remove(rcTableId.getTableID(), key, rules);
242 } catch (JRamCloud.ObjectDoesntExistException e) {
243 throw new ObjectDoesntExistException(rcTableId, key, e);
244 } catch (JRamCloud.WrongVersionException e) {
245 throw new WrongVersionException(rcTableId, key, version, e);
246 } catch (JRamCloud.RejectRulesException e) {
247 log.error("Unexpected RejectRulesException", e);
248 return JRamCloud.VERSION_NONEXISTENT;
249 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700250 }
251
252 @Override
253 public IMultiEntryOperation forceDeleteOp(IKVTableID tableId, byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700254 return RCMultiEntryOperation.forceDelete(tableId, key);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700255 }
256
257 @Override
258 public long forceDelete(IKVTableID tableId, byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700259 RCTableID rcTableId = (RCTableID) tableId;
260 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700261 final long removedVersion = rcClient.remove(rcTableId.getTableID(), key);
262 return removedVersion;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700263 }
264
265 @Override
266 public Iterable<IKVEntry> getAllEntries(IKVTableID tableId) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700267 return new RCTableEntryIterable((RCTableID) tableId);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700268 }
269
270 static class RCTableEntryIterable implements Iterable<IKVEntry> {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700271 private final RCTableID tableId;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700272
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700273 public RCTableEntryIterable(final RCTableID tableId) {
274 this.tableId = tableId;
275 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700276
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700277 @Override
278 public Iterator<IKVEntry> iterator() {
279 return new RCClient.RCTableIterator(tableId);
280 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700281 }
282
283 public static class RCTableIterator implements Iterator<IKVEntry> {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700284 private final RCTableID tableId;
285 protected final TableEnumerator2 enumerator;
286 private JRamCloud.Object last;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700287
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700288 public RCTableIterator(final RCTableID tableId) {
289 this.tableId = tableId;
290 this.enumerator = getJRamCloudClient().new TableEnumerator2(tableId.getTableID());
291 this.last = null;
292 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700293
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700294 @Override
295 public boolean hasNext() {
296 return this.enumerator.hasNext();
297 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700298
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700299 @Override
300 public RCTable.Entry next() {
301 last = enumerator.next();
302 return new RCTable.Entry(last.key, last.value, last.version);
303 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700304
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700305 @Override
306 public void remove() {
307 if (last != null) {
308 getJRamCloudClient();
309 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700310
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700311 RejectRules rules = new RejectRules();
312 rules.rejectIfNeVersion(last.version);
313 try {
314 rcClient.remove(tableId.getTableID(), last.key, rules);
315 } catch (RejectRulesException e) {
316 log.trace("remove failed", e);
317 }
318 last = null;
319 }
320 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700321 }
322
323 @Override
324 public boolean multiRead(final Collection<IMultiEntryOperation> ops) {
325
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700326 if ( ops.size() <= MAX_MULTI_READS && ops instanceof ArrayList) {
327 @SuppressWarnings({ "unchecked", "rawtypes" })
328 final ArrayList<RCMultiEntryOperation> arrays = (ArrayList)ops;
329 return multiReadInternal(arrays);
330 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700331
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700332 boolean failExists = false;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700333
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700334 ArrayList<RCMultiEntryOperation> req = new ArrayList<>();
335 Iterator<IMultiEntryOperation> it = ops.iterator();
336 while (it.hasNext()) {
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700337
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700338 req.add((RCMultiEntryOperation) it.next());
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700339
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700340 if (req.size() >= MAX_MULTI_READS) {
341 // dispatch multiRead
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700342 failExists |= multiReadInternal(req);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700343 req.clear();
344 }
345 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700346
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700347 if (!req.isEmpty()) {
348 // dispatch multiRead
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700349 failExists |= multiReadInternal(req);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700350 req.clear();
351 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700352
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700353 return failExists;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700354 }
355
356 @Override
357 public boolean multiWrite(final List<IMultiEntryOperation> ops) {
358
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700359 if ( ops.size() <= MAX_MULTI_WRITES && ops instanceof ArrayList) {
360 @SuppressWarnings({ "unchecked", "rawtypes" })
361 final ArrayList<RCMultiEntryOperation> arrays = (ArrayList)ops;
362 return multiWriteInternal(arrays);
363 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700364
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700365 boolean failExists = false;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700366
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700367 ArrayList<RCMultiEntryOperation> req = new ArrayList<>();
368 Iterator<IMultiEntryOperation> it = ops.iterator();
369 while (it.hasNext()) {
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700370
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700371 req.add((RCMultiEntryOperation) it.next());
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700372
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700373 if (req.size() >= MAX_MULTI_WRITES) {
374 // dispatch multiWrite
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700375 failExists |= multiWriteInternal(req);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700376 req.clear();
377 }
378 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700379
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700380 if (!req.isEmpty()) {
381 // dispatch multiWrite
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700382 failExists |= multiWriteInternal(req);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700383 req.clear();
384 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700385
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700386 return failExists;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700387 }
388
389 @Override
390 public boolean multiDelete(final Collection<IMultiEntryOperation> ops) {
391
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700392 // TODO implement multiRemove JNI, etc. if we need performance
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700393
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700394 boolean failExists = false;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700395 JRamCloud rcClient = getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700396
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700397 for (IMultiEntryOperation iop : ops) {
398 RCMultiEntryOperation op = (RCMultiEntryOperation)iop;
399 switch (op.getOperation()) {
400 case DELETE:
401 RejectRules rules = new RejectRules();
402 rules.rejectIfDoesntExists();
403 rules.rejectIfNeVersion(op.getVersion());
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700404
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700405 try {
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700406 final long removedVersion = rcClient.remove(op.tableId.getTableID(), op.entry.getKey(), rules);
407 op.entry.setVersion(removedVersion);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700408 op.status = STATUS.SUCCESS;
409 } catch (JRamCloud.ObjectDoesntExistException|JRamCloud.WrongVersionException e) {
410 log.error("Failed to remove key:" + ByteArrayUtil.toHexStringBuffer(op.entry.getKey(), "") + " from tableID:" + op.tableId, e );
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700411 failExists = true;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700412 op.status = STATUS.FAILED;
413 } catch (JRamCloud.RejectRulesException e) {
414 log.error("Failed to remove key:" + ByteArrayUtil.toHexStringBuffer(op.entry.getKey(), "") + " from tableID:" + op.tableId, e );
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700415 failExists = true;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700416 op.status = STATUS.FAILED;
417 }
418 break;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700419
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700420 case FORCE_DELETE:
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700421 final long removedVersion = rcClient.remove(op.tableId.getTableID(), op.entry.getKey());
422 if (removedVersion != VERSION_NONEXISTENT) {
423 op.entry.setVersion(removedVersion);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700424 op.status = STATUS.SUCCESS;
425 } else {
426 log.error("Failed to remove key:{} from tableID:{}", ByteArrayUtil.toHexStringBuffer(op.entry.getKey(), ""), op.tableId );
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700427 failExists = true;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700428 op.status = STATUS.FAILED;
429 }
430 break;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700431
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700432 default:
433 log.error("Invalid operation {} specified on multiDelete", op.getOperation() );
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700434 failExists = true;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700435 op.status = STATUS.FAILED;
436 break;
437 }
438 }
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700439 return failExists;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700440 }
441
442 private boolean multiReadInternal(final ArrayList<RCMultiEntryOperation> ops) {
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700443 boolean failExists = false;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700444 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700445
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700446 final int reqs = ops.size();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700447
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700448 MultiReadObject multiReadObjects = new MultiReadObject(reqs);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700449
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700450 // setup multi-read operation objects
451 for (int i = 0; i < reqs; ++i) {
452 IMultiEntryOperation op = ops.get(i);
453 multiReadObjects.setObject(i, ((RCTableID)op.getTableId()).getTableID(), op.getKey());
454 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700455
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700456 // execute
457 JRamCloud.Object[] results = rcClient.multiRead(multiReadObjects.tableId, multiReadObjects.key, multiReadObjects.keyLength, reqs);
458 if (results.length != reqs) {
459 log.error("multiRead returned unexpected number of results. (requested:{}, returned:{})", reqs, results.length);
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700460 failExists = true;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700461 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700462
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700463 for (int i = 0; i < results.length; ++i) {
464 IModifiableMultiEntryOperation op = ops.get(i);
465 if (results[i] == null) {
466 log.error("MultiRead error, skipping {}, {}", op.getTableId(), op);
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700467 failExists = true;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700468 op.setStatus(STATUS.FAILED);
469 continue;
470 }
471 assert (Arrays.equals(results[i].key, op.getKey()));
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700472
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700473 op.setValue(results[i].value, results[i].version);
474 if (results[i].version == JRamCloud.VERSION_NONEXISTENT) {
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700475 failExists = true;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700476 op.setStatus(STATUS.FAILED);
477 } else {
478 op.setStatus(STATUS.SUCCESS);
479 }
480 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700481
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700482 return failExists;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700483 }
484
485 private boolean multiWriteInternal(final ArrayList<RCMultiEntryOperation> ops) {
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700486 boolean failExists = false;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700487 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700488
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700489 final int reqs = ops.size();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700490
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700491 MultiWriteObject multiWriteObjects = new MultiWriteObject(reqs);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700492
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700493 for (int i = 0; i < reqs; ++i) {
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700494
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700495 IModifiableMultiEntryOperation op = ops.get(i);
496 RejectRules rules = new RejectRules();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700497
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700498 switch (op.getOperation()) {
499 case CREATE:
500 rules.rejectIfExists();
501 break;
502 case FORCE_CREATE:
503 // no reject rule
504 break;
505 case UPDATE:
506 rules.rejectIfDoesntExists();
507 rules.rejectIfNeVersion(op.getVersion());
508 break;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700509
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700510 default:
511 log.error("Invalid operation {} specified on multiWriteInternal", op.getOperation() );
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700512 failExists = true;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700513 op.setStatus(STATUS.FAILED);
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700514 return failExists;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700515 }
516 multiWriteObjects.setObject(i, ((RCTableID)op.getTableId()).getTableID(), op.getKey(), op.getValue(), rules);
517 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700518
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700519 MultiWriteRspObject[] results = rcClient.multiWrite(multiWriteObjects.tableId, multiWriteObjects.key, multiWriteObjects.keyLength, multiWriteObjects.value, multiWriteObjects.valueLength, ops.size(), multiWriteObjects.rules);
520 if (results.length != reqs) {
521 log.error("multiWrite returned unexpected number of results. (requested:{}, returned:{})", reqs, results.length);
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700522 failExists = true;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700523 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700524
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700525 for (int i = 0; i < results.length; ++i) {
526 IModifiableMultiEntryOperation op = ops.get(i);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700527
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700528 if (results[i] != null
529 && results[i].getStatus() == RCClient.STATUS_OK) {
530 op.setStatus(STATUS.SUCCESS);
531 op.setVersion(results[i].getVersion());
532 } else {
533 op.setStatus(STATUS.FAILED);
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700534 failExists = true;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700535 }
536 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700537
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700538 return failExists;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700539 }
540
541 private static final ConcurrentHashMap<String, RCTable> tables = new ConcurrentHashMap<>();
542
543 @Override
544 public IKVTable getTable(final String tableName) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700545 RCTable table = tables.get(tableName);
546 if (table == null) {
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700547 RCTable newTable = new RCTable(tableName);
548 RCTable existingTable = tables
549 .putIfAbsent(tableName, newTable);
550 if (existingTable != null) {
551 return existingTable;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700552 } else {
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700553 return newTable;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700554 }
555 }
556 return table;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700557 }
558
559 @Override
560 public void dropTable(IKVTable table) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700561 JRamCloud rcClient = RCClient.getJRamCloudClient();
562 rcClient.dropTable(table.getTableId().getTableName());
563 tables.remove(table.getTableId().getTableName());
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700564 }
565
566 static final long VERSION_NONEXISTENT = JRamCloud.VERSION_NONEXISTENT;
567
568 @Override
569 public long VERSION_NONEXISTENT() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700570 return VERSION_NONEXISTENT;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700571 }
572}