blob: ce4f61760b41a30539927f9ca98fd9e686e6a9ee [file] [log] [blame]
Jonathan Hart6df90172014-04-03 10:13:11 -07001package net.onrc.onos.core.datastore.utils;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -07002
3import java.util.ArrayList;
4import java.util.HashMap;
5import java.util.Iterator;
6import java.util.List;
7import java.util.Map;
8
Jonathan Hart6df90172014-04-03 10:13:11 -07009import net.onrc.onos.core.datastore.DataStoreClient;
10import net.onrc.onos.core.datastore.IKVClient;
11import net.onrc.onos.core.datastore.IKVTable;
Jonathan Harta99ec672014-04-03 11:30:34 -070012import net.onrc.onos.core.datastore.IKVTable.IKVEntry;
Jonathan Hart6df90172014-04-03 10:13:11 -070013import net.onrc.onos.core.datastore.IKVTableID;
14import net.onrc.onos.core.datastore.IMultiEntryOperation;
15import net.onrc.onos.core.datastore.IMultiObjectOperation;
16import net.onrc.onos.core.datastore.ObjectDoesntExistException;
17import net.onrc.onos.core.datastore.ObjectExistsException;
18import net.onrc.onos.core.datastore.WrongVersionException;
Jonathan Hart6df90172014-04-03 10:13:11 -070019import net.onrc.onos.core.datastore.internal.IModifiableMultiEntryOperation;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070020
21import org.slf4j.Logger;
22import org.slf4j.LoggerFactory;
23
24import com.esotericsoftware.kryo.Kryo;
25import com.esotericsoftware.kryo.io.Input;
26import com.esotericsoftware.kryo.io.Output;
27
28/**
29 * Class to represent an Object represented as a single K-V pair Value blob.
30 *
31 */
32public class KVObject {
33 private static final Logger log = LoggerFactory.getLogger(KVObject.class);
34
35 // Default Kryo serializer.
36 // each sub-class should prepare their own serializer, which has required
37 // objects registered for better performance.
38 private static final ThreadLocal<Kryo> defaultKryo = new ThreadLocal<Kryo>() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070039 @Override
40 protected Kryo initialValue() {
41 Kryo kryo = new Kryo();
42 // kryo.setRegistrationRequired(true);
43 // kryo.setReferences(false);
44 return kryo;
45 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070046 };
47
48 private final IKVTable table;
49 private final byte[] key;
50
51 /**
52 * serialized value version stored on data store or
53 * {@link IKVTable.VERSION_NONEXISTENT()} if is a new object.
54 */
55 private long version;
56
57 /**
58 * Map to store user-defined properties
59 */
60 private Map<Object, Object> propertyMap;
61
62 public KVObject(final IKVTable table, final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070063 this(table, key, null, table.VERSION_NONEXISTENT());
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070064 }
65
66 public KVObject(final IKVTable table, final byte[] key, final byte[] value, final long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070067 if (table == null) {
68 throw new IllegalArgumentException("table cannot be null");
69 }
70 if (key == null) {
71 throw new IllegalArgumentException("key cannot be null");
72 }
73 this.table = table;
74 this.key = key;
75 this.version = version;
76 this.propertyMap = new HashMap<Object, Object>();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070077
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070078 if (value != null) {
79 deserialize(value);
80 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070081 }
82
83 protected static KVObject createFromKey(final byte[] key) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070084 // Equivalent of this method is expected to be implemented by SubClasses
85 throw new UnsupportedOperationException(
86 "createFromKey() is not expected to be called for RCObject");
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070087 }
88
89 public IKVTable getTable() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070090 return table;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070091 }
92
93 public IKVTableID getTableId() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070094 return table.getTableId();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070095 }
96
97 public byte[] getKey() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -070098 return key;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -070099 }
100
101 public long getVersion() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700102 return version;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700103 }
104
105 /**
106 * Return user-defined object properties.
107 *
108 * @note Will not trigger deserialization
109 * @return Will return null, if never been set, or was not deserialized
110 */
111 protected Map<Object, Object> getPropertyMap() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700112 return this.propertyMap;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700113 }
114
115 protected Map<Object, Object> replacePropertyMap(final Map<Object, Object> newMap) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700116 Map<Object, Object> oldMap = this.propertyMap;
117 this.propertyMap = newMap;
118 return oldMap;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700119 }
120
121 /**
122 * Serialize object.
123 *
124 * sub-classes should override this method to customize serialization.
125 *
126 * @return serialized byte array
127 */
128 public byte[] serialize() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700129 return serializePropertyMap(defaultKryo.get(), this.propertyMap);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700130 }
131
132 protected byte[] serializePropertyMap(final Kryo kryo,
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700133 final Map<Object, Object> propMap) {
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700134
135
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700136 // value
137 byte[] rcTemp = new byte[1024 * 1024];
138 Output output = new Output(rcTemp);
139 kryo.writeObject(output, propMap);
140 return output.toBytes();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700141 }
142
143
144 /**
145 * Deserialize using value and version stored in data store.
146 *
147 * @param bytes serialized bytes
148 * @param version version of this {@code bytes}
149 * @return true if success
150 */
151 public boolean deserialize(final byte[] bytes, final long version) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700152 this.version = version;
153 return deserialize(bytes);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700154 }
155
156 /**
157 * Deserialize object.
158 *
159 * sub-classes should override this method to customize deserialization.
160 *
161 * @param bytes serialized byte array
162 * @return true if success
163 */
164 protected boolean deserialize(final byte[] bytes) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700165 deserializePropertyMap(defaultKryo.get(), bytes);
166 return true;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700167 }
168
169 /**
170 * Deserialize and set {@link propertyMap}.
171 * @param kryo serializer to use
172 * @param bytes Kryo serialized Map object
173 * @return true if success
174 */
175 protected boolean deserializePropertyMap(final Kryo kryo, final byte[] bytes) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700176 @SuppressWarnings("unchecked")
177 Map<Object, Object> map = deserializePropertyMap(kryo, bytes, HashMap.class);
178 if (map == null) {
179 map = new HashMap<>();
180 }
181 this.propertyMap = map;
182 return true;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700183 }
184
185 protected <T extends Map<?, ?>> T deserializePropertyMap(final Kryo kryo,
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700186 final byte[] bytes, final Class<T> type) {
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700187
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700188 if (bytes == null || bytes.length == 0) {
189 return null;
190 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700191
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700192 Input input = new Input(bytes);
193 T map = kryo.readObject(input, type);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700194
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700195 return map;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700196 }
197
198
199 /**
200 * Create an Object in DataStore.
201 *
202 * Fails if the Object with same key already exists.
203 *
204 * @throws ObjectExistsException
205 */
206 public void create() throws ObjectExistsException {
207
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700208 if (this.propertyMap == null) {
209 log.warn("No object map was set. Setting empty Map.");
210 replacePropertyMap(new HashMap<Object, Object>());
211 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700212
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700213 this.version = table.create(key, this.serialize());
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700214 }
215
216 public void forceCreate() {
217
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700218 if (this.propertyMap == null) {
219 log.warn("No object map was set. Setting empty Map.");
220 replacePropertyMap(new HashMap<Object, Object>());
221 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700222
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700223 this.version = table.forceCreate(key, this.serialize());
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700224 }
225
226 /**
227 * Read an Object from DataStore.
228 *
229 * Fails if the Object with the key does not exist.
230 *
231 * @throws ObjectDoesntExistException
232 *
233 */
234 public void read() throws ObjectDoesntExistException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700235 IKVEntry e = table.read(key);
236 deserialize(e.getValue(), e.getVersion());
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700237 }
238
239 /**
240 * Update an existing Object in DataStore checking versions.
241 *
242 * Fails if the Object with key does not exists, or conditional failure.
243 *
244 * @throws WrongVersionException
245 * @throws ObjectDoesntExistException
246 */
247 public void update() throws ObjectDoesntExistException,
248 WrongVersionException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700249 if (this.propertyMap == null) {
250 replacePropertyMap(new HashMap<Object, Object>());
251 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700252
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700253 this.version = table.update(key, this.serialize(), version);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700254 }
255
256 /**
257 * Remove an existing Object in DataStore.
258 *
259 * Fails if the Object with key does not exists.
260 *
261 * @throws ObjectDoesntExistException
262 * @throws WrongVersionException
263 */
264 public void delete() throws ObjectDoesntExistException,
265 WrongVersionException {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700266 this.version = table.delete(key, this.version);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700267 }
268
269 public void forceDelete() {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700270 this.version = table.forceDelete(key);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700271 }
272
273 public WriteOp forceCreateOp(IKVClient client) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700274 return new WriteOp(client.forceCreateOp(getTableId(), getKey(), serialize()), this);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700275 }
276
277 public WriteOp createOp(IKVClient client) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700278 return new WriteOp(client.createOp(getTableId(), getKey(), serialize()), this);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700279 }
280
281 // this might not be needed?
282 public WriteOp readOp(IKVClient client) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700283 return new WriteOp(client.readOp(getTableId(), getKey()), this);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700284 }
285
286 public WriteOp updateOp(IKVClient client) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700287 return new WriteOp(client.updateOp(getTableId(), getKey(), serialize(), getVersion()), this);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700288 }
289
290 public WriteOp deleteOp(IKVClient client) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700291 return new WriteOp(client.deleteOp(getTableId(), getKey(), serialize(), getVersion()), this);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700292 }
293
294 public WriteOp forceDeleteOp(IKVClient client) {
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700295 return new WriteOp(client.forceDeleteOp(getTableId(), getKey()), this);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700296 }
297
298 /**
299 * Multi-read RCObjects.
300 *
301 * If the blob value was read successfully, RCObject will deserialize them.
302 *
303 * @param objects
304 * RCObjects to read
305 * @return true if there exist a failed read.
306 */
307 public static boolean multiRead(final List<? extends KVObject> objects) {
308
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700309 final IKVClient client = DataStoreClient.getClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700310
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700311 final ArrayList<IMultiEntryOperation> readOps = new ArrayList<>(objects.size());
312 for (KVObject o : objects) {
313 readOps.add(o.readOp(client));
314 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700315
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700316 boolean failExists = client.multiRead(readOps);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700317
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700318 for (int i = 0; i < readOps.size(); ++i) {
319 KVObject obj = objects.get(i);
320 IMultiEntryOperation entry = readOps.get(i);
321 if ( entry.hasSucceeded() ) {
322 if ( !obj.deserialize(entry.getValue(), entry.getVersion()) ) {
323 //deserialize return true on success
324 failExists = true;
325 log.error("MultiRead error, failed to deserialize {}, {}", obj.getTable(), obj);
326 }
327 } else {
328 log.error("MultiRead error, skipping {}, {}", obj.getTable(), obj);
329 obj.version = obj.getTable().VERSION_NONEXISTENT();
330 failExists = true;
331 }
332 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700333
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700334 return failExists;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700335 }
336
337 /**
338 * TODO Extract common interface
339 */
340 public static class WriteOp implements IMultiObjectOperation, IModifiableMultiEntryOperation {
341
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700342 private final IModifiableMultiEntryOperation base;
343 private final KVObject obj;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700344
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700345 public WriteOp(IMultiEntryOperation base, final KVObject obj) {
346 this.base = (IModifiableMultiEntryOperation) base;
347 this.obj = obj;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700348
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700349 // switch (base.getOperation()) {
350 // case CREATE:
351 // case FORCE_CREATE:
352 // case UPDATE:
353 // break;
354 // default:
355 // throw new UnsupportedOperationException("Unexpected OPERATION:"+base.getOperation());
356 // }
357 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700358
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700359 @Override
360 public KVObject getObject() {
361 return obj;
362 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700363
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700364 @Deprecated
365 public OPERATION getOp() {
366 return this.getOperation();
367 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700368
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700369 @Override
370 public boolean hasSucceeded() {
371 return base.hasSucceeded();
372 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700373
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700374 @Override
375 public STATUS getStatus() {
376 return base.getStatus();
377 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700378
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700379 @Override
380 public IKVTableID getTableId() {
381 return base.getTableId();
382 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700383
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700384 @Override
385 public byte[] getKey() {
386 return base.getKey();
387 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700388
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700389 @Override
390 public byte[] getValue() {
391 return base.getValue();
392 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700393
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700394 @Override
395 public long getVersion() {
396 return base.getVersion();
397 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700398
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700399 @Override
400 public OPERATION getOperation() {
401 return base.getOperation();
402 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700403
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700404 @Override
405 public void setStatus(STATUS status) {
406 base.setStatus(status);
407 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700408
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700409 @Override
410 public void setValue(byte[] value, long version) {
411 base.setValue(value, version);
412 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700413
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700414 @Override
415 public void setVersion(long version) {
416 base.setVersion(version);
417 this.obj.version = version;
418 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700419
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700420 @Override
421 public IModifiableMultiEntryOperation getActualOperation() {
422 return base;
423 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700424 }
425
426 public static boolean multiWrite(final List<WriteOp> objects) {
427
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700428 final IKVClient client = DataStoreClient.getClient();
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700429
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700430 final ArrayList<IMultiEntryOperation> writeOps = new ArrayList<>(objects.size());
431 for (WriteOp o : objects) {
432 writeOps.add(o);
433 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700434
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700435 return client.multiWrite(writeOps);
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700436 }
437
438 public abstract static class AbstractObjectIterator<E extends KVObject> implements
439 Iterator<E> {
440
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700441 protected Iterator<IKVEntry> enumerator;
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700442
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700443 public AbstractObjectIterator(final IKVTable table) {
444 this.enumerator = table.getAllEntries().iterator();
445 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700446
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700447 @Override
448 public boolean hasNext() {
449 return enumerator.hasNext();
450 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700451
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700452 // Implement something similar to below to realize Iterator
453 // @Override
454 // public E next() {
455 // IKVTable.IKVEntry o = enumerator.next();
456 // E obj = E.createFromKey(o.getKey());
457 // obj.deserialize(o.getValue(), o.getVersion());
458 // return obj;
459 // }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700460
Yuta HIGUCHI826b4a42014-03-24 13:10:33 -0700461 @Deprecated
462 @Override
463 public void remove() {
464 // TODO Not implemented, as I cannot find a use-case for it.
465 throw new UnsupportedOperationException("Not implemented yet");
466 }
Yuta HIGUCHI66ca1bf2014-03-12 18:34:09 -0700467
468 }
469
470}