blob: ad519c85d34d27709d597d5d9aa3df7947da0251 [file] [log] [blame]
/* Copyright (c) 2013 Stanford University
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */
15
yoshi28bac132014-01-22 11:00:17 -080016package com.tinkerpop.blueprints.impls.ramcloud;
17
18import java.io.ByteArrayInputStream;
19import java.io.ByteArrayOutputStream;
20import java.io.IOException;
21import java.io.ObjectInputStream;
22import java.io.ObjectOutputStream;
23import java.io.Serializable;
24import java.nio.BufferUnderflowException;
25import java.nio.ByteBuffer;
26import java.nio.ByteOrder;
27import java.util.ArrayList;
28import java.util.Arrays;
29import java.util.Collections;
30import java.util.HashMap;
31import java.util.HashSet;
32import java.util.LinkedList;
33import java.util.List;
34import java.util.Map;
35import java.util.NoSuchElementException;
36import java.util.Set;
37import java.util.concurrent.atomic.AtomicLong;
38
39import org.slf4j.Logger;
40import org.slf4j.LoggerFactory;
41
42import com.sun.jersey.core.util.Base64;
43import com.tinkerpop.blueprints.Direction;
44import com.tinkerpop.blueprints.Edge;
45import com.tinkerpop.blueprints.Element;
46import com.tinkerpop.blueprints.Features;
47import com.tinkerpop.blueprints.GraphQuery;
48import com.tinkerpop.blueprints.Index;
49import com.tinkerpop.blueprints.IndexableGraph;
50import com.tinkerpop.blueprints.KeyIndexableGraph;
51import com.tinkerpop.blueprints.Parameter;
52import com.tinkerpop.blueprints.TransactionalGraph;
53import com.tinkerpop.blueprints.Vertex;
54import com.tinkerpop.blueprints.util.DefaultGraphQuery;
55import com.tinkerpop.blueprints.util.ExceptionFactory;
56import com.tinkerpop.blueprints.impls.ramcloud.PerfMon;
57
58import edu.stanford.ramcloud.JRamCloud;
59import edu.stanford.ramcloud.JRamCloud.MultiWriteObject;
60
61public class RamCloudGraph implements IndexableGraph, KeyIndexableGraph, TransactionalGraph, Serializable {
62 private final static Logger log = LoggerFactory.getLogger(RamCloudGraph.class);
63
64 private static final ThreadLocal<JRamCloud> RamCloudThreadLocal = new ThreadLocal<JRamCloud>();
65
66 protected long vertTableId; //(vertex_id) --> ( (n,d,ll,l), (n,d,ll,l), ... )
67 protected long vertPropTableId; //(vertex_id) -> ( (kl,k,vl,v), (kl,k,vl,v), ... )
68 protected long edgePropTableId; //(edge_id) -> ( (kl,k,vl,v), (kl,k,vl,v), ... )
69 protected long idxVertTableId;
70 protected long idxEdgeTableId;
71 protected long kidxVertTableId;
72 protected long kidxEdgeTableId;
73 protected long instanceTableId;
74 private String VERT_TABLE_NAME = "verts";
75 private String EDGE_PROP_TABLE_NAME = "edge_props";
76 private String VERT_PROP_TABLE_NAME = "vert_props";
77 private String IDX_VERT_TABLE_NAME = "idx_vert";
78 private String IDX_EDGE_TABLE_NAME = "idx_edge";
79 private String KIDX_VERT_TABLE_NAME = "kidx_vert";
80 private String KIDX_EDGE_TABLE_NAME = "kidx_edge";
81 private final String INSTANCE_TABLE_NAME = "instance";
82 private long instanceId;
83 private AtomicLong nextVertexId;
84 private final long INSTANCE_ID_RANGE = 100000;
85 private String coordinatorLocation;
86 private static final Features FEATURES = new Features();
Yoshi Muroi815c7f92014-01-30 18:06:16 -080087 // FIXME better loop variable name
88 public final int CONDITIONALWRITE_RETRY_MAX = 100;
yoshi28bac132014-01-22 11:00:17 -080089 public final long measureBPTimeProp = Long.valueOf(System.getProperty("benchmark.measureBP", "0"));
90 public final long measureRcTimeProp = Long.valueOf(System.getProperty("benchmark.measureRc", "0"));
91 public static final long measureSerializeTimeProp = Long.valueOf(System.getProperty("benchmark.measureSerializeTimeProp", "0"));
92
93
94 public final Set<String> indexedKeys = new HashSet<String>();
95
96 static {
97 FEATURES.supportsSerializableObjectProperty = true;
98 FEATURES.supportsBooleanProperty = true;
99 FEATURES.supportsDoubleProperty = true;
100 FEATURES.supportsFloatProperty = true;
101 FEATURES.supportsIntegerProperty = true;
102 FEATURES.supportsPrimitiveArrayProperty = true;
103 FEATURES.supportsUniformListProperty = true;
104 FEATURES.supportsMixedListProperty = true;
105 FEATURES.supportsLongProperty = true;
106 FEATURES.supportsMapProperty = true;
107 FEATURES.supportsStringProperty = true;
108
109 FEATURES.supportsDuplicateEdges = false;
110 FEATURES.supportsSelfLoops = false;
111 FEATURES.isPersistent = false;
112 FEATURES.isWrapper = false;
113 FEATURES.supportsVertexIteration = true;
114 FEATURES.supportsEdgeIteration = true;
115 FEATURES.supportsVertexIndex = true;
116 FEATURES.supportsEdgeIndex = false;
117 FEATURES.ignoresSuppliedIds = true;
118 FEATURES.supportsTransactions = false;
119 FEATURES.supportsIndices = false;
120 FEATURES.supportsKeyIndices = true;
121 FEATURES.supportsVertexKeyIndex = true;
122 FEATURES.supportsEdgeKeyIndex = false;
123 FEATURES.supportsEdgeRetrieval = true;
124 FEATURES.supportsVertexProperties = true;
125 FEATURES.supportsEdgeProperties = true;
126 FEATURES.supportsThreadedTransactions = false;
127 }
128
129 static {
130 System.loadLibrary("edu_stanford_ramcloud_JRamCloud");
131 }
132
133 private RamCloudGraph() {
134 }
135
136
137 public RamCloudGraph(String coordinatorLocation) {
138 this.coordinatorLocation = coordinatorLocation;
139
140 JRamCloud rcClient = getRcClient();
141
142 vertTableId = rcClient.createTable(VERT_TABLE_NAME);
143 vertPropTableId = rcClient.createTable(VERT_PROP_TABLE_NAME);
144 edgePropTableId = rcClient.createTable(EDGE_PROP_TABLE_NAME);
145 idxVertTableId = rcClient.createTable(IDX_VERT_TABLE_NAME);
146 idxEdgeTableId = rcClient.createTable(IDX_EDGE_TABLE_NAME);
147 kidxVertTableId = rcClient.createTable(KIDX_VERT_TABLE_NAME);
148 kidxEdgeTableId = rcClient.createTable(KIDX_EDGE_TABLE_NAME);
149 instanceTableId = rcClient.createTable(INSTANCE_TABLE_NAME);
150
151 log.info( "Connected to coordinator at {}", coordinatorLocation);
152 log.debug("VERT_TABLE:{}, VERT_PROP_TABLE:{}, EDGE_PROP_TABLE:{}, IDX_VERT_TABLE:{}, IDX_EDGE_TABLE:{}, KIDX_VERT_TABLE:{}, KIDX_EDGE_TABLE:{}", vertTableId, vertPropTableId, edgePropTableId, idxVertTableId, idxEdgeTableId, kidxVertTableId, kidxEdgeTableId);
153 nextVertexId = new AtomicLong(-1);
154 initInstance();
155 }
156
157 public JRamCloud getRcClient() {
158 JRamCloud rcClient = RamCloudThreadLocal.get();
159 if (rcClient == null) {
160 rcClient = new JRamCloud(coordinatorLocation);
161 RamCloudThreadLocal.set(rcClient);
162 }
163 return rcClient;
164 }
165
166 @Override
167 public Features getFeatures() {
168 return FEATURES;
169 }
170
171 private Long parseVertexId(Object id) {
172 Long longId;
173 if (id == null) {
174 longId = nextVertexId.incrementAndGet();
175 } else if (id instanceof Integer) {
176 longId = ((Integer) id).longValue();
177 } else if (id instanceof Long) {
178 longId = (Long) id;
179 } else if (id instanceof String) {
180 try {
181 longId = Long.parseLong((String) id, 10);
182 } catch (NumberFormatException e) {
183 log.warn("ID argument {} of type {} is not a parseable long number: {}", id, id.getClass(), e);
184 return null;
185 }
186 } else if (id instanceof byte[]) {
187 try {
188 longId = ByteBuffer.wrap((byte[]) id).getLong();
189 } catch (BufferUnderflowException e) {
190 log.warn("ID argument {} of type {} is not a parseable long number: {}", id, id.getClass(), e);
191 return null;
192 }
193 } else {
194 log.warn("ID argument {} of type {} is not supported. Returning null.", id, id.getClass());
195 return null;
196 }
197 return longId;
198 }
199
200 @Override
201 public Vertex addVertex(Object id) {
202 long startTime = 0;
203 long Tstamp1 = 0;
204 long Tstamp2 = 0;
205
206 if (measureBPTimeProp == 1) {
207 startTime = System.nanoTime();
208 }
209 Long longId = parseVertexId(id);
210 if (longId == null)
211 return null;
212 if (measureBPTimeProp == 1) {
213 Tstamp1 = System.nanoTime();
214 }
215 RamCloudVertex newVertex = new RamCloudVertex(longId, this);
216 if (measureBPTimeProp == 1) {
217 Tstamp2 = System.nanoTime();
218 log.error("Performance addVertex [id={}] : Calling create at {}", longId, Tstamp2);
219 }
220
221 try {
222 newVertex.create();
223 if (measureBPTimeProp == 1) {
224 long endTime = System.nanoTime();
225 log.error("Performance addVertex [id={}] : genid {} newVerex {} create {} total time {}", longId, Tstamp1 - startTime, Tstamp2 - Tstamp1, endTime - Tstamp2, endTime - startTime);
226 }
227 log.info("Added vertex: [id={}]", longId);
228 return newVertex;
229 } catch (IllegalArgumentException e) {
230 log.error("Tried to create vertex failed {" + newVertex + "}", e);
231 return null;
232 }
233 }
234
235 public List<RamCloudVertex> addVertices(Iterable<Object> ids) {
236 log.info("addVertices start");
237 List<RamCloudVertex> vertices = new LinkedList<RamCloudVertex>();
238
239 for (Object id: ids) {
240 Long longId = parseVertexId(id);
241 if (longId == null)
242 return null;
243 RamCloudVertex v = new RamCloudVertex(longId, this);
244 if (v.exists()) {
245 log.error("ramcloud vertex id: {} already exists", v.getId());
246 throw ExceptionFactory.vertexWithIdAlreadyExists(v.getId());
247 }
248 vertices.add(v);
249 }
Yoshi Muroie7693b12014-02-19 19:41:17 -0800250
251 MultiWriteObject mwo = new MultiWriteObject(vertices.size()*2);
252
yoshi28bac132014-01-22 11:00:17 -0800253 for (int i=0; i < vertices.size(); i++) {
254 RamCloudVertex v = vertices.get(i);
Yoshi Muroie7693b12014-02-19 19:41:17 -0800255 mwo.setObject(i*2, vertTableId, v.rcKey, ByteBuffer.allocate(0).array(), null);
256 mwo.setObject(i*2+1, vertTableId, v.rcKey, ByteBuffer.allocate(0).array(), null);
yoshi28bac132014-01-22 11:00:17 -0800257 }
258 try {
259 PerfMon pm = PerfMon.getInstance();
260 pm.multiwrite_start("RamCloudVertex create()");
Yoshi Muroie7693b12014-02-19 19:41:17 -0800261 getRcClient().multiWrite(mwo.tableId, mwo.key, mwo.keyLength, mwo.value, mwo.valueLength, vertices.size()*2, mwo.rules);
yoshi28bac132014-01-22 11:00:17 -0800262 pm.multiwrite_end("RamCloudVertex create()");
263 log.info("ramcloud vertices are created");
264 } catch (Exception e) {
265 log.error("Tried to create vertices failed {}", e);
266 return null;
267 }
268 log.info("addVertices end (success)");
269 return vertices;
270 }
271
272 private final void initInstance() {
273 //long incrementValue = 1;
274 JRamCloud.Object instanceEntry = null;
275 JRamCloud rcClient = getRcClient();
276 try {
277 instanceEntry = rcClient.read(instanceTableId, "nextInstanceId".getBytes());
278 } catch (Exception e) {
279 if (e instanceof JRamCloud.ObjectDoesntExistException) {
280 instanceId = 0;
281 rcClient.write(instanceTableId, "nextInstanceId".getBytes(), ByteBuffer.allocate(0).array());
282 }
283 }
284 if (instanceEntry != null) {
285 long curInstanceId = 1;
Yoshi Muroi815c7f92014-01-30 18:06:16 -0800286 for (int i = 0 ; i < CONDITIONALWRITE_RETRY_MAX ; i++) {
yoshi28bac132014-01-22 11:00:17 -0800287 Map<String, Long> propMap = null;
288 if (instanceEntry.value == null) {
289 log.warn("Got a null byteArray argument");
290 return;
291 } else if (instanceEntry.value.length != 0) {
292 try {
293 ByteArrayInputStream bais = new ByteArrayInputStream(instanceEntry.value);
294 ObjectInputStream ois = new ObjectInputStream(bais);
295 propMap = (Map<String, Long>) ois.readObject();
296 } catch (IOException e) {
297 log.error("Got an exception while deserializing element's property map: ", e);
298 return;
299 } catch (ClassNotFoundException e) {
300 log.error("Got an exception while deserializing element's property map: ", e);
301 return;
302 }
303 } else {
304 propMap = new HashMap<String, Long>();
305 }
306
307 if (propMap.containsKey(INSTANCE_TABLE_NAME)) {
308 curInstanceId = propMap.get(INSTANCE_TABLE_NAME) + 1;
309 }
310
311 propMap.put(INSTANCE_TABLE_NAME, curInstanceId);
312
313 byte[] rcValue = null;
314 try {
315 ByteArrayOutputStream baos = new ByteArrayOutputStream();
316 ObjectOutputStream oot = new ObjectOutputStream(baos);
317 oot.writeObject(propMap);
318 rcValue = baos.toByteArray();
319 } catch (IOException e) {
320 log.error("Got an exception while serializing element's property map", e);
321 return;
322 }
323 JRamCloud.RejectRules rules = rcClient.new RejectRules();
324 rules.setNeVersion(instanceEntry.version);
325 try {
326 rcClient.writeRule(instanceTableId, "nextInstanceId".getBytes(), rcValue, rules);
327 instanceId = curInstanceId;
328 break;
329 } catch (Exception ex) {
330 log.debug("Cond. Write increment Vertex property: ", ex);
331 instanceEntry = rcClient.read(instanceTableId, "nextInstanceId".getBytes());
332 continue;
333 }
334 }
335 }
336
337 nextVertexId.compareAndSet(-1, instanceId * INSTANCE_ID_RANGE);
338 }
339
340 @Override
341 public Vertex getVertex(Object id) throws IllegalArgumentException {
342 Long longId;
343
344 if (id == null) {
345 throw ExceptionFactory.vertexIdCanNotBeNull();
346 } else if (id instanceof Integer) {
347 longId = ((Integer) id).longValue();
348 } else if (id instanceof Long) {
349 longId = (Long) id;
350 } else if (id instanceof String) {
351 try {
352 longId = Long.parseLong((String) id, 10);
353 } catch (NumberFormatException e) {
354 log.warn("ID argument {} of type {} is not a parseable long number: {}", id, id.getClass(), e);
355 return null;
356 }
357 } else if (id instanceof byte[]) {
358 try {
359 longId = ByteBuffer.wrap((byte[]) id).getLong();
360 } catch (BufferUnderflowException e) {
361 log.warn("ID argument {} of type {} is not a parseable long number: {}", id, id.getClass(), e);
362 return null;
363 }
364 } else {
365 log.warn("ID argument {} of type {} is not supported. Returning null.", id, id.getClass());
366 return null;
367 }
368
369 RamCloudVertex vertex = new RamCloudVertex(longId, this);
370
371 if (vertex.exists()) {
372 return vertex;
373 } else {
374 return null;
375 }
376 }
377
378 @Override
379 public void removeVertex(Vertex vertex) {
380 ((RamCloudVertex) vertex).remove();
381 }
382
383 @Override
384 public Iterable<Vertex> getVertices() {
385 JRamCloud.TableEnumerator tableEnum = getRcClient().new TableEnumerator(vertPropTableId);
386 List<Vertex> vertices = new LinkedList<Vertex>();
387
388 while (tableEnum.hasNext()) {
389 vertices.add(new RamCloudVertex(tableEnum.next().key, this));
390 }
391
392 return vertices;
393 }
394
395 @Override
396 public Iterable<Vertex> getVertices(String key, Object value) {
397 long startTime = 0;
398 long Tstamp1 = 0;
399 long Tstamp2 = 0;
400 long Tstamp3 = 0;
401 if (measureBPTimeProp == 1) {
402 startTime = System.nanoTime();
403 log.error("Performance getVertices(key {}) start at {}", key, startTime);
404 }
405
406 List<Vertex> vertices = new ArrayList<Vertex>();
407 List<Object> vertexList = null;
408
409 JRamCloud vertTable = getRcClient();
410 if (measureBPTimeProp == 1) {
411 Tstamp1 = System.nanoTime();
412 log.error("Performance getVertices(key {}) Calling indexedKeys.contains(key) at {}", key, Tstamp1);
413 }
414
415
416 if (indexedKeys.contains(key)) {
417 PerfMon pm = PerfMon.getInstance();
418 if (measureBPTimeProp == 1) {
419 Tstamp2 = System.nanoTime();
420 log.error("Performance getVertices(key {}) Calling new RamCloudKeyIndex at {}", key, Tstamp2);
421 }
422 RamCloudKeyIndex KeyIndex = new RamCloudKeyIndex(kidxVertTableId, key, value, this, Vertex.class);
423 if (measureBPTimeProp == 1) {
424 Tstamp3 = System.nanoTime();
425 log.error("Performance getVertices(key {}) Calling KeyIndex.GetElmIdListForPropValue at {}", key, Tstamp3);
426 }
427 vertexList = KeyIndex.getElmIdListForPropValue(value.toString());
428 if (vertexList == null) {
429 if (measureBPTimeProp == 1) {
430 long endTime = System.nanoTime();
431 log.error("Performance getVertices(key {}) does not exists : getRcClient {} indexedKeys.contains(key) {} new_RamCloudKeyIndex {} KeyIndex.get..Value {} total {} diff {}", key, Tstamp1-startTime, Tstamp2-Tstamp1,Tstamp3-Tstamp2, endTime-Tstamp3, endTime - startTime, (endTime-startTime)- (Tstamp1-startTime)- (Tstamp2-Tstamp1)- (Tstamp3-Tstamp2)-(endTime-Tstamp3));
432 }
433 return vertices;
434 }
435
436 final int mreadMax = 400;
437 final int size = Math.min(mreadMax, vertexList.size());
Yoshi Muroi2c170602014-02-15 08:31:28 -0800438
439 long tableId[] = new long[size];
440 byte[] keyData[] = new byte[size][];
441 short keySize[] = new short[size];
yoshi28bac132014-01-22 11:00:17 -0800442
443 int vertexNum = 0;
444 for (Object vert : vertexList) {
Yoshi Muroi2c170602014-02-15 08:31:28 -0800445 tableId[vertexNum] = vertPropTableId;
446 keyData[vertexNum] =
yoshi28bac132014-01-22 11:00:17 -0800447 ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong((Long) vert).array();
Yoshi Muroi2c170602014-02-15 08:31:28 -0800448 keySize[vertexNum] = (short) keyData[vertexNum].length;
yoshi28bac132014-01-22 11:00:17 -0800449 if (vertexNum >= (mreadMax - 1)) {
450 pm.multiread_start("RamCloudGraph getVertices()");
451 JRamCloud.Object outvertPropTable[] =
Yoshi Muroi2c170602014-02-15 08:31:28 -0800452 vertTable.multiRead(tableId, keyData, keySize, vertexNum);
yoshi28bac132014-01-22 11:00:17 -0800453 pm.multiread_end("RamCloudGraph getVertices()");
454 for (int i = 0; i < outvertPropTable.length; i++) {
455 if (outvertPropTable[i] != null) {
456 vertices.add(new RamCloudVertex(outvertPropTable[i].key, this));
457 }
458 }
459 vertexNum = 0;
460 continue;
461 }
462 vertexNum++;
463 }
464
465 if (vertexNum != 0) {
yoshi28bac132014-01-22 11:00:17 -0800466 long startTime2 = 0;
467 if (measureRcTimeProp == 1) {
468 startTime2 = System.nanoTime();
469 }
470 pm.multiread_start("RamCloudGraph getVertices()");
Yoshi Muroi2c170602014-02-15 08:31:28 -0800471 JRamCloud.Object outvertPropTable[] = vertTable.multiRead(tableId, keyData, keySize, vertexNum);
yoshi28bac132014-01-22 11:00:17 -0800472 pm.multiread_end("RamCloudGraph getVertices()");
473 if (measureRcTimeProp == 1) {
474 long endTime2 = System.nanoTime();
475 log.error("Performance index multiread(key {}, number {}) time {}", key, vertexNum, endTime2 - startTime2);
476 }
477 for (int i = 0; i < outvertPropTable.length; i++) {
478 if (outvertPropTable[i] != null) {
479 vertices.add(new RamCloudVertex(outvertPropTable[i].key, this));
480 }
481 }
482 }
483 } else {
484
485 JRamCloud.TableEnumerator tableEnum = getRcClient().new TableEnumerator(vertPropTableId);
486 JRamCloud.Object tableEntry;
487
488 while (tableEnum.hasNext()) {
489 tableEntry = tableEnum.next();
490 if (tableEntry != null) {
491 //XXX remove temp
492 // RamCloudVertex temp = new RamCloudVertex(tableEntry.key, this);
493 Map<String, Object> propMap = RamCloudElement.convertRcBytesToPropertyMapEx(tableEntry.value);
494 if (propMap.containsKey(key) && propMap.get(key).equals(value)) {
495 vertices.add(new RamCloudVertex(tableEntry.key, this));
496 }
497 }
498 }
499 }
500
501 if (measureBPTimeProp == 1) {
502 long endTime = System.nanoTime();
503 log.error("Performance getVertices exists total time {}.", endTime - startTime);
504 }
505
506 return vertices;
507 }
508
509 @Override
510 public Edge addEdge(Object id, Vertex outVertex, Vertex inVertex, String label) throws IllegalArgumentException {
511 log.info("Adding edge: [id={}, outVertex={}, inVertex={}, label={}]", id, outVertex, inVertex, label);
512
513 if (label == null) {
514 throw ExceptionFactory.edgeLabelCanNotBeNull();
515 }
516
517 RamCloudEdge newEdge = new RamCloudEdge((RamCloudVertex) outVertex, (RamCloudVertex) inVertex, label, this);
518
519 for (int i = 0; i < 5 ;i++) {
520 try {
521 newEdge.create();
522 return newEdge;
523 } catch (Exception e) {
524 log.warn("Tried to create edge failed: {" + newEdge + "}: ", e);
525
526 if (e instanceof NoSuchElementException) {
527 log.error("addEdge RETRYING {}", i);
528 continue;
529 }
530 }
531 }
532 return null;
533 }
534
535 public List<Edge> addEdges(Iterable<Edge> edgeEntities) throws IllegalArgumentException {
536 //TODO WIP: need multi-write
537 log.info("addEdges start");
538 ArrayList<Edge> edges = new ArrayList<Edge>();
539 for (Edge edge: edgeEntities) {
540 edges.add(addEdge(null, edge.getVertex(Direction.OUT), edge.getVertex(Direction.IN), edge.getLabel()));
541 }
542 log.info("addVertices end");
543 return edges;
544 }
545
546 public void setProperties(Map<RamCloudVertex, Map<String, Object>> properties) {
547 // TODO WIP: need multi-write
548 log.info("setProperties start");
549 for (Map.Entry<RamCloudVertex, Map<String, Object>> e: properties.entrySet()) {
550 e.getKey().setProperties(e.getValue());
551 }
552 log.info("setProperties end");
553 }
554
555 @Override
556 public Edge getEdge(Object id) throws IllegalArgumentException {
557 byte[] bytearrayId;
558
559 if (id == null) {
560 throw ExceptionFactory.edgeIdCanNotBeNull();
561 } else if (id instanceof byte[]) {
562 bytearrayId = (byte[]) id;
563 } else if (id instanceof String) {
564 bytearrayId = Base64.decode(((String) id));
565 } else {
566 log.warn("ID argument {} of type {} is not supported. Returning null.", id, id.getClass());
567 return null;
568 }
569
570 if (!RamCloudEdge.isValidEdgeId(bytearrayId)) {
571 log.warn("ID argument {} of type {} is malformed. Returning null.", id, id.getClass());
572 return null;
573 }
574
575 RamCloudEdge edge = new RamCloudEdge(bytearrayId, this);
576
577 if (edge.exists()) {
578 return edge;
579 } else {
580 return null;
581 }
582 }
583
584 @Override
585 public void removeEdge(Edge edge) {
586 edge.remove();
587 }
588
589 @Override
590 public Iterable<Edge> getEdges() {
591 JRamCloud.TableEnumerator tableEnum = getRcClient().new TableEnumerator(edgePropTableId);
592 List<Edge> edges = new ArrayList<Edge>();
593
594 while (tableEnum.hasNext()) {
595 edges.add(new RamCloudEdge(tableEnum.next().key, this));
596 }
597
598 return edges;
599 }
600
601 @Override
602 public Iterable<Edge> getEdges(String key, Object value) {
603 JRamCloud.TableEnumerator tableEnum = getRcClient().new TableEnumerator(edgePropTableId);
604 List<Edge> edges = new ArrayList<Edge>();
605 JRamCloud.Object tableEntry;
606
607 while (tableEnum.hasNext()) {
608 tableEntry = tableEnum.next();
609 // FIXME temp
610 //RamCloudEdge temp = new RamCloudEdge(tableEntry.key, this);
611 Map<String, Object> propMap = RamCloudElement.convertRcBytesToPropertyMapEx(tableEntry.value);
612 if (propMap.containsKey(key) && propMap.get(key).equals(value)) {
613 edges.add(new RamCloudEdge(tableEntry.key, this));
614 }
615 }
616
617 return edges;
618 }
619
620 @Override
621 public GraphQuery query() {
622 return new DefaultGraphQuery(this);
623 }
624
625 @Override
626 public void shutdown() {
627 JRamCloud rcClient = getRcClient();
628 rcClient.dropTable(VERT_TABLE_NAME);
629 rcClient.dropTable(VERT_PROP_TABLE_NAME);
630 rcClient.dropTable(EDGE_PROP_TABLE_NAME);
631 rcClient.dropTable(IDX_VERT_TABLE_NAME);
632 rcClient.dropTable(IDX_EDGE_TABLE_NAME);
633 rcClient.dropTable(KIDX_VERT_TABLE_NAME);
634 rcClient.dropTable(KIDX_EDGE_TABLE_NAME);
635 rcClient.disconnect();
636 }
637
638 @Override
639 public void stopTransaction(Conclusion conclusion) {
640 // TODO Auto-generated method stub
641 }
642
643 @Override
644 public void commit() {
645 // TODO Auto-generated method stub
646 }
647
648 @Override
649 public void rollback() {
650 // TODO Auto-generated method stub
651 }
652
653 @Override
654 public <T extends Element> void dropKeyIndex(String key, Class<T> elementClass) {
655 throw new UnsupportedOperationException("Not supported yet.");
656 //FIXME how to dropKeyIndex
657 //new RamCloudKeyIndex(kidxVertTableId, key, this, elementClass);
658 //getIndexedKeys(key, elementClass).removeIndex();
659 }
660
661 @Override
662 public <T extends Element> void createKeyIndex(String key,
663 Class<T> elementClass, Parameter... indexParameters) {
664 if (key == null) {
665 return;
666 }
667 if (this.indexedKeys.contains(key)) {
668 return;
669 }
670 this.indexedKeys.add(key);
671 }
672
673 @Override
674 public <T extends Element> Set< String> getIndexedKeys(Class< T> elementClass) {
675 if (null != this.indexedKeys) {
676 return new HashSet<String>(this.indexedKeys);
677 } else {
678 return Collections.emptySet();
679 }
680 }
681
682 @Override
683 public <T extends Element> Index<T> createIndex(String indexName,
684 Class<T> indexClass, Parameter... indexParameters) {
685 throw new UnsupportedOperationException("Not supported yet.");
686 }
687
688 @Override
689 public <T extends Element> Index<T> getIndex(String indexName, Class<T> indexClass) {
690 throw new UnsupportedOperationException("Not supported yet.");
691 }
692
693 @Override
694 public Iterable<Index<? extends Element>> getIndices() {
695 throw new UnsupportedOperationException("Not supported yet.");
696 }
697
698 @Override
699 public void dropIndex(String indexName) {
700 throw new UnsupportedOperationException("Not supported yet.");
701 }
702
703 @Override
704 public String toString() {
705 return getClass().getSimpleName().toLowerCase() + "[vertices:" + ((List<Vertex>)getVertices()).size() + " edges:" + ((List<Edge>)getEdges()).size() + "]";
706 }
707
708 public static void main(String[] args) {
709 RamCloudGraph graph = new RamCloudGraph();
710
711 Vertex a = graph.addVertex(null);
712 Vertex b = graph.addVertex(null);
713 Vertex c = graph.addVertex(null);
714 Vertex d = graph.addVertex(null);
715 Vertex e = graph.addVertex(null);
716 Vertex f = graph.addVertex(null);
717 Vertex g = graph.addVertex(null);
718
719 graph.addEdge(null, a, a, "friend");
720 graph.addEdge(null, a, b, "friend1");
721 graph.addEdge(null, a, b, "friend2");
722 graph.addEdge(null, a, b, "friend3");
723 graph.addEdge(null, a, c, "friend");
724 graph.addEdge(null, a, d, "friend");
725 graph.addEdge(null, a, e, "friend");
726 graph.addEdge(null, a, f, "friend");
727 graph.addEdge(null, a, g, "friend");
728
729 graph.shutdown();
730 }
731}