blob: 9871aaa7c312c870e05a1bcb4f5bb954ef8e955f [file] [log] [blame]
/* Copyright (c) 2013 Stanford University
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */
15
yoshi28bac132014-01-22 11:00:17 -080016package com.tinkerpop.blueprints.impls.ramcloud;
17
18import java.io.ByteArrayInputStream;
19import java.io.ByteArrayOutputStream;
20import java.io.IOException;
21import java.io.ObjectInputStream;
22import java.io.ObjectOutputStream;
23import java.io.Serializable;
24import java.nio.BufferUnderflowException;
25import java.nio.ByteBuffer;
26import java.nio.ByteOrder;
27import java.util.ArrayList;
28import java.util.Arrays;
29import java.util.Collections;
30import java.util.HashMap;
31import java.util.HashSet;
32import java.util.LinkedList;
33import java.util.List;
34import java.util.Map;
35import java.util.NoSuchElementException;
36import java.util.Set;
37import java.util.concurrent.atomic.AtomicLong;
38
39import org.slf4j.Logger;
40import org.slf4j.LoggerFactory;
41
42import com.sun.jersey.core.util.Base64;
43import com.tinkerpop.blueprints.Direction;
44import com.tinkerpop.blueprints.Edge;
45import com.tinkerpop.blueprints.Element;
46import com.tinkerpop.blueprints.Features;
47import com.tinkerpop.blueprints.GraphQuery;
48import com.tinkerpop.blueprints.Index;
49import com.tinkerpop.blueprints.IndexableGraph;
50import com.tinkerpop.blueprints.KeyIndexableGraph;
51import com.tinkerpop.blueprints.Parameter;
52import com.tinkerpop.blueprints.TransactionalGraph;
53import com.tinkerpop.blueprints.Vertex;
54import com.tinkerpop.blueprints.util.DefaultGraphQuery;
55import com.tinkerpop.blueprints.util.ExceptionFactory;
56import com.tinkerpop.blueprints.impls.ramcloud.PerfMon;
57
58import edu.stanford.ramcloud.JRamCloud;
59import edu.stanford.ramcloud.JRamCloud.MultiWriteObject;
60
61public class RamCloudGraph implements IndexableGraph, KeyIndexableGraph, TransactionalGraph, Serializable {
62 private final static Logger log = LoggerFactory.getLogger(RamCloudGraph.class);
63
64 private static final ThreadLocal<JRamCloud> RamCloudThreadLocal = new ThreadLocal<JRamCloud>();
65
66 protected long vertTableId; //(vertex_id) --> ( (n,d,ll,l), (n,d,ll,l), ... )
67 protected long vertPropTableId; //(vertex_id) -> ( (kl,k,vl,v), (kl,k,vl,v), ... )
68 protected long edgePropTableId; //(edge_id) -> ( (kl,k,vl,v), (kl,k,vl,v), ... )
69 protected long idxVertTableId;
70 protected long idxEdgeTableId;
71 protected long kidxVertTableId;
72 protected long kidxEdgeTableId;
73 protected long instanceTableId;
74 private String VERT_TABLE_NAME = "verts";
75 private String EDGE_PROP_TABLE_NAME = "edge_props";
76 private String VERT_PROP_TABLE_NAME = "vert_props";
77 private String IDX_VERT_TABLE_NAME = "idx_vert";
78 private String IDX_EDGE_TABLE_NAME = "idx_edge";
79 private String KIDX_VERT_TABLE_NAME = "kidx_vert";
80 private String KIDX_EDGE_TABLE_NAME = "kidx_edge";
81 private final String INSTANCE_TABLE_NAME = "instance";
82 private long instanceId;
83 private AtomicLong nextVertexId;
84 private final long INSTANCE_ID_RANGE = 100000;
85 private String coordinatorLocation;
86 private static final Features FEATURES = new Features();
Yoshi Muroi815c7f92014-01-30 18:06:16 -080087 // FIXME better loop variable name
88 public final int CONDITIONALWRITE_RETRY_MAX = 100;
yoshi28bac132014-01-22 11:00:17 -080089 public final long measureBPTimeProp = Long.valueOf(System.getProperty("benchmark.measureBP", "0"));
90 public final long measureRcTimeProp = Long.valueOf(System.getProperty("benchmark.measureRc", "0"));
91 public static final long measureSerializeTimeProp = Long.valueOf(System.getProperty("benchmark.measureSerializeTimeProp", "0"));
92
93
94 public final Set<String> indexedKeys = new HashSet<String>();
95
// Populates the shared FEATURES descriptor once, at class-initialization time.
static {
    // Supported property value types.
    FEATURES.supportsStringProperty = true;
    FEATURES.supportsBooleanProperty = true;
    FEATURES.supportsIntegerProperty = true;
    FEATURES.supportsLongProperty = true;
    FEATURES.supportsFloatProperty = true;
    FEATURES.supportsDoubleProperty = true;
    FEATURES.supportsPrimitiveArrayProperty = true;
    FEATURES.supportsUniformListProperty = true;
    FEATURES.supportsMixedListProperty = true;
    FEATURES.supportsMapProperty = true;
    FEATURES.supportsSerializableObjectProperty = true;

    // Graph structure and storage characteristics.
    FEATURES.supportsDuplicateEdges = false;
    FEATURES.supportsSelfLoops = false;
    FEATURES.isPersistent = false;
    FEATURES.isWrapper = false;
    FEATURES.ignoresSuppliedIds = true;

    // Element access.
    FEATURES.supportsVertexIteration = true;
    FEATURES.supportsEdgeIteration = true;
    FEATURES.supportsEdgeRetrieval = true;
    FEATURES.supportsVertexProperties = true;
    FEATURES.supportsEdgeProperties = true;

    // Indexing.
    FEATURES.supportsIndices = false;
    FEATURES.supportsVertexIndex = true;
    FEATURES.supportsEdgeIndex = false;
    FEATURES.supportsKeyIndices = true;
    FEATURES.supportsVertexKeyIndex = true;
    FEATURES.supportsEdgeKeyIndex = false;

    // Transactions.
    FEATURES.supportsTransactions = false;
    FEATURES.supportsThreadedTransactions = false;
}
128
129 static {
130 System.loadLibrary("edu_stanford_ramcloud_JRamCloud");
131 }
132
/**
 * No-argument constructor. It performs no connection, table creation or
 * instance-id setup; only field initializers run.
 */
private RamCloudGraph() {
    // intentionally empty
}
135
136
137 public RamCloudGraph(String coordinatorLocation) {
138 this.coordinatorLocation = coordinatorLocation;
139
140 JRamCloud rcClient = getRcClient();
141
142 vertTableId = rcClient.createTable(VERT_TABLE_NAME);
143 vertPropTableId = rcClient.createTable(VERT_PROP_TABLE_NAME);
144 edgePropTableId = rcClient.createTable(EDGE_PROP_TABLE_NAME);
145 idxVertTableId = rcClient.createTable(IDX_VERT_TABLE_NAME);
146 idxEdgeTableId = rcClient.createTable(IDX_EDGE_TABLE_NAME);
147 kidxVertTableId = rcClient.createTable(KIDX_VERT_TABLE_NAME);
148 kidxEdgeTableId = rcClient.createTable(KIDX_EDGE_TABLE_NAME);
149 instanceTableId = rcClient.createTable(INSTANCE_TABLE_NAME);
150
151 log.info( "Connected to coordinator at {}", coordinatorLocation);
152 log.debug("VERT_TABLE:{}, VERT_PROP_TABLE:{}, EDGE_PROP_TABLE:{}, IDX_VERT_TABLE:{}, IDX_EDGE_TABLE:{}, KIDX_VERT_TABLE:{}, KIDX_EDGE_TABLE:{}", vertTableId, vertPropTableId, edgePropTableId, idxVertTableId, idxEdgeTableId, kidxVertTableId, kidxEdgeTableId);
153 nextVertexId = new AtomicLong(-1);
154 initInstance();
155 }
156
157 public JRamCloud getRcClient() {
158 JRamCloud rcClient = RamCloudThreadLocal.get();
159 if (rcClient == null) {
160 rcClient = new JRamCloud(coordinatorLocation);
161 RamCloudThreadLocal.set(rcClient);
162 }
163 return rcClient;
164 }
165
166 @Override
167 public Features getFeatures() {
168 return FEATURES;
169 }
170
171 private Long parseVertexId(Object id) {
172 Long longId;
173 if (id == null) {
174 longId = nextVertexId.incrementAndGet();
175 } else if (id instanceof Integer) {
176 longId = ((Integer) id).longValue();
177 } else if (id instanceof Long) {
178 longId = (Long) id;
179 } else if (id instanceof String) {
180 try {
181 longId = Long.parseLong((String) id, 10);
182 } catch (NumberFormatException e) {
183 log.warn("ID argument {} of type {} is not a parseable long number: {}", id, id.getClass(), e);
184 return null;
185 }
186 } else if (id instanceof byte[]) {
187 try {
188 longId = ByteBuffer.wrap((byte[]) id).getLong();
189 } catch (BufferUnderflowException e) {
190 log.warn("ID argument {} of type {} is not a parseable long number: {}", id, id.getClass(), e);
191 return null;
192 }
193 } else {
194 log.warn("ID argument {} of type {} is not supported. Returning null.", id, id.getClass());
195 return null;
196 }
197 return longId;
198 }
199
200 @Override
201 public Vertex addVertex(Object id) {
202 long startTime = 0;
203 long Tstamp1 = 0;
204 long Tstamp2 = 0;
205
206 if (measureBPTimeProp == 1) {
207 startTime = System.nanoTime();
208 }
209 Long longId = parseVertexId(id);
210 if (longId == null)
211 return null;
212 if (measureBPTimeProp == 1) {
213 Tstamp1 = System.nanoTime();
214 }
215 RamCloudVertex newVertex = new RamCloudVertex(longId, this);
216 if (measureBPTimeProp == 1) {
217 Tstamp2 = System.nanoTime();
218 log.error("Performance addVertex [id={}] : Calling create at {}", longId, Tstamp2);
219 }
220
221 try {
222 newVertex.create();
223 if (measureBPTimeProp == 1) {
224 long endTime = System.nanoTime();
225 log.error("Performance addVertex [id={}] : genid {} newVerex {} create {} total time {}", longId, Tstamp1 - startTime, Tstamp2 - Tstamp1, endTime - Tstamp2, endTime - startTime);
226 }
227 log.info("Added vertex: [id={}]", longId);
228 return newVertex;
229 } catch (IllegalArgumentException e) {
230 log.error("Tried to create vertex failed {" + newVertex + "}", e);
231 return null;
232 }
233 }
234
235 public List<RamCloudVertex> addVertices(Iterable<Object> ids) {
236 log.info("addVertices start");
237 List<RamCloudVertex> vertices = new LinkedList<RamCloudVertex>();
238
239 for (Object id: ids) {
240 Long longId = parseVertexId(id);
241 if (longId == null)
242 return null;
243 RamCloudVertex v = new RamCloudVertex(longId, this);
244 if (v.exists()) {
245 log.error("ramcloud vertex id: {} already exists", v.getId());
246 throw ExceptionFactory.vertexWithIdAlreadyExists(v.getId());
247 }
248 vertices.add(v);
249 }
250 MultiWriteObject multiWriteObjects[] = new MultiWriteObject[vertices.size() * 2];
251 for (int i=0; i < vertices.size(); i++) {
252 RamCloudVertex v = vertices.get(i);
253 multiWriteObjects[i*2] = new MultiWriteObject(vertTableId, v.rcKey, ByteBuffer.allocate(0).array(), null);
254 multiWriteObjects[i*2+1] = new MultiWriteObject(vertPropTableId, v.rcKey, ByteBuffer.allocate(0).array(), null);
255 }
256 try {
257 PerfMon pm = PerfMon.getInstance();
258 pm.multiwrite_start("RamCloudVertex create()");
259 getRcClient().multiWrite(multiWriteObjects);
260 pm.multiwrite_end("RamCloudVertex create()");
261 log.info("ramcloud vertices are created");
262 } catch (Exception e) {
263 log.error("Tried to create vertices failed {}", e);
264 return null;
265 }
266 log.info("addVertices end (success)");
267 return vertices;
268 }
269
270 private final void initInstance() {
271 //long incrementValue = 1;
272 JRamCloud.Object instanceEntry = null;
273 JRamCloud rcClient = getRcClient();
274 try {
275 instanceEntry = rcClient.read(instanceTableId, "nextInstanceId".getBytes());
276 } catch (Exception e) {
277 if (e instanceof JRamCloud.ObjectDoesntExistException) {
278 instanceId = 0;
279 rcClient.write(instanceTableId, "nextInstanceId".getBytes(), ByteBuffer.allocate(0).array());
280 }
281 }
282 if (instanceEntry != null) {
283 long curInstanceId = 1;
Yoshi Muroi815c7f92014-01-30 18:06:16 -0800284 for (int i = 0 ; i < CONDITIONALWRITE_RETRY_MAX ; i++) {
yoshi28bac132014-01-22 11:00:17 -0800285 Map<String, Long> propMap = null;
286 if (instanceEntry.value == null) {
287 log.warn("Got a null byteArray argument");
288 return;
289 } else if (instanceEntry.value.length != 0) {
290 try {
291 ByteArrayInputStream bais = new ByteArrayInputStream(instanceEntry.value);
292 ObjectInputStream ois = new ObjectInputStream(bais);
293 propMap = (Map<String, Long>) ois.readObject();
294 } catch (IOException e) {
295 log.error("Got an exception while deserializing element's property map: ", e);
296 return;
297 } catch (ClassNotFoundException e) {
298 log.error("Got an exception while deserializing element's property map: ", e);
299 return;
300 }
301 } else {
302 propMap = new HashMap<String, Long>();
303 }
304
305 if (propMap.containsKey(INSTANCE_TABLE_NAME)) {
306 curInstanceId = propMap.get(INSTANCE_TABLE_NAME) + 1;
307 }
308
309 propMap.put(INSTANCE_TABLE_NAME, curInstanceId);
310
311 byte[] rcValue = null;
312 try {
313 ByteArrayOutputStream baos = new ByteArrayOutputStream();
314 ObjectOutputStream oot = new ObjectOutputStream(baos);
315 oot.writeObject(propMap);
316 rcValue = baos.toByteArray();
317 } catch (IOException e) {
318 log.error("Got an exception while serializing element's property map", e);
319 return;
320 }
321 JRamCloud.RejectRules rules = rcClient.new RejectRules();
322 rules.setNeVersion(instanceEntry.version);
323 try {
324 rcClient.writeRule(instanceTableId, "nextInstanceId".getBytes(), rcValue, rules);
325 instanceId = curInstanceId;
326 break;
327 } catch (Exception ex) {
328 log.debug("Cond. Write increment Vertex property: ", ex);
329 instanceEntry = rcClient.read(instanceTableId, "nextInstanceId".getBytes());
330 continue;
331 }
332 }
333 }
334
335 nextVertexId.compareAndSet(-1, instanceId * INSTANCE_ID_RANGE);
336 }
337
338 @Override
339 public Vertex getVertex(Object id) throws IllegalArgumentException {
340 Long longId;
341
342 if (id == null) {
343 throw ExceptionFactory.vertexIdCanNotBeNull();
344 } else if (id instanceof Integer) {
345 longId = ((Integer) id).longValue();
346 } else if (id instanceof Long) {
347 longId = (Long) id;
348 } else if (id instanceof String) {
349 try {
350 longId = Long.parseLong((String) id, 10);
351 } catch (NumberFormatException e) {
352 log.warn("ID argument {} of type {} is not a parseable long number: {}", id, id.getClass(), e);
353 return null;
354 }
355 } else if (id instanceof byte[]) {
356 try {
357 longId = ByteBuffer.wrap((byte[]) id).getLong();
358 } catch (BufferUnderflowException e) {
359 log.warn("ID argument {} of type {} is not a parseable long number: {}", id, id.getClass(), e);
360 return null;
361 }
362 } else {
363 log.warn("ID argument {} of type {} is not supported. Returning null.", id, id.getClass());
364 return null;
365 }
366
367 RamCloudVertex vertex = new RamCloudVertex(longId, this);
368
369 if (vertex.exists()) {
370 return vertex;
371 } else {
372 return null;
373 }
374 }
375
376 @Override
377 public void removeVertex(Vertex vertex) {
378 ((RamCloudVertex) vertex).remove();
379 }
380
381 @Override
382 public Iterable<Vertex> getVertices() {
383 JRamCloud.TableEnumerator tableEnum = getRcClient().new TableEnumerator(vertPropTableId);
384 List<Vertex> vertices = new LinkedList<Vertex>();
385
386 while (tableEnum.hasNext()) {
387 vertices.add(new RamCloudVertex(tableEnum.next().key, this));
388 }
389
390 return vertices;
391 }
392
393 @Override
394 public Iterable<Vertex> getVertices(String key, Object value) {
395 long startTime = 0;
396 long Tstamp1 = 0;
397 long Tstamp2 = 0;
398 long Tstamp3 = 0;
399 if (measureBPTimeProp == 1) {
400 startTime = System.nanoTime();
401 log.error("Performance getVertices(key {}) start at {}", key, startTime);
402 }
403
404 List<Vertex> vertices = new ArrayList<Vertex>();
405 List<Object> vertexList = null;
406
407 JRamCloud vertTable = getRcClient();
408 if (measureBPTimeProp == 1) {
409 Tstamp1 = System.nanoTime();
410 log.error("Performance getVertices(key {}) Calling indexedKeys.contains(key) at {}", key, Tstamp1);
411 }
412
413
414 if (indexedKeys.contains(key)) {
415 PerfMon pm = PerfMon.getInstance();
416 if (measureBPTimeProp == 1) {
417 Tstamp2 = System.nanoTime();
418 log.error("Performance getVertices(key {}) Calling new RamCloudKeyIndex at {}", key, Tstamp2);
419 }
420 RamCloudKeyIndex KeyIndex = new RamCloudKeyIndex(kidxVertTableId, key, value, this, Vertex.class);
421 if (measureBPTimeProp == 1) {
422 Tstamp3 = System.nanoTime();
423 log.error("Performance getVertices(key {}) Calling KeyIndex.GetElmIdListForPropValue at {}", key, Tstamp3);
424 }
425 vertexList = KeyIndex.getElmIdListForPropValue(value.toString());
426 if (vertexList == null) {
427 if (measureBPTimeProp == 1) {
428 long endTime = System.nanoTime();
429 log.error("Performance getVertices(key {}) does not exists : getRcClient {} indexedKeys.contains(key) {} new_RamCloudKeyIndex {} KeyIndex.get..Value {} total {} diff {}", key, Tstamp1-startTime, Tstamp2-Tstamp1,Tstamp3-Tstamp2, endTime-Tstamp3, endTime - startTime, (endTime-startTime)- (Tstamp1-startTime)- (Tstamp2-Tstamp1)- (Tstamp3-Tstamp2)-(endTime-Tstamp3));
430 }
431 return vertices;
432 }
433
434 final int mreadMax = 400;
435 final int size = Math.min(mreadMax, vertexList.size());
436 JRamCloud.multiReadObject vertPropTableMread[] = new JRamCloud.multiReadObject[size];
437
438 int vertexNum = 0;
439 for (Object vert : vertexList) {
440 byte[] rckey =
441 ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong((Long) vert).array();
442 vertPropTableMread[vertexNum] = new JRamCloud.multiReadObject(vertPropTableId, rckey);
443 if (vertexNum >= (mreadMax - 1)) {
444 pm.multiread_start("RamCloudGraph getVertices()");
445 JRamCloud.Object outvertPropTable[] =
446 vertTable.multiRead(vertPropTableMread);
447 pm.multiread_end("RamCloudGraph getVertices()");
448 for (int i = 0; i < outvertPropTable.length; i++) {
449 if (outvertPropTable[i] != null) {
450 vertices.add(new RamCloudVertex(outvertPropTable[i].key, this));
451 }
452 }
453 vertexNum = 0;
454 continue;
455 }
456 vertexNum++;
457 }
458
459 if (vertexNum != 0) {
460 JRamCloud.multiReadObject mread_leftover[] = Arrays.copyOf(vertPropTableMread, vertexNum);
461
462 long startTime2 = 0;
463 if (measureRcTimeProp == 1) {
464 startTime2 = System.nanoTime();
465 }
466 pm.multiread_start("RamCloudGraph getVertices()");
467 JRamCloud.Object outvertPropTable[] = vertTable.multiRead(mread_leftover);
468 pm.multiread_end("RamCloudGraph getVertices()");
469 if (measureRcTimeProp == 1) {
470 long endTime2 = System.nanoTime();
471 log.error("Performance index multiread(key {}, number {}) time {}", key, vertexNum, endTime2 - startTime2);
472 }
473 for (int i = 0; i < outvertPropTable.length; i++) {
474 if (outvertPropTable[i] != null) {
475 vertices.add(new RamCloudVertex(outvertPropTable[i].key, this));
476 }
477 }
478 }
479 } else {
480
481 JRamCloud.TableEnumerator tableEnum = getRcClient().new TableEnumerator(vertPropTableId);
482 JRamCloud.Object tableEntry;
483
484 while (tableEnum.hasNext()) {
485 tableEntry = tableEnum.next();
486 if (tableEntry != null) {
487 //XXX remove temp
488 // RamCloudVertex temp = new RamCloudVertex(tableEntry.key, this);
489 Map<String, Object> propMap = RamCloudElement.convertRcBytesToPropertyMapEx(tableEntry.value);
490 if (propMap.containsKey(key) && propMap.get(key).equals(value)) {
491 vertices.add(new RamCloudVertex(tableEntry.key, this));
492 }
493 }
494 }
495 }
496
497 if (measureBPTimeProp == 1) {
498 long endTime = System.nanoTime();
499 log.error("Performance getVertices exists total time {}.", endTime - startTime);
500 }
501
502 return vertices;
503 }
504
505 @Override
506 public Edge addEdge(Object id, Vertex outVertex, Vertex inVertex, String label) throws IllegalArgumentException {
507 log.info("Adding edge: [id={}, outVertex={}, inVertex={}, label={}]", id, outVertex, inVertex, label);
508
509 if (label == null) {
510 throw ExceptionFactory.edgeLabelCanNotBeNull();
511 }
512
513 RamCloudEdge newEdge = new RamCloudEdge((RamCloudVertex) outVertex, (RamCloudVertex) inVertex, label, this);
514
515 for (int i = 0; i < 5 ;i++) {
516 try {
517 newEdge.create();
518 return newEdge;
519 } catch (Exception e) {
520 log.warn("Tried to create edge failed: {" + newEdge + "}: ", e);
521
522 if (e instanceof NoSuchElementException) {
523 log.error("addEdge RETRYING {}", i);
524 continue;
525 }
526 }
527 }
528 return null;
529 }
530
531 public List<Edge> addEdges(Iterable<Edge> edgeEntities) throws IllegalArgumentException {
532 //TODO WIP: need multi-write
533 log.info("addEdges start");
534 ArrayList<Edge> edges = new ArrayList<Edge>();
535 for (Edge edge: edgeEntities) {
536 edges.add(addEdge(null, edge.getVertex(Direction.OUT), edge.getVertex(Direction.IN), edge.getLabel()));
537 }
538 log.info("addVertices end");
539 return edges;
540 }
541
542 public void setProperties(Map<RamCloudVertex, Map<String, Object>> properties) {
543 // TODO WIP: need multi-write
544 log.info("setProperties start");
545 for (Map.Entry<RamCloudVertex, Map<String, Object>> e: properties.entrySet()) {
546 e.getKey().setProperties(e.getValue());
547 }
548 log.info("setProperties end");
549 }
550
551 @Override
552 public Edge getEdge(Object id) throws IllegalArgumentException {
553 byte[] bytearrayId;
554
555 if (id == null) {
556 throw ExceptionFactory.edgeIdCanNotBeNull();
557 } else if (id instanceof byte[]) {
558 bytearrayId = (byte[]) id;
559 } else if (id instanceof String) {
560 bytearrayId = Base64.decode(((String) id));
561 } else {
562 log.warn("ID argument {} of type {} is not supported. Returning null.", id, id.getClass());
563 return null;
564 }
565
566 if (!RamCloudEdge.isValidEdgeId(bytearrayId)) {
567 log.warn("ID argument {} of type {} is malformed. Returning null.", id, id.getClass());
568 return null;
569 }
570
571 RamCloudEdge edge = new RamCloudEdge(bytearrayId, this);
572
573 if (edge.exists()) {
574 return edge;
575 } else {
576 return null;
577 }
578 }
579
580 @Override
581 public void removeEdge(Edge edge) {
582 edge.remove();
583 }
584
585 @Override
586 public Iterable<Edge> getEdges() {
587 JRamCloud.TableEnumerator tableEnum = getRcClient().new TableEnumerator(edgePropTableId);
588 List<Edge> edges = new ArrayList<Edge>();
589
590 while (tableEnum.hasNext()) {
591 edges.add(new RamCloudEdge(tableEnum.next().key, this));
592 }
593
594 return edges;
595 }
596
597 @Override
598 public Iterable<Edge> getEdges(String key, Object value) {
599 JRamCloud.TableEnumerator tableEnum = getRcClient().new TableEnumerator(edgePropTableId);
600 List<Edge> edges = new ArrayList<Edge>();
601 JRamCloud.Object tableEntry;
602
603 while (tableEnum.hasNext()) {
604 tableEntry = tableEnum.next();
605 // FIXME temp
606 //RamCloudEdge temp = new RamCloudEdge(tableEntry.key, this);
607 Map<String, Object> propMap = RamCloudElement.convertRcBytesToPropertyMapEx(tableEntry.value);
608 if (propMap.containsKey(key) && propMap.get(key).equals(value)) {
609 edges.add(new RamCloudEdge(tableEntry.key, this));
610 }
611 }
612
613 return edges;
614 }
615
616 @Override
617 public GraphQuery query() {
618 return new DefaultGraphQuery(this);
619 }
620
621 @Override
622 public void shutdown() {
623 JRamCloud rcClient = getRcClient();
624 rcClient.dropTable(VERT_TABLE_NAME);
625 rcClient.dropTable(VERT_PROP_TABLE_NAME);
626 rcClient.dropTable(EDGE_PROP_TABLE_NAME);
627 rcClient.dropTable(IDX_VERT_TABLE_NAME);
628 rcClient.dropTable(IDX_EDGE_TABLE_NAME);
629 rcClient.dropTable(KIDX_VERT_TABLE_NAME);
630 rcClient.dropTable(KIDX_EDGE_TABLE_NAME);
631 rcClient.disconnect();
632 }
633
634 @Override
635 public void stopTransaction(Conclusion conclusion) {
636 // TODO Auto-generated method stub
637 }
638
639 @Override
640 public void commit() {
641 // TODO Auto-generated method stub
642 }
643
644 @Override
645 public void rollback() {
646 // TODO Auto-generated method stub
647 }
648
649 @Override
650 public <T extends Element> void dropKeyIndex(String key, Class<T> elementClass) {
651 throw new UnsupportedOperationException("Not supported yet.");
652 //FIXME how to dropKeyIndex
653 //new RamCloudKeyIndex(kidxVertTableId, key, this, elementClass);
654 //getIndexedKeys(key, elementClass).removeIndex();
655 }
656
657 @Override
658 public <T extends Element> void createKeyIndex(String key,
659 Class<T> elementClass, Parameter... indexParameters) {
660 if (key == null) {
661 return;
662 }
663 if (this.indexedKeys.contains(key)) {
664 return;
665 }
666 this.indexedKeys.add(key);
667 }
668
669 @Override
670 public <T extends Element> Set< String> getIndexedKeys(Class< T> elementClass) {
671 if (null != this.indexedKeys) {
672 return new HashSet<String>(this.indexedKeys);
673 } else {
674 return Collections.emptySet();
675 }
676 }
677
678 @Override
679 public <T extends Element> Index<T> createIndex(String indexName,
680 Class<T> indexClass, Parameter... indexParameters) {
681 throw new UnsupportedOperationException("Not supported yet.");
682 }
683
684 @Override
685 public <T extends Element> Index<T> getIndex(String indexName, Class<T> indexClass) {
686 throw new UnsupportedOperationException("Not supported yet.");
687 }
688
689 @Override
690 public Iterable<Index<? extends Element>> getIndices() {
691 throw new UnsupportedOperationException("Not supported yet.");
692 }
693
694 @Override
695 public void dropIndex(String indexName) {
696 throw new UnsupportedOperationException("Not supported yet.");
697 }
698
699 @Override
700 public String toString() {
701 return getClass().getSimpleName().toLowerCase() + "[vertices:" + ((List<Vertex>)getVertices()).size() + " edges:" + ((List<Edge>)getEdges()).size() + "]";
702 }
703
704 public static void main(String[] args) {
705 RamCloudGraph graph = new RamCloudGraph();
706
707 Vertex a = graph.addVertex(null);
708 Vertex b = graph.addVertex(null);
709 Vertex c = graph.addVertex(null);
710 Vertex d = graph.addVertex(null);
711 Vertex e = graph.addVertex(null);
712 Vertex f = graph.addVertex(null);
713 Vertex g = graph.addVertex(null);
714
715 graph.addEdge(null, a, a, "friend");
716 graph.addEdge(null, a, b, "friend1");
717 graph.addEdge(null, a, b, "friend2");
718 graph.addEdge(null, a, b, "friend3");
719 graph.addEdge(null, a, c, "friend");
720 graph.addEdge(null, a, d, "friend");
721 graph.addEdge(null, a, e, "friend");
722 graph.addEdge(null, a, f, "friend");
723 graph.addEdge(null, a, g, "friend");
724
725 graph.shutdown();
726 }
727}