Added multi Write Objects

Change-Id: Idcabd295085e31a8d5090cb5f6e2c24b8259f950
diff --git a/src/main/java/net/onrc/onos/datastore/RCClient.java b/src/main/java/net/onrc/onos/datastore/RCClient.java
index df6a8ce..1abe74c 100644
--- a/src/main/java/net/onrc/onos/datastore/RCClient.java
+++ b/src/main/java/net/onrc/onos/datastore/RCClient.java
@@ -4,10 +4,17 @@
 
 public class RCClient {
 
+    // Value taken from RAMCloud's Status.h
+    // FIXME These constants should be defined by JRamCloud
+    public static final int STATUS_OK = 0;
+
     // FIXME come up with a proper way to retrieve configuration
     public static final int MAX_MULTI_READS = Integer.valueOf(System
 	    .getProperty("ramcloud.max_multi_reads", "400"));
 
+    public static final int MAX_MULTI_WRITES = Integer.valueOf(System
+	    .getProperty("ramcloud.max_multi_writes", "800"));
+
     private static final ThreadLocal<JRamCloud> tlsRCClient = new ThreadLocal<JRamCloud>() {
 	@Override
 	protected JRamCloud initialValue() {
diff --git a/src/main/java/net/onrc/onos/datastore/RCObject.java b/src/main/java/net/onrc/onos/datastore/RCObject.java
index 37bc193..100cbd9 100644
--- a/src/main/java/net/onrc/onos/datastore/RCObject.java
+++ b/src/main/java/net/onrc/onos/datastore/RCObject.java
@@ -7,7 +7,9 @@
 import java.util.Iterator;
 import java.util.Map;
 
+import net.onrc.onos.datastore.RCObject.WriteOp.STATUS;
 import net.onrc.onos.datastore.RCTable.Entry;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -16,8 +18,11 @@
 import com.esotericsoftware.kryo.io.Output;
 
 import edu.stanford.ramcloud.JRamCloud;
+import edu.stanford.ramcloud.JRamCloud.MultiWriteObject;
+import edu.stanford.ramcloud.JRamCloud.MultiWriteRspObject;
 import edu.stanford.ramcloud.JRamCloud.ObjectDoesntExistException;
 import edu.stanford.ramcloud.JRamCloud.ObjectExistsException;
+import edu.stanford.ramcloud.JRamCloud.RejectRules;
 import edu.stanford.ramcloud.JRamCloud.TableEnumerator;
 import edu.stanford.ramcloud.JRamCloud.WrongVersionException;
 
@@ -94,6 +99,19 @@
 	return key;
     }
 
+    /**
+     * Get the byte array value of this object
+     *
+     * @note will trigger serialization, if value was null.
+     * @return
+     */
+    protected byte[] getValue() {
+	if (value == null) {
+	    serializeAndSetValue();
+	}
+	return value;
+    }
+
     public long getVersion() {
 	return version;
     }
@@ -239,7 +257,7 @@
      *
      * @param objects
      *            RCObjects to read
-     * @return true if there exist an failed read.
+     * @return true if there exist a failed read.
      */
     public static boolean multiRead(Collection<RCObject> objects) {
 	boolean fail_exists = false;
@@ -307,13 +325,131 @@
 	return fail_exists;
     }
 
-    public static Iterable<RCObject> getAllObjects(
-	    RCTable table) {
+    public static class WriteOp {
+	public enum STATUS {
+	    NOT_EXECUTED, SUCCESS, FAILED
+	}
+
+	public enum OPS {
+	    CREATE, UPDATE, FORCE_CREATE
+	}
+
+	private RCObject obj;
+	private OPS op;
+	private STATUS status;
+
+	public static WriteOp Create(RCObject obj) {
+	    return new WriteOp(obj, OPS.CREATE);
+	}
+
+	public static WriteOp Update(RCObject obj) {
+	    return new WriteOp(obj, OPS.UPDATE);
+	}
+
+	public static WriteOp ForceCreate(RCObject obj) {
+	    return new WriteOp(obj, OPS.FORCE_CREATE);
+	}
+
+	public WriteOp(RCObject obj, OPS op) {
+	    this.obj = obj;
+	    this.op = op;
+	    this.status = STATUS.NOT_EXECUTED;
+	}
+
+	public boolean hasSucceed() {
+	    return status == STATUS.SUCCESS;
+	}
+
+	public RCObject getObject() {
+	    return obj;
+	}
+
+	public OPS getOp() {
+	    return op;
+	}
+    }
+
+    public static boolean multiWrite(Collection<WriteOp> objects) {
+	boolean fail_exists = false;
+
+	ArrayList<WriteOp> req = new ArrayList<>();
+	Iterator<WriteOp> it = objects.iterator();
+	while (it.hasNext()) {
+
+	    req.add(it.next());
+
+	    if (req.size() >= RCClient.MAX_MULTI_WRITES) {
+		// dispatch multiWrite
+		fail_exists |= multiWriteInternal(req);
+		req.clear();
+	    }
+	}
+
+	if (!req.isEmpty()) {
+	    // dispatch multiWrite
+	    fail_exists |= multiWriteInternal(req);
+	    req.clear();
+	}
+
+	return fail_exists;
+    }
+
+    private static boolean multiWriteInternal(ArrayList<WriteOp> ops) {
+
+	boolean fail_exists = false;
+	MultiWriteObject multiWriteObjects[] = new MultiWriteObject[ops.size()];
+	JRamCloud rcClient = RCClient.getClient();
+
+	for (int i = 0; i < multiWriteObjects.length; ++i) {
+	    WriteOp op = ops.get(i);
+	    RCObject obj = op.getObject();
+
+	    // FIXME JRamCloud.RejectRules definition is messed up
+	    RejectRules rules = rcClient.new RejectRules();
+
+	    switch (op.getOp()) {
+	    case CREATE:
+		rules.setExists();
+		break;
+	    case FORCE_CREATE:
+		// no reject rule
+		break;
+	    case UPDATE:
+		rules.setDoesntExists();
+		rules.setNeVersion(obj.getVersion());
+		break;
+	    }
+	    multiWriteObjects[i] = new MultiWriteObject(obj.getTableId(),
+		    obj.getKey(), obj.getValue(), rules);
+	}
+
+	MultiWriteRspObject[] results = rcClient.multiWrite(multiWriteObjects);
+	assert (results.length == multiWriteObjects.length);
+
+	for (int i = 0; i < results.length; ++i) {
+	    WriteOp op = ops.get(i);
+
+	    if (results[i] != null
+		    && results[i].getStatus() == RCClient.STATUS_OK) {
+		op.status = STATUS.SUCCESS;
+
+		RCObject obj = op.getObject();
+		obj.version = results[i].getVersion();
+	    } else {
+		op.status = STATUS.FAILED;
+		fail_exists = true;
+	    }
+
+	}
+
+	return fail_exists;
+    }
+
+    public static Iterable<RCObject> getAllObjects(RCTable table) {
 	return new ObjectEnumerator(table);
     }
 
-    public static class ObjectEnumerator implements
-	    Iterable<RCObject> {
+    public static class ObjectEnumerator implements Iterable<RCObject> {
 
 	private RCTable table;
 
diff --git a/src/main/java/net/onrc/onos/datastore/topology/RCSwitch.java b/src/main/java/net/onrc/onos/datastore/topology/RCSwitch.java
index aee9c7e..6858e68 100644
--- a/src/main/java/net/onrc/onos/datastore/topology/RCSwitch.java
+++ b/src/main/java/net/onrc/onos/datastore/topology/RCSwitch.java
@@ -248,7 +248,7 @@
 	}
 	assert (swRead.getStatus() == STATUS.ACTIVE);
 	for (byte[] portId : swRead.getAllPortIds()) {
-	    // bad example code, portId is not expected to be ASCII string
+	    // XXX bad example code, portId is not expected to be ASCII string
 	    log.debug("PortId: {}", new String(portId));
 	}
 	assert (swRead.getAllPortIds().size() == 2);
@@ -271,7 +271,7 @@
 	}
 	assert (swRead2.getStatus() == STATUS.INACTIVE);
 	for (byte[] portId : swRead2.getAllPortIds()) {
-	    // bad example code, portId is not expected to be ASCII string
+	    // XXX bad example code, portId is not expected to be ASCII string
 	    log.debug("PortId: {}", new String(portId));
 	}
 	assert (swRead2.getAllPortIds().size() == 1);
@@ -364,17 +364,16 @@
 	d2.addPortId(sw2p2.getId());
 	sw2p2.addDeviceId(d2.getId());
 
-	try {
-	    sw2.create();
-	    log.debug("Create {}", sw2);
-	    sw2p1.create();
-	    log.debug("Create {}", sw2p1);
-	    sw2p2.create();
-	    log.debug("Create {}", sw2p2);
-	    d2.create();
-	    log.debug("Create {}", d2);
-	} catch (ObjectExistsException e) {
-	    log.error("One of Switch/Port/Device creation failed", e);
+	boolean failed = RCObject.multiWrite(Arrays.asList(
+	        RCObject.WriteOp.Create(sw2), RCObject.WriteOp.Create(sw2p1),
+	        RCObject.WriteOp.Create(sw2p2), RCObject.WriteOp.Create(d2)));
+	if (failed) {
+	    log.error("One of Switch/Port/Device creation failed");
+	} else {
+	    log.debug("Create {} Version:{}", sw2, sw2.getVersion());
+	    log.debug("Create {} Version:{}", sw2p1, sw2p1.getVersion());
+	    log.debug("Create {} Version:{}", sw2p2, sw2p2.getVersion());
+	    log.debug("Create {} Version:{}", d2, d2.getVersion());
 	}
 
 	RCLink l1 = new RCLink(0x1L, 2L, 0x2L, 1L);
@@ -427,15 +426,15 @@
 	    try {
 		port.read();
 		assert (port.getDpid() == 0x1L);
-		log.debug("Port 0x1:{} - LinkIDs:{} DeviceIDs:{}",
-		        port.getNumber(), port.getAllLinkIds(),
+		log.debug("{} - LinkIDs:{} DeviceIDs:{}",
+		        port, port.getAllLinkIds(),
 		        port.getAllDeviceIds());
 
 		for (byte[] deviceId : port.getAllDeviceIds()) {
 		    RCDevice device = RCDevice.createFromKey(deviceId);
 		    try {
 			device.read();
-			log.debug("Device {} - PortIDs:{}", device.getMac(),
+			log.debug("{} - PortIDs:{}", device,
 			        device.getAllPortIds());
 		    } catch (ObjectDoesntExistException e) {
 			log.error("Reading Device failed", e);
@@ -457,7 +456,7 @@
 	    }
 	}
 
-	RCSwitch sw2 = new RCSwitch(0x1L);
+	RCSwitch sw2 = new RCSwitch(0x2L);
 	try {
 	    sw2.read();
 	    log.debug("{}", sw2);
@@ -472,16 +471,16 @@
 	    RCPort port = RCPort.createFromKey(portId);
 	    try {
 		port.read();
-		assert (port.getDpid() == 0x1L);
-		log.debug("Port 0x2:{} - LinkIDs:{} DeviceIDs:{}",
-		        port.getNumber(), port.getAllLinkIds(),
+		assert (port.getDpid() == 0x2L);
+		log.debug("{} - LinkIDs:{} DeviceIDs:{}",
+		        port, port.getAllLinkIds(),
 		        port.getAllDeviceIds());
 
 		for (byte[] deviceId : port.getAllDeviceIds()) {
 		    RCDevice device = RCDevice.createFromKey(deviceId);
 		    try {
 			device.read();
-			log.debug("Device {} - PortIDs:{}", device,
+			log.debug("{} - PortIDs:{}", device,
 			        device.getAllPortIds());
 		    } catch (ObjectDoesntExistException e) {
 			log.error("Reading Device failed", e);