blob: 77f5d15fc2035c91f10c23ed3b16fa8e98eaf73f [file] [log] [blame]
package net.onrc.onos.datastore;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import net.onrc.onos.datastore.RCObject.WriteOp.STATUS;
import net.onrc.onos.datastore.RCTable.Entry;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import edu.stanford.ramcloud.JRamCloud;
import edu.stanford.ramcloud.JRamCloud.MultiReadObject;
import edu.stanford.ramcloud.JRamCloud.MultiWriteObject;
import edu.stanford.ramcloud.JRamCloud.MultiWriteRspObject;
import edu.stanford.ramcloud.JRamCloud.ObjectDoesntExistException;
import edu.stanford.ramcloud.JRamCloud.ObjectExistsException;
import edu.stanford.ramcloud.JRamCloud.RejectRules;
import edu.stanford.ramcloud.JRamCloud.TableEnumerator;
import edu.stanford.ramcloud.JRamCloud.TableEnumerator2;
import edu.stanford.ramcloud.JRamCloud.WrongVersionException;

31/**
32 * Class to represent an Object represented as a single K-V pair Value blob.
33 *
34 */
35public class RCObject {
36 private static final Logger log = LoggerFactory.getLogger(RCObject.class);
37 /**
38 * Version number which represents that the object doesnot exist, or hase
39 * never read the DB before.
40 */
41 public static final long VERSION_NONEXISTENT = 0L;
42
43 // Each Object should prepare their own serializer, which has required
44 // objects registered.
45 private static final ThreadLocal<Kryo> defaultKryo = new ThreadLocal<Kryo>() {
46 @Override
47 protected Kryo initialValue() {
48 Kryo kryo = new Kryo();
49 // kryo.setRegistrationRequired(true);
50 // TODO TreeMap or just Map
51 // kryo.register(TreeMap.class);
52 kryo.setReferences(false);
53 return kryo;
54 }
55 };
56
57 private final RCTable table;
58 private final byte[] key;
Yoshi Muroi212e5ca2014-02-20 22:42:37 -080059 protected byte[] value; //FIXME should not be exposed
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080060 private long version;
61
62 private Map<Object, Object> propertyMap;
63
64 public RCObject(RCTable table, byte[] key) {
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -080065 this(table, key, null, VERSION_NONEXISTENT);
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080066 }
67
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -080068 public RCObject(RCTable table, byte[] key, byte[] value, long version) {
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080069 if (table == null) {
70 throw new IllegalArgumentException("table cannot be null");
71 }
72 if (key == null) {
73 throw new IllegalArgumentException("key cannot be null");
74 }
75 this.table = table;
76 this.key = key;
77 this.value = value;
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -080078 this.version = version;
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080079 this.propertyMap = new HashMap<Object, Object>();
80
81 if (this.value != null) {
82 deserializeObjectFromValue();
83 }
84 }
85
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -080086 public static <T extends RCObject> T createFromKey(byte[] key) {
87 // Equivalent of this method is expected to be implemented by SubClasses
88 throw new UnsupportedOperationException(
89 "createFromKey() is not expected to be called for RCObject");
90 }
91
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080092 public RCTable getTable() {
93 return table;
94 }
95
96 public long getTableId() {
97 return table.getTableId();
98 }
99
100 public byte[] getKey() {
101 return key;
102 }
103
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -0800104 /**
105 * Get the byte array value of this object
106 *
107 * @note will trigger serialization, if value was null.
108 * @return
109 */
110 protected byte[] getValue() {
111 if (value == null) {
112 serializeAndSetValue();
113 }
114 return value;
115 }
116
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800117 public long getVersion() {
118 return version;
119 }
120
121 /**
122 * Return serialized Value.
123 *
124 * @note will not trigger serialization
125 * @return Will return null, if never been read, or was not serialized
126 */
127 public byte[] getSerializedValue() {
128 return value;
129 }
130
131 /**
132 * Return Object as a Map.
133 *
134 * @note Will not trigger deserialization
135 * @return Will return null, if never been set, or was not deserialized
136 */
137 protected Map<Object, Object> getObjectMap() {
138 return this.propertyMap;
139 }
140
141 protected Map<Object, Object> setObjectMap(Map<Object, Object> new_map) {
142 Map<Object, Object> old_map = this.propertyMap;
143 this.propertyMap = new_map;
144 return old_map;
145 }
146
147 public void serializeAndSetValue() {
148 serializeAndSetValue(defaultKryo.get(), this.propertyMap);
149 }
150
Yuta HIGUCHIc9ca4ac2014-01-31 19:48:31 -0800151 protected void serializeAndSetValue(Kryo kryo,
152 Map<Object, Object> javaObject) {
Yoshi Muroi212e5ca2014-02-20 22:42:37 -0800153
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800154
155 // value
156 byte[] rcTemp = new byte[1024 * 1024];
157 Output output = new Output(rcTemp);
158 kryo.writeObject(output, javaObject);
159 this.value = output.toBytes();
160 }
161
162 /**
163 * Deserialize
164 *
165 * @return
166 */
167 public Map<Object, Object> deserializeObjectFromValue() {
168 return deserializeObjectFromValue(defaultKryo.get());
169 }
170
171 protected HashMap<Object, Object> deserializeObjectFromValue(Kryo kryo) {
172 return deserializeObjectFromValue(kryo, HashMap.class);
173 }
174
Yuta HIGUCHIc9ca4ac2014-01-31 19:48:31 -0800175 protected <T extends Map> T deserializeObjectFromValue(Kryo kryo,
176 Class<T> type) {
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800177 if (this.value == null)
178 return null;
Yoshi Muroi212e5ca2014-02-20 22:42:37 -0800179
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800180 Input input = new Input(this.value);
181 T map = kryo.readObject(input, type);
182 this.propertyMap = map;
183
184 return map;
185 }
186
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -0800187 protected void setValueAndDeserialize(byte[] value, long version) {
188 this.value = value;
189 this.version = version;
190 deserializeObjectFromValue();
191 }
192
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800193 /**
194 * Create an Object in DataStore.
195 *
196 * Fails if the Object with same key already exists.
197 *
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800198 * @throws ObjectExistsException
199 */
200 public void create() throws ObjectExistsException {
201
202 if (this.propertyMap == null) {
203 log.warn("No object map was set. Setting empty Map.");
204 setObjectMap(new HashMap<Object, Object>());
205 }
206 serializeAndSetValue();
207
208 this.version = table.create(key, value);
209 }
210
Yuta HIGUCHIc19f5952014-02-05 09:36:06 -0800211 public void forceCreate() {
212
213 if (this.propertyMap == null) {
214 log.warn("No object map was set. Setting empty Map.");
215 setObjectMap(new HashMap<Object, Object>());
216 }
217 serializeAndSetValue();
218
219 this.version = table.forceCreate(key, value);
220 }
221
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800222 /**
223 * Read an Object from DataStore.
224 *
225 * Fails if the Object with the key does not exist.
226 *
227 * @throws ObjectDoesntExistException
228 *
229 */
230 public void read() throws ObjectDoesntExistException {
231 Entry e = table.read(key);
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800232 // TODO should we deserialize immediately?
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -0800233 setValueAndDeserialize(e.value, e.version);
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800234 }
235
236 /**
237 * Update an existing Object in DataStore checking versions.
238 *
239 * Fails if the Object with key does not exists, or conditional failure.
240 *
241 * @throws WrongVersionException
242 * @throws ObjectDoesntExistException
243 */
244 public void update() throws ObjectDoesntExistException,
245 WrongVersionException {
246 if (this.propertyMap == null) {
247 setObjectMap(new HashMap<Object, Object>());
248 }
249 serializeAndSetValue();
250
251 this.version = table.update(key, value, version);
252 }
253
254 /**
255 * Remove an existing Object in DataStore.
256 *
257 * Fails if the Object with key does not exists.
258 *
259 * @throws ObjectDoesntExistException
Yuta HIGUCHIc9ca4ac2014-01-31 19:48:31 -0800260 * @throws WrongVersionException
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800261 */
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -0800262 public void delete() throws ObjectDoesntExistException,
263 WrongVersionException {
Yuta HIGUCHIc9ca4ac2014-01-31 19:48:31 -0800264 this.version = table.delete(key, this.version);
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800265 }
266
Yuta HIGUCHIc19f5952014-02-05 09:36:06 -0800267 public void forceDelete() {
268 this.version = table.forceDelete(key);
269 }
270
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800271 /**
272 * Multi-read RCObjects.
273 *
274 * If the blob value was read successfully, RCObject will deserialize them.
275 *
276 * @param objects
277 * RCObjects to read
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -0800278 * @return true if there exist a failed read.
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800279 */
280 public static boolean multiRead(Collection<RCObject> objects) {
281 boolean fail_exists = false;
282
283 ArrayList<RCObject> req = new ArrayList<>();
284 Iterator<RCObject> it = objects.iterator();
285 while (it.hasNext()) {
286
287 req.add(it.next());
288
289 if (req.size() >= RCClient.MAX_MULTI_READS) {
290 // dispatch multiRead
291 fail_exists |= multiReadInternal(req);
292 req.clear();
293 }
294 }
295
296 if (!req.isEmpty()) {
297 // dispatch multiRead
298 fail_exists |= multiReadInternal(req);
299 req.clear();
300 }
301
302 return fail_exists;
303 }
304
305 private static boolean multiReadInternal(ArrayList<RCObject> req) {
306 boolean fail_exists = false;
307 JRamCloud rcClient = RCClient.getClient();
308
309 final int reqs = req.size();
Yoshi Muroie7693b12014-02-19 19:41:17 -0800310
311 MultiReadObject multiReadObjects = new MultiReadObject(req.size());
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800312
313 // setup multi-read operation
314 for (int i = 0; i < reqs; ++i) {
315 RCObject obj = req.get(i);
Yoshi Muroie7693b12014-02-19 19:41:17 -0800316 multiReadObjects.setObject(i, obj.getTableId(), obj.getKey());
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800317 }
318
319 // execute
Yoshi Muroie7693b12014-02-19 19:41:17 -0800320 JRamCloud.Object results[] = rcClient.multiRead(multiReadObjects.tableId, multiReadObjects.key, multiReadObjects.keyLength, reqs);
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800321 assert (results.length <= req.size());
322
323 // reflect changes to RCObject
324 for (int i = 0; i < results.length; ++i) {
325 RCObject obj = req.get(i);
326 if (results[i] == null) {
327 log.error("MultiRead error, skipping {}, {}", obj.getTable(),
328 obj);
329 fail_exists = true;
330 continue;
331 }
332 assert (Arrays.equals(results[i].key, obj.getKey()));
333
334 obj.value = results[i].value;
335 obj.version = results[i].version;
336 if (obj.version == VERSION_NONEXISTENT) {
337 fail_exists = true;
338 } else {
339 obj.deserializeObjectFromValue();
340 }
341 }
342
343 return fail_exists;
344 }
345
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -0800346 public static class WriteOp {
347 public enum STATUS {
348 NOT_EXECUTED, SUCCESS, FAILED
349 }
350
351 public enum OPS {
352 CREATE, UPDATE, FORCE_CREATE
353 }
354
355 private RCObject obj;
356 private OPS op;
357 private STATUS status;
358
359 public static WriteOp Create(RCObject obj) {
360 return new WriteOp(obj, OPS.CREATE);
361 }
362
363 public static WriteOp Update(RCObject obj) {
364 return new WriteOp(obj, OPS.UPDATE);
365 }
366
367 public static WriteOp ForceCreate(RCObject obj) {
368 return new WriteOp(obj, OPS.FORCE_CREATE);
369 }
370
371 public WriteOp(RCObject obj, OPS op) {
372 this.obj = obj;
373 this.op = op;
374 this.status = STATUS.NOT_EXECUTED;
375 }
376
377 public boolean hasSucceed() {
378 return status == STATUS.SUCCESS;
379 }
380
381 public RCObject getObject() {
382 return obj;
383 }
384
385 public OPS getOp() {
386 return op;
387 }
Yuta HIGUCHI1ca1afa2014-02-04 19:33:57 -0800388
389 public STATUS getStatus() {
390 return status;
391 }
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -0800392 }
393
394 public static boolean multiWrite(Collection<WriteOp> objects) {
395 boolean fail_exists = false;
396
397 ArrayList<WriteOp> req = new ArrayList<>();
398 Iterator<WriteOp> it = objects.iterator();
399 while (it.hasNext()) {
400
401 req.add(it.next());
402
403 if (req.size() >= RCClient.MAX_MULTI_WRITES) {
404 // dispatch multiWrite
405 fail_exists |= multiWriteInternal(req);
406 req.clear();
407 }
408 }
409
410 if (!req.isEmpty()) {
411 // dispatch multiWrite
412 fail_exists |= multiWriteInternal(req);
413 req.clear();
414 }
415
416 return fail_exists;
417 }
418
419 private static boolean multiWriteInternal(ArrayList<WriteOp> ops) {
420
421 boolean fail_exists = false;
Yoshi Muroie7693b12014-02-19 19:41:17 -0800422 MultiWriteObject multiWriteObjects = new MultiWriteObject(ops.size());
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -0800423 JRamCloud rcClient = RCClient.getClient();
424
Yoshi Muroie7693b12014-02-19 19:41:17 -0800425 for (int i = 0; i < ops.size(); ++i) {
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -0800426 WriteOp op = ops.get(i);
427 RCObject obj = op.getObject();
428
429 // FIXME JRamCloud.RejectRules definition is messed up
430 RejectRules rules = rcClient.new RejectRules();
431
432 switch (op.getOp()) {
433 case CREATE:
434 rules.setExists();
435 break;
436 case FORCE_CREATE:
437 // no reject rule
438 break;
439 case UPDATE:
440 rules.setDoesntExists();
441 rules.setNeVersion(obj.getVersion());
442 break;
443 }
Yoshi Muroie7693b12014-02-19 19:41:17 -0800444 multiWriteObjects.setObject(i, obj.getTableId(), obj.getKey(), obj.getValue(), rules);
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -0800445 }
446
Yoshi Muroie7693b12014-02-19 19:41:17 -0800447 MultiWriteRspObject[] results = rcClient.multiWrite(multiWriteObjects.tableId, multiWriteObjects.key, multiWriteObjects.keyLength, multiWriteObjects.value, multiWriteObjects.valueLength, ops.size(), multiWriteObjects.rules);
448 assert (results.length == ops.size());
Yuta HIGUCHIad7dba92014-02-03 16:47:15 -0800449
450 for (int i = 0; i < results.length; ++i) {
451 WriteOp op = ops.get(i);
452
453 if (results[i] != null
454 && results[i].getStatus() == RCClient.STATUS_OK) {
455 op.status = STATUS.SUCCESS;
456
457 RCObject obj = op.getObject();
458 obj.version = results[i].getVersion();
459 } else {
460 op.status = STATUS.FAILED;
461 fail_exists = true;
462 }
463
464 }
465
466 return fail_exists;
467 }
Yoshi Muroie7693b12014-02-19 19:41:17 -0800468
Yuta HIGUCHI25719052014-02-13 14:42:06 -0800469 public static abstract class ObjectIterator<E extends RCObject> implements
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -0800470 Iterator<E> {
471
Yoshi Muroie7693b12014-02-19 19:41:17 -0800472 protected TableEnumerator2 enumerator;
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -0800473
474 public ObjectIterator(RCTable table) {
475 // FIXME workaround for JRamCloud bug. It should have been declared
476 // as static class
477 JRamCloud c = RCClient.getClient();
Yoshi Muroie7693b12014-02-19 19:41:17 -0800478 this.enumerator = c.new TableEnumerator2(table.getTableId());
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -0800479 }
480
481 @Override
482 public boolean hasNext() {
483 return enumerator.hasNext();
484 }
485
Yuta HIGUCHI25719052014-02-13 14:42:06 -0800486// Implement something similar to below to realize Iterator
487// @Override
488// public E next() {
489// JRamCloud.Object o = enumerator.next();
490// E obj = E.createFromKey(o.key);
491// obj.setValueAndDeserialize(o.value, o.version);
492// return obj;
493// }
Yoshi Muroie7693b12014-02-19 19:41:17 -0800494
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -0800495 @Deprecated
496 @Override
497 public void remove() {
498 // TODO Not implemented, as I cannot find a use-case for it.
499 throw new UnsupportedOperationException("Not implemented yet");
500 }
501
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800502 }
503
504}