blob: a764b6334fa2dc433fd47dc0942d1f64120042d2 [file] [log] [blame]
Yuta HIGUCHI23efab32014-04-28 15:34:41 -07001diff --git a/src/MasterService.cc b/src/MasterService.cc
2index 93cfb5d..38d250f 100644
3--- a/src/MasterService.cc
4+++ b/src/MasterService.cc
5@@ -1750,25 +1750,38 @@ MasterService::increment(const WireFormat::Increment::Request* reqHdr,
6 Status *status = &respHdr->common.status;
7
8 Buffer value;
9+ uint64_t version;
10+ int64_t newValue;
11 RejectRules rejectRules = reqHdr->rejectRules;
12- *status = objectManager.readObject(key, &value, &rejectRules, NULL);
13- if (*status != STATUS_OK)
14- return;
15+ RejectRules updateRejectRules;
16+ memset(&updateRejectRules, 0, sizeof(updateRejectRules));
17
18- if (value.getTotalLength() != sizeof(int64_t)) {
19- *status = STATUS_INVALID_OBJECT;
20- return;
21- }
22+ do {
23+ value.reset();
24+ *status = objectManager.readObject(key, &value, &rejectRules, &version);
25+ if (*status != STATUS_OK)
26+ return;
27+
28+ if (value.getTotalLength() != sizeof(int64_t)) {
29+ *status = STATUS_INVALID_OBJECT;
30+ return;
31+ }
32+
33+ const int64_t oldValue = *value.getOffset<int64_t>(0);
34+ newValue = oldValue + reqHdr->incrementValue;
35+
36+ // Write the new value back
37+ Buffer newValueBuffer;
38+ newValueBuffer.append(&newValue, sizeof(int64_t));
39
40- const int64_t oldValue = *value.getOffset<int64_t>(0);
41- int64_t newValue = oldValue + reqHdr->incrementValue;
42+ // reject rule to check atomic update
43+ updateRejectRules.givenVersion = version;
44+ updateRejectRules.versionNeGiven = true;
45
46- // Write the new value back
47- Buffer newValueBuffer;
48- newValueBuffer.append(&newValue, sizeof(int64_t));
49+ *status = objectManager.writeObject(key, newValueBuffer,
50+ &updateRejectRules, &respHdr->version);
51+ } while (*status == STATUS_WRONG_VERSION);
52
53- *status = objectManager.writeObject(key, newValueBuffer,
54- &rejectRules, &respHdr->version);
55 if (*status != STATUS_OK)
56 return;
57 objectManager.syncChanges();