Yuta HIGUCHI | a1e655a | 2014-01-23 17:43:11 -0800 | [diff] [blame] | 1 | /* Copyright (c) 2013 Stanford University |
| 2 | * |
| 3 | * Permission to use, copy, modify, and distribute this software for any |
| 4 | * purpose with or without fee is hereby granted, provided that the above |
| 5 | * copyright notice and this permission notice appear in all copies. |
| 6 | * |
| 7 | * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES |
| 8 | * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF |
| 9 | * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR |
| 10 | * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES |
| 11 | * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN |
| 12 | * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF |
| 13 | * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. |
| 14 | */ |
| 15 | |
yoshi | 28bac13 | 2014-01-22 11:00:17 -0800 | [diff] [blame] | 16 | package com.tinkerpop.blueprints.impls.ramcloud; |
| 17 | |
| 18 | import java.io.ByteArrayInputStream; |
| 19 | import java.io.ByteArrayOutputStream; |
| 20 | import java.io.IOException; |
| 21 | import java.io.ObjectInputStream; |
| 22 | import java.io.ObjectOutputStream; |
| 23 | import java.io.Serializable; |
| 24 | import java.nio.BufferUnderflowException; |
| 25 | import java.nio.ByteBuffer; |
| 26 | import java.nio.ByteOrder; |
| 27 | import java.util.ArrayList; |
| 28 | import java.util.Arrays; |
| 29 | import java.util.Collections; |
| 30 | import java.util.HashMap; |
| 31 | import java.util.HashSet; |
| 32 | import java.util.LinkedList; |
| 33 | import java.util.List; |
| 34 | import java.util.Map; |
| 35 | import java.util.NoSuchElementException; |
| 36 | import java.util.Set; |
| 37 | import java.util.concurrent.atomic.AtomicLong; |
| 38 | |
| 39 | import org.slf4j.Logger; |
| 40 | import org.slf4j.LoggerFactory; |
| 41 | |
| 42 | import com.sun.jersey.core.util.Base64; |
| 43 | import com.tinkerpop.blueprints.Direction; |
| 44 | import com.tinkerpop.blueprints.Edge; |
| 45 | import com.tinkerpop.blueprints.Element; |
| 46 | import com.tinkerpop.blueprints.Features; |
| 47 | import com.tinkerpop.blueprints.GraphQuery; |
| 48 | import com.tinkerpop.blueprints.Index; |
| 49 | import com.tinkerpop.blueprints.IndexableGraph; |
| 50 | import com.tinkerpop.blueprints.KeyIndexableGraph; |
| 51 | import com.tinkerpop.blueprints.Parameter; |
| 52 | import com.tinkerpop.blueprints.TransactionalGraph; |
| 53 | import com.tinkerpop.blueprints.Vertex; |
| 54 | import com.tinkerpop.blueprints.util.DefaultGraphQuery; |
| 55 | import com.tinkerpop.blueprints.util.ExceptionFactory; |
| 56 | import com.tinkerpop.blueprints.impls.ramcloud.PerfMon; |
| 57 | |
| 58 | import edu.stanford.ramcloud.JRamCloud; |
| 59 | import edu.stanford.ramcloud.JRamCloud.MultiWriteObject; |
| 60 | |
| 61 | public class RamCloudGraph implements IndexableGraph, KeyIndexableGraph, TransactionalGraph, Serializable { |
| 62 | private final static Logger log = LoggerFactory.getLogger(RamCloudGraph.class); |
| 63 | |
| 64 | private static final ThreadLocal<JRamCloud> RamCloudThreadLocal = new ThreadLocal<JRamCloud>(); |
| 65 | |
| 66 | protected long vertTableId; //(vertex_id) --> ( (n,d,ll,l), (n,d,ll,l), ... ) |
| 67 | protected long vertPropTableId; //(vertex_id) -> ( (kl,k,vl,v), (kl,k,vl,v), ... ) |
| 68 | protected long edgePropTableId; //(edge_id) -> ( (kl,k,vl,v), (kl,k,vl,v), ... ) |
| 69 | protected long idxVertTableId; |
| 70 | protected long idxEdgeTableId; |
| 71 | protected long kidxVertTableId; |
| 72 | protected long kidxEdgeTableId; |
| 73 | protected long instanceTableId; |
| 74 | private String VERT_TABLE_NAME = "verts"; |
| 75 | private String EDGE_PROP_TABLE_NAME = "edge_props"; |
| 76 | private String VERT_PROP_TABLE_NAME = "vert_props"; |
| 77 | private String IDX_VERT_TABLE_NAME = "idx_vert"; |
| 78 | private String IDX_EDGE_TABLE_NAME = "idx_edge"; |
| 79 | private String KIDX_VERT_TABLE_NAME = "kidx_vert"; |
| 80 | private String KIDX_EDGE_TABLE_NAME = "kidx_edge"; |
| 81 | private final String INSTANCE_TABLE_NAME = "instance"; |
| 82 | private long instanceId; |
| 83 | private AtomicLong nextVertexId; |
| 84 | private final long INSTANCE_ID_RANGE = 100000; |
| 85 | private String coordinatorLocation; |
| 86 | private static final Features FEATURES = new Features(); |
Yoshi Muroi | 815c7f9 | 2014-01-30 18:06:16 -0800 | [diff] [blame] | 87 | // FIXME better loop variable name |
| 88 | public final int CONDITIONALWRITE_RETRY_MAX = 100; |
yoshi | 28bac13 | 2014-01-22 11:00:17 -0800 | [diff] [blame] | 89 | public final long measureBPTimeProp = Long.valueOf(System.getProperty("benchmark.measureBP", "0")); |
| 90 | public final long measureRcTimeProp = Long.valueOf(System.getProperty("benchmark.measureRc", "0")); |
| 91 | public static final long measureSerializeTimeProp = Long.valueOf(System.getProperty("benchmark.measureSerializeTimeProp", "0")); |
| 92 | |
| 93 | |
| 94 | public final Set<String> indexedKeys = new HashSet<String>(); |
| 95 | |
| 96 | static { |
| 97 | FEATURES.supportsSerializableObjectProperty = true; |
| 98 | FEATURES.supportsBooleanProperty = true; |
| 99 | FEATURES.supportsDoubleProperty = true; |
| 100 | FEATURES.supportsFloatProperty = true; |
| 101 | FEATURES.supportsIntegerProperty = true; |
| 102 | FEATURES.supportsPrimitiveArrayProperty = true; |
| 103 | FEATURES.supportsUniformListProperty = true; |
| 104 | FEATURES.supportsMixedListProperty = true; |
| 105 | FEATURES.supportsLongProperty = true; |
| 106 | FEATURES.supportsMapProperty = true; |
| 107 | FEATURES.supportsStringProperty = true; |
| 108 | |
| 109 | FEATURES.supportsDuplicateEdges = false; |
| 110 | FEATURES.supportsSelfLoops = false; |
| 111 | FEATURES.isPersistent = false; |
| 112 | FEATURES.isWrapper = false; |
| 113 | FEATURES.supportsVertexIteration = true; |
| 114 | FEATURES.supportsEdgeIteration = true; |
| 115 | FEATURES.supportsVertexIndex = true; |
| 116 | FEATURES.supportsEdgeIndex = false; |
| 117 | FEATURES.ignoresSuppliedIds = true; |
| 118 | FEATURES.supportsTransactions = false; |
| 119 | FEATURES.supportsIndices = false; |
| 120 | FEATURES.supportsKeyIndices = true; |
| 121 | FEATURES.supportsVertexKeyIndex = true; |
| 122 | FEATURES.supportsEdgeKeyIndex = false; |
| 123 | FEATURES.supportsEdgeRetrieval = true; |
| 124 | FEATURES.supportsVertexProperties = true; |
| 125 | FEATURES.supportsEdgeProperties = true; |
| 126 | FEATURES.supportsThreadedTransactions = false; |
| 127 | } |
| 128 | |
| 129 | static { |
| 130 | System.loadLibrary("edu_stanford_ramcloud_JRamCloud"); |
| 131 | } |
| 132 | |
| 133 | private RamCloudGraph() { |
| 134 | } |
| 135 | |
| 136 | |
| 137 | public RamCloudGraph(String coordinatorLocation) { |
| 138 | this.coordinatorLocation = coordinatorLocation; |
| 139 | |
| 140 | JRamCloud rcClient = getRcClient(); |
| 141 | |
| 142 | vertTableId = rcClient.createTable(VERT_TABLE_NAME); |
| 143 | vertPropTableId = rcClient.createTable(VERT_PROP_TABLE_NAME); |
| 144 | edgePropTableId = rcClient.createTable(EDGE_PROP_TABLE_NAME); |
| 145 | idxVertTableId = rcClient.createTable(IDX_VERT_TABLE_NAME); |
| 146 | idxEdgeTableId = rcClient.createTable(IDX_EDGE_TABLE_NAME); |
| 147 | kidxVertTableId = rcClient.createTable(KIDX_VERT_TABLE_NAME); |
| 148 | kidxEdgeTableId = rcClient.createTable(KIDX_EDGE_TABLE_NAME); |
| 149 | instanceTableId = rcClient.createTable(INSTANCE_TABLE_NAME); |
| 150 | |
| 151 | log.info( "Connected to coordinator at {}", coordinatorLocation); |
| 152 | log.debug("VERT_TABLE:{}, VERT_PROP_TABLE:{}, EDGE_PROP_TABLE:{}, IDX_VERT_TABLE:{}, IDX_EDGE_TABLE:{}, KIDX_VERT_TABLE:{}, KIDX_EDGE_TABLE:{}", vertTableId, vertPropTableId, edgePropTableId, idxVertTableId, idxEdgeTableId, kidxVertTableId, kidxEdgeTableId); |
| 153 | nextVertexId = new AtomicLong(-1); |
| 154 | initInstance(); |
| 155 | } |
| 156 | |
| 157 | public JRamCloud getRcClient() { |
| 158 | JRamCloud rcClient = RamCloudThreadLocal.get(); |
| 159 | if (rcClient == null) { |
| 160 | rcClient = new JRamCloud(coordinatorLocation); |
| 161 | RamCloudThreadLocal.set(rcClient); |
| 162 | } |
| 163 | return rcClient; |
| 164 | } |
| 165 | |
| 166 | @Override |
| 167 | public Features getFeatures() { |
| 168 | return FEATURES; |
| 169 | } |
| 170 | |
| 171 | private Long parseVertexId(Object id) { |
| 172 | Long longId; |
| 173 | if (id == null) { |
| 174 | longId = nextVertexId.incrementAndGet(); |
| 175 | } else if (id instanceof Integer) { |
| 176 | longId = ((Integer) id).longValue(); |
| 177 | } else if (id instanceof Long) { |
| 178 | longId = (Long) id; |
| 179 | } else if (id instanceof String) { |
| 180 | try { |
| 181 | longId = Long.parseLong((String) id, 10); |
| 182 | } catch (NumberFormatException e) { |
| 183 | log.warn("ID argument {} of type {} is not a parseable long number: {}", id, id.getClass(), e); |
| 184 | return null; |
| 185 | } |
| 186 | } else if (id instanceof byte[]) { |
| 187 | try { |
| 188 | longId = ByteBuffer.wrap((byte[]) id).getLong(); |
| 189 | } catch (BufferUnderflowException e) { |
| 190 | log.warn("ID argument {} of type {} is not a parseable long number: {}", id, id.getClass(), e); |
| 191 | return null; |
| 192 | } |
| 193 | } else { |
| 194 | log.warn("ID argument {} of type {} is not supported. Returning null.", id, id.getClass()); |
| 195 | return null; |
| 196 | } |
| 197 | return longId; |
| 198 | } |
| 199 | |
| 200 | @Override |
| 201 | public Vertex addVertex(Object id) { |
| 202 | long startTime = 0; |
| 203 | long Tstamp1 = 0; |
| 204 | long Tstamp2 = 0; |
| 205 | |
| 206 | if (measureBPTimeProp == 1) { |
| 207 | startTime = System.nanoTime(); |
| 208 | } |
| 209 | Long longId = parseVertexId(id); |
| 210 | if (longId == null) |
| 211 | return null; |
| 212 | if (measureBPTimeProp == 1) { |
| 213 | Tstamp1 = System.nanoTime(); |
| 214 | } |
| 215 | RamCloudVertex newVertex = new RamCloudVertex(longId, this); |
| 216 | if (measureBPTimeProp == 1) { |
| 217 | Tstamp2 = System.nanoTime(); |
| 218 | log.error("Performance addVertex [id={}] : Calling create at {}", longId, Tstamp2); |
| 219 | } |
| 220 | |
| 221 | try { |
| 222 | newVertex.create(); |
| 223 | if (measureBPTimeProp == 1) { |
| 224 | long endTime = System.nanoTime(); |
| 225 | log.error("Performance addVertex [id={}] : genid {} newVerex {} create {} total time {}", longId, Tstamp1 - startTime, Tstamp2 - Tstamp1, endTime - Tstamp2, endTime - startTime); |
| 226 | } |
| 227 | log.info("Added vertex: [id={}]", longId); |
| 228 | return newVertex; |
| 229 | } catch (IllegalArgumentException e) { |
| 230 | log.error("Tried to create vertex failed {" + newVertex + "}", e); |
| 231 | return null; |
| 232 | } |
| 233 | } |
| 234 | |
| 235 | public List<RamCloudVertex> addVertices(Iterable<Object> ids) { |
| 236 | log.info("addVertices start"); |
| 237 | List<RamCloudVertex> vertices = new LinkedList<RamCloudVertex>(); |
| 238 | |
| 239 | for (Object id: ids) { |
| 240 | Long longId = parseVertexId(id); |
| 241 | if (longId == null) |
| 242 | return null; |
| 243 | RamCloudVertex v = new RamCloudVertex(longId, this); |
| 244 | if (v.exists()) { |
| 245 | log.error("ramcloud vertex id: {} already exists", v.getId()); |
| 246 | throw ExceptionFactory.vertexWithIdAlreadyExists(v.getId()); |
| 247 | } |
| 248 | vertices.add(v); |
| 249 | } |
Yoshi Muroi | e7693b1 | 2014-02-19 19:41:17 -0800 | [diff] [blame] | 250 | |
| 251 | MultiWriteObject mwo = new MultiWriteObject(vertices.size()*2); |
| 252 | |
yoshi | 28bac13 | 2014-01-22 11:00:17 -0800 | [diff] [blame] | 253 | for (int i=0; i < vertices.size(); i++) { |
| 254 | RamCloudVertex v = vertices.get(i); |
Yoshi Muroi | e7693b1 | 2014-02-19 19:41:17 -0800 | [diff] [blame] | 255 | mwo.setObject(i*2, vertTableId, v.rcKey, ByteBuffer.allocate(0).array(), null); |
| 256 | mwo.setObject(i*2+1, vertTableId, v.rcKey, ByteBuffer.allocate(0).array(), null); |
yoshi | 28bac13 | 2014-01-22 11:00:17 -0800 | [diff] [blame] | 257 | } |
| 258 | try { |
| 259 | PerfMon pm = PerfMon.getInstance(); |
| 260 | pm.multiwrite_start("RamCloudVertex create()"); |
Yoshi Muroi | e7693b1 | 2014-02-19 19:41:17 -0800 | [diff] [blame] | 261 | getRcClient().multiWrite(mwo.tableId, mwo.key, mwo.keyLength, mwo.value, mwo.valueLength, vertices.size()*2, mwo.rules); |
yoshi | 28bac13 | 2014-01-22 11:00:17 -0800 | [diff] [blame] | 262 | pm.multiwrite_end("RamCloudVertex create()"); |
| 263 | log.info("ramcloud vertices are created"); |
| 264 | } catch (Exception e) { |
| 265 | log.error("Tried to create vertices failed {}", e); |
| 266 | return null; |
| 267 | } |
| 268 | log.info("addVertices end (success)"); |
| 269 | return vertices; |
| 270 | } |
| 271 | |
| 272 | private final void initInstance() { |
| 273 | //long incrementValue = 1; |
| 274 | JRamCloud.Object instanceEntry = null; |
| 275 | JRamCloud rcClient = getRcClient(); |
| 276 | try { |
| 277 | instanceEntry = rcClient.read(instanceTableId, "nextInstanceId".getBytes()); |
| 278 | } catch (Exception e) { |
| 279 | if (e instanceof JRamCloud.ObjectDoesntExistException) { |
| 280 | instanceId = 0; |
| 281 | rcClient.write(instanceTableId, "nextInstanceId".getBytes(), ByteBuffer.allocate(0).array()); |
| 282 | } |
| 283 | } |
| 284 | if (instanceEntry != null) { |
| 285 | long curInstanceId = 1; |
Yoshi Muroi | 815c7f9 | 2014-01-30 18:06:16 -0800 | [diff] [blame] | 286 | for (int i = 0 ; i < CONDITIONALWRITE_RETRY_MAX ; i++) { |
yoshi | 28bac13 | 2014-01-22 11:00:17 -0800 | [diff] [blame] | 287 | Map<String, Long> propMap = null; |
| 288 | if (instanceEntry.value == null) { |
| 289 | log.warn("Got a null byteArray argument"); |
| 290 | return; |
| 291 | } else if (instanceEntry.value.length != 0) { |
| 292 | try { |
| 293 | ByteArrayInputStream bais = new ByteArrayInputStream(instanceEntry.value); |
| 294 | ObjectInputStream ois = new ObjectInputStream(bais); |
| 295 | propMap = (Map<String, Long>) ois.readObject(); |
| 296 | } catch (IOException e) { |
| 297 | log.error("Got an exception while deserializing element's property map: ", e); |
| 298 | return; |
| 299 | } catch (ClassNotFoundException e) { |
| 300 | log.error("Got an exception while deserializing element's property map: ", e); |
| 301 | return; |
| 302 | } |
| 303 | } else { |
| 304 | propMap = new HashMap<String, Long>(); |
| 305 | } |
| 306 | |
| 307 | if (propMap.containsKey(INSTANCE_TABLE_NAME)) { |
| 308 | curInstanceId = propMap.get(INSTANCE_TABLE_NAME) + 1; |
| 309 | } |
| 310 | |
| 311 | propMap.put(INSTANCE_TABLE_NAME, curInstanceId); |
| 312 | |
| 313 | byte[] rcValue = null; |
| 314 | try { |
| 315 | ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| 316 | ObjectOutputStream oot = new ObjectOutputStream(baos); |
| 317 | oot.writeObject(propMap); |
| 318 | rcValue = baos.toByteArray(); |
| 319 | } catch (IOException e) { |
| 320 | log.error("Got an exception while serializing element's property map", e); |
| 321 | return; |
| 322 | } |
| 323 | JRamCloud.RejectRules rules = rcClient.new RejectRules(); |
| 324 | rules.setNeVersion(instanceEntry.version); |
| 325 | try { |
| 326 | rcClient.writeRule(instanceTableId, "nextInstanceId".getBytes(), rcValue, rules); |
| 327 | instanceId = curInstanceId; |
| 328 | break; |
| 329 | } catch (Exception ex) { |
| 330 | log.debug("Cond. Write increment Vertex property: ", ex); |
| 331 | instanceEntry = rcClient.read(instanceTableId, "nextInstanceId".getBytes()); |
| 332 | continue; |
| 333 | } |
| 334 | } |
| 335 | } |
| 336 | |
| 337 | nextVertexId.compareAndSet(-1, instanceId * INSTANCE_ID_RANGE); |
| 338 | } |
| 339 | |
| 340 | @Override |
| 341 | public Vertex getVertex(Object id) throws IllegalArgumentException { |
| 342 | Long longId; |
| 343 | |
| 344 | if (id == null) { |
| 345 | throw ExceptionFactory.vertexIdCanNotBeNull(); |
| 346 | } else if (id instanceof Integer) { |
| 347 | longId = ((Integer) id).longValue(); |
| 348 | } else if (id instanceof Long) { |
| 349 | longId = (Long) id; |
| 350 | } else if (id instanceof String) { |
| 351 | try { |
| 352 | longId = Long.parseLong((String) id, 10); |
| 353 | } catch (NumberFormatException e) { |
| 354 | log.warn("ID argument {} of type {} is not a parseable long number: {}", id, id.getClass(), e); |
| 355 | return null; |
| 356 | } |
| 357 | } else if (id instanceof byte[]) { |
| 358 | try { |
| 359 | longId = ByteBuffer.wrap((byte[]) id).getLong(); |
| 360 | } catch (BufferUnderflowException e) { |
| 361 | log.warn("ID argument {} of type {} is not a parseable long number: {}", id, id.getClass(), e); |
| 362 | return null; |
| 363 | } |
| 364 | } else { |
| 365 | log.warn("ID argument {} of type {} is not supported. Returning null.", id, id.getClass()); |
| 366 | return null; |
| 367 | } |
| 368 | |
| 369 | RamCloudVertex vertex = new RamCloudVertex(longId, this); |
| 370 | |
| 371 | if (vertex.exists()) { |
| 372 | return vertex; |
| 373 | } else { |
| 374 | return null; |
| 375 | } |
| 376 | } |
| 377 | |
| 378 | @Override |
| 379 | public void removeVertex(Vertex vertex) { |
| 380 | ((RamCloudVertex) vertex).remove(); |
| 381 | } |
| 382 | |
| 383 | @Override |
| 384 | public Iterable<Vertex> getVertices() { |
| 385 | JRamCloud.TableEnumerator tableEnum = getRcClient().new TableEnumerator(vertPropTableId); |
| 386 | List<Vertex> vertices = new LinkedList<Vertex>(); |
| 387 | |
| 388 | while (tableEnum.hasNext()) { |
| 389 | vertices.add(new RamCloudVertex(tableEnum.next().key, this)); |
| 390 | } |
| 391 | |
| 392 | return vertices; |
| 393 | } |
| 394 | |
| 395 | @Override |
| 396 | public Iterable<Vertex> getVertices(String key, Object value) { |
| 397 | long startTime = 0; |
| 398 | long Tstamp1 = 0; |
| 399 | long Tstamp2 = 0; |
| 400 | long Tstamp3 = 0; |
| 401 | if (measureBPTimeProp == 1) { |
| 402 | startTime = System.nanoTime(); |
| 403 | log.error("Performance getVertices(key {}) start at {}", key, startTime); |
| 404 | } |
| 405 | |
| 406 | List<Vertex> vertices = new ArrayList<Vertex>(); |
| 407 | List<Object> vertexList = null; |
| 408 | |
| 409 | JRamCloud vertTable = getRcClient(); |
| 410 | if (measureBPTimeProp == 1) { |
| 411 | Tstamp1 = System.nanoTime(); |
| 412 | log.error("Performance getVertices(key {}) Calling indexedKeys.contains(key) at {}", key, Tstamp1); |
| 413 | } |
| 414 | |
| 415 | |
| 416 | if (indexedKeys.contains(key)) { |
| 417 | PerfMon pm = PerfMon.getInstance(); |
| 418 | if (measureBPTimeProp == 1) { |
| 419 | Tstamp2 = System.nanoTime(); |
| 420 | log.error("Performance getVertices(key {}) Calling new RamCloudKeyIndex at {}", key, Tstamp2); |
| 421 | } |
| 422 | RamCloudKeyIndex KeyIndex = new RamCloudKeyIndex(kidxVertTableId, key, value, this, Vertex.class); |
| 423 | if (measureBPTimeProp == 1) { |
| 424 | Tstamp3 = System.nanoTime(); |
| 425 | log.error("Performance getVertices(key {}) Calling KeyIndex.GetElmIdListForPropValue at {}", key, Tstamp3); |
| 426 | } |
| 427 | vertexList = KeyIndex.getElmIdListForPropValue(value.toString()); |
| 428 | if (vertexList == null) { |
| 429 | if (measureBPTimeProp == 1) { |
| 430 | long endTime = System.nanoTime(); |
| 431 | log.error("Performance getVertices(key {}) does not exists : getRcClient {} indexedKeys.contains(key) {} new_RamCloudKeyIndex {} KeyIndex.get..Value {} total {} diff {}", key, Tstamp1-startTime, Tstamp2-Tstamp1,Tstamp3-Tstamp2, endTime-Tstamp3, endTime - startTime, (endTime-startTime)- (Tstamp1-startTime)- (Tstamp2-Tstamp1)- (Tstamp3-Tstamp2)-(endTime-Tstamp3)); |
| 432 | } |
| 433 | return vertices; |
| 434 | } |
| 435 | |
| 436 | final int mreadMax = 400; |
| 437 | final int size = Math.min(mreadMax, vertexList.size()); |
Yoshi Muroi | 2c17060 | 2014-02-15 08:31:28 -0800 | [diff] [blame] | 438 | |
| 439 | long tableId[] = new long[size]; |
| 440 | byte[] keyData[] = new byte[size][]; |
| 441 | short keySize[] = new short[size]; |
yoshi | 28bac13 | 2014-01-22 11:00:17 -0800 | [diff] [blame] | 442 | |
| 443 | int vertexNum = 0; |
| 444 | for (Object vert : vertexList) { |
Yoshi Muroi | 2c17060 | 2014-02-15 08:31:28 -0800 | [diff] [blame] | 445 | tableId[vertexNum] = vertPropTableId; |
| 446 | keyData[vertexNum] = |
yoshi | 28bac13 | 2014-01-22 11:00:17 -0800 | [diff] [blame] | 447 | ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong((Long) vert).array(); |
Yoshi Muroi | 2c17060 | 2014-02-15 08:31:28 -0800 | [diff] [blame] | 448 | keySize[vertexNum] = (short) keyData[vertexNum].length; |
yoshi | 28bac13 | 2014-01-22 11:00:17 -0800 | [diff] [blame] | 449 | if (vertexNum >= (mreadMax - 1)) { |
| 450 | pm.multiread_start("RamCloudGraph getVertices()"); |
| 451 | JRamCloud.Object outvertPropTable[] = |
Yoshi Muroi | 2c17060 | 2014-02-15 08:31:28 -0800 | [diff] [blame] | 452 | vertTable.multiRead(tableId, keyData, keySize, vertexNum); |
yoshi | 28bac13 | 2014-01-22 11:00:17 -0800 | [diff] [blame] | 453 | pm.multiread_end("RamCloudGraph getVertices()"); |
| 454 | for (int i = 0; i < outvertPropTable.length; i++) { |
| 455 | if (outvertPropTable[i] != null) { |
| 456 | vertices.add(new RamCloudVertex(outvertPropTable[i].key, this)); |
| 457 | } |
| 458 | } |
| 459 | vertexNum = 0; |
| 460 | continue; |
| 461 | } |
| 462 | vertexNum++; |
| 463 | } |
| 464 | |
| 465 | if (vertexNum != 0) { |
yoshi | 28bac13 | 2014-01-22 11:00:17 -0800 | [diff] [blame] | 466 | long startTime2 = 0; |
| 467 | if (measureRcTimeProp == 1) { |
| 468 | startTime2 = System.nanoTime(); |
| 469 | } |
| 470 | pm.multiread_start("RamCloudGraph getVertices()"); |
Yoshi Muroi | 2c17060 | 2014-02-15 08:31:28 -0800 | [diff] [blame] | 471 | JRamCloud.Object outvertPropTable[] = vertTable.multiRead(tableId, keyData, keySize, vertexNum); |
yoshi | 28bac13 | 2014-01-22 11:00:17 -0800 | [diff] [blame] | 472 | pm.multiread_end("RamCloudGraph getVertices()"); |
| 473 | if (measureRcTimeProp == 1) { |
| 474 | long endTime2 = System.nanoTime(); |
| 475 | log.error("Performance index multiread(key {}, number {}) time {}", key, vertexNum, endTime2 - startTime2); |
| 476 | } |
| 477 | for (int i = 0; i < outvertPropTable.length; i++) { |
| 478 | if (outvertPropTable[i] != null) { |
| 479 | vertices.add(new RamCloudVertex(outvertPropTable[i].key, this)); |
| 480 | } |
| 481 | } |
| 482 | } |
| 483 | } else { |
| 484 | |
| 485 | JRamCloud.TableEnumerator tableEnum = getRcClient().new TableEnumerator(vertPropTableId); |
| 486 | JRamCloud.Object tableEntry; |
| 487 | |
| 488 | while (tableEnum.hasNext()) { |
| 489 | tableEntry = tableEnum.next(); |
| 490 | if (tableEntry != null) { |
| 491 | //XXX remove temp |
| 492 | // RamCloudVertex temp = new RamCloudVertex(tableEntry.key, this); |
| 493 | Map<String, Object> propMap = RamCloudElement.convertRcBytesToPropertyMapEx(tableEntry.value); |
| 494 | if (propMap.containsKey(key) && propMap.get(key).equals(value)) { |
| 495 | vertices.add(new RamCloudVertex(tableEntry.key, this)); |
| 496 | } |
| 497 | } |
| 498 | } |
| 499 | } |
| 500 | |
| 501 | if (measureBPTimeProp == 1) { |
| 502 | long endTime = System.nanoTime(); |
| 503 | log.error("Performance getVertices exists total time {}.", endTime - startTime); |
| 504 | } |
| 505 | |
| 506 | return vertices; |
| 507 | } |
| 508 | |
| 509 | @Override |
| 510 | public Edge addEdge(Object id, Vertex outVertex, Vertex inVertex, String label) throws IllegalArgumentException { |
| 511 | log.info("Adding edge: [id={}, outVertex={}, inVertex={}, label={}]", id, outVertex, inVertex, label); |
| 512 | |
| 513 | if (label == null) { |
| 514 | throw ExceptionFactory.edgeLabelCanNotBeNull(); |
| 515 | } |
| 516 | |
| 517 | RamCloudEdge newEdge = new RamCloudEdge((RamCloudVertex) outVertex, (RamCloudVertex) inVertex, label, this); |
| 518 | |
| 519 | for (int i = 0; i < 5 ;i++) { |
| 520 | try { |
| 521 | newEdge.create(); |
| 522 | return newEdge; |
| 523 | } catch (Exception e) { |
| 524 | log.warn("Tried to create edge failed: {" + newEdge + "}: ", e); |
| 525 | |
| 526 | if (e instanceof NoSuchElementException) { |
| 527 | log.error("addEdge RETRYING {}", i); |
| 528 | continue; |
| 529 | } |
| 530 | } |
| 531 | } |
| 532 | return null; |
| 533 | } |
| 534 | |
| 535 | public List<Edge> addEdges(Iterable<Edge> edgeEntities) throws IllegalArgumentException { |
| 536 | //TODO WIP: need multi-write |
| 537 | log.info("addEdges start"); |
| 538 | ArrayList<Edge> edges = new ArrayList<Edge>(); |
| 539 | for (Edge edge: edgeEntities) { |
| 540 | edges.add(addEdge(null, edge.getVertex(Direction.OUT), edge.getVertex(Direction.IN), edge.getLabel())); |
| 541 | } |
| 542 | log.info("addVertices end"); |
| 543 | return edges; |
| 544 | } |
| 545 | |
| 546 | public void setProperties(Map<RamCloudVertex, Map<String, Object>> properties) { |
| 547 | // TODO WIP: need multi-write |
| 548 | log.info("setProperties start"); |
| 549 | for (Map.Entry<RamCloudVertex, Map<String, Object>> e: properties.entrySet()) { |
| 550 | e.getKey().setProperties(e.getValue()); |
| 551 | } |
| 552 | log.info("setProperties end"); |
| 553 | } |
| 554 | |
| 555 | @Override |
| 556 | public Edge getEdge(Object id) throws IllegalArgumentException { |
| 557 | byte[] bytearrayId; |
| 558 | |
| 559 | if (id == null) { |
| 560 | throw ExceptionFactory.edgeIdCanNotBeNull(); |
| 561 | } else if (id instanceof byte[]) { |
| 562 | bytearrayId = (byte[]) id; |
| 563 | } else if (id instanceof String) { |
| 564 | bytearrayId = Base64.decode(((String) id)); |
| 565 | } else { |
| 566 | log.warn("ID argument {} of type {} is not supported. Returning null.", id, id.getClass()); |
| 567 | return null; |
| 568 | } |
| 569 | |
| 570 | if (!RamCloudEdge.isValidEdgeId(bytearrayId)) { |
| 571 | log.warn("ID argument {} of type {} is malformed. Returning null.", id, id.getClass()); |
| 572 | return null; |
| 573 | } |
| 574 | |
| 575 | RamCloudEdge edge = new RamCloudEdge(bytearrayId, this); |
| 576 | |
| 577 | if (edge.exists()) { |
| 578 | return edge; |
| 579 | } else { |
| 580 | return null; |
| 581 | } |
| 582 | } |
| 583 | |
| 584 | @Override |
| 585 | public void removeEdge(Edge edge) { |
| 586 | edge.remove(); |
| 587 | } |
| 588 | |
| 589 | @Override |
| 590 | public Iterable<Edge> getEdges() { |
| 591 | JRamCloud.TableEnumerator tableEnum = getRcClient().new TableEnumerator(edgePropTableId); |
| 592 | List<Edge> edges = new ArrayList<Edge>(); |
| 593 | |
| 594 | while (tableEnum.hasNext()) { |
| 595 | edges.add(new RamCloudEdge(tableEnum.next().key, this)); |
| 596 | } |
| 597 | |
| 598 | return edges; |
| 599 | } |
| 600 | |
| 601 | @Override |
| 602 | public Iterable<Edge> getEdges(String key, Object value) { |
| 603 | JRamCloud.TableEnumerator tableEnum = getRcClient().new TableEnumerator(edgePropTableId); |
| 604 | List<Edge> edges = new ArrayList<Edge>(); |
| 605 | JRamCloud.Object tableEntry; |
| 606 | |
| 607 | while (tableEnum.hasNext()) { |
| 608 | tableEntry = tableEnum.next(); |
| 609 | // FIXME temp |
| 610 | //RamCloudEdge temp = new RamCloudEdge(tableEntry.key, this); |
| 611 | Map<String, Object> propMap = RamCloudElement.convertRcBytesToPropertyMapEx(tableEntry.value); |
| 612 | if (propMap.containsKey(key) && propMap.get(key).equals(value)) { |
| 613 | edges.add(new RamCloudEdge(tableEntry.key, this)); |
| 614 | } |
| 615 | } |
| 616 | |
| 617 | return edges; |
| 618 | } |
| 619 | |
| 620 | @Override |
| 621 | public GraphQuery query() { |
| 622 | return new DefaultGraphQuery(this); |
| 623 | } |
| 624 | |
| 625 | @Override |
| 626 | public void shutdown() { |
| 627 | JRamCloud rcClient = getRcClient(); |
| 628 | rcClient.dropTable(VERT_TABLE_NAME); |
| 629 | rcClient.dropTable(VERT_PROP_TABLE_NAME); |
| 630 | rcClient.dropTable(EDGE_PROP_TABLE_NAME); |
| 631 | rcClient.dropTable(IDX_VERT_TABLE_NAME); |
| 632 | rcClient.dropTable(IDX_EDGE_TABLE_NAME); |
| 633 | rcClient.dropTable(KIDX_VERT_TABLE_NAME); |
| 634 | rcClient.dropTable(KIDX_EDGE_TABLE_NAME); |
| 635 | rcClient.disconnect(); |
| 636 | } |
| 637 | |
| 638 | @Override |
| 639 | public void stopTransaction(Conclusion conclusion) { |
| 640 | // TODO Auto-generated method stub |
| 641 | } |
| 642 | |
| 643 | @Override |
| 644 | public void commit() { |
| 645 | // TODO Auto-generated method stub |
| 646 | } |
| 647 | |
| 648 | @Override |
| 649 | public void rollback() { |
| 650 | // TODO Auto-generated method stub |
| 651 | } |
| 652 | |
| 653 | @Override |
| 654 | public <T extends Element> void dropKeyIndex(String key, Class<T> elementClass) { |
| 655 | throw new UnsupportedOperationException("Not supported yet."); |
| 656 | //FIXME how to dropKeyIndex |
| 657 | //new RamCloudKeyIndex(kidxVertTableId, key, this, elementClass); |
| 658 | //getIndexedKeys(key, elementClass).removeIndex(); |
| 659 | } |
| 660 | |
| 661 | @Override |
| 662 | public <T extends Element> void createKeyIndex(String key, |
| 663 | Class<T> elementClass, Parameter... indexParameters) { |
| 664 | if (key == null) { |
| 665 | return; |
| 666 | } |
| 667 | if (this.indexedKeys.contains(key)) { |
| 668 | return; |
| 669 | } |
| 670 | this.indexedKeys.add(key); |
| 671 | } |
| 672 | |
| 673 | @Override |
| 674 | public <T extends Element> Set< String> getIndexedKeys(Class< T> elementClass) { |
| 675 | if (null != this.indexedKeys) { |
| 676 | return new HashSet<String>(this.indexedKeys); |
| 677 | } else { |
| 678 | return Collections.emptySet(); |
| 679 | } |
| 680 | } |
| 681 | |
| 682 | @Override |
| 683 | public <T extends Element> Index<T> createIndex(String indexName, |
| 684 | Class<T> indexClass, Parameter... indexParameters) { |
| 685 | throw new UnsupportedOperationException("Not supported yet."); |
| 686 | } |
| 687 | |
| 688 | @Override |
| 689 | public <T extends Element> Index<T> getIndex(String indexName, Class<T> indexClass) { |
| 690 | throw new UnsupportedOperationException("Not supported yet."); |
| 691 | } |
| 692 | |
| 693 | @Override |
| 694 | public Iterable<Index<? extends Element>> getIndices() { |
| 695 | throw new UnsupportedOperationException("Not supported yet."); |
| 696 | } |
| 697 | |
| 698 | @Override |
| 699 | public void dropIndex(String indexName) { |
| 700 | throw new UnsupportedOperationException("Not supported yet."); |
| 701 | } |
| 702 | |
| 703 | @Override |
| 704 | public String toString() { |
| 705 | return getClass().getSimpleName().toLowerCase() + "[vertices:" + ((List<Vertex>)getVertices()).size() + " edges:" + ((List<Edge>)getEdges()).size() + "]"; |
| 706 | } |
| 707 | |
| 708 | public static void main(String[] args) { |
| 709 | RamCloudGraph graph = new RamCloudGraph(); |
| 710 | |
| 711 | Vertex a = graph.addVertex(null); |
| 712 | Vertex b = graph.addVertex(null); |
| 713 | Vertex c = graph.addVertex(null); |
| 714 | Vertex d = graph.addVertex(null); |
| 715 | Vertex e = graph.addVertex(null); |
| 716 | Vertex f = graph.addVertex(null); |
| 717 | Vertex g = graph.addVertex(null); |
| 718 | |
| 719 | graph.addEdge(null, a, a, "friend"); |
| 720 | graph.addEdge(null, a, b, "friend1"); |
| 721 | graph.addEdge(null, a, b, "friend2"); |
| 722 | graph.addEdge(null, a, b, "friend3"); |
| 723 | graph.addEdge(null, a, c, "friend"); |
| 724 | graph.addEdge(null, a, d, "friend"); |
| 725 | graph.addEdge(null, a, e, "friend"); |
| 726 | graph.addEdge(null, a, f, "friend"); |
| 727 | graph.addEdge(null, a, g, "friend"); |
| 728 | |
| 729 | graph.shutdown(); |
| 730 | } |
| 731 | } |