/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.primitives.resources.impl;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.Snapshottable;
import io.atomix.copycat.server.StateMachineExecutor;
import io.atomix.copycat.server.session.ServerSession;
import io.atomix.copycat.server.session.SessionListener;
import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
import io.atomix.resource.ResourceStateMachine;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import org.onlab.util.CountDownCompleter;
import org.onlab.util.Match;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsValue;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.EntrySet;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Get;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.IsEmpty;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.KeySet;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Listen;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Values;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;

import static com.google.common.base.Preconditions.checkState;
import static org.onosproject.store.service.MapEvent.Type.INSERT;
import static org.onosproject.store.service.MapEvent.Type.REMOVE;
import static org.onosproject.store.service.MapEvent.Type.UPDATE;
68
69/**
70 * State Machine for {@link AtomixConsistentMap} resource.
71 */
Madan Jampanifc981772016-02-16 09:46:42 -080072public class AtomixConsistentMapState extends ResourceStateMachine implements SessionListener, Snapshottable {
Madan Jampani86cb2432016-02-17 11:07:56 -080073
Madan Jampani5e5b3d62016-02-01 16:03:33 -080074 private final Map<Long, Commit<? extends AtomixConsistentMapCommands.Listen>> listeners = new HashMap<>();
75 private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
76 private final Set<String> preparedKeys = Sets.newHashSet();
Madan Jampanifc981772016-02-16 09:46:42 -080077 private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps.newHashMap();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080078 private AtomicLong versionCounter = new AtomicLong(0);
79
Madan Jampani65f24bb2016-03-15 15:16:18 -070080 public AtomixConsistentMapState(Properties properties) {
81 super(properties);
Madan Jampani86cb2432016-02-17 11:07:56 -080082 }
83
Madan Jampani5e5b3d62016-02-01 16:03:33 -080084 @Override
85 public void snapshot(SnapshotWriter writer) {
86 writer.writeLong(versionCounter.get());
87 }
88
89 @Override
90 public void install(SnapshotReader reader) {
91 versionCounter = new AtomicLong(reader.readLong());
92 }
93
94 @Override
95 protected void configure(StateMachineExecutor executor) {
96 // Listeners
Madan Jampanifc981772016-02-16 09:46:42 -080097 executor.register(Listen.class, this::listen);
98 executor.register(Unlisten.class, this::unlisten);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080099 // Queries
Madan Jampanifc981772016-02-16 09:46:42 -0800100 executor.register(ContainsKey.class, this::containsKey);
101 executor.register(ContainsValue.class, this::containsValue);
102 executor.register(EntrySet.class, this::entrySet);
103 executor.register(Get.class, this::get);
104 executor.register(IsEmpty.class, this::isEmpty);
105 executor.register(KeySet.class, this::keySet);
106 executor.register(Size.class, this::size);
107 executor.register(Values.class, this::values);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800108 // Commands
Madan Jampanifc981772016-02-16 09:46:42 -0800109 executor.register(UpdateAndGet.class, this::updateAndGet);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800110 executor.register(AtomixConsistentMapCommands.Clear.class, this::clear);
Madan Jampanifc981772016-02-16 09:46:42 -0800111 executor.register(TransactionPrepare.class, this::prepare);
112 executor.register(TransactionCommit.class, this::commit);
113 executor.register(TransactionRollback.class, this::rollback);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800114 }
115
116 @Override
117 public void delete() {
118 // Delete Listeners
119 listeners.values().forEach(Commit::close);
120 listeners.clear();
121
122 // Delete Map entries
123 mapEntries.values().forEach(MapEntryValue::discard);
124 mapEntries.clear();
125 }
126
127 /**
128 * Handles a contains key commit.
129 *
Madan Jampanifc981772016-02-16 09:46:42 -0800130 * @param commit containsKey commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800131 * @return {@code true} if map contains key
132 */
Madan Jampanifc981772016-02-16 09:46:42 -0800133 protected boolean containsKey(Commit<? extends ContainsKey> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800134 try {
135 return toVersioned(mapEntries.get(commit.operation().key())) != null;
136 } finally {
137 commit.close();
138 }
139 }
140
141 /**
142 * Handles a contains value commit.
143 *
Madan Jampanifc981772016-02-16 09:46:42 -0800144 * @param commit containsValue commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800145 * @return {@code true} if map contains value
146 */
Madan Jampanifc981772016-02-16 09:46:42 -0800147 protected boolean containsValue(Commit<? extends ContainsValue> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800148 try {
149 Match<byte[]> valueMatch = Match
150 .ifValue(commit.operation().value());
151 return mapEntries.values().stream()
152 .anyMatch(value -> valueMatch.matches(value.value()));
153 } finally {
154 commit.close();
155 }
156 }
157
158 /**
159 * Handles a get commit.
160 *
161 * @param commit
162 * get commit
163 * @return value mapped to key
164 */
Madan Jampanifc981772016-02-16 09:46:42 -0800165 protected Versioned<byte[]> get(Commit<? extends Get> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800166 try {
167 return toVersioned(mapEntries.get(commit.operation().key()));
168 } finally {
169 commit.close();
170 }
171 }
172
173 /**
174 * Handles a count commit.
175 *
Madan Jampanifc981772016-02-16 09:46:42 -0800176 * @param commit size commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800177 * @return number of entries in map
178 */
Madan Jampanifc981772016-02-16 09:46:42 -0800179 protected int size(Commit<? extends Size> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800180 try {
181 return mapEntries.size();
182 } finally {
183 commit.close();
184 }
185 }
186
187 /**
188 * Handles an is empty commit.
189 *
Madan Jampanifc981772016-02-16 09:46:42 -0800190 * @param commit isEmpty commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800191 * @return {@code true} if map is empty
192 */
Madan Jampanifc981772016-02-16 09:46:42 -0800193 protected boolean isEmpty(Commit<? extends IsEmpty> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800194 try {
195 return mapEntries.isEmpty();
196 } finally {
197 commit.close();
198 }
199 }
200
201 /**
202 * Handles a keySet commit.
203 *
Madan Jampanifc981772016-02-16 09:46:42 -0800204 * @param commit keySet commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800205 * @return set of keys in map
206 */
Madan Jampanifc981772016-02-16 09:46:42 -0800207 protected Set<String> keySet(Commit<? extends KeySet> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800208 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800209 return mapEntries.keySet().stream().collect(Collectors.toSet());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800210 } finally {
211 commit.close();
212 }
213 }
214
215 /**
216 * Handles a values commit.
217 *
Madan Jampanifc981772016-02-16 09:46:42 -0800218 * @param commit values commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800219 * @return collection of values in map
220 */
Madan Jampanifc981772016-02-16 09:46:42 -0800221 protected Collection<Versioned<byte[]>> values(Commit<? extends Values> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800222 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800223 return mapEntries.values().stream().map(this::toVersioned).collect(Collectors.toList());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800224 } finally {
225 commit.close();
226 }
227 }
228
229 /**
230 * Handles a entry set commit.
231 *
232 * @param commit
233 * entrySet commit
234 * @return set of map entries
235 */
Madan Jampanifc981772016-02-16 09:46:42 -0800236 protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(Commit<? extends EntrySet> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800237 try {
238 return mapEntries
239 .entrySet()
240 .stream()
241 .map(e -> Maps.immutableEntry(e.getKey(),
242 toVersioned(e.getValue())))
243 .collect(Collectors.toSet());
244 } finally {
245 commit.close();
246 }
247 }
248
249 /**
250 * Handles a update and get commit.
251 *
Madan Jampanifc981772016-02-16 09:46:42 -0800252 * @param commit updateAndGet commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800253 * @return update result
254 */
Madan Jampanifc981772016-02-16 09:46:42 -0800255 protected MapEntryUpdateResult<String, byte[]> updateAndGet(Commit<? extends UpdateAndGet> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800256 MapEntryUpdateResult.Status updateStatus = validate(commit.operation());
257 String key = commit.operation().key();
258 MapEntryValue oldCommitValue = mapEntries.get(commit.operation().key());
259 Versioned<byte[]> oldMapValue = toVersioned(oldCommitValue);
260
261 if (updateStatus != MapEntryUpdateResult.Status.OK) {
262 commit.close();
263 return new MapEntryUpdateResult<>(updateStatus, "", key,
264 oldMapValue, oldMapValue);
265 }
266
267 byte[] newValue = commit.operation().value();
268 long newVersion = versionCounter.incrementAndGet();
269 Versioned<byte[]> newMapValue = newValue == null ? null
270 : new Versioned<>(newValue, newVersion);
271
272 MapEvent.Type updateType = newValue == null ? REMOVE
273 : oldCommitValue == null ? INSERT : UPDATE;
274 if (updateType == REMOVE || updateType == UPDATE) {
275 mapEntries.remove(key);
276 oldCommitValue.discard();
277 }
278 if (updateType == INSERT || updateType == UPDATE) {
279 mapEntries.put(key, new NonTransactionalCommit(newVersion, commit));
Madan Jampanifc981772016-02-16 09:46:42 -0800280 } else {
281 commit.close();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800282 }
Madan Jampanifc981772016-02-16 09:46:42 -0800283 publish(Lists.newArrayList(new MapEvent<>("", key, newMapValue, oldMapValue)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800284 return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue,
285 newMapValue);
286 }
287
288 /**
289 * Handles a clear commit.
290 *
Madan Jampanifc981772016-02-16 09:46:42 -0800291 * @param commit clear commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800292 * @return clear result
293 */
Madan Jampanifc981772016-02-16 09:46:42 -0800294 protected MapEntryUpdateResult.Status clear(Commit<? extends Clear> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800295 try {
296 Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries
297 .entrySet().iterator();
298 while (iterator.hasNext()) {
299 Map.Entry<String, MapEntryValue> entry = iterator.next();
300 String key = entry.getKey();
301 MapEntryValue value = entry.getValue();
302 Versioned<byte[]> removedValue = new Versioned<>(value.value(),
303 value.version());
Madan Jampanifc981772016-02-16 09:46:42 -0800304 publish(Lists.newArrayList(new MapEvent<>("", key, null, removedValue)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800305 value.discard();
306 iterator.remove();
307 }
308 return MapEntryUpdateResult.Status.OK;
309 } finally {
310 commit.close();
311 }
312 }
313
314 /**
315 * Handles a listen commit.
316 *
Madan Jampanifc981772016-02-16 09:46:42 -0800317 * @param commit listen commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800318 */
Madan Jampanifc981772016-02-16 09:46:42 -0800319 protected void listen(Commit<? extends Listen> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800320 Long sessionId = commit.session().id();
Madan Jampanie6038872016-03-03 12:14:37 -0800321 if (listeners.putIfAbsent(sessionId, commit) != null) {
322 commit.close();
323 return;
324 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800325 commit.session()
326 .onStateChange(
327 state -> {
Madan Jampani3a9911c2016-02-21 11:25:45 -0800328 if (state == ServerSession.State.CLOSED
329 || state == ServerSession.State.EXPIRED) {
Madan Jampanifc981772016-02-16 09:46:42 -0800330 Commit<? extends Listen> listener = listeners.remove(sessionId);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800331 if (listener != null) {
332 listener.close();
333 }
334 }
335 });
336 }
337
338 /**
339 * Handles an unlisten commit.
340 *
Madan Jampanifc981772016-02-16 09:46:42 -0800341 * @param commit unlisten commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800342 */
Madan Jampani40f022e2016-03-02 21:35:14 -0800343 protected void unlisten(Commit<? extends Unlisten> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800344 try {
Madan Jampani40f022e2016-03-02 21:35:14 -0800345 Commit<? extends Listen> listener = listeners.remove(commit.session().id());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800346 if (listener != null) {
347 listener.close();
348 }
349 } finally {
350 commit.close();
351 }
352 }
353
354 /**
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800355 * Handles an prepare commit.
356 *
Madan Jampanifc981772016-02-16 09:46:42 -0800357 * @param commit transaction prepare commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800358 * @return prepare result
359 */
Madan Jampanifc981772016-02-16 09:46:42 -0800360 protected PrepareResult prepare(Commit<? extends TransactionPrepare> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800361 boolean ok = false;
362 try {
Madan Jampani74da78b2016-02-09 21:18:36 -0800363 MapTransaction<String, byte[]> transaction = commit.operation().transaction();
Madan Jampanicadd70b2016-02-08 13:45:43 -0800364 for (MapUpdate<String, byte[]> update : transaction.updates()) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800365 String key = update.key();
366 if (preparedKeys.contains(key)) {
367 return PrepareResult.CONCURRENT_TRANSACTION;
368 }
369 MapEntryValue existingValue = mapEntries.get(key);
370 if (existingValue == null) {
371 if (update.currentValue() != null) {
372 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
373 }
374 } else {
375 if (existingValue.version() != update.currentVersion()) {
376 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
377 }
378 }
379 }
380 // No violations detected. Add to pendingTranctions and mark
Madan Jampanifc981772016-02-16 09:46:42 -0800381 // modified keys as locked for updates.
Madan Jampani74da78b2016-02-09 21:18:36 -0800382 pendingTransactions.put(transaction.transactionId(), commit);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800383 transaction.updates().forEach(u -> preparedKeys.add(u.key()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800384 ok = true;
385 return PrepareResult.OK;
386 } finally {
387 if (!ok) {
388 commit.close();
389 }
390 }
391 }
392
393 /**
394 * Handles an commit commit (ha!).
395 *
396 * @param commit transaction commit commit
397 * @return commit result
398 */
Madan Jampanifc981772016-02-16 09:46:42 -0800399 protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800400 TransactionId transactionId = commit.operation().transactionId();
401 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800402 Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800403 .remove(transactionId);
404 if (prepareCommit == null) {
405 return CommitResult.UNKNOWN_TRANSACTION_ID;
406 }
Madan Jampani74da78b2016-02-09 21:18:36 -0800407 MapTransaction<String, byte[]> transaction = prepareCommit.operation().transaction();
Madan Jampanicadd70b2016-02-08 13:45:43 -0800408 long totalReferencesToCommit = transaction
409 .updates()
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800410 .stream()
411 .filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
412 .count();
Madan Jampanifc981772016-02-16 09:46:42 -0800413 CountDownCompleter<Commit<? extends TransactionPrepare>> completer =
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800414 new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
Madan Jampanifc981772016-02-16 09:46:42 -0800415 List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
Madan Jampanicadd70b2016-02-08 13:45:43 -0800416 for (MapUpdate<String, byte[]> update : transaction.updates()) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800417 String key = update.key();
418 MapEntryValue previousValue = mapEntries.remove(key);
419 MapEntryValue newValue = null;
420 checkState(preparedKeys.remove(key), "key is not prepared");
421 if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
422 newValue = new TransactionalCommit(key,
423 versionCounter.incrementAndGet(), completer);
424 }
Madan Jampanifc981772016-02-16 09:46:42 -0800425 eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
426 if (newValue != null) {
427 mapEntries.put(key, newValue);
428 }
429 if (previousValue != null) {
430 previousValue.discard();
431 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800432 }
Madan Jampanifc981772016-02-16 09:46:42 -0800433 publish(eventsToPublish);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800434 return CommitResult.OK;
435 } finally {
436 commit.close();
437 }
438 }
439
440 /**
441 * Handles an rollback commit (ha!).
442 *
443 * @param commit transaction rollback commit
444 * @return rollback result
445 */
Madan Jampanifc981772016-02-16 09:46:42 -0800446 protected RollbackResult rollback(Commit<? extends TransactionRollback> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800447 TransactionId transactionId = commit.operation().transactionId();
448 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800449 Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions.remove(transactionId);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800450 if (prepareCommit == null) {
451 return RollbackResult.UNKNOWN_TRANSACTION_ID;
452 } else {
Madan Jampanicadd70b2016-02-08 13:45:43 -0800453 prepareCommit.operation()
454 .transaction()
455 .updates()
456 .forEach(u -> preparedKeys.remove(u.key()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800457 prepareCommit.close();
458 return RollbackResult.OK;
459 }
460 } finally {
461 commit.close();
462 }
463 }
464
Madan Jampanifc981772016-02-16 09:46:42 -0800465 /**
466 * Computes the update status that would result if the specified update were to applied to
467 * the state machine.
468 *
469 * @param update update
470 * @return status
471 */
472 private MapEntryUpdateResult.Status validate(UpdateAndGet update) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800473 MapEntryValue existingValue = mapEntries.get(update.key());
474 if (existingValue == null && update.value() == null) {
475 return MapEntryUpdateResult.Status.NOOP;
476 }
477 if (preparedKeys.contains(update.key())) {
478 return MapEntryUpdateResult.Status.WRITE_LOCK;
479 }
480 byte[] existingRawValue = existingValue == null ? null : existingValue
481 .value();
482 Long existingVersion = existingValue == null ? null : existingValue
483 .version();
484 return update.valueMatch().matches(existingRawValue)
485 && update.versionMatch().matches(existingVersion) ? MapEntryUpdateResult.Status.OK
486 : MapEntryUpdateResult.Status.PRECONDITION_FAILED;
487 }
488
Madan Jampanifc981772016-02-16 09:46:42 -0800489 /**
490 * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
491 * @param value map entry value
492 * @return versioned instance
493 */
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800494 private Versioned<byte[]> toVersioned(MapEntryValue value) {
Madan Jampanifc981772016-02-16 09:46:42 -0800495 return value == null ? null : new Versioned<>(value.value(), value.version());
496 }
497
498 /**
499 * Publishes events to listeners.
500 *
501 * @param events list of map event to publish
502 */
503 private void publish(List<MapEvent<String, byte[]>> events) {
504 listeners.values().forEach(commit -> commit.session().publish(AtomixConsistentMap.CHANGE_SUBJECT, events));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800505 }
506
507 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800508 public void register(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800509 }
510
511 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800512 public void unregister(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800513 closeListener(session.id());
514 }
515
516 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800517 public void expire(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800518 closeListener(session.id());
519 }
520
521 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800522 public void close(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800523 closeListener(session.id());
524 }
525
526 private void closeListener(Long sessionId) {
Madan Jampanifc981772016-02-16 09:46:42 -0800527 Commit<? extends Listen> commit = listeners.remove(sessionId);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800528 if (commit != null) {
529 commit.close();
530 }
531 }
532
533 /**
534 * Interface implemented by map values.
535 */
536 private interface MapEntryValue {
537 /**
538 * Returns the raw {@code byte[]}.
539 *
540 * @return raw value
541 */
542 byte[] value();
543
544 /**
545 * Returns the version of the value.
546 *
547 * @return version
548 */
549 long version();
550
551 /**
552 * Discards the value by invoke appropriate clean up actions.
553 */
554 void discard();
555 }
556
557 /**
558 * A {@code MapEntryValue} that is derived from a non-transactional update
559 * i.e. via any standard map update operation.
560 */
561 private class NonTransactionalCommit implements MapEntryValue {
562 private final long version;
Madan Jampanifc981772016-02-16 09:46:42 -0800563 private final Commit<? extends UpdateAndGet> commit;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800564
Madan Jampanifc981772016-02-16 09:46:42 -0800565 public NonTransactionalCommit(long version, Commit<? extends UpdateAndGet> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800566 this.version = version;
567 this.commit = commit;
568 }
569
570 @Override
571 public byte[] value() {
572 return commit.operation().value();
573 }
574
575 @Override
576 public long version() {
577 return version;
578 }
579
580 @Override
581 public void discard() {
582 commit.close();
583 }
584 }
585
586 /**
587 * A {@code MapEntryValue} that is derived from updates submitted via a
588 * transaction.
589 */
590 private class TransactionalCommit implements MapEntryValue {
591 private final String key;
592 private final long version;
Madan Jampanifc981772016-02-16 09:46:42 -0800593 private final CountDownCompleter<Commit<? extends TransactionPrepare>> completer;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800594
595 public TransactionalCommit(
596 String key,
597 long version,
Madan Jampanifc981772016-02-16 09:46:42 -0800598 CountDownCompleter<Commit<? extends TransactionPrepare>> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800599 this.key = key;
600 this.version = version;
601 this.completer = commit;
602 }
603
604 @Override
605 public byte[] value() {
Madan Jampani74da78b2016-02-09 21:18:36 -0800606 MapTransaction<String, byte[]> transaction = completer.object().operation().transaction();
Madan Jampanicadd70b2016-02-08 13:45:43 -0800607 return valueForKey(key, transaction);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800608 }
609
610 @Override
611 public long version() {
612 return version;
613 }
614
615 @Override
616 public void discard() {
617 completer.countDown();
618 }
Madan Jampanicadd70b2016-02-08 13:45:43 -0800619
Madan Jampani74da78b2016-02-09 21:18:36 -0800620 private byte[] valueForKey(String key, MapTransaction<String, byte[]> transaction) {
Madan Jampanicadd70b2016-02-08 13:45:43 -0800621 MapUpdate<String, byte[]> update = transaction.updates()
622 .stream()
623 .filter(u -> u.key().equals(key))
624 .findFirst()
625 .orElse(null);
626 return update == null ? null : update.value();
627 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800628 }
629}