Yuta HIGUCHI | a1e655a | 2014-01-23 17:43:11 -0800 | [diff] [blame] | 1 | /* Copyright (c) 2013 Stanford University |
| 2 | * |
| 3 | * Permission to use, copy, modify, and distribute this software for any |
| 4 | * purpose with or without fee is hereby granted, provided that the above |
| 5 | * copyright notice and this permission notice appear in all copies. |
| 6 | * |
| 7 | * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES |
| 8 | * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF |
| 9 | * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR |
| 10 | * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES |
| 11 | * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN |
| 12 | * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF |
| 13 | * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. |
| 14 | */ |
| 15 | |
yoshi | 28bac13 | 2014-01-22 11:00:17 -0800 | [diff] [blame] | 16 | package com.tinkerpop.blueprints.impls.ramcloud; |
| 17 | |
| 18 | import java.io.ByteArrayInputStream; |
| 19 | import java.io.ByteArrayOutputStream; |
| 20 | import java.io.IOException; |
| 21 | import java.io.ObjectInputStream; |
| 22 | import java.io.ObjectOutputStream; |
| 23 | import java.io.Serializable; |
| 24 | import java.nio.BufferUnderflowException; |
| 25 | import java.nio.ByteBuffer; |
| 26 | import java.nio.ByteOrder; |
| 27 | import java.util.ArrayList; |
| 28 | import java.util.Arrays; |
| 29 | import java.util.Collections; |
| 30 | import java.util.HashMap; |
| 31 | import java.util.HashSet; |
| 32 | import java.util.LinkedList; |
| 33 | import java.util.List; |
| 34 | import java.util.Map; |
| 35 | import java.util.NoSuchElementException; |
| 36 | import java.util.Set; |
| 37 | import java.util.concurrent.atomic.AtomicLong; |
| 38 | |
| 39 | import org.slf4j.Logger; |
| 40 | import org.slf4j.LoggerFactory; |
| 41 | |
| 42 | import com.sun.jersey.core.util.Base64; |
| 43 | import com.tinkerpop.blueprints.Direction; |
| 44 | import com.tinkerpop.blueprints.Edge; |
| 45 | import com.tinkerpop.blueprints.Element; |
| 46 | import com.tinkerpop.blueprints.Features; |
| 47 | import com.tinkerpop.blueprints.GraphQuery; |
| 48 | import com.tinkerpop.blueprints.Index; |
| 49 | import com.tinkerpop.blueprints.IndexableGraph; |
| 50 | import com.tinkerpop.blueprints.KeyIndexableGraph; |
| 51 | import com.tinkerpop.blueprints.Parameter; |
| 52 | import com.tinkerpop.blueprints.TransactionalGraph; |
| 53 | import com.tinkerpop.blueprints.Vertex; |
| 54 | import com.tinkerpop.blueprints.util.DefaultGraphQuery; |
| 55 | import com.tinkerpop.blueprints.util.ExceptionFactory; |
| 56 | import com.tinkerpop.blueprints.impls.ramcloud.PerfMon; |
| 57 | |
| 58 | import edu.stanford.ramcloud.JRamCloud; |
| 59 | import edu.stanford.ramcloud.JRamCloud.MultiWriteObject; |
| 60 | |
| 61 | public class RamCloudGraph implements IndexableGraph, KeyIndexableGraph, TransactionalGraph, Serializable { |
| 62 | private final static Logger log = LoggerFactory.getLogger(RamCloudGraph.class); |
| 63 | |
| 64 | private static final ThreadLocal<JRamCloud> RamCloudThreadLocal = new ThreadLocal<JRamCloud>(); |
| 65 | |
| 66 | protected long vertTableId; //(vertex_id) --> ( (n,d,ll,l), (n,d,ll,l), ... ) |
| 67 | protected long vertPropTableId; //(vertex_id) -> ( (kl,k,vl,v), (kl,k,vl,v), ... ) |
| 68 | protected long edgePropTableId; //(edge_id) -> ( (kl,k,vl,v), (kl,k,vl,v), ... ) |
| 69 | protected long idxVertTableId; |
| 70 | protected long idxEdgeTableId; |
| 71 | protected long kidxVertTableId; |
| 72 | protected long kidxEdgeTableId; |
| 73 | protected long instanceTableId; |
| 74 | private String VERT_TABLE_NAME = "verts"; |
| 75 | private String EDGE_PROP_TABLE_NAME = "edge_props"; |
| 76 | private String VERT_PROP_TABLE_NAME = "vert_props"; |
| 77 | private String IDX_VERT_TABLE_NAME = "idx_vert"; |
| 78 | private String IDX_EDGE_TABLE_NAME = "idx_edge"; |
| 79 | private String KIDX_VERT_TABLE_NAME = "kidx_vert"; |
| 80 | private String KIDX_EDGE_TABLE_NAME = "kidx_edge"; |
| 81 | private final String INSTANCE_TABLE_NAME = "instance"; |
| 82 | private long instanceId; |
| 83 | private AtomicLong nextVertexId; |
| 84 | private final long INSTANCE_ID_RANGE = 100000; |
| 85 | private String coordinatorLocation; |
| 86 | private static final Features FEATURES = new Features(); |
Yoshi Muroi | 815c7f9 | 2014-01-30 18:06:16 -0800 | [diff] [blame] | 87 | // FIXME better loop variable name |
| 88 | public final int CONDITIONALWRITE_RETRY_MAX = 100; |
yoshi | 28bac13 | 2014-01-22 11:00:17 -0800 | [diff] [blame] | 89 | public final long measureBPTimeProp = Long.valueOf(System.getProperty("benchmark.measureBP", "0")); |
| 90 | public final long measureRcTimeProp = Long.valueOf(System.getProperty("benchmark.measureRc", "0")); |
| 91 | public static final long measureSerializeTimeProp = Long.valueOf(System.getProperty("benchmark.measureSerializeTimeProp", "0")); |
| 92 | |
| 93 | |
| 94 | public final Set<String> indexedKeys = new HashSet<String>(); |
| 95 | |
| 96 | static { |
| 97 | FEATURES.supportsSerializableObjectProperty = true; |
| 98 | FEATURES.supportsBooleanProperty = true; |
| 99 | FEATURES.supportsDoubleProperty = true; |
| 100 | FEATURES.supportsFloatProperty = true; |
| 101 | FEATURES.supportsIntegerProperty = true; |
| 102 | FEATURES.supportsPrimitiveArrayProperty = true; |
| 103 | FEATURES.supportsUniformListProperty = true; |
| 104 | FEATURES.supportsMixedListProperty = true; |
| 105 | FEATURES.supportsLongProperty = true; |
| 106 | FEATURES.supportsMapProperty = true; |
| 107 | FEATURES.supportsStringProperty = true; |
| 108 | |
| 109 | FEATURES.supportsDuplicateEdges = false; |
| 110 | FEATURES.supportsSelfLoops = false; |
| 111 | FEATURES.isPersistent = false; |
| 112 | FEATURES.isWrapper = false; |
| 113 | FEATURES.supportsVertexIteration = true; |
| 114 | FEATURES.supportsEdgeIteration = true; |
| 115 | FEATURES.supportsVertexIndex = true; |
| 116 | FEATURES.supportsEdgeIndex = false; |
| 117 | FEATURES.ignoresSuppliedIds = true; |
| 118 | FEATURES.supportsTransactions = false; |
| 119 | FEATURES.supportsIndices = false; |
| 120 | FEATURES.supportsKeyIndices = true; |
| 121 | FEATURES.supportsVertexKeyIndex = true; |
| 122 | FEATURES.supportsEdgeKeyIndex = false; |
| 123 | FEATURES.supportsEdgeRetrieval = true; |
| 124 | FEATURES.supportsVertexProperties = true; |
| 125 | FEATURES.supportsEdgeProperties = true; |
| 126 | FEATURES.supportsThreadedTransactions = false; |
| 127 | } |
| 128 | |
| 129 | static { |
| 130 | System.loadLibrary("edu_stanford_ramcloud_JRamCloud"); |
| 131 | } |
| 132 | |
| 133 | private RamCloudGraph() { |
| 134 | } |
| 135 | |
| 136 | |
| 137 | public RamCloudGraph(String coordinatorLocation) { |
| 138 | this.coordinatorLocation = coordinatorLocation; |
| 139 | |
| 140 | JRamCloud rcClient = getRcClient(); |
| 141 | |
| 142 | vertTableId = rcClient.createTable(VERT_TABLE_NAME); |
| 143 | vertPropTableId = rcClient.createTable(VERT_PROP_TABLE_NAME); |
| 144 | edgePropTableId = rcClient.createTable(EDGE_PROP_TABLE_NAME); |
| 145 | idxVertTableId = rcClient.createTable(IDX_VERT_TABLE_NAME); |
| 146 | idxEdgeTableId = rcClient.createTable(IDX_EDGE_TABLE_NAME); |
| 147 | kidxVertTableId = rcClient.createTable(KIDX_VERT_TABLE_NAME); |
| 148 | kidxEdgeTableId = rcClient.createTable(KIDX_EDGE_TABLE_NAME); |
| 149 | instanceTableId = rcClient.createTable(INSTANCE_TABLE_NAME); |
| 150 | |
| 151 | log.info( "Connected to coordinator at {}", coordinatorLocation); |
| 152 | log.debug("VERT_TABLE:{}, VERT_PROP_TABLE:{}, EDGE_PROP_TABLE:{}, IDX_VERT_TABLE:{}, IDX_EDGE_TABLE:{}, KIDX_VERT_TABLE:{}, KIDX_EDGE_TABLE:{}", vertTableId, vertPropTableId, edgePropTableId, idxVertTableId, idxEdgeTableId, kidxVertTableId, kidxEdgeTableId); |
| 153 | nextVertexId = new AtomicLong(-1); |
| 154 | initInstance(); |
| 155 | } |
| 156 | |
| 157 | public JRamCloud getRcClient() { |
| 158 | JRamCloud rcClient = RamCloudThreadLocal.get(); |
| 159 | if (rcClient == null) { |
| 160 | rcClient = new JRamCloud(coordinatorLocation); |
| 161 | RamCloudThreadLocal.set(rcClient); |
| 162 | } |
| 163 | return rcClient; |
| 164 | } |
| 165 | |
| 166 | @Override |
| 167 | public Features getFeatures() { |
| 168 | return FEATURES; |
| 169 | } |
| 170 | |
| 171 | private Long parseVertexId(Object id) { |
| 172 | Long longId; |
| 173 | if (id == null) { |
| 174 | longId = nextVertexId.incrementAndGet(); |
| 175 | } else if (id instanceof Integer) { |
| 176 | longId = ((Integer) id).longValue(); |
| 177 | } else if (id instanceof Long) { |
| 178 | longId = (Long) id; |
| 179 | } else if (id instanceof String) { |
| 180 | try { |
| 181 | longId = Long.parseLong((String) id, 10); |
| 182 | } catch (NumberFormatException e) { |
| 183 | log.warn("ID argument {} of type {} is not a parseable long number: {}", id, id.getClass(), e); |
| 184 | return null; |
| 185 | } |
| 186 | } else if (id instanceof byte[]) { |
| 187 | try { |
| 188 | longId = ByteBuffer.wrap((byte[]) id).getLong(); |
| 189 | } catch (BufferUnderflowException e) { |
| 190 | log.warn("ID argument {} of type {} is not a parseable long number: {}", id, id.getClass(), e); |
| 191 | return null; |
| 192 | } |
| 193 | } else { |
| 194 | log.warn("ID argument {} of type {} is not supported. Returning null.", id, id.getClass()); |
| 195 | return null; |
| 196 | } |
| 197 | return longId; |
| 198 | } |
| 199 | |
| 200 | @Override |
| 201 | public Vertex addVertex(Object id) { |
| 202 | long startTime = 0; |
| 203 | long Tstamp1 = 0; |
| 204 | long Tstamp2 = 0; |
| 205 | |
| 206 | if (measureBPTimeProp == 1) { |
| 207 | startTime = System.nanoTime(); |
| 208 | } |
| 209 | Long longId = parseVertexId(id); |
| 210 | if (longId == null) |
| 211 | return null; |
| 212 | if (measureBPTimeProp == 1) { |
| 213 | Tstamp1 = System.nanoTime(); |
| 214 | } |
| 215 | RamCloudVertex newVertex = new RamCloudVertex(longId, this); |
| 216 | if (measureBPTimeProp == 1) { |
| 217 | Tstamp2 = System.nanoTime(); |
| 218 | log.error("Performance addVertex [id={}] : Calling create at {}", longId, Tstamp2); |
| 219 | } |
| 220 | |
| 221 | try { |
| 222 | newVertex.create(); |
| 223 | if (measureBPTimeProp == 1) { |
| 224 | long endTime = System.nanoTime(); |
| 225 | log.error("Performance addVertex [id={}] : genid {} newVerex {} create {} total time {}", longId, Tstamp1 - startTime, Tstamp2 - Tstamp1, endTime - Tstamp2, endTime - startTime); |
| 226 | } |
| 227 | log.info("Added vertex: [id={}]", longId); |
| 228 | return newVertex; |
| 229 | } catch (IllegalArgumentException e) { |
| 230 | log.error("Tried to create vertex failed {" + newVertex + "}", e); |
| 231 | return null; |
| 232 | } |
| 233 | } |
| 234 | |
| 235 | public List<RamCloudVertex> addVertices(Iterable<Object> ids) { |
| 236 | log.info("addVertices start"); |
| 237 | List<RamCloudVertex> vertices = new LinkedList<RamCloudVertex>(); |
| 238 | |
| 239 | for (Object id: ids) { |
| 240 | Long longId = parseVertexId(id); |
| 241 | if (longId == null) |
| 242 | return null; |
| 243 | RamCloudVertex v = new RamCloudVertex(longId, this); |
| 244 | if (v.exists()) { |
| 245 | log.error("ramcloud vertex id: {} already exists", v.getId()); |
| 246 | throw ExceptionFactory.vertexWithIdAlreadyExists(v.getId()); |
| 247 | } |
| 248 | vertices.add(v); |
| 249 | } |
| 250 | MultiWriteObject multiWriteObjects[] = new MultiWriteObject[vertices.size() * 2]; |
| 251 | for (int i=0; i < vertices.size(); i++) { |
| 252 | RamCloudVertex v = vertices.get(i); |
| 253 | multiWriteObjects[i*2] = new MultiWriteObject(vertTableId, v.rcKey, ByteBuffer.allocate(0).array(), null); |
| 254 | multiWriteObjects[i*2+1] = new MultiWriteObject(vertPropTableId, v.rcKey, ByteBuffer.allocate(0).array(), null); |
| 255 | } |
| 256 | try { |
| 257 | PerfMon pm = PerfMon.getInstance(); |
| 258 | pm.multiwrite_start("RamCloudVertex create()"); |
| 259 | getRcClient().multiWrite(multiWriteObjects); |
| 260 | pm.multiwrite_end("RamCloudVertex create()"); |
| 261 | log.info("ramcloud vertices are created"); |
| 262 | } catch (Exception e) { |
| 263 | log.error("Tried to create vertices failed {}", e); |
| 264 | return null; |
| 265 | } |
| 266 | log.info("addVertices end (success)"); |
| 267 | return vertices; |
| 268 | } |
| 269 | |
| 270 | private final void initInstance() { |
| 271 | //long incrementValue = 1; |
| 272 | JRamCloud.Object instanceEntry = null; |
| 273 | JRamCloud rcClient = getRcClient(); |
| 274 | try { |
| 275 | instanceEntry = rcClient.read(instanceTableId, "nextInstanceId".getBytes()); |
| 276 | } catch (Exception e) { |
| 277 | if (e instanceof JRamCloud.ObjectDoesntExistException) { |
| 278 | instanceId = 0; |
| 279 | rcClient.write(instanceTableId, "nextInstanceId".getBytes(), ByteBuffer.allocate(0).array()); |
| 280 | } |
| 281 | } |
| 282 | if (instanceEntry != null) { |
| 283 | long curInstanceId = 1; |
Yoshi Muroi | 815c7f9 | 2014-01-30 18:06:16 -0800 | [diff] [blame] | 284 | for (int i = 0 ; i < CONDITIONALWRITE_RETRY_MAX ; i++) { |
yoshi | 28bac13 | 2014-01-22 11:00:17 -0800 | [diff] [blame] | 285 | Map<String, Long> propMap = null; |
| 286 | if (instanceEntry.value == null) { |
| 287 | log.warn("Got a null byteArray argument"); |
| 288 | return; |
| 289 | } else if (instanceEntry.value.length != 0) { |
| 290 | try { |
| 291 | ByteArrayInputStream bais = new ByteArrayInputStream(instanceEntry.value); |
| 292 | ObjectInputStream ois = new ObjectInputStream(bais); |
| 293 | propMap = (Map<String, Long>) ois.readObject(); |
| 294 | } catch (IOException e) { |
| 295 | log.error("Got an exception while deserializing element's property map: ", e); |
| 296 | return; |
| 297 | } catch (ClassNotFoundException e) { |
| 298 | log.error("Got an exception while deserializing element's property map: ", e); |
| 299 | return; |
| 300 | } |
| 301 | } else { |
| 302 | propMap = new HashMap<String, Long>(); |
| 303 | } |
| 304 | |
| 305 | if (propMap.containsKey(INSTANCE_TABLE_NAME)) { |
| 306 | curInstanceId = propMap.get(INSTANCE_TABLE_NAME) + 1; |
| 307 | } |
| 308 | |
| 309 | propMap.put(INSTANCE_TABLE_NAME, curInstanceId); |
| 310 | |
| 311 | byte[] rcValue = null; |
| 312 | try { |
| 313 | ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| 314 | ObjectOutputStream oot = new ObjectOutputStream(baos); |
| 315 | oot.writeObject(propMap); |
| 316 | rcValue = baos.toByteArray(); |
| 317 | } catch (IOException e) { |
| 318 | log.error("Got an exception while serializing element's property map", e); |
| 319 | return; |
| 320 | } |
| 321 | JRamCloud.RejectRules rules = rcClient.new RejectRules(); |
| 322 | rules.setNeVersion(instanceEntry.version); |
| 323 | try { |
| 324 | rcClient.writeRule(instanceTableId, "nextInstanceId".getBytes(), rcValue, rules); |
| 325 | instanceId = curInstanceId; |
| 326 | break; |
| 327 | } catch (Exception ex) { |
| 328 | log.debug("Cond. Write increment Vertex property: ", ex); |
| 329 | instanceEntry = rcClient.read(instanceTableId, "nextInstanceId".getBytes()); |
| 330 | continue; |
| 331 | } |
| 332 | } |
| 333 | } |
| 334 | |
| 335 | nextVertexId.compareAndSet(-1, instanceId * INSTANCE_ID_RANGE); |
| 336 | } |
| 337 | |
| 338 | @Override |
| 339 | public Vertex getVertex(Object id) throws IllegalArgumentException { |
| 340 | Long longId; |
| 341 | |
| 342 | if (id == null) { |
| 343 | throw ExceptionFactory.vertexIdCanNotBeNull(); |
| 344 | } else if (id instanceof Integer) { |
| 345 | longId = ((Integer) id).longValue(); |
| 346 | } else if (id instanceof Long) { |
| 347 | longId = (Long) id; |
| 348 | } else if (id instanceof String) { |
| 349 | try { |
| 350 | longId = Long.parseLong((String) id, 10); |
| 351 | } catch (NumberFormatException e) { |
| 352 | log.warn("ID argument {} of type {} is not a parseable long number: {}", id, id.getClass(), e); |
| 353 | return null; |
| 354 | } |
| 355 | } else if (id instanceof byte[]) { |
| 356 | try { |
| 357 | longId = ByteBuffer.wrap((byte[]) id).getLong(); |
| 358 | } catch (BufferUnderflowException e) { |
| 359 | log.warn("ID argument {} of type {} is not a parseable long number: {}", id, id.getClass(), e); |
| 360 | return null; |
| 361 | } |
| 362 | } else { |
| 363 | log.warn("ID argument {} of type {} is not supported. Returning null.", id, id.getClass()); |
| 364 | return null; |
| 365 | } |
| 366 | |
| 367 | RamCloudVertex vertex = new RamCloudVertex(longId, this); |
| 368 | |
| 369 | if (vertex.exists()) { |
| 370 | return vertex; |
| 371 | } else { |
| 372 | return null; |
| 373 | } |
| 374 | } |
| 375 | |
| 376 | @Override |
| 377 | public void removeVertex(Vertex vertex) { |
| 378 | ((RamCloudVertex) vertex).remove(); |
| 379 | } |
| 380 | |
| 381 | @Override |
| 382 | public Iterable<Vertex> getVertices() { |
| 383 | JRamCloud.TableEnumerator tableEnum = getRcClient().new TableEnumerator(vertPropTableId); |
| 384 | List<Vertex> vertices = new LinkedList<Vertex>(); |
| 385 | |
| 386 | while (tableEnum.hasNext()) { |
| 387 | vertices.add(new RamCloudVertex(tableEnum.next().key, this)); |
| 388 | } |
| 389 | |
| 390 | return vertices; |
| 391 | } |
| 392 | |
| 393 | @Override |
| 394 | public Iterable<Vertex> getVertices(String key, Object value) { |
| 395 | long startTime = 0; |
| 396 | long Tstamp1 = 0; |
| 397 | long Tstamp2 = 0; |
| 398 | long Tstamp3 = 0; |
| 399 | if (measureBPTimeProp == 1) { |
| 400 | startTime = System.nanoTime(); |
| 401 | log.error("Performance getVertices(key {}) start at {}", key, startTime); |
| 402 | } |
| 403 | |
| 404 | List<Vertex> vertices = new ArrayList<Vertex>(); |
| 405 | List<Object> vertexList = null; |
| 406 | |
| 407 | JRamCloud vertTable = getRcClient(); |
| 408 | if (measureBPTimeProp == 1) { |
| 409 | Tstamp1 = System.nanoTime(); |
| 410 | log.error("Performance getVertices(key {}) Calling indexedKeys.contains(key) at {}", key, Tstamp1); |
| 411 | } |
| 412 | |
| 413 | |
| 414 | if (indexedKeys.contains(key)) { |
| 415 | PerfMon pm = PerfMon.getInstance(); |
| 416 | if (measureBPTimeProp == 1) { |
| 417 | Tstamp2 = System.nanoTime(); |
| 418 | log.error("Performance getVertices(key {}) Calling new RamCloudKeyIndex at {}", key, Tstamp2); |
| 419 | } |
| 420 | RamCloudKeyIndex KeyIndex = new RamCloudKeyIndex(kidxVertTableId, key, value, this, Vertex.class); |
| 421 | if (measureBPTimeProp == 1) { |
| 422 | Tstamp3 = System.nanoTime(); |
| 423 | log.error("Performance getVertices(key {}) Calling KeyIndex.GetElmIdListForPropValue at {}", key, Tstamp3); |
| 424 | } |
| 425 | vertexList = KeyIndex.getElmIdListForPropValue(value.toString()); |
| 426 | if (vertexList == null) { |
| 427 | if (measureBPTimeProp == 1) { |
| 428 | long endTime = System.nanoTime(); |
| 429 | log.error("Performance getVertices(key {}) does not exists : getRcClient {} indexedKeys.contains(key) {} new_RamCloudKeyIndex {} KeyIndex.get..Value {} total {} diff {}", key, Tstamp1-startTime, Tstamp2-Tstamp1,Tstamp3-Tstamp2, endTime-Tstamp3, endTime - startTime, (endTime-startTime)- (Tstamp1-startTime)- (Tstamp2-Tstamp1)- (Tstamp3-Tstamp2)-(endTime-Tstamp3)); |
| 430 | } |
| 431 | return vertices; |
| 432 | } |
| 433 | |
| 434 | final int mreadMax = 400; |
| 435 | final int size = Math.min(mreadMax, vertexList.size()); |
Yoshi Muroi | 2c17060 | 2014-02-15 08:31:28 -0800 | [diff] [blame^] | 436 | |
| 437 | long tableId[] = new long[size]; |
| 438 | byte[] keyData[] = new byte[size][]; |
| 439 | short keySize[] = new short[size]; |
yoshi | 28bac13 | 2014-01-22 11:00:17 -0800 | [diff] [blame] | 440 | |
| 441 | int vertexNum = 0; |
| 442 | for (Object vert : vertexList) { |
Yoshi Muroi | 2c17060 | 2014-02-15 08:31:28 -0800 | [diff] [blame^] | 443 | tableId[vertexNum] = vertPropTableId; |
| 444 | keyData[vertexNum] = |
yoshi | 28bac13 | 2014-01-22 11:00:17 -0800 | [diff] [blame] | 445 | ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong((Long) vert).array(); |
Yoshi Muroi | 2c17060 | 2014-02-15 08:31:28 -0800 | [diff] [blame^] | 446 | keySize[vertexNum] = (short) keyData[vertexNum].length; |
yoshi | 28bac13 | 2014-01-22 11:00:17 -0800 | [diff] [blame] | 447 | if (vertexNum >= (mreadMax - 1)) { |
| 448 | pm.multiread_start("RamCloudGraph getVertices()"); |
| 449 | JRamCloud.Object outvertPropTable[] = |
Yoshi Muroi | 2c17060 | 2014-02-15 08:31:28 -0800 | [diff] [blame^] | 450 | vertTable.multiRead(tableId, keyData, keySize, vertexNum); |
yoshi | 28bac13 | 2014-01-22 11:00:17 -0800 | [diff] [blame] | 451 | pm.multiread_end("RamCloudGraph getVertices()"); |
| 452 | for (int i = 0; i < outvertPropTable.length; i++) { |
| 453 | if (outvertPropTable[i] != null) { |
| 454 | vertices.add(new RamCloudVertex(outvertPropTable[i].key, this)); |
| 455 | } |
| 456 | } |
| 457 | vertexNum = 0; |
| 458 | continue; |
| 459 | } |
| 460 | vertexNum++; |
| 461 | } |
| 462 | |
| 463 | if (vertexNum != 0) { |
yoshi | 28bac13 | 2014-01-22 11:00:17 -0800 | [diff] [blame] | 464 | long startTime2 = 0; |
| 465 | if (measureRcTimeProp == 1) { |
| 466 | startTime2 = System.nanoTime(); |
| 467 | } |
| 468 | pm.multiread_start("RamCloudGraph getVertices()"); |
Yoshi Muroi | 2c17060 | 2014-02-15 08:31:28 -0800 | [diff] [blame^] | 469 | JRamCloud.Object outvertPropTable[] = vertTable.multiRead(tableId, keyData, keySize, vertexNum); |
yoshi | 28bac13 | 2014-01-22 11:00:17 -0800 | [diff] [blame] | 470 | pm.multiread_end("RamCloudGraph getVertices()"); |
| 471 | if (measureRcTimeProp == 1) { |
| 472 | long endTime2 = System.nanoTime(); |
| 473 | log.error("Performance index multiread(key {}, number {}) time {}", key, vertexNum, endTime2 - startTime2); |
| 474 | } |
| 475 | for (int i = 0; i < outvertPropTable.length; i++) { |
| 476 | if (outvertPropTable[i] != null) { |
| 477 | vertices.add(new RamCloudVertex(outvertPropTable[i].key, this)); |
| 478 | } |
| 479 | } |
| 480 | } |
| 481 | } else { |
| 482 | |
| 483 | JRamCloud.TableEnumerator tableEnum = getRcClient().new TableEnumerator(vertPropTableId); |
| 484 | JRamCloud.Object tableEntry; |
| 485 | |
| 486 | while (tableEnum.hasNext()) { |
| 487 | tableEntry = tableEnum.next(); |
| 488 | if (tableEntry != null) { |
| 489 | //XXX remove temp |
| 490 | // RamCloudVertex temp = new RamCloudVertex(tableEntry.key, this); |
| 491 | Map<String, Object> propMap = RamCloudElement.convertRcBytesToPropertyMapEx(tableEntry.value); |
| 492 | if (propMap.containsKey(key) && propMap.get(key).equals(value)) { |
| 493 | vertices.add(new RamCloudVertex(tableEntry.key, this)); |
| 494 | } |
| 495 | } |
| 496 | } |
| 497 | } |
| 498 | |
| 499 | if (measureBPTimeProp == 1) { |
| 500 | long endTime = System.nanoTime(); |
| 501 | log.error("Performance getVertices exists total time {}.", endTime - startTime); |
| 502 | } |
| 503 | |
| 504 | return vertices; |
| 505 | } |
| 506 | |
| 507 | @Override |
| 508 | public Edge addEdge(Object id, Vertex outVertex, Vertex inVertex, String label) throws IllegalArgumentException { |
| 509 | log.info("Adding edge: [id={}, outVertex={}, inVertex={}, label={}]", id, outVertex, inVertex, label); |
| 510 | |
| 511 | if (label == null) { |
| 512 | throw ExceptionFactory.edgeLabelCanNotBeNull(); |
| 513 | } |
| 514 | |
| 515 | RamCloudEdge newEdge = new RamCloudEdge((RamCloudVertex) outVertex, (RamCloudVertex) inVertex, label, this); |
| 516 | |
| 517 | for (int i = 0; i < 5 ;i++) { |
| 518 | try { |
| 519 | newEdge.create(); |
| 520 | return newEdge; |
| 521 | } catch (Exception e) { |
| 522 | log.warn("Tried to create edge failed: {" + newEdge + "}: ", e); |
| 523 | |
| 524 | if (e instanceof NoSuchElementException) { |
| 525 | log.error("addEdge RETRYING {}", i); |
| 526 | continue; |
| 527 | } |
| 528 | } |
| 529 | } |
| 530 | return null; |
| 531 | } |
| 532 | |
| 533 | public List<Edge> addEdges(Iterable<Edge> edgeEntities) throws IllegalArgumentException { |
| 534 | //TODO WIP: need multi-write |
| 535 | log.info("addEdges start"); |
| 536 | ArrayList<Edge> edges = new ArrayList<Edge>(); |
| 537 | for (Edge edge: edgeEntities) { |
| 538 | edges.add(addEdge(null, edge.getVertex(Direction.OUT), edge.getVertex(Direction.IN), edge.getLabel())); |
| 539 | } |
| 540 | log.info("addVertices end"); |
| 541 | return edges; |
| 542 | } |
| 543 | |
| 544 | public void setProperties(Map<RamCloudVertex, Map<String, Object>> properties) { |
| 545 | // TODO WIP: need multi-write |
| 546 | log.info("setProperties start"); |
| 547 | for (Map.Entry<RamCloudVertex, Map<String, Object>> e: properties.entrySet()) { |
| 548 | e.getKey().setProperties(e.getValue()); |
| 549 | } |
| 550 | log.info("setProperties end"); |
| 551 | } |
| 552 | |
| 553 | @Override |
| 554 | public Edge getEdge(Object id) throws IllegalArgumentException { |
| 555 | byte[] bytearrayId; |
| 556 | |
| 557 | if (id == null) { |
| 558 | throw ExceptionFactory.edgeIdCanNotBeNull(); |
| 559 | } else if (id instanceof byte[]) { |
| 560 | bytearrayId = (byte[]) id; |
| 561 | } else if (id instanceof String) { |
| 562 | bytearrayId = Base64.decode(((String) id)); |
| 563 | } else { |
| 564 | log.warn("ID argument {} of type {} is not supported. Returning null.", id, id.getClass()); |
| 565 | return null; |
| 566 | } |
| 567 | |
| 568 | if (!RamCloudEdge.isValidEdgeId(bytearrayId)) { |
| 569 | log.warn("ID argument {} of type {} is malformed. Returning null.", id, id.getClass()); |
| 570 | return null; |
| 571 | } |
| 572 | |
| 573 | RamCloudEdge edge = new RamCloudEdge(bytearrayId, this); |
| 574 | |
| 575 | if (edge.exists()) { |
| 576 | return edge; |
| 577 | } else { |
| 578 | return null; |
| 579 | } |
| 580 | } |
| 581 | |
| 582 | @Override |
| 583 | public void removeEdge(Edge edge) { |
| 584 | edge.remove(); |
| 585 | } |
| 586 | |
| 587 | @Override |
| 588 | public Iterable<Edge> getEdges() { |
| 589 | JRamCloud.TableEnumerator tableEnum = getRcClient().new TableEnumerator(edgePropTableId); |
| 590 | List<Edge> edges = new ArrayList<Edge>(); |
| 591 | |
| 592 | while (tableEnum.hasNext()) { |
| 593 | edges.add(new RamCloudEdge(tableEnum.next().key, this)); |
| 594 | } |
| 595 | |
| 596 | return edges; |
| 597 | } |
| 598 | |
| 599 | @Override |
| 600 | public Iterable<Edge> getEdges(String key, Object value) { |
| 601 | JRamCloud.TableEnumerator tableEnum = getRcClient().new TableEnumerator(edgePropTableId); |
| 602 | List<Edge> edges = new ArrayList<Edge>(); |
| 603 | JRamCloud.Object tableEntry; |
| 604 | |
| 605 | while (tableEnum.hasNext()) { |
| 606 | tableEntry = tableEnum.next(); |
| 607 | // FIXME temp |
| 608 | //RamCloudEdge temp = new RamCloudEdge(tableEntry.key, this); |
| 609 | Map<String, Object> propMap = RamCloudElement.convertRcBytesToPropertyMapEx(tableEntry.value); |
| 610 | if (propMap.containsKey(key) && propMap.get(key).equals(value)) { |
| 611 | edges.add(new RamCloudEdge(tableEntry.key, this)); |
| 612 | } |
| 613 | } |
| 614 | |
| 615 | return edges; |
| 616 | } |
| 617 | |
| 618 | @Override |
| 619 | public GraphQuery query() { |
| 620 | return new DefaultGraphQuery(this); |
| 621 | } |
| 622 | |
| 623 | @Override |
| 624 | public void shutdown() { |
| 625 | JRamCloud rcClient = getRcClient(); |
| 626 | rcClient.dropTable(VERT_TABLE_NAME); |
| 627 | rcClient.dropTable(VERT_PROP_TABLE_NAME); |
| 628 | rcClient.dropTable(EDGE_PROP_TABLE_NAME); |
| 629 | rcClient.dropTable(IDX_VERT_TABLE_NAME); |
| 630 | rcClient.dropTable(IDX_EDGE_TABLE_NAME); |
| 631 | rcClient.dropTable(KIDX_VERT_TABLE_NAME); |
| 632 | rcClient.dropTable(KIDX_EDGE_TABLE_NAME); |
| 633 | rcClient.disconnect(); |
| 634 | } |
| 635 | |
| 636 | @Override |
| 637 | public void stopTransaction(Conclusion conclusion) { |
| 638 | // TODO Auto-generated method stub |
| 639 | } |
| 640 | |
| 641 | @Override |
| 642 | public void commit() { |
| 643 | // TODO Auto-generated method stub |
| 644 | } |
| 645 | |
| 646 | @Override |
| 647 | public void rollback() { |
| 648 | // TODO Auto-generated method stub |
| 649 | } |
| 650 | |
| 651 | @Override |
| 652 | public <T extends Element> void dropKeyIndex(String key, Class<T> elementClass) { |
| 653 | throw new UnsupportedOperationException("Not supported yet."); |
| 654 | //FIXME how to dropKeyIndex |
| 655 | //new RamCloudKeyIndex(kidxVertTableId, key, this, elementClass); |
| 656 | //getIndexedKeys(key, elementClass).removeIndex(); |
| 657 | } |
| 658 | |
| 659 | @Override |
| 660 | public <T extends Element> void createKeyIndex(String key, |
| 661 | Class<T> elementClass, Parameter... indexParameters) { |
| 662 | if (key == null) { |
| 663 | return; |
| 664 | } |
| 665 | if (this.indexedKeys.contains(key)) { |
| 666 | return; |
| 667 | } |
| 668 | this.indexedKeys.add(key); |
| 669 | } |
| 670 | |
| 671 | @Override |
| 672 | public <T extends Element> Set< String> getIndexedKeys(Class< T> elementClass) { |
| 673 | if (null != this.indexedKeys) { |
| 674 | return new HashSet<String>(this.indexedKeys); |
| 675 | } else { |
| 676 | return Collections.emptySet(); |
| 677 | } |
| 678 | } |
| 679 | |
| 680 | @Override |
| 681 | public <T extends Element> Index<T> createIndex(String indexName, |
| 682 | Class<T> indexClass, Parameter... indexParameters) { |
| 683 | throw new UnsupportedOperationException("Not supported yet."); |
| 684 | } |
| 685 | |
| 686 | @Override |
| 687 | public <T extends Element> Index<T> getIndex(String indexName, Class<T> indexClass) { |
| 688 | throw new UnsupportedOperationException("Not supported yet."); |
| 689 | } |
| 690 | |
| 691 | @Override |
| 692 | public Iterable<Index<? extends Element>> getIndices() { |
| 693 | throw new UnsupportedOperationException("Not supported yet."); |
| 694 | } |
| 695 | |
| 696 | @Override |
| 697 | public void dropIndex(String indexName) { |
| 698 | throw new UnsupportedOperationException("Not supported yet."); |
| 699 | } |
| 700 | |
| 701 | @Override |
| 702 | public String toString() { |
| 703 | return getClass().getSimpleName().toLowerCase() + "[vertices:" + ((List<Vertex>)getVertices()).size() + " edges:" + ((List<Edge>)getEdges()).size() + "]"; |
| 704 | } |
| 705 | |
| 706 | public static void main(String[] args) { |
| 707 | RamCloudGraph graph = new RamCloudGraph(); |
| 708 | |
| 709 | Vertex a = graph.addVertex(null); |
| 710 | Vertex b = graph.addVertex(null); |
| 711 | Vertex c = graph.addVertex(null); |
| 712 | Vertex d = graph.addVertex(null); |
| 713 | Vertex e = graph.addVertex(null); |
| 714 | Vertex f = graph.addVertex(null); |
| 715 | Vertex g = graph.addVertex(null); |
| 716 | |
| 717 | graph.addEdge(null, a, a, "friend"); |
| 718 | graph.addEdge(null, a, b, "friend1"); |
| 719 | graph.addEdge(null, a, b, "friend2"); |
| 720 | graph.addEdge(null, a, b, "friend3"); |
| 721 | graph.addEdge(null, a, c, "friend"); |
| 722 | graph.addEdge(null, a, d, "friend"); |
| 723 | graph.addEdge(null, a, e, "friend"); |
| 724 | graph.addEdge(null, a, f, "friend"); |
| 725 | graph.addEdge(null, a, g, "friend"); |
| 726 | |
| 727 | graph.shutdown(); |
| 728 | } |
| 729 | } |