blob: ea62e72a4a9c05e61c1804174f884d4fde6800ec [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Madan Jampani5e5b3d62016-02-01 16:03:33 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
Madan Jampani79924fa2016-09-13 13:57:03 -070018import static com.google.common.base.Preconditions.checkState;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080019import static org.onosproject.store.service.MapEvent.Type.INSERT;
20import static org.onosproject.store.service.MapEvent.Type.REMOVE;
21import static org.onosproject.store.service.MapEvent.Type.UPDATE;
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -070022import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080023import io.atomix.copycat.server.Commit;
24import io.atomix.copycat.server.Snapshottable;
25import io.atomix.copycat.server.StateMachineExecutor;
Madan Jampani79924fa2016-09-13 13:57:03 -070026import io.atomix.copycat.server.session.ServerSession;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080027import io.atomix.copycat.server.session.SessionListener;
28import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
29import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
30import io.atomix.resource.ResourceStateMachine;
31
32import java.util.Collection;
33import java.util.HashMap;
34import java.util.Iterator;
Madan Jampanifc981772016-02-16 09:46:42 -080035import java.util.List;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080036import java.util.Map;
Madan Jampani65f24bb2016-03-15 15:16:18 -070037import java.util.Properties;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080038import java.util.Set;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080039import java.util.stream.Collectors;
40
41import org.onlab.util.CountDownCompleter;
42import org.onlab.util.Match;
Madan Jampani74da78b2016-02-09 21:18:36 -080043import org.onosproject.store.primitives.MapUpdate;
Madan Jampanicadd70b2016-02-08 13:45:43 -080044import org.onosproject.store.primitives.TransactionId;
Madan Jampanifc981772016-02-16 09:46:42 -080045import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
46import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
47import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsValue;
48import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.EntrySet;
49import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Get;
Jordan Haltermanf6272442017-04-20 02:18:08 -070050import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.GetOrDefault;
Madan Jampanifc981772016-02-16 09:46:42 -080051import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.IsEmpty;
52import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.KeySet;
53import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Listen;
54import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
Jordan Halterman948d6592017-04-20 17:18:24 -070055import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionBegin;
Madan Jampanifc981772016-02-16 09:46:42 -080056import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080057import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
Madan Jampani542d9e22016-04-05 15:39:55 -070058import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepareAndCommit;
Madan Jampanifc981772016-02-16 09:46:42 -080059import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
60import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
61import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
62import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Values;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080063import org.onosproject.store.service.MapEvent;
Jordan Halterman948d6592017-04-20 17:18:24 -070064import org.onosproject.store.service.TransactionLog;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080065import org.onosproject.store.service.Versioned;
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -070066import org.slf4j.Logger;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080067
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -070068import com.google.common.base.Throwables;
Madan Jampanifc981772016-02-16 09:46:42 -080069import com.google.common.collect.Lists;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080070import com.google.common.collect.Maps;
71import com.google.common.collect.Sets;
72
Madan Jampani5e5b3d62016-02-01 16:03:33 -080073/**
74 * State Machine for {@link AtomixConsistentMap} resource.
75 */
Madan Jampanifc981772016-02-16 09:46:42 -080076public class AtomixConsistentMapState extends ResourceStateMachine implements SessionListener, Snapshottable {
Madan Jampani86cb2432016-02-17 11:07:56 -080077
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -070078 private final Logger log = getLogger(getClass());
Madan Jampani79924fa2016-09-13 13:57:03 -070079 private final Map<Long, Commit<? extends Listen>> listeners = new HashMap<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080080 private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
81 private final Set<String> preparedKeys = Sets.newHashSet();
Jordan Halterman5f97a302017-04-26 23:41:31 -070082 private final Map<TransactionId, TransactionScope> activeTransactions = Maps.newHashMap();
83 private long currentVersion;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080084
/**
 * Creates the state machine, handing the supplied properties to the
 * {@link ResourceStateMachine} superclass.
 *
 * @param properties resource properties
 */
public AtomixConsistentMapState(Properties properties) {
    super(properties);
}
88
Madan Jampani5e5b3d62016-02-01 16:03:33 -080089 @Override
90 public void snapshot(SnapshotWriter writer) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -080091 }
92
93 @Override
94 public void install(SnapshotReader reader) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -080095 }
96
97 @Override
98 protected void configure(StateMachineExecutor executor) {
99 // Listeners
Madan Jampanifc981772016-02-16 09:46:42 -0800100 executor.register(Listen.class, this::listen);
101 executor.register(Unlisten.class, this::unlisten);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800102 // Queries
Madan Jampanifc981772016-02-16 09:46:42 -0800103 executor.register(ContainsKey.class, this::containsKey);
104 executor.register(ContainsValue.class, this::containsValue);
105 executor.register(EntrySet.class, this::entrySet);
106 executor.register(Get.class, this::get);
Jordan Haltermanf6272442017-04-20 02:18:08 -0700107 executor.register(GetOrDefault.class, this::getOrDefault);
Madan Jampanifc981772016-02-16 09:46:42 -0800108 executor.register(IsEmpty.class, this::isEmpty);
109 executor.register(KeySet.class, this::keySet);
110 executor.register(Size.class, this::size);
111 executor.register(Values.class, this::values);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800112 // Commands
Madan Jampanifc981772016-02-16 09:46:42 -0800113 executor.register(UpdateAndGet.class, this::updateAndGet);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800114 executor.register(AtomixConsistentMapCommands.Clear.class, this::clear);
Jordan Halterman948d6592017-04-20 17:18:24 -0700115 executor.register(TransactionBegin.class, this::begin);
Madan Jampanifc981772016-02-16 09:46:42 -0800116 executor.register(TransactionPrepare.class, this::prepare);
117 executor.register(TransactionCommit.class, this::commit);
118 executor.register(TransactionRollback.class, this::rollback);
Madan Jampani542d9e22016-04-05 15:39:55 -0700119 executor.register(TransactionPrepareAndCommit.class, this::prepareAndCommit);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800120 }
121
122 @Override
123 public void delete() {
124 // Delete Listeners
125 listeners.values().forEach(Commit::close);
126 listeners.clear();
127
128 // Delete Map entries
129 mapEntries.values().forEach(MapEntryValue::discard);
130 mapEntries.clear();
131 }
132
133 /**
134 * Handles a contains key commit.
135 *
Madan Jampanifc981772016-02-16 09:46:42 -0800136 * @param commit containsKey commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800137 * @return {@code true} if map contains key
138 */
Madan Jampanifc981772016-02-16 09:46:42 -0800139 protected boolean containsKey(Commit<? extends ContainsKey> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800140 try {
Jordan Halterman5f97a302017-04-26 23:41:31 -0700141 MapEntryValue value = mapEntries.get(commit.operation().key());
142 return value != null && value.type() != MapEntryValue.Type.TOMBSTONE;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800143 } finally {
144 commit.close();
145 }
146 }
147
148 /**
149 * Handles a contains value commit.
150 *
Madan Jampanifc981772016-02-16 09:46:42 -0800151 * @param commit containsValue commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800152 * @return {@code true} if map contains value
153 */
Madan Jampanifc981772016-02-16 09:46:42 -0800154 protected boolean containsValue(Commit<? extends ContainsValue> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800155 try {
Jordan Halterman5f97a302017-04-26 23:41:31 -0700156 Match<byte[]> valueMatch = Match.ifValue(commit.operation().value());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800157 return mapEntries.values().stream()
Jordan Halterman5f97a302017-04-26 23:41:31 -0700158 .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800159 .anyMatch(value -> valueMatch.matches(value.value()));
160 } finally {
161 commit.close();
162 }
163 }
164
165 /**
166 * Handles a get commit.
167 *
Jordan Haltermanf6272442017-04-20 02:18:08 -0700168 * @param commit get commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800169 * @return value mapped to key
170 */
Madan Jampanifc981772016-02-16 09:46:42 -0800171 protected Versioned<byte[]> get(Commit<? extends Get> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800172 try {
173 return toVersioned(mapEntries.get(commit.operation().key()));
174 } finally {
175 commit.close();
176 }
177 }
178
179 /**
Jordan Haltermanf6272442017-04-20 02:18:08 -0700180 * Handles a get or default commit.
181 *
182 * @param commit get or default commit
183 * @return value mapped to key
184 */
185 protected Versioned<byte[]> getOrDefault(Commit<? extends GetOrDefault> commit) {
186 try {
Jordan Halterman5f97a302017-04-26 23:41:31 -0700187 MapEntryValue value = mapEntries.get(commit.operation().key());
188 if (value == null) {
189 return new Versioned<>(commit.operation().defaultValue(), 0);
190 } else if (value.type() == MapEntryValue.Type.TOMBSTONE) {
191 return new Versioned<>(commit.operation().defaultValue(), value.version);
192 } else {
193 return new Versioned<>(value.value(), value.version);
194 }
Jordan Haltermanf6272442017-04-20 02:18:08 -0700195 } finally {
196 commit.close();
197 }
198 }
199
200 /**
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800201 * Handles a count commit.
202 *
Madan Jampanifc981772016-02-16 09:46:42 -0800203 * @param commit size commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800204 * @return number of entries in map
205 */
Madan Jampanifc981772016-02-16 09:46:42 -0800206 protected int size(Commit<? extends Size> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800207 try {
Jordan Halterman5f97a302017-04-26 23:41:31 -0700208 return (int) mapEntries.values().stream()
209 .filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
210 .count();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800211 } finally {
212 commit.close();
213 }
214 }
215
216 /**
217 * Handles an is empty commit.
218 *
Madan Jampanifc981772016-02-16 09:46:42 -0800219 * @param commit isEmpty commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800220 * @return {@code true} if map is empty
221 */
Madan Jampanifc981772016-02-16 09:46:42 -0800222 protected boolean isEmpty(Commit<? extends IsEmpty> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800223 try {
Jordan Halterman5f97a302017-04-26 23:41:31 -0700224 return mapEntries.values().stream()
225 .noneMatch(value -> value.type() != MapEntryValue.Type.TOMBSTONE);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800226 } finally {
227 commit.close();
228 }
229 }
230
231 /**
232 * Handles a keySet commit.
233 *
Madan Jampanifc981772016-02-16 09:46:42 -0800234 * @param commit keySet commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800235 * @return set of keys in map
236 */
Madan Jampanifc981772016-02-16 09:46:42 -0800237 protected Set<String> keySet(Commit<? extends KeySet> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800238 try {
Jordan Halterman5f97a302017-04-26 23:41:31 -0700239 return mapEntries.entrySet().stream()
240 .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
241 .map(Map.Entry::getKey)
242 .collect(Collectors.toSet());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800243 } finally {
244 commit.close();
245 }
246 }
247
248 /**
249 * Handles a values commit.
250 *
Madan Jampanifc981772016-02-16 09:46:42 -0800251 * @param commit values commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800252 * @return collection of values in map
253 */
Madan Jampanifc981772016-02-16 09:46:42 -0800254 protected Collection<Versioned<byte[]>> values(Commit<? extends Values> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800255 try {
Jordan Halterman5f97a302017-04-26 23:41:31 -0700256 return mapEntries.entrySet().stream()
257 .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
258 .map(entry -> toVersioned(entry.getValue()))
259 .collect(Collectors.toList());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800260 } finally {
261 commit.close();
262 }
263 }
264
265 /**
266 * Handles a entry set commit.
267 *
268 * @param commit
269 * entrySet commit
270 * @return set of map entries
271 */
Madan Jampanifc981772016-02-16 09:46:42 -0800272 protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(Commit<? extends EntrySet> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800273 try {
Jordan Halterman5f97a302017-04-26 23:41:31 -0700274 return mapEntries.entrySet().stream()
275 .filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
276 .map(e -> Maps.immutableEntry(e.getKey(), toVersioned(e.getValue())))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800277 .collect(Collectors.toSet());
278 } finally {
279 commit.close();
280 }
281 }
282
283 /**
284 * Handles a update and get commit.
285 *
Madan Jampanifc981772016-02-16 09:46:42 -0800286 * @param commit updateAndGet commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800287 * @return update result
288 */
Madan Jampanifc981772016-02-16 09:46:42 -0800289 protected MapEntryUpdateResult<String, byte[]> updateAndGet(Commit<? extends UpdateAndGet> commit) {
Madan Jampaniea98f412016-06-22 09:05:40 -0700290 try {
291 MapEntryUpdateResult.Status updateStatus = validate(commit.operation());
292 String key = commit.operation().key();
293 MapEntryValue oldCommitValue = mapEntries.get(commit.operation().key());
294 Versioned<byte[]> oldMapValue = toVersioned(oldCommitValue);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800295
Madan Jampaniea98f412016-06-22 09:05:40 -0700296 if (updateStatus != MapEntryUpdateResult.Status.OK) {
297 commit.close();
Jordan Haltermane5ce1452017-05-09 12:15:22 -0700298 return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue, oldMapValue);
Madan Jampaniea98f412016-06-22 09:05:40 -0700299 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800300
Madan Jampaniea98f412016-06-22 09:05:40 -0700301 byte[] newValue = commit.operation().value();
Jordan Halterman5f97a302017-04-26 23:41:31 -0700302 currentVersion = commit.index();
Madan Jampaniea98f412016-06-22 09:05:40 -0700303 Versioned<byte[]> newMapValue = newValue == null ? null
Jordan Halterman5f97a302017-04-26 23:41:31 -0700304 : new Versioned<>(newValue, currentVersion);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800305
Madan Jampaniea98f412016-06-22 09:05:40 -0700306 MapEvent.Type updateType = newValue == null ? REMOVE
307 : oldCommitValue == null ? INSERT : UPDATE;
Jordan Halterman5f97a302017-04-26 23:41:31 -0700308
309 // If a value existed in the map, remove and discard the value to ensure disk can be freed.
Madan Jampaniea98f412016-06-22 09:05:40 -0700310 if (updateType == REMOVE || updateType == UPDATE) {
311 mapEntries.remove(key);
312 oldCommitValue.discard();
313 }
Jordan Halterman5f97a302017-04-26 23:41:31 -0700314
315 // If this is an insert/update commit, add the commit to the map entries.
Madan Jampaniea98f412016-06-22 09:05:40 -0700316 if (updateType == INSERT || updateType == UPDATE) {
Jordan Halterman5f97a302017-04-26 23:41:31 -0700317 mapEntries.put(key, new NonTransactionalCommit(commit));
318 } else if (!activeTransactions.isEmpty()) {
319 // If this is a delete but transactions are currently running, ensure tombstones are retained
320 // for version checks.
321 TombstoneCommit tombstone = new TombstoneCommit(
322 commit.index(),
323 new CountDownCompleter<>(commit, 1, Commit::close));
324 mapEntries.put(key, tombstone);
Madan Jampaniea98f412016-06-22 09:05:40 -0700325 } else {
Jordan Halterman5f97a302017-04-26 23:41:31 -0700326 // If no transactions are in progress, we can safely delete the key from memory.
Madan Jampaniea98f412016-06-22 09:05:40 -0700327 commit.close();
328 }
Jordan Halterman5f97a302017-04-26 23:41:31 -0700329
Madan Jampaniea98f412016-06-22 09:05:40 -0700330 publish(Lists.newArrayList(new MapEvent<>("", key, newMapValue, oldMapValue)));
Jordan Haltermane5ce1452017-05-09 12:15:22 -0700331 return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue, newMapValue);
Madan Jampaniea98f412016-06-22 09:05:40 -0700332 } catch (Exception e) {
333 log.error("State machine operation failed", e);
334 throw Throwables.propagate(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800335 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800336 }
337
338 /**
339 * Handles a clear commit.
340 *
Madan Jampanifc981772016-02-16 09:46:42 -0800341 * @param commit clear commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800342 * @return clear result
343 */
Madan Jampanifc981772016-02-16 09:46:42 -0800344 protected MapEntryUpdateResult.Status clear(Commit<? extends Clear> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800345 try {
346 Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries
347 .entrySet().iterator();
348 while (iterator.hasNext()) {
349 Map.Entry<String, MapEntryValue> entry = iterator.next();
350 String key = entry.getKey();
351 MapEntryValue value = entry.getValue();
352 Versioned<byte[]> removedValue = new Versioned<>(value.value(),
353 value.version());
Madan Jampanifc981772016-02-16 09:46:42 -0800354 publish(Lists.newArrayList(new MapEvent<>("", key, null, removedValue)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800355 value.discard();
356 iterator.remove();
357 }
358 return MapEntryUpdateResult.Status.OK;
359 } finally {
360 commit.close();
361 }
362 }
363
364 /**
365 * Handles a listen commit.
366 *
Madan Jampanifc981772016-02-16 09:46:42 -0800367 * @param commit listen commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800368 */
Madan Jampanifc981772016-02-16 09:46:42 -0800369 protected void listen(Commit<? extends Listen> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800370 Long sessionId = commit.session().id();
Madan Jampanie6038872016-03-03 12:14:37 -0800371 if (listeners.putIfAbsent(sessionId, commit) != null) {
372 commit.close();
373 return;
374 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800375 commit.session()
376 .onStateChange(
377 state -> {
Madan Jampani3a9911c2016-02-21 11:25:45 -0800378 if (state == ServerSession.State.CLOSED
379 || state == ServerSession.State.EXPIRED) {
Madan Jampanifc981772016-02-16 09:46:42 -0800380 Commit<? extends Listen> listener = listeners.remove(sessionId);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800381 if (listener != null) {
382 listener.close();
383 }
384 }
385 });
386 }
387
388 /**
389 * Handles an unlisten commit.
390 *
Madan Jampanifc981772016-02-16 09:46:42 -0800391 * @param commit unlisten commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800392 */
Madan Jampani40f022e2016-03-02 21:35:14 -0800393 protected void unlisten(Commit<? extends Unlisten> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800394 try {
Madan Jampani40f022e2016-03-02 21:35:14 -0800395 Commit<? extends Listen> listener = listeners.remove(commit.session().id());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800396 if (listener != null) {
397 listener.close();
398 }
399 } finally {
400 commit.close();
401 }
402 }
403
404 /**
Jordan Halterman948d6592017-04-20 17:18:24 -0700405 * Handles a begin commit.
406 *
407 * @param commit transaction begin commit
408 * @return transaction state version
409 */
410 protected long begin(Commit<? extends TransactionBegin> commit) {
411 try {
Jordan Halterman5f97a302017-04-26 23:41:31 -0700412 long version = commit.index();
413 activeTransactions.put(commit.operation().transactionId(), new TransactionScope(version));
414 return version;
Jordan Halterman948d6592017-04-20 17:18:24 -0700415 } finally {
416 commit.close();
417 }
418 }
419
420 /**
Madan Jampani542d9e22016-04-05 15:39:55 -0700421 * Handles an prepare and commit commit.
422 *
423 * @param commit transaction prepare and commit commit
424 * @return prepare result
425 */
426 protected PrepareResult prepareAndCommit(Commit<? extends TransactionPrepareAndCommit> commit) {
Jordan Halterman881e4502017-05-10 13:46:55 -0700427 TransactionId transactionId = commit.operation().transactionLog().transactionId();
Madan Jampani542d9e22016-04-05 15:39:55 -0700428 PrepareResult prepareResult = prepare(commit);
Jordan Halterman881e4502017-05-10 13:46:55 -0700429 TransactionScope transactionScope = activeTransactions.remove(transactionId);
Madan Jampani542d9e22016-04-05 15:39:55 -0700430 if (prepareResult == PrepareResult.OK) {
Jordan Halterman881e4502017-05-10 13:46:55 -0700431 this.currentVersion = commit.index();
Jordan Halterman5f97a302017-04-26 23:41:31 -0700432 transactionScope = transactionScope.prepared(commit);
433 commit(transactionScope);
Jordan Halterman881e4502017-05-10 13:46:55 -0700434 } else if (transactionScope != null) {
Jordan Halterman5f97a302017-04-26 23:41:31 -0700435 transactionScope.close();
Madan Jampani542d9e22016-04-05 15:39:55 -0700436 }
Jordan Halterman5f97a302017-04-26 23:41:31 -0700437 discardTombstones();
Madan Jampani542d9e22016-04-05 15:39:55 -0700438 return prepareResult;
439 }
440
441 /**
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800442 * Handles an prepare commit.
443 *
Madan Jampanifc981772016-02-16 09:46:42 -0800444 * @param commit transaction prepare commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800445 * @return prepare result
446 */
Madan Jampanifc981772016-02-16 09:46:42 -0800447 protected PrepareResult prepare(Commit<? extends TransactionPrepare> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800448 boolean ok = false;
Jordan Halterman5f97a302017-04-26 23:41:31 -0700449
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800450 try {
Jordan Halterman5f97a302017-04-26 23:41:31 -0700451 TransactionLog<MapUpdate<String, byte[]>> transactionLog = commit.operation().transactionLog();
452
453 // Iterate through records in the transaction log and perform isolation checks.
454 for (MapUpdate<String, byte[]> record : transactionLog.records()) {
455 String key = record.key();
456
457 // If the record is a VERSION_MATCH then check that the record's version matches the current
458 // version of the state machine.
459 if (record.type() == MapUpdate.Type.VERSION_MATCH && key == null) {
460 if (record.version() > currentVersion) {
461 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
462 } else {
463 continue;
464 }
465 }
466
467 // If the prepared keys already contains the key contained within the record, that indicates a
468 // conflict with a concurrent transaction.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800469 if (preparedKeys.contains(key)) {
470 return PrepareResult.CONCURRENT_TRANSACTION;
471 }
Jordan Halterman5f97a302017-04-26 23:41:31 -0700472
473 // Read the existing value from the map.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800474 MapEntryValue existingValue = mapEntries.get(key);
Jordan Halterman5f97a302017-04-26 23:41:31 -0700475
476 // Note: if the existing value is null, that means the key has not changed during the transaction,
477 // otherwise a tombstone would have been retained.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800478 if (existingValue == null) {
Jordan Halterman5f97a302017-04-26 23:41:31 -0700479 // If the value is null, ensure the version is equal to the transaction version.
480 if (record.version() != transactionLog.version()) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800481 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
482 }
483 } else {
Jordan Halterman5f97a302017-04-26 23:41:31 -0700484 // If the value is non-null, compare the current version with the record version.
485 if (existingValue.version() > record.version()) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800486 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
487 }
488 }
489 }
Jordan Halterman5f97a302017-04-26 23:41:31 -0700490
491 // No violations detected. Mark modified keys locked for transactions.
492 transactionLog.records().forEach(record -> {
493 if (record.type() != MapUpdate.Type.VERSION_MATCH) {
494 preparedKeys.add(record.key());
495 }
496 });
497
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800498 ok = true;
Jordan Halterman5f97a302017-04-26 23:41:31 -0700499
500 // Update the transaction scope. If the transaction scope is not set on this node, that indicates the
501 // coordinator is communicating with another node. Transactions assume that the client is communicating
502 // with a single leader in order to limit the overhead of retaining tombstones.
503 TransactionScope transactionScope = activeTransactions.get(transactionLog.transactionId());
504 if (transactionScope == null) {
505 activeTransactions.put(
506 transactionLog.transactionId(),
507 new TransactionScope(transactionLog.version(), commit));
508 return PrepareResult.PARTIAL_FAILURE;
509 } else {
510 activeTransactions.put(
511 transactionLog.transactionId(),
512 transactionScope.prepared(commit));
513 return PrepareResult.OK;
514 }
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700515 } catch (Exception e) {
516 log.warn("Failure applying {}", commit, e);
517 throw Throwables.propagate(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800518 } finally {
519 if (!ok) {
520 commit.close();
521 }
522 }
523 }
524
525 /**
526 * Handles an commit commit (ha!).
527 *
528 * @param commit transaction commit commit
529 * @return commit result
530 */
Madan Jampanifc981772016-02-16 09:46:42 -0800531 protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800532 TransactionId transactionId = commit.operation().transactionId();
Jordan Halterman5f97a302017-04-26 23:41:31 -0700533 TransactionScope transactionScope = activeTransactions.remove(transactionId);
534 if (transactionScope == null) {
535 return CommitResult.UNKNOWN_TRANSACTION_ID;
536 }
537
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800538 try {
Jordan Halterman5f97a302017-04-26 23:41:31 -0700539 this.currentVersion = commit.index();
540 return commit(transactionScope.committed(commit));
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700541 } catch (Exception e) {
542 log.warn("Failure applying {}", commit, e);
543 throw Throwables.propagate(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800544 } finally {
Jordan Halterman5f97a302017-04-26 23:41:31 -0700545 discardTombstones();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800546 }
547 }
548
Jordan Halterman5f97a302017-04-26 23:41:31 -0700549 /**
550 * Applies committed operations to the state machine.
551 */
552 private CommitResult commit(TransactionScope transactionScope) {
553 TransactionLog<MapUpdate<String, byte[]>> transactionLog = transactionScope.transactionLog();
554 boolean retainTombstones = !activeTransactions.isEmpty();
555
556 // Count the total number of keys that will be set by this transaction. This is necessary to do reference
557 // counting for garbage collection.
558 long totalReferencesToCommit = transactionLog.records().stream()
559 // No keys are set for version checks. For deletes, references are only retained of tombstones
560 // need to be retained for concurrent transactions.
561 .filter(record -> record.type() != MapUpdate.Type.VERSION_MATCH && record.type() != MapUpdate.Type.LOCK
562 && (record.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH || retainTombstones))
Madan Jampani542d9e22016-04-05 15:39:55 -0700563 .count();
Jordan Halterman5f97a302017-04-26 23:41:31 -0700564
565 // Create a count down completer that counts references to the transaction commit for garbage collection.
566 CountDownCompleter<TransactionScope> completer = new CountDownCompleter<>(
567 transactionScope, totalReferencesToCommit, TransactionScope::close);
568
Madan Jampani542d9e22016-04-05 15:39:55 -0700569 List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
Jordan Halterman5f97a302017-04-26 23:41:31 -0700570 for (MapUpdate<String, byte[]> record : transactionLog.records()) {
571 if (record.type() == MapUpdate.Type.VERSION_MATCH) {
572 continue;
573 }
574
575 String key = record.key();
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700576 checkState(preparedKeys.remove(key), "key is not prepared");
Jordan Halterman5f97a302017-04-26 23:41:31 -0700577
578 if (record.type() == MapUpdate.Type.LOCK) {
579 continue;
580 }
581
Madan Jampani542d9e22016-04-05 15:39:55 -0700582 MapEntryValue previousValue = mapEntries.remove(key);
583 MapEntryValue newValue = null;
Jordan Halterman5f97a302017-04-26 23:41:31 -0700584
585 // If the record is not a delete, create a transactional commit.
586 if (record.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
587 newValue = new TransactionalCommit(currentVersion, record.value(), completer);
588 } else if (retainTombstones) {
589 // For deletes, if tombstones need to be retained then create and store a tombstone commit.
590 newValue = new TombstoneCommit(currentVersion, completer);
Madan Jampani542d9e22016-04-05 15:39:55 -0700591 }
Jordan Halterman5f97a302017-04-26 23:41:31 -0700592
Madan Jampani542d9e22016-04-05 15:39:55 -0700593 eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
Jordan Halterman5f97a302017-04-26 23:41:31 -0700594
Madan Jampani542d9e22016-04-05 15:39:55 -0700595 if (newValue != null) {
596 mapEntries.put(key, newValue);
597 }
Jordan Halterman5f97a302017-04-26 23:41:31 -0700598
Madan Jampani542d9e22016-04-05 15:39:55 -0700599 if (previousValue != null) {
600 previousValue.discard();
601 }
602 }
603 publish(eventsToPublish);
604 return CommitResult.OK;
605 }
606
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800607 /**
608 * Handles an rollback commit (ha!).
609 *
610 * @param commit transaction rollback commit
611 * @return rollback result
612 */
Madan Jampanifc981772016-02-16 09:46:42 -0800613 protected RollbackResult rollback(Commit<? extends TransactionRollback> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800614 TransactionId transactionId = commit.operation().transactionId();
Jordan Halterman5f97a302017-04-26 23:41:31 -0700615 TransactionScope transactionScope = activeTransactions.remove(transactionId);
616 if (transactionScope == null) {
617 return RollbackResult.UNKNOWN_TRANSACTION_ID;
618 } else if (!transactionScope.isPrepared()) {
619 discardTombstones();
620 transactionScope.close();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800621 commit.close();
Jordan Halterman5f97a302017-04-26 23:41:31 -0700622 return RollbackResult.OK;
623 } else {
624 try {
625 transactionScope.transactionLog().records()
626 .forEach(record -> {
627 if (record.type() != MapUpdate.Type.VERSION_MATCH) {
628 preparedKeys.remove(record.key());
629 }
630 });
631 return RollbackResult.OK;
632 } finally {
633 discardTombstones();
634 transactionScope.close();
635 commit.close();
636 }
637 }
638
639 }
640
641 /**
642 * Discards tombstones no longer needed by active transactions.
643 */
644 private void discardTombstones() {
645 if (activeTransactions.isEmpty()) {
646 Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries.entrySet().iterator();
647 while (iterator.hasNext()) {
648 MapEntryValue value = iterator.next().getValue();
649 if (value.type() == MapEntryValue.Type.TOMBSTONE) {
650 iterator.remove();
651 value.discard();
652 }
653 }
654 } else {
655 long lowWaterMark = activeTransactions.values().stream()
656 .mapToLong(TransactionScope::version)
657 .min().getAsLong();
658 Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries.entrySet().iterator();
659 while (iterator.hasNext()) {
660 MapEntryValue value = iterator.next().getValue();
661 if (value.type() == MapEntryValue.Type.TOMBSTONE && value.version < lowWaterMark) {
662 iterator.remove();
663 value.discard();
664 }
665 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800666 }
667 }
668
Madan Jampanifc981772016-02-16 09:46:42 -0800669 /**
670 * Computes the update status that would result if the specified update were to applied to
671 * the state machine.
672 *
673 * @param update update
674 * @return status
675 */
676 private MapEntryUpdateResult.Status validate(UpdateAndGet update) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800677 MapEntryValue existingValue = mapEntries.get(update.key());
Jordan Haltermane5ce1452017-05-09 12:15:22 -0700678 boolean isEmpty = existingValue == null || existingValue.type() == MapEntryValue.Type.TOMBSTONE;
679 if (isEmpty && update.value() == null) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800680 return MapEntryUpdateResult.Status.NOOP;
681 }
682 if (preparedKeys.contains(update.key())) {
683 return MapEntryUpdateResult.Status.WRITE_LOCK;
684 }
Jordan Haltermane5ce1452017-05-09 12:15:22 -0700685 byte[] existingRawValue = isEmpty ? null : existingValue.value();
686 Long existingVersion = isEmpty ? null : existingValue.version();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800687 return update.valueMatch().matches(existingRawValue)
688 && update.versionMatch().matches(existingVersion) ? MapEntryUpdateResult.Status.OK
689 : MapEntryUpdateResult.Status.PRECONDITION_FAILED;
690 }
691
Madan Jampanifc981772016-02-16 09:46:42 -0800692 /**
693 * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
694 * @param value map entry value
695 * @return versioned instance
696 */
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800697 private Versioned<byte[]> toVersioned(MapEntryValue value) {
Jordan Halterman5f97a302017-04-26 23:41:31 -0700698 return value != null && value.type() != MapEntryValue.Type.TOMBSTONE
699 ? new Versioned<>(value.value(), value.version()) : null;
Madan Jampanifc981772016-02-16 09:46:42 -0800700 }
701
702 /**
703 * Publishes events to listeners.
704 *
705 * @param events list of map event to publish
706 */
707 private void publish(List<MapEvent<String, byte[]>> events) {
708 listeners.values().forEach(commit -> commit.session().publish(AtomixConsistentMap.CHANGE_SUBJECT, events));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800709 }
710
711 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800712 public void register(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800713 }
714
715 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800716 public void unregister(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800717 closeListener(session.id());
718 }
719
720 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800721 public void expire(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800722 closeListener(session.id());
723 }
724
725 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800726 public void close(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800727 closeListener(session.id());
728 }
729
730 private void closeListener(Long sessionId) {
Madan Jampanifc981772016-02-16 09:46:42 -0800731 Commit<? extends Listen> commit = listeners.remove(sessionId);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800732 if (commit != null) {
733 commit.close();
734 }
735 }
736
737 /**
738 * Interface implemented by map values.
739 */
Jordan Halterman5f97a302017-04-26 23:41:31 -0700740 private abstract static class MapEntryValue {
741 protected final Type type;
742 protected final long version;
743
744 MapEntryValue(Type type, long version) {
745 this.type = type;
746 this.version = version;
747 }
748
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800749 /**
Jordan Halterman5f97a302017-04-26 23:41:31 -0700750 * Returns the value type.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800751 *
Jordan Halterman5f97a302017-04-26 23:41:31 -0700752 * @return the value type
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800753 */
Jordan Halterman5f97a302017-04-26 23:41:31 -0700754 Type type() {
755 return type;
756 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800757
758 /**
759 * Returns the version of the value.
760 *
761 * @return version
762 */
Jordan Halterman5f97a302017-04-26 23:41:31 -0700763 long version() {
764 return version;
765 }
766
767 /**
768 * Returns the raw {@code byte[]}.
769 *
770 * @return raw value
771 */
772 abstract byte[] value();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800773
774 /**
775 * Discards the value by invoke appropriate clean up actions.
776 */
Jordan Halterman5f97a302017-04-26 23:41:31 -0700777 abstract void discard();
778
779 /**
780 * Value type.
781 */
782 enum Type {
783 VALUE,
784 TOMBSTONE,
785 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800786 }
787
788 /**
789 * A {@code MapEntryValue} that is derived from a non-transactional update
790 * i.e. via any standard map update operation.
791 */
Jordan Halterman5f97a302017-04-26 23:41:31 -0700792 private static class NonTransactionalCommit extends MapEntryValue {
Madan Jampanifc981772016-02-16 09:46:42 -0800793 private final Commit<? extends UpdateAndGet> commit;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800794
Jordan Halterman5f97a302017-04-26 23:41:31 -0700795 NonTransactionalCommit(Commit<? extends UpdateAndGet> commit) {
796 super(Type.VALUE, commit.index());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800797 this.commit = commit;
798 }
799
800 @Override
Jordan Halterman5f97a302017-04-26 23:41:31 -0700801 byte[] value() {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800802 return commit.operation().value();
803 }
804
805 @Override
Jordan Halterman5f97a302017-04-26 23:41:31 -0700806 void discard() {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800807 commit.close();
808 }
809 }
810
811 /**
812 * A {@code MapEntryValue} that is derived from updates submitted via a
813 * transaction.
814 */
Jordan Halterman5f97a302017-04-26 23:41:31 -0700815 private static class TransactionalCommit extends MapEntryValue {
816 private final byte[] value;
817 private final CountDownCompleter<?> completer;
818
819 TransactionalCommit(long version, byte[] value, CountDownCompleter<?> completer) {
820 super(Type.VALUE, version);
821 this.value = value;
822 this.completer = completer;
823 }
824
825 @Override
826 byte[] value() {
827 return value;
828 }
829
830 @Override
831 void discard() {
832 completer.countDown();
833 }
834 }
835
836 /**
837 * A {@code MapEntryValue} that represents a deleted entry.
838 */
839 private static class TombstoneCommit extends MapEntryValue {
840 private final CountDownCompleter<?> completer;
841
842 public TombstoneCommit(long version, CountDownCompleter<?> completer) {
843 super(Type.TOMBSTONE, version);
844 this.completer = completer;
845 }
846
847 @Override
848 byte[] value() {
849 throw new UnsupportedOperationException();
850 }
851
852 @Override
853 void discard() {
854 completer.countDown();
855 }
856 }
857
858 /**
859 * Map transaction scope.
860 */
861 private static final class TransactionScope {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800862 private final long version;
Jordan Halterman5f97a302017-04-26 23:41:31 -0700863 private final Commit<? extends TransactionPrepare> prepareCommit;
864 private final Commit<? extends TransactionCommit> commitCommit;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800865
Jordan Halterman5f97a302017-04-26 23:41:31 -0700866 private TransactionScope(long version) {
867 this(version, null, null);
868 }
869
870 private TransactionScope(
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800871 long version,
Jordan Halterman5f97a302017-04-26 23:41:31 -0700872 Commit<? extends TransactionPrepare> prepareCommit) {
873 this(version, prepareCommit, null);
874 }
875
876 private TransactionScope(
877 long version,
878 Commit<? extends TransactionPrepare> prepareCommit,
879 Commit<? extends TransactionCommit> commitCommit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800880 this.version = version;
Jordan Halterman5f97a302017-04-26 23:41:31 -0700881 this.prepareCommit = prepareCommit;
882 this.commitCommit = commitCommit;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800883 }
884
Jordan Halterman5f97a302017-04-26 23:41:31 -0700885 /**
886 * Returns the transaction version.
887 *
888 * @return the transaction version
889 */
890 long version() {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800891 return version;
892 }
893
Jordan Halterman5f97a302017-04-26 23:41:31 -0700894 /**
895 * Returns whether this is a prepared transaction scope.
896 *
897 * @return whether this is a prepared transaction scope
898 */
899 boolean isPrepared() {
900 return prepareCommit != null;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800901 }
Madan Jampanicadd70b2016-02-08 13:45:43 -0800902
Jordan Halterman5f97a302017-04-26 23:41:31 -0700903 /**
904 * Returns the transaction commit log.
905 *
906 * @return the transaction commit log
907 */
908 TransactionLog<MapUpdate<String, byte[]>> transactionLog() {
909 checkState(isPrepared());
910 return prepareCommit.operation().transactionLog();
911 }
912
913 /**
914 * Returns a new transaction scope with a prepare commit.
915 *
916 * @param commit the prepare commit
917 * @return new transaction scope updated with the prepare commit
918 */
919 TransactionScope prepared(Commit<? extends TransactionPrepare> commit) {
920 return new TransactionScope(version, commit);
921 }
922
923 /**
924 * Returns a new transaction scope with a commit commit.
925 *
926 * @param commit the commit commit ;-)
927 * @return new transaction scope updated with the commit commit
928 */
929 TransactionScope committed(Commit<? extends TransactionCommit> commit) {
930 checkState(isPrepared());
931 return new TransactionScope(version, prepareCommit, commit);
932 }
933
934 /**
935 * Closes the transaction and all associated commits.
936 */
937 void close() {
938 if (prepareCommit != null) {
939 prepareCommit.close();
940 }
941 if (commitCommit != null) {
942 commitCommit.close();
943 }
Madan Jampanicadd70b2016-02-08 13:45:43 -0800944 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800945 }
946}