blob: a764b6334fa2dc433fd47dc0942d1f64120042d2 [file] [log] [blame]
diff --git a/src/MasterService.cc b/src/MasterService.cc
index 93cfb5d..38d250f 100644
--- a/src/MasterService.cc
+++ b/src/MasterService.cc
@@ -1750,25 +1750,38 @@ MasterService::increment(const WireFormat::Increment::Request* reqHdr,
Status *status = &respHdr->common.status;
Buffer value;
+ uint64_t version;
+ int64_t newValue;
RejectRules rejectRules = reqHdr->rejectRules;
- *status = objectManager.readObject(key, &value, &rejectRules, NULL);
- if (*status != STATUS_OK)
- return;
+ RejectRules updateRejectRules;
+ memset(&updateRejectRules, 0, sizeof(updateRejectRules));
- if (value.getTotalLength() != sizeof(int64_t)) {
- *status = STATUS_INVALID_OBJECT;
- return;
- }
+ do {
+ value.reset();
+ *status = objectManager.readObject(key, &value, &rejectRules, &version);
+ if (*status != STATUS_OK)
+ return;
+
+ if (value.getTotalLength() != sizeof(int64_t)) {
+ *status = STATUS_INVALID_OBJECT;
+ return;
+ }
+
+ const int64_t oldValue = *value.getOffset<int64_t>(0);
+ newValue = oldValue + reqHdr->incrementValue;
+
+ // Write the new value back
+ Buffer newValueBuffer;
+ newValueBuffer.append(&newValue, sizeof(int64_t));
- const int64_t oldValue = *value.getOffset<int64_t>(0);
- int64_t newValue = oldValue + reqHdr->incrementValue;
+ // reject rule to check atomic update
+ updateRejectRules.givenVersion = version;
+ updateRejectRules.versionNeGiven = true;
- // Write the new value back
- Buffer newValueBuffer;
- newValueBuffer.append(&newValue, sizeof(int64_t));
+ *status = objectManager.writeObject(key, newValueBuffer,
+ &updateRejectRules, &respHdr->version);
+ } while (*status == STATUS_WRONG_VERSION);
- *status = objectManager.writeObject(key, newValueBuffer,
- &rejectRules, &respHdr->version);
if (*status != STATUS_OK)
return;
objectManager.syncChanges();