blob: 2aac0e387b38e02b179ccd7e7b023c8865a2d96b [file] [log] [blame]
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -07001package net.onrc.onos.datastore.utils;
2
3import java.util.ArrayList;
4import java.util.HashMap;
5import java.util.Iterator;
6import java.util.List;
7import java.util.Map;
8
9import net.onrc.onos.datastore.DataStoreClient;
10import net.onrc.onos.datastore.IKVClient;
11import net.onrc.onos.datastore.IKVTable;
12import net.onrc.onos.datastore.IKVTable.IKVEntry;
13import net.onrc.onos.datastore.IKVTableID;
14import net.onrc.onos.datastore.IMultiEntryOperation;
15import net.onrc.onos.datastore.IMultiObjectOperation;
16import net.onrc.onos.datastore.ObjectDoesntExistException;
17import net.onrc.onos.datastore.ObjectExistsException;
18import net.onrc.onos.datastore.WrongVersionException;
19import net.onrc.onos.datastore.internal.IModifiableMultiEntryOperation;
20
21import org.slf4j.Logger;
22import org.slf4j.LoggerFactory;
23
24import com.esotericsoftware.kryo.Kryo;
25import com.esotericsoftware.kryo.io.Input;
26import com.esotericsoftware.kryo.io.Output;
27
28/**
29 * Class to represent an Object represented as a single K-V pair Value blob.
30 *
31 */
32public class KVObject {
33 private static final Logger log = LoggerFactory.getLogger(KVObject.class);
34
35 // Default Kryo serializer.
36 // each sub-class should prepare their own serializer, which has required
37 // objects registered for better performance.
38 private static final ThreadLocal<Kryo> defaultKryo = new ThreadLocal<Kryo>() {
39 @Override
40 protected Kryo initialValue() {
41 Kryo kryo = new Kryo();
42 // kryo.setRegistrationRequired(true);
43 // kryo.setReferences(false);
44 return kryo;
45 }
46 };
47
48 private final IKVTable table;
49 private final byte[] key;
50
51 /**
52 * serialized value version stored on data store or
53 * {@link IKVTable.VERSION_NONEXISTENT()} if is a new object.
54 */
55 private long version;
56
57 /**
58 * Map to store user-defined properties
59 */
60 private Map<Object, Object> propertyMap;
61
/**
 * Creates an object that is not (yet) backed by a value on the data store.
 *
 * The version is initialized to {@code table.VERSION_NONEXISTENT()} and no
 * value is deserialized.
 *
 * @param table table the object belongs to, must not be null
 * @param key key of the object, must not be null
 * @throws IllegalArgumentException if {@code table} or {@code key} is null
 */
public KVObject(final IKVTable table, final byte[] key) {
    // The initial version must be computed inside the this(...) argument
    // list. Dereferencing table directly there would throw a
    // NullPointerException before the delegated constructor can reject a
    // null table, so the helper performs that check first.
    this(table, key, null, versionOfNewObject(table));
}

/** Returns the "non-existent" version of {@code table}, rejecting a null table. */
private static long versionOfNewObject(final IKVTable table) {
    if (table == null) {
        throw new IllegalArgumentException("table cannot be null");
    }
    return table.VERSION_NONEXISTENT();
}
65
/**
 * Creates an object from a serialized value and its version.
 *
 * @param table table the object belongs to, must not be null
 * @param key key of the object, must not be null; the array is stored
 *        as-is, without copying
 * @param value serialized value to deserialize, or null to skip
 *        deserialization
 * @param version version to record for this object
 * @throws IllegalArgumentException if {@code table} or {@code key} is null
 */
public KVObject(final IKVTable table, final byte[] key, final byte[] value, final long version) {
    if (null == table) {
        throw new IllegalArgumentException("table cannot be null");
    }
    if (null == key) {
        throw new IllegalArgumentException("key cannot be null");
    }

    this.table = table;
    this.key = key;
    this.version = version;
    this.propertyMap = new HashMap<Object, Object>();

    if (value == null) {
        // Nothing to decode; keep the empty property map.
        return;
    }
    // NOTE(review): deserialize(byte[]) is overridable and runs here before
    // any sub-class constructor body has executed.
    deserialize(value);
}
82
83 protected static KVObject createFromKey(final byte[] key) {
84 // Equivalent of this method is expected to be implemented by SubClasses
85 throw new UnsupportedOperationException(
86 "createFromKey() is not expected to be called for RCObject");
87 }
88
89 public IKVTable getTable() {
90 return table;
91 }
92
93 public IKVTableID getTableId() {
94 return table.getTableId();
95 }
96
97 public byte[] getKey() {
98 return key;
99 }
100
101 public long getVersion() {
102 return version;
103 }
104
105 /**
106 * Return user-defined object properties.
107 *
108 * @note Will not trigger deserialization
109 * @return Will return null, if never been set, or was not deserialized
110 */
111 protected Map<Object, Object> getPropertyMap() {
112 return this.propertyMap;
113 }
114
115 protected Map<Object, Object> replacePropertyMap(final Map<Object, Object> newMap) {
116 Map<Object, Object> oldMap = this.propertyMap;
117 this.propertyMap = newMap;
118 return oldMap;
119 }
120
121 /**
122 * Serialize object.
123 *
124 * sub-classes should override this method to customize serialization.
125 *
126 * @return serialized byte array
127 */
128 public byte[] serialize() {
129 return serializePropertyMap(defaultKryo.get(), this.propertyMap);
130 }
131
132 protected byte[] serializePropertyMap(final Kryo kryo,
133 final Map<Object, Object> propMap) {
134
135
136 // value
137 byte[] rcTemp = new byte[1024 * 1024];
138 Output output = new Output(rcTemp);
139 kryo.writeObject(output, propMap);
140 return output.toBytes();
141 }
142
143
144 /**
145 * Deserialize using value and version stored in data store.
146 *
147 * @param bytes serialized bytes
148 * @param version version of this {@code bytes}
149 * @return true if success
150 */
151 public boolean deserialize(final byte[] bytes, final long version) {
152 this.version = version;
153 return deserialize(bytes);
154 }
155
156 /**
157 * Deserialize object.
158 *
159 * sub-classes should override this method to customize deserialization.
160 *
161 * @param bytes serialized byte array
162 * @return true if success
163 */
164 protected boolean deserialize(final byte[] bytes) {
165 deserializePropertyMap(defaultKryo.get(), bytes);
166 return true;
167 }
168
169 /**
170 * Deserialize and set {@link propertyMap}.
171 * @param kryo serializer to use
172 * @param bytes Kryo serialized Map object
173 * @return true if success
174 */
175 protected boolean deserializePropertyMap(final Kryo kryo, final byte[] bytes) {
176 @SuppressWarnings("unchecked")
177 Map<Object, Object> map = deserializePropertyMap(kryo, bytes, HashMap.class);
178 if (map == null) {
179 map = new HashMap<>();
180 }
181 this.propertyMap = map;
182 return true;
183 }
184
185 protected <T extends Map<?, ?>> T deserializePropertyMap(final Kryo kryo,
186 final byte[] bytes, final Class<T> type) {
187
188 if (bytes == null || bytes.length == 0) {
189 return null;
190 }
191
192 Input input = new Input(bytes);
193 T map = kryo.readObject(input, type);
194
195 return map;
196 }
197
198
199 /**
200 * Create an Object in DataStore.
201 *
202 * Fails if the Object with same key already exists.
203 *
204 * @throws ObjectExistsException
205 */
206 public void create() throws ObjectExistsException {
207
208 if (this.propertyMap == null) {
209 log.warn("No object map was set. Setting empty Map.");
210 replacePropertyMap(new HashMap<Object, Object>());
211 }
212
213 this.version = table.create(key, this.serialize());
214 }
215
216 public void forceCreate() {
217
218 if (this.propertyMap == null) {
219 log.warn("No object map was set. Setting empty Map.");
220 replacePropertyMap(new HashMap<Object, Object>());
221 }
222
223 this.version = table.forceCreate(key, this.serialize());
224 }
225
226 /**
227 * Read an Object from DataStore.
228 *
229 * Fails if the Object with the key does not exist.
230 *
231 * @throws ObjectDoesntExistException
232 *
233 */
234 public void read() throws ObjectDoesntExistException {
235 IKVEntry e = table.read(key);
236 deserialize(e.getValue(), e.getVersion());
237 }
238
239 /**
240 * Update an existing Object in DataStore checking versions.
241 *
242 * Fails if the Object with key does not exists, or conditional failure.
243 *
244 * @throws WrongVersionException
245 * @throws ObjectDoesntExistException
246 */
247 public void update() throws ObjectDoesntExistException,
248 WrongVersionException {
249 if (this.propertyMap == null) {
250 replacePropertyMap(new HashMap<Object, Object>());
251 }
252
253 this.version = table.update(key, this.serialize(), version);
254 }
255
256 /**
257 * Remove an existing Object in DataStore.
258 *
259 * Fails if the Object with key does not exists.
260 *
261 * @throws ObjectDoesntExistException
262 * @throws WrongVersionException
263 */
264 public void delete() throws ObjectDoesntExistException,
265 WrongVersionException {
266 this.version = table.delete(key, this.version);
267 }
268
269 public void forceDelete() {
270 this.version = table.forceDelete(key);
271 }
272
273 public WriteOp forceCreateOp(IKVClient client) {
274 return new WriteOp(client.forceCreateOp(getTableId(), getKey(), serialize()), this);
275 }
276
277 public WriteOp createOp(IKVClient client) {
278 return new WriteOp(client.createOp(getTableId(), getKey(), serialize()), this);
279 }
280
281 // this might not be needed?
282 public WriteOp readOp(IKVClient client) {
283 return new WriteOp(client.readOp(getTableId(), getKey()), this);
284 }
285
286 public WriteOp updateOp(IKVClient client) {
287 return new WriteOp(client.updateOp(getTableId(), getKey(), serialize(), getVersion()), this);
288 }
289
290 public WriteOp deleteOp(IKVClient client) {
291 return new WriteOp(client.deleteOp(getTableId(), getKey(), serialize(), getVersion()), this);
292 }
293
294 public WriteOp forceDeleteOp(IKVClient client) {
295 return new WriteOp(client.forceDeleteOp(getTableId(), getKey()), this);
296 }
297
298 /**
299 * Multi-read RCObjects.
300 *
301 * If the blob value was read successfully, RCObject will deserialize them.
302 *
303 * @param objects
304 * RCObjects to read
305 * @return true if there exist a failed read.
306 */
307 public static boolean multiRead(final List<? extends KVObject> objects) {
308
309 final IKVClient client = DataStoreClient.getClient();
310
311 final ArrayList<IMultiEntryOperation> readOps = new ArrayList<>(objects.size());
312 for (KVObject o : objects) {
313 readOps.add(o.readOp(client));
314 }
315
316 boolean failExists = client.multiRead(readOps);
317
318 for (int i = 0; i < readOps.size(); ++i) {
319 KVObject obj = objects.get(i);
320 IMultiEntryOperation entry = readOps.get(i);
321 if ( entry.hasSucceeded() ) {
322 if ( !obj.deserialize(entry.getValue(), entry.getVersion()) ) {
323 //deserialize return true on success
324 failExists = true;
325 log.error("MultiRead error, failed to deserialize {}, {}", obj.getTable(), obj);
326 }
327 } else {
328 log.error("MultiRead error, skipping {}, {}", obj.getTable(), obj);
329 obj.version = obj.getTable().VERSION_NONEXISTENT();
330 failExists = true;
331 }
332 }
333
334 return failExists;
335 }
336
337 /**
338 * TODO Extract common interface
339 */
340 public static class WriteOp implements IMultiObjectOperation, IModifiableMultiEntryOperation {
341
342 private final IModifiableMultiEntryOperation base;
343 private final KVObject obj;
344
345 public WriteOp(IMultiEntryOperation base, final KVObject obj) {
346 this.base = (IModifiableMultiEntryOperation) base;
347 this.obj = obj;
348
349 // switch (base.getOperation()) {
350 // case CREATE:
351 // case FORCE_CREATE:
352 // case UPDATE:
353 // break;
354 // default:
355 // throw new UnsupportedOperationException("Unexpected OPERATION:"+base.getOperation());
356 // }
357 }
358
359 @Override
360 public KVObject getObject() {
361 return obj;
362 }
363
364 @Deprecated
365 public OPERATION getOp() {
366 return this.getOperation();
367 }
368
369 @Override
370 public boolean hasSucceeded() {
371 return base.hasSucceeded();
372 }
373
374 @Override
375 public STATUS getStatus() {
376 return base.getStatus();
377 }
378
379 @Override
380 public IKVTableID getTableId() {
381 return base.getTableId();
382 }
383
384 @Override
385 public byte[] getKey() {
386 return base.getKey();
387 }
388
389 @Override
390 public byte[] getValue() {
391 return base.getValue();
392 }
393
394 @Override
395 public long getVersion() {
396 return base.getVersion();
397 }
398
399 @Override
400 public OPERATION getOperation() {
401 return base.getOperation();
402 }
403
404 @Override
405 public void setStatus(STATUS status) {
406 base.setStatus(status);
407 }
408
409 @Override
410 public void setValue(byte[] value, long version) {
411 base.setValue(value, version);
412 }
413
414 @Override
415 public void setVersion(long version) {
416 base.setVersion(version);
417 this.obj.version = version;
418 }
419
420 @Override
421 public IModifiableMultiEntryOperation getActualOperation() {
422 return base;
423 }
424 }
425
426 public static boolean multiWrite(final List<WriteOp> objects) {
427
428 final IKVClient client = DataStoreClient.getClient();
429
430 final ArrayList<IMultiEntryOperation> writeOps = new ArrayList<>(objects.size());
431 for (WriteOp o : objects) {
432 writeOps.add(o);
433 }
434
435 return client.multiWrite(writeOps);
436 }
437
438 public abstract static class AbstractObjectIterator<E extends KVObject> implements
439 Iterator<E> {
440
441 protected Iterator<IKVEntry> enumerator;
442
443 public AbstractObjectIterator(final IKVTable table) {
444 this.enumerator = table.getAllEntries().iterator();
445 }
446
447 @Override
448 public boolean hasNext() {
449 return enumerator.hasNext();
450 }
451
452 // Implement something similar to below to realize Iterator
453 // @Override
454 // public E next() {
455 // IKVTable.IKVEntry o = enumerator.next();
456 // E obj = E.createFromKey(o.getKey());
457 // obj.deserialize(o.getValue(), o.getVersion());
458 // return obj;
459 // }
460
461 @Deprecated
462 @Override
463 public void remove() {
464 // TODO Not implemented, as I cannot find a use-case for it.
465 throw new UnsupportedOperationException("Not implemented yet");
466 }
467
468 }
469
470}