blob: 9597f7e51263c9c8b9bafd8527981de99d753911 [file] [log] [blame]
yoshi28bac132014-01-22 11:00:17 -08001package com.tinkerpop.blueprints.impls.ramcloud;
2
3import java.io.Serializable;
4import java.nio.ByteBuffer;
5import java.nio.ByteOrder;
6import java.util.ArrayList;
7import java.util.Arrays;
8import java.util.HashMap;
9import java.util.HashSet;
10import java.util.LinkedList;
11import java.util.List;
12import java.util.Map;
13import java.util.Map.Entry;
14import java.util.Set;
15
16import org.slf4j.Logger;
17import org.slf4j.LoggerFactory;
18
19import com.google.protobuf.InvalidProtocolBufferException;
20import com.tinkerpop.blueprints.Direction;
21import com.tinkerpop.blueprints.Edge;
22import com.tinkerpop.blueprints.Vertex;
23import com.tinkerpop.blueprints.VertexQuery;
24import com.tinkerpop.blueprints.impls.ramcloud.RamCloudGraphProtos.EdgeListProtoBuf;
25import com.tinkerpop.blueprints.impls.ramcloud.RamCloudGraphProtos.EdgeProtoBuf;
26import com.tinkerpop.blueprints.util.DefaultVertexQuery;
27import com.tinkerpop.blueprints.util.ExceptionFactory;
28import com.tinkerpop.blueprints.impls.ramcloud.PerfMon;
29
30import edu.stanford.ramcloud.JRamCloud;
31import edu.stanford.ramcloud.JRamCloud.MultiWriteObject;
32import edu.stanford.ramcloud.JRamCloud.RejectRules;
33import edu.stanford.ramcloud.JRamCloud.WrongVersionException;
34
35public class RamCloudVertex extends RamCloudElement implements Vertex, Serializable {
36
37 private final static Logger log = LoggerFactory.getLogger(RamCloudGraph.class);
38 private static final long serialVersionUID = 7526472295622776147L;
39 protected long id;
40 protected byte[] rcKey;
41 private RamCloudGraph graph;
42
43 private Versioned<EdgeListProtoBuf> cachedAdjEdgeList;
44
45 public RamCloudVertex(long id, RamCloudGraph graph) {
46 super(idToRcKey(id), graph.vertPropTableId, graph);
47
48 this.id = id;
49 this.rcKey = idToRcKey(id);
50 this.graph = graph;
51 this.cachedAdjEdgeList = null;
52 }
53
54 public RamCloudVertex(byte[] rcKey, RamCloudGraph graph) {
55 super(rcKey, graph.vertPropTableId, graph);
56
57 this.id = rcKeyToId(rcKey);
58 this.rcKey = rcKey;
59 this.graph = graph;
60 this.cachedAdjEdgeList = null;
61 }
62
63
64 /*
65 * Vertex interface implementation
66 */
67 @Override
68 public Edge addEdge(String label, Vertex inVertex) {
69 return graph.addEdge(null, this, inVertex, label);
70 }
71
72 @Override
73 public Iterable<Edge> getEdges(Direction direction, String... labels) {
74 return new ArrayList<Edge>(getEdgeList(direction, labels));
75 }
76
77 @Override
78 public Iterable<Vertex> getVertices(Direction direction, String... labels) {
79 List<RamCloudEdge> edges = getEdgeList(direction, labels);
80 List<Vertex> neighbors = new LinkedList<Vertex>();
81 for (RamCloudEdge edge : edges) {
82 neighbors.add(edge.getNeighbor(this));
83 }
84 return neighbors;
85 }
86
87 @Override
88 public VertexQuery query() {
89 return new DefaultVertexQuery(this);
90 }
91
92 /*
93 * RamCloudElement overridden methods
94 */
95 @Override
96 public Object getId() {
97 return id;
98 }
99
100 @Override
101 public void remove() {
102 Set<RamCloudEdge> edges = getEdgeSet();
103
104 // neighbor vertex -> List of Edges to remove
105 Map<RamCloudVertex, List<RamCloudEdge>> vertexToEdgesMap = new HashMap<RamCloudVertex, List<RamCloudEdge>>( edges.size() );
106
107 // Batch edges together by neighbor vertex
108 for (RamCloudEdge edge : edges) {
109 RamCloudVertex neighbor = (RamCloudVertex) edge.getNeighbor(this);
110 List<RamCloudEdge> edgeList = vertexToEdgesMap.get(neighbor);
111
112 if (edgeList == null) {
113 edgeList = new LinkedList<RamCloudEdge>();
114 }
115
116 edgeList.add(edge);
117 vertexToEdgesMap.put(neighbor, edgeList);
118 }
119
120 // Remove batches of edges at a time by neighbor vertex
121 for (Entry<RamCloudVertex, List<RamCloudEdge>> entry : vertexToEdgesMap.entrySet()) {
122 // Skip over loopback edges to ourself
123 if (!entry.getKey().equals(this)) {
124 entry.getKey().removeEdgesFromAdjList(entry.getValue());
125 }
126
127 // Remove this batch of edges from the edge property table
128 for (RamCloudEdge edge : entry.getValue()) {
129 edge.removeProperties();
130 }
131 }
132
133 Map<String,Object> props = this.getPropertyMap();
134 for( Map.Entry<String,Object> entry : props.entrySet() ) {
135 if ( !graph.indexedKeys.contains(entry.getKey() ) ) continue;
136 RamCloudKeyIndex keyIndex = new RamCloudKeyIndex(graph.kidxVertTableId, entry.getKey(), entry.getValue(), graph, Vertex.class);
137 keyIndex.remove(entry.getKey(), entry.getValue(), this);
138 }
139
140 // Remove ourselves entirely from the vertex table
141 graph.getRcClient().remove(graph.vertTableId, rcKey);
142
143 super.remove();
144 }
145
146 /*
147 * Object overridden methods
148 */
149 @Override
150 public boolean equals(Object obj) {
151 if (this == obj) {
152 return true;
153 }
154 if (obj == null) {
155 return false;
156 }
157 if (getClass() != obj.getClass()) {
158 return false;
159 }
160 RamCloudVertex other = (RamCloudVertex) obj;
161 return (id == other.id);
162 }
163
164 @Override
165 public int hashCode() {
166 return Long.valueOf(id).hashCode();
167 }
168
169 @Override
170 public String toString() {
171 return "RamCloudVertex [id=" + id + "]";
172 }
173
174 /*
175 * RamCloudVertex specific methods
176 */
177 private static byte[] idToRcKey(long id) {
178 return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(id).array();
179 }
180
181 private static long rcKeyToId(byte[] rcKey) {
182 return ByteBuffer.wrap(rcKey).order(ByteOrder.LITTLE_ENDIAN).getLong();
183 }
184
185 boolean addEdgeToAdjList(RamCloudEdge edge) {
186 List<RamCloudEdge> edgesToAdd = new ArrayList<RamCloudEdge>(1);
187 edgesToAdd.add(edge);
188 return addEdgesToAdjList(edgesToAdd);
189 }
190 boolean removeEdgeFromAdjList(RamCloudEdge edge) {
191 List<RamCloudEdge> edgesToRemove = new ArrayList<RamCloudEdge>(1);
192 edgesToRemove.add(edge);
193 return removeEdgesFromAdjList(edgesToRemove);
194 }
195
196 private boolean addEdgesToAdjList(List<RamCloudEdge> edgesToAdd) {
197 return updateEdgeAdjList(edgesToAdd, true);
198 }
199 private boolean removeEdgesFromAdjList(List<RamCloudEdge> edgesToAdd) {
200 return updateEdgeAdjList(edgesToAdd, false);
201 }
202
203 /** Conditionally update Adj. Edge List
204 * @return true if EdgeAdjList was logically modified.(Cache update does not imply true return)
205 */
206 private boolean updateEdgeAdjList(List<RamCloudEdge> edgesToModify, boolean add) {
207 PerfMon pm = PerfMon.getInstance();
208 JRamCloud rcClient = graph.getRcClient();
209 final int MAX_RETRIES = 100;
210 for (int retry = 1 ; retry <= MAX_RETRIES ; ++retry ) {
211 Set<RamCloudEdge> edges;
212 long expected_version = 0L;
213 if ( this.cachedAdjEdgeList == null ) {
214 edges = new HashSet<RamCloudEdge>();
215 } else {
216 expected_version = this.cachedAdjEdgeList.getVersion();
217 if ( expected_version == 0L && add == false ) {
218 updateCachedAdjEdgeList();
219 expected_version = this.cachedAdjEdgeList.getVersion();
220 }
221 edges = buildEdgeSetFromProtobuf(this.cachedAdjEdgeList.getValue(), Direction.BOTH);
222 }
223 if ( expected_version == 0L && add == false ) {
224 updateCachedAdjEdgeList();
225 expected_version = this.cachedAdjEdgeList.getVersion();
226 edges = buildEdgeSetFromProtobuf(this.cachedAdjEdgeList.getValue(), Direction.BOTH);
227 }
228 //log.debug( (add?"Adding":"Removing") + " edges to: {"+ edges+ "}");
229
230 try {
231 if ( add ) {
232 if (edges.addAll(edgesToModify) == false) {
233 log.warn("{}: There aren't any changes to edges ({})", this, edgesToModify);
234 return false;
235 }
236 } else {
237 if (edges.removeAll(edgesToModify) == false) {
238 log.warn("{}: There aren't any changes to edges ({})", this, edgesToModify);
239 return false;
240 }
241 }
242
243 EdgeListProtoBuf edgeList = buildProtoBufFromEdgeSet(edges);
244 JRamCloud.RejectRules rules = rcClient.new RejectRules();
245 if ( expected_version == 0L ) {
246 rules.setExists();
247 } else {
248 rules.setNeVersion(expected_version);
249 }
250 pm.write_start("RAMCloudVertex updateEdgeAdjList()");
251 long updated_version = rcClient.writeRule(graph.vertTableId, rcKey, edgeList.toByteArray(), rules);
252 pm.write_end("RAMCloudVertex updateEdgeAdjList()");
253 this.cachedAdjEdgeList.setValue(edgeList, updated_version);
254 return true;
255 } catch (UnsupportedOperationException e) {
256 pm.write_end("RAMCloudVertex updateEdgeAdjList()");
257 pm.write_condfail("RAMCloudVertex updateEdgeAdjList()");
258 log.error("{" + toString() + "}: Failed to modify a set of edges ({" + edgesToModify + "}): ", e);
259 return false;
260 } catch (ClassCastException e) {
261 pm.write_end("RAMCloudVertex updateEdgeAdjList()");
262 pm.write_condfail("RAMCloudVertex updateEdgeAdjList()");
263 log.error("{" + toString() + "}: Failed to modify a set of edges ({" + edgesToModify + "}): ", e);
264 return false;
265 } catch (NullPointerException e) {
266 pm.write_end("RAMCloudVertex updateEdgeAdjList()");
267 pm.write_condfail("RAMCloudVertex updateEdgeAdjList()");
268 log.error("{" + toString() + "}: Failed to modify a set of edges ({" + edgesToModify + "}): ", e);
269 return false;
270 } catch (Exception e) {
271 pm.write_end("RAMCloudVertex updateEdgeAdjList()");
272 pm.write_condfail("RAMCloudVertex updateEdgeAdjList()");
273 // FIXME Workaround for native method exception declaration bug
274 if ( e instanceof WrongVersionException ) {
275 log.debug("Conditional Updating EdgeList failed for {} RETRYING {}", this, retry);
276 //log.debug("Conditional Updating EdgeList failed for {} modifing {} RETRYING [{}]", this, edgesToModify, retry);
277 updateCachedAdjEdgeList();
278 } else {
279 log.debug("Cond. Write to modify adj edge list failed, exception thrown", e);
280 updateCachedAdjEdgeList();
281 }
282 }
283 }
284 log.error("Conditional Updating EdgeList failed for {} gave up RETRYING", this);
285 return false;
286 }
287
288 /** Get all adj.edge list
289 * Method is exposed to package namespace to do Vertex removal efficiently;
290 */
291 Set<RamCloudEdge> getEdgeSet() {
292 return getVersionedEdgeSet(Direction.BOTH).getValue();
293 }
294
295 private Versioned<EdgeListProtoBuf> updateCachedAdjEdgeList() {
296 JRamCloud.Object vertTableEntry;
297 EdgeListProtoBuf edgeList;
298
299 PerfMon pm = PerfMon.getInstance();
300 try {
301 JRamCloud vertTable = graph.getRcClient();
302 pm.read_start("RamCloudVertex updateCachedAdjEdgeList()");
303 vertTableEntry = vertTable.read(graph.vertTableId, rcKey);
304 pm.read_end("RamCloudVertex updateCachedAdjEdgeList()");
305 } catch (Exception e) {
306 pm.read_end("RamCloudVertex updateCachedAdjEdgeList()");
307 log.error("{" + toString() + "}: Error reading vertex table entry: ", e);
308 return null;
309 }
310
311 try {
312 pm.protodeser_start("RamCloudVertex updateCachedAdjEdgeList()");
313 edgeList = EdgeListProtoBuf.parseFrom(vertTableEntry.value);
314 Versioned<EdgeListProtoBuf> updatedEdgeList = new Versioned<EdgeListProtoBuf>(edgeList, vertTableEntry.version);
315 this.cachedAdjEdgeList = updatedEdgeList;
316 pm.protodeser_end("RamCloudVertex updateCachedAdjEdgeList()");
317 return updatedEdgeList;
318 } catch (InvalidProtocolBufferException e) {
319 pm.protodeser_end("RamCloudVertex updateCachedAdjEdgeList()");
320 log.error("{" + toString() + "}: Read malformed edge list: ", e);
321 return null;
322 }
323 }
324
325 private Versioned<Set<RamCloudEdge>> getVersionedEdgeSet(Direction direction, String... labels) {
326 Versioned<EdgeListProtoBuf> cachedEdgeList = updateCachedAdjEdgeList();
327 return new Versioned<Set<RamCloudEdge>>(buildEdgeSetFromProtobuf(cachedEdgeList.getValue(), direction, labels), cachedEdgeList.getVersion() );
328 }
329
330 private Set<RamCloudEdge> buildEdgeSetFromProtobuf(EdgeListProtoBuf edgeList,
331 Direction direction, String... labels) {
332 PerfMon pm = PerfMon.getInstance();
333 long startTime = 0;
334 if(RamCloudGraph.measureSerializeTimeProp == 1) {
335 startTime = System.nanoTime();
336 }
337 pm.protodeser_start("RamCloudVertex buildEdgeSetFromProtobuf()");
338 Set<RamCloudEdge> edgeSet = new HashSet<RamCloudEdge>( edgeList.getEdgeCount() );
339 for (EdgeProtoBuf edge : edgeList.getEdgeList()) {
340 if ((direction.equals(Direction.BOTH) || (edge.getOutgoing() ^ direction.equals(Direction.IN)))
341 && (labels.length == 0 || Arrays.asList(labels).contains(edge.getLabel()))) {
342 RamCloudVertex neighbor = new RamCloudVertex(edge.getNeighborId(), graph);
343 if (edge.getOutgoing()) {
344 edgeSet.add(new RamCloudEdge(this, neighbor, edge.getLabel(), graph));
345 } else {
346 edgeSet.add(new RamCloudEdge(neighbor, this, edge.getLabel(), graph));
347 }
348 }
349 }
350 pm.protodeser_end("RamCloudVertex buildEdgeSetFromProtobuf()");
351 if(RamCloudGraph.measureSerializeTimeProp == 1) {
352 long endTime = System.nanoTime();
353 log.error("Performance buildEdgeSetFromProtobuf key {}, {}, size={}", this, endTime - startTime, edgeList.getSerializedSize());
354 }
355 return edgeSet;
356 }
357
358
359
360 private EdgeListProtoBuf buildProtoBufFromEdgeSet(Set<RamCloudEdge> edgeSet) {
361 PerfMon pm = PerfMon.getInstance();
362 long startTime = 0;
363 if(RamCloudGraph.measureSerializeTimeProp == 1) {
364 startTime = System.nanoTime();
365 }
366
367 pm.protoser_start("RamCloudVertex buildProtoBufFromEdgeSet()");
368
369 EdgeListProtoBuf.Builder edgeListBuilder = EdgeListProtoBuf.newBuilder();
370 EdgeProtoBuf.Builder edgeBuilder = EdgeProtoBuf.newBuilder();
371
372 for (Edge edge : edgeSet) {
373 if (edge.getVertex(Direction.OUT).equals(this) || edge.getVertex(Direction.IN).equals(this)) {
374 if (edge.getVertex(Direction.OUT).equals(edge.getVertex(Direction.IN))) {
375 edgeBuilder.setNeighborId(id);
376 edgeBuilder.setOutgoing(true);
377 edgeBuilder.setLabel(edge.getLabel());
378 edgeListBuilder.addEdge(edgeBuilder.build());
379
380 edgeBuilder.setOutgoing(false);
381 edgeListBuilder.addEdge(edgeBuilder.build());
382 } else {
383 if (edge.getVertex(Direction.OUT).equals(this)) {
384 edgeBuilder.setNeighborId((Long) edge.getVertex(Direction.IN).getId());
385 edgeBuilder.setOutgoing(true);
386 edgeBuilder.setLabel(edge.getLabel());
387 edgeListBuilder.addEdge(edgeBuilder.build());
388 } else {
389 edgeBuilder.setNeighborId((Long) edge.getVertex(Direction.OUT).getId());
390 edgeBuilder.setOutgoing(false);
391 edgeBuilder.setLabel(edge.getLabel());
392 edgeListBuilder.addEdge(edgeBuilder.build());
393 }
394 }
395 } else {
396 log.warn("{}: Tried to add an edge unowned by this vertex ({})", this, edge);
397 }
398 }
399
400 EdgeListProtoBuf buf = edgeListBuilder.build();
401 pm.protoser_end("RamCloudVertex buildProtoBufFromEdgeSet");
402 if(RamCloudGraph.measureSerializeTimeProp == 1) {
403 long endTime = System.nanoTime();
404 log.error("Performance buildProtoBufFromEdgeSet key {}, {}, size={}", this, endTime - startTime, buf.getSerializedSize());
405 }
406 return buf;
407 }
408
409 @Deprecated
410 private List<RamCloudEdge> getEdgeList() {
411 return getEdgeList(Direction.BOTH);
412 }
413
414 private List<RamCloudEdge> getEdgeList(Direction direction, String... labels) {
415
416 Versioned<EdgeListProtoBuf> cachedEdgeList = updateCachedAdjEdgeList();
417 PerfMon pm = PerfMon.getInstance();
418 pm.protodeser_start("RamCloudVertex getEdgeList()");
419
420 List<RamCloudEdge> edgeList = new ArrayList<RamCloudEdge>(cachedEdgeList.getValue().getEdgeCount());
421
422 for (EdgeProtoBuf edge : cachedEdgeList.getValue().getEdgeList()) {
423 if ((direction.equals(Direction.BOTH) || (edge.getOutgoing() ^ direction.equals(Direction.IN)))
424 && (labels.length == 0 || Arrays.asList(labels).contains(edge.getLabel()))) {
425 RamCloudVertex neighbor = new RamCloudVertex(edge.getNeighborId(), graph);
426 if (edge.getOutgoing()) {
427 edgeList.add(new RamCloudEdge(this, neighbor, edge.getLabel(), graph));
428 } else {
429 edgeList.add(new RamCloudEdge(neighbor, this, edge.getLabel(), graph));
430 }
431 }
432 }
433 pm.protodeser_end("RamCloudVertex getEdgeList()");
434
435 return edgeList;
436 }
437
438 protected boolean exists() {
439 boolean vertTableEntryExists = false;
440 boolean vertPropTableEntryExists = false;
441
442 PerfMon pm = PerfMon.getInstance();
443 JRamCloud vertTable = graph.getRcClient();
444 try {
445 pm.read_start("RamCloudVertex exists()");
446 vertTable.read(graph.vertTableId, rcKey);
447 pm.read_end("RamCloudVertex exists()");
448 vertTableEntryExists = true;
449 } catch (Exception e) {
450 // Vertex table entry does not exist
451 pm.read_end("RamCloudVertex exists()");
452 }
453
454 try {
455 pm.read_start("RamCloudVertex exists()");
456 vertTable.read(graph.vertPropTableId, rcKey);
457 pm.read_end("RamCloudVertex exists()");
458 vertPropTableEntryExists = true;
459 } catch (Exception e) {
460 // Vertex property table entry does not exist
461 pm.read_end("RamCloudVertex exists()");
462 }
463
464 if (vertTableEntryExists && vertPropTableEntryExists) {
465 return true;
466 } else if (!vertTableEntryExists && !vertPropTableEntryExists) {
467 return false;
468 } else {
469 log.warn("{}: Detected RamCloudGraph inconsistency: vertTableEntryExists={}, vertPropTableEntryExists={}.", this, vertTableEntryExists, vertPropTableEntryExists);
470 return true;
471 }
472 }
473
474 protected void create() throws IllegalArgumentException {
475 // TODO: Existence check costs extra (presently 2 reads), could use option to turn on/off
476 if (!exists()) {
477 PerfMon pm = PerfMon.getInstance();
478 JRamCloud vertTable = graph.getRcClient();
479 MultiWriteObject[] mwo = new MultiWriteObject[2];
480 mwo[0] = new MultiWriteObject(graph.vertTableId, rcKey, ByteBuffer.allocate(0).array(), null);
481 mwo[1] = new MultiWriteObject(graph.vertPropTableId, rcKey, ByteBuffer.allocate(0).array(), null);
482 pm.multiwrite_start("RamCloudVertex create()");
483 vertTable.multiWrite(mwo);
484 pm.multiwrite_end("RamCloudVertex create()");
485 } else {
486 throw ExceptionFactory.vertexWithIdAlreadyExists(id);
487 }
488 }
489
490 public void debugPrintEdgeList() {
491 List<RamCloudEdge> edgeList = getEdgeList();
492
493 log.debug("{}: Debug Printing Edge List...", this);
494 for (RamCloudEdge edge : edgeList) {
495 System.out.println(edge.toString());
496 }
497 }
498}