blob: 009b58405dc0fe03f9b184f9ed4453889987f31d [file] [log] [blame]
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -08001package net.onrc.onos.datastore;
2
3import java.util.ArrayList;
4import java.util.Arrays;
5import java.util.Collection;
6import java.util.HashMap;
7import java.util.Iterator;
8import java.util.Map;
9
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -080010import net.onrc.onos.datastore.RCObject.WriteOp.STATUS;
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080011import net.onrc.onos.datastore.RCTable.Entry;
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -080012
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080013import org.slf4j.Logger;
14import org.slf4j.LoggerFactory;
15
16import com.esotericsoftware.kryo.Kryo;
17import com.esotericsoftware.kryo.io.Input;
18import com.esotericsoftware.kryo.io.Output;
19
20import edu.stanford.ramcloud.JRamCloud;
Yoshi Muroie7693b12014-02-19 19:41:17 -080021import edu.stanford.ramcloud.JRamCloud.MultiReadObject;
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -080022import edu.stanford.ramcloud.JRamCloud.MultiWriteObject;
23import edu.stanford.ramcloud.JRamCloud.MultiWriteRspObject;
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080024import edu.stanford.ramcloud.JRamCloud.ObjectDoesntExistException;
25import edu.stanford.ramcloud.JRamCloud.ObjectExistsException;
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -080026import edu.stanford.ramcloud.JRamCloud.RejectRules;
Yoshi Muroie7693b12014-02-19 19:41:17 -080027import edu.stanford.ramcloud.JRamCloud.TableEnumerator2;
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080028import edu.stanford.ramcloud.JRamCloud.WrongVersionException;
29
30/**
31 * Class to represent an Object represented as a single K-V pair Value blob.
32 *
33 */
34public class RCObject {
35 private static final Logger log = LoggerFactory.getLogger(RCObject.class);
36 /**
37 * Version number which represents that the object doesnot exist, or hase
38 * never read the DB before.
39 */
40 public static final long VERSION_NONEXISTENT = 0L;
41
42 // Each Object should prepare their own serializer, which has required
43 // objects registered.
44 private static final ThreadLocal<Kryo> defaultKryo = new ThreadLocal<Kryo>() {
45 @Override
46 protected Kryo initialValue() {
47 Kryo kryo = new Kryo();
48 // kryo.setRegistrationRequired(true);
49 // TODO TreeMap or just Map
50 // kryo.register(TreeMap.class);
51 kryo.setReferences(false);
52 return kryo;
53 }
54 };
55
56 private final RCTable table;
57 private final byte[] key;
Yoshi Muroi212e5ca2014-02-20 22:42:37 -080058 protected byte[] value; //FIXME should not be exposed
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080059 private long version;
60
61 private Map<Object, Object> propertyMap;
62
63 public RCObject(RCTable table, byte[] key) {
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -080064 this(table, key, null, VERSION_NONEXISTENT);
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080065 }
66
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -080067 public RCObject(RCTable table, byte[] key, byte[] value, long version) {
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080068 if (table == null) {
69 throw new IllegalArgumentException("table cannot be null");
70 }
71 if (key == null) {
72 throw new IllegalArgumentException("key cannot be null");
73 }
74 this.table = table;
75 this.key = key;
76 this.value = value;
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -080077 this.version = version;
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080078 this.propertyMap = new HashMap<Object, Object>();
79
80 if (this.value != null) {
81 deserializeObjectFromValue();
82 }
83 }
84
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -080085 public static <T extends RCObject> T createFromKey(byte[] key) {
86 // Equivalent of this method is expected to be implemented by SubClasses
87 throw new UnsupportedOperationException(
88 "createFromKey() is not expected to be called for RCObject");
89 }
90
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080091 public RCTable getTable() {
92 return table;
93 }
94
95 public long getTableId() {
96 return table.getTableId();
97 }
98
99 public byte[] getKey() {
100 return key;
101 }
102
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -0800103 /**
104 * Get the byte array value of this object
105 *
106 * @note will trigger serialization, if value was null.
107 * @return
108 */
109 protected byte[] getValue() {
110 if (value == null) {
111 serializeAndSetValue();
112 }
113 return value;
114 }
115
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800116 public long getVersion() {
117 return version;
118 }
119
120 /**
121 * Return serialized Value.
122 *
123 * @note will not trigger serialization
124 * @return Will return null, if never been read, or was not serialized
125 */
126 public byte[] getSerializedValue() {
127 return value;
128 }
129
130 /**
131 * Return Object as a Map.
132 *
133 * @note Will not trigger deserialization
134 * @return Will return null, if never been set, or was not deserialized
135 */
136 protected Map<Object, Object> getObjectMap() {
137 return this.propertyMap;
138 }
139
140 protected Map<Object, Object> setObjectMap(Map<Object, Object> new_map) {
141 Map<Object, Object> old_map = this.propertyMap;
142 this.propertyMap = new_map;
143 return old_map;
144 }
145
146 public void serializeAndSetValue() {
147 serializeAndSetValue(defaultKryo.get(), this.propertyMap);
148 }
149
Yuta HIGUCHIc9ca4ac2014-01-31 19:48:31 -0800150 protected void serializeAndSetValue(Kryo kryo,
151 Map<Object, Object> javaObject) {
Yuta HIGUCHI1d6a22a2014-02-21 19:17:42 -0800152
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800153
154 // value
155 byte[] rcTemp = new byte[1024 * 1024];
156 Output output = new Output(rcTemp);
157 kryo.writeObject(output, javaObject);
158 this.value = output.toBytes();
159 }
160
161 /**
162 * Deserialize
163 *
164 * @return
165 */
166 public Map<Object, Object> deserializeObjectFromValue() {
167 return deserializeObjectFromValue(defaultKryo.get());
168 }
169
170 protected HashMap<Object, Object> deserializeObjectFromValue(Kryo kryo) {
171 return deserializeObjectFromValue(kryo, HashMap.class);
172 }
173
Yuta HIGUCHIc9ca4ac2014-01-31 19:48:31 -0800174 protected <T extends Map> T deserializeObjectFromValue(Kryo kryo,
175 Class<T> type) {
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800176 if (this.value == null)
177 return null;
Yuta HIGUCHI1d6a22a2014-02-21 19:17:42 -0800178
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800179 Input input = new Input(this.value);
180 T map = kryo.readObject(input, type);
181 this.propertyMap = map;
182
183 return map;
184 }
185
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -0800186 protected void setValueAndDeserialize(byte[] value, long version) {
187 this.value = value;
188 this.version = version;
189 deserializeObjectFromValue();
190 }
191
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800192 /**
193 * Create an Object in DataStore.
194 *
195 * Fails if the Object with same key already exists.
196 *
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800197 * @throws ObjectExistsException
198 */
199 public void create() throws ObjectExistsException {
200
201 if (this.propertyMap == null) {
202 log.warn("No object map was set. Setting empty Map.");
203 setObjectMap(new HashMap<Object, Object>());
204 }
205 serializeAndSetValue();
206
207 this.version = table.create(key, value);
208 }
209
Yuta HIGUCHIc19f5952014-02-05 09:36:06 -0800210 public void forceCreate() {
211
212 if (this.propertyMap == null) {
213 log.warn("No object map was set. Setting empty Map.");
214 setObjectMap(new HashMap<Object, Object>());
215 }
216 serializeAndSetValue();
217
218 this.version = table.forceCreate(key, value);
219 }
220
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800221 /**
222 * Read an Object from DataStore.
223 *
224 * Fails if the Object with the key does not exist.
225 *
226 * @throws ObjectDoesntExistException
227 *
228 */
229 public void read() throws ObjectDoesntExistException {
230 Entry e = table.read(key);
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800231 // TODO should we deserialize immediately?
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -0800232 setValueAndDeserialize(e.value, e.version);
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800233 }
234
235 /**
236 * Update an existing Object in DataStore checking versions.
237 *
238 * Fails if the Object with key does not exists, or conditional failure.
239 *
240 * @throws WrongVersionException
241 * @throws ObjectDoesntExistException
242 */
243 public void update() throws ObjectDoesntExistException,
244 WrongVersionException {
245 if (this.propertyMap == null) {
246 setObjectMap(new HashMap<Object, Object>());
247 }
248 serializeAndSetValue();
249
250 this.version = table.update(key, value, version);
251 }
252
253 /**
254 * Remove an existing Object in DataStore.
255 *
256 * Fails if the Object with key does not exists.
257 *
258 * @throws ObjectDoesntExistException
Yuta HIGUCHIc9ca4ac2014-01-31 19:48:31 -0800259 * @throws WrongVersionException
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800260 */
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -0800261 public void delete() throws ObjectDoesntExistException,
262 WrongVersionException {
Yuta HIGUCHIc9ca4ac2014-01-31 19:48:31 -0800263 this.version = table.delete(key, this.version);
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800264 }
265
Yuta HIGUCHIc19f5952014-02-05 09:36:06 -0800266 public void forceDelete() {
267 this.version = table.forceDelete(key);
268 }
269
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800270 /**
271 * Multi-read RCObjects.
272 *
273 * If the blob value was read successfully, RCObject will deserialize them.
274 *
275 * @param objects
276 * RCObjects to read
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -0800277 * @return true if there exist a failed read.
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800278 */
279 public static boolean multiRead(Collection<RCObject> objects) {
280 boolean fail_exists = false;
281
282 ArrayList<RCObject> req = new ArrayList<>();
283 Iterator<RCObject> it = objects.iterator();
284 while (it.hasNext()) {
285
286 req.add(it.next());
287
288 if (req.size() >= RCClient.MAX_MULTI_READS) {
289 // dispatch multiRead
290 fail_exists |= multiReadInternal(req);
291 req.clear();
292 }
293 }
294
295 if (!req.isEmpty()) {
296 // dispatch multiRead
297 fail_exists |= multiReadInternal(req);
298 req.clear();
299 }
300
301 return fail_exists;
302 }
303
304 private static boolean multiReadInternal(ArrayList<RCObject> req) {
305 boolean fail_exists = false;
306 JRamCloud rcClient = RCClient.getClient();
307
308 final int reqs = req.size();
Yuta HIGUCHI1d6a22a2014-02-21 19:17:42 -0800309
Yoshi Muroie7693b12014-02-19 19:41:17 -0800310 MultiReadObject multiReadObjects = new MultiReadObject(req.size());
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800311
312 // setup multi-read operation
313 for (int i = 0; i < reqs; ++i) {
314 RCObject obj = req.get(i);
Yoshi Muroie7693b12014-02-19 19:41:17 -0800315 multiReadObjects.setObject(i, obj.getTableId(), obj.getKey());
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800316 }
317
318 // execute
Yoshi Muroie7693b12014-02-19 19:41:17 -0800319 JRamCloud.Object results[] = rcClient.multiRead(multiReadObjects.tableId, multiReadObjects.key, multiReadObjects.keyLength, reqs);
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800320 assert (results.length <= req.size());
321
322 // reflect changes to RCObject
323 for (int i = 0; i < results.length; ++i) {
324 RCObject obj = req.get(i);
325 if (results[i] == null) {
326 log.error("MultiRead error, skipping {}, {}", obj.getTable(),
327 obj);
328 fail_exists = true;
329 continue;
330 }
331 assert (Arrays.equals(results[i].key, obj.getKey()));
332
333 obj.value = results[i].value;
334 obj.version = results[i].version;
335 if (obj.version == VERSION_NONEXISTENT) {
336 fail_exists = true;
337 } else {
338 obj.deserializeObjectFromValue();
339 }
340 }
341
342 return fail_exists;
343 }
344
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -0800345 public static class WriteOp {
346 public enum STATUS {
347 NOT_EXECUTED, SUCCESS, FAILED
348 }
349
350 public enum OPS {
351 CREATE, UPDATE, FORCE_CREATE
352 }
353
354 private RCObject obj;
355 private OPS op;
356 private STATUS status;
357
358 public static WriteOp Create(RCObject obj) {
359 return new WriteOp(obj, OPS.CREATE);
360 }
361
362 public static WriteOp Update(RCObject obj) {
363 return new WriteOp(obj, OPS.UPDATE);
364 }
365
366 public static WriteOp ForceCreate(RCObject obj) {
367 return new WriteOp(obj, OPS.FORCE_CREATE);
368 }
369
370 public WriteOp(RCObject obj, OPS op) {
371 this.obj = obj;
372 this.op = op;
373 this.status = STATUS.NOT_EXECUTED;
374 }
375
376 public boolean hasSucceed() {
377 return status == STATUS.SUCCESS;
378 }
379
380 public RCObject getObject() {
381 return obj;
382 }
383
384 public OPS getOp() {
385 return op;
386 }
Yuta HIGUCHI1ca1afa2014-02-04 19:33:57 -0800387
388 public STATUS getStatus() {
389 return status;
390 }
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -0800391 }
392
393 public static boolean multiWrite(Collection<WriteOp> objects) {
394 boolean fail_exists = false;
395
396 ArrayList<WriteOp> req = new ArrayList<>();
397 Iterator<WriteOp> it = objects.iterator();
398 while (it.hasNext()) {
399
400 req.add(it.next());
401
402 if (req.size() >= RCClient.MAX_MULTI_WRITES) {
403 // dispatch multiWrite
404 fail_exists |= multiWriteInternal(req);
405 req.clear();
406 }
407 }
408
409 if (!req.isEmpty()) {
410 // dispatch multiWrite
411 fail_exists |= multiWriteInternal(req);
412 req.clear();
413 }
414
415 return fail_exists;
416 }
417
418 private static boolean multiWriteInternal(ArrayList<WriteOp> ops) {
419
420 boolean fail_exists = false;
Yoshi Muroie7693b12014-02-19 19:41:17 -0800421 MultiWriteObject multiWriteObjects = new MultiWriteObject(ops.size());
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -0800422 JRamCloud rcClient = RCClient.getClient();
423
Yoshi Muroie7693b12014-02-19 19:41:17 -0800424 for (int i = 0; i < ops.size(); ++i) {
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -0800425 WriteOp op = ops.get(i);
426 RCObject obj = op.getObject();
427
Yuta HIGUCHI1d6a22a2014-02-21 19:17:42 -0800428 RejectRules rules = new RejectRules();
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -0800429
430 switch (op.getOp()) {
431 case CREATE:
Yuta HIGUCHI1d6a22a2014-02-21 19:17:42 -0800432 rules.rejectIfExists();
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -0800433 break;
434 case FORCE_CREATE:
435 // no reject rule
436 break;
437 case UPDATE:
Yuta HIGUCHI1d6a22a2014-02-21 19:17:42 -0800438 rules.rejectIfDoesntExists();
439 rules.rejectIfNeVersion(obj.getVersion());
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -0800440 break;
441 }
Yoshi Muroie7693b12014-02-19 19:41:17 -0800442 multiWriteObjects.setObject(i, obj.getTableId(), obj.getKey(), obj.getValue(), rules);
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -0800443 }
444
Yoshi Muroie7693b12014-02-19 19:41:17 -0800445 MultiWriteRspObject[] results = rcClient.multiWrite(multiWriteObjects.tableId, multiWriteObjects.key, multiWriteObjects.keyLength, multiWriteObjects.value, multiWriteObjects.valueLength, ops.size(), multiWriteObjects.rules);
446 assert (results.length == ops.size());
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -0800447
448 for (int i = 0; i < results.length; ++i) {
449 WriteOp op = ops.get(i);
450
451 if (results[i] != null
452 && results[i].getStatus() == RCClient.STATUS_OK) {
453 op.status = STATUS.SUCCESS;
454
455 RCObject obj = op.getObject();
456 obj.version = results[i].getVersion();
457 } else {
458 op.status = STATUS.FAILED;
459 fail_exists = true;
460 }
461
462 }
463
464 return fail_exists;
465 }
Yuta HIGUCHI1d6a22a2014-02-21 19:17:42 -0800466
Yuta HIGUCHI25719052014-02-13 14:42:06 -0800467 public static abstract class ObjectIterator<E extends RCObject> implements
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -0800468 Iterator<E> {
469
Yoshi Muroie7693b12014-02-19 19:41:17 -0800470 protected TableEnumerator2 enumerator;
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -0800471
472 public ObjectIterator(RCTable table) {
473 // FIXME workaround for JRamCloud bug. It should have been declared
474 // as static class
475 JRamCloud c = RCClient.getClient();
Yoshi Muroie7693b12014-02-19 19:41:17 -0800476 this.enumerator = c.new TableEnumerator2(table.getTableId());
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -0800477 }
478
479 @Override
480 public boolean hasNext() {
481 return enumerator.hasNext();
482 }
483
Yuta HIGUCHI25719052014-02-13 14:42:06 -0800484// Implement something similar to below to realize Iterator
485// @Override
486// public E next() {
487// JRamCloud.Object o = enumerator.next();
488// E obj = E.createFromKey(o.key);
489// obj.setValueAndDeserialize(o.value, o.version);
490// return obj;
491// }
Yuta HIGUCHI1d6a22a2014-02-21 19:17:42 -0800492
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -0800493 @Deprecated
494 @Override
495 public void remove() {
496 // TODO Not implemented, as I cannot find a use-case for it.
497 throw new UnsupportedOperationException("Not implemented yet");
498 }
499
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800500 }
501
502}