/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
18import static org.onosproject.store.service.MapEvent.Type.INSERT;
19import static org.onosproject.store.service.MapEvent.Type.REMOVE;
20import static org.onosproject.store.service.MapEvent.Type.UPDATE;
21import io.atomix.copycat.client.session.Session;
22import io.atomix.copycat.server.Commit;
23import io.atomix.copycat.server.Snapshottable;
24import io.atomix.copycat.server.StateMachineExecutor;
25import io.atomix.copycat.server.session.SessionListener;
26import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
27import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
28import io.atomix.resource.ResourceStateMachine;
29
30import java.util.Collection;
31import java.util.HashMap;
32import java.util.Iterator;
Madan Jampanifc981772016-02-16 09:46:42 -080033import java.util.List;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080034import java.util.Map;
35import java.util.Set;
36import java.util.concurrent.atomic.AtomicLong;
37import java.util.stream.Collectors;
38
39import org.onlab.util.CountDownCompleter;
40import org.onlab.util.Match;
Madan Jampani74da78b2016-02-09 21:18:36 -080041import org.onosproject.store.primitives.MapUpdate;
Madan Jampanicadd70b2016-02-08 13:45:43 -080042import org.onosproject.store.primitives.TransactionId;
Madan Jampanifc981772016-02-16 09:46:42 -080043import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
44import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
45import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsValue;
46import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.EntrySet;
47import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Get;
48import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.IsEmpty;
49import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.KeySet;
50import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Listen;
51import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
52import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080053import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
Madan Jampanifc981772016-02-16 09:46:42 -080054import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
55import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
56import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
57import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Values;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080058import org.onosproject.store.service.MapEvent;
Madan Jampani74da78b2016-02-09 21:18:36 -080059import org.onosproject.store.service.MapTransaction;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080060import org.onosproject.store.service.Versioned;
61
Madan Jampanifc981772016-02-16 09:46:42 -080062import com.google.common.collect.Lists;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080063import com.google.common.collect.Maps;
64import com.google.common.collect.Sets;
65
66import static com.google.common.base.Preconditions.checkState;
67
/**
 * State Machine for {@link AtomixConsistentMap} resource.
 */
Madan Jampanifc981772016-02-16 09:46:42 -080071public class AtomixConsistentMapState extends ResourceStateMachine implements SessionListener, Snapshottable {
Madan Jampani5e5b3d62016-02-01 16:03:33 -080072 private final Map<Long, Commit<? extends AtomixConsistentMapCommands.Listen>> listeners = new HashMap<>();
73 private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
74 private final Set<String> preparedKeys = Sets.newHashSet();
Madan Jampanifc981772016-02-16 09:46:42 -080075 private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps.newHashMap();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080076 private AtomicLong versionCounter = new AtomicLong(0);
77
78 @Override
79 public void snapshot(SnapshotWriter writer) {
80 writer.writeLong(versionCounter.get());
81 }
82
83 @Override
84 public void install(SnapshotReader reader) {
85 versionCounter = new AtomicLong(reader.readLong());
86 }
87
88 @Override
89 protected void configure(StateMachineExecutor executor) {
90 // Listeners
Madan Jampanifc981772016-02-16 09:46:42 -080091 executor.register(Listen.class, this::listen);
92 executor.register(Unlisten.class, this::unlisten);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080093 // Queries
Madan Jampanifc981772016-02-16 09:46:42 -080094 executor.register(ContainsKey.class, this::containsKey);
95 executor.register(ContainsValue.class, this::containsValue);
96 executor.register(EntrySet.class, this::entrySet);
97 executor.register(Get.class, this::get);
98 executor.register(IsEmpty.class, this::isEmpty);
99 executor.register(KeySet.class, this::keySet);
100 executor.register(Size.class, this::size);
101 executor.register(Values.class, this::values);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800102 // Commands
Madan Jampanifc981772016-02-16 09:46:42 -0800103 executor.register(UpdateAndGet.class, this::updateAndGet);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800104 executor.register(AtomixConsistentMapCommands.Clear.class, this::clear);
Madan Jampanifc981772016-02-16 09:46:42 -0800105 executor.register(TransactionPrepare.class, this::prepare);
106 executor.register(TransactionCommit.class, this::commit);
107 executor.register(TransactionRollback.class, this::rollback);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800108 }
109
110 @Override
111 public void delete() {
112 // Delete Listeners
113 listeners.values().forEach(Commit::close);
114 listeners.clear();
115
116 // Delete Map entries
117 mapEntries.values().forEach(MapEntryValue::discard);
118 mapEntries.clear();
119 }
120
121 /**
122 * Handles a contains key commit.
123 *
Madan Jampanifc981772016-02-16 09:46:42 -0800124 * @param commit containsKey commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800125 * @return {@code true} if map contains key
126 */
Madan Jampanifc981772016-02-16 09:46:42 -0800127 protected boolean containsKey(Commit<? extends ContainsKey> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800128 try {
129 return toVersioned(mapEntries.get(commit.operation().key())) != null;
130 } finally {
131 commit.close();
132 }
133 }
134
135 /**
136 * Handles a contains value commit.
137 *
Madan Jampanifc981772016-02-16 09:46:42 -0800138 * @param commit containsValue commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800139 * @return {@code true} if map contains value
140 */
Madan Jampanifc981772016-02-16 09:46:42 -0800141 protected boolean containsValue(Commit<? extends ContainsValue> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800142 try {
143 Match<byte[]> valueMatch = Match
144 .ifValue(commit.operation().value());
145 return mapEntries.values().stream()
146 .anyMatch(value -> valueMatch.matches(value.value()));
147 } finally {
148 commit.close();
149 }
150 }
151
152 /**
153 * Handles a get commit.
154 *
155 * @param commit
156 * get commit
157 * @return value mapped to key
158 */
Madan Jampanifc981772016-02-16 09:46:42 -0800159 protected Versioned<byte[]> get(Commit<? extends Get> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800160 try {
161 return toVersioned(mapEntries.get(commit.operation().key()));
162 } finally {
163 commit.close();
164 }
165 }
166
167 /**
168 * Handles a count commit.
169 *
Madan Jampanifc981772016-02-16 09:46:42 -0800170 * @param commit size commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800171 * @return number of entries in map
172 */
Madan Jampanifc981772016-02-16 09:46:42 -0800173 protected int size(Commit<? extends Size> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800174 try {
175 return mapEntries.size();
176 } finally {
177 commit.close();
178 }
179 }
180
181 /**
182 * Handles an is empty commit.
183 *
Madan Jampanifc981772016-02-16 09:46:42 -0800184 * @param commit isEmpty commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800185 * @return {@code true} if map is empty
186 */
Madan Jampanifc981772016-02-16 09:46:42 -0800187 protected boolean isEmpty(Commit<? extends IsEmpty> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800188 try {
189 return mapEntries.isEmpty();
190 } finally {
191 commit.close();
192 }
193 }
194
195 /**
196 * Handles a keySet commit.
197 *
Madan Jampanifc981772016-02-16 09:46:42 -0800198 * @param commit keySet commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800199 * @return set of keys in map
200 */
Madan Jampanifc981772016-02-16 09:46:42 -0800201 protected Set<String> keySet(Commit<? extends KeySet> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800202 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800203 return mapEntries.keySet().stream().collect(Collectors.toSet());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800204 } finally {
205 commit.close();
206 }
207 }
208
209 /**
210 * Handles a values commit.
211 *
Madan Jampanifc981772016-02-16 09:46:42 -0800212 * @param commit values commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800213 * @return collection of values in map
214 */
Madan Jampanifc981772016-02-16 09:46:42 -0800215 protected Collection<Versioned<byte[]>> values(Commit<? extends Values> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800216 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800217 return mapEntries.values().stream().map(this::toVersioned).collect(Collectors.toList());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800218 } finally {
219 commit.close();
220 }
221 }
222
223 /**
224 * Handles a entry set commit.
225 *
226 * @param commit
227 * entrySet commit
228 * @return set of map entries
229 */
Madan Jampanifc981772016-02-16 09:46:42 -0800230 protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(Commit<? extends EntrySet> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800231 try {
232 return mapEntries
233 .entrySet()
234 .stream()
235 .map(e -> Maps.immutableEntry(e.getKey(),
236 toVersioned(e.getValue())))
237 .collect(Collectors.toSet());
238 } finally {
239 commit.close();
240 }
241 }
242
243 /**
244 * Handles a update and get commit.
245 *
Madan Jampanifc981772016-02-16 09:46:42 -0800246 * @param commit updateAndGet commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800247 * @return update result
248 */
Madan Jampanifc981772016-02-16 09:46:42 -0800249 protected MapEntryUpdateResult<String, byte[]> updateAndGet(Commit<? extends UpdateAndGet> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800250 MapEntryUpdateResult.Status updateStatus = validate(commit.operation());
251 String key = commit.operation().key();
252 MapEntryValue oldCommitValue = mapEntries.get(commit.operation().key());
253 Versioned<byte[]> oldMapValue = toVersioned(oldCommitValue);
254
255 if (updateStatus != MapEntryUpdateResult.Status.OK) {
256 commit.close();
257 return new MapEntryUpdateResult<>(updateStatus, "", key,
258 oldMapValue, oldMapValue);
259 }
260
261 byte[] newValue = commit.operation().value();
262 long newVersion = versionCounter.incrementAndGet();
263 Versioned<byte[]> newMapValue = newValue == null ? null
264 : new Versioned<>(newValue, newVersion);
265
266 MapEvent.Type updateType = newValue == null ? REMOVE
267 : oldCommitValue == null ? INSERT : UPDATE;
268 if (updateType == REMOVE || updateType == UPDATE) {
269 mapEntries.remove(key);
270 oldCommitValue.discard();
271 }
272 if (updateType == INSERT || updateType == UPDATE) {
273 mapEntries.put(key, new NonTransactionalCommit(newVersion, commit));
Madan Jampanifc981772016-02-16 09:46:42 -0800274 } else {
275 commit.close();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800276 }
Madan Jampanifc981772016-02-16 09:46:42 -0800277 publish(Lists.newArrayList(new MapEvent<>("", key, newMapValue, oldMapValue)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800278 return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue,
279 newMapValue);
280 }
281
282 /**
283 * Handles a clear commit.
284 *
Madan Jampanifc981772016-02-16 09:46:42 -0800285 * @param commit clear commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800286 * @return clear result
287 */
Madan Jampanifc981772016-02-16 09:46:42 -0800288 protected MapEntryUpdateResult.Status clear(Commit<? extends Clear> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800289 try {
290 Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries
291 .entrySet().iterator();
292 while (iterator.hasNext()) {
293 Map.Entry<String, MapEntryValue> entry = iterator.next();
294 String key = entry.getKey();
295 MapEntryValue value = entry.getValue();
296 Versioned<byte[]> removedValue = new Versioned<>(value.value(),
297 value.version());
Madan Jampanifc981772016-02-16 09:46:42 -0800298 publish(Lists.newArrayList(new MapEvent<>("", key, null, removedValue)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800299 value.discard();
300 iterator.remove();
301 }
302 return MapEntryUpdateResult.Status.OK;
303 } finally {
304 commit.close();
305 }
306 }
307
308 /**
309 * Handles a listen commit.
310 *
Madan Jampanifc981772016-02-16 09:46:42 -0800311 * @param commit listen commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800312 */
Madan Jampanifc981772016-02-16 09:46:42 -0800313 protected void listen(Commit<? extends Listen> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800314 Long sessionId = commit.session().id();
315 listeners.put(sessionId, commit);
316 commit.session()
317 .onStateChange(
318 state -> {
319 if (state == Session.State.CLOSED
320 || state == Session.State.EXPIRED) {
Madan Jampanifc981772016-02-16 09:46:42 -0800321 Commit<? extends Listen> listener = listeners.remove(sessionId);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800322 if (listener != null) {
323 listener.close();
324 }
325 }
326 });
327 }
328
329 /**
330 * Handles an unlisten commit.
331 *
Madan Jampanifc981772016-02-16 09:46:42 -0800332 * @param commit unlisten commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800333 */
334 protected void unlisten(
Madan Jampanifc981772016-02-16 09:46:42 -0800335 Commit<? extends Unlisten> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800336 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800337 Commit<? extends Listen> listener = listeners.remove(commit.session());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800338 if (listener != null) {
339 listener.close();
340 }
341 } finally {
342 commit.close();
343 }
344 }
345
346 /**
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800347 * Handles an prepare commit.
348 *
Madan Jampanifc981772016-02-16 09:46:42 -0800349 * @param commit transaction prepare commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800350 * @return prepare result
351 */
Madan Jampanifc981772016-02-16 09:46:42 -0800352 protected PrepareResult prepare(Commit<? extends TransactionPrepare> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800353 boolean ok = false;
354 try {
Madan Jampani74da78b2016-02-09 21:18:36 -0800355 MapTransaction<String, byte[]> transaction = commit.operation().transaction();
Madan Jampanicadd70b2016-02-08 13:45:43 -0800356 for (MapUpdate<String, byte[]> update : transaction.updates()) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800357 String key = update.key();
358 if (preparedKeys.contains(key)) {
359 return PrepareResult.CONCURRENT_TRANSACTION;
360 }
361 MapEntryValue existingValue = mapEntries.get(key);
362 if (existingValue == null) {
363 if (update.currentValue() != null) {
364 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
365 }
366 } else {
367 if (existingValue.version() != update.currentVersion()) {
368 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
369 }
370 }
371 }
372 // No violations detected. Add to pendingTranctions and mark
Madan Jampanifc981772016-02-16 09:46:42 -0800373 // modified keys as locked for updates.
Madan Jampani74da78b2016-02-09 21:18:36 -0800374 pendingTransactions.put(transaction.transactionId(), commit);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800375 transaction.updates().forEach(u -> preparedKeys.add(u.key()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800376 ok = true;
377 return PrepareResult.OK;
378 } finally {
379 if (!ok) {
380 commit.close();
381 }
382 }
383 }
384
385 /**
386 * Handles an commit commit (ha!).
387 *
388 * @param commit transaction commit commit
389 * @return commit result
390 */
Madan Jampanifc981772016-02-16 09:46:42 -0800391 protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800392 TransactionId transactionId = commit.operation().transactionId();
393 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800394 Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800395 .remove(transactionId);
396 if (prepareCommit == null) {
397 return CommitResult.UNKNOWN_TRANSACTION_ID;
398 }
Madan Jampani74da78b2016-02-09 21:18:36 -0800399 MapTransaction<String, byte[]> transaction = prepareCommit.operation().transaction();
Madan Jampanicadd70b2016-02-08 13:45:43 -0800400 long totalReferencesToCommit = transaction
401 .updates()
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800402 .stream()
403 .filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
404 .count();
Madan Jampanifc981772016-02-16 09:46:42 -0800405 CountDownCompleter<Commit<? extends TransactionPrepare>> completer =
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800406 new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
Madan Jampanifc981772016-02-16 09:46:42 -0800407 List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
Madan Jampanicadd70b2016-02-08 13:45:43 -0800408 for (MapUpdate<String, byte[]> update : transaction.updates()) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800409 String key = update.key();
410 MapEntryValue previousValue = mapEntries.remove(key);
411 MapEntryValue newValue = null;
412 checkState(preparedKeys.remove(key), "key is not prepared");
413 if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
414 newValue = new TransactionalCommit(key,
415 versionCounter.incrementAndGet(), completer);
416 }
Madan Jampanifc981772016-02-16 09:46:42 -0800417 eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
418 if (newValue != null) {
419 mapEntries.put(key, newValue);
420 }
421 if (previousValue != null) {
422 previousValue.discard();
423 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800424 }
Madan Jampanifc981772016-02-16 09:46:42 -0800425 publish(eventsToPublish);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800426 return CommitResult.OK;
427 } finally {
428 commit.close();
429 }
430 }
431
432 /**
433 * Handles an rollback commit (ha!).
434 *
435 * @param commit transaction rollback commit
436 * @return rollback result
437 */
Madan Jampanifc981772016-02-16 09:46:42 -0800438 protected RollbackResult rollback(Commit<? extends TransactionRollback> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800439 TransactionId transactionId = commit.operation().transactionId();
440 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800441 Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions.remove(transactionId);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800442 if (prepareCommit == null) {
443 return RollbackResult.UNKNOWN_TRANSACTION_ID;
444 } else {
Madan Jampanicadd70b2016-02-08 13:45:43 -0800445 prepareCommit.operation()
446 .transaction()
447 .updates()
448 .forEach(u -> preparedKeys.remove(u.key()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800449 prepareCommit.close();
450 return RollbackResult.OK;
451 }
452 } finally {
453 commit.close();
454 }
455 }
456
Madan Jampanifc981772016-02-16 09:46:42 -0800457 /**
458 * Computes the update status that would result if the specified update were to applied to
459 * the state machine.
460 *
461 * @param update update
462 * @return status
463 */
464 private MapEntryUpdateResult.Status validate(UpdateAndGet update) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800465 MapEntryValue existingValue = mapEntries.get(update.key());
466 if (existingValue == null && update.value() == null) {
467 return MapEntryUpdateResult.Status.NOOP;
468 }
469 if (preparedKeys.contains(update.key())) {
470 return MapEntryUpdateResult.Status.WRITE_LOCK;
471 }
472 byte[] existingRawValue = existingValue == null ? null : existingValue
473 .value();
474 Long existingVersion = existingValue == null ? null : existingValue
475 .version();
476 return update.valueMatch().matches(existingRawValue)
477 && update.versionMatch().matches(existingVersion) ? MapEntryUpdateResult.Status.OK
478 : MapEntryUpdateResult.Status.PRECONDITION_FAILED;
479 }
480
Madan Jampanifc981772016-02-16 09:46:42 -0800481 /**
482 * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
483 * @param value map entry value
484 * @return versioned instance
485 */
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800486 private Versioned<byte[]> toVersioned(MapEntryValue value) {
Madan Jampanifc981772016-02-16 09:46:42 -0800487 return value == null ? null : new Versioned<>(value.value(), value.version());
488 }
489
490 /**
491 * Publishes events to listeners.
492 *
493 * @param events list of map event to publish
494 */
495 private void publish(List<MapEvent<String, byte[]>> events) {
496 listeners.values().forEach(commit -> commit.session().publish(AtomixConsistentMap.CHANGE_SUBJECT, events));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800497 }
498
499 @Override
500 public void register(Session session) {
501 }
502
503 @Override
504 public void unregister(Session session) {
505 closeListener(session.id());
506 }
507
508 @Override
509 public void expire(Session session) {
510 closeListener(session.id());
511 }
512
513 @Override
514 public void close(Session session) {
515 closeListener(session.id());
516 }
517
518 private void closeListener(Long sessionId) {
Madan Jampanifc981772016-02-16 09:46:42 -0800519 Commit<? extends Listen> commit = listeners.remove(sessionId);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800520 if (commit != null) {
521 commit.close();
522 }
523 }
524
525 /**
526 * Interface implemented by map values.
527 */
528 private interface MapEntryValue {
529 /**
530 * Returns the raw {@code byte[]}.
531 *
532 * @return raw value
533 */
534 byte[] value();
535
536 /**
537 * Returns the version of the value.
538 *
539 * @return version
540 */
541 long version();
542
543 /**
544 * Discards the value by invoke appropriate clean up actions.
545 */
546 void discard();
547 }
548
549 /**
550 * A {@code MapEntryValue} that is derived from a non-transactional update
551 * i.e. via any standard map update operation.
552 */
553 private class NonTransactionalCommit implements MapEntryValue {
554 private final long version;
Madan Jampanifc981772016-02-16 09:46:42 -0800555 private final Commit<? extends UpdateAndGet> commit;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800556
Madan Jampanifc981772016-02-16 09:46:42 -0800557 public NonTransactionalCommit(long version, Commit<? extends UpdateAndGet> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800558 this.version = version;
559 this.commit = commit;
560 }
561
562 @Override
563 public byte[] value() {
564 return commit.operation().value();
565 }
566
567 @Override
568 public long version() {
569 return version;
570 }
571
572 @Override
573 public void discard() {
574 commit.close();
575 }
576 }
577
578 /**
579 * A {@code MapEntryValue} that is derived from updates submitted via a
580 * transaction.
581 */
582 private class TransactionalCommit implements MapEntryValue {
583 private final String key;
584 private final long version;
Madan Jampanifc981772016-02-16 09:46:42 -0800585 private final CountDownCompleter<Commit<? extends TransactionPrepare>> completer;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800586
587 public TransactionalCommit(
588 String key,
589 long version,
Madan Jampanifc981772016-02-16 09:46:42 -0800590 CountDownCompleter<Commit<? extends TransactionPrepare>> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800591 this.key = key;
592 this.version = version;
593 this.completer = commit;
594 }
595
596 @Override
597 public byte[] value() {
Madan Jampani74da78b2016-02-09 21:18:36 -0800598 MapTransaction<String, byte[]> transaction = completer.object().operation().transaction();
Madan Jampanicadd70b2016-02-08 13:45:43 -0800599 return valueForKey(key, transaction);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800600 }
601
602 @Override
603 public long version() {
604 return version;
605 }
606
607 @Override
608 public void discard() {
609 completer.countDown();
610 }
Madan Jampanicadd70b2016-02-08 13:45:43 -0800611
Madan Jampani74da78b2016-02-09 21:18:36 -0800612 private byte[] valueForKey(String key, MapTransaction<String, byte[]> transaction) {
Madan Jampanicadd70b2016-02-08 13:45:43 -0800613 MapUpdate<String, byte[]> update = transaction.updates()
614 .stream()
615 .filter(u -> u.key().equals(key))
616 .findFirst()
617 .orElse(null);
618 return update == null ? null : update.value();
619 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800620 }
621}