blob: 0ac0d49b38981746fe78b746e4fc3784fa4cbb1a [file] [log] [blame]
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -08001package net.onrc.onos.datastore;
2
3import java.util.ArrayList;
4import java.util.Arrays;
5import java.util.Collection;
6import java.util.HashMap;
7import java.util.Iterator;
8import java.util.Map;
9
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -080010import net.onrc.onos.datastore.RCObject.WriteOp.STATUS;
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080011import net.onrc.onos.datastore.RCTable.Entry;
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -080012
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080013import org.slf4j.Logger;
14import org.slf4j.LoggerFactory;
15
16import com.esotericsoftware.kryo.Kryo;
17import com.esotericsoftware.kryo.io.Input;
18import com.esotericsoftware.kryo.io.Output;
19
20import edu.stanford.ramcloud.JRamCloud;
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -080021import edu.stanford.ramcloud.JRamCloud.MultiWriteObject;
22import edu.stanford.ramcloud.JRamCloud.MultiWriteRspObject;
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080023import edu.stanford.ramcloud.JRamCloud.ObjectDoesntExistException;
24import edu.stanford.ramcloud.JRamCloud.ObjectExistsException;
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -080025import edu.stanford.ramcloud.JRamCloud.RejectRules;
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -080026import edu.stanford.ramcloud.JRamCloud.TableEnumerator;
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080027import edu.stanford.ramcloud.JRamCloud.WrongVersionException;
28
29/**
30 * Class to represent an Object represented as a single K-V pair Value blob.
31 *
32 */
33public class RCObject {
34 private static final Logger log = LoggerFactory.getLogger(RCObject.class);
35 /**
36 * Version number which represents that the object doesnot exist, or hase
37 * never read the DB before.
38 */
39 public static final long VERSION_NONEXISTENT = 0L;
40
41 // Each Object should prepare their own serializer, which has required
42 // objects registered.
43 private static final ThreadLocal<Kryo> defaultKryo = new ThreadLocal<Kryo>() {
44 @Override
45 protected Kryo initialValue() {
46 Kryo kryo = new Kryo();
47 // kryo.setRegistrationRequired(true);
48 // TODO TreeMap or just Map
49 // kryo.register(TreeMap.class);
50 kryo.setReferences(false);
51 return kryo;
52 }
53 };
54
55 private final RCTable table;
56 private final byte[] key;
57 private byte[] value;
58 private long version;
59
60 private Map<Object, Object> propertyMap;
61
62 public RCObject(RCTable table, byte[] key) {
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -080063 this(table, key, null, VERSION_NONEXISTENT);
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080064 }
65
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -080066 public RCObject(RCTable table, byte[] key, byte[] value, long version) {
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080067 if (table == null) {
68 throw new IllegalArgumentException("table cannot be null");
69 }
70 if (key == null) {
71 throw new IllegalArgumentException("key cannot be null");
72 }
73 this.table = table;
74 this.key = key;
75 this.value = value;
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -080076 this.version = version;
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080077 this.propertyMap = new HashMap<Object, Object>();
78
79 if (this.value != null) {
80 deserializeObjectFromValue();
81 }
82 }
83
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -080084 public static <T extends RCObject> T createFromKey(byte[] key) {
85 // Equivalent of this method is expected to be implemented by SubClasses
86 throw new UnsupportedOperationException(
87 "createFromKey() is not expected to be called for RCObject");
88 }
89
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080090 public RCTable getTable() {
91 return table;
92 }
93
94 public long getTableId() {
95 return table.getTableId();
96 }
97
98 public byte[] getKey() {
99 return key;
100 }
101
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -0800102 /**
103 * Get the byte array value of this object
104 *
105 * @note will trigger serialization, if value was null.
106 * @return
107 */
108 protected byte[] getValue() {
109 if (value == null) {
110 serializeAndSetValue();
111 }
112 return value;
113 }
114
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800115 public long getVersion() {
116 return version;
117 }
118
119 /**
120 * Return serialized Value.
121 *
122 * @note will not trigger serialization
123 * @return Will return null, if never been read, or was not serialized
124 */
125 public byte[] getSerializedValue() {
126 return value;
127 }
128
129 /**
130 * Return Object as a Map.
131 *
132 * @note Will not trigger deserialization
133 * @return Will return null, if never been set, or was not deserialized
134 */
135 protected Map<Object, Object> getObjectMap() {
136 return this.propertyMap;
137 }
138
139 protected Map<Object, Object> setObjectMap(Map<Object, Object> new_map) {
140 Map<Object, Object> old_map = this.propertyMap;
141 this.propertyMap = new_map;
142 return old_map;
143 }
144
145 public void serializeAndSetValue() {
146 serializeAndSetValue(defaultKryo.get(), this.propertyMap);
147 }
148
Yuta HIGUCHIc9ca4ac2014-01-31 19:48:31 -0800149 protected void serializeAndSetValue(Kryo kryo,
150 Map<Object, Object> javaObject) {
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800151
152 // value
153 byte[] rcTemp = new byte[1024 * 1024];
154 Output output = new Output(rcTemp);
155 kryo.writeObject(output, javaObject);
156 this.value = output.toBytes();
157 }
158
159 /**
160 * Deserialize
161 *
162 * @return
163 */
164 public Map<Object, Object> deserializeObjectFromValue() {
165 return deserializeObjectFromValue(defaultKryo.get());
166 }
167
168 protected HashMap<Object, Object> deserializeObjectFromValue(Kryo kryo) {
169 return deserializeObjectFromValue(kryo, HashMap.class);
170 }
171
Yuta HIGUCHIc9ca4ac2014-01-31 19:48:31 -0800172 protected <T extends Map> T deserializeObjectFromValue(Kryo kryo,
173 Class<T> type) {
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800174 if (this.value == null)
175 return null;
176
177 Input input = new Input(this.value);
178 T map = kryo.readObject(input, type);
179 this.propertyMap = map;
180
181 return map;
182 }
183
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -0800184 protected void setValueAndDeserialize(byte[] value, long version) {
185 this.value = value;
186 this.version = version;
187 deserializeObjectFromValue();
188 }
189
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800190 /**
191 * Create an Object in DataStore.
192 *
193 * Fails if the Object with same key already exists.
194 *
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800195 * @throws ObjectExistsException
196 */
197 public void create() throws ObjectExistsException {
198
199 if (this.propertyMap == null) {
200 log.warn("No object map was set. Setting empty Map.");
201 setObjectMap(new HashMap<Object, Object>());
202 }
203 serializeAndSetValue();
204
205 this.version = table.create(key, value);
206 }
207
Yuta HIGUCHIc19f5952014-02-05 09:36:06 -0800208 public void forceCreate() {
209
210 if (this.propertyMap == null) {
211 log.warn("No object map was set. Setting empty Map.");
212 setObjectMap(new HashMap<Object, Object>());
213 }
214 serializeAndSetValue();
215
216 this.version = table.forceCreate(key, value);
217 }
218
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800219 /**
220 * Read an Object from DataStore.
221 *
222 * Fails if the Object with the key does not exist.
223 *
224 * @throws ObjectDoesntExistException
225 *
226 */
227 public void read() throws ObjectDoesntExistException {
228 Entry e = table.read(key);
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800229 // TODO should we deserialize immediately?
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -0800230 setValueAndDeserialize(e.value, e.version);
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800231 }
232
233 /**
234 * Update an existing Object in DataStore checking versions.
235 *
236 * Fails if the Object with key does not exists, or conditional failure.
237 *
238 * @throws WrongVersionException
239 * @throws ObjectDoesntExistException
240 */
241 public void update() throws ObjectDoesntExistException,
242 WrongVersionException {
243 if (this.propertyMap == null) {
244 setObjectMap(new HashMap<Object, Object>());
245 }
246 serializeAndSetValue();
247
248 this.version = table.update(key, value, version);
249 }
250
251 /**
252 * Remove an existing Object in DataStore.
253 *
254 * Fails if the Object with key does not exists.
255 *
256 * @throws ObjectDoesntExistException
Yuta HIGUCHIc9ca4ac2014-01-31 19:48:31 -0800257 * @throws WrongVersionException
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800258 */
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -0800259 public void delete() throws ObjectDoesntExistException,
260 WrongVersionException {
Yuta HIGUCHIc9ca4ac2014-01-31 19:48:31 -0800261 this.version = table.delete(key, this.version);
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800262 }
263
Yuta HIGUCHIc19f5952014-02-05 09:36:06 -0800264 public void forceDelete() {
265 this.version = table.forceDelete(key);
266 }
267
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800268 /**
269 * Multi-read RCObjects.
270 *
271 * If the blob value was read successfully, RCObject will deserialize them.
272 *
273 * @param objects
274 * RCObjects to read
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -0800275 * @return true if there exist a failed read.
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800276 */
277 public static boolean multiRead(Collection<RCObject> objects) {
278 boolean fail_exists = false;
279
280 ArrayList<RCObject> req = new ArrayList<>();
281 Iterator<RCObject> it = objects.iterator();
282 while (it.hasNext()) {
283
284 req.add(it.next());
285
286 if (req.size() >= RCClient.MAX_MULTI_READS) {
287 // dispatch multiRead
288 fail_exists |= multiReadInternal(req);
289 req.clear();
290 }
291 }
292
293 if (!req.isEmpty()) {
294 // dispatch multiRead
295 fail_exists |= multiReadInternal(req);
296 req.clear();
297 }
298
299 return fail_exists;
300 }
301
302 private static boolean multiReadInternal(ArrayList<RCObject> req) {
303 boolean fail_exists = false;
304 JRamCloud rcClient = RCClient.getClient();
305
306 final int reqs = req.size();
307 JRamCloud.multiReadObject mrObjs[] = new JRamCloud.multiReadObject[reqs];
308
309 // setup multi-read operation
310 for (int i = 0; i < reqs; ++i) {
311 RCObject obj = req.get(i);
312 mrObjs[i] = new JRamCloud.multiReadObject(obj.getTableId(),
313 obj.getKey());
314 }
315
316 // execute
317 JRamCloud.Object results[] = rcClient.multiRead(mrObjs);
318 assert (results.length <= req.size());
319
320 // reflect changes to RCObject
321 for (int i = 0; i < results.length; ++i) {
322 RCObject obj = req.get(i);
323 if (results[i] == null) {
324 log.error("MultiRead error, skipping {}, {}", obj.getTable(),
325 obj);
326 fail_exists = true;
327 continue;
328 }
329 assert (Arrays.equals(results[i].key, obj.getKey()));
330
331 obj.value = results[i].value;
332 obj.version = results[i].version;
333 if (obj.version == VERSION_NONEXISTENT) {
334 fail_exists = true;
335 } else {
336 obj.deserializeObjectFromValue();
337 }
338 }
339
340 return fail_exists;
341 }
342
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -0800343 public static class WriteOp {
344 public enum STATUS {
345 NOT_EXECUTED, SUCCESS, FAILED
346 }
347
348 public enum OPS {
349 CREATE, UPDATE, FORCE_CREATE
350 }
351
352 private RCObject obj;
353 private OPS op;
354 private STATUS status;
355
356 public static WriteOp Create(RCObject obj) {
357 return new WriteOp(obj, OPS.CREATE);
358 }
359
360 public static WriteOp Update(RCObject obj) {
361 return new WriteOp(obj, OPS.UPDATE);
362 }
363
364 public static WriteOp ForceCreate(RCObject obj) {
365 return new WriteOp(obj, OPS.FORCE_CREATE);
366 }
367
368 public WriteOp(RCObject obj, OPS op) {
369 this.obj = obj;
370 this.op = op;
371 this.status = STATUS.NOT_EXECUTED;
372 }
373
374 public boolean hasSucceed() {
375 return status == STATUS.SUCCESS;
376 }
377
378 public RCObject getObject() {
379 return obj;
380 }
381
382 public OPS getOp() {
383 return op;
384 }
Yuta HIGUCHI1ca1afa2014-02-04 19:33:57 -0800385
386 public STATUS getStatus() {
387 return status;
388 }
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -0800389 }
390
391 public static boolean multiWrite(Collection<WriteOp> objects) {
392 boolean fail_exists = false;
393
394 ArrayList<WriteOp> req = new ArrayList<>();
395 Iterator<WriteOp> it = objects.iterator();
396 while (it.hasNext()) {
397
398 req.add(it.next());
399
400 if (req.size() >= RCClient.MAX_MULTI_WRITES) {
401 // dispatch multiWrite
402 fail_exists |= multiWriteInternal(req);
403 req.clear();
404 }
405 }
406
407 if (!req.isEmpty()) {
408 // dispatch multiWrite
409 fail_exists |= multiWriteInternal(req);
410 req.clear();
411 }
412
413 return fail_exists;
414 }
415
416 private static boolean multiWriteInternal(ArrayList<WriteOp> ops) {
417
418 boolean fail_exists = false;
419 MultiWriteObject multiWriteObjects[] = new MultiWriteObject[ops.size()];
420 JRamCloud rcClient = RCClient.getClient();
421
422 for (int i = 0; i < multiWriteObjects.length; ++i) {
423 WriteOp op = ops.get(i);
424 RCObject obj = op.getObject();
425
426 // FIXME JRamCloud.RejectRules definition is messed up
427 RejectRules rules = rcClient.new RejectRules();
428
429 switch (op.getOp()) {
430 case CREATE:
431 rules.setExists();
432 break;
433 case FORCE_CREATE:
434 // no reject rule
435 break;
436 case UPDATE:
437 rules.setDoesntExists();
438 rules.setNeVersion(obj.getVersion());
439 break;
440 }
441 multiWriteObjects[i] = new MultiWriteObject(obj.getTableId(),
442 obj.getKey(), obj.getValue(), rules);
443 }
444
445 MultiWriteRspObject[] results = rcClient.multiWrite(multiWriteObjects);
446 assert (results.length == multiWriteObjects.length);
447
448 for (int i = 0; i < results.length; ++i) {
449 WriteOp op = ops.get(i);
450
451 if (results[i] != null
452 && results[i].getStatus() == RCClient.STATUS_OK) {
453 op.status = STATUS.SUCCESS;
454
455 RCObject obj = op.getObject();
456 obj.version = results[i].getVersion();
457 } else {
458 op.status = STATUS.FAILED;
459 fail_exists = true;
460 }
461
462 }
463
464 return fail_exists;
465 }
466
467 public static Iterable<RCObject> getAllObjects(RCTable table) {
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -0800468 return new ObjectEnumerator(table);
469 }
470
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -0800471 public static class ObjectEnumerator implements Iterable<RCObject> {
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -0800472
473 private RCTable table;
474
475 public ObjectEnumerator(RCTable table) {
476 this.table = table;
477 }
478
479 @Override
480 public Iterator<RCObject> iterator() {
481 return new ObjectIterator<RCObject>(table);
482 }
483
484 }
485
486 public static class ObjectIterator<E extends RCObject> implements
487 Iterator<E> {
488
489 protected TableEnumerator enumerator;
490
491 public ObjectIterator(RCTable table) {
492 // FIXME workaround for JRamCloud bug. It should have been declared
493 // as static class
494 JRamCloud c = RCClient.getClient();
495 this.enumerator = c.new TableEnumerator(table.getTableId());
496 }
497
498 @Override
499 public boolean hasNext() {
500 return enumerator.hasNext();
501 }
502
503 @Override
504 public E next() {
505 JRamCloud.Object o = enumerator.next();
506 RCObject obj = RCObject.createFromKey(o.key);
507 obj.setValueAndDeserialize(o.value, o.version);
508 return (E) obj;
509 }
510
511 @Deprecated
512 @Override
513 public void remove() {
514 // TODO Not implemented, as I cannot find a use-case for it.
515 throw new UnsupportedOperationException("Not implemented yet");
516 }
517
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800518 }
519
520}