blob: 0ba951d7fa60713af31433b10ef92feef1f6c811 [file] [log] [blame]
Yuta HIGUCHIa1e655a2014-01-23 17:43:11 -08001/* Copyright (c) 2013 Stanford University
2 *
3 * Permission to use, copy, modify, and distribute this software for any
4 * purpose with or without fee is hereby granted, provided that the above
5 * copyright notice and this permission notice appear in all copies.
6 *
7 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
8 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
9 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
10 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
11 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
12 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
13 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
14 */
15
yoshi28bac132014-01-22 11:00:17 -080016package com.tinkerpop.blueprints.impls.ramcloud;
17
18import java.io.Serializable;
19import java.io.UnsupportedEncodingException;
20import java.nio.ByteBuffer;
21import java.util.ArrayList;
22import java.util.Iterator;
23import java.util.List;
24import java.util.Map;
25import java.util.Set;
26import java.util.TreeMap;
27
28import org.slf4j.Logger;
29import org.slf4j.LoggerFactory;
30
31import com.google.protobuf.InvalidProtocolBufferException;
32import com.tinkerpop.blueprints.CloseableIterable;
33import com.tinkerpop.blueprints.Element;
34import com.tinkerpop.blueprints.Index;
35import com.tinkerpop.blueprints.util.ExceptionFactory;
36import com.tinkerpop.blueprints.impls.ramcloud.PerfMon;
37import com.tinkerpop.blueprints.impls.ramcloud.RamCloudGraphProtos.IndexBlob;
38import com.tinkerpop.blueprints.impls.ramcloud.RamCloudGraphProtos.IndexBlob.Builder;
39
40import edu.stanford.ramcloud.JRamCloud;
41
42// FIXME Index instance should be representing an Index table, not a IndexTable K-V pair
43public class RamCloudIndex<T extends Element> implements Index<T>, Serializable {
44
45 private final static Logger log = LoggerFactory.getLogger(RamCloudGraph.class);
46 protected RamCloudGraph graph;
47 private long tableId;
48 private String indexName;
49 protected byte[] rcKey;
50 private Class<T> indexClass;
51 // FIXME this should not be defined here
52 private long indexVersion;
53
54// private static final ThreadLocal<Kryo> kryo = new ThreadLocal<Kryo>() {
55// @Override
56// protected Kryo initialValue() {
57// Kryo kryo = new Kryo();
58// kryo.setRegistrationRequired(true);
59// kryo.register(Long.class);
60// kryo.register(String.class);
61// kryo.register(TreeMap.class);
62// kryo.register(ArrayList.class);
63// kryo.setReferences(false);
64// return kryo;
65// }
66// };
67
68
69 public RamCloudIndex(long tableId, String indexName, Object propValue, RamCloudGraph graph, Class<T> indexClass) {
70 this.tableId = tableId;
71 this.graph = graph;
72 this.rcKey = indexToRcKey(indexName, propValue);
73 this.indexName = indexName;
74 this.indexClass = indexClass;
75 }
76
77 public RamCloudIndex(long tableId, byte[] rcKey, RamCloudGraph graph, Class<T> indexClass) {
78 this.tableId = tableId;
79 this.graph = graph;
80 this.rcKey = rcKey;
81 this.indexName = rcKeyToIndexName(rcKey);
82 this.indexClass = indexClass;
83 }
84
85 public boolean exists() {
86 PerfMon pm = PerfMon.getInstance();
87
88 try {
89 JRamCloud.Object vertTableEntry;
90 JRamCloud vertTable = graph.getRcClient();
91
92 //vertTableEntry = graph.getRcClient().read(tableId, rcKey);
93 pm.indexread_start("RamCloudIndex exists()");
94 vertTableEntry = vertTable.read(tableId, rcKey);
95 pm.indexread_end("RamCloudIndex exists()");
96 indexVersion = vertTableEntry.version;
97 return true;
98 } catch (Exception e) {
99 pm.indexread_end("RamCloudIndex exists()");
100 log.debug("IndexTable entry for {} does not exists(): {}@{} [{}]", indexName, new String(rcKey), tableId, this);
101 return false;
102 }
103 }
104
105 public void create() {
106 if (!exists()) {
107 PerfMon pm = PerfMon.getInstance();
108 try {
109 JRamCloud rcClient = graph.getRcClient();
110 JRamCloud.RejectRules rules = rcClient.new RejectRules();
111 rules.setExists();
112
113 //graph.getRcClient().writeRule(tableId, rcKey, ByteBuffer.allocate(0).array(), rules);
114 pm.indexwrite_start("RamCloudIndex create()");
115 rcClient.writeRule(tableId, rcKey, ByteBuffer.allocate(0).array(), rules);
116 pm.indexwrite_end("RamCloudIndex create()");
117 } catch (Exception e) {
118 pm.indexwrite_end("RamCloudIndex create()");
119 log.info(toString() + ": Write create index list: ", e);
120 }
121 }
122 }
123
124 public static byte[] indexToRcKey(String key, Object propValue) {
125 try {
126 String s = key + "=" + propValue;
127 return ByteBuffer.allocate(s.getBytes().length).put(s.getBytes("UTF-8")).array();
128 } catch (UnsupportedEncodingException ex) {
129 log.error("indexToRcKey({}, {}) failed with exception {}", key, propValue, ex);
130 }
131 return null;
132 }
133
134 public static String rcKeyToIndexName(byte[] rcKey) {
135 try {
136 String s = new String(rcKey, "UTF-8");
137 return s.substring(0, s.indexOf('='));
138 } catch (UnsupportedEncodingException ex) {
139 log.error("rcKeyToIndexName({}) failed with exception {}", rcKey, ex);
140 }
141 return null;
142 }
143 public static String rcKeyToPropName(byte[] rcKey) {
144 try {
145 String s = new String(rcKey, "UTF-8");
146 return s.substring(s.indexOf('=')+1);
147 } catch (UnsupportedEncodingException ex) {
148 log.error("rcKeyToPropName({}) failed with exception {}", rcKey, ex);
149 }
150 return null;
151 }
152
153 @Override
154 public String getIndexName() {
155 return this.indexName;
156 }
157
158 @Override
159 public Class<T> getIndexClass() {
160 return this.indexClass;
161 }
162
163 @Override
164 public void put(String key, Object value, T element) {
165 getSetProperty(key, value, element.getId());
166 }
167
168 public void getSetProperty(String key, Object propValue, Object elmId) {
169 if (elmId == null) {
170 // FIXME Throw appropriate Exception
171 log.error("Element Id cannot be null");
172 return;
173 //throw ExceptionFactory.vertexIdCanNotBeNull();
174 //throw ExceptionFactory.edgeIdCanNotBeNull();
175 }
176
177 long startTime = 0;
178 if (graph.measureBPTimeProp == 1) {
179 startTime = System.nanoTime();
180 }
181
182 create();
183
184 // FIXME give more meaningful loop variable
185 for (int i = 0; i < 100; i++) {
186 Map<Object, List<Object>> map = readIndexPropertyMapFromDB();
187 List<Object> values = map.get(propValue);
188 if (values == null) {
189 values = new ArrayList<Object>();
190 map.put(propValue, values);
191 }
192 if (!values.contains(elmId)) {
193 values.add(elmId);
194 }
195
196 //Masa commented out the following measurement b/c Serialization delay is measured in onvertIndexPropertyMapToRcBytes(map)
197 //long serStartTime = System.nanoTime();
198 byte[] rcValue = convertIndexPropertyMapToRcBytes(map);
199 //if(RamCloudGraph.measureSerializeTimeProp == 1) {
200 // long serEndTime = System.nanoTime();
201 // log.error("Performance index kryo serialization [id={}] {} size {}", elmId, serEndTime - serStartTime, rcValue.length);
202 //}
203
204 if (rcValue.length != 0) {
205 if (writeWithRules(rcValue)) {
206 break;
207 } else {
208 log.debug("getSetProperty(String {}, Object {}) cond. write failure RETRYING {}", propValue, elmId, i+1);
209 if (i == 100) {
210 log.error("getSetProperty(String {}, Object {}) cond. write failure Gaveup RETRYING", propValue, elmId);
211 }
212 }
213 }
214 }
215
216 if (graph.measureBPTimeProp == 1) {
217 long endTime = System.nanoTime();
218 log.error("Performance index setProperty total time {}", endTime - startTime);
219 }
220 }
221
222 @Override
223 public CloseableIterable<T> get(String string, Object value) {
224 // FIXME Do we need this implemented
225 throw new RuntimeException("Not implemented yet");
226 //return getElmIdListForPropValue(value);
227 }
228
229 @Override
230 public CloseableIterable<T> query(String string, Object o) {
231 throw new UnsupportedOperationException("Not supported yet.");
232 }
233
234 @Override
235 public long count(String key, Object value) {
236 Map<Object, List<Object>> map = readIndexPropertyMapFromDB();
237 List<Object> values = map.get(value);
238 if (null == values) {
239 return 0;
240 } else {
241 return values.size();
242 }
243 }
244
245 @Override
246 public void remove(String propName, Object propValue, T element) {
247
248 if (propName == null) {
249 throw ExceptionFactory.propertyKeyCanNotBeNull();
250 }
251
252 if (propName.equals("")) {
253 throw ExceptionFactory.propertyKeyCanNotBeEmpty();
254 }
255
256 if (propName.equals("id")) {
257 throw ExceptionFactory.propertyKeyIdIsReserved();
258 }
259
260 if (!propName.equals(indexName)) {
261 log.error("Index name mismatch indexName:{}, remove({},{},...). SOMETHING IS WRONG", indexName, propName, propValue);
262 }
263
264 // FIXME better loop variable name
265 final int MAX_RETRYS = 100;
266 for (int i = 0; i < MAX_RETRYS; ++i) {
267 Map<Object, List<Object>> map = readIndexPropertyMapFromDB();
268
269 if (map.containsKey(propValue)) {
270 List<Object> idList = map.get(propValue);
271 if (null != idList) {
272 idList.remove(element.getId());
273 if (idList.isEmpty()) {
274 log.debug("remove({},{},...) called, and list became empty.", propName, propValue);
275 map.remove(propValue);
276 }
277 }
278 } else {
279 // propValue not found
280 log.warn("remove({},{},...) called on '{}' index table, but was not found on index. SOMETHING MAY BE WRONG", propName, propValue, this.indexName);
281 // no change to DB so exit now
282 return;
283 }
284 //long startTime = System.nanoTime();
285 //if(RamCloudGraph.measureSerializeTimeProp == 1) {
286 // pm.ser_start("SC");
287 //}
288 byte[] rcValue = convertIndexPropertyMapToRcBytes(map);
289 //if(RamCloudGraph.measureSerializeTimeProp == 1) {
290 // pm.ser_end("SC");
291 //long endTime = System.nanoTime();
292 //pm.ser_add(endTime - startTime);
293 //log.error("Performance index kryo serialization for removal key {} {} size {}", element, endTime - startTime, rcValue.length);
294 //}
295
296 if (rcValue.length == 0) {
297 return;
298 }
299
300 if (writeWithRules(rcValue)) {
301 break;
302 } else {
303 log.debug("remove({}, {}, T element) write failure RETRYING {}", propName, propValue, (i + 1));
304 if (i + 1 == MAX_RETRYS) {
305 log.error("remove({}, {}, T element) write failed completely. gave up RETRYING", propName, propValue);
306 }
307 }
308 }
309
310 }
311
312 public void removeElement(T element) {
313 removeElement(this.tableId, element, this.graph);
314 }
315
316 // FIXME this methods should not be defined here
317 public <T extends Element> void removeElement(long tableId, T element, RamCloudGraph graph) {
318 JRamCloud.TableEnumerator tableEnum = graph.getRcClient().new TableEnumerator(tableId);
319
320 while (tableEnum.hasNext()) {
321 JRamCloud.Object tableEntry = tableEnum.next();
322 Map<Object, List<Object>> indexValMap = convertRcBytesToIndexPropertyMap(tableEntry.value);
323
324 boolean madeChange = false;
325 Iterator<Map.Entry<Object, List<Object>>> indexValMapIt = indexValMap.entrySet().iterator();
326 while (indexValMapIt.hasNext()) {
327 Map.Entry<Object, List<Object>> entry = indexValMapIt.next();
328 List<Object> idList = entry.getValue();
329 madeChange |= idList.remove(element.getId());
330 if (idList.isEmpty()) {
331 madeChange = true;
332 indexValMapIt.remove();
333 }
334 }
335 if (madeChange == false) {
336 continue;
337 }
338
339 byte[] rcValue = convertIndexPropertyMapToRcBytes(indexValMap);
340 if (rcValue.length == 0) {
341 // nothing to write
342 continue;
343 }
344 if (writeWithRules(tableId, tableEntry.key, rcValue, tableEntry.version, graph)) {
345 // cond. write success
346 continue;
347 } else {
348 // cond. write failure
349 log.debug("removeElement({}, {}, ...) cond. key/value write failure RETRYING", tableId, element );
350 // FIXME Dirty hack
351 final int RETRY_MAX = 100;
352 for (int retry = RETRY_MAX; retry >= 0; --retry) {
353 RamCloudKeyIndex idx = new RamCloudKeyIndex(tableId, tableEntry.key, graph, element.getClass());
354 Map<Object, List<Object>> rereadMap = idx.readIndexPropertyMapFromDB();
355
356 boolean madeChangeOnRetry = false;
357 Iterator<Map.Entry<Object, List<Object>>> rereadIndexValMapIt = rereadMap.entrySet().iterator();
358 while (rereadIndexValMapIt.hasNext()) {
359 Map.Entry<Object, List<Object>> entry = rereadIndexValMapIt.next();
360 List<Object> idList = entry.getValue();
361 madeChangeOnRetry |= idList.remove(element.getId());
362 if (idList.isEmpty()) {
363 madeChangeOnRetry = true;
364 rereadIndexValMapIt.remove();
365 }
366 }
367 if (madeChangeOnRetry == false) {
368 log.debug("removeElement({}, {}, ...) no more write required. SOMETHING MAY BE WRONG", tableId, element);
369 break;
370 }
371
372 if (idx.writeWithRules(convertIndexPropertyMapToRcBytes(rereadMap))) {
373 log.debug("removeElement({}, {}, ...) cond. key/value {} write failure RETRYING {}", tableId, element, rereadMap, RETRY_MAX - retry);
374 // cond. re-write success
375 break;
376 }
377 if (retry == 0) {
378 log.error("removeElement({}, {}, ...) cond. write failed completely. Gave up RETRYING", tableId, element);
379 // XXX may be we should throw some kind of exception here?
380 }
381 }
382 }
383 }
384 }
385
386 public Map<Object, List<Object>> readIndexPropertyMapFromDB() {
387 //log.debug("getIndexPropertyMap() ");
388 JRamCloud.Object propTableEntry;
389 long startTime = 0;
390 PerfMon pm = PerfMon.getInstance();
391
392 try {
393 JRamCloud vertTable = graph.getRcClient();
394 if (graph.measureRcTimeProp == 1) {
395 startTime = System.nanoTime();
396 }
397 //propTableEntry = graph.getRcClient().read(tableId, rcKey);
398 pm.indexread_start("RamCloudIndex readIndexPropertyMapFromDB()");
399 propTableEntry = vertTable.read(tableId, rcKey);
400 pm.indexread_end("RamCloudIndex readIndexPropertyMapFromDB()");
401 if (graph.measureRcTimeProp == 1) {
402 long endTime = System.nanoTime();
403 log.error("Performance readIndexPropertyMapFromDB(indexName {}) read time {}", indexName, endTime - startTime);
404 }
405 indexVersion = propTableEntry.version;
406 } catch (Exception e) {
407 pm.indexread_end("RamCloudIndex readIndexPropertyMapFromDB()");
408 indexVersion = 0;
409 if (graph.measureRcTimeProp == 1) {
410 long endTime = System.nanoTime();
411 log.error("Performance readIndexPropertyMapFromDB(indexName {}) exception read time {}", indexName, endTime - startTime);
412 }
413 log.warn("readIndexPropertyMapFromDB() Element does not have a index property table entry! tableId :" + tableId + " indexName : " + indexName + " ", e);
414 return null;
415 }
416
417 return convertRcBytesToIndexPropertyMap(propTableEntry.value);
418 }
419
420 public Map<Object, List<Object>> convertRcBytesToIndexPropertyMap(byte[] byteArray) {
421 if (byteArray == null) {
422 log.error("Got a null byteArray argument");
423 return null;
424 } else if (byteArray.length != 0) {
425 PerfMon pm = PerfMon.getInstance();
426 long startTime = 0;
427 if(RamCloudGraph.measureSerializeTimeProp == 1) {
428 startTime = System.nanoTime();
429 }
430 pm.indexdeser_start("RamCloudIndex convertRcBytesToIndexPropertyMap()");
431 IndexBlob blob;
432 TreeMap<Object, List<Object>> map = new TreeMap<Object, List<Object>>();
433 try {
434 blob = IndexBlob.parseFrom(byteArray);
435 List const_list = blob.getVertexIdList();
436 ArrayList list = new ArrayList<>(const_list);
437// ByteBufferInput input = new ByteBufferInput(byteArray);
438// ArrayList list = kryo.get().readObject(input, ArrayList.class);
439 map.put(rcKeyToPropName(rcKey), list);
440 } catch (InvalidProtocolBufferException e) {
441 log.error("{" + toString() + "}: Read malformed edge list: ", e);
442 } finally {
443 pm.indexdeser_end("RamCloudIndex convertRcBytesToIndexPropertyMap()");
444 }
445 if(RamCloudGraph.measureSerializeTimeProp == 1) {
446 long endTime = System.nanoTime();
447 log.error("Performance index kryo deserialization [id=N/A] {} size {}", endTime - startTime, byteArray.length);
448 }
449 return map;
450 } else {
451 return new TreeMap<Object, List<Object>>();
452 }
453 }
454
455 public static byte[] convertIndexPropertyMapToRcBytes(Map<Object, List<Object>> map) {
456 PerfMon pm = PerfMon.getInstance();
457 long startTime = 0;
458 if(RamCloudGraph.measureSerializeTimeProp == 1) {
459 startTime = System.nanoTime();
460 }
461 byte[] bytes;
462
463 pm.indexser_start("RamCloudIndex convertIndexPropertyMapToRcBytes()");
464 Builder builder = IndexBlob.newBuilder();
465 if ( map.values().size() != 0 ) {
466 List<Long> vtxIds = (List)map.values().iterator().next();
467 builder.addAllVertexId(vtxIds);
468 }
469 IndexBlob blob = builder.build();
470 bytes = blob.toByteArray();
471// ByteBufferOutput output = new ByteBufferOutput(1024*1024);
472// if ( map.values().size() == 0 ) {
473// kryo.get().writeObject(output, new ArrayList<Object>());
474// } else {
475// kryo.get().writeObject(output, vtxIds);
476// }
477// bytes = output.toBytes();
478 pm.indexser_end("RamCloudIndex convertIndexPropertyMapToRcBytes()");
479 if(RamCloudGraph.measureSerializeTimeProp == 1) {
480 long endTime = System.nanoTime();
481 log.error("Performance index ProtoBuff serialization {}, size={}", endTime - startTime, bytes);
482 }
483 return bytes;
484 }
485
486 protected boolean writeWithRules(byte[] rcValue) {
487 return writeWithRules(this.tableId, this.rcKey, rcValue, this.indexVersion, this.graph);
488 }
489
490 private static boolean writeWithRules(long tableId, byte[] rcKey, byte[] rcValue, long expectedVersion, RamCloudGraph graph) {
491 JRamCloud.RejectRules rules = graph.getRcClient().new RejectRules();
492
493 if (expectedVersion == 0) {
494 rules.setExists();
495 } else {
496 rules.setNeVersion(expectedVersion);
497 }
498
499 PerfMon pm = PerfMon.getInstance();
500 try {
501 JRamCloud vertTable = graph.getRcClient();
502 pm.indexwrite_start("RamCloudIndex writeWithRules()");
503 vertTable.writeRule(tableId, rcKey, rcValue, rules);
504 pm.indexwrite_end("RamCloudIndex writeWithRules()");
505 } catch (Exception e) {
506 pm.indexwrite_end("RamCloudIndex writeWithRules()");
507 pm.indexwrite_condfail("RamCloudIndex writeWithRules()");
508 log.debug("Cond. Write index property: {} failed {} expected version: {}", rcKeyToIndexName(rcKey), e, expectedVersion);
509 return false;
510 }
511 return true;
512 }
513
514 public List<Object> getElmIdListForPropValue(Object propValue) {
515 Map<Object, List<Object>> map = readIndexPropertyMapFromDB();
516 if (map == null) {
517 log.debug("IndexPropertyMap was null. {} : {}", indexName, propValue);
518 return null;
519 }
520 return map.get(propValue);
521 }
522
523 public Set<Object> getIndexPropertyKeys() {
524 Map<Object, List<Object>> map = readIndexPropertyMapFromDB();
525 return map.keySet();
526 }
527
528 public <T> T removeIndexProperty(String key) {
529 for (int i = 0; i < 100; ++i) {
530 Map<Object, List<Object>> map = readIndexPropertyMapFromDB();
531 T retVal = (T) map.remove(key);
532 byte[] rcValue = convertIndexPropertyMapToRcBytes(map);
533 if (rcValue.length != 0) {
534 if (writeWithRules(rcValue)) {
535 return retVal;
536 } else {
537 log.debug("removeIndexProperty({}) cond. key/value write failure RETRYING {}", tableId, (i + 1));
538 }
539 }
540 }
541 log.error("removeIndexProperty({}) cond. key/value write failure gave up RETRYING", tableId);
542 // XXX ?Is this correct
543 return null;
544 }
545
546 public void removeIndex() {
547 log.info("Removing Index: {} was version {} [{}]", indexName, indexVersion, this);
548 graph.getRcClient().remove(tableId, rcKey);
549 }
550}