If the value of the intents writing into ramcloud exceeds the 1MB limit split the size into multiple writes.
Allocate the byte buffer once review comment from Yuta.
Change-Id: Ib58637eb6cb4d19526ec8c4d7e915c3aa2a65057
diff --git a/src/main/java/net/onrc/onos/intent/persist/PersistIntent.java b/src/main/java/net/onrc/onos/intent/persist/PersistIntent.java
index 515b0bf..380a0f2 100755
--- a/src/main/java/net/onrc/onos/intent/persist/PersistIntent.java
+++ b/src/main/java/net/onrc/onos/intent/persist/PersistIntent.java
@@ -9,6 +9,8 @@
import edu.stanford.ramcloud.JRamCloud;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import net.onrc.onos.datagrid.web.IntentResource;
import net.onrc.onos.datastore.RCTable;
@@ -29,7 +31,8 @@
private long range = 10000L;
private final IControllerRegistryService controllerRegistry;
NetworkGraph graph = null;
- private final String intentJournal = "G:IntentJournal";
+ private final static String intentJournal = "G:IntentJournal";
+ private final static int valueStoreLimit = 1024 * 1024;
private RCTable table;
private Kryo kryo = new Kryo();
private ByteArrayOutputStream stream;
@@ -61,6 +64,7 @@
}
private long getNextBlock() {
+ // XXX This method is not thread safe, may lose allocated IdBlock
idBlock = controllerRegistry.allocateUniqueIdBlock(range);
nextId = new AtomicLong(idBlock.getStart());
rangeEnd = idBlock.getEnd();
@@ -73,15 +77,40 @@
// TODO call controllerRegistry.isClusterLeader()
if (leader) {
try {
+ // reserve key 10 entries for multi-write if size over 1MB
+ key *= 10;
kryo.writeObject(output, operations);
output.close();
+ ByteBuffer keyBytes = ByteBuffer.allocate(8).putLong(key);
byte[] buffer = stream.toByteArray();
- table.create(String.valueOf(key).getBytes(), buffer);
+ int total = buffer.length;
+ if ((total >= valueStoreLimit )) {
+ int writeCount = total / valueStoreLimit;
+ int remainder = total % valueStoreLimit;
+ int upperIndex = 0;
+ for (int i = 0; i < writeCount; i++, key++) {
+ keyBytes.clear();
+ keyBytes.putLong(key);
+ keyBytes.flip();
+ upperIndex = (i * valueStoreLimit + valueStoreLimit) - 1;
+ log.debug("writing using indexes {}:{}", (i*valueStoreLimit) ,upperIndex);
+ table.create(keyBytes.array(), Arrays.copyOfRange(buffer, i * valueStoreLimit, upperIndex));
+ }
+ if (remainder > 0) {
+ keyBytes.clear();
+ keyBytes.putLong(key);
+ keyBytes.flip();
+ log.debug("writing using indexes {}:{}" ,upperIndex ,total);
+ table.create(keyBytes.array(), Arrays.copyOfRange(buffer, upperIndex + 1, total - 1));
+ }
+ } else {
+ keyBytes.flip();
+ table.create(keyBytes.array(), buffer);
+ }
log.debug("key is {} value length is {}", key, buffer.length);
stream.reset();
stream.close();
log.debug("persist operations to ramcloud size of operations: {}", operations.size());
- if (buffer.length > 921600 ) log.error("oversize key {} value length is {}", key, buffer.length);
ret = true;
} catch (JRamCloud.ObjectExistsException ex) {
log.warn("Failed to store intent journal with key " + key);