Implementation of conditional write for vertex property table (ONOS-937)
Change-Id: I6ecd81d68ed6d780fc330f1d753b377e0f899f6f
diff --git a/src/main/java/com/tinkerpop/blueprints/impls/ramcloud/RamCloudElement.java b/src/main/java/com/tinkerpop/blueprints/impls/ramcloud/RamCloudElement.java
index f1183c5..9241093 100644
--- a/src/main/java/com/tinkerpop/blueprints/impls/ramcloud/RamCloudElement.java
+++ b/src/main/java/com/tinkerpop/blueprints/impls/ramcloud/RamCloudElement.java
@@ -43,6 +43,7 @@
private byte[] rcPropTableKey;
private long rcPropTableId;
private RamCloudGraph graph;
+ private long propVersion;
private static final ThreadLocal<Kryo> kryo = new ThreadLocal<Kryo>() {
@Override
@@ -79,6 +80,7 @@
pm.read_start("RamCloudElement getPropertyMap()");
propTableEntry = vertTable.read(rcPropTableId, rcPropTableKey);
pm.read_end("RamCloudElement getPropertyMap()");
+ propVersion = propTableEntry.version;
if (propTableEntry.value.length > 1024 * 1024 * 0.9) {
log.warn("Element[id={}] property map size is near 1MB limit!", new String(rcPropTableKey));
}
@@ -113,59 +115,27 @@
return null;
} else if (byteArray.length != 0) {
PerfMon pm = PerfMon.getInstance();
- long startTime = 0;
- if(RamCloudGraph.measureSerializeTimeProp == 1) {
- startTime = System.nanoTime();
- }
pm.deser_start("RamCloudElement convertRcBytesToPropertyMap()");
ByteBufferInput input = new ByteBufferInput(byteArray);
TreeMap map = kryo.get().readObject(input, TreeMap.class);
pm.deser_end("RamCloudElement convertRcBytesToPropertyMap()");
- if(RamCloudGraph.measureSerializeTimeProp == 1) {
- long endTime = System.nanoTime();
- log.error("Performance element kryo deserialization key {} {} size {}", this, endTime - startTime, byteArray.length);
- }
return map;
} else {
return new TreeMap<String, Object>();
}
}
- private void setPropertyMap(Map<String, Object> map) {
+ private static byte[] convertVertexPropertyMapToRcBytes(Map<String, Object> map) {
byte[] rcValue;
PerfMon pm = PerfMon.getInstance();
- long startKryoTime = 0;
- if(RamCloudGraph.measureSerializeTimeProp == 1) {
- startKryoTime = System.nanoTime();
- }
pm.ser_start("RamCloudElement setPropertyMap()");
byte[] rcTemp = new byte[1024*1024];
Output output = new Output(rcTemp);
kryo.get().writeObject(output, map);
- long midKryoTime = 0;
- if(RamCloudGraph.measureSerializeTimeProp == 1) {
- midKryoTime = System.nanoTime();
- }
rcValue = output.toBytes();
pm.ser_end("RamCloudElement setPropertyMap()");
- if(RamCloudGraph.measureSerializeTimeProp == 1) {
- long endKryoTime = System.nanoTime();
- log.error("Performance element kryo serialization key {} mid {}, total {}, size {}", this, midKryoTime - startKryoTime, endKryoTime - startKryoTime, rcValue.length);
- }
-
- long startTime = 0;
- JRamCloud vertTable = graph.getRcClient();
- if (graph.measureRcTimeProp == 1) {
- startTime = System.nanoTime();
- }
- pm.write_start("RamCloudElement setPropertyMap()");
- vertTable.write(rcPropTableId, rcPropTableKey, rcValue);
- pm.write_end("RamCloudElement setPropertyMap()");
- if (graph.measureRcTimeProp == 1) {
- long endTime = System.nanoTime();
- log.error("Performance setPropertyMap write time key {} {}", this, endTime - startTime);
- }
+ return rcValue;
}
@Override
@@ -184,34 +154,61 @@
return getPropertyMap();
}
public void setProperties(Map<String, Object> properties) {
- Map<String, Object> map = getPropertyMap();
+ Map<String, Object> map = getPropertyMap();
Map<String, Object> oldValueMap = new HashMap<String, Object>(map.size());
- for (Map.Entry<String, Object> property : properties.entrySet()) {
- String key = property.getKey();
- if (key == null) {
- throw ExceptionFactory.propertyKeyCanNotBeNull();
- }
+ for (Map.Entry<String, Object> property : properties.entrySet()) {
+ String key = property.getKey();
+ if (key == null) {
+ throw ExceptionFactory.propertyKeyCanNotBeNull();
+ }
- if (key.equals("")) {
- throw ExceptionFactory.propertyKeyCanNotBeEmpty();
- }
+ if (key.equals("")) {
+ throw ExceptionFactory.propertyKeyCanNotBeEmpty();
+ }
- if (key.equals("id")) {
- throw ExceptionFactory.propertyKeyIdIsReserved();
- }
+ if (key.equals("id")) {
+ throw ExceptionFactory.propertyKeyIdIsReserved();
+ }
- if (this instanceof RamCloudEdge && key.equals("label")) {
- throw ExceptionFactory.propertyKeyLabelIsReservedForEdges();
- }
- Object value = property.getValue();
- if (value == null) {
- throw ExceptionFactory.propertyValueCanNotBeNull();
- }
+ if (this instanceof RamCloudEdge && key.equals("label")) {
+ throw ExceptionFactory.propertyKeyLabelIsReservedForEdges();
+ }
+ Object value = property.getValue();
+ if (value == null) {
+ throw ExceptionFactory.propertyValueCanNotBeNull();
+ }
- oldValueMap.put(key, map.put(key, value));
+ oldValueMap.put(key, map.put(key, value));
- }
- setPropertyMap(map);
+ }
+ byte[] rcValue = convertVertexPropertyMapToRcBytes(map);
+
+ if (rcValue.length != 0) {
+ if (!writeWithRules(rcValue)) {
+ log.debug("getSetProperties cond. write failure RETRYING 1");
+ for (int i = 0; i < graph.CONDITIONALWRITE_RETRY_MAX ; i++){
+ map = getPropertyMap();
+ oldValueMap = new HashMap<String, Object>(map.size());
+ for (Map.Entry<String, Object> property : properties.entrySet()) {
+ String key = property.getKey();
+ Object value = property.getValue();
+ oldValueMap.put(key, map.put(key, value));
+ }
+
+ rcValue = convertVertexPropertyMapToRcBytes(map);
+ if (rcValue.length != 0) {
+ if (writeWithRules(rcValue)) {
+ break;
+ } else {
+ log.debug("getSetProperties cond. write failure RETRYING {}", i+1);
+ if (i + 1 == graph.CONDITIONALWRITE_RETRY_MAX) {
+ log.error("setProperties cond. write failure Gaveup RETRYING");
+ }
+ }
+ }
+ }
+ }
+ }
// TODO use multi-write
for (Map.Entry<String, Object> oldProperty : oldValueMap.entrySet()) {
@@ -229,25 +226,25 @@
}
@Override
- public void setProperty(String key, Object value) {
- Object oldValue;
- if (value == null) {
+ public void setProperty(String propKey, Object propValue) {
+ Object oldValue = null;
+ if (propValue == null) {
throw ExceptionFactory.propertyValueCanNotBeNull();
}
- if (key == null) {
+ if (propKey == null) {
throw ExceptionFactory.propertyKeyCanNotBeNull();
}
- if (key.equals("")) {
+ if (propKey.equals("")) {
throw ExceptionFactory.propertyKeyCanNotBeEmpty();
}
- if (key.equals("id")) {
+ if (propKey.equals("id")) {
throw ExceptionFactory.propertyKeyIdIsReserved();
}
- if (this instanceof RamCloudEdge && key.equals("label")) {
+ if (this instanceof RamCloudEdge && propKey.equals("label")) {
throw ExceptionFactory.propertyKeyLabelIsReservedForEdges();
}
@@ -256,41 +253,74 @@
startTime = System.nanoTime();
}
- Map<String, Object> map = getPropertyMap();
- oldValue = map.put(key, value);
- setPropertyMap(map);
+ for (int i = 0; i < graph.CONDITIONALWRITE_RETRY_MAX; i++) {
+ Map<String, Object> map = getPropertyMap();
+ oldValue = map.put(propKey, propValue);
+
+ byte[] rcValue = convertVertexPropertyMapToRcBytes(map);
+
+ if (rcValue.length != 0) {
+ if (writeWithRules(rcValue)) {
+ break;
+ } else {
+ log.debug("setProperty(String {}, Object {}) cond. write failure RETRYING {}", propKey, propValue, i+1);
+ if (i + 1 == graph.CONDITIONALWRITE_RETRY_MAX) {
+ log.error("setProperty(String {}, Object {}) cond. write failure Gaveup RETRYING", propKey, propValue);
+ }
+ }
+ }
+ }
boolean ret = false;
if (this instanceof RamCloudVertex) {
- RamCloudKeyIndex keyIndex = new RamCloudKeyIndex(graph.kidxVertTableId, key, value, graph, Vertex.class);
- ret = keyIndex.autoUpdate(key, value, oldValue, this);
+ RamCloudKeyIndex keyIndex = new RamCloudKeyIndex(graph.kidxVertTableId, propKey, propValue, graph, Vertex.class);
+ ret = keyIndex.autoUpdate(propKey, propValue, oldValue, this);
} else {
- RamCloudKeyIndex keyIndex = new RamCloudKeyIndex(graph.kidxVertTableId, key, value, graph, Edge.class);
- keyIndex.autoUpdate(key, value, oldValue, this);
+ RamCloudKeyIndex keyIndex = new RamCloudKeyIndex(graph.kidxVertTableId, propKey, propValue, graph, Edge.class);
+ keyIndex.autoUpdate(propKey, propValue, oldValue, this);
}
if (graph.measureBPTimeProp == 1) {
long endTime = System.nanoTime();
if (ret) {
- log.error("Performance vertex setProperty(key {}) which is index total time {}", key, endTime - startTime);
+ log.error("Performance vertex setProperty(key {}) which is total time {}", propKey, endTime - startTime);
} else {
- log.error("Performance vertex setProperty(key {}) does not index time {}", key, endTime - startTime);
+ log.error("Performance vertex setProperty(key {}) does not time {}", propKey, endTime - startTime);
}
}
}
+ protected boolean writeWithRules(byte[] rcValue) {
+ return RamCloudWrite.writeWithRules(this.rcPropTableId, this.rcPropTableKey, rcValue, this.propVersion, this.graph, RamCloudWrite.PerfMonEnum.WRITE);
+ }
+
@Override
- public <T> T removeProperty(String key) {
- Map<String, Object> map = getPropertyMap();
- T retVal = (T) map.remove(key);
- setPropertyMap(map);
+ public <T> T removeProperty(String propKey) {
+ T retVal = null;
+ for (int i = 0; i < graph.CONDITIONALWRITE_RETRY_MAX; i++) {
+ Map<String, Object> map = getPropertyMap();
+ retVal = (T) map.remove(propKey);
+ byte[] rcValue = convertVertexPropertyMapToRcBytes(map);
+
+ if (rcValue.length != 0) {
+ if (writeWithRules(rcValue)) {
+ break;
+ } else {
+ log.debug("removeProperty(String {}) cond. write failure RETRYING {}", propKey, i+1);
+ if (i + 1 == graph.CONDITIONALWRITE_RETRY_MAX) {
+ log.error("removeProperty(String {}) cond. write failure Gaveup RETRYING", propKey);
+ }
+ }
+ }
+ }
+
if (this instanceof RamCloudVertex) {
- RamCloudKeyIndex keyIndex = new RamCloudKeyIndex(graph.kidxVertTableId, key, retVal, graph, Vertex.class);
- keyIndex.autoRemove(key, retVal.toString(), this);
+ RamCloudKeyIndex keyIndex = new RamCloudKeyIndex(graph.kidxVertTableId, propKey, retVal, graph, Vertex.class);
+ keyIndex.autoRemove(propKey, retVal.toString(), this);
} else {
- RamCloudKeyIndex keyIndex = new RamCloudKeyIndex(graph.kidxVertTableId, key, retVal, graph, Edge.class);
- keyIndex.autoRemove(key, retVal.toString(), this);
+ RamCloudKeyIndex keyIndex = new RamCloudKeyIndex(graph.kidxVertTableId, propKey, retVal, graph, Edge.class);
+ keyIndex.autoRemove(propKey, retVal.toString(), this);
}
return retVal;
diff --git a/src/main/java/com/tinkerpop/blueprints/impls/ramcloud/RamCloudGraph.java b/src/main/java/com/tinkerpop/blueprints/impls/ramcloud/RamCloudGraph.java
index 4843b01..9871aaa 100644
--- a/src/main/java/com/tinkerpop/blueprints/impls/ramcloud/RamCloudGraph.java
+++ b/src/main/java/com/tinkerpop/blueprints/impls/ramcloud/RamCloudGraph.java
@@ -84,6 +84,8 @@
private final long INSTANCE_ID_RANGE = 100000;
private String coordinatorLocation;
private static final Features FEATURES = new Features();
+ // FIXME better loop variable name
+ public final int CONDITIONALWRITE_RETRY_MAX = 100;
public final long measureBPTimeProp = Long.valueOf(System.getProperty("benchmark.measureBP", "0"));
public final long measureRcTimeProp = Long.valueOf(System.getProperty("benchmark.measureRc", "0"));
public static final long measureSerializeTimeProp = Long.valueOf(System.getProperty("benchmark.measureSerializeTimeProp", "0"));
@@ -279,7 +281,7 @@
}
if (instanceEntry != null) {
long curInstanceId = 1;
- for (int i = 0 ; i < 100 ; i++) {
+ for (int i = 0 ; i < CONDITIONALWRITE_RETRY_MAX ; i++) {
Map<String, Long> propMap = null;
if (instanceEntry.value == null) {
log.warn("Got a null byteArray argument");
diff --git a/src/main/java/com/tinkerpop/blueprints/impls/ramcloud/RamCloudIndex.java b/src/main/java/com/tinkerpop/blueprints/impls/ramcloud/RamCloudIndex.java
index 0ba951d..634df25 100644
--- a/src/main/java/com/tinkerpop/blueprints/impls/ramcloud/RamCloudIndex.java
+++ b/src/main/java/com/tinkerpop/blueprints/impls/ramcloud/RamCloudIndex.java
@@ -51,21 +51,6 @@
// FIXME this should not be defined here
private long indexVersion;
-// private static final ThreadLocal<Kryo> kryo = new ThreadLocal<Kryo>() {
-// @Override
-// protected Kryo initialValue() {
-// Kryo kryo = new Kryo();
-// kryo.setRegistrationRequired(true);
-// kryo.register(Long.class);
-// kryo.register(String.class);
-// kryo.register(TreeMap.class);
-// kryo.register(ArrayList.class);
-// kryo.setReferences(false);
-// return kryo;
-// }
-// };
-
-
public RamCloudIndex(long tableId, String indexName, Object propValue, RamCloudGraph graph, Class<T> indexClass) {
this.tableId = tableId;
this.graph = graph;
@@ -89,7 +74,6 @@
JRamCloud.Object vertTableEntry;
JRamCloud vertTable = graph.getRcClient();
- //vertTableEntry = graph.getRcClient().read(tableId, rcKey);
pm.indexread_start("RamCloudIndex exists()");
vertTableEntry = vertTable.read(tableId, rcKey);
pm.indexread_end("RamCloudIndex exists()");
@@ -110,7 +94,6 @@
JRamCloud.RejectRules rules = rcClient.new RejectRules();
rules.setExists();
- //graph.getRcClient().writeRule(tableId, rcKey, ByteBuffer.allocate(0).array(), rules);
pm.indexwrite_start("RamCloudIndex create()");
rcClient.writeRule(tableId, rcKey, ByteBuffer.allocate(0).array(), rules);
pm.indexwrite_end("RamCloudIndex create()");
@@ -181,8 +164,7 @@
create();
- // FIXME give more meaningful loop variable
- for (int i = 0; i < 100; i++) {
+ for (int i = 0; i < graph.CONDITIONALWRITE_RETRY_MAX; i++) {
Map<Object, List<Object>> map = readIndexPropertyMapFromDB();
List<Object> values = map.get(propValue);
if (values == null) {
@@ -193,20 +175,14 @@
values.add(elmId);
}
- //Masa commented out the following measurement b/c Serialization delay is measured in onvertIndexPropertyMapToRcBytes(map)
- //long serStartTime = System.nanoTime();
byte[] rcValue = convertIndexPropertyMapToRcBytes(map);
- //if(RamCloudGraph.measureSerializeTimeProp == 1) {
- // long serEndTime = System.nanoTime();
- // log.error("Performance index kryo serialization [id={}] {} size {}", elmId, serEndTime - serStartTime, rcValue.length);
- //}
if (rcValue.length != 0) {
if (writeWithRules(rcValue)) {
break;
} else {
log.debug("getSetProperty(String {}, Object {}) cond. write failure RETRYING {}", propValue, elmId, i+1);
- if (i == 100) {
+ if (i + 1 == graph.CONDITIONALWRITE_RETRY_MAX) {
log.error("getSetProperty(String {}, Object {}) cond. write failure Gaveup RETRYING", propValue, elmId);
}
}
@@ -261,9 +237,7 @@
log.error("Index name mismatch indexName:{}, remove({},{},...). SOMETHING IS WRONG", indexName, propName, propValue);
}
- // FIXME better loop variable name
- final int MAX_RETRYS = 100;
- for (int i = 0; i < MAX_RETRYS; ++i) {
+ for (int i = 0; i < graph.CONDITIONALWRITE_RETRY_MAX; ++i) {
Map<Object, List<Object>> map = readIndexPropertyMapFromDB();
if (map.containsKey(propValue)) {
@@ -281,17 +255,7 @@
// no change to DB so exit now
return;
}
- //long startTime = System.nanoTime();
- //if(RamCloudGraph.measureSerializeTimeProp == 1) {
- // pm.ser_start("SC");
- //}
byte[] rcValue = convertIndexPropertyMapToRcBytes(map);
- //if(RamCloudGraph.measureSerializeTimeProp == 1) {
- // pm.ser_end("SC");
- //long endTime = System.nanoTime();
- //pm.ser_add(endTime - startTime);
- //log.error("Performance index kryo serialization for removal key {} {} size {}", element, endTime - startTime, rcValue.length);
- //}
if (rcValue.length == 0) {
return;
@@ -301,7 +265,7 @@
break;
} else {
log.debug("remove({}, {}, T element) write failure RETRYING {}", propName, propValue, (i + 1));
- if (i + 1 == MAX_RETRYS) {
+ if (i + 1 == graph.CONDITIONALWRITE_RETRY_MAX) {
log.error("remove({}, {}, T element) write failed completely. gave up RETRYING", propName, propValue);
}
}
@@ -341,15 +305,14 @@
// nothing to write
continue;
}
- if (writeWithRules(tableId, tableEntry.key, rcValue, tableEntry.version, graph)) {
+ if (RamCloudWrite.writeWithRules(tableId, tableEntry.key, rcValue, tableEntry.version, graph, RamCloudWrite.PerfMonEnum.INDEXWRITE)) {
// cond. write success
continue;
} else {
// cond. write failure
log.debug("removeElement({}, {}, ...) cond. key/value write failure RETRYING", tableId, element );
// FIXME Dirty hack
- final int RETRY_MAX = 100;
- for (int retry = RETRY_MAX; retry >= 0; --retry) {
+ for (int retry = graph.CONDITIONALWRITE_RETRY_MAX; retry >= 0; --retry) {
RamCloudKeyIndex idx = new RamCloudKeyIndex(tableId, tableEntry.key, graph, element.getClass());
Map<Object, List<Object>> rereadMap = idx.readIndexPropertyMapFromDB();
@@ -370,7 +333,7 @@
}
if (idx.writeWithRules(convertIndexPropertyMapToRcBytes(rereadMap))) {
- log.debug("removeElement({}, {}, ...) cond. key/value {} write failure RETRYING {}", tableId, element, rereadMap, RETRY_MAX - retry);
+ log.debug("removeElement({}, {}, ...) cond. key/value {} write failure RETRYING {}", tableId, element, rereadMap, graph.CONDITIONALWRITE_RETRY_MAX - retry);
// cond. re-write success
break;
}
@@ -384,31 +347,20 @@
}
public Map<Object, List<Object>> readIndexPropertyMapFromDB() {
- //log.debug("getIndexPropertyMap() ");
JRamCloud.Object propTableEntry;
- long startTime = 0;
PerfMon pm = PerfMon.getInstance();
try {
JRamCloud vertTable = graph.getRcClient();
- if (graph.measureRcTimeProp == 1) {
- startTime = System.nanoTime();
- }
- //propTableEntry = graph.getRcClient().read(tableId, rcKey);
pm.indexread_start("RamCloudIndex readIndexPropertyMapFromDB()");
propTableEntry = vertTable.read(tableId, rcKey);
pm.indexread_end("RamCloudIndex readIndexPropertyMapFromDB()");
- if (graph.measureRcTimeProp == 1) {
- long endTime = System.nanoTime();
- log.error("Performance readIndexPropertyMapFromDB(indexName {}) read time {}", indexName, endTime - startTime);
- }
indexVersion = propTableEntry.version;
} catch (Exception e) {
pm.indexread_end("RamCloudIndex readIndexPropertyMapFromDB()");
indexVersion = 0;
- if (graph.measureRcTimeProp == 1) {
- long endTime = System.nanoTime();
- log.error("Performance readIndexPropertyMapFromDB(indexName {}) exception read time {}", indexName, endTime - startTime);
+ if (e instanceof JRamCloud.ObjectDoesntExistException) {
+ return null;
}
log.warn("readIndexPropertyMapFromDB() Element does not have a index property table entry! tableId :" + tableId + " indexName : " + indexName + " ", e);
return null;
@@ -423,10 +375,6 @@
return null;
} else if (byteArray.length != 0) {
PerfMon pm = PerfMon.getInstance();
- long startTime = 0;
- if(RamCloudGraph.measureSerializeTimeProp == 1) {
- startTime = System.nanoTime();
- }
pm.indexdeser_start("RamCloudIndex convertRcBytesToIndexPropertyMap()");
IndexBlob blob;
TreeMap<Object, List<Object>> map = new TreeMap<Object, List<Object>>();
@@ -434,18 +382,12 @@
blob = IndexBlob.parseFrom(byteArray);
List const_list = blob.getVertexIdList();
ArrayList list = new ArrayList<>(const_list);
-// ByteBufferInput input = new ByteBufferInput(byteArray);
-// ArrayList list = kryo.get().readObject(input, ArrayList.class);
map.put(rcKeyToPropName(rcKey), list);
} catch (InvalidProtocolBufferException e) {
log.error("{" + toString() + "}: Read malformed edge list: ", e);
} finally {
pm.indexdeser_end("RamCloudIndex convertRcBytesToIndexPropertyMap()");
}
- if(RamCloudGraph.measureSerializeTimeProp == 1) {
- long endTime = System.nanoTime();
- log.error("Performance index kryo deserialization [id=N/A] {} size {}", endTime - startTime, byteArray.length);
- }
return map;
} else {
return new TreeMap<Object, List<Object>>();
@@ -454,10 +396,6 @@
public static byte[] convertIndexPropertyMapToRcBytes(Map<Object, List<Object>> map) {
PerfMon pm = PerfMon.getInstance();
- long startTime = 0;
- if(RamCloudGraph.measureSerializeTimeProp == 1) {
- startTime = System.nanoTime();
- }
byte[] bytes;
pm.indexser_start("RamCloudIndex convertIndexPropertyMapToRcBytes()");
@@ -468,47 +406,12 @@
}
IndexBlob blob = builder.build();
bytes = blob.toByteArray();
-// ByteBufferOutput output = new ByteBufferOutput(1024*1024);
-// if ( map.values().size() == 0 ) {
-// kryo.get().writeObject(output, new ArrayList<Object>());
-// } else {
-// kryo.get().writeObject(output, vtxIds);
-// }
-// bytes = output.toBytes();
pm.indexser_end("RamCloudIndex convertIndexPropertyMapToRcBytes()");
- if(RamCloudGraph.measureSerializeTimeProp == 1) {
- long endTime = System.nanoTime();
- log.error("Performance index ProtoBuff serialization {}, size={}", endTime - startTime, bytes);
- }
return bytes;
}
protected boolean writeWithRules(byte[] rcValue) {
- return writeWithRules(this.tableId, this.rcKey, rcValue, this.indexVersion, this.graph);
- }
-
- private static boolean writeWithRules(long tableId, byte[] rcKey, byte[] rcValue, long expectedVersion, RamCloudGraph graph) {
- JRamCloud.RejectRules rules = graph.getRcClient().new RejectRules();
-
- if (expectedVersion == 0) {
- rules.setExists();
- } else {
- rules.setNeVersion(expectedVersion);
- }
-
- PerfMon pm = PerfMon.getInstance();
- try {
- JRamCloud vertTable = graph.getRcClient();
- pm.indexwrite_start("RamCloudIndex writeWithRules()");
- vertTable.writeRule(tableId, rcKey, rcValue, rules);
- pm.indexwrite_end("RamCloudIndex writeWithRules()");
- } catch (Exception e) {
- pm.indexwrite_end("RamCloudIndex writeWithRules()");
- pm.indexwrite_condfail("RamCloudIndex writeWithRules()");
- log.debug("Cond. Write index property: {} failed {} expected version: {}", rcKeyToIndexName(rcKey), e, expectedVersion);
- return false;
- }
- return true;
+ return RamCloudWrite.writeWithRules(this.tableId, this.rcKey, rcValue, this.indexVersion, this.graph, RamCloudWrite.PerfMonEnum.INDEXWRITE);
}
public List<Object> getElmIdListForPropValue(Object propValue) {
@@ -526,7 +429,7 @@
}
public <T> T removeIndexProperty(String key) {
- for (int i = 0; i < 100; ++i) {
+ for (int i = 0; i < graph.CONDITIONALWRITE_RETRY_MAX; ++i) {
Map<Object, List<Object>> map = readIndexPropertyMapFromDB();
T retVal = (T) map.remove(key);
byte[] rcValue = convertIndexPropertyMapToRcBytes(map);
diff --git a/src/main/java/com/tinkerpop/blueprints/impls/ramcloud/RamCloudVertex.java b/src/main/java/com/tinkerpop/blueprints/impls/ramcloud/RamCloudVertex.java
index 421db77..b41dca8 100644
--- a/src/main/java/com/tinkerpop/blueprints/impls/ramcloud/RamCloudVertex.java
+++ b/src/main/java/com/tinkerpop/blueprints/impls/ramcloud/RamCloudVertex.java
@@ -189,11 +189,11 @@
/*
* RamCloudVertex specific methods
*/
- private static byte[] idToRcKey(long id) {
+ public static byte[] idToRcKey(long id) {
return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(id).array();
}
- private static long rcKeyToId(byte[] rcKey) {
+ public static long rcKeyToId(byte[] rcKey) {
return ByteBuffer.wrap(rcKey).order(ByteOrder.LITTLE_ENDIAN).getLong();
}
diff --git a/src/main/java/com/tinkerpop/blueprints/impls/ramcloud/RamCloudWrite.java b/src/main/java/com/tinkerpop/blueprints/impls/ramcloud/RamCloudWrite.java
new file mode 100644
index 0000000..6b3fba2
--- /dev/null
+++ b/src/main/java/com/tinkerpop/blueprints/impls/ramcloud/RamCloudWrite.java
@@ -0,0 +1,47 @@
+package com.tinkerpop.blueprints.impls.ramcloud;
+
+import edu.stanford.ramcloud.JRamCloud;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RamCloudWrite {
+ public enum PerfMonEnum {
+ WRITE,
+ INDEXWRITE
+ }
+ private final static Logger log = LoggerFactory.getLogger(RamCloudGraph.class);
+
+ public static boolean writeWithRules(long tableId, byte[] rcKey, byte[] rcValue, long expectedVersion, RamCloudGraph graph, PerfMonEnum perfMonKind) {
+ JRamCloud.RejectRules rules = graph.getRcClient().new RejectRules();
+
+ if (expectedVersion == 0) {
+ rules.setExists();
+ } else {
+ rules.setNeVersion(expectedVersion);
+ }
+
+ PerfMon pm = PerfMon.getInstance();
+ try {
+ JRamCloud rcClient = graph.getRcClient();
+ if (perfMonKind.equals(PerfMonEnum.WRITE)) {
+ pm.write_start("RamCloudIndex writeWithRules()");
+ } else if (perfMonKind.equals(PerfMonEnum.INDEXWRITE)) {
+ pm.indexwrite_start("RamCloudIndex writeWithRules()");
+ }
+ rcClient.writeRule(tableId, rcKey, rcValue, rules);
+ pm.indexwrite_end("RamCloudIndex writeWithRules()");
+ } catch (Exception e) {
+ if (perfMonKind.equals(PerfMonEnum.WRITE)) {
+ pm.write_end("RamCloudIndex writeWithRules()");
+ pm.write_condfail("RamCloudIndex writeWithRules()");
+ log.debug("Cond. Write property: {} failed {} expected version: {}", RamCloudVertex.rcKeyToId(rcKey), e, expectedVersion);
+ } else if (perfMonKind.equals(PerfMonEnum.INDEXWRITE)) {
+ pm.indexwrite_end("RamCloudIndex writeWithRules()");
+ pm.indexwrite_condfail("RamCloudIndex writeWithRules()");
+ log.debug("Cond. Write index property: {} failed {} expected version: {}", RamCloudIndex.rcKeyToIndexName(rcKey), e, expectedVersion);
+ }
+ return false;
+ }
+ return true;
+ }
+}
diff --git a/src/main/java/net/onrc/onos/graph/DBOperation.java b/src/main/java/net/onrc/onos/graph/DBOperation.java
index 6e278d9..c291544 100644
--- a/src/main/java/net/onrc/onos/graph/DBOperation.java
+++ b/src/main/java/net/onrc/onos/graph/DBOperation.java
@@ -56,7 +56,6 @@
*/
@Override
public ISwitchObject newSwitch(final String dpid) {
- //System.out.println("newSwitch");
ISwitchObject obj = (ISwitchObject) conn.getFramedGraph().addVertex(null, ISwitchObject.class);
if (obj != null) {
obj.setType("switch");
@@ -79,7 +78,6 @@
*/
@Override
public Iterable<ISwitchObject> getAllSwitches() {
- //System.out.println("getAllSwitches");
Iterable<ISwitchObject> switches = conn.getFramedGraph().getVertices("type", "switch", ISwitchObject.class);
return switches;
}
@@ -89,7 +87,6 @@
*/
@Override
public Iterable<ISwitchObject> getInactiveSwitches() {
- //System.out.println("getInactiveSwitches");
Iterable<ISwitchObject> switches = conn.getFramedGraph().getVertices("type", "switch", ISwitchObject.class);
List<ISwitchObject> inactiveSwitches = new ArrayList<ISwitchObject>();
@@ -117,13 +114,11 @@
*/
@Override
public void removeSwitch(ISwitchObject sw) {
- //System.out.println("removeSwitch");
conn.getFramedGraph().removeVertex(sw.asVertex());
}
@Override
public IPortObject newPort(String dpid, Short portNum) {
- //System.out.println("newPort");
IPortObject obj = (IPortObject) conn.getFramedGraph().addVertex(null, IPortObject.class);
if (obj != null) {
obj.setType("port");
@@ -171,7 +166,6 @@
*/
@Override
public void removePort(IPortObject port) {
- //System.out.println("removeProt");
if (conn.getFramedGraph() != null) {
conn.getFramedGraph().removeVertex(port.asVertex());
}
@@ -182,7 +176,6 @@
*/
@Override
public IDeviceObject newDevice() {
- //System.out.println("newDevice");
IDeviceObject obj = (IDeviceObject) conn.getFramedGraph().addVertex(null, IDeviceObject.class);
if (obj != null) {
obj.setType("device");
@@ -195,7 +188,6 @@
*/
@Override
public Iterable<IDeviceObject> getDevices() {
- //System.out.println("getDeiveces");
return conn.getFramedGraph() != null ? conn.getFramedGraph().getVertices("type", "device", IDeviceObject.class) : null;
}
@@ -205,7 +197,6 @@
*/
@Override
public void removeDevice(IDeviceObject dev) {
- //System.out.println("removeDevice");
if (conn.getFramedGraph() != null) {
conn.getFramedGraph().removeVertex(dev.asVertex());
}
@@ -216,9 +207,7 @@
*/
@Override
public IFlowPath newFlowPath() {
- //System.out.println("newFlowPath");
IFlowPath flowPath = (IFlowPath)conn.getFramedGraph().addVertex(null, IFlowPath.class);
- //System.out.println("flowPath : " + flowPath);
if (flowPath != null) {
flowPath.setType("flow");
}
@@ -297,7 +286,6 @@
*/
@Override
public Iterable<IFlowPath> getAllFlowPaths() {
- //System.out.println("getAllFlowPaths");
Iterable<IFlowPath> flowPaths = conn.getFramedGraph().getVertices("type", "flow", IFlowPath.class);
List<IFlowPath> nonNullFlows = new ArrayList<IFlowPath>();
@@ -316,7 +304,6 @@
*/
@Override
public void removeFlowPath(IFlowPath flowPath) {
- //System.out.println("removeFlowPath");
conn.getFramedGraph().removeVertex(flowPath.asVertex());
}
@@ -346,7 +333,6 @@
*/
@Override
public void removeFlowEntry(IFlowEntry flowEntry) {
- //System.out.println("removeFlowEntry");
conn.getFramedGraph().removeVertex(flowEntry.asVertex());
}
@@ -355,7 +341,6 @@
*/
@Override
public IFlowEntry newFlowEntry() {
- //System.out.println("newFlowEntry");
IFlowEntry flowEntry = (IFlowEntry) conn.getFramedGraph().addVertex(null, IFlowEntry.class);
if (flowEntry != null) {
flowEntry.setType("flow_entry");
@@ -402,7 +387,6 @@
}
public void removeIpv4Address(IIpv4Address ipv4Address) {
- //System.out.println("removeIpv4Address");
conn.getFramedGraph().removeVertex(ipv4Address.asVertex());
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoLinkServiceImpl.java b/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoLinkServiceImpl.java
index 99a2eb1..3aea9a4 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoLinkServiceImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoLinkServiceImpl.java
@@ -35,7 +35,7 @@
@Override
public List<Link> getActiveLinks() {
- dbop = GraphDBManager.getDBOperation("ramcloud", "/tmp/ramcloudconf");
+ dbop = GraphDBManager.getDBOperation("ramcloud", "/tmp/ramcloud.conf");
//dbop = GraphDBManager.getDBOperation("", "");
//dbop.commit(); //Commit to ensure we see latest data
Iterable<ISwitchObject> switches = dbop.getActiveSwitches();
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index 04f99a1..74c8075 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -126,7 +126,7 @@
* Startup processing.
*/
private void startup() {
- this.dbHandler = GraphDBManager.getDBOperation("ramcloud", "/tmp/ramcloudconf");
+ this.dbHandler = GraphDBManager.getDBOperation("ramcloud", "/tmp/ramcloud.conf");
//
// Obtain the initial Topology state