blob: 4ed3a720d592a387d9ed3d841713092b491ab118 [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Madan Jampani5e5b3d62016-02-01 16:03:33 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
18import static org.onosproject.store.service.MapEvent.Type.INSERT;
19import static org.onosproject.store.service.MapEvent.Type.REMOVE;
20import static org.onosproject.store.service.MapEvent.Type.UPDATE;
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -070021import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani3a9911c2016-02-21 11:25:45 -080022import io.atomix.copycat.server.session.ServerSession;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080023import io.atomix.copycat.server.Commit;
24import io.atomix.copycat.server.Snapshottable;
25import io.atomix.copycat.server.StateMachineExecutor;
26import io.atomix.copycat.server.session.SessionListener;
27import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
28import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
29import io.atomix.resource.ResourceStateMachine;
30
31import java.util.Collection;
32import java.util.HashMap;
33import java.util.Iterator;
Madan Jampanifc981772016-02-16 09:46:42 -080034import java.util.List;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080035import java.util.Map;
Madan Jampani65f24bb2016-03-15 15:16:18 -070036import java.util.Properties;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080037import java.util.Set;
38import java.util.concurrent.atomic.AtomicLong;
39import java.util.stream.Collectors;
40
41import org.onlab.util.CountDownCompleter;
42import org.onlab.util.Match;
Madan Jampani74da78b2016-02-09 21:18:36 -080043import org.onosproject.store.primitives.MapUpdate;
Madan Jampanicadd70b2016-02-08 13:45:43 -080044import org.onosproject.store.primitives.TransactionId;
Madan Jampanifc981772016-02-16 09:46:42 -080045import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
46import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
47import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsValue;
48import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.EntrySet;
49import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Get;
50import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.IsEmpty;
51import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.KeySet;
52import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Listen;
53import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
54import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080055import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
Madan Jampani542d9e22016-04-05 15:39:55 -070056import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepareAndCommit;
Madan Jampanifc981772016-02-16 09:46:42 -080057import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
58import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
59import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
60import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Values;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080061import org.onosproject.store.service.MapEvent;
Madan Jampani74da78b2016-02-09 21:18:36 -080062import org.onosproject.store.service.MapTransaction;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080063import org.onosproject.store.service.Versioned;
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -070064import org.slf4j.Logger;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080065
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -070066import com.google.common.base.Throwables;
Madan Jampanifc981772016-02-16 09:46:42 -080067import com.google.common.collect.Lists;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080068import com.google.common.collect.Maps;
69import com.google.common.collect.Sets;
70
71import static com.google.common.base.Preconditions.checkState;
72
73/**
74 * State Machine for {@link AtomixConsistentMap} resource.
75 */
Madan Jampanifc981772016-02-16 09:46:42 -080076public class AtomixConsistentMapState extends ResourceStateMachine implements SessionListener, Snapshottable {
Madan Jampani86cb2432016-02-17 11:07:56 -080077
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -070078 private final Logger log = getLogger(getClass());
Madan Jampani5e5b3d62016-02-01 16:03:33 -080079 private final Map<Long, Commit<? extends AtomixConsistentMapCommands.Listen>> listeners = new HashMap<>();
80 private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
81 private final Set<String> preparedKeys = Sets.newHashSet();
Madan Jampanifc981772016-02-16 09:46:42 -080082 private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps.newHashMap();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080083 private AtomicLong versionCounter = new AtomicLong(0);
84
/**
 * Creates the state machine, handing the given properties to the
 * {@code ResourceStateMachine} constructor.
 *
 * @param properties resource properties
 */
public AtomixConsistentMapState(Properties properties) {
    super(properties);
}
88
Madan Jampani5e5b3d62016-02-01 16:03:33 -080089 @Override
90 public void snapshot(SnapshotWriter writer) {
91 writer.writeLong(versionCounter.get());
92 }
93
94 @Override
95 public void install(SnapshotReader reader) {
96 versionCounter = new AtomicLong(reader.readLong());
97 }
98
99 @Override
100 protected void configure(StateMachineExecutor executor) {
101 // Listeners
Madan Jampanifc981772016-02-16 09:46:42 -0800102 executor.register(Listen.class, this::listen);
103 executor.register(Unlisten.class, this::unlisten);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800104 // Queries
Madan Jampanifc981772016-02-16 09:46:42 -0800105 executor.register(ContainsKey.class, this::containsKey);
106 executor.register(ContainsValue.class, this::containsValue);
107 executor.register(EntrySet.class, this::entrySet);
108 executor.register(Get.class, this::get);
109 executor.register(IsEmpty.class, this::isEmpty);
110 executor.register(KeySet.class, this::keySet);
111 executor.register(Size.class, this::size);
112 executor.register(Values.class, this::values);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800113 // Commands
Madan Jampanifc981772016-02-16 09:46:42 -0800114 executor.register(UpdateAndGet.class, this::updateAndGet);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800115 executor.register(AtomixConsistentMapCommands.Clear.class, this::clear);
Madan Jampanifc981772016-02-16 09:46:42 -0800116 executor.register(TransactionPrepare.class, this::prepare);
117 executor.register(TransactionCommit.class, this::commit);
118 executor.register(TransactionRollback.class, this::rollback);
Madan Jampani542d9e22016-04-05 15:39:55 -0700119 executor.register(TransactionPrepareAndCommit.class, this::prepareAndCommit);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800120 }
121
122 @Override
123 public void delete() {
124 // Delete Listeners
125 listeners.values().forEach(Commit::close);
126 listeners.clear();
127
128 // Delete Map entries
129 mapEntries.values().forEach(MapEntryValue::discard);
130 mapEntries.clear();
131 }
132
133 /**
134 * Handles a contains key commit.
135 *
Madan Jampanifc981772016-02-16 09:46:42 -0800136 * @param commit containsKey commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800137 * @return {@code true} if map contains key
138 */
Madan Jampanifc981772016-02-16 09:46:42 -0800139 protected boolean containsKey(Commit<? extends ContainsKey> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800140 try {
141 return toVersioned(mapEntries.get(commit.operation().key())) != null;
142 } finally {
143 commit.close();
144 }
145 }
146
147 /**
148 * Handles a contains value commit.
149 *
Madan Jampanifc981772016-02-16 09:46:42 -0800150 * @param commit containsValue commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800151 * @return {@code true} if map contains value
152 */
Madan Jampanifc981772016-02-16 09:46:42 -0800153 protected boolean containsValue(Commit<? extends ContainsValue> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800154 try {
155 Match<byte[]> valueMatch = Match
156 .ifValue(commit.operation().value());
157 return mapEntries.values().stream()
158 .anyMatch(value -> valueMatch.matches(value.value()));
159 } finally {
160 commit.close();
161 }
162 }
163
164 /**
165 * Handles a get commit.
166 *
167 * @param commit
168 * get commit
169 * @return value mapped to key
170 */
Madan Jampanifc981772016-02-16 09:46:42 -0800171 protected Versioned<byte[]> get(Commit<? extends Get> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800172 try {
173 return toVersioned(mapEntries.get(commit.operation().key()));
174 } finally {
175 commit.close();
176 }
177 }
178
179 /**
180 * Handles a count commit.
181 *
Madan Jampanifc981772016-02-16 09:46:42 -0800182 * @param commit size commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800183 * @return number of entries in map
184 */
Madan Jampanifc981772016-02-16 09:46:42 -0800185 protected int size(Commit<? extends Size> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800186 try {
187 return mapEntries.size();
188 } finally {
189 commit.close();
190 }
191 }
192
193 /**
194 * Handles an is empty commit.
195 *
Madan Jampanifc981772016-02-16 09:46:42 -0800196 * @param commit isEmpty commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800197 * @return {@code true} if map is empty
198 */
Madan Jampanifc981772016-02-16 09:46:42 -0800199 protected boolean isEmpty(Commit<? extends IsEmpty> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800200 try {
201 return mapEntries.isEmpty();
202 } finally {
203 commit.close();
204 }
205 }
206
207 /**
208 * Handles a keySet commit.
209 *
Madan Jampanifc981772016-02-16 09:46:42 -0800210 * @param commit keySet commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800211 * @return set of keys in map
212 */
Madan Jampanifc981772016-02-16 09:46:42 -0800213 protected Set<String> keySet(Commit<? extends KeySet> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800214 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800215 return mapEntries.keySet().stream().collect(Collectors.toSet());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800216 } finally {
217 commit.close();
218 }
219 }
220
221 /**
222 * Handles a values commit.
223 *
Madan Jampanifc981772016-02-16 09:46:42 -0800224 * @param commit values commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800225 * @return collection of values in map
226 */
Madan Jampanifc981772016-02-16 09:46:42 -0800227 protected Collection<Versioned<byte[]>> values(Commit<? extends Values> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800228 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800229 return mapEntries.values().stream().map(this::toVersioned).collect(Collectors.toList());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800230 } finally {
231 commit.close();
232 }
233 }
234
235 /**
236 * Handles a entry set commit.
237 *
238 * @param commit
239 * entrySet commit
240 * @return set of map entries
241 */
Madan Jampanifc981772016-02-16 09:46:42 -0800242 protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(Commit<? extends EntrySet> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800243 try {
244 return mapEntries
245 .entrySet()
246 .stream()
247 .map(e -> Maps.immutableEntry(e.getKey(),
248 toVersioned(e.getValue())))
249 .collect(Collectors.toSet());
250 } finally {
251 commit.close();
252 }
253 }
254
255 /**
256 * Handles a update and get commit.
257 *
Madan Jampanifc981772016-02-16 09:46:42 -0800258 * @param commit updateAndGet commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800259 * @return update result
260 */
Madan Jampanifc981772016-02-16 09:46:42 -0800261 protected MapEntryUpdateResult<String, byte[]> updateAndGet(Commit<? extends UpdateAndGet> commit) {
Madan Jampaniea98f412016-06-22 09:05:40 -0700262 try {
263 MapEntryUpdateResult.Status updateStatus = validate(commit.operation());
264 String key = commit.operation().key();
265 MapEntryValue oldCommitValue = mapEntries.get(commit.operation().key());
266 Versioned<byte[]> oldMapValue = toVersioned(oldCommitValue);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800267
Madan Jampaniea98f412016-06-22 09:05:40 -0700268 if (updateStatus != MapEntryUpdateResult.Status.OK) {
269 commit.close();
270 return new MapEntryUpdateResult<>(updateStatus, "", key,
271 oldMapValue, oldMapValue);
272 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800273
Madan Jampaniea98f412016-06-22 09:05:40 -0700274 byte[] newValue = commit.operation().value();
275 long newVersion = versionCounter.incrementAndGet();
276 Versioned<byte[]> newMapValue = newValue == null ? null
277 : new Versioned<>(newValue, newVersion);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800278
Madan Jampaniea98f412016-06-22 09:05:40 -0700279 MapEvent.Type updateType = newValue == null ? REMOVE
280 : oldCommitValue == null ? INSERT : UPDATE;
281 if (updateType == REMOVE || updateType == UPDATE) {
282 mapEntries.remove(key);
283 oldCommitValue.discard();
284 }
285 if (updateType == INSERT || updateType == UPDATE) {
286 mapEntries.put(key, new NonTransactionalCommit(newVersion, commit));
287 } else {
288 commit.close();
289 }
290 publish(Lists.newArrayList(new MapEvent<>("", key, newMapValue, oldMapValue)));
291 return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue,
292 newMapValue);
293 } catch (Exception e) {
294 log.error("State machine operation failed", e);
295 throw Throwables.propagate(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800296 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800297 }
298
299 /**
300 * Handles a clear commit.
301 *
Madan Jampanifc981772016-02-16 09:46:42 -0800302 * @param commit clear commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800303 * @return clear result
304 */
Madan Jampanifc981772016-02-16 09:46:42 -0800305 protected MapEntryUpdateResult.Status clear(Commit<? extends Clear> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800306 try {
307 Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries
308 .entrySet().iterator();
309 while (iterator.hasNext()) {
310 Map.Entry<String, MapEntryValue> entry = iterator.next();
311 String key = entry.getKey();
312 MapEntryValue value = entry.getValue();
313 Versioned<byte[]> removedValue = new Versioned<>(value.value(),
314 value.version());
Madan Jampanifc981772016-02-16 09:46:42 -0800315 publish(Lists.newArrayList(new MapEvent<>("", key, null, removedValue)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800316 value.discard();
317 iterator.remove();
318 }
319 return MapEntryUpdateResult.Status.OK;
320 } finally {
321 commit.close();
322 }
323 }
324
325 /**
326 * Handles a listen commit.
327 *
Madan Jampanifc981772016-02-16 09:46:42 -0800328 * @param commit listen commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800329 */
Madan Jampanifc981772016-02-16 09:46:42 -0800330 protected void listen(Commit<? extends Listen> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800331 Long sessionId = commit.session().id();
Madan Jampanie6038872016-03-03 12:14:37 -0800332 if (listeners.putIfAbsent(sessionId, commit) != null) {
333 commit.close();
334 return;
335 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800336 commit.session()
337 .onStateChange(
338 state -> {
Madan Jampani3a9911c2016-02-21 11:25:45 -0800339 if (state == ServerSession.State.CLOSED
340 || state == ServerSession.State.EXPIRED) {
Madan Jampanifc981772016-02-16 09:46:42 -0800341 Commit<? extends Listen> listener = listeners.remove(sessionId);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800342 if (listener != null) {
343 listener.close();
344 }
345 }
346 });
347 }
348
349 /**
350 * Handles an unlisten commit.
351 *
Madan Jampanifc981772016-02-16 09:46:42 -0800352 * @param commit unlisten commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800353 */
Madan Jampani40f022e2016-03-02 21:35:14 -0800354 protected void unlisten(Commit<? extends Unlisten> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800355 try {
Madan Jampani40f022e2016-03-02 21:35:14 -0800356 Commit<? extends Listen> listener = listeners.remove(commit.session().id());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800357 if (listener != null) {
358 listener.close();
359 }
360 } finally {
361 commit.close();
362 }
363 }
364
365 /**
Madan Jampani542d9e22016-04-05 15:39:55 -0700366 * Handles an prepare and commit commit.
367 *
368 * @param commit transaction prepare and commit commit
369 * @return prepare result
370 */
371 protected PrepareResult prepareAndCommit(Commit<? extends TransactionPrepareAndCommit> commit) {
372 PrepareResult prepareResult = prepare(commit);
373 if (prepareResult == PrepareResult.OK) {
374 commitInternal(commit.operation().transaction().transactionId());
375 }
376 return prepareResult;
377 }
378
379 /**
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800380 * Handles an prepare commit.
381 *
Madan Jampanifc981772016-02-16 09:46:42 -0800382 * @param commit transaction prepare commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800383 * @return prepare result
384 */
Madan Jampanifc981772016-02-16 09:46:42 -0800385 protected PrepareResult prepare(Commit<? extends TransactionPrepare> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800386 boolean ok = false;
387 try {
Madan Jampani74da78b2016-02-09 21:18:36 -0800388 MapTransaction<String, byte[]> transaction = commit.operation().transaction();
Madan Jampanicadd70b2016-02-08 13:45:43 -0800389 for (MapUpdate<String, byte[]> update : transaction.updates()) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800390 String key = update.key();
391 if (preparedKeys.contains(key)) {
392 return PrepareResult.CONCURRENT_TRANSACTION;
393 }
394 MapEntryValue existingValue = mapEntries.get(key);
395 if (existingValue == null) {
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700396 if (update.type() != MapUpdate.Type.PUT_IF_ABSENT) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800397 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
398 }
399 } else {
400 if (existingValue.version() != update.currentVersion()) {
401 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
402 }
403 }
404 }
405 // No violations detected. Add to pendingTranctions and mark
Madan Jampanifc981772016-02-16 09:46:42 -0800406 // modified keys as locked for updates.
Madan Jampani74da78b2016-02-09 21:18:36 -0800407 pendingTransactions.put(transaction.transactionId(), commit);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800408 transaction.updates().forEach(u -> preparedKeys.add(u.key()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800409 ok = true;
410 return PrepareResult.OK;
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700411 } catch (Exception e) {
412 log.warn("Failure applying {}", commit, e);
413 throw Throwables.propagate(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800414 } finally {
415 if (!ok) {
416 commit.close();
417 }
418 }
419 }
420
421 /**
422 * Handles an commit commit (ha!).
423 *
424 * @param commit transaction commit commit
425 * @return commit result
426 */
Madan Jampanifc981772016-02-16 09:46:42 -0800427 protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800428 TransactionId transactionId = commit.operation().transactionId();
429 try {
Madan Jampani542d9e22016-04-05 15:39:55 -0700430 return commitInternal(transactionId);
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700431 } catch (Exception e) {
432 log.warn("Failure applying {}", commit, e);
433 throw Throwables.propagate(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800434 } finally {
435 commit.close();
436 }
437 }
438
Madan Jampani542d9e22016-04-05 15:39:55 -0700439 private CommitResult commitInternal(TransactionId transactionId) {
440 Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions
441 .remove(transactionId);
442 if (prepareCommit == null) {
443 return CommitResult.UNKNOWN_TRANSACTION_ID;
444 }
445 MapTransaction<String, byte[]> transaction = prepareCommit.operation().transaction();
446 long totalReferencesToCommit = transaction
447 .updates()
448 .stream()
449 .filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
450 .count();
451 CountDownCompleter<Commit<? extends TransactionPrepare>> completer =
452 new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
453 List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
454 for (MapUpdate<String, byte[]> update : transaction.updates()) {
455 String key = update.key();
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700456 checkState(preparedKeys.remove(key), "key is not prepared");
Madan Jampani542d9e22016-04-05 15:39:55 -0700457 MapEntryValue previousValue = mapEntries.remove(key);
458 MapEntryValue newValue = null;
Madan Jampani542d9e22016-04-05 15:39:55 -0700459 if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700460 newValue = new TransactionalCommit(key, versionCounter.incrementAndGet(), completer);
Madan Jampani542d9e22016-04-05 15:39:55 -0700461 }
462 eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
463 if (newValue != null) {
464 mapEntries.put(key, newValue);
465 }
466 if (previousValue != null) {
467 previousValue.discard();
468 }
469 }
470 publish(eventsToPublish);
471 return CommitResult.OK;
472 }
473
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800474 /**
475 * Handles an rollback commit (ha!).
476 *
477 * @param commit transaction rollback commit
478 * @return rollback result
479 */
Madan Jampanifc981772016-02-16 09:46:42 -0800480 protected RollbackResult rollback(Commit<? extends TransactionRollback> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800481 TransactionId transactionId = commit.operation().transactionId();
482 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800483 Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions.remove(transactionId);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800484 if (prepareCommit == null) {
485 return RollbackResult.UNKNOWN_TRANSACTION_ID;
486 } else {
Madan Jampanicadd70b2016-02-08 13:45:43 -0800487 prepareCommit.operation()
488 .transaction()
489 .updates()
490 .forEach(u -> preparedKeys.remove(u.key()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800491 prepareCommit.close();
492 return RollbackResult.OK;
493 }
494 } finally {
495 commit.close();
496 }
497 }
498
Madan Jampanifc981772016-02-16 09:46:42 -0800499 /**
500 * Computes the update status that would result if the specified update were to applied to
501 * the state machine.
502 *
503 * @param update update
504 * @return status
505 */
506 private MapEntryUpdateResult.Status validate(UpdateAndGet update) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800507 MapEntryValue existingValue = mapEntries.get(update.key());
508 if (existingValue == null && update.value() == null) {
509 return MapEntryUpdateResult.Status.NOOP;
510 }
511 if (preparedKeys.contains(update.key())) {
512 return MapEntryUpdateResult.Status.WRITE_LOCK;
513 }
514 byte[] existingRawValue = existingValue == null ? null : existingValue
515 .value();
516 Long existingVersion = existingValue == null ? null : existingValue
517 .version();
518 return update.valueMatch().matches(existingRawValue)
519 && update.versionMatch().matches(existingVersion) ? MapEntryUpdateResult.Status.OK
520 : MapEntryUpdateResult.Status.PRECONDITION_FAILED;
521 }
522
Madan Jampanifc981772016-02-16 09:46:42 -0800523 /**
524 * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
525 * @param value map entry value
526 * @return versioned instance
527 */
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800528 private Versioned<byte[]> toVersioned(MapEntryValue value) {
Madan Jampanifc981772016-02-16 09:46:42 -0800529 return value == null ? null : new Versioned<>(value.value(), value.version());
530 }
531
532 /**
533 * Publishes events to listeners.
534 *
535 * @param events list of map event to publish
536 */
537 private void publish(List<MapEvent<String, byte[]>> events) {
538 listeners.values().forEach(commit -> commit.session().publish(AtomixConsistentMap.CHANGE_SUBJECT, events));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800539 }
540
541 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800542 public void register(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800543 }
544
545 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800546 public void unregister(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800547 closeListener(session.id());
548 }
549
550 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800551 public void expire(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800552 closeListener(session.id());
553 }
554
555 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800556 public void close(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800557 closeListener(session.id());
558 }
559
560 private void closeListener(Long sessionId) {
Madan Jampanifc981772016-02-16 09:46:42 -0800561 Commit<? extends Listen> commit = listeners.remove(sessionId);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800562 if (commit != null) {
563 commit.close();
564 }
565 }
566
567 /**
568 * Interface implemented by map values.
569 */
570 private interface MapEntryValue {
571 /**
572 * Returns the raw {@code byte[]}.
573 *
574 * @return raw value
575 */
576 byte[] value();
577
578 /**
579 * Returns the version of the value.
580 *
581 * @return version
582 */
583 long version();
584
585 /**
586 * Discards the value by invoke appropriate clean up actions.
587 */
588 void discard();
589 }
590
591 /**
592 * A {@code MapEntryValue} that is derived from a non-transactional update
593 * i.e. via any standard map update operation.
594 */
595 private class NonTransactionalCommit implements MapEntryValue {
596 private final long version;
Madan Jampanifc981772016-02-16 09:46:42 -0800597 private final Commit<? extends UpdateAndGet> commit;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800598
Madan Jampanifc981772016-02-16 09:46:42 -0800599 public NonTransactionalCommit(long version, Commit<? extends UpdateAndGet> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800600 this.version = version;
601 this.commit = commit;
602 }
603
604 @Override
605 public byte[] value() {
606 return commit.operation().value();
607 }
608
609 @Override
610 public long version() {
611 return version;
612 }
613
614 @Override
615 public void discard() {
616 commit.close();
617 }
618 }
619
620 /**
621 * A {@code MapEntryValue} that is derived from updates submitted via a
622 * transaction.
623 */
624 private class TransactionalCommit implements MapEntryValue {
625 private final String key;
626 private final long version;
Madan Jampanifc981772016-02-16 09:46:42 -0800627 private final CountDownCompleter<Commit<? extends TransactionPrepare>> completer;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800628
629 public TransactionalCommit(
630 String key,
631 long version,
Madan Jampanifc981772016-02-16 09:46:42 -0800632 CountDownCompleter<Commit<? extends TransactionPrepare>> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800633 this.key = key;
634 this.version = version;
635 this.completer = commit;
636 }
637
638 @Override
639 public byte[] value() {
Madan Jampani74da78b2016-02-09 21:18:36 -0800640 MapTransaction<String, byte[]> transaction = completer.object().operation().transaction();
Madan Jampanicadd70b2016-02-08 13:45:43 -0800641 return valueForKey(key, transaction);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800642 }
643
644 @Override
645 public long version() {
646 return version;
647 }
648
649 @Override
650 public void discard() {
651 completer.countDown();
652 }
Madan Jampanicadd70b2016-02-08 13:45:43 -0800653
Madan Jampani74da78b2016-02-09 21:18:36 -0800654 private byte[] valueForKey(String key, MapTransaction<String, byte[]> transaction) {
Madan Jampanicadd70b2016-02-08 13:45:43 -0800655 MapUpdate<String, byte[]> update = transaction.updates()
656 .stream()
657 .filter(u -> u.key().equals(key))
658 .findFirst()
659 .orElse(null);
660 return update == null ? null : update.value();
661 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800662 }
663}