blob: 37bc1930db78bddc3db54c098123faa40d61e8ea [file] [log] [blame]
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -08001package net.onrc.onos.datastore;
2
3import java.util.ArrayList;
4import java.util.Arrays;
5import java.util.Collection;
6import java.util.HashMap;
7import java.util.Iterator;
8import java.util.Map;
9
10import net.onrc.onos.datastore.RCTable.Entry;
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080011import org.slf4j.Logger;
12import org.slf4j.LoggerFactory;
13
14import com.esotericsoftware.kryo.Kryo;
15import com.esotericsoftware.kryo.io.Input;
16import com.esotericsoftware.kryo.io.Output;
17
18import edu.stanford.ramcloud.JRamCloud;
19import edu.stanford.ramcloud.JRamCloud.ObjectDoesntExistException;
20import edu.stanford.ramcloud.JRamCloud.ObjectExistsException;
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -080021import edu.stanford.ramcloud.JRamCloud.TableEnumerator;
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080022import edu.stanford.ramcloud.JRamCloud.WrongVersionException;
23
24/**
25 * Class to represent an Object represented as a single K-V pair Value blob.
26 *
27 */
28public class RCObject {
29 private static final Logger log = LoggerFactory.getLogger(RCObject.class);
30 /**
31 * Version number which represents that the object doesnot exist, or hase
32 * never read the DB before.
33 */
34 public static final long VERSION_NONEXISTENT = 0L;
35
36 // Each Object should prepare their own serializer, which has required
37 // objects registered.
38 private static final ThreadLocal<Kryo> defaultKryo = new ThreadLocal<Kryo>() {
39 @Override
40 protected Kryo initialValue() {
41 Kryo kryo = new Kryo();
42 // kryo.setRegistrationRequired(true);
43 // TODO TreeMap or just Map
44 // kryo.register(TreeMap.class);
45 kryo.setReferences(false);
46 return kryo;
47 }
48 };
49
50 private final RCTable table;
51 private final byte[] key;
52 private byte[] value;
53 private long version;
54
55 private Map<Object, Object> propertyMap;
56
57 public RCObject(RCTable table, byte[] key) {
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -080058 this(table, key, null, VERSION_NONEXISTENT);
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080059 }
60
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -080061 public RCObject(RCTable table, byte[] key, byte[] value, long version) {
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080062 if (table == null) {
63 throw new IllegalArgumentException("table cannot be null");
64 }
65 if (key == null) {
66 throw new IllegalArgumentException("key cannot be null");
67 }
68 this.table = table;
69 this.key = key;
70 this.value = value;
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -080071 this.version = version;
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080072 this.propertyMap = new HashMap<Object, Object>();
73
74 if (this.value != null) {
75 deserializeObjectFromValue();
76 }
77 }
78
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -080079 public static <T extends RCObject> T createFromKey(byte[] key) {
80 // Equivalent of this method is expected to be implemented by SubClasses
81 throw new UnsupportedOperationException(
82 "createFromKey() is not expected to be called for RCObject");
83 }
84
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -080085 public RCTable getTable() {
86 return table;
87 }
88
89 public long getTableId() {
90 return table.getTableId();
91 }
92
93 public byte[] getKey() {
94 return key;
95 }
96
97 public long getVersion() {
98 return version;
99 }
100
101 /**
102 * Return serialized Value.
103 *
104 * @note will not trigger serialization
105 * @return Will return null, if never been read, or was not serialized
106 */
107 public byte[] getSerializedValue() {
108 return value;
109 }
110
111 /**
112 * Return Object as a Map.
113 *
114 * @note Will not trigger deserialization
115 * @return Will return null, if never been set, or was not deserialized
116 */
117 protected Map<Object, Object> getObjectMap() {
118 return this.propertyMap;
119 }
120
121 protected Map<Object, Object> setObjectMap(Map<Object, Object> new_map) {
122 Map<Object, Object> old_map = this.propertyMap;
123 this.propertyMap = new_map;
124 return old_map;
125 }
126
127 public void serializeAndSetValue() {
128 serializeAndSetValue(defaultKryo.get(), this.propertyMap);
129 }
130
Yuta HIGUCHIc9ca4ac2014-01-31 19:48:31 -0800131 protected void serializeAndSetValue(Kryo kryo,
132 Map<Object, Object> javaObject) {
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800133
134 // value
135 byte[] rcTemp = new byte[1024 * 1024];
136 Output output = new Output(rcTemp);
137 kryo.writeObject(output, javaObject);
138 this.value = output.toBytes();
139 }
140
141 /**
142 * Deserialize
143 *
144 * @return
145 */
146 public Map<Object, Object> deserializeObjectFromValue() {
147 return deserializeObjectFromValue(defaultKryo.get());
148 }
149
150 protected HashMap<Object, Object> deserializeObjectFromValue(Kryo kryo) {
151 return deserializeObjectFromValue(kryo, HashMap.class);
152 }
153
Yuta HIGUCHIc9ca4ac2014-01-31 19:48:31 -0800154 protected <T extends Map> T deserializeObjectFromValue(Kryo kryo,
155 Class<T> type) {
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800156 if (this.value == null)
157 return null;
158
159 Input input = new Input(this.value);
160 T map = kryo.readObject(input, type);
161 this.propertyMap = map;
162
163 return map;
164 }
165
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -0800166 protected void setValueAndDeserialize(byte[] value, long version) {
167 this.value = value;
168 this.version = version;
169 deserializeObjectFromValue();
170 }
171
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800172 /**
173 * Create an Object in DataStore.
174 *
175 * Fails if the Object with same key already exists.
176 *
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800177 * @throws ObjectExistsException
178 */
179 public void create() throws ObjectExistsException {
180
181 if (this.propertyMap == null) {
182 log.warn("No object map was set. Setting empty Map.");
183 setObjectMap(new HashMap<Object, Object>());
184 }
185 serializeAndSetValue();
186
187 this.version = table.create(key, value);
188 }
189
190 /**
191 * Read an Object from DataStore.
192 *
193 * Fails if the Object with the key does not exist.
194 *
195 * @throws ObjectDoesntExistException
196 *
197 */
198 public void read() throws ObjectDoesntExistException {
199 Entry e = table.read(key);
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800200 // TODO should we deserialize immediately?
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -0800201 setValueAndDeserialize(e.value, e.version);
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800202 }
203
204 /**
205 * Update an existing Object in DataStore checking versions.
206 *
207 * Fails if the Object with key does not exists, or conditional failure.
208 *
209 * @throws WrongVersionException
210 * @throws ObjectDoesntExistException
211 */
212 public void update() throws ObjectDoesntExistException,
213 WrongVersionException {
214 if (this.propertyMap == null) {
215 setObjectMap(new HashMap<Object, Object>());
216 }
217 serializeAndSetValue();
218
219 this.version = table.update(key, value, version);
220 }
221
222 /**
223 * Remove an existing Object in DataStore.
224 *
225 * Fails if the Object with key does not exists.
226 *
227 * @throws ObjectDoesntExistException
Yuta HIGUCHIc9ca4ac2014-01-31 19:48:31 -0800228 * @throws WrongVersionException
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800229 */
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -0800230 public void delete() throws ObjectDoesntExistException,
231 WrongVersionException {
Yuta HIGUCHIc9ca4ac2014-01-31 19:48:31 -0800232 this.version = table.delete(key, this.version);
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800233 }
234
235 /**
236 * Multi-read RCObjects.
237 *
238 * If the blob value was read successfully, RCObject will deserialize them.
239 *
240 * @param objects
241 * RCObjects to read
242 * @return true if there exist an failed read.
243 */
244 public static boolean multiRead(Collection<RCObject> objects) {
245 boolean fail_exists = false;
246
247 ArrayList<RCObject> req = new ArrayList<>();
248 Iterator<RCObject> it = objects.iterator();
249 while (it.hasNext()) {
250
251 req.add(it.next());
252
253 if (req.size() >= RCClient.MAX_MULTI_READS) {
254 // dispatch multiRead
255 fail_exists |= multiReadInternal(req);
256 req.clear();
257 }
258 }
259
260 if (!req.isEmpty()) {
261 // dispatch multiRead
262 fail_exists |= multiReadInternal(req);
263 req.clear();
264 }
265
266 return fail_exists;
267 }
268
269 private static boolean multiReadInternal(ArrayList<RCObject> req) {
270 boolean fail_exists = false;
271 JRamCloud rcClient = RCClient.getClient();
272
273 final int reqs = req.size();
274 JRamCloud.multiReadObject mrObjs[] = new JRamCloud.multiReadObject[reqs];
275
276 // setup multi-read operation
277 for (int i = 0; i < reqs; ++i) {
278 RCObject obj = req.get(i);
279 mrObjs[i] = new JRamCloud.multiReadObject(obj.getTableId(),
280 obj.getKey());
281 }
282
283 // execute
284 JRamCloud.Object results[] = rcClient.multiRead(mrObjs);
285 assert (results.length <= req.size());
286
287 // reflect changes to RCObject
288 for (int i = 0; i < results.length; ++i) {
289 RCObject obj = req.get(i);
290 if (results[i] == null) {
291 log.error("MultiRead error, skipping {}, {}", obj.getTable(),
292 obj);
293 fail_exists = true;
294 continue;
295 }
296 assert (Arrays.equals(results[i].key, obj.getKey()));
297
298 obj.value = results[i].value;
299 obj.version = results[i].version;
300 if (obj.version == VERSION_NONEXISTENT) {
301 fail_exists = true;
302 } else {
303 obj.deserializeObjectFromValue();
304 }
305 }
306
307 return fail_exists;
308 }
309
Yuta HIGUCHI10eebea2014-02-03 10:41:41 -0800310 public static Iterable<RCObject> getAllObjects(
311 RCTable table) {
312 return new ObjectEnumerator(table);
313 }
314
315 public static class ObjectEnumerator implements
316 Iterable<RCObject> {
317
318 private RCTable table;
319
320 public ObjectEnumerator(RCTable table) {
321 this.table = table;
322 }
323
324 @Override
325 public Iterator<RCObject> iterator() {
326 return new ObjectIterator<RCObject>(table);
327 }
328
329 }
330
331 public static class ObjectIterator<E extends RCObject> implements
332 Iterator<E> {
333
334 protected TableEnumerator enumerator;
335
336 public ObjectIterator(RCTable table) {
337 // FIXME workaround for JRamCloud bug. It should have been declared
338 // as static class
339 JRamCloud c = RCClient.getClient();
340 this.enumerator = c.new TableEnumerator(table.getTableId());
341 }
342
343 @Override
344 public boolean hasNext() {
345 return enumerator.hasNext();
346 }
347
348 @Override
349 public E next() {
350 JRamCloud.Object o = enumerator.next();
351 RCObject obj = RCObject.createFromKey(o.key);
352 obj.setValueAndDeserialize(o.value, o.version);
353 return (E) obj;
354 }
355
356 @Deprecated
357 @Override
358 public void remove() {
359 // TODO Not implemented, as I cannot find a use-case for it.
360 throw new UnsupportedOperationException("Not implemented yet");
361 }
362
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -0800363 }
364
365}