Added multi Write Objects
Change-Id: Idcabd295085e31a8d5090cb5f6e2c24b8259f950
diff --git a/src/main/java/net/onrc/onos/datastore/RCClient.java b/src/main/java/net/onrc/onos/datastore/RCClient.java
index df6a8ce..1abe74c 100644
--- a/src/main/java/net/onrc/onos/datastore/RCClient.java
+++ b/src/main/java/net/onrc/onos/datastore/RCClient.java
@@ -4,10 +4,17 @@
public class RCClient {
+ // Value taken from RAMCloud's Status.h
+ // FIXME These constants should be defined by JRamCloud
+ public static final int STATUS_OK = 0;
+
// FIXME come up with a proper way to retrieve configuration
public static final int MAX_MULTI_READS = Integer.valueOf(System
.getProperty("ramcloud.max_multi_reads", "400"));
+ public static final int MAX_MULTI_WRITES = Integer.valueOf(System
+ .getProperty("ramcloud.max_multi_writes", "800"));
+
private static final ThreadLocal<JRamCloud> tlsRCClient = new ThreadLocal<JRamCloud>() {
@Override
protected JRamCloud initialValue() {
diff --git a/src/main/java/net/onrc/onos/datastore/RCObject.java b/src/main/java/net/onrc/onos/datastore/RCObject.java
index 37bc193..100cbd9 100644
--- a/src/main/java/net/onrc/onos/datastore/RCObject.java
+++ b/src/main/java/net/onrc/onos/datastore/RCObject.java
@@ -7,7 +7,9 @@
import java.util.Iterator;
import java.util.Map;
+import net.onrc.onos.datastore.RCObject.WriteOp.STATUS;
import net.onrc.onos.datastore.RCTable.Entry;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -16,8 +18,11 @@
import com.esotericsoftware.kryo.io.Output;
import edu.stanford.ramcloud.JRamCloud;
+import edu.stanford.ramcloud.JRamCloud.MultiWriteObject;
+import edu.stanford.ramcloud.JRamCloud.MultiWriteRspObject;
import edu.stanford.ramcloud.JRamCloud.ObjectDoesntExistException;
import edu.stanford.ramcloud.JRamCloud.ObjectExistsException;
+import edu.stanford.ramcloud.JRamCloud.RejectRules;
import edu.stanford.ramcloud.JRamCloud.TableEnumerator;
import edu.stanford.ramcloud.JRamCloud.WrongVersionException;
@@ -94,6 +99,19 @@
return key;
}
+ /**
+ * Get the byte array value of this object
+ *
+ * @note will trigger serialization, if value was null.
+ * @return
+ */
+ protected byte[] getValue() {
+ if (value == null) {
+ serializeAndSetValue();
+ }
+ return value;
+ }
+
public long getVersion() {
return version;
}
@@ -239,7 +257,7 @@
*
* @param objects
* RCObjects to read
- * @return true if there exist an failed read.
+ * @return true if there exist a failed read.
*/
public static boolean multiRead(Collection<RCObject> objects) {
boolean fail_exists = false;
@@ -307,13 +325,131 @@
return fail_exists;
}
- public static Iterable<RCObject> getAllObjects(
- RCTable table) {
+ public static class WriteOp {
+ public enum STATUS {
+ NOT_EXECUTED, SUCCESS, FAILED
+ }
+
+ public enum OPS {
+ CREATE, UPDATE, FORCE_CREATE
+ }
+
+ private RCObject obj;
+ private OPS op;
+ private STATUS status;
+
+ public static WriteOp Create(RCObject obj) {
+ return new WriteOp(obj, OPS.CREATE);
+ }
+
+ public static WriteOp Update(RCObject obj) {
+ return new WriteOp(obj, OPS.UPDATE);
+ }
+
+ public static WriteOp ForceCreate(RCObject obj) {
+ return new WriteOp(obj, OPS.FORCE_CREATE);
+ }
+
+ public WriteOp(RCObject obj, OPS op) {
+ this.obj = obj;
+ this.op = op;
+ this.status = STATUS.NOT_EXECUTED;
+ }
+
+ public boolean hasSucceed() {
+ return status == STATUS.SUCCESS;
+ }
+
+ public RCObject getObject() {
+ return obj;
+ }
+
+ public OPS getOp() {
+ return op;
+ }
+ }
+
+ public static boolean multiWrite(Collection<WriteOp> objects) {
+ boolean fail_exists = false;
+
+ ArrayList<WriteOp> req = new ArrayList<>();
+ Iterator<WriteOp> it = objects.iterator();
+ while (it.hasNext()) {
+
+ req.add(it.next());
+
+ if (req.size() >= RCClient.MAX_MULTI_WRITES) {
+ // dispatch multiWrite
+ fail_exists |= multiWriteInternal(req);
+ req.clear();
+ }
+ }
+
+ if (!req.isEmpty()) {
+ // dispatch multiWrite
+ fail_exists |= multiWriteInternal(req);
+ req.clear();
+ }
+
+ return fail_exists;
+ }
+
+ private static boolean multiWriteInternal(ArrayList<WriteOp> ops) {
+
+ boolean fail_exists = false;
+ MultiWriteObject multiWriteObjects[] = new MultiWriteObject[ops.size()];
+ JRamCloud rcClient = RCClient.getClient();
+
+ for (int i = 0; i < multiWriteObjects.length; ++i) {
+ WriteOp op = ops.get(i);
+ RCObject obj = op.getObject();
+
+ // FIXME JRamCloud.RejectRules definition is messed up
+ RejectRules rules = rcClient.new RejectRules();
+
+ switch (op.getOp()) {
+ case CREATE:
+ rules.setExists();
+ break;
+ case FORCE_CREATE:
+ // no reject rule
+ break;
+ case UPDATE:
+ rules.setDoesntExists();
+ rules.setNeVersion(obj.getVersion());
+ break;
+ }
+ multiWriteObjects[i] = new MultiWriteObject(obj.getTableId(),
+ obj.getKey(), obj.getValue(), rules);
+ }
+
+ MultiWriteRspObject[] results = rcClient.multiWrite(multiWriteObjects);
+ assert (results.length == multiWriteObjects.length);
+
+ for (int i = 0; i < results.length; ++i) {
+ WriteOp op = ops.get(i);
+
+ if (results[i] != null
+ && results[i].getStatus() == RCClient.STATUS_OK) {
+ op.status = STATUS.SUCCESS;
+
+ RCObject obj = op.getObject();
+ obj.version = results[i].getVersion();
+ } else {
+ op.status = STATUS.FAILED;
+ fail_exists = true;
+ }
+
+ }
+
+ return fail_exists;
+ }
+
+ public static Iterable<RCObject> getAllObjects(RCTable table) {
return new ObjectEnumerator(table);
}
- public static class ObjectEnumerator implements
- Iterable<RCObject> {
+ public static class ObjectEnumerator implements Iterable<RCObject> {
private RCTable table;
diff --git a/src/main/java/net/onrc/onos/datastore/topology/RCSwitch.java b/src/main/java/net/onrc/onos/datastore/topology/RCSwitch.java
index aee9c7e..6858e68 100644
--- a/src/main/java/net/onrc/onos/datastore/topology/RCSwitch.java
+++ b/src/main/java/net/onrc/onos/datastore/topology/RCSwitch.java
@@ -248,7 +248,7 @@
}
assert (swRead.getStatus() == STATUS.ACTIVE);
for (byte[] portId : swRead.getAllPortIds()) {
- // bad example code, portId is not expected to be ASCII string
+ // XXX bad example code, portId is not expected to be ASCII string
log.debug("PortId: {}", new String(portId));
}
assert (swRead.getAllPortIds().size() == 2);
@@ -271,7 +271,7 @@
}
assert (swRead2.getStatus() == STATUS.INACTIVE);
for (byte[] portId : swRead2.getAllPortIds()) {
- // bad example code, portId is not expected to be ASCII string
+ // XXX bad example code, portId is not expected to be ASCII string
log.debug("PortId: {}", new String(portId));
}
assert (swRead2.getAllPortIds().size() == 1);
@@ -364,17 +364,16 @@
d2.addPortId(sw2p2.getId());
sw2p2.addDeviceId(d2.getId());
- try {
- sw2.create();
- log.debug("Create {}", sw2);
- sw2p1.create();
- log.debug("Create {}", sw2p1);
- sw2p2.create();
- log.debug("Create {}", sw2p2);
- d2.create();
- log.debug("Create {}", d2);
- } catch (ObjectExistsException e) {
- log.error("One of Switch/Port/Device creation failed", e);
+ boolean failed = RCObject.multiWrite(Arrays.asList(
+ RCObject.WriteOp.Create(sw2), RCObject.WriteOp.Create(sw2p1),
+ RCObject.WriteOp.Create(sw2p2), RCObject.WriteOp.Create(d2)));
+ if (failed) {
+ log.error("One of Switch/Port/Device creation failed");
+ } else {
+ log.debug("Create {} Version:{}", sw2, sw2.getVersion());
+ log.debug("Create {} Version:{}", sw2p1, sw2p1.getVersion());
+ log.debug("Create {} Version:{}", sw2p2, sw2p2.getVersion());
+ log.debug("Create {} Version:{}", d2, d2.getVersion());
}
RCLink l1 = new RCLink(0x1L, 2L, 0x2L, 1L);
@@ -427,15 +426,15 @@
try {
port.read();
assert (port.getDpid() == 0x1L);
- log.debug("Port 0x1:{} - LinkIDs:{} DeviceIDs:{}",
- port.getNumber(), port.getAllLinkIds(),
+ log.debug("{} - LinkIDs:{} DeviceIDs:{}",
+ port, port.getAllLinkIds(),
port.getAllDeviceIds());
for (byte[] deviceId : port.getAllDeviceIds()) {
RCDevice device = RCDevice.createFromKey(deviceId);
try {
device.read();
- log.debug("Device {} - PortIDs:{}", device.getMac(),
+ log.debug("{} - PortIDs:{}", device,
device.getAllPortIds());
} catch (ObjectDoesntExistException e) {
log.error("Reading Device failed", e);
@@ -457,7 +456,7 @@
}
}
- RCSwitch sw2 = new RCSwitch(0x1L);
+ RCSwitch sw2 = new RCSwitch(0x2L);
try {
sw2.read();
log.debug("{}", sw2);
@@ -472,16 +471,16 @@
RCPort port = RCPort.createFromKey(portId);
try {
port.read();
- assert (port.getDpid() == 0x1L);
- log.debug("Port 0x2:{} - LinkIDs:{} DeviceIDs:{}",
- port.getNumber(), port.getAllLinkIds(),
+ assert (port.getDpid() == 0x2L);
+ log.debug("{} - LinkIDs:{} DeviceIDs:{}",
+ port, port.getAllLinkIds(),
port.getAllDeviceIds());
for (byte[] deviceId : port.getAllDeviceIds()) {
RCDevice device = RCDevice.createFromKey(deviceId);
try {
device.read();
- log.debug("Device {} - PortIDs:{}", device,
+ log.debug("{} - PortIDs:{}", device,
device.getAllPortIds());
} catch (ObjectDoesntExistException e) {
log.error("Reading Device failed", e);