blob: 6395f6908e38f8a2246dc5748fc270262bcbc1ec [file] [log] [blame]
/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
18import static org.onosproject.store.service.MapEvent.Type.INSERT;
19import static org.onosproject.store.service.MapEvent.Type.REMOVE;
20import static org.onosproject.store.service.MapEvent.Type.UPDATE;
Madan Jampani3a9911c2016-02-21 11:25:45 -080021import io.atomix.copycat.server.session.ServerSession;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080022import io.atomix.copycat.server.Commit;
23import io.atomix.copycat.server.Snapshottable;
24import io.atomix.copycat.server.StateMachineExecutor;
25import io.atomix.copycat.server.session.SessionListener;
26import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
27import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
28import io.atomix.resource.ResourceStateMachine;
Madan Jampani86cb2432016-02-17 11:07:56 -080029import io.atomix.resource.ResourceType;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080030
31import java.util.Collection;
32import java.util.HashMap;
33import java.util.Iterator;
Madan Jampanifc981772016-02-16 09:46:42 -080034import java.util.List;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080035import java.util.Map;
36import java.util.Set;
37import java.util.concurrent.atomic.AtomicLong;
38import java.util.stream.Collectors;
39
40import org.onlab.util.CountDownCompleter;
41import org.onlab.util.Match;
Madan Jampani74da78b2016-02-09 21:18:36 -080042import org.onosproject.store.primitives.MapUpdate;
Madan Jampanicadd70b2016-02-08 13:45:43 -080043import org.onosproject.store.primitives.TransactionId;
Madan Jampanifc981772016-02-16 09:46:42 -080044import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
45import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
46import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsValue;
47import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.EntrySet;
48import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Get;
49import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.IsEmpty;
50import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.KeySet;
51import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Listen;
52import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
53import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080054import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
Madan Jampanifc981772016-02-16 09:46:42 -080055import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
56import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
57import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
58import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Values;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080059import org.onosproject.store.service.MapEvent;
Madan Jampani74da78b2016-02-09 21:18:36 -080060import org.onosproject.store.service.MapTransaction;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080061import org.onosproject.store.service.Versioned;
62
Madan Jampanifc981772016-02-16 09:46:42 -080063import com.google.common.collect.Lists;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080064import com.google.common.collect.Maps;
65import com.google.common.collect.Sets;
66
67import static com.google.common.base.Preconditions.checkState;
68
69/**
70 * State Machine for {@link AtomixConsistentMap} resource.
71 */
Madan Jampanifc981772016-02-16 09:46:42 -080072public class AtomixConsistentMapState extends ResourceStateMachine implements SessionListener, Snapshottable {
Madan Jampani86cb2432016-02-17 11:07:56 -080073
Madan Jampani5e5b3d62016-02-01 16:03:33 -080074 private final Map<Long, Commit<? extends AtomixConsistentMapCommands.Listen>> listeners = new HashMap<>();
75 private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
76 private final Set<String> preparedKeys = Sets.newHashSet();
Madan Jampanifc981772016-02-16 09:46:42 -080077 private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps.newHashMap();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080078 private AtomicLong versionCounter = new AtomicLong(0);
79
/**
 * Creates the state machine, identifying its resource type by the
 * {@link AtomixConsistentMap} class.
 */
public AtomixConsistentMapState() {
    super(new ResourceType(AtomixConsistentMap.class));
}
83
Madan Jampani5e5b3d62016-02-01 16:03:33 -080084 @Override
85 public void snapshot(SnapshotWriter writer) {
86 writer.writeLong(versionCounter.get());
87 }
88
89 @Override
90 public void install(SnapshotReader reader) {
91 versionCounter = new AtomicLong(reader.readLong());
92 }
93
94 @Override
95 protected void configure(StateMachineExecutor executor) {
96 // Listeners
Madan Jampanifc981772016-02-16 09:46:42 -080097 executor.register(Listen.class, this::listen);
98 executor.register(Unlisten.class, this::unlisten);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080099 // Queries
Madan Jampanifc981772016-02-16 09:46:42 -0800100 executor.register(ContainsKey.class, this::containsKey);
101 executor.register(ContainsValue.class, this::containsValue);
102 executor.register(EntrySet.class, this::entrySet);
103 executor.register(Get.class, this::get);
104 executor.register(IsEmpty.class, this::isEmpty);
105 executor.register(KeySet.class, this::keySet);
106 executor.register(Size.class, this::size);
107 executor.register(Values.class, this::values);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800108 // Commands
Madan Jampanifc981772016-02-16 09:46:42 -0800109 executor.register(UpdateAndGet.class, this::updateAndGet);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800110 executor.register(AtomixConsistentMapCommands.Clear.class, this::clear);
Madan Jampanifc981772016-02-16 09:46:42 -0800111 executor.register(TransactionPrepare.class, this::prepare);
112 executor.register(TransactionCommit.class, this::commit);
113 executor.register(TransactionRollback.class, this::rollback);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800114 }
115
116 @Override
117 public void delete() {
118 // Delete Listeners
119 listeners.values().forEach(Commit::close);
120 listeners.clear();
121
122 // Delete Map entries
123 mapEntries.values().forEach(MapEntryValue::discard);
124 mapEntries.clear();
125 }
126
127 /**
128 * Handles a contains key commit.
129 *
Madan Jampanifc981772016-02-16 09:46:42 -0800130 * @param commit containsKey commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800131 * @return {@code true} if map contains key
132 */
Madan Jampanifc981772016-02-16 09:46:42 -0800133 protected boolean containsKey(Commit<? extends ContainsKey> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800134 try {
135 return toVersioned(mapEntries.get(commit.operation().key())) != null;
136 } finally {
137 commit.close();
138 }
139 }
140
141 /**
142 * Handles a contains value commit.
143 *
Madan Jampanifc981772016-02-16 09:46:42 -0800144 * @param commit containsValue commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800145 * @return {@code true} if map contains value
146 */
Madan Jampanifc981772016-02-16 09:46:42 -0800147 protected boolean containsValue(Commit<? extends ContainsValue> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800148 try {
149 Match<byte[]> valueMatch = Match
150 .ifValue(commit.operation().value());
151 return mapEntries.values().stream()
152 .anyMatch(value -> valueMatch.matches(value.value()));
153 } finally {
154 commit.close();
155 }
156 }
157
158 /**
159 * Handles a get commit.
160 *
161 * @param commit
162 * get commit
163 * @return value mapped to key
164 */
Madan Jampanifc981772016-02-16 09:46:42 -0800165 protected Versioned<byte[]> get(Commit<? extends Get> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800166 try {
167 return toVersioned(mapEntries.get(commit.operation().key()));
168 } finally {
169 commit.close();
170 }
171 }
172
173 /**
174 * Handles a count commit.
175 *
Madan Jampanifc981772016-02-16 09:46:42 -0800176 * @param commit size commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800177 * @return number of entries in map
178 */
Madan Jampanifc981772016-02-16 09:46:42 -0800179 protected int size(Commit<? extends Size> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800180 try {
181 return mapEntries.size();
182 } finally {
183 commit.close();
184 }
185 }
186
187 /**
188 * Handles an is empty commit.
189 *
Madan Jampanifc981772016-02-16 09:46:42 -0800190 * @param commit isEmpty commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800191 * @return {@code true} if map is empty
192 */
Madan Jampanifc981772016-02-16 09:46:42 -0800193 protected boolean isEmpty(Commit<? extends IsEmpty> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800194 try {
195 return mapEntries.isEmpty();
196 } finally {
197 commit.close();
198 }
199 }
200
201 /**
202 * Handles a keySet commit.
203 *
Madan Jampanifc981772016-02-16 09:46:42 -0800204 * @param commit keySet commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800205 * @return set of keys in map
206 */
Madan Jampanifc981772016-02-16 09:46:42 -0800207 protected Set<String> keySet(Commit<? extends KeySet> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800208 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800209 return mapEntries.keySet().stream().collect(Collectors.toSet());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800210 } finally {
211 commit.close();
212 }
213 }
214
215 /**
216 * Handles a values commit.
217 *
Madan Jampanifc981772016-02-16 09:46:42 -0800218 * @param commit values commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800219 * @return collection of values in map
220 */
Madan Jampanifc981772016-02-16 09:46:42 -0800221 protected Collection<Versioned<byte[]>> values(Commit<? extends Values> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800222 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800223 return mapEntries.values().stream().map(this::toVersioned).collect(Collectors.toList());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800224 } finally {
225 commit.close();
226 }
227 }
228
229 /**
230 * Handles a entry set commit.
231 *
232 * @param commit
233 * entrySet commit
234 * @return set of map entries
235 */
Madan Jampanifc981772016-02-16 09:46:42 -0800236 protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(Commit<? extends EntrySet> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800237 try {
238 return mapEntries
239 .entrySet()
240 .stream()
241 .map(e -> Maps.immutableEntry(e.getKey(),
242 toVersioned(e.getValue())))
243 .collect(Collectors.toSet());
244 } finally {
245 commit.close();
246 }
247 }
248
249 /**
250 * Handles a update and get commit.
251 *
Madan Jampanifc981772016-02-16 09:46:42 -0800252 * @param commit updateAndGet commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800253 * @return update result
254 */
Madan Jampanifc981772016-02-16 09:46:42 -0800255 protected MapEntryUpdateResult<String, byte[]> updateAndGet(Commit<? extends UpdateAndGet> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800256 MapEntryUpdateResult.Status updateStatus = validate(commit.operation());
257 String key = commit.operation().key();
258 MapEntryValue oldCommitValue = mapEntries.get(commit.operation().key());
259 Versioned<byte[]> oldMapValue = toVersioned(oldCommitValue);
260
261 if (updateStatus != MapEntryUpdateResult.Status.OK) {
262 commit.close();
263 return new MapEntryUpdateResult<>(updateStatus, "", key,
264 oldMapValue, oldMapValue);
265 }
266
267 byte[] newValue = commit.operation().value();
268 long newVersion = versionCounter.incrementAndGet();
269 Versioned<byte[]> newMapValue = newValue == null ? null
270 : new Versioned<>(newValue, newVersion);
271
272 MapEvent.Type updateType = newValue == null ? REMOVE
273 : oldCommitValue == null ? INSERT : UPDATE;
274 if (updateType == REMOVE || updateType == UPDATE) {
275 mapEntries.remove(key);
276 oldCommitValue.discard();
277 }
278 if (updateType == INSERT || updateType == UPDATE) {
279 mapEntries.put(key, new NonTransactionalCommit(newVersion, commit));
Madan Jampanifc981772016-02-16 09:46:42 -0800280 } else {
281 commit.close();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800282 }
Madan Jampanifc981772016-02-16 09:46:42 -0800283 publish(Lists.newArrayList(new MapEvent<>("", key, newMapValue, oldMapValue)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800284 return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue,
285 newMapValue);
286 }
287
288 /**
289 * Handles a clear commit.
290 *
Madan Jampanifc981772016-02-16 09:46:42 -0800291 * @param commit clear commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800292 * @return clear result
293 */
Madan Jampanifc981772016-02-16 09:46:42 -0800294 protected MapEntryUpdateResult.Status clear(Commit<? extends Clear> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800295 try {
296 Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries
297 .entrySet().iterator();
298 while (iterator.hasNext()) {
299 Map.Entry<String, MapEntryValue> entry = iterator.next();
300 String key = entry.getKey();
301 MapEntryValue value = entry.getValue();
302 Versioned<byte[]> removedValue = new Versioned<>(value.value(),
303 value.version());
Madan Jampanifc981772016-02-16 09:46:42 -0800304 publish(Lists.newArrayList(new MapEvent<>("", key, null, removedValue)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800305 value.discard();
306 iterator.remove();
307 }
308 return MapEntryUpdateResult.Status.OK;
309 } finally {
310 commit.close();
311 }
312 }
313
314 /**
315 * Handles a listen commit.
316 *
Madan Jampanifc981772016-02-16 09:46:42 -0800317 * @param commit listen commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800318 */
Madan Jampanifc981772016-02-16 09:46:42 -0800319 protected void listen(Commit<? extends Listen> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800320 Long sessionId = commit.session().id();
321 listeners.put(sessionId, commit);
322 commit.session()
323 .onStateChange(
324 state -> {
Madan Jampani3a9911c2016-02-21 11:25:45 -0800325 if (state == ServerSession.State.CLOSED
326 || state == ServerSession.State.EXPIRED) {
Madan Jampanifc981772016-02-16 09:46:42 -0800327 Commit<? extends Listen> listener = listeners.remove(sessionId);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800328 if (listener != null) {
329 listener.close();
330 }
331 }
332 });
333 }
334
335 /**
336 * Handles an unlisten commit.
337 *
Madan Jampanifc981772016-02-16 09:46:42 -0800338 * @param commit unlisten commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800339 */
Madan Jampani40f022e2016-03-02 21:35:14 -0800340 protected void unlisten(Commit<? extends Unlisten> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800341 try {
Madan Jampani40f022e2016-03-02 21:35:14 -0800342 Commit<? extends Listen> listener = listeners.remove(commit.session().id());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800343 if (listener != null) {
344 listener.close();
345 }
346 } finally {
347 commit.close();
348 }
349 }
350
351 /**
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800352 * Handles an prepare commit.
353 *
Madan Jampanifc981772016-02-16 09:46:42 -0800354 * @param commit transaction prepare commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800355 * @return prepare result
356 */
Madan Jampanifc981772016-02-16 09:46:42 -0800357 protected PrepareResult prepare(Commit<? extends TransactionPrepare> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800358 boolean ok = false;
359 try {
Madan Jampani74da78b2016-02-09 21:18:36 -0800360 MapTransaction<String, byte[]> transaction = commit.operation().transaction();
Madan Jampanicadd70b2016-02-08 13:45:43 -0800361 for (MapUpdate<String, byte[]> update : transaction.updates()) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800362 String key = update.key();
363 if (preparedKeys.contains(key)) {
364 return PrepareResult.CONCURRENT_TRANSACTION;
365 }
366 MapEntryValue existingValue = mapEntries.get(key);
367 if (existingValue == null) {
368 if (update.currentValue() != null) {
369 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
370 }
371 } else {
372 if (existingValue.version() != update.currentVersion()) {
373 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
374 }
375 }
376 }
377 // No violations detected. Add to pendingTranctions and mark
Madan Jampanifc981772016-02-16 09:46:42 -0800378 // modified keys as locked for updates.
Madan Jampani74da78b2016-02-09 21:18:36 -0800379 pendingTransactions.put(transaction.transactionId(), commit);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800380 transaction.updates().forEach(u -> preparedKeys.add(u.key()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800381 ok = true;
382 return PrepareResult.OK;
383 } finally {
384 if (!ok) {
385 commit.close();
386 }
387 }
388 }
389
390 /**
391 * Handles an commit commit (ha!).
392 *
393 * @param commit transaction commit commit
394 * @return commit result
395 */
Madan Jampanifc981772016-02-16 09:46:42 -0800396 protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800397 TransactionId transactionId = commit.operation().transactionId();
398 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800399 Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800400 .remove(transactionId);
401 if (prepareCommit == null) {
402 return CommitResult.UNKNOWN_TRANSACTION_ID;
403 }
Madan Jampani74da78b2016-02-09 21:18:36 -0800404 MapTransaction<String, byte[]> transaction = prepareCommit.operation().transaction();
Madan Jampanicadd70b2016-02-08 13:45:43 -0800405 long totalReferencesToCommit = transaction
406 .updates()
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800407 .stream()
408 .filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
409 .count();
Madan Jampanifc981772016-02-16 09:46:42 -0800410 CountDownCompleter<Commit<? extends TransactionPrepare>> completer =
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800411 new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
Madan Jampanifc981772016-02-16 09:46:42 -0800412 List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
Madan Jampanicadd70b2016-02-08 13:45:43 -0800413 for (MapUpdate<String, byte[]> update : transaction.updates()) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800414 String key = update.key();
415 MapEntryValue previousValue = mapEntries.remove(key);
416 MapEntryValue newValue = null;
417 checkState(preparedKeys.remove(key), "key is not prepared");
418 if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
419 newValue = new TransactionalCommit(key,
420 versionCounter.incrementAndGet(), completer);
421 }
Madan Jampanifc981772016-02-16 09:46:42 -0800422 eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
423 if (newValue != null) {
424 mapEntries.put(key, newValue);
425 }
426 if (previousValue != null) {
427 previousValue.discard();
428 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800429 }
Madan Jampanifc981772016-02-16 09:46:42 -0800430 publish(eventsToPublish);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800431 return CommitResult.OK;
432 } finally {
433 commit.close();
434 }
435 }
436
437 /**
438 * Handles an rollback commit (ha!).
439 *
440 * @param commit transaction rollback commit
441 * @return rollback result
442 */
Madan Jampanifc981772016-02-16 09:46:42 -0800443 protected RollbackResult rollback(Commit<? extends TransactionRollback> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800444 TransactionId transactionId = commit.operation().transactionId();
445 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800446 Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions.remove(transactionId);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800447 if (prepareCommit == null) {
448 return RollbackResult.UNKNOWN_TRANSACTION_ID;
449 } else {
Madan Jampanicadd70b2016-02-08 13:45:43 -0800450 prepareCommit.operation()
451 .transaction()
452 .updates()
453 .forEach(u -> preparedKeys.remove(u.key()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800454 prepareCommit.close();
455 return RollbackResult.OK;
456 }
457 } finally {
458 commit.close();
459 }
460 }
461
Madan Jampanifc981772016-02-16 09:46:42 -0800462 /**
463 * Computes the update status that would result if the specified update were to applied to
464 * the state machine.
465 *
466 * @param update update
467 * @return status
468 */
469 private MapEntryUpdateResult.Status validate(UpdateAndGet update) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800470 MapEntryValue existingValue = mapEntries.get(update.key());
471 if (existingValue == null && update.value() == null) {
472 return MapEntryUpdateResult.Status.NOOP;
473 }
474 if (preparedKeys.contains(update.key())) {
475 return MapEntryUpdateResult.Status.WRITE_LOCK;
476 }
477 byte[] existingRawValue = existingValue == null ? null : existingValue
478 .value();
479 Long existingVersion = existingValue == null ? null : existingValue
480 .version();
481 return update.valueMatch().matches(existingRawValue)
482 && update.versionMatch().matches(existingVersion) ? MapEntryUpdateResult.Status.OK
483 : MapEntryUpdateResult.Status.PRECONDITION_FAILED;
484 }
485
Madan Jampanifc981772016-02-16 09:46:42 -0800486 /**
487 * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
488 * @param value map entry value
489 * @return versioned instance
490 */
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800491 private Versioned<byte[]> toVersioned(MapEntryValue value) {
Madan Jampanifc981772016-02-16 09:46:42 -0800492 return value == null ? null : new Versioned<>(value.value(), value.version());
493 }
494
495 /**
496 * Publishes events to listeners.
497 *
498 * @param events list of map event to publish
499 */
500 private void publish(List<MapEvent<String, byte[]>> events) {
501 listeners.values().forEach(commit -> commit.session().publish(AtomixConsistentMap.CHANGE_SUBJECT, events));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800502 }
503
504 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800505 public void register(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800506 }
507
508 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800509 public void unregister(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800510 closeListener(session.id());
511 }
512
513 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800514 public void expire(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800515 closeListener(session.id());
516 }
517
518 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800519 public void close(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800520 closeListener(session.id());
521 }
522
523 private void closeListener(Long sessionId) {
Madan Jampanifc981772016-02-16 09:46:42 -0800524 Commit<? extends Listen> commit = listeners.remove(sessionId);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800525 if (commit != null) {
526 commit.close();
527 }
528 }
529
530 /**
531 * Interface implemented by map values.
532 */
533 private interface MapEntryValue {
534 /**
535 * Returns the raw {@code byte[]}.
536 *
537 * @return raw value
538 */
539 byte[] value();
540
541 /**
542 * Returns the version of the value.
543 *
544 * @return version
545 */
546 long version();
547
548 /**
549 * Discards the value by invoke appropriate clean up actions.
550 */
551 void discard();
552 }
553
554 /**
555 * A {@code MapEntryValue} that is derived from a non-transactional update
556 * i.e. via any standard map update operation.
557 */
558 private class NonTransactionalCommit implements MapEntryValue {
559 private final long version;
Madan Jampanifc981772016-02-16 09:46:42 -0800560 private final Commit<? extends UpdateAndGet> commit;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800561
Madan Jampanifc981772016-02-16 09:46:42 -0800562 public NonTransactionalCommit(long version, Commit<? extends UpdateAndGet> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800563 this.version = version;
564 this.commit = commit;
565 }
566
567 @Override
568 public byte[] value() {
569 return commit.operation().value();
570 }
571
572 @Override
573 public long version() {
574 return version;
575 }
576
577 @Override
578 public void discard() {
579 commit.close();
580 }
581 }
582
583 /**
584 * A {@code MapEntryValue} that is derived from updates submitted via a
585 * transaction.
586 */
587 private class TransactionalCommit implements MapEntryValue {
588 private final String key;
589 private final long version;
Madan Jampanifc981772016-02-16 09:46:42 -0800590 private final CountDownCompleter<Commit<? extends TransactionPrepare>> completer;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800591
592 public TransactionalCommit(
593 String key,
594 long version,
Madan Jampanifc981772016-02-16 09:46:42 -0800595 CountDownCompleter<Commit<? extends TransactionPrepare>> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800596 this.key = key;
597 this.version = version;
598 this.completer = commit;
599 }
600
601 @Override
602 public byte[] value() {
Madan Jampani74da78b2016-02-09 21:18:36 -0800603 MapTransaction<String, byte[]> transaction = completer.object().operation().transaction();
Madan Jampanicadd70b2016-02-08 13:45:43 -0800604 return valueForKey(key, transaction);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800605 }
606
607 @Override
608 public long version() {
609 return version;
610 }
611
612 @Override
613 public void discard() {
614 completer.countDown();
615 }
Madan Jampanicadd70b2016-02-08 13:45:43 -0800616
Madan Jampani74da78b2016-02-09 21:18:36 -0800617 private byte[] valueForKey(String key, MapTransaction<String, byte[]> transaction) {
Madan Jampanicadd70b2016-02-08 13:45:43 -0800618 MapUpdate<String, byte[]> update = transaction.updates()
619 .stream()
620 .filter(u -> u.key().equals(key))
621 .findFirst()
622 .orElse(null);
623 return update == null ? null : update.value();
624 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800625 }
626}