blob: 983e27e1a1402645a4e13d8ff145be593aa7211a [file] [log] [blame]
/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
18import static org.onosproject.store.service.MapEvent.Type.INSERT;
19import static org.onosproject.store.service.MapEvent.Type.REMOVE;
20import static org.onosproject.store.service.MapEvent.Type.UPDATE;
Madan Jampani3a9911c2016-02-21 11:25:45 -080021import io.atomix.copycat.server.session.ServerSession;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080022import io.atomix.copycat.server.Commit;
23import io.atomix.copycat.server.Snapshottable;
24import io.atomix.copycat.server.StateMachineExecutor;
25import io.atomix.copycat.server.session.SessionListener;
26import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
27import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
28import io.atomix.resource.ResourceStateMachine;
29
30import java.util.Collection;
31import java.util.HashMap;
32import java.util.Iterator;
Madan Jampanifc981772016-02-16 09:46:42 -080033import java.util.List;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080034import java.util.Map;
Madan Jampani65f24bb2016-03-15 15:16:18 -070035import java.util.Properties;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080036import java.util.Set;
37import java.util.concurrent.atomic.AtomicLong;
38import java.util.stream.Collectors;
39
40import org.onlab.util.CountDownCompleter;
41import org.onlab.util.Match;
Madan Jampani74da78b2016-02-09 21:18:36 -080042import org.onosproject.store.primitives.MapUpdate;
Madan Jampanicadd70b2016-02-08 13:45:43 -080043import org.onosproject.store.primitives.TransactionId;
Madan Jampanifc981772016-02-16 09:46:42 -080044import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
45import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
46import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsValue;
47import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.EntrySet;
48import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Get;
49import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.IsEmpty;
50import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.KeySet;
51import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Listen;
52import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
53import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080054import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
Madan Jampani542d9e22016-04-05 15:39:55 -070055import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepareAndCommit;
Madan Jampanifc981772016-02-16 09:46:42 -080056import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
57import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
58import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
59import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Values;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080060import org.onosproject.store.service.MapEvent;
Madan Jampani74da78b2016-02-09 21:18:36 -080061import org.onosproject.store.service.MapTransaction;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080062import org.onosproject.store.service.Versioned;
63
Madan Jampanifc981772016-02-16 09:46:42 -080064import com.google.common.collect.Lists;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080065import com.google.common.collect.Maps;
66import com.google.common.collect.Sets;
67
68import static com.google.common.base.Preconditions.checkState;
69
70/**
71 * State Machine for {@link AtomixConsistentMap} resource.
72 */
Madan Jampanifc981772016-02-16 09:46:42 -080073public class AtomixConsistentMapState extends ResourceStateMachine implements SessionListener, Snapshottable {
Madan Jampani86cb2432016-02-17 11:07:56 -080074
// Listen commits of sessions that registered for change events, keyed by session id.
private final Map<Long, Commit<? extends Listen>> listeners = new HashMap<>();
// Current map contents; each value wraps the commit that produced it.
private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
// Keys touched by transactions that have been prepared but not yet committed or rolled back.
private final Set<String> preparedKeys = Sets.newHashSet();
// Prepare commits of transactions awaiting commit or rollback, keyed by transaction id.
private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps.newHashMap();
// Source of entry versions; not final because install() replaces it.
private AtomicLong versionCounter = new AtomicLong(0);

/**
 * Creates a new consistent map state machine.
 *
 * @param properties resource properties, passed unchanged to the superclass
 */
public AtomixConsistentMapState(Properties properties) {
    super(properties);
}
84
Madan Jampani5e5b3d62016-02-01 16:03:33 -080085 @Override
86 public void snapshot(SnapshotWriter writer) {
87 writer.writeLong(versionCounter.get());
88 }
89
90 @Override
91 public void install(SnapshotReader reader) {
92 versionCounter = new AtomicLong(reader.readLong());
93 }
94
95 @Override
96 protected void configure(StateMachineExecutor executor) {
97 // Listeners
Madan Jampanifc981772016-02-16 09:46:42 -080098 executor.register(Listen.class, this::listen);
99 executor.register(Unlisten.class, this::unlisten);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800100 // Queries
Madan Jampanifc981772016-02-16 09:46:42 -0800101 executor.register(ContainsKey.class, this::containsKey);
102 executor.register(ContainsValue.class, this::containsValue);
103 executor.register(EntrySet.class, this::entrySet);
104 executor.register(Get.class, this::get);
105 executor.register(IsEmpty.class, this::isEmpty);
106 executor.register(KeySet.class, this::keySet);
107 executor.register(Size.class, this::size);
108 executor.register(Values.class, this::values);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800109 // Commands
Madan Jampanifc981772016-02-16 09:46:42 -0800110 executor.register(UpdateAndGet.class, this::updateAndGet);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800111 executor.register(AtomixConsistentMapCommands.Clear.class, this::clear);
Madan Jampanifc981772016-02-16 09:46:42 -0800112 executor.register(TransactionPrepare.class, this::prepare);
113 executor.register(TransactionCommit.class, this::commit);
114 executor.register(TransactionRollback.class, this::rollback);
Madan Jampani542d9e22016-04-05 15:39:55 -0700115 executor.register(TransactionPrepareAndCommit.class, this::prepareAndCommit);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800116 }
117
118 @Override
119 public void delete() {
120 // Delete Listeners
121 listeners.values().forEach(Commit::close);
122 listeners.clear();
123
124 // Delete Map entries
125 mapEntries.values().forEach(MapEntryValue::discard);
126 mapEntries.clear();
127 }
128
129 /**
130 * Handles a contains key commit.
131 *
Madan Jampanifc981772016-02-16 09:46:42 -0800132 * @param commit containsKey commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800133 * @return {@code true} if map contains key
134 */
Madan Jampanifc981772016-02-16 09:46:42 -0800135 protected boolean containsKey(Commit<? extends ContainsKey> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800136 try {
137 return toVersioned(mapEntries.get(commit.operation().key())) != null;
138 } finally {
139 commit.close();
140 }
141 }
142
143 /**
144 * Handles a contains value commit.
145 *
Madan Jampanifc981772016-02-16 09:46:42 -0800146 * @param commit containsValue commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800147 * @return {@code true} if map contains value
148 */
Madan Jampanifc981772016-02-16 09:46:42 -0800149 protected boolean containsValue(Commit<? extends ContainsValue> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800150 try {
151 Match<byte[]> valueMatch = Match
152 .ifValue(commit.operation().value());
153 return mapEntries.values().stream()
154 .anyMatch(value -> valueMatch.matches(value.value()));
155 } finally {
156 commit.close();
157 }
158 }
159
160 /**
161 * Handles a get commit.
162 *
163 * @param commit
164 * get commit
165 * @return value mapped to key
166 */
Madan Jampanifc981772016-02-16 09:46:42 -0800167 protected Versioned<byte[]> get(Commit<? extends Get> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800168 try {
169 return toVersioned(mapEntries.get(commit.operation().key()));
170 } finally {
171 commit.close();
172 }
173 }
174
175 /**
176 * Handles a count commit.
177 *
Madan Jampanifc981772016-02-16 09:46:42 -0800178 * @param commit size commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800179 * @return number of entries in map
180 */
Madan Jampanifc981772016-02-16 09:46:42 -0800181 protected int size(Commit<? extends Size> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800182 try {
183 return mapEntries.size();
184 } finally {
185 commit.close();
186 }
187 }
188
189 /**
190 * Handles an is empty commit.
191 *
Madan Jampanifc981772016-02-16 09:46:42 -0800192 * @param commit isEmpty commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800193 * @return {@code true} if map is empty
194 */
Madan Jampanifc981772016-02-16 09:46:42 -0800195 protected boolean isEmpty(Commit<? extends IsEmpty> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800196 try {
197 return mapEntries.isEmpty();
198 } finally {
199 commit.close();
200 }
201 }
202
203 /**
204 * Handles a keySet commit.
205 *
Madan Jampanifc981772016-02-16 09:46:42 -0800206 * @param commit keySet commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800207 * @return set of keys in map
208 */
Madan Jampanifc981772016-02-16 09:46:42 -0800209 protected Set<String> keySet(Commit<? extends KeySet> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800210 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800211 return mapEntries.keySet().stream().collect(Collectors.toSet());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800212 } finally {
213 commit.close();
214 }
215 }
216
217 /**
218 * Handles a values commit.
219 *
Madan Jampanifc981772016-02-16 09:46:42 -0800220 * @param commit values commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800221 * @return collection of values in map
222 */
Madan Jampanifc981772016-02-16 09:46:42 -0800223 protected Collection<Versioned<byte[]>> values(Commit<? extends Values> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800224 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800225 return mapEntries.values().stream().map(this::toVersioned).collect(Collectors.toList());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800226 } finally {
227 commit.close();
228 }
229 }
230
231 /**
232 * Handles a entry set commit.
233 *
234 * @param commit
235 * entrySet commit
236 * @return set of map entries
237 */
Madan Jampanifc981772016-02-16 09:46:42 -0800238 protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(Commit<? extends EntrySet> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800239 try {
240 return mapEntries
241 .entrySet()
242 .stream()
243 .map(e -> Maps.immutableEntry(e.getKey(),
244 toVersioned(e.getValue())))
245 .collect(Collectors.toSet());
246 } finally {
247 commit.close();
248 }
249 }
250
251 /**
252 * Handles a update and get commit.
253 *
Madan Jampanifc981772016-02-16 09:46:42 -0800254 * @param commit updateAndGet commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800255 * @return update result
256 */
Madan Jampanifc981772016-02-16 09:46:42 -0800257 protected MapEntryUpdateResult<String, byte[]> updateAndGet(Commit<? extends UpdateAndGet> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800258 MapEntryUpdateResult.Status updateStatus = validate(commit.operation());
259 String key = commit.operation().key();
260 MapEntryValue oldCommitValue = mapEntries.get(commit.operation().key());
261 Versioned<byte[]> oldMapValue = toVersioned(oldCommitValue);
262
263 if (updateStatus != MapEntryUpdateResult.Status.OK) {
264 commit.close();
265 return new MapEntryUpdateResult<>(updateStatus, "", key,
266 oldMapValue, oldMapValue);
267 }
268
269 byte[] newValue = commit.operation().value();
270 long newVersion = versionCounter.incrementAndGet();
271 Versioned<byte[]> newMapValue = newValue == null ? null
272 : new Versioned<>(newValue, newVersion);
273
274 MapEvent.Type updateType = newValue == null ? REMOVE
275 : oldCommitValue == null ? INSERT : UPDATE;
276 if (updateType == REMOVE || updateType == UPDATE) {
277 mapEntries.remove(key);
278 oldCommitValue.discard();
279 }
280 if (updateType == INSERT || updateType == UPDATE) {
281 mapEntries.put(key, new NonTransactionalCommit(newVersion, commit));
Madan Jampanifc981772016-02-16 09:46:42 -0800282 } else {
283 commit.close();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800284 }
Madan Jampanifc981772016-02-16 09:46:42 -0800285 publish(Lists.newArrayList(new MapEvent<>("", key, newMapValue, oldMapValue)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800286 return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue,
287 newMapValue);
288 }
289
290 /**
291 * Handles a clear commit.
292 *
Madan Jampanifc981772016-02-16 09:46:42 -0800293 * @param commit clear commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800294 * @return clear result
295 */
Madan Jampanifc981772016-02-16 09:46:42 -0800296 protected MapEntryUpdateResult.Status clear(Commit<? extends Clear> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800297 try {
298 Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries
299 .entrySet().iterator();
300 while (iterator.hasNext()) {
301 Map.Entry<String, MapEntryValue> entry = iterator.next();
302 String key = entry.getKey();
303 MapEntryValue value = entry.getValue();
304 Versioned<byte[]> removedValue = new Versioned<>(value.value(),
305 value.version());
Madan Jampanifc981772016-02-16 09:46:42 -0800306 publish(Lists.newArrayList(new MapEvent<>("", key, null, removedValue)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800307 value.discard();
308 iterator.remove();
309 }
310 return MapEntryUpdateResult.Status.OK;
311 } finally {
312 commit.close();
313 }
314 }
315
316 /**
317 * Handles a listen commit.
318 *
Madan Jampanifc981772016-02-16 09:46:42 -0800319 * @param commit listen commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800320 */
Madan Jampanifc981772016-02-16 09:46:42 -0800321 protected void listen(Commit<? extends Listen> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800322 Long sessionId = commit.session().id();
Madan Jampanie6038872016-03-03 12:14:37 -0800323 if (listeners.putIfAbsent(sessionId, commit) != null) {
324 commit.close();
325 return;
326 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800327 commit.session()
328 .onStateChange(
329 state -> {
Madan Jampani3a9911c2016-02-21 11:25:45 -0800330 if (state == ServerSession.State.CLOSED
331 || state == ServerSession.State.EXPIRED) {
Madan Jampanifc981772016-02-16 09:46:42 -0800332 Commit<? extends Listen> listener = listeners.remove(sessionId);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800333 if (listener != null) {
334 listener.close();
335 }
336 }
337 });
338 }
339
340 /**
341 * Handles an unlisten commit.
342 *
Madan Jampanifc981772016-02-16 09:46:42 -0800343 * @param commit unlisten commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800344 */
Madan Jampani40f022e2016-03-02 21:35:14 -0800345 protected void unlisten(Commit<? extends Unlisten> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800346 try {
Madan Jampani40f022e2016-03-02 21:35:14 -0800347 Commit<? extends Listen> listener = listeners.remove(commit.session().id());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800348 if (listener != null) {
349 listener.close();
350 }
351 } finally {
352 commit.close();
353 }
354 }
355
356 /**
Madan Jampani542d9e22016-04-05 15:39:55 -0700357 * Handles an prepare and commit commit.
358 *
359 * @param commit transaction prepare and commit commit
360 * @return prepare result
361 */
362 protected PrepareResult prepareAndCommit(Commit<? extends TransactionPrepareAndCommit> commit) {
363 PrepareResult prepareResult = prepare(commit);
364 if (prepareResult == PrepareResult.OK) {
365 commitInternal(commit.operation().transaction().transactionId());
366 }
367 return prepareResult;
368 }
369
370 /**
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800371 * Handles an prepare commit.
372 *
Madan Jampanifc981772016-02-16 09:46:42 -0800373 * @param commit transaction prepare commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800374 * @return prepare result
375 */
Madan Jampanifc981772016-02-16 09:46:42 -0800376 protected PrepareResult prepare(Commit<? extends TransactionPrepare> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800377 boolean ok = false;
378 try {
Madan Jampani74da78b2016-02-09 21:18:36 -0800379 MapTransaction<String, byte[]> transaction = commit.operation().transaction();
Madan Jampanicadd70b2016-02-08 13:45:43 -0800380 for (MapUpdate<String, byte[]> update : transaction.updates()) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800381 String key = update.key();
382 if (preparedKeys.contains(key)) {
383 return PrepareResult.CONCURRENT_TRANSACTION;
384 }
385 MapEntryValue existingValue = mapEntries.get(key);
386 if (existingValue == null) {
387 if (update.currentValue() != null) {
388 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
389 }
390 } else {
391 if (existingValue.version() != update.currentVersion()) {
392 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
393 }
394 }
395 }
396 // No violations detected. Add to pendingTranctions and mark
Madan Jampanifc981772016-02-16 09:46:42 -0800397 // modified keys as locked for updates.
Madan Jampani74da78b2016-02-09 21:18:36 -0800398 pendingTransactions.put(transaction.transactionId(), commit);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800399 transaction.updates().forEach(u -> preparedKeys.add(u.key()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800400 ok = true;
401 return PrepareResult.OK;
402 } finally {
403 if (!ok) {
404 commit.close();
405 }
406 }
407 }
408
409 /**
410 * Handles an commit commit (ha!).
411 *
412 * @param commit transaction commit commit
413 * @return commit result
414 */
Madan Jampanifc981772016-02-16 09:46:42 -0800415 protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800416 TransactionId transactionId = commit.operation().transactionId();
417 try {
Madan Jampani542d9e22016-04-05 15:39:55 -0700418 return commitInternal(transactionId);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800419 } finally {
420 commit.close();
421 }
422 }
423
Madan Jampani542d9e22016-04-05 15:39:55 -0700424 private CommitResult commitInternal(TransactionId transactionId) {
425 Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions
426 .remove(transactionId);
427 if (prepareCommit == null) {
428 return CommitResult.UNKNOWN_TRANSACTION_ID;
429 }
430 MapTransaction<String, byte[]> transaction = prepareCommit.operation().transaction();
431 long totalReferencesToCommit = transaction
432 .updates()
433 .stream()
434 .filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
435 .count();
436 CountDownCompleter<Commit<? extends TransactionPrepare>> completer =
437 new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
438 List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
439 for (MapUpdate<String, byte[]> update : transaction.updates()) {
440 String key = update.key();
441 MapEntryValue previousValue = mapEntries.remove(key);
442 MapEntryValue newValue = null;
443 checkState(preparedKeys.remove(key), "key is not prepared");
444 if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
445 newValue = new TransactionalCommit(key,
446 versionCounter.incrementAndGet(), completer);
447 }
448 eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
449 if (newValue != null) {
450 mapEntries.put(key, newValue);
451 }
452 if (previousValue != null) {
453 previousValue.discard();
454 }
455 }
456 publish(eventsToPublish);
457 return CommitResult.OK;
458 }
459
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800460 /**
461 * Handles an rollback commit (ha!).
462 *
463 * @param commit transaction rollback commit
464 * @return rollback result
465 */
Madan Jampanifc981772016-02-16 09:46:42 -0800466 protected RollbackResult rollback(Commit<? extends TransactionRollback> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800467 TransactionId transactionId = commit.operation().transactionId();
468 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800469 Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions.remove(transactionId);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800470 if (prepareCommit == null) {
471 return RollbackResult.UNKNOWN_TRANSACTION_ID;
472 } else {
Madan Jampanicadd70b2016-02-08 13:45:43 -0800473 prepareCommit.operation()
474 .transaction()
475 .updates()
476 .forEach(u -> preparedKeys.remove(u.key()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800477 prepareCommit.close();
478 return RollbackResult.OK;
479 }
480 } finally {
481 commit.close();
482 }
483 }
484
Madan Jampanifc981772016-02-16 09:46:42 -0800485 /**
486 * Computes the update status that would result if the specified update were to applied to
487 * the state machine.
488 *
489 * @param update update
490 * @return status
491 */
492 private MapEntryUpdateResult.Status validate(UpdateAndGet update) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800493 MapEntryValue existingValue = mapEntries.get(update.key());
494 if (existingValue == null && update.value() == null) {
495 return MapEntryUpdateResult.Status.NOOP;
496 }
497 if (preparedKeys.contains(update.key())) {
498 return MapEntryUpdateResult.Status.WRITE_LOCK;
499 }
500 byte[] existingRawValue = existingValue == null ? null : existingValue
501 .value();
502 Long existingVersion = existingValue == null ? null : existingValue
503 .version();
504 return update.valueMatch().matches(existingRawValue)
505 && update.versionMatch().matches(existingVersion) ? MapEntryUpdateResult.Status.OK
506 : MapEntryUpdateResult.Status.PRECONDITION_FAILED;
507 }
508
Madan Jampanifc981772016-02-16 09:46:42 -0800509 /**
510 * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
511 * @param value map entry value
512 * @return versioned instance
513 */
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800514 private Versioned<byte[]> toVersioned(MapEntryValue value) {
Madan Jampanifc981772016-02-16 09:46:42 -0800515 return value == null ? null : new Versioned<>(value.value(), value.version());
516 }
517
518 /**
519 * Publishes events to listeners.
520 *
521 * @param events list of map event to publish
522 */
523 private void publish(List<MapEvent<String, byte[]>> events) {
524 listeners.values().forEach(commit -> commit.session().publish(AtomixConsistentMap.CHANGE_SUBJECT, events));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800525 }
526
527 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800528 public void register(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800529 }
530
531 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800532 public void unregister(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800533 closeListener(session.id());
534 }
535
536 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800537 public void expire(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800538 closeListener(session.id());
539 }
540
541 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800542 public void close(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800543 closeListener(session.id());
544 }
545
546 private void closeListener(Long sessionId) {
Madan Jampanifc981772016-02-16 09:46:42 -0800547 Commit<? extends Listen> commit = listeners.remove(sessionId);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800548 if (commit != null) {
549 commit.close();
550 }
551 }
552
553 /**
554 * Interface implemented by map values.
555 */
556 private interface MapEntryValue {
557 /**
558 * Returns the raw {@code byte[]}.
559 *
560 * @return raw value
561 */
562 byte[] value();
563
564 /**
565 * Returns the version of the value.
566 *
567 * @return version
568 */
569 long version();
570
571 /**
572 * Discards the value by invoke appropriate clean up actions.
573 */
574 void discard();
575 }
576
577 /**
578 * A {@code MapEntryValue} that is derived from a non-transactional update
579 * i.e. via any standard map update operation.
580 */
581 private class NonTransactionalCommit implements MapEntryValue {
582 private final long version;
Madan Jampanifc981772016-02-16 09:46:42 -0800583 private final Commit<? extends UpdateAndGet> commit;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800584
Madan Jampanifc981772016-02-16 09:46:42 -0800585 public NonTransactionalCommit(long version, Commit<? extends UpdateAndGet> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800586 this.version = version;
587 this.commit = commit;
588 }
589
590 @Override
591 public byte[] value() {
592 return commit.operation().value();
593 }
594
595 @Override
596 public long version() {
597 return version;
598 }
599
600 @Override
601 public void discard() {
602 commit.close();
603 }
604 }
605
606 /**
607 * A {@code MapEntryValue} that is derived from updates submitted via a
608 * transaction.
609 */
610 private class TransactionalCommit implements MapEntryValue {
611 private final String key;
612 private final long version;
Madan Jampanifc981772016-02-16 09:46:42 -0800613 private final CountDownCompleter<Commit<? extends TransactionPrepare>> completer;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800614
615 public TransactionalCommit(
616 String key,
617 long version,
Madan Jampanifc981772016-02-16 09:46:42 -0800618 CountDownCompleter<Commit<? extends TransactionPrepare>> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800619 this.key = key;
620 this.version = version;
621 this.completer = commit;
622 }
623
624 @Override
625 public byte[] value() {
Madan Jampani74da78b2016-02-09 21:18:36 -0800626 MapTransaction<String, byte[]> transaction = completer.object().operation().transaction();
Madan Jampanicadd70b2016-02-08 13:45:43 -0800627 return valueForKey(key, transaction);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800628 }
629
630 @Override
631 public long version() {
632 return version;
633 }
634
635 @Override
636 public void discard() {
637 completer.countDown();
638 }
Madan Jampanicadd70b2016-02-08 13:45:43 -0800639
Madan Jampani74da78b2016-02-09 21:18:36 -0800640 private byte[] valueForKey(String key, MapTransaction<String, byte[]> transaction) {
Madan Jampanicadd70b2016-02-08 13:45:43 -0800641 MapUpdate<String, byte[]> update = transaction.updates()
642 .stream()
643 .filter(u -> u.key().equals(key))
644 .findFirst()
645 .orElse(null);
646 return update == null ? null : update.value();
647 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800648 }
649}