blob: 9c9b019f5a18a57ae751b496b5ecfb471a373cb3 [file] [log] [blame]
/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
18import static org.onosproject.store.service.MapEvent.Type.INSERT;
19import static org.onosproject.store.service.MapEvent.Type.REMOVE;
20import static org.onosproject.store.service.MapEvent.Type.UPDATE;
21import io.atomix.copycat.client.session.Session;
22import io.atomix.copycat.server.Commit;
23import io.atomix.copycat.server.Snapshottable;
24import io.atomix.copycat.server.StateMachineExecutor;
25import io.atomix.copycat.server.session.SessionListener;
26import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
27import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
28import io.atomix.resource.ResourceStateMachine;
Madan Jampani86cb2432016-02-17 11:07:56 -080029import io.atomix.resource.ResourceType;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080030
31import java.util.Collection;
32import java.util.HashMap;
33import java.util.Iterator;
Madan Jampanifc981772016-02-16 09:46:42 -080034import java.util.List;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080035import java.util.Map;
36import java.util.Set;
37import java.util.concurrent.atomic.AtomicLong;
38import java.util.stream.Collectors;
39
40import org.onlab.util.CountDownCompleter;
41import org.onlab.util.Match;
Madan Jampani74da78b2016-02-09 21:18:36 -080042import org.onosproject.store.primitives.MapUpdate;
Madan Jampanicadd70b2016-02-08 13:45:43 -080043import org.onosproject.store.primitives.TransactionId;
Madan Jampanifc981772016-02-16 09:46:42 -080044import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
45import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
46import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsValue;
47import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.EntrySet;
48import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Get;
49import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.IsEmpty;
50import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.KeySet;
51import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Listen;
52import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
53import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080054import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
Madan Jampanifc981772016-02-16 09:46:42 -080055import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
56import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
57import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
58import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Values;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080059import org.onosproject.store.service.MapEvent;
Madan Jampani74da78b2016-02-09 21:18:36 -080060import org.onosproject.store.service.MapTransaction;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080061import org.onosproject.store.service.Versioned;
62
Madan Jampanifc981772016-02-16 09:46:42 -080063import com.google.common.collect.Lists;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080064import com.google.common.collect.Maps;
65import com.google.common.collect.Sets;
66
67import static com.google.common.base.Preconditions.checkState;
68
69/**
70 * State Machine for {@link AtomixConsistentMap} resource.
71 */
Madan Jampanifc981772016-02-16 09:46:42 -080072public class AtomixConsistentMapState extends ResourceStateMachine implements SessionListener, Snapshottable {
Madan Jampani86cb2432016-02-17 11:07:56 -080073
Madan Jampani5e5b3d62016-02-01 16:03:33 -080074 private final Map<Long, Commit<? extends AtomixConsistentMapCommands.Listen>> listeners = new HashMap<>();
75 private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
76 private final Set<String> preparedKeys = Sets.newHashSet();
Madan Jampanifc981772016-02-16 09:46:42 -080077 private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps.newHashMap();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080078 private AtomicLong versionCounter = new AtomicLong(0);
79
/**
 * Creates the state machine, passing the superclass a {@link ResourceType}
 * built from {@link AtomixConsistentMap}.
 */
public AtomixConsistentMapState() {
    super(new ResourceType(AtomixConsistentMap.class));
}
83
Madan Jampani5e5b3d62016-02-01 16:03:33 -080084 @Override
85 public void snapshot(SnapshotWriter writer) {
86 writer.writeLong(versionCounter.get());
87 }
88
89 @Override
90 public void install(SnapshotReader reader) {
91 versionCounter = new AtomicLong(reader.readLong());
92 }
93
94 @Override
95 protected void configure(StateMachineExecutor executor) {
96 // Listeners
Madan Jampanifc981772016-02-16 09:46:42 -080097 executor.register(Listen.class, this::listen);
98 executor.register(Unlisten.class, this::unlisten);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080099 // Queries
Madan Jampanifc981772016-02-16 09:46:42 -0800100 executor.register(ContainsKey.class, this::containsKey);
101 executor.register(ContainsValue.class, this::containsValue);
102 executor.register(EntrySet.class, this::entrySet);
103 executor.register(Get.class, this::get);
104 executor.register(IsEmpty.class, this::isEmpty);
105 executor.register(KeySet.class, this::keySet);
106 executor.register(Size.class, this::size);
107 executor.register(Values.class, this::values);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800108 // Commands
Madan Jampanifc981772016-02-16 09:46:42 -0800109 executor.register(UpdateAndGet.class, this::updateAndGet);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800110 executor.register(AtomixConsistentMapCommands.Clear.class, this::clear);
Madan Jampanifc981772016-02-16 09:46:42 -0800111 executor.register(TransactionPrepare.class, this::prepare);
112 executor.register(TransactionCommit.class, this::commit);
113 executor.register(TransactionRollback.class, this::rollback);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800114 }
115
116 @Override
117 public void delete() {
118 // Delete Listeners
119 listeners.values().forEach(Commit::close);
120 listeners.clear();
121
122 // Delete Map entries
123 mapEntries.values().forEach(MapEntryValue::discard);
124 mapEntries.clear();
125 }
126
127 /**
128 * Handles a contains key commit.
129 *
Madan Jampanifc981772016-02-16 09:46:42 -0800130 * @param commit containsKey commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800131 * @return {@code true} if map contains key
132 */
Madan Jampanifc981772016-02-16 09:46:42 -0800133 protected boolean containsKey(Commit<? extends ContainsKey> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800134 try {
135 return toVersioned(mapEntries.get(commit.operation().key())) != null;
136 } finally {
137 commit.close();
138 }
139 }
140
141 /**
142 * Handles a contains value commit.
143 *
Madan Jampanifc981772016-02-16 09:46:42 -0800144 * @param commit containsValue commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800145 * @return {@code true} if map contains value
146 */
Madan Jampanifc981772016-02-16 09:46:42 -0800147 protected boolean containsValue(Commit<? extends ContainsValue> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800148 try {
149 Match<byte[]> valueMatch = Match
150 .ifValue(commit.operation().value());
151 return mapEntries.values().stream()
152 .anyMatch(value -> valueMatch.matches(value.value()));
153 } finally {
154 commit.close();
155 }
156 }
157
158 /**
159 * Handles a get commit.
160 *
161 * @param commit
162 * get commit
163 * @return value mapped to key
164 */
Madan Jampanifc981772016-02-16 09:46:42 -0800165 protected Versioned<byte[]> get(Commit<? extends Get> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800166 try {
167 return toVersioned(mapEntries.get(commit.operation().key()));
168 } finally {
169 commit.close();
170 }
171 }
172
173 /**
174 * Handles a count commit.
175 *
Madan Jampanifc981772016-02-16 09:46:42 -0800176 * @param commit size commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800177 * @return number of entries in map
178 */
Madan Jampanifc981772016-02-16 09:46:42 -0800179 protected int size(Commit<? extends Size> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800180 try {
181 return mapEntries.size();
182 } finally {
183 commit.close();
184 }
185 }
186
187 /**
188 * Handles an is empty commit.
189 *
Madan Jampanifc981772016-02-16 09:46:42 -0800190 * @param commit isEmpty commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800191 * @return {@code true} if map is empty
192 */
Madan Jampanifc981772016-02-16 09:46:42 -0800193 protected boolean isEmpty(Commit<? extends IsEmpty> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800194 try {
195 return mapEntries.isEmpty();
196 } finally {
197 commit.close();
198 }
199 }
200
201 /**
202 * Handles a keySet commit.
203 *
Madan Jampanifc981772016-02-16 09:46:42 -0800204 * @param commit keySet commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800205 * @return set of keys in map
206 */
Madan Jampanifc981772016-02-16 09:46:42 -0800207 protected Set<String> keySet(Commit<? extends KeySet> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800208 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800209 return mapEntries.keySet().stream().collect(Collectors.toSet());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800210 } finally {
211 commit.close();
212 }
213 }
214
215 /**
216 * Handles a values commit.
217 *
Madan Jampanifc981772016-02-16 09:46:42 -0800218 * @param commit values commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800219 * @return collection of values in map
220 */
Madan Jampanifc981772016-02-16 09:46:42 -0800221 protected Collection<Versioned<byte[]>> values(Commit<? extends Values> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800222 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800223 return mapEntries.values().stream().map(this::toVersioned).collect(Collectors.toList());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800224 } finally {
225 commit.close();
226 }
227 }
228
229 /**
230 * Handles a entry set commit.
231 *
232 * @param commit
233 * entrySet commit
234 * @return set of map entries
235 */
Madan Jampanifc981772016-02-16 09:46:42 -0800236 protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(Commit<? extends EntrySet> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800237 try {
238 return mapEntries
239 .entrySet()
240 .stream()
241 .map(e -> Maps.immutableEntry(e.getKey(),
242 toVersioned(e.getValue())))
243 .collect(Collectors.toSet());
244 } finally {
245 commit.close();
246 }
247 }
248
249 /**
250 * Handles a update and get commit.
251 *
Madan Jampanifc981772016-02-16 09:46:42 -0800252 * @param commit updateAndGet commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800253 * @return update result
254 */
Madan Jampanifc981772016-02-16 09:46:42 -0800255 protected MapEntryUpdateResult<String, byte[]> updateAndGet(Commit<? extends UpdateAndGet> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800256 MapEntryUpdateResult.Status updateStatus = validate(commit.operation());
257 String key = commit.operation().key();
258 MapEntryValue oldCommitValue = mapEntries.get(commit.operation().key());
259 Versioned<byte[]> oldMapValue = toVersioned(oldCommitValue);
260
261 if (updateStatus != MapEntryUpdateResult.Status.OK) {
262 commit.close();
263 return new MapEntryUpdateResult<>(updateStatus, "", key,
264 oldMapValue, oldMapValue);
265 }
266
267 byte[] newValue = commit.operation().value();
268 long newVersion = versionCounter.incrementAndGet();
269 Versioned<byte[]> newMapValue = newValue == null ? null
270 : new Versioned<>(newValue, newVersion);
271
272 MapEvent.Type updateType = newValue == null ? REMOVE
273 : oldCommitValue == null ? INSERT : UPDATE;
274 if (updateType == REMOVE || updateType == UPDATE) {
275 mapEntries.remove(key);
276 oldCommitValue.discard();
277 }
278 if (updateType == INSERT || updateType == UPDATE) {
279 mapEntries.put(key, new NonTransactionalCommit(newVersion, commit));
Madan Jampanifc981772016-02-16 09:46:42 -0800280 } else {
281 commit.close();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800282 }
Madan Jampanifc981772016-02-16 09:46:42 -0800283 publish(Lists.newArrayList(new MapEvent<>("", key, newMapValue, oldMapValue)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800284 return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue,
285 newMapValue);
286 }
287
288 /**
289 * Handles a clear commit.
290 *
Madan Jampanifc981772016-02-16 09:46:42 -0800291 * @param commit clear commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800292 * @return clear result
293 */
Madan Jampanifc981772016-02-16 09:46:42 -0800294 protected MapEntryUpdateResult.Status clear(Commit<? extends Clear> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800295 try {
296 Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries
297 .entrySet().iterator();
298 while (iterator.hasNext()) {
299 Map.Entry<String, MapEntryValue> entry = iterator.next();
300 String key = entry.getKey();
301 MapEntryValue value = entry.getValue();
302 Versioned<byte[]> removedValue = new Versioned<>(value.value(),
303 value.version());
Madan Jampanifc981772016-02-16 09:46:42 -0800304 publish(Lists.newArrayList(new MapEvent<>("", key, null, removedValue)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800305 value.discard();
306 iterator.remove();
307 }
308 return MapEntryUpdateResult.Status.OK;
309 } finally {
310 commit.close();
311 }
312 }
313
314 /**
315 * Handles a listen commit.
316 *
Madan Jampanifc981772016-02-16 09:46:42 -0800317 * @param commit listen commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800318 */
Madan Jampanifc981772016-02-16 09:46:42 -0800319 protected void listen(Commit<? extends Listen> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800320 Long sessionId = commit.session().id();
321 listeners.put(sessionId, commit);
322 commit.session()
323 .onStateChange(
324 state -> {
325 if (state == Session.State.CLOSED
326 || state == Session.State.EXPIRED) {
Madan Jampanifc981772016-02-16 09:46:42 -0800327 Commit<? extends Listen> listener = listeners.remove(sessionId);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800328 if (listener != null) {
329 listener.close();
330 }
331 }
332 });
333 }
334
335 /**
336 * Handles an unlisten commit.
337 *
Madan Jampanifc981772016-02-16 09:46:42 -0800338 * @param commit unlisten commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800339 */
340 protected void unlisten(
Madan Jampanifc981772016-02-16 09:46:42 -0800341 Commit<? extends Unlisten> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800342 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800343 Commit<? extends Listen> listener = listeners.remove(commit.session());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800344 if (listener != null) {
345 listener.close();
346 }
347 } finally {
348 commit.close();
349 }
350 }
351
352 /**
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800353 * Handles an prepare commit.
354 *
Madan Jampanifc981772016-02-16 09:46:42 -0800355 * @param commit transaction prepare commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800356 * @return prepare result
357 */
Madan Jampanifc981772016-02-16 09:46:42 -0800358 protected PrepareResult prepare(Commit<? extends TransactionPrepare> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800359 boolean ok = false;
360 try {
Madan Jampani74da78b2016-02-09 21:18:36 -0800361 MapTransaction<String, byte[]> transaction = commit.operation().transaction();
Madan Jampanicadd70b2016-02-08 13:45:43 -0800362 for (MapUpdate<String, byte[]> update : transaction.updates()) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800363 String key = update.key();
364 if (preparedKeys.contains(key)) {
365 return PrepareResult.CONCURRENT_TRANSACTION;
366 }
367 MapEntryValue existingValue = mapEntries.get(key);
368 if (existingValue == null) {
369 if (update.currentValue() != null) {
370 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
371 }
372 } else {
373 if (existingValue.version() != update.currentVersion()) {
374 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
375 }
376 }
377 }
378 // No violations detected. Add to pendingTranctions and mark
Madan Jampanifc981772016-02-16 09:46:42 -0800379 // modified keys as locked for updates.
Madan Jampani74da78b2016-02-09 21:18:36 -0800380 pendingTransactions.put(transaction.transactionId(), commit);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800381 transaction.updates().forEach(u -> preparedKeys.add(u.key()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800382 ok = true;
383 return PrepareResult.OK;
384 } finally {
385 if (!ok) {
386 commit.close();
387 }
388 }
389 }
390
391 /**
392 * Handles an commit commit (ha!).
393 *
394 * @param commit transaction commit commit
395 * @return commit result
396 */
Madan Jampanifc981772016-02-16 09:46:42 -0800397 protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800398 TransactionId transactionId = commit.operation().transactionId();
399 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800400 Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800401 .remove(transactionId);
402 if (prepareCommit == null) {
403 return CommitResult.UNKNOWN_TRANSACTION_ID;
404 }
Madan Jampani74da78b2016-02-09 21:18:36 -0800405 MapTransaction<String, byte[]> transaction = prepareCommit.operation().transaction();
Madan Jampanicadd70b2016-02-08 13:45:43 -0800406 long totalReferencesToCommit = transaction
407 .updates()
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800408 .stream()
409 .filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
410 .count();
Madan Jampanifc981772016-02-16 09:46:42 -0800411 CountDownCompleter<Commit<? extends TransactionPrepare>> completer =
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800412 new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
Madan Jampanifc981772016-02-16 09:46:42 -0800413 List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
Madan Jampanicadd70b2016-02-08 13:45:43 -0800414 for (MapUpdate<String, byte[]> update : transaction.updates()) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800415 String key = update.key();
416 MapEntryValue previousValue = mapEntries.remove(key);
417 MapEntryValue newValue = null;
418 checkState(preparedKeys.remove(key), "key is not prepared");
419 if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
420 newValue = new TransactionalCommit(key,
421 versionCounter.incrementAndGet(), completer);
422 }
Madan Jampanifc981772016-02-16 09:46:42 -0800423 eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
424 if (newValue != null) {
425 mapEntries.put(key, newValue);
426 }
427 if (previousValue != null) {
428 previousValue.discard();
429 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800430 }
Madan Jampanifc981772016-02-16 09:46:42 -0800431 publish(eventsToPublish);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800432 return CommitResult.OK;
433 } finally {
434 commit.close();
435 }
436 }
437
438 /**
439 * Handles an rollback commit (ha!).
440 *
441 * @param commit transaction rollback commit
442 * @return rollback result
443 */
Madan Jampanifc981772016-02-16 09:46:42 -0800444 protected RollbackResult rollback(Commit<? extends TransactionRollback> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800445 TransactionId transactionId = commit.operation().transactionId();
446 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800447 Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions.remove(transactionId);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800448 if (prepareCommit == null) {
449 return RollbackResult.UNKNOWN_TRANSACTION_ID;
450 } else {
Madan Jampanicadd70b2016-02-08 13:45:43 -0800451 prepareCommit.operation()
452 .transaction()
453 .updates()
454 .forEach(u -> preparedKeys.remove(u.key()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800455 prepareCommit.close();
456 return RollbackResult.OK;
457 }
458 } finally {
459 commit.close();
460 }
461 }
462
Madan Jampanifc981772016-02-16 09:46:42 -0800463 /**
464 * Computes the update status that would result if the specified update were to applied to
465 * the state machine.
466 *
467 * @param update update
468 * @return status
469 */
470 private MapEntryUpdateResult.Status validate(UpdateAndGet update) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800471 MapEntryValue existingValue = mapEntries.get(update.key());
472 if (existingValue == null && update.value() == null) {
473 return MapEntryUpdateResult.Status.NOOP;
474 }
475 if (preparedKeys.contains(update.key())) {
476 return MapEntryUpdateResult.Status.WRITE_LOCK;
477 }
478 byte[] existingRawValue = existingValue == null ? null : existingValue
479 .value();
480 Long existingVersion = existingValue == null ? null : existingValue
481 .version();
482 return update.valueMatch().matches(existingRawValue)
483 && update.versionMatch().matches(existingVersion) ? MapEntryUpdateResult.Status.OK
484 : MapEntryUpdateResult.Status.PRECONDITION_FAILED;
485 }
486
Madan Jampanifc981772016-02-16 09:46:42 -0800487 /**
488 * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
489 * @param value map entry value
490 * @return versioned instance
491 */
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800492 private Versioned<byte[]> toVersioned(MapEntryValue value) {
Madan Jampanifc981772016-02-16 09:46:42 -0800493 return value == null ? null : new Versioned<>(value.value(), value.version());
494 }
495
496 /**
497 * Publishes events to listeners.
498 *
499 * @param events list of map event to publish
500 */
501 private void publish(List<MapEvent<String, byte[]>> events) {
502 listeners.values().forEach(commit -> commit.session().publish(AtomixConsistentMap.CHANGE_SUBJECT, events));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800503 }
504
505 @Override
506 public void register(Session session) {
507 }
508
509 @Override
510 public void unregister(Session session) {
511 closeListener(session.id());
512 }
513
514 @Override
515 public void expire(Session session) {
516 closeListener(session.id());
517 }
518
519 @Override
520 public void close(Session session) {
521 closeListener(session.id());
522 }
523
524 private void closeListener(Long sessionId) {
Madan Jampanifc981772016-02-16 09:46:42 -0800525 Commit<? extends Listen> commit = listeners.remove(sessionId);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800526 if (commit != null) {
527 commit.close();
528 }
529 }
530
531 /**
532 * Interface implemented by map values.
533 */
534 private interface MapEntryValue {
535 /**
536 * Returns the raw {@code byte[]}.
537 *
538 * @return raw value
539 */
540 byte[] value();
541
542 /**
543 * Returns the version of the value.
544 *
545 * @return version
546 */
547 long version();
548
549 /**
550 * Discards the value by invoke appropriate clean up actions.
551 */
552 void discard();
553 }
554
555 /**
556 * A {@code MapEntryValue} that is derived from a non-transactional update
557 * i.e. via any standard map update operation.
558 */
559 private class NonTransactionalCommit implements MapEntryValue {
560 private final long version;
Madan Jampanifc981772016-02-16 09:46:42 -0800561 private final Commit<? extends UpdateAndGet> commit;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800562
Madan Jampanifc981772016-02-16 09:46:42 -0800563 public NonTransactionalCommit(long version, Commit<? extends UpdateAndGet> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800564 this.version = version;
565 this.commit = commit;
566 }
567
568 @Override
569 public byte[] value() {
570 return commit.operation().value();
571 }
572
573 @Override
574 public long version() {
575 return version;
576 }
577
578 @Override
579 public void discard() {
580 commit.close();
581 }
582 }
583
584 /**
585 * A {@code MapEntryValue} that is derived from updates submitted via a
586 * transaction.
587 */
588 private class TransactionalCommit implements MapEntryValue {
589 private final String key;
590 private final long version;
Madan Jampanifc981772016-02-16 09:46:42 -0800591 private final CountDownCompleter<Commit<? extends TransactionPrepare>> completer;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800592
593 public TransactionalCommit(
594 String key,
595 long version,
Madan Jampanifc981772016-02-16 09:46:42 -0800596 CountDownCompleter<Commit<? extends TransactionPrepare>> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800597 this.key = key;
598 this.version = version;
599 this.completer = commit;
600 }
601
602 @Override
603 public byte[] value() {
Madan Jampani74da78b2016-02-09 21:18:36 -0800604 MapTransaction<String, byte[]> transaction = completer.object().operation().transaction();
Madan Jampanicadd70b2016-02-08 13:45:43 -0800605 return valueForKey(key, transaction);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800606 }
607
608 @Override
609 public long version() {
610 return version;
611 }
612
613 @Override
614 public void discard() {
615 completer.countDown();
616 }
Madan Jampanicadd70b2016-02-08 13:45:43 -0800617
Madan Jampani74da78b2016-02-09 21:18:36 -0800618 private byte[] valueForKey(String key, MapTransaction<String, byte[]> transaction) {
Madan Jampanicadd70b2016-02-08 13:45:43 -0800619 MapUpdate<String, byte[]> update = transaction.updates()
620 .stream()
621 .filter(u -> u.key().equals(key))
622 .findFirst()
623 .orElse(null);
624 return update == null ? null : update.value();
625 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800626 }
627}