blob: 26f1418672d685cee154d07e0ec3e650eab3dbb1 [file] [log] [blame]
Yuta HIGUCHIa1e655a2014-01-23 17:43:11 -08001/* Copyright (c) 2013 Stanford University
2 *
3 * Permission to use, copy, modify, and distribute this software for any
4 * purpose with or without fee is hereby granted, provided that the above
5 * copyright notice and this permission notice appear in all copies.
6 *
7 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
8 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
9 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
10 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
11 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
12 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
13 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
14 */
15
yoshi28bac132014-01-22 11:00:17 -080016package com.tinkerpop.blueprints.impls.ramcloud;
17
18import java.io.ByteArrayInputStream;
19import java.io.ByteArrayOutputStream;
20import java.io.IOException;
21import java.io.ObjectInputStream;
22import java.io.ObjectOutputStream;
23import java.io.Serializable;
24import java.nio.BufferUnderflowException;
25import java.nio.ByteBuffer;
26import java.nio.ByteOrder;
27import java.util.ArrayList;
28import java.util.Arrays;
29import java.util.Collections;
30import java.util.HashMap;
31import java.util.HashSet;
32import java.util.LinkedList;
33import java.util.List;
34import java.util.Map;
35import java.util.NoSuchElementException;
36import java.util.Set;
37import java.util.concurrent.atomic.AtomicLong;
38
39import org.slf4j.Logger;
40import org.slf4j.LoggerFactory;
41
42import com.sun.jersey.core.util.Base64;
43import com.tinkerpop.blueprints.Direction;
44import com.tinkerpop.blueprints.Edge;
45import com.tinkerpop.blueprints.Element;
46import com.tinkerpop.blueprints.Features;
47import com.tinkerpop.blueprints.GraphQuery;
48import com.tinkerpop.blueprints.Index;
49import com.tinkerpop.blueprints.IndexableGraph;
50import com.tinkerpop.blueprints.KeyIndexableGraph;
51import com.tinkerpop.blueprints.Parameter;
52import com.tinkerpop.blueprints.TransactionalGraph;
53import com.tinkerpop.blueprints.Vertex;
54import com.tinkerpop.blueprints.util.DefaultGraphQuery;
55import com.tinkerpop.blueprints.util.ExceptionFactory;
56import com.tinkerpop.blueprints.impls.ramcloud.PerfMon;
57
58import edu.stanford.ramcloud.JRamCloud;
59import edu.stanford.ramcloud.JRamCloud.MultiWriteObject;
60
61public class RamCloudGraph implements IndexableGraph, KeyIndexableGraph, TransactionalGraph, Serializable {
62 private final static Logger log = LoggerFactory.getLogger(RamCloudGraph.class);
63
64 private static final ThreadLocal<JRamCloud> RamCloudThreadLocal = new ThreadLocal<JRamCloud>();
65
66 protected long vertTableId; //(vertex_id) --> ( (n,d,ll,l), (n,d,ll,l), ... )
67 protected long vertPropTableId; //(vertex_id) -> ( (kl,k,vl,v), (kl,k,vl,v), ... )
68 protected long edgePropTableId; //(edge_id) -> ( (kl,k,vl,v), (kl,k,vl,v), ... )
69 protected long idxVertTableId;
70 protected long idxEdgeTableId;
71 protected long kidxVertTableId;
72 protected long kidxEdgeTableId;
73 protected long instanceTableId;
74 private String VERT_TABLE_NAME = "verts";
75 private String EDGE_PROP_TABLE_NAME = "edge_props";
76 private String VERT_PROP_TABLE_NAME = "vert_props";
77 private String IDX_VERT_TABLE_NAME = "idx_vert";
78 private String IDX_EDGE_TABLE_NAME = "idx_edge";
79 private String KIDX_VERT_TABLE_NAME = "kidx_vert";
80 private String KIDX_EDGE_TABLE_NAME = "kidx_edge";
81 private final String INSTANCE_TABLE_NAME = "instance";
82 private long instanceId;
83 private AtomicLong nextVertexId;
84 private final long INSTANCE_ID_RANGE = 100000;
85 private String coordinatorLocation;
86 private static final Features FEATURES = new Features();
Yoshi Muroi815c7f92014-01-30 18:06:16 -080087 // FIXME better loop variable name
88 public final int CONDITIONALWRITE_RETRY_MAX = 100;
yoshi28bac132014-01-22 11:00:17 -080089 public final long measureBPTimeProp = Long.valueOf(System.getProperty("benchmark.measureBP", "0"));
90 public final long measureRcTimeProp = Long.valueOf(System.getProperty("benchmark.measureRc", "0"));
91 public static final long measureSerializeTimeProp = Long.valueOf(System.getProperty("benchmark.measureSerializeTimeProp", "0"));
92
93
94 public final Set<String> indexedKeys = new HashSet<String>();
95
// Populates the shared FEATURES descriptor once, when the class is initialized.
static {
    // Property value types.
    FEATURES.supportsStringProperty = true;
    FEATURES.supportsBooleanProperty = true;
    FEATURES.supportsIntegerProperty = true;
    FEATURES.supportsLongProperty = true;
    FEATURES.supportsFloatProperty = true;
    FEATURES.supportsDoubleProperty = true;
    FEATURES.supportsPrimitiveArrayProperty = true;
    FEATURES.supportsUniformListProperty = true;
    FEATURES.supportsMixedListProperty = true;
    FEATURES.supportsMapProperty = true;
    FEATURES.supportsSerializableObjectProperty = true;

    // Graph structure and identity.
    FEATURES.supportsDuplicateEdges = false;
    FEATURES.supportsSelfLoops = false;
    FEATURES.ignoresSuppliedIds = true;
    FEATURES.isPersistent = false;
    FEATURES.isWrapper = false;

    // Element access.
    FEATURES.supportsVertexIteration = true;
    FEATURES.supportsEdgeIteration = true;
    FEATURES.supportsEdgeRetrieval = true;
    FEATURES.supportsVertexProperties = true;
    FEATURES.supportsEdgeProperties = true;

    // Indices.
    FEATURES.supportsIndices = false;
    FEATURES.supportsVertexIndex = true;
    FEATURES.supportsEdgeIndex = false;
    FEATURES.supportsKeyIndices = true;
    FEATURES.supportsVertexKeyIndex = true;
    FEATURES.supportsEdgeKeyIndex = false;

    // Transactions.
    FEATURES.supportsTransactions = false;
    FEATURES.supportsThreadedTransactions = false;
}
128
129 static {
130 System.loadLibrary("edu_stanford_ramcloud_JRamCloud");
131 }
132
/**
 * No-argument constructor that performs no initialization: no tables are created, and
 * {@code coordinatorLocation} and {@code nextVertexId} keep their default {@code null}
 * values.
 */
private RamCloudGraph() {
    // Intentionally empty.
}
135
136
137 public RamCloudGraph(String coordinatorLocation) {
138 this.coordinatorLocation = coordinatorLocation;
139
140 JRamCloud rcClient = getRcClient();
141
142 vertTableId = rcClient.createTable(VERT_TABLE_NAME);
143 vertPropTableId = rcClient.createTable(VERT_PROP_TABLE_NAME);
144 edgePropTableId = rcClient.createTable(EDGE_PROP_TABLE_NAME);
145 idxVertTableId = rcClient.createTable(IDX_VERT_TABLE_NAME);
146 idxEdgeTableId = rcClient.createTable(IDX_EDGE_TABLE_NAME);
147 kidxVertTableId = rcClient.createTable(KIDX_VERT_TABLE_NAME);
148 kidxEdgeTableId = rcClient.createTable(KIDX_EDGE_TABLE_NAME);
149 instanceTableId = rcClient.createTable(INSTANCE_TABLE_NAME);
150
151 log.info( "Connected to coordinator at {}", coordinatorLocation);
152 log.debug("VERT_TABLE:{}, VERT_PROP_TABLE:{}, EDGE_PROP_TABLE:{}, IDX_VERT_TABLE:{}, IDX_EDGE_TABLE:{}, KIDX_VERT_TABLE:{}, KIDX_EDGE_TABLE:{}", vertTableId, vertPropTableId, edgePropTableId, idxVertTableId, idxEdgeTableId, kidxVertTableId, kidxEdgeTableId);
153 nextVertexId = new AtomicLong(-1);
154 initInstance();
155 }
156
157 public JRamCloud getRcClient() {
158 JRamCloud rcClient = RamCloudThreadLocal.get();
159 if (rcClient == null) {
160 rcClient = new JRamCloud(coordinatorLocation);
161 RamCloudThreadLocal.set(rcClient);
162 }
163 return rcClient;
164 }
165
166 @Override
167 public Features getFeatures() {
168 return FEATURES;
169 }
170
171 private Long parseVertexId(Object id) {
172 Long longId;
173 if (id == null) {
174 longId = nextVertexId.incrementAndGet();
175 } else if (id instanceof Integer) {
176 longId = ((Integer) id).longValue();
177 } else if (id instanceof Long) {
178 longId = (Long) id;
179 } else if (id instanceof String) {
180 try {
181 longId = Long.parseLong((String) id, 10);
182 } catch (NumberFormatException e) {
183 log.warn("ID argument {} of type {} is not a parseable long number: {}", id, id.getClass(), e);
184 return null;
185 }
186 } else if (id instanceof byte[]) {
187 try {
188 longId = ByteBuffer.wrap((byte[]) id).getLong();
189 } catch (BufferUnderflowException e) {
190 log.warn("ID argument {} of type {} is not a parseable long number: {}", id, id.getClass(), e);
191 return null;
192 }
193 } else {
194 log.warn("ID argument {} of type {} is not supported. Returning null.", id, id.getClass());
195 return null;
196 }
197 return longId;
198 }
199
200 @Override
201 public Vertex addVertex(Object id) {
202 long startTime = 0;
203 long Tstamp1 = 0;
204 long Tstamp2 = 0;
205
206 if (measureBPTimeProp == 1) {
207 startTime = System.nanoTime();
208 }
209 Long longId = parseVertexId(id);
210 if (longId == null)
211 return null;
212 if (measureBPTimeProp == 1) {
213 Tstamp1 = System.nanoTime();
214 }
215 RamCloudVertex newVertex = new RamCloudVertex(longId, this);
216 if (measureBPTimeProp == 1) {
217 Tstamp2 = System.nanoTime();
218 log.error("Performance addVertex [id={}] : Calling create at {}", longId, Tstamp2);
219 }
220
221 try {
222 newVertex.create();
223 if (measureBPTimeProp == 1) {
224 long endTime = System.nanoTime();
225 log.error("Performance addVertex [id={}] : genid {} newVerex {} create {} total time {}", longId, Tstamp1 - startTime, Tstamp2 - Tstamp1, endTime - Tstamp2, endTime - startTime);
226 }
227 log.info("Added vertex: [id={}]", longId);
228 return newVertex;
229 } catch (IllegalArgumentException e) {
230 log.error("Tried to create vertex failed {" + newVertex + "}", e);
231 return null;
232 }
233 }
234
235 public List<RamCloudVertex> addVertices(Iterable<Object> ids) {
236 log.info("addVertices start");
237 List<RamCloudVertex> vertices = new LinkedList<RamCloudVertex>();
238
239 for (Object id: ids) {
240 Long longId = parseVertexId(id);
241 if (longId == null)
242 return null;
243 RamCloudVertex v = new RamCloudVertex(longId, this);
244 if (v.exists()) {
245 log.error("ramcloud vertex id: {} already exists", v.getId());
246 throw ExceptionFactory.vertexWithIdAlreadyExists(v.getId());
247 }
248 vertices.add(v);
249 }
250 MultiWriteObject multiWriteObjects[] = new MultiWriteObject[vertices.size() * 2];
251 for (int i=0; i < vertices.size(); i++) {
252 RamCloudVertex v = vertices.get(i);
253 multiWriteObjects[i*2] = new MultiWriteObject(vertTableId, v.rcKey, ByteBuffer.allocate(0).array(), null);
254 multiWriteObjects[i*2+1] = new MultiWriteObject(vertPropTableId, v.rcKey, ByteBuffer.allocate(0).array(), null);
255 }
256 try {
257 PerfMon pm = PerfMon.getInstance();
258 pm.multiwrite_start("RamCloudVertex create()");
259 getRcClient().multiWrite(multiWriteObjects);
260 pm.multiwrite_end("RamCloudVertex create()");
261 log.info("ramcloud vertices are created");
262 } catch (Exception e) {
263 log.error("Tried to create vertices failed {}", e);
264 return null;
265 }
266 log.info("addVertices end (success)");
267 return vertices;
268 }
269
270 private final void initInstance() {
271 //long incrementValue = 1;
272 JRamCloud.Object instanceEntry = null;
273 JRamCloud rcClient = getRcClient();
274 try {
275 instanceEntry = rcClient.read(instanceTableId, "nextInstanceId".getBytes());
276 } catch (Exception e) {
277 if (e instanceof JRamCloud.ObjectDoesntExistException) {
278 instanceId = 0;
279 rcClient.write(instanceTableId, "nextInstanceId".getBytes(), ByteBuffer.allocate(0).array());
280 }
281 }
282 if (instanceEntry != null) {
283 long curInstanceId = 1;
Yoshi Muroi815c7f92014-01-30 18:06:16 -0800284 for (int i = 0 ; i < CONDITIONALWRITE_RETRY_MAX ; i++) {
yoshi28bac132014-01-22 11:00:17 -0800285 Map<String, Long> propMap = null;
286 if (instanceEntry.value == null) {
287 log.warn("Got a null byteArray argument");
288 return;
289 } else if (instanceEntry.value.length != 0) {
290 try {
291 ByteArrayInputStream bais = new ByteArrayInputStream(instanceEntry.value);
292 ObjectInputStream ois = new ObjectInputStream(bais);
293 propMap = (Map<String, Long>) ois.readObject();
294 } catch (IOException e) {
295 log.error("Got an exception while deserializing element's property map: ", e);
296 return;
297 } catch (ClassNotFoundException e) {
298 log.error("Got an exception while deserializing element's property map: ", e);
299 return;
300 }
301 } else {
302 propMap = new HashMap<String, Long>();
303 }
304
305 if (propMap.containsKey(INSTANCE_TABLE_NAME)) {
306 curInstanceId = propMap.get(INSTANCE_TABLE_NAME) + 1;
307 }
308
309 propMap.put(INSTANCE_TABLE_NAME, curInstanceId);
310
311 byte[] rcValue = null;
312 try {
313 ByteArrayOutputStream baos = new ByteArrayOutputStream();
314 ObjectOutputStream oot = new ObjectOutputStream(baos);
315 oot.writeObject(propMap);
316 rcValue = baos.toByteArray();
317 } catch (IOException e) {
318 log.error("Got an exception while serializing element's property map", e);
319 return;
320 }
321 JRamCloud.RejectRules rules = rcClient.new RejectRules();
322 rules.setNeVersion(instanceEntry.version);
323 try {
324 rcClient.writeRule(instanceTableId, "nextInstanceId".getBytes(), rcValue, rules);
325 instanceId = curInstanceId;
326 break;
327 } catch (Exception ex) {
328 log.debug("Cond. Write increment Vertex property: ", ex);
329 instanceEntry = rcClient.read(instanceTableId, "nextInstanceId".getBytes());
330 continue;
331 }
332 }
333 }
334
335 nextVertexId.compareAndSet(-1, instanceId * INSTANCE_ID_RANGE);
336 }
337
338 @Override
339 public Vertex getVertex(Object id) throws IllegalArgumentException {
340 Long longId;
341
342 if (id == null) {
343 throw ExceptionFactory.vertexIdCanNotBeNull();
344 } else if (id instanceof Integer) {
345 longId = ((Integer) id).longValue();
346 } else if (id instanceof Long) {
347 longId = (Long) id;
348 } else if (id instanceof String) {
349 try {
350 longId = Long.parseLong((String) id, 10);
351 } catch (NumberFormatException e) {
352 log.warn("ID argument {} of type {} is not a parseable long number: {}", id, id.getClass(), e);
353 return null;
354 }
355 } else if (id instanceof byte[]) {
356 try {
357 longId = ByteBuffer.wrap((byte[]) id).getLong();
358 } catch (BufferUnderflowException e) {
359 log.warn("ID argument {} of type {} is not a parseable long number: {}", id, id.getClass(), e);
360 return null;
361 }
362 } else {
363 log.warn("ID argument {} of type {} is not supported. Returning null.", id, id.getClass());
364 return null;
365 }
366
367 RamCloudVertex vertex = new RamCloudVertex(longId, this);
368
369 if (vertex.exists()) {
370 return vertex;
371 } else {
372 return null;
373 }
374 }
375
376 @Override
377 public void removeVertex(Vertex vertex) {
378 ((RamCloudVertex) vertex).remove();
379 }
380
381 @Override
382 public Iterable<Vertex> getVertices() {
383 JRamCloud.TableEnumerator tableEnum = getRcClient().new TableEnumerator(vertPropTableId);
384 List<Vertex> vertices = new LinkedList<Vertex>();
385
386 while (tableEnum.hasNext()) {
387 vertices.add(new RamCloudVertex(tableEnum.next().key, this));
388 }
389
390 return vertices;
391 }
392
393 @Override
394 public Iterable<Vertex> getVertices(String key, Object value) {
395 long startTime = 0;
396 long Tstamp1 = 0;
397 long Tstamp2 = 0;
398 long Tstamp3 = 0;
399 if (measureBPTimeProp == 1) {
400 startTime = System.nanoTime();
401 log.error("Performance getVertices(key {}) start at {}", key, startTime);
402 }
403
404 List<Vertex> vertices = new ArrayList<Vertex>();
405 List<Object> vertexList = null;
406
407 JRamCloud vertTable = getRcClient();
408 if (measureBPTimeProp == 1) {
409 Tstamp1 = System.nanoTime();
410 log.error("Performance getVertices(key {}) Calling indexedKeys.contains(key) at {}", key, Tstamp1);
411 }
412
413
414 if (indexedKeys.contains(key)) {
415 PerfMon pm = PerfMon.getInstance();
416 if (measureBPTimeProp == 1) {
417 Tstamp2 = System.nanoTime();
418 log.error("Performance getVertices(key {}) Calling new RamCloudKeyIndex at {}", key, Tstamp2);
419 }
420 RamCloudKeyIndex KeyIndex = new RamCloudKeyIndex(kidxVertTableId, key, value, this, Vertex.class);
421 if (measureBPTimeProp == 1) {
422 Tstamp3 = System.nanoTime();
423 log.error("Performance getVertices(key {}) Calling KeyIndex.GetElmIdListForPropValue at {}", key, Tstamp3);
424 }
425 vertexList = KeyIndex.getElmIdListForPropValue(value.toString());
426 if (vertexList == null) {
427 if (measureBPTimeProp == 1) {
428 long endTime = System.nanoTime();
429 log.error("Performance getVertices(key {}) does not exists : getRcClient {} indexedKeys.contains(key) {} new_RamCloudKeyIndex {} KeyIndex.get..Value {} total {} diff {}", key, Tstamp1-startTime, Tstamp2-Tstamp1,Tstamp3-Tstamp2, endTime-Tstamp3, endTime - startTime, (endTime-startTime)- (Tstamp1-startTime)- (Tstamp2-Tstamp1)- (Tstamp3-Tstamp2)-(endTime-Tstamp3));
430 }
431 return vertices;
432 }
433
434 final int mreadMax = 400;
435 final int size = Math.min(mreadMax, vertexList.size());
Yoshi Muroi2c170602014-02-15 08:31:28 -0800436
437 long tableId[] = new long[size];
438 byte[] keyData[] = new byte[size][];
439 short keySize[] = new short[size];
yoshi28bac132014-01-22 11:00:17 -0800440
441 int vertexNum = 0;
442 for (Object vert : vertexList) {
Yoshi Muroi2c170602014-02-15 08:31:28 -0800443 tableId[vertexNum] = vertPropTableId;
444 keyData[vertexNum] =
yoshi28bac132014-01-22 11:00:17 -0800445 ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong((Long) vert).array();
Yoshi Muroi2c170602014-02-15 08:31:28 -0800446 keySize[vertexNum] = (short) keyData[vertexNum].length;
yoshi28bac132014-01-22 11:00:17 -0800447 if (vertexNum >= (mreadMax - 1)) {
448 pm.multiread_start("RamCloudGraph getVertices()");
449 JRamCloud.Object outvertPropTable[] =
Yoshi Muroi2c170602014-02-15 08:31:28 -0800450 vertTable.multiRead(tableId, keyData, keySize, vertexNum);
yoshi28bac132014-01-22 11:00:17 -0800451 pm.multiread_end("RamCloudGraph getVertices()");
452 for (int i = 0; i < outvertPropTable.length; i++) {
453 if (outvertPropTable[i] != null) {
454 vertices.add(new RamCloudVertex(outvertPropTable[i].key, this));
455 }
456 }
457 vertexNum = 0;
458 continue;
459 }
460 vertexNum++;
461 }
462
463 if (vertexNum != 0) {
yoshi28bac132014-01-22 11:00:17 -0800464 long startTime2 = 0;
465 if (measureRcTimeProp == 1) {
466 startTime2 = System.nanoTime();
467 }
468 pm.multiread_start("RamCloudGraph getVertices()");
Yoshi Muroi2c170602014-02-15 08:31:28 -0800469 JRamCloud.Object outvertPropTable[] = vertTable.multiRead(tableId, keyData, keySize, vertexNum);
yoshi28bac132014-01-22 11:00:17 -0800470 pm.multiread_end("RamCloudGraph getVertices()");
471 if (measureRcTimeProp == 1) {
472 long endTime2 = System.nanoTime();
473 log.error("Performance index multiread(key {}, number {}) time {}", key, vertexNum, endTime2 - startTime2);
474 }
475 for (int i = 0; i < outvertPropTable.length; i++) {
476 if (outvertPropTable[i] != null) {
477 vertices.add(new RamCloudVertex(outvertPropTable[i].key, this));
478 }
479 }
480 }
481 } else {
482
483 JRamCloud.TableEnumerator tableEnum = getRcClient().new TableEnumerator(vertPropTableId);
484 JRamCloud.Object tableEntry;
485
486 while (tableEnum.hasNext()) {
487 tableEntry = tableEnum.next();
488 if (tableEntry != null) {
489 //XXX remove temp
490 // RamCloudVertex temp = new RamCloudVertex(tableEntry.key, this);
491 Map<String, Object> propMap = RamCloudElement.convertRcBytesToPropertyMapEx(tableEntry.value);
492 if (propMap.containsKey(key) && propMap.get(key).equals(value)) {
493 vertices.add(new RamCloudVertex(tableEntry.key, this));
494 }
495 }
496 }
497 }
498
499 if (measureBPTimeProp == 1) {
500 long endTime = System.nanoTime();
501 log.error("Performance getVertices exists total time {}.", endTime - startTime);
502 }
503
504 return vertices;
505 }
506
507 @Override
508 public Edge addEdge(Object id, Vertex outVertex, Vertex inVertex, String label) throws IllegalArgumentException {
509 log.info("Adding edge: [id={}, outVertex={}, inVertex={}, label={}]", id, outVertex, inVertex, label);
510
511 if (label == null) {
512 throw ExceptionFactory.edgeLabelCanNotBeNull();
513 }
514
515 RamCloudEdge newEdge = new RamCloudEdge((RamCloudVertex) outVertex, (RamCloudVertex) inVertex, label, this);
516
517 for (int i = 0; i < 5 ;i++) {
518 try {
519 newEdge.create();
520 return newEdge;
521 } catch (Exception e) {
522 log.warn("Tried to create edge failed: {" + newEdge + "}: ", e);
523
524 if (e instanceof NoSuchElementException) {
525 log.error("addEdge RETRYING {}", i);
526 continue;
527 }
528 }
529 }
530 return null;
531 }
532
533 public List<Edge> addEdges(Iterable<Edge> edgeEntities) throws IllegalArgumentException {
534 //TODO WIP: need multi-write
535 log.info("addEdges start");
536 ArrayList<Edge> edges = new ArrayList<Edge>();
537 for (Edge edge: edgeEntities) {
538 edges.add(addEdge(null, edge.getVertex(Direction.OUT), edge.getVertex(Direction.IN), edge.getLabel()));
539 }
540 log.info("addVertices end");
541 return edges;
542 }
543
544 public void setProperties(Map<RamCloudVertex, Map<String, Object>> properties) {
545 // TODO WIP: need multi-write
546 log.info("setProperties start");
547 for (Map.Entry<RamCloudVertex, Map<String, Object>> e: properties.entrySet()) {
548 e.getKey().setProperties(e.getValue());
549 }
550 log.info("setProperties end");
551 }
552
553 @Override
554 public Edge getEdge(Object id) throws IllegalArgumentException {
555 byte[] bytearrayId;
556
557 if (id == null) {
558 throw ExceptionFactory.edgeIdCanNotBeNull();
559 } else if (id instanceof byte[]) {
560 bytearrayId = (byte[]) id;
561 } else if (id instanceof String) {
562 bytearrayId = Base64.decode(((String) id));
563 } else {
564 log.warn("ID argument {} of type {} is not supported. Returning null.", id, id.getClass());
565 return null;
566 }
567
568 if (!RamCloudEdge.isValidEdgeId(bytearrayId)) {
569 log.warn("ID argument {} of type {} is malformed. Returning null.", id, id.getClass());
570 return null;
571 }
572
573 RamCloudEdge edge = new RamCloudEdge(bytearrayId, this);
574
575 if (edge.exists()) {
576 return edge;
577 } else {
578 return null;
579 }
580 }
581
582 @Override
583 public void removeEdge(Edge edge) {
584 edge.remove();
585 }
586
587 @Override
588 public Iterable<Edge> getEdges() {
589 JRamCloud.TableEnumerator tableEnum = getRcClient().new TableEnumerator(edgePropTableId);
590 List<Edge> edges = new ArrayList<Edge>();
591
592 while (tableEnum.hasNext()) {
593 edges.add(new RamCloudEdge(tableEnum.next().key, this));
594 }
595
596 return edges;
597 }
598
599 @Override
600 public Iterable<Edge> getEdges(String key, Object value) {
601 JRamCloud.TableEnumerator tableEnum = getRcClient().new TableEnumerator(edgePropTableId);
602 List<Edge> edges = new ArrayList<Edge>();
603 JRamCloud.Object tableEntry;
604
605 while (tableEnum.hasNext()) {
606 tableEntry = tableEnum.next();
607 // FIXME temp
608 //RamCloudEdge temp = new RamCloudEdge(tableEntry.key, this);
609 Map<String, Object> propMap = RamCloudElement.convertRcBytesToPropertyMapEx(tableEntry.value);
610 if (propMap.containsKey(key) && propMap.get(key).equals(value)) {
611 edges.add(new RamCloudEdge(tableEntry.key, this));
612 }
613 }
614
615 return edges;
616 }
617
618 @Override
619 public GraphQuery query() {
620 return new DefaultGraphQuery(this);
621 }
622
623 @Override
624 public void shutdown() {
625 JRamCloud rcClient = getRcClient();
626 rcClient.dropTable(VERT_TABLE_NAME);
627 rcClient.dropTable(VERT_PROP_TABLE_NAME);
628 rcClient.dropTable(EDGE_PROP_TABLE_NAME);
629 rcClient.dropTable(IDX_VERT_TABLE_NAME);
630 rcClient.dropTable(IDX_EDGE_TABLE_NAME);
631 rcClient.dropTable(KIDX_VERT_TABLE_NAME);
632 rcClient.dropTable(KIDX_EDGE_TABLE_NAME);
633 rcClient.disconnect();
634 }
635
636 @Override
637 public void stopTransaction(Conclusion conclusion) {
638 // TODO Auto-generated method stub
639 }
640
641 @Override
642 public void commit() {
643 // TODO Auto-generated method stub
644 }
645
646 @Override
647 public void rollback() {
648 // TODO Auto-generated method stub
649 }
650
651 @Override
652 public <T extends Element> void dropKeyIndex(String key, Class<T> elementClass) {
653 throw new UnsupportedOperationException("Not supported yet.");
654 //FIXME how to dropKeyIndex
655 //new RamCloudKeyIndex(kidxVertTableId, key, this, elementClass);
656 //getIndexedKeys(key, elementClass).removeIndex();
657 }
658
659 @Override
660 public <T extends Element> void createKeyIndex(String key,
661 Class<T> elementClass, Parameter... indexParameters) {
662 if (key == null) {
663 return;
664 }
665 if (this.indexedKeys.contains(key)) {
666 return;
667 }
668 this.indexedKeys.add(key);
669 }
670
671 @Override
672 public <T extends Element> Set< String> getIndexedKeys(Class< T> elementClass) {
673 if (null != this.indexedKeys) {
674 return new HashSet<String>(this.indexedKeys);
675 } else {
676 return Collections.emptySet();
677 }
678 }
679
680 @Override
681 public <T extends Element> Index<T> createIndex(String indexName,
682 Class<T> indexClass, Parameter... indexParameters) {
683 throw new UnsupportedOperationException("Not supported yet.");
684 }
685
686 @Override
687 public <T extends Element> Index<T> getIndex(String indexName, Class<T> indexClass) {
688 throw new UnsupportedOperationException("Not supported yet.");
689 }
690
691 @Override
692 public Iterable<Index<? extends Element>> getIndices() {
693 throw new UnsupportedOperationException("Not supported yet.");
694 }
695
696 @Override
697 public void dropIndex(String indexName) {
698 throw new UnsupportedOperationException("Not supported yet.");
699 }
700
701 @Override
702 public String toString() {
703 return getClass().getSimpleName().toLowerCase() + "[vertices:" + ((List<Vertex>)getVertices()).size() + " edges:" + ((List<Edge>)getEdges()).size() + "]";
704 }
705
706 public static void main(String[] args) {
707 RamCloudGraph graph = new RamCloudGraph();
708
709 Vertex a = graph.addVertex(null);
710 Vertex b = graph.addVertex(null);
711 Vertex c = graph.addVertex(null);
712 Vertex d = graph.addVertex(null);
713 Vertex e = graph.addVertex(null);
714 Vertex f = graph.addVertex(null);
715 Vertex g = graph.addVertex(null);
716
717 graph.addEdge(null, a, a, "friend");
718 graph.addEdge(null, a, b, "friend1");
719 graph.addEdge(null, a, b, "friend2");
720 graph.addEdge(null, a, b, "friend3");
721 graph.addEdge(null, a, c, "friend");
722 graph.addEdge(null, a, d, "friend");
723 graph.addEdge(null, a, e, "friend");
724 graph.addEdge(null, a, f, "friend");
725 graph.addEdge(null, a, g, "friend");
726
727 graph.shutdown();
728 }
729}