blob: 9096f8a29ed920a4ef876f1704510c4c5c28d8a7 [file] [log] [blame]
Jonathan Hart6df90172014-04-03 10:13:11 -07001package net.onrc.onos.core.datastore.ramcloud;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -07002
3import java.io.File;
Yuta HIGUCHId47eac32014-04-07 13:44:47 -07004import java.nio.ByteBuffer;
5import java.nio.ByteOrder;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -07006import java.util.ArrayList;
7import java.util.Arrays;
8import java.util.Collection;
9import java.util.Iterator;
10import java.util.List;
11import java.util.concurrent.ConcurrentHashMap;
12
Jonathan Hart6df90172014-04-03 10:13:11 -070013import net.onrc.onos.core.datastore.IKVClient;
14import net.onrc.onos.core.datastore.IKVTable;
Jonathan Harta99ec672014-04-03 11:30:34 -070015import net.onrc.onos.core.datastore.IKVTable.IKVEntry;
Jonathan Hart6df90172014-04-03 10:13:11 -070016import net.onrc.onos.core.datastore.IKVTableID;
17import net.onrc.onos.core.datastore.IMultiEntryOperation;
Jonathan Harta99ec672014-04-03 11:30:34 -070018import net.onrc.onos.core.datastore.IMultiEntryOperation.STATUS;
Jonathan Hart6df90172014-04-03 10:13:11 -070019import net.onrc.onos.core.datastore.ObjectDoesntExistException;
20import net.onrc.onos.core.datastore.ObjectExistsException;
21import net.onrc.onos.core.datastore.WrongVersionException;
Jonathan Hart6df90172014-04-03 10:13:11 -070022import net.onrc.onos.core.datastore.internal.IModifiableMultiEntryOperation;
23import net.onrc.onos.core.datastore.ramcloud.RCTable.Entry;
24import net.onrc.onos.core.datastore.utils.ByteArrayUtil;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070025
26import org.apache.commons.configuration.Configuration;
27import org.apache.commons.configuration.ConfigurationException;
28import org.apache.commons.configuration.PropertiesConfiguration;
29import org.slf4j.Logger;
30import org.slf4j.LoggerFactory;
31
32import edu.stanford.ramcloud.JRamCloud;
33import edu.stanford.ramcloud.JRamCloud.MultiReadObject;
34import edu.stanford.ramcloud.JRamCloud.MultiWriteObject;
35import edu.stanford.ramcloud.JRamCloud.MultiWriteRspObject;
36import edu.stanford.ramcloud.JRamCloud.RejectRules;
37import edu.stanford.ramcloud.JRamCloud.RejectRulesException;
38import edu.stanford.ramcloud.JRamCloud.TableEnumerator2;
39
40public class RCClient implements IKVClient {
Yuta HIGUCHId150ece2014-04-29 16:25:36 -070041
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070042 private static final Logger log = LoggerFactory.getLogger(RCClient.class);
43
Yuta HIGUCHId150ece2014-04-29 16:25:36 -070044 private static final String DEFAULT_LOCATOR = "zk:localhost:2181";
45 private static final String DEFAULT_CLUSTERNAME = "ONOS-RC";
46
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070047 private static final String DB_CONFIG_FILE = "conf/ramcloud.conf";
Ray Milkey5c9f2db2014-04-09 10:31:21 -070048 public static final Configuration CONFIG = getConfiguration();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070049
50 // Value taken from RAMCloud's Status.h
51 // FIXME These constants should be defined by JRamCloud
52 public static final int STATUS_OK = 0;
53
Yuta HIGUCHIe316d5c2014-04-19 13:37:38 -070054 /**
55 * Maximum number of Multi-Read operations which can be executed in
56 * one RPC call.
57 *
58 * There are multiple factors which determines this limit.
59 * - RAMCloud RPC size limit of 8MB.
60 * - JNI implementation store the RPC result on stack.
61 * (Increasing the stack-size limit will help relaxing this limit.)
62 */
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070063 public static final int MAX_MULTI_READS = Math.max(1, Integer
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070064 .valueOf(System.getProperty("ramcloud.max_multi_reads", "400")));
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070065
Yuta HIGUCHIe316d5c2014-04-19 13:37:38 -070066 /**
67 * Maximum number of Multi-Write operations which can be executed in
68 * one RPC call.
69 *
70 * There are multiple factors which determines this limit.
71 * - RAMCloud RPC size limit of 8MB.
72 * - JNI implementation store the RPC result on stack.
73 * (Increasing the stack-size limit will help relaxing this limit.)
74 */
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070075 public static final int MAX_MULTI_WRITES = Math.max(1, Integer
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070076 .valueOf(System.getProperty("ramcloud.max_multi_writes", "800")));
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070077
Ray Milkey5c9f2db2014-04-09 10:31:21 -070078 private static final ThreadLocal<JRamCloud> TLS_RC_CLIENT = new ThreadLocal<JRamCloud>() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070079 @Override
80 protected JRamCloud initialValue() {
Yuta HIGUCHId150ece2014-04-29 16:25:36 -070081 return new JRamCloud(getLocator(CONFIG), getClusterName(CONFIG));
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070082 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070083 };
84
85 /**
86 * @return JRamCloud instance intended to be used only within the
Ray Milkey269ffb92014-04-03 14:43:30 -070087 * SameThread.
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070088 * @note Do not store the returned instance in a member variable, etc. which
Ray Milkey269ffb92014-04-03 14:43:30 -070089 * may be accessed later by another thread.
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070090 */
91 static JRamCloud getJRamCloudClient() {
Ray Milkey5c9f2db2014-04-09 10:31:21 -070092 return TLS_RC_CLIENT.get();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070093 }
94
95 // Currently RCClient is state-less
Ray Milkey5c9f2db2014-04-09 10:31:21 -070096 private static final RCClient THE_INSTANCE = new RCClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070097
98 public static RCClient getClient() {
Ray Milkey5c9f2db2014-04-09 10:31:21 -070099 return THE_INSTANCE;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700100 }
101
102 public static final Configuration getConfiguration() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700103 final File configFile = new File(System.getProperty("ramcloud.config.path", DB_CONFIG_FILE));
104 return getConfiguration(configFile);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700105 }
106
107 public static final Configuration getConfiguration(final File configFile) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700108 if (configFile == null) {
109 throw new IllegalArgumentException("Need to specify a configuration file or storage directory");
110 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700111
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700112 if (!configFile.isFile()) {
113 throw new IllegalArgumentException("Location of configuration must be a file");
114 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700115
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700116 try {
117 return new PropertiesConfiguration(configFile);
118 } catch (ConfigurationException e) {
119 throw new IllegalArgumentException("Could not load configuration at: " + configFile, e);
120 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700121 }
122
Yuta HIGUCHId150ece2014-04-29 16:25:36 -0700123 public static String getLocator(final Configuration configuration) {
124
125 final String locator = configuration.getString("ramcloud.locator");
126 if (locator != null) {
127 return locator;
128 }
129
130 // TODO Stop reading obsolete coordinatorIp, etc. once we're ready.
131 final String coordinatorIp = configuration.getString("ramcloud.coordinatorIp");
132 if (coordinatorIp == null) {
133 return DEFAULT_LOCATOR;
134 }
135
136 final String coordinatorPort = configuration.getString("ramcloud.coordinatorPort");
137 if (coordinatorPort == null) {
138 return DEFAULT_LOCATOR;
139 }
140
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700141 final String coordinatorURL = coordinatorIp + "," + coordinatorPort;
142 return coordinatorURL;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700143 }
144
Yuta HIGUCHId150ece2014-04-29 16:25:36 -0700145 public static String getClusterName(final Configuration configuration) {
146 final String clusterName = configuration.getString("ramcloud.clusterName");
147 if (clusterName != null) {
148 return clusterName;
149 }
150
151 return DEFAULT_CLUSTERNAME;
152 }
153
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700154 @Override
155 public IMultiEntryOperation createOp(IKVTableID tableId, byte[] key, byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700156 return RCMultiEntryOperation.create(tableId, key, value);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700157 }
158
159 /**
160 * @param tableId RCTableID instance
161 */
162 @Override
163 public long create(IKVTableID tableId, byte[] key, byte[] value)
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700164 throws ObjectExistsException {
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700165
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700166 RCTableID rcTableId = (RCTableID) tableId;
167 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700168
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700169 RejectRules rules = new RejectRules();
170 rules.rejectIfExists();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700171
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700172 try {
173 return rcClient.write(rcTableId.getTableID(), key, value, rules);
174 } catch (JRamCloud.ObjectExistsException e) {
175 throw new ObjectExistsException(rcTableId, key, e);
176 } catch (JRamCloud.RejectRulesException e) {
177 log.error("Unexpected RejectRulesException", e);
178 return JRamCloud.VERSION_NONEXISTENT;
179 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700180 }
181
182 @Override
183 public IMultiEntryOperation forceCreateOp(IKVTableID tableId, byte[] key, byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700184 return RCMultiEntryOperation.forceCreate(tableId, key, value);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700185 }
186
187 @Override
188 public long forceCreate(IKVTableID tableId, byte[] key, byte[] value) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700189 RCTableID rcTableId = (RCTableID) tableId;
190 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700191
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700192 long updatedVersion = rcClient.write(rcTableId.getTableID(), key, value);
193 return updatedVersion;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700194 }
195
196 @Override
197 public IMultiEntryOperation readOp(IKVTableID tableId, byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700198 return RCMultiEntryOperation.read(tableId, key);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700199 }
200
201 @Override
202 public IKVEntry read(IKVTableID tableId, byte[] key)
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700203 throws ObjectDoesntExistException {
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700204
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700205 RCTableID rcTableId = (RCTableID) tableId;
206 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700207
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700208 RejectRules rules = new RejectRules();
209 rules.rejectIfDoesntExists();
210 try {
211 JRamCloud.Object rcObj = rcClient.read(rcTableId.getTableID(), key, rules);
212 return new Entry(rcObj.key, rcObj.value, rcObj.version);
213 } catch (JRamCloud.ObjectDoesntExistException e) {
214 throw new ObjectDoesntExistException(rcTableId, key, e);
215 } catch (JRamCloud.RejectRulesException e) {
216 log.error("Unexpected RejectRulesException", e);
217 return null;
218 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700219 }
220
221 @Override
Ray Milkey269ffb92014-04-03 14:43:30 -0700222 public IMultiEntryOperation updateOp(IKVTableID tableId, byte[] key, byte[] value, long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700223 return RCMultiEntryOperation.update(tableId, key, value, version);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700224 }
225
226 @Override
227 public long update(IKVTableID tableId, byte[] key, byte[] value,
Ray Milkey269ffb92014-04-03 14:43:30 -0700228 long version) throws ObjectDoesntExistException,
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700229 WrongVersionException {
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700230
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700231 RCTableID rcTableId = (RCTableID) tableId;
232 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700233
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700234 RejectRules rules = new RejectRules();
235 rules.rejectIfDoesntExists();
236 rules.rejectIfNeVersion(version);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700237
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700238 try {
239 return rcClient.write(rcTableId.getTableID(), key, value, rules);
240 } catch (JRamCloud.ObjectDoesntExistException e) {
241 throw new ObjectDoesntExistException(rcTableId, key, e);
242 } catch (JRamCloud.WrongVersionException e) {
243 throw new WrongVersionException(rcTableId, key, version, e);
244 } catch (JRamCloud.RejectRulesException e) {
245 log.error("Unexpected RejectRulesException", e);
246 return JRamCloud.VERSION_NONEXISTENT;
247 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700248 }
249
250
251 @Override
252 public long update(IKVTableID tableId, byte[] key, byte[] value)
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700253 throws ObjectDoesntExistException {
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700254
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700255 RCTableID rcTableId = (RCTableID) tableId;
256 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700257
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700258 RejectRules rules = new RejectRules();
259 rules.rejectIfDoesntExists();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700260
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700261 try {
262 return rcClient.write(rcTableId.getTableID(), key, value, rules);
263 } catch (JRamCloud.ObjectDoesntExistException e) {
264 throw new ObjectDoesntExistException(rcTableId, key, e);
265 } catch (JRamCloud.RejectRulesException e) {
266 log.error("Unexpected RejectRulesException", e);
267 return JRamCloud.VERSION_NONEXISTENT;
268 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700269 }
270
271 @Override
Ray Milkey269ffb92014-04-03 14:43:30 -0700272 public IMultiEntryOperation deleteOp(IKVTableID tableId, byte[] key, byte[] value, long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700273 return RCMultiEntryOperation.delete(tableId, key, value, version);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700274 }
275
276 @Override
277 public long delete(IKVTableID tableId, byte[] key, long version)
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700278 throws ObjectDoesntExistException, WrongVersionException {
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700279
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700280 RCTableID rcTableId = (RCTableID) tableId;
281 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700282
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700283 RejectRules rules = new RejectRules();
284 rules.rejectIfDoesntExists();
285 rules.rejectIfNeVersion(version);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700286
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700287 try {
288 return rcClient.remove(rcTableId.getTableID(), key, rules);
289 } catch (JRamCloud.ObjectDoesntExistException e) {
290 throw new ObjectDoesntExistException(rcTableId, key, e);
291 } catch (JRamCloud.WrongVersionException e) {
292 throw new WrongVersionException(rcTableId, key, version, e);
293 } catch (JRamCloud.RejectRulesException e) {
294 log.error("Unexpected RejectRulesException", e);
295 return JRamCloud.VERSION_NONEXISTENT;
296 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700297 }
298
299 @Override
300 public IMultiEntryOperation forceDeleteOp(IKVTableID tableId, byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700301 return RCMultiEntryOperation.forceDelete(tableId, key);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700302 }
303
304 @Override
305 public long forceDelete(IKVTableID tableId, byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700306 RCTableID rcTableId = (RCTableID) tableId;
307 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700308 final long removedVersion = rcClient.remove(rcTableId.getTableID(), key);
309 return removedVersion;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700310 }
311
312 @Override
313 public Iterable<IKVEntry> getAllEntries(IKVTableID tableId) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700314 return new RCTableEntryIterable((RCTableID) tableId);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700315 }
316
317 static class RCTableEntryIterable implements Iterable<IKVEntry> {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700318 private final RCTableID tableId;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700319
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700320 public RCTableEntryIterable(final RCTableID tableId) {
321 this.tableId = tableId;
322 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700323
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700324 @Override
325 public Iterator<IKVEntry> iterator() {
326 return new RCClient.RCTableIterator(tableId);
327 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700328 }
329
330 public static class RCTableIterator implements Iterator<IKVEntry> {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700331 private final RCTableID tableId;
332 protected final TableEnumerator2 enumerator;
333 private JRamCloud.Object last;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700334
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700335 public RCTableIterator(final RCTableID tableId) {
336 this.tableId = tableId;
337 this.enumerator = getJRamCloudClient().new TableEnumerator2(tableId.getTableID());
338 this.last = null;
339 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700340
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700341 @Override
342 public boolean hasNext() {
343 return this.enumerator.hasNext();
344 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700345
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700346 @Override
347 public RCTable.Entry next() {
348 last = enumerator.next();
349 return new RCTable.Entry(last.key, last.value, last.version);
350 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700351
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700352 @Override
353 public void remove() {
354 if (last != null) {
355 getJRamCloudClient();
356 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700357
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700358 RejectRules rules = new RejectRules();
359 rules.rejectIfNeVersion(last.version);
360 try {
361 rcClient.remove(tableId.getTableID(), last.key, rules);
362 } catch (RejectRulesException e) {
363 log.trace("remove failed", e);
364 }
365 last = null;
366 }
367 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700368 }
369
370 @Override
371 public boolean multiRead(final Collection<IMultiEntryOperation> ops) {
372
Ray Milkey269ffb92014-04-03 14:43:30 -0700373 if (ops.size() <= MAX_MULTI_READS && ops instanceof ArrayList) {
Ray Milkey7f1567c2014-04-08 13:53:32 -0700374 @SuppressWarnings({ "unchecked", "rawtypes" })
Ray Milkey269ffb92014-04-03 14:43:30 -0700375 final ArrayList<RCMultiEntryOperation> arrays = (ArrayList) ops;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700376 return multiReadInternal(arrays);
377 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700378
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700379 boolean failExists = false;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700380
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700381 ArrayList<RCMultiEntryOperation> req = new ArrayList<>();
382 Iterator<IMultiEntryOperation> it = ops.iterator();
383 while (it.hasNext()) {
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700384
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700385 req.add((RCMultiEntryOperation) it.next());
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700386
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700387 if (req.size() >= MAX_MULTI_READS) {
388 // dispatch multiRead
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700389 failExists |= multiReadInternal(req);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700390 req.clear();
391 }
392 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700393
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700394 if (!req.isEmpty()) {
395 // dispatch multiRead
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700396 failExists |= multiReadInternal(req);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700397 req.clear();
398 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700399
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700400 return failExists;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700401 }
402
403 @Override
404 public boolean multiWrite(final List<IMultiEntryOperation> ops) {
405
Ray Milkey269ffb92014-04-03 14:43:30 -0700406 if (ops.size() <= MAX_MULTI_WRITES && ops instanceof ArrayList) {
Ray Milkey7f1567c2014-04-08 13:53:32 -0700407 @SuppressWarnings({ "unchecked", "rawtypes" })
Ray Milkey269ffb92014-04-03 14:43:30 -0700408 final ArrayList<RCMultiEntryOperation> arrays = (ArrayList) ops;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700409 return multiWriteInternal(arrays);
410 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700411
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700412 boolean failExists = false;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700413
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700414 ArrayList<RCMultiEntryOperation> req = new ArrayList<>();
415 Iterator<IMultiEntryOperation> it = ops.iterator();
416 while (it.hasNext()) {
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700417
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700418 req.add((RCMultiEntryOperation) it.next());
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700419
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700420 if (req.size() >= MAX_MULTI_WRITES) {
421 // dispatch multiWrite
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700422 failExists |= multiWriteInternal(req);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700423 req.clear();
424 }
425 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700426
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700427 if (!req.isEmpty()) {
428 // dispatch multiWrite
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700429 failExists |= multiWriteInternal(req);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700430 req.clear();
431 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700432
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700433 return failExists;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700434 }
435
436 @Override
437 public boolean multiDelete(final Collection<IMultiEntryOperation> ops) {
438
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700439 // TODO implement multiRemove JNI, etc. if we need performance
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700440
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700441 boolean failExists = false;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700442 JRamCloud rcClient = getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700443
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700444 for (IMultiEntryOperation iop : ops) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700445 RCMultiEntryOperation op = (RCMultiEntryOperation) iop;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700446 switch (op.getOperation()) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700447 case DELETE:
448 RejectRules rules = new RejectRules();
449 rules.rejectIfDoesntExists();
450 rules.rejectIfNeVersion(op.getVersion());
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700451
Ray Milkey269ffb92014-04-03 14:43:30 -0700452 try {
453 final long removedVersion = rcClient.remove(op.tableId.getTableID(), op.entry.getKey(), rules);
454 op.entry.setVersion(removedVersion);
455 op.status = STATUS.SUCCESS;
456 } catch (JRamCloud.ObjectDoesntExistException | JRamCloud.WrongVersionException e) {
Yuta HIGUCHI805bc8f2014-04-16 11:51:43 -0700457 log.error("Failed to remove key:" + ByteArrayUtil.toHexStringBuilder(op.entry.getKey(), "") + " from tableID:" + op.tableId, e);
Ray Milkey269ffb92014-04-03 14:43:30 -0700458 failExists = true;
459 op.status = STATUS.FAILED;
460 } catch (JRamCloud.RejectRulesException e) {
Yuta HIGUCHI805bc8f2014-04-16 11:51:43 -0700461 log.error("Failed to remove key:" + ByteArrayUtil.toHexStringBuilder(op.entry.getKey(), "") + " from tableID:" + op.tableId, e);
Ray Milkey269ffb92014-04-03 14:43:30 -0700462 failExists = true;
463 op.status = STATUS.FAILED;
464 }
465 break;
466
467 case FORCE_DELETE:
468 final long removedVersion = rcClient.remove(op.tableId.getTableID(), op.entry.getKey());
469 if (removedVersion != VERSION_NONEXISTENT) {
470 op.entry.setVersion(removedVersion);
471 op.status = STATUS.SUCCESS;
472 } else {
Yuta HIGUCHI805bc8f2014-04-16 11:51:43 -0700473 log.error("Failed to remove key:{} from tableID:{}", ByteArrayUtil.toHexStringBuilder(op.entry.getKey(), ""), op.tableId);
Ray Milkey269ffb92014-04-03 14:43:30 -0700474 failExists = true;
475 op.status = STATUS.FAILED;
476 }
477 break;
478
479 default:
480 log.error("Invalid operation {} specified on multiDelete", op.getOperation());
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700481 failExists = true;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700482 op.status = STATUS.FAILED;
Ray Milkey269ffb92014-04-03 14:43:30 -0700483 break;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700484 }
485 }
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700486 return failExists;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700487 }
488
489 private boolean multiReadInternal(final ArrayList<RCMultiEntryOperation> ops) {
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700490 boolean failExists = false;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700491 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700492
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700493 final int reqs = ops.size();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700494
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700495 MultiReadObject multiReadObjects = new MultiReadObject(reqs);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700496
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700497 // setup multi-read operation objects
498 for (int i = 0; i < reqs; ++i) {
499 IMultiEntryOperation op = ops.get(i);
Ray Milkey269ffb92014-04-03 14:43:30 -0700500 multiReadObjects.setObject(i, ((RCTableID) op.getTableId()).getTableID(), op.getKey());
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700501 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700502
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700503 // execute
504 JRamCloud.Object[] results = rcClient.multiRead(multiReadObjects.tableId, multiReadObjects.key, multiReadObjects.keyLength, reqs);
505 if (results.length != reqs) {
506 log.error("multiRead returned unexpected number of results. (requested:{}, returned:{})", reqs, results.length);
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700507 failExists = true;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700508 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700509
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700510 for (int i = 0; i < results.length; ++i) {
511 IModifiableMultiEntryOperation op = ops.get(i);
512 if (results[i] == null) {
Yuta HIGUCHIf148aac2014-05-05 14:59:06 -0700513 // Logging as error gets too noisy when doing speculative read.
514 log.trace("MultiRead error {}, {}", op.getTableId(), op);
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700515 failExists = true;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700516 op.setStatus(STATUS.FAILED);
517 continue;
518 }
519 assert (Arrays.equals(results[i].key, op.getKey()));
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700520
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700521 op.setValue(results[i].value, results[i].version);
522 if (results[i].version == JRamCloud.VERSION_NONEXISTENT) {
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700523 failExists = true;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700524 op.setStatus(STATUS.FAILED);
525 } else {
526 op.setStatus(STATUS.SUCCESS);
527 }
528 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700529
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700530 return failExists;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700531 }
532
533 private boolean multiWriteInternal(final ArrayList<RCMultiEntryOperation> ops) {
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700534 boolean failExists = false;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700535 JRamCloud rcClient = RCClient.getJRamCloudClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700536
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700537 final int reqs = ops.size();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700538
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700539 MultiWriteObject multiWriteObjects = new MultiWriteObject(reqs);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700540
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700541 for (int i = 0; i < reqs; ++i) {
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700542
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700543 IModifiableMultiEntryOperation op = ops.get(i);
544 RejectRules rules = new RejectRules();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700545
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700546 switch (op.getOperation()) {
Ray Milkey269ffb92014-04-03 14:43:30 -0700547 case CREATE:
548 rules.rejectIfExists();
549 break;
550 case FORCE_CREATE:
551 // no reject rule
552 break;
553 case UPDATE:
554 rules.rejectIfDoesntExists();
555 rules.rejectIfNeVersion(op.getVersion());
556 break;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700557
Ray Milkey269ffb92014-04-03 14:43:30 -0700558 default:
559 log.error("Invalid operation {} specified on multiWriteInternal", op.getOperation());
560 failExists = true;
561 op.setStatus(STATUS.FAILED);
562 return failExists;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700563 }
Ray Milkey269ffb92014-04-03 14:43:30 -0700564 multiWriteObjects.setObject(i, ((RCTableID) op.getTableId()).getTableID(), op.getKey(), op.getValue(), rules);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700565 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700566
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700567 MultiWriteRspObject[] results = rcClient.multiWrite(multiWriteObjects.tableId, multiWriteObjects.key, multiWriteObjects.keyLength, multiWriteObjects.value, multiWriteObjects.valueLength, ops.size(), multiWriteObjects.rules);
568 if (results.length != reqs) {
569 log.error("multiWrite returned unexpected number of results. (requested:{}, returned:{})", reqs, results.length);
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700570 failExists = true;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700571 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700572
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700573 for (int i = 0; i < results.length; ++i) {
574 IModifiableMultiEntryOperation op = ops.get(i);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700575
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700576 if (results[i] != null
577 && results[i].getStatus() == RCClient.STATUS_OK) {
578 op.setStatus(STATUS.SUCCESS);
579 op.setVersion(results[i].getVersion());
580 } else {
581 op.setStatus(STATUS.FAILED);
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700582 failExists = true;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700583 }
584 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700585
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700586 return failExists;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700587 }
588
Ray Milkey5c9f2db2014-04-09 10:31:21 -0700589 private static final ConcurrentHashMap<String, RCTable> TABLES = new ConcurrentHashMap<>();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700590
591 @Override
592 public IKVTable getTable(final String tableName) {
Ray Milkey5c9f2db2014-04-09 10:31:21 -0700593 RCTable table = TABLES.get(tableName);
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700594 if (table == null) {
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700595 RCTable newTable = new RCTable(tableName);
Ray Milkey5c9f2db2014-04-09 10:31:21 -0700596 RCTable existingTable = TABLES
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700597 .putIfAbsent(tableName, newTable);
598 if (existingTable != null) {
599 return existingTable;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700600 } else {
Yuta HIGUCHIa14eb172014-03-24 15:03:23 -0700601 return newTable;
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700602 }
603 }
604 return table;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700605 }
606
607 @Override
608 public void dropTable(IKVTable table) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700609 JRamCloud rcClient = RCClient.getJRamCloudClient();
610 rcClient.dropTable(table.getTableId().getTableName());
Ray Milkey5c9f2db2014-04-09 10:31:21 -0700611 TABLES.remove(table.getTableId().getTableName());
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700612 }
613
614 static final long VERSION_NONEXISTENT = JRamCloud.VERSION_NONEXISTENT;
615
616 @Override
Ray Milkey7531a342014-04-11 15:08:12 -0700617 public long getVersionNonexistant() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700618 return VERSION_NONEXISTENT;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700619 }
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700620
621 @Override
622 public void createCounter(final IKVTableID tableId, final byte[] key,
623 final long initialValue)
624 throws ObjectExistsException {
625
626 ByteBuffer valueBytes = ByteBuffer.allocate(8)
627 .order(ByteOrder.LITTLE_ENDIAN).putLong(initialValue);
628 valueBytes.flip();
629 final long version = create(tableId, key, valueBytes.array());
630 if (log.isTraceEnabled()) {
631 log.trace("Created counter {}-{}={}@{}",
Yuta HIGUCHI805bc8f2014-04-16 11:51:43 -0700632 tableId, ByteArrayUtil.toHexStringBuilder(key, ":"),
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700633 initialValue, version);
634 }
635 }
636
637 @Override
638 public void setCounter(final IKVTableID tableId, final byte[] key,
639 final long value) {
640
641 ByteBuffer valueBytes = ByteBuffer.allocate(8)
642 .order(ByteOrder.LITTLE_ENDIAN).putLong(value);
643 valueBytes.flip();
644
645 final long version = forceCreate(tableId, key, valueBytes.array());
646 if (log.isTraceEnabled()) {
647 log.trace("set counter {}-{}={}@{}",
Yuta HIGUCHI805bc8f2014-04-16 11:51:43 -0700648 tableId, ByteArrayUtil.toHexStringBuilder(key, ":"),
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700649 value, version);
650 }
651 }
652
653 @Override
654 public long incrementCounter(final IKVTableID tableId, final byte[] key,
655 final long incrementValue) {
656
657 RCTableID rcTableId = (RCTableID) tableId;
658 JRamCloud rcClient = RCClient.getJRamCloudClient();
659
660 try {
661 return rcClient.increment(rcTableId.getTableID(), key, incrementValue);
662 } catch (JRamCloud.ObjectDoesntExistException e) {
663 log.warn("Counter {}-{} was not present",
664 tableId,
Yuta HIGUCHI805bc8f2014-04-16 11:51:43 -0700665 ByteArrayUtil.toHexStringBuilder(key, ":"));
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700666 try {
667 // creating counter initialized to 0
668 createCounter(rcTableId, key, 0L);
669 } catch (ObjectExistsException e1) {
670 // someone concurrently created it
671 log.debug("Counter {}-{} seemed to be concurrently created.",
672 tableId,
Yuta HIGUCHI805bc8f2014-04-16 11:51:43 -0700673 ByteArrayUtil.toHexStringBuilder(key, ":"));
Yuta HIGUCHId47eac32014-04-07 13:44:47 -0700674 }
675 try {
676 return rcClient.increment(rcTableId.getTableID(), key, incrementValue);
677 } catch (edu.stanford.ramcloud.JRamCloud.ObjectDoesntExistException e1) {
678 log.error("Should never happen");
679 throw new IllegalStateException("Created counter disappeared.");
680 }
681 }
682 }
683
684 @Override
685 public void destroyCounter(final IKVTableID tableId, final byte[] key) {
686
687 RCTableID rcTableId = (RCTableID) tableId;
688 JRamCloud rcClient = RCClient.getJRamCloudClient();
689
690 rcClient.remove(rcTableId.getTableID(), key);
691 }
692
693 @Override
694 public long getCounter(IKVTableID tableId, byte[] key)
695 throws ObjectDoesntExistException {
696
697 IKVEntry entry = read(tableId, key);
698 ByteBuffer counter = ByteBuffer.wrap(entry.getValue()).order(ByteOrder.LITTLE_ENDIAN);
699 return counter.getLong();
700 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700701}