blob: 5bea2c545c1f419a5189fe8bf35aee714b3adfa4 [file] [log] [blame]
Yuta HIGUCHI1ef85c42014-01-29 17:23:21 -08001package net.onrc.onos.datastore;
2
3import java.util.ArrayList;
4import java.util.Arrays;
5import java.util.Collection;
6import java.util.HashMap;
7import java.util.Iterator;
8import java.util.Map;
9
10import net.onrc.onos.datastore.RCTable.Entry;
11
12import org.slf4j.Logger;
13import org.slf4j.LoggerFactory;
14
15import com.esotericsoftware.kryo.Kryo;
16import com.esotericsoftware.kryo.io.Input;
17import com.esotericsoftware.kryo.io.Output;
18
19import edu.stanford.ramcloud.JRamCloud;
20import edu.stanford.ramcloud.JRamCloud.ObjectDoesntExistException;
21import edu.stanford.ramcloud.JRamCloud.ObjectExistsException;
22import edu.stanford.ramcloud.JRamCloud.WrongVersionException;
23
24/**
25 * Class to represent an Object represented as a single K-V pair Value blob.
26 *
27 */
28public class RCObject {
29 private static final Logger log = LoggerFactory.getLogger(RCObject.class);
30 /**
31 * Version number which represents that the object doesnot exist, or hase
32 * never read the DB before.
33 */
34 public static final long VERSION_NONEXISTENT = 0L;
35
36 // Each Object should prepare their own serializer, which has required
37 // objects registered.
38 private static final ThreadLocal<Kryo> defaultKryo = new ThreadLocal<Kryo>() {
39 @Override
40 protected Kryo initialValue() {
41 Kryo kryo = new Kryo();
42 // kryo.setRegistrationRequired(true);
43 // TODO TreeMap or just Map
44 // kryo.register(TreeMap.class);
45 kryo.setReferences(false);
46 return kryo;
47 }
48 };
49
50 private final RCTable table;
51 private final byte[] key;
52 private byte[] value;
53 private long version;
54
55 private Map<Object, Object> propertyMap;
56
57 public RCObject(RCTable table, byte[] key) {
58 this(table, key, null);
59 }
60
61 public RCObject(RCTable table, byte[] key, byte[] value) {
62 if (table == null) {
63 throw new IllegalArgumentException("table cannot be null");
64 }
65 if (key == null) {
66 throw new IllegalArgumentException("key cannot be null");
67 }
68 this.table = table;
69 this.key = key;
70 this.value = value;
71 this.version = VERSION_NONEXISTENT;
72 this.propertyMap = new HashMap<Object, Object>();
73
74 if (this.value != null) {
75 deserializeObjectFromValue();
76 }
77 }
78
79 public RCTable getTable() {
80 return table;
81 }
82
83 public long getTableId() {
84 return table.getTableId();
85 }
86
87 public byte[] getKey() {
88 return key;
89 }
90
91 public long getVersion() {
92 return version;
93 }
94
95 /**
96 * Return serialized Value.
97 *
98 * @note will not trigger serialization
99 * @return Will return null, if never been read, or was not serialized
100 */
101 public byte[] getSerializedValue() {
102 return value;
103 }
104
105 /**
106 * Return Object as a Map.
107 *
108 * @note Will not trigger deserialization
109 * @return Will return null, if never been set, or was not deserialized
110 */
111 protected Map<Object, Object> getObjectMap() {
112 return this.propertyMap;
113 }
114
115 protected Map<Object, Object> setObjectMap(Map<Object, Object> new_map) {
116 Map<Object, Object> old_map = this.propertyMap;
117 this.propertyMap = new_map;
118 return old_map;
119 }
120
121 public void serializeAndSetValue() {
122 serializeAndSetValue(defaultKryo.get(), this.propertyMap);
123 }
124
125 protected void serializeAndSetValue(Kryo kryo, Map<Object, Object> javaObject) {
126
127 // value
128 byte[] rcTemp = new byte[1024 * 1024];
129 Output output = new Output(rcTemp);
130 kryo.writeObject(output, javaObject);
131 this.value = output.toBytes();
132 }
133
134 /**
135 * Deserialize
136 *
137 * @return
138 */
139 public Map<Object, Object> deserializeObjectFromValue() {
140 return deserializeObjectFromValue(defaultKryo.get());
141 }
142
143 protected HashMap<Object, Object> deserializeObjectFromValue(Kryo kryo) {
144 return deserializeObjectFromValue(kryo, HashMap.class);
145 }
146
147 protected <T extends Map> T deserializeObjectFromValue(Kryo kryo, Class<T> type) {
148 if (this.value == null)
149 return null;
150
151 Input input = new Input(this.value);
152 T map = kryo.readObject(input, type);
153 this.propertyMap = map;
154
155 return map;
156 }
157
158 /**
159 * Create an Object in DataStore.
160 *
161 * Fails if the Object with same key already exists.
162 *
163 * @note create an Empty Object if no object has never been set.
164 * @throws ObjectExistsException
165 */
166 public void create() throws ObjectExistsException {
167
168 if (this.propertyMap == null) {
169 log.warn("No object map was set. Setting empty Map.");
170 setObjectMap(new HashMap<Object, Object>());
171 }
172 serializeAndSetValue();
173
174 this.version = table.create(key, value);
175 }
176
177 /**
178 * Read an Object from DataStore.
179 *
180 * Fails if the Object with the key does not exist.
181 *
182 * @throws ObjectDoesntExistException
183 *
184 */
185 public void read() throws ObjectDoesntExistException {
186 Entry e = table.read(key);
187 this.value = e.value;
188 this.version = e.version;
189
190 // TODO should we deserialize immediately?
191 deserializeObjectFromValue();
192 }
193
194 /**
195 * Update an existing Object in DataStore checking versions.
196 *
197 * Fails if the Object with key does not exists, or conditional failure.
198 *
199 * @throws WrongVersionException
200 * @throws ObjectDoesntExistException
201 */
202 public void update() throws ObjectDoesntExistException,
203 WrongVersionException {
204 if (this.propertyMap == null) {
205 setObjectMap(new HashMap<Object, Object>());
206 }
207 serializeAndSetValue();
208
209 this.version = table.update(key, value, version);
210 }
211
212 /**
213 * Remove an existing Object in DataStore.
214 *
215 * Fails if the Object with key does not exists.
216 *
217 * @throws ObjectDoesntExistException
218 */
219 public void delete() throws ObjectDoesntExistException {
220 this.version = table.delete(key);
221 }
222
223 /**
224 * Multi-read RCObjects.
225 *
226 * If the blob value was read successfully, RCObject will deserialize them.
227 *
228 * @param objects
229 * RCObjects to read
230 * @return true if there exist an failed read.
231 */
232 public static boolean multiRead(Collection<RCObject> objects) {
233 boolean fail_exists = false;
234
235 ArrayList<RCObject> req = new ArrayList<>();
236 Iterator<RCObject> it = objects.iterator();
237 while (it.hasNext()) {
238
239 req.add(it.next());
240
241 if (req.size() >= RCClient.MAX_MULTI_READS) {
242 // dispatch multiRead
243 fail_exists |= multiReadInternal(req);
244 req.clear();
245 }
246 }
247
248 if (!req.isEmpty()) {
249 // dispatch multiRead
250 fail_exists |= multiReadInternal(req);
251 req.clear();
252 }
253
254 return fail_exists;
255 }
256
257 private static boolean multiReadInternal(ArrayList<RCObject> req) {
258 boolean fail_exists = false;
259 JRamCloud rcClient = RCClient.getClient();
260
261 final int reqs = req.size();
262 JRamCloud.multiReadObject mrObjs[] = new JRamCloud.multiReadObject[reqs];
263
264 // setup multi-read operation
265 for (int i = 0; i < reqs; ++i) {
266 RCObject obj = req.get(i);
267 mrObjs[i] = new JRamCloud.multiReadObject(obj.getTableId(),
268 obj.getKey());
269 }
270
271 // execute
272 JRamCloud.Object results[] = rcClient.multiRead(mrObjs);
273 assert (results.length <= req.size());
274
275 // reflect changes to RCObject
276 for (int i = 0; i < results.length; ++i) {
277 RCObject obj = req.get(i);
278 if (results[i] == null) {
279 log.error("MultiRead error, skipping {}, {}", obj.getTable(),
280 obj);
281 fail_exists = true;
282 continue;
283 }
284 assert (Arrays.equals(results[i].key, obj.getKey()));
285
286 obj.value = results[i].value;
287 obj.version = results[i].version;
288 if (obj.version == VERSION_NONEXISTENT) {
289 fail_exists = true;
290 } else {
291 obj.deserializeObjectFromValue();
292 }
293 }
294
295 return fail_exists;
296 }
297
298 /**
299 * Get All of it's kind?
300 */
301 public static Collection<? extends RCObject> getAllObjects() {
302 // TODO implement
303 throw new UnsupportedOperationException("Not implemented yet");
304 //Collection<? extends RCObject> list = new ArrayList<>();
305 //return list;
306 }
307
308}