blob: b41dca8123e191e83d0063309cea761ec8330ea8 [file] [log] [blame]
/* Copyright (c) 2013 Stanford University
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */
15
yoshi28bac132014-01-22 11:00:17 -080016package com.tinkerpop.blueprints.impls.ramcloud;
17
18import java.io.Serializable;
19import java.nio.ByteBuffer;
20import java.nio.ByteOrder;
21import java.util.ArrayList;
22import java.util.Arrays;
23import java.util.HashMap;
24import java.util.HashSet;
25import java.util.LinkedList;
26import java.util.List;
27import java.util.Map;
28import java.util.Map.Entry;
29import java.util.Set;
30
31import org.slf4j.Logger;
32import org.slf4j.LoggerFactory;
33
34import com.google.protobuf.InvalidProtocolBufferException;
35import com.tinkerpop.blueprints.Direction;
36import com.tinkerpop.blueprints.Edge;
37import com.tinkerpop.blueprints.Vertex;
38import com.tinkerpop.blueprints.VertexQuery;
39import com.tinkerpop.blueprints.impls.ramcloud.RamCloudGraphProtos.EdgeListProtoBuf;
40import com.tinkerpop.blueprints.impls.ramcloud.RamCloudGraphProtos.EdgeProtoBuf;
41import com.tinkerpop.blueprints.util.DefaultVertexQuery;
42import com.tinkerpop.blueprints.util.ExceptionFactory;
43import com.tinkerpop.blueprints.impls.ramcloud.PerfMon;
44
45import edu.stanford.ramcloud.JRamCloud;
46import edu.stanford.ramcloud.JRamCloud.MultiWriteObject;
47import edu.stanford.ramcloud.JRamCloud.RejectRules;
48import edu.stanford.ramcloud.JRamCloud.WrongVersionException;
49
50public class RamCloudVertex extends RamCloudElement implements Vertex, Serializable {
51
52 private final static Logger log = LoggerFactory.getLogger(RamCloudGraph.class);
53 private static final long serialVersionUID = 7526472295622776147L;
54 protected long id;
55 protected byte[] rcKey;
56 private RamCloudGraph graph;
57
58 private Versioned<EdgeListProtoBuf> cachedAdjEdgeList;
59
60 public RamCloudVertex(long id, RamCloudGraph graph) {
61 super(idToRcKey(id), graph.vertPropTableId, graph);
62
63 this.id = id;
64 this.rcKey = idToRcKey(id);
65 this.graph = graph;
66 this.cachedAdjEdgeList = null;
67 }
68
69 public RamCloudVertex(byte[] rcKey, RamCloudGraph graph) {
70 super(rcKey, graph.vertPropTableId, graph);
71
72 this.id = rcKeyToId(rcKey);
73 this.rcKey = rcKey;
74 this.graph = graph;
75 this.cachedAdjEdgeList = null;
76 }
77
78
79 /*
80 * Vertex interface implementation
81 */
82 @Override
83 public Edge addEdge(String label, Vertex inVertex) {
84 return graph.addEdge(null, this, inVertex, label);
85 }
86
87 @Override
88 public Iterable<Edge> getEdges(Direction direction, String... labels) {
89 return new ArrayList<Edge>(getEdgeList(direction, labels));
90 }
91
92 @Override
93 public Iterable<Vertex> getVertices(Direction direction, String... labels) {
94 List<RamCloudEdge> edges = getEdgeList(direction, labels);
95 List<Vertex> neighbors = new LinkedList<Vertex>();
96 for (RamCloudEdge edge : edges) {
97 neighbors.add(edge.getNeighbor(this));
98 }
99 return neighbors;
100 }
101
102 @Override
103 public VertexQuery query() {
104 return new DefaultVertexQuery(this);
105 }
106
107 /*
108 * RamCloudElement overridden methods
109 */
110 @Override
111 public Object getId() {
112 return id;
113 }
114
115 @Override
116 public void remove() {
117 Set<RamCloudEdge> edges = getEdgeSet();
118
119 // neighbor vertex -> List of Edges to remove
120 Map<RamCloudVertex, List<RamCloudEdge>> vertexToEdgesMap = new HashMap<RamCloudVertex, List<RamCloudEdge>>( edges.size() );
121
122 // Batch edges together by neighbor vertex
123 for (RamCloudEdge edge : edges) {
124 RamCloudVertex neighbor = (RamCloudVertex) edge.getNeighbor(this);
125 List<RamCloudEdge> edgeList = vertexToEdgesMap.get(neighbor);
126
127 if (edgeList == null) {
128 edgeList = new LinkedList<RamCloudEdge>();
129 }
130
131 edgeList.add(edge);
132 vertexToEdgesMap.put(neighbor, edgeList);
133 }
134
135 // Remove batches of edges at a time by neighbor vertex
136 for (Entry<RamCloudVertex, List<RamCloudEdge>> entry : vertexToEdgesMap.entrySet()) {
137 // Skip over loopback edges to ourself
138 if (!entry.getKey().equals(this)) {
139 entry.getKey().removeEdgesFromAdjList(entry.getValue());
140 }
141
142 // Remove this batch of edges from the edge property table
143 for (RamCloudEdge edge : entry.getValue()) {
144 edge.removeProperties();
145 }
146 }
147
148 Map<String,Object> props = this.getPropertyMap();
149 for( Map.Entry<String,Object> entry : props.entrySet() ) {
150 if ( !graph.indexedKeys.contains(entry.getKey() ) ) continue;
151 RamCloudKeyIndex keyIndex = new RamCloudKeyIndex(graph.kidxVertTableId, entry.getKey(), entry.getValue(), graph, Vertex.class);
152 keyIndex.remove(entry.getKey(), entry.getValue(), this);
153 }
154
155 // Remove ourselves entirely from the vertex table
156 graph.getRcClient().remove(graph.vertTableId, rcKey);
157
158 super.remove();
159 }
160
161 /*
162 * Object overridden methods
163 */
164 @Override
165 public boolean equals(Object obj) {
166 if (this == obj) {
167 return true;
168 }
169 if (obj == null) {
170 return false;
171 }
172 if (getClass() != obj.getClass()) {
173 return false;
174 }
175 RamCloudVertex other = (RamCloudVertex) obj;
176 return (id == other.id);
177 }
178
179 @Override
180 public int hashCode() {
181 return Long.valueOf(id).hashCode();
182 }
183
184 @Override
185 public String toString() {
186 return "RamCloudVertex [id=" + id + "]";
187 }
188
189 /*
190 * RamCloudVertex specific methods
191 */
Yoshi Muroi815c7f92014-01-30 18:06:16 -0800192 public static byte[] idToRcKey(long id) {
yoshi28bac132014-01-22 11:00:17 -0800193 return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(id).array();
194 }
195
Yoshi Muroi815c7f92014-01-30 18:06:16 -0800196 public static long rcKeyToId(byte[] rcKey) {
yoshi28bac132014-01-22 11:00:17 -0800197 return ByteBuffer.wrap(rcKey).order(ByteOrder.LITTLE_ENDIAN).getLong();
198 }
199
200 boolean addEdgeToAdjList(RamCloudEdge edge) {
201 List<RamCloudEdge> edgesToAdd = new ArrayList<RamCloudEdge>(1);
202 edgesToAdd.add(edge);
203 return addEdgesToAdjList(edgesToAdd);
204 }
205 boolean removeEdgeFromAdjList(RamCloudEdge edge) {
206 List<RamCloudEdge> edgesToRemove = new ArrayList<RamCloudEdge>(1);
207 edgesToRemove.add(edge);
208 return removeEdgesFromAdjList(edgesToRemove);
209 }
210
211 private boolean addEdgesToAdjList(List<RamCloudEdge> edgesToAdd) {
212 return updateEdgeAdjList(edgesToAdd, true);
213 }
214 private boolean removeEdgesFromAdjList(List<RamCloudEdge> edgesToAdd) {
215 return updateEdgeAdjList(edgesToAdd, false);
216 }
217
218 /** Conditionally update Adj. Edge List
219 * @return true if EdgeAdjList was logically modified.(Cache update does not imply true return)
220 */
221 private boolean updateEdgeAdjList(List<RamCloudEdge> edgesToModify, boolean add) {
222 PerfMon pm = PerfMon.getInstance();
223 JRamCloud rcClient = graph.getRcClient();
224 final int MAX_RETRIES = 100;
225 for (int retry = 1 ; retry <= MAX_RETRIES ; ++retry ) {
226 Set<RamCloudEdge> edges;
227 long expected_version = 0L;
228 if ( this.cachedAdjEdgeList == null ) {
229 edges = new HashSet<RamCloudEdge>();
230 } else {
231 expected_version = this.cachedAdjEdgeList.getVersion();
232 if ( expected_version == 0L && add == false ) {
233 updateCachedAdjEdgeList();
234 expected_version = this.cachedAdjEdgeList.getVersion();
235 }
236 edges = buildEdgeSetFromProtobuf(this.cachedAdjEdgeList.getValue(), Direction.BOTH);
237 }
238 if ( expected_version == 0L && add == false ) {
239 updateCachedAdjEdgeList();
240 expected_version = this.cachedAdjEdgeList.getVersion();
241 edges = buildEdgeSetFromProtobuf(this.cachedAdjEdgeList.getValue(), Direction.BOTH);
242 }
243 //log.debug( (add?"Adding":"Removing") + " edges to: {"+ edges+ "}");
244
245 try {
246 if ( add ) {
247 if (edges.addAll(edgesToModify) == false) {
248 log.warn("{}: There aren't any changes to edges ({})", this, edgesToModify);
249 return false;
250 }
251 } else {
252 if (edges.removeAll(edgesToModify) == false) {
253 log.warn("{}: There aren't any changes to edges ({})", this, edgesToModify);
254 return false;
255 }
256 }
257
258 EdgeListProtoBuf edgeList = buildProtoBufFromEdgeSet(edges);
259 JRamCloud.RejectRules rules = rcClient.new RejectRules();
260 if ( expected_version == 0L ) {
261 rules.setExists();
262 } else {
263 rules.setNeVersion(expected_version);
264 }
265 pm.write_start("RAMCloudVertex updateEdgeAdjList()");
266 long updated_version = rcClient.writeRule(graph.vertTableId, rcKey, edgeList.toByteArray(), rules);
267 pm.write_end("RAMCloudVertex updateEdgeAdjList()");
268 this.cachedAdjEdgeList.setValue(edgeList, updated_version);
269 return true;
270 } catch (UnsupportedOperationException e) {
271 pm.write_end("RAMCloudVertex updateEdgeAdjList()");
272 pm.write_condfail("RAMCloudVertex updateEdgeAdjList()");
273 log.error("{" + toString() + "}: Failed to modify a set of edges ({" + edgesToModify + "}): ", e);
274 return false;
275 } catch (ClassCastException e) {
276 pm.write_end("RAMCloudVertex updateEdgeAdjList()");
277 pm.write_condfail("RAMCloudVertex updateEdgeAdjList()");
278 log.error("{" + toString() + "}: Failed to modify a set of edges ({" + edgesToModify + "}): ", e);
279 return false;
280 } catch (NullPointerException e) {
281 pm.write_end("RAMCloudVertex updateEdgeAdjList()");
282 pm.write_condfail("RAMCloudVertex updateEdgeAdjList()");
283 log.error("{" + toString() + "}: Failed to modify a set of edges ({" + edgesToModify + "}): ", e);
284 return false;
285 } catch (Exception e) {
286 pm.write_end("RAMCloudVertex updateEdgeAdjList()");
287 pm.write_condfail("RAMCloudVertex updateEdgeAdjList()");
288 // FIXME Workaround for native method exception declaration bug
289 if ( e instanceof WrongVersionException ) {
290 log.debug("Conditional Updating EdgeList failed for {} RETRYING {}", this, retry);
291 //log.debug("Conditional Updating EdgeList failed for {} modifing {} RETRYING [{}]", this, edgesToModify, retry);
292 updateCachedAdjEdgeList();
293 } else {
294 log.debug("Cond. Write to modify adj edge list failed, exception thrown", e);
295 updateCachedAdjEdgeList();
296 }
297 }
298 }
299 log.error("Conditional Updating EdgeList failed for {} gave up RETRYING", this);
300 return false;
301 }
302
303 /** Get all adj.edge list
304 * Method is exposed to package namespace to do Vertex removal efficiently;
305 */
306 Set<RamCloudEdge> getEdgeSet() {
307 return getVersionedEdgeSet(Direction.BOTH).getValue();
308 }
309
310 private Versioned<EdgeListProtoBuf> updateCachedAdjEdgeList() {
311 JRamCloud.Object vertTableEntry;
312 EdgeListProtoBuf edgeList;
313
314 PerfMon pm = PerfMon.getInstance();
315 try {
316 JRamCloud vertTable = graph.getRcClient();
317 pm.read_start("RamCloudVertex updateCachedAdjEdgeList()");
318 vertTableEntry = vertTable.read(graph.vertTableId, rcKey);
319 pm.read_end("RamCloudVertex updateCachedAdjEdgeList()");
320 } catch (Exception e) {
321 pm.read_end("RamCloudVertex updateCachedAdjEdgeList()");
322 log.error("{" + toString() + "}: Error reading vertex table entry: ", e);
323 return null;
324 }
325
326 try {
327 pm.protodeser_start("RamCloudVertex updateCachedAdjEdgeList()");
328 edgeList = EdgeListProtoBuf.parseFrom(vertTableEntry.value);
329 Versioned<EdgeListProtoBuf> updatedEdgeList = new Versioned<EdgeListProtoBuf>(edgeList, vertTableEntry.version);
330 this.cachedAdjEdgeList = updatedEdgeList;
331 pm.protodeser_end("RamCloudVertex updateCachedAdjEdgeList()");
332 return updatedEdgeList;
333 } catch (InvalidProtocolBufferException e) {
334 pm.protodeser_end("RamCloudVertex updateCachedAdjEdgeList()");
335 log.error("{" + toString() + "}: Read malformed edge list: ", e);
336 return null;
337 }
338 }
339
340 private Versioned<Set<RamCloudEdge>> getVersionedEdgeSet(Direction direction, String... labels) {
341 Versioned<EdgeListProtoBuf> cachedEdgeList = updateCachedAdjEdgeList();
342 return new Versioned<Set<RamCloudEdge>>(buildEdgeSetFromProtobuf(cachedEdgeList.getValue(), direction, labels), cachedEdgeList.getVersion() );
343 }
344
345 private Set<RamCloudEdge> buildEdgeSetFromProtobuf(EdgeListProtoBuf edgeList,
346 Direction direction, String... labels) {
347 PerfMon pm = PerfMon.getInstance();
348 long startTime = 0;
349 if(RamCloudGraph.measureSerializeTimeProp == 1) {
350 startTime = System.nanoTime();
351 }
352 pm.protodeser_start("RamCloudVertex buildEdgeSetFromProtobuf()");
353 Set<RamCloudEdge> edgeSet = new HashSet<RamCloudEdge>( edgeList.getEdgeCount() );
354 for (EdgeProtoBuf edge : edgeList.getEdgeList()) {
355 if ((direction.equals(Direction.BOTH) || (edge.getOutgoing() ^ direction.equals(Direction.IN)))
356 && (labels.length == 0 || Arrays.asList(labels).contains(edge.getLabel()))) {
357 RamCloudVertex neighbor = new RamCloudVertex(edge.getNeighborId(), graph);
358 if (edge.getOutgoing()) {
359 edgeSet.add(new RamCloudEdge(this, neighbor, edge.getLabel(), graph));
360 } else {
361 edgeSet.add(new RamCloudEdge(neighbor, this, edge.getLabel(), graph));
362 }
363 }
364 }
365 pm.protodeser_end("RamCloudVertex buildEdgeSetFromProtobuf()");
366 if(RamCloudGraph.measureSerializeTimeProp == 1) {
367 long endTime = System.nanoTime();
368 log.error("Performance buildEdgeSetFromProtobuf key {}, {}, size={}", this, endTime - startTime, edgeList.getSerializedSize());
369 }
370 return edgeSet;
371 }
372
373
374
375 private EdgeListProtoBuf buildProtoBufFromEdgeSet(Set<RamCloudEdge> edgeSet) {
376 PerfMon pm = PerfMon.getInstance();
377 long startTime = 0;
378 if(RamCloudGraph.measureSerializeTimeProp == 1) {
379 startTime = System.nanoTime();
380 }
381
382 pm.protoser_start("RamCloudVertex buildProtoBufFromEdgeSet()");
383
384 EdgeListProtoBuf.Builder edgeListBuilder = EdgeListProtoBuf.newBuilder();
385 EdgeProtoBuf.Builder edgeBuilder = EdgeProtoBuf.newBuilder();
386
387 for (Edge edge : edgeSet) {
388 if (edge.getVertex(Direction.OUT).equals(this) || edge.getVertex(Direction.IN).equals(this)) {
389 if (edge.getVertex(Direction.OUT).equals(edge.getVertex(Direction.IN))) {
390 edgeBuilder.setNeighborId(id);
391 edgeBuilder.setOutgoing(true);
392 edgeBuilder.setLabel(edge.getLabel());
393 edgeListBuilder.addEdge(edgeBuilder.build());
394
395 edgeBuilder.setOutgoing(false);
396 edgeListBuilder.addEdge(edgeBuilder.build());
397 } else {
398 if (edge.getVertex(Direction.OUT).equals(this)) {
399 edgeBuilder.setNeighborId((Long) edge.getVertex(Direction.IN).getId());
400 edgeBuilder.setOutgoing(true);
401 edgeBuilder.setLabel(edge.getLabel());
402 edgeListBuilder.addEdge(edgeBuilder.build());
403 } else {
404 edgeBuilder.setNeighborId((Long) edge.getVertex(Direction.OUT).getId());
405 edgeBuilder.setOutgoing(false);
406 edgeBuilder.setLabel(edge.getLabel());
407 edgeListBuilder.addEdge(edgeBuilder.build());
408 }
409 }
410 } else {
411 log.warn("{}: Tried to add an edge unowned by this vertex ({})", this, edge);
412 }
413 }
414
415 EdgeListProtoBuf buf = edgeListBuilder.build();
416 pm.protoser_end("RamCloudVertex buildProtoBufFromEdgeSet");
417 if(RamCloudGraph.measureSerializeTimeProp == 1) {
418 long endTime = System.nanoTime();
419 log.error("Performance buildProtoBufFromEdgeSet key {}, {}, size={}", this, endTime - startTime, buf.getSerializedSize());
420 }
421 return buf;
422 }
423
424 @Deprecated
425 private List<RamCloudEdge> getEdgeList() {
426 return getEdgeList(Direction.BOTH);
427 }
428
429 private List<RamCloudEdge> getEdgeList(Direction direction, String... labels) {
430
431 Versioned<EdgeListProtoBuf> cachedEdgeList = updateCachedAdjEdgeList();
432 PerfMon pm = PerfMon.getInstance();
433 pm.protodeser_start("RamCloudVertex getEdgeList()");
434
435 List<RamCloudEdge> edgeList = new ArrayList<RamCloudEdge>(cachedEdgeList.getValue().getEdgeCount());
436
437 for (EdgeProtoBuf edge : cachedEdgeList.getValue().getEdgeList()) {
438 if ((direction.equals(Direction.BOTH) || (edge.getOutgoing() ^ direction.equals(Direction.IN)))
439 && (labels.length == 0 || Arrays.asList(labels).contains(edge.getLabel()))) {
440 RamCloudVertex neighbor = new RamCloudVertex(edge.getNeighborId(), graph);
441 if (edge.getOutgoing()) {
442 edgeList.add(new RamCloudEdge(this, neighbor, edge.getLabel(), graph));
443 } else {
444 edgeList.add(new RamCloudEdge(neighbor, this, edge.getLabel(), graph));
445 }
446 }
447 }
448 pm.protodeser_end("RamCloudVertex getEdgeList()");
449
450 return edgeList;
451 }
452
453 protected boolean exists() {
454 boolean vertTableEntryExists = false;
455 boolean vertPropTableEntryExists = false;
456
457 PerfMon pm = PerfMon.getInstance();
458 JRamCloud vertTable = graph.getRcClient();
459 try {
460 pm.read_start("RamCloudVertex exists()");
461 vertTable.read(graph.vertTableId, rcKey);
462 pm.read_end("RamCloudVertex exists()");
463 vertTableEntryExists = true;
464 } catch (Exception e) {
465 // Vertex table entry does not exist
466 pm.read_end("RamCloudVertex exists()");
467 }
468
469 try {
470 pm.read_start("RamCloudVertex exists()");
471 vertTable.read(graph.vertPropTableId, rcKey);
472 pm.read_end("RamCloudVertex exists()");
473 vertPropTableEntryExists = true;
474 } catch (Exception e) {
475 // Vertex property table entry does not exist
476 pm.read_end("RamCloudVertex exists()");
477 }
478
479 if (vertTableEntryExists && vertPropTableEntryExists) {
480 return true;
481 } else if (!vertTableEntryExists && !vertPropTableEntryExists) {
482 return false;
483 } else {
484 log.warn("{}: Detected RamCloudGraph inconsistency: vertTableEntryExists={}, vertPropTableEntryExists={}.", this, vertTableEntryExists, vertPropTableEntryExists);
485 return true;
486 }
487 }
488
489 protected void create() throws IllegalArgumentException {
490 // TODO: Existence check costs extra (presently 2 reads), could use option to turn on/off
491 if (!exists()) {
492 PerfMon pm = PerfMon.getInstance();
493 JRamCloud vertTable = graph.getRcClient();
494 MultiWriteObject[] mwo = new MultiWriteObject[2];
495 mwo[0] = new MultiWriteObject(graph.vertTableId, rcKey, ByteBuffer.allocate(0).array(), null);
496 mwo[1] = new MultiWriteObject(graph.vertPropTableId, rcKey, ByteBuffer.allocate(0).array(), null);
497 pm.multiwrite_start("RamCloudVertex create()");
498 vertTable.multiWrite(mwo);
499 pm.multiwrite_end("RamCloudVertex create()");
500 } else {
501 throw ExceptionFactory.vertexWithIdAlreadyExists(id);
502 }
503 }
504
505 public void debugPrintEdgeList() {
506 List<RamCloudEdge> edgeList = getEdgeList();
507
508 log.debug("{}: Debug Printing Edge List...", this);
509 for (RamCloudEdge edge : edgeList) {
510 System.out.println(edge.toString());
511 }
512 }
513}