blob: 23a213999d4e40f01eb0c3e7057a0ad09017b3f5 [file] [log] [blame]
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -08001package net.onrc.onos.datastore;
2
3import java.util.ArrayList;
4import java.util.Arrays;
5import java.util.Collection;
6import java.util.HashMap;
7import java.util.Iterator;
8import java.util.Map;
9
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -080010import net.onrc.onos.datastore.RCObject.WriteOp.STATUS;
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080011import net.onrc.onos.datastore.RCTable.Entry;
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -080012
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080013import org.slf4j.Logger;
14import org.slf4j.LoggerFactory;
15
16import com.esotericsoftware.kryo.Kryo;
17import com.esotericsoftware.kryo.io.Input;
18import com.esotericsoftware.kryo.io.Output;
19
20import edu.stanford.ramcloud.JRamCloud;
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -080021import edu.stanford.ramcloud.JRamCloud.MultiWriteObject;
22import edu.stanford.ramcloud.JRamCloud.MultiWriteRspObject;
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080023import edu.stanford.ramcloud.JRamCloud.ObjectDoesntExistException;
24import edu.stanford.ramcloud.JRamCloud.ObjectExistsException;
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -080025import edu.stanford.ramcloud.JRamCloud.RejectRules;
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -080026import edu.stanford.ramcloud.JRamCloud.TableEnumerator;
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080027import edu.stanford.ramcloud.JRamCloud.WrongVersionException;
28
29/**
30 * Class to represent an Object represented as a single K-V pair Value blob.
31 *
32 */
33public class RCObject {
34 private static final Logger log = LoggerFactory.getLogger(RCObject.class);
35 /**
 * Version number which represents that the object does not exist, or has
 * never been read from the DB.
38 */
39 public static final long VERSION_NONEXISTENT = 0L;
40
41 // Each Object should prepare their own serializer, which has required
42 // objects registered.
43 private static final ThreadLocal<Kryo> defaultKryo = new ThreadLocal<Kryo>() {
44 @Override
45 protected Kryo initialValue() {
46 Kryo kryo = new Kryo();
47 // kryo.setRegistrationRequired(true);
48 // TODO TreeMap or just Map
49 // kryo.register(TreeMap.class);
50 kryo.setReferences(false);
51 return kryo;
52 }
53 };
54
55 private final RCTable table;
56 private final byte[] key;
57 private byte[] value;
58 private long version;
59
60 private Map<Object, Object> propertyMap;
61
/**
 * Creates an object bound to the given table and key, with no serialized
 * value and version {@link #VERSION_NONEXISTENT}.
 *
 * @param table table this object belongs to, must not be null
 * @param key key of this object, must not be null
 * @throws IllegalArgumentException if {@code table} or {@code key} is null
 */
public RCObject(RCTable table, byte[] key) {
    this(table, key, null, VERSION_NONEXISTENT);
}

/**
 * Creates an object bound to the given table and key, holding the given
 * serialized value and version.
 *
 * The property map starts out empty; when {@code value} is not null it is
 * deserialized right away.
 *
 * @param table table this object belongs to, must not be null
 * @param key key of this object, must not be null
 * @param value serialized value, may be null
 * @param version version associated with {@code value}
 * @throws IllegalArgumentException if {@code table} or {@code key} is null
 */
public RCObject(RCTable table, byte[] key, byte[] value, long version) {
    this.table = requireArgument(table, "table cannot be null");
    this.key = requireArgument(key, "key cannot be null");
    this.value = value;
    this.version = version;
    this.propertyMap = new HashMap<Object, Object>();

    if (value != null) {
        deserializeObjectFromValue();
    }
}

// Returns arg unchanged, or throws IllegalArgumentException(message) if it is null.
private static <A> A requireArgument(A arg, String message) {
    if (arg == null) {
        throw new IllegalArgumentException(message);
    }
    return arg;
}
83
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -080084 public static <T extends RCObject> T createFromKey(byte[] key) {
85 // Equivalent of this method is expected to be implemented by SubClasses
86 throw new UnsupportedOperationException(
87 "createFromKey() is not expected to be called for RCObject");
88 }
89
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080090 public RCTable getTable() {
91 return table;
92 }
93
94 public long getTableId() {
95 return table.getTableId();
96 }
97
98 public byte[] getKey() {
99 return key;
100 }
101
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -0800102 /**
103 * Get the byte array value of this object
104 *
105 * @note will trigger serialization, if value was null.
106 * @return
107 */
108 protected byte[] getValue() {
109 if (value == null) {
110 serializeAndSetValue();
111 }
112 return value;
113 }
114
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800115 public long getVersion() {
116 return version;
117 }
118
119 /**
120 * Return serialized Value.
121 *
122 * @note will not trigger serialization
123 * @return Will return null, if never been read, or was not serialized
124 */
125 public byte[] getSerializedValue() {
126 return value;
127 }
128
129 /**
130 * Return Object as a Map.
131 *
132 * @note Will not trigger deserialization
133 * @return Will return null, if never been set, or was not deserialized
134 */
135 protected Map<Object, Object> getObjectMap() {
136 return this.propertyMap;
137 }
138
139 protected Map<Object, Object> setObjectMap(Map<Object, Object> new_map) {
140 Map<Object, Object> old_map = this.propertyMap;
141 this.propertyMap = new_map;
142 return old_map;
143 }
144
145 public void serializeAndSetValue() {
146 serializeAndSetValue(defaultKryo.get(), this.propertyMap);
147 }
148
Yuta HIGUCHIc9ca4ac2014-01-31 19:48:31 -0800149 protected void serializeAndSetValue(Kryo kryo,
150 Map<Object, Object> javaObject) {
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800151
152 // value
153 byte[] rcTemp = new byte[1024 * 1024];
154 Output output = new Output(rcTemp);
155 kryo.writeObject(output, javaObject);
156 this.value = output.toBytes();
157 }
158
159 /**
160 * Deserialize
161 *
162 * @return
163 */
164 public Map<Object, Object> deserializeObjectFromValue() {
165 return deserializeObjectFromValue(defaultKryo.get());
166 }
167
168 protected HashMap<Object, Object> deserializeObjectFromValue(Kryo kryo) {
169 return deserializeObjectFromValue(kryo, HashMap.class);
170 }
171
Yuta HIGUCHIc9ca4ac2014-01-31 19:48:31 -0800172 protected <T extends Map> T deserializeObjectFromValue(Kryo kryo,
173 Class<T> type) {
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800174 if (this.value == null)
175 return null;
176
177 Input input = new Input(this.value);
178 T map = kryo.readObject(input, type);
179 this.propertyMap = map;
180
181 return map;
182 }
183
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -0800184 protected void setValueAndDeserialize(byte[] value, long version) {
185 this.value = value;
186 this.version = version;
187 deserializeObjectFromValue();
188 }
189
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800190 /**
191 * Create an Object in DataStore.
192 *
193 * Fails if the Object with same key already exists.
194 *
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800195 * @throws ObjectExistsException
196 */
197 public void create() throws ObjectExistsException {
198
199 if (this.propertyMap == null) {
200 log.warn("No object map was set. Setting empty Map.");
201 setObjectMap(new HashMap<Object, Object>());
202 }
203 serializeAndSetValue();
204
205 this.version = table.create(key, value);
206 }
207
208 /**
209 * Read an Object from DataStore.
210 *
211 * Fails if the Object with the key does not exist.
212 *
213 * @throws ObjectDoesntExistException
214 *
215 */
216 public void read() throws ObjectDoesntExistException {
217 Entry e = table.read(key);
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800218 // TODO should we deserialize immediately?
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -0800219 setValueAndDeserialize(e.value, e.version);
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800220 }
221
222 /**
223 * Update an existing Object in DataStore checking versions.
224 *
225 * Fails if the Object with key does not exists, or conditional failure.
226 *
227 * @throws WrongVersionException
228 * @throws ObjectDoesntExistException
229 */
230 public void update() throws ObjectDoesntExistException,
231 WrongVersionException {
232 if (this.propertyMap == null) {
233 setObjectMap(new HashMap<Object, Object>());
234 }
235 serializeAndSetValue();
236
237 this.version = table.update(key, value, version);
238 }
239
240 /**
241 * Remove an existing Object in DataStore.
242 *
243 * Fails if the Object with key does not exists.
244 *
245 * @throws ObjectDoesntExistException
Yuta HIGUCHIc9ca4ac2014-01-31 19:48:31 -0800246 * @throws WrongVersionException
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800247 */
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -0800248 public void delete() throws ObjectDoesntExistException,
249 WrongVersionException {
Yuta HIGUCHIc9ca4ac2014-01-31 19:48:31 -0800250 this.version = table.delete(key, this.version);
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800251 }
252
253 /**
254 * Multi-read RCObjects.
255 *
256 * If the blob value was read successfully, RCObject will deserialize them.
257 *
258 * @param objects
259 * RCObjects to read
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -0800260 * @return true if there exist a failed read.
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800261 */
262 public static boolean multiRead(Collection<RCObject> objects) {
263 boolean fail_exists = false;
264
265 ArrayList<RCObject> req = new ArrayList<>();
266 Iterator<RCObject> it = objects.iterator();
267 while (it.hasNext()) {
268
269 req.add(it.next());
270
271 if (req.size() >= RCClient.MAX_MULTI_READS) {
272 // dispatch multiRead
273 fail_exists |= multiReadInternal(req);
274 req.clear();
275 }
276 }
277
278 if (!req.isEmpty()) {
279 // dispatch multiRead
280 fail_exists |= multiReadInternal(req);
281 req.clear();
282 }
283
284 return fail_exists;
285 }
286
287 private static boolean multiReadInternal(ArrayList<RCObject> req) {
288 boolean fail_exists = false;
289 JRamCloud rcClient = RCClient.getClient();
290
291 final int reqs = req.size();
292 JRamCloud.multiReadObject mrObjs[] = new JRamCloud.multiReadObject[reqs];
293
294 // setup multi-read operation
295 for (int i = 0; i < reqs; ++i) {
296 RCObject obj = req.get(i);
297 mrObjs[i] = new JRamCloud.multiReadObject(obj.getTableId(),
298 obj.getKey());
299 }
300
301 // execute
302 JRamCloud.Object results[] = rcClient.multiRead(mrObjs);
303 assert (results.length <= req.size());
304
305 // reflect changes to RCObject
306 for (int i = 0; i < results.length; ++i) {
307 RCObject obj = req.get(i);
308 if (results[i] == null) {
309 log.error("MultiRead error, skipping {}, {}", obj.getTable(),
310 obj);
311 fail_exists = true;
312 continue;
313 }
314 assert (Arrays.equals(results[i].key, obj.getKey()));
315
316 obj.value = results[i].value;
317 obj.version = results[i].version;
318 if (obj.version == VERSION_NONEXISTENT) {
319 fail_exists = true;
320 } else {
321 obj.deserializeObjectFromValue();
322 }
323 }
324
325 return fail_exists;
326 }
327
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -0800328 public static class WriteOp {
329 public enum STATUS {
330 NOT_EXECUTED, SUCCESS, FAILED
331 }
332
333 public enum OPS {
334 CREATE, UPDATE, FORCE_CREATE
335 }
336
337 private RCObject obj;
338 private OPS op;
339 private STATUS status;
340
341 public static WriteOp Create(RCObject obj) {
342 return new WriteOp(obj, OPS.CREATE);
343 }
344
345 public static WriteOp Update(RCObject obj) {
346 return new WriteOp(obj, OPS.UPDATE);
347 }
348
349 public static WriteOp ForceCreate(RCObject obj) {
350 return new WriteOp(obj, OPS.FORCE_CREATE);
351 }
352
353 public WriteOp(RCObject obj, OPS op) {
354 this.obj = obj;
355 this.op = op;
356 this.status = STATUS.NOT_EXECUTED;
357 }
358
359 public boolean hasSucceed() {
360 return status == STATUS.SUCCESS;
361 }
362
363 public RCObject getObject() {
364 return obj;
365 }
366
367 public OPS getOp() {
368 return op;
369 }
Yuta HIGUCHI1ca1afa2014-02-04 19:33:57 -0800370
371 public STATUS getStatus() {
372 return status;
373 }
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -0800374 }
375
376 public static boolean multiWrite(Collection<WriteOp> objects) {
377 boolean fail_exists = false;
378
379 ArrayList<WriteOp> req = new ArrayList<>();
380 Iterator<WriteOp> it = objects.iterator();
381 while (it.hasNext()) {
382
383 req.add(it.next());
384
385 if (req.size() >= RCClient.MAX_MULTI_WRITES) {
386 // dispatch multiWrite
387 fail_exists |= multiWriteInternal(req);
388 req.clear();
389 }
390 }
391
392 if (!req.isEmpty()) {
393 // dispatch multiWrite
394 fail_exists |= multiWriteInternal(req);
395 req.clear();
396 }
397
398 return fail_exists;
399 }
400
401 private static boolean multiWriteInternal(ArrayList<WriteOp> ops) {
402
403 boolean fail_exists = false;
404 MultiWriteObject multiWriteObjects[] = new MultiWriteObject[ops.size()];
405 JRamCloud rcClient = RCClient.getClient();
406
407 for (int i = 0; i < multiWriteObjects.length; ++i) {
408 WriteOp op = ops.get(i);
409 RCObject obj = op.getObject();
410
411 // FIXME JRamCloud.RejectRules definition is messed up
412 RejectRules rules = rcClient.new RejectRules();
413
414 switch (op.getOp()) {
415 case CREATE:
416 rules.setExists();
417 break;
418 case FORCE_CREATE:
419 // no reject rule
420 break;
421 case UPDATE:
422 rules.setDoesntExists();
423 rules.setNeVersion(obj.getVersion());
424 break;
425 }
426 multiWriteObjects[i] = new MultiWriteObject(obj.getTableId(),
427 obj.getKey(), obj.getValue(), rules);
428 }
429
430 MultiWriteRspObject[] results = rcClient.multiWrite(multiWriteObjects);
431 assert (results.length == multiWriteObjects.length);
432
433 for (int i = 0; i < results.length; ++i) {
434 WriteOp op = ops.get(i);
435
436 if (results[i] != null
437 && results[i].getStatus() == RCClient.STATUS_OK) {
438 op.status = STATUS.SUCCESS;
439
440 RCObject obj = op.getObject();
441 obj.version = results[i].getVersion();
442 } else {
443 op.status = STATUS.FAILED;
444 fail_exists = true;
445 }
446
447 }
448
449 return fail_exists;
450 }
451
452 public static Iterable<RCObject> getAllObjects(RCTable table) {
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -0800453 return new ObjectEnumerator(table);
454 }
455
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -0800456 public static class ObjectEnumerator implements Iterable<RCObject> {
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -0800457
458 private RCTable table;
459
460 public ObjectEnumerator(RCTable table) {
461 this.table = table;
462 }
463
464 @Override
465 public Iterator<RCObject> iterator() {
466 return new ObjectIterator<RCObject>(table);
467 }
468
469 }
470
471 public static class ObjectIterator<E extends RCObject> implements
472 Iterator<E> {
473
474 protected TableEnumerator enumerator;
475
476 public ObjectIterator(RCTable table) {
477 // FIXME workaround for JRamCloud bug. It should have been declared
478 // as static class
479 JRamCloud c = RCClient.getClient();
480 this.enumerator = c.new TableEnumerator(table.getTableId());
481 }
482
483 @Override
484 public boolean hasNext() {
485 return enumerator.hasNext();
486 }
487
488 @Override
489 public E next() {
490 JRamCloud.Object o = enumerator.next();
491 RCObject obj = RCObject.createFromKey(o.key);
492 obj.setValueAndDeserialize(o.value, o.version);
493 return (E) obj;
494 }
495
496 @Deprecated
497 @Override
498 public void remove() {
499 // TODO Not implemented, as I cannot find a use-case for it.
500 throw new UnsupportedOperationException("Not implemented yet");
501 }
502
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800503 }
504
505}