blob: 84cd53a2fda633e8574bf56dacf353e36a0e586c [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
Madan Jampani79924fa2016-09-13 13:57:03 -070018import static com.google.common.base.Preconditions.checkState;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080019import static org.onosproject.store.service.MapEvent.Type.INSERT;
20import static org.onosproject.store.service.MapEvent.Type.REMOVE;
21import static org.onosproject.store.service.MapEvent.Type.UPDATE;
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -070022import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080023import io.atomix.copycat.server.Commit;
24import io.atomix.copycat.server.Snapshottable;
25import io.atomix.copycat.server.StateMachineExecutor;
Madan Jampani79924fa2016-09-13 13:57:03 -070026import io.atomix.copycat.server.session.ServerSession;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080027import io.atomix.copycat.server.session.SessionListener;
28import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
29import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
30import io.atomix.resource.ResourceStateMachine;
31
32import java.util.Collection;
33import java.util.HashMap;
34import java.util.Iterator;
Madan Jampanifc981772016-02-16 09:46:42 -080035import java.util.List;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080036import java.util.Map;
Madan Jampani65f24bb2016-03-15 15:16:18 -070037import java.util.Properties;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080038import java.util.Set;
39import java.util.concurrent.atomic.AtomicLong;
40import java.util.stream.Collectors;
41
42import org.onlab.util.CountDownCompleter;
43import org.onlab.util.Match;
Madan Jampani74da78b2016-02-09 21:18:36 -080044import org.onosproject.store.primitives.MapUpdate;
Madan Jampanicadd70b2016-02-08 13:45:43 -080045import org.onosproject.store.primitives.TransactionId;
Madan Jampanifc981772016-02-16 09:46:42 -080046import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
47import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
48import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsValue;
49import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.EntrySet;
50import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Get;
51import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.IsEmpty;
52import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.KeySet;
53import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Listen;
54import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
55import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080056import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
Madan Jampani542d9e22016-04-05 15:39:55 -070057import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepareAndCommit;
Madan Jampanifc981772016-02-16 09:46:42 -080058import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
59import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
60import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
61import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Values;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080062import org.onosproject.store.service.MapEvent;
Madan Jampani74da78b2016-02-09 21:18:36 -080063import org.onosproject.store.service.MapTransaction;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080064import org.onosproject.store.service.Versioned;
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -070065import org.slf4j.Logger;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080066
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -070067import com.google.common.base.Throwables;
Madan Jampanifc981772016-02-16 09:46:42 -080068import com.google.common.collect.Lists;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080069import com.google.common.collect.Maps;
70import com.google.common.collect.Sets;
71
Madan Jampani5e5b3d62016-02-01 16:03:33 -080072/**
73 * State Machine for {@link AtomixConsistentMap} resource.
74 */
Madan Jampanifc981772016-02-16 09:46:42 -080075public class AtomixConsistentMapState extends ResourceStateMachine implements SessionListener, Snapshottable {
Madan Jampani86cb2432016-02-17 11:07:56 -080076
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -070077 private final Logger log = getLogger(getClass());
Madan Jampani79924fa2016-09-13 13:57:03 -070078 private final Map<Long, Commit<? extends Listen>> listeners = new HashMap<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080079 private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
80 private final Set<String> preparedKeys = Sets.newHashSet();
Madan Jampanifc981772016-02-16 09:46:42 -080081 private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps.newHashMap();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080082 private AtomicLong versionCounter = new AtomicLong(0);
83
Madan Jampani65f24bb2016-03-15 15:16:18 -070084 public AtomixConsistentMapState(Properties properties) {
85 super(properties);
Madan Jampani86cb2432016-02-17 11:07:56 -080086 }
87
Madan Jampani5e5b3d62016-02-01 16:03:33 -080088 @Override
89 public void snapshot(SnapshotWriter writer) {
90 writer.writeLong(versionCounter.get());
91 }
92
93 @Override
94 public void install(SnapshotReader reader) {
95 versionCounter = new AtomicLong(reader.readLong());
96 }
97
98 @Override
99 protected void configure(StateMachineExecutor executor) {
100 // Listeners
Madan Jampanifc981772016-02-16 09:46:42 -0800101 executor.register(Listen.class, this::listen);
102 executor.register(Unlisten.class, this::unlisten);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800103 // Queries
Madan Jampanifc981772016-02-16 09:46:42 -0800104 executor.register(ContainsKey.class, this::containsKey);
105 executor.register(ContainsValue.class, this::containsValue);
106 executor.register(EntrySet.class, this::entrySet);
107 executor.register(Get.class, this::get);
108 executor.register(IsEmpty.class, this::isEmpty);
109 executor.register(KeySet.class, this::keySet);
110 executor.register(Size.class, this::size);
111 executor.register(Values.class, this::values);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800112 // Commands
Madan Jampanifc981772016-02-16 09:46:42 -0800113 executor.register(UpdateAndGet.class, this::updateAndGet);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800114 executor.register(AtomixConsistentMapCommands.Clear.class, this::clear);
Madan Jampanifc981772016-02-16 09:46:42 -0800115 executor.register(TransactionPrepare.class, this::prepare);
116 executor.register(TransactionCommit.class, this::commit);
117 executor.register(TransactionRollback.class, this::rollback);
Madan Jampani542d9e22016-04-05 15:39:55 -0700118 executor.register(TransactionPrepareAndCommit.class, this::prepareAndCommit);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800119 }
120
121 @Override
122 public void delete() {
123 // Delete Listeners
124 listeners.values().forEach(Commit::close);
125 listeners.clear();
126
127 // Delete Map entries
128 mapEntries.values().forEach(MapEntryValue::discard);
129 mapEntries.clear();
130 }
131
132 /**
133 * Handles a contains key commit.
134 *
Madan Jampanifc981772016-02-16 09:46:42 -0800135 * @param commit containsKey commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800136 * @return {@code true} if map contains key
137 */
Madan Jampanifc981772016-02-16 09:46:42 -0800138 protected boolean containsKey(Commit<? extends ContainsKey> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800139 try {
140 return toVersioned(mapEntries.get(commit.operation().key())) != null;
141 } finally {
142 commit.close();
143 }
144 }
145
146 /**
147 * Handles a contains value commit.
148 *
Madan Jampanifc981772016-02-16 09:46:42 -0800149 * @param commit containsValue commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800150 * @return {@code true} if map contains value
151 */
Madan Jampanifc981772016-02-16 09:46:42 -0800152 protected boolean containsValue(Commit<? extends ContainsValue> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800153 try {
154 Match<byte[]> valueMatch = Match
155 .ifValue(commit.operation().value());
156 return mapEntries.values().stream()
157 .anyMatch(value -> valueMatch.matches(value.value()));
158 } finally {
159 commit.close();
160 }
161 }
162
163 /**
164 * Handles a get commit.
165 *
166 * @param commit
167 * get commit
168 * @return value mapped to key
169 */
Madan Jampanifc981772016-02-16 09:46:42 -0800170 protected Versioned<byte[]> get(Commit<? extends Get> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800171 try {
172 return toVersioned(mapEntries.get(commit.operation().key()));
173 } finally {
174 commit.close();
175 }
176 }
177
178 /**
179 * Handles a count commit.
180 *
Madan Jampanifc981772016-02-16 09:46:42 -0800181 * @param commit size commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800182 * @return number of entries in map
183 */
Madan Jampanifc981772016-02-16 09:46:42 -0800184 protected int size(Commit<? extends Size> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800185 try {
186 return mapEntries.size();
187 } finally {
188 commit.close();
189 }
190 }
191
192 /**
193 * Handles an is empty commit.
194 *
Madan Jampanifc981772016-02-16 09:46:42 -0800195 * @param commit isEmpty commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800196 * @return {@code true} if map is empty
197 */
Madan Jampanifc981772016-02-16 09:46:42 -0800198 protected boolean isEmpty(Commit<? extends IsEmpty> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800199 try {
200 return mapEntries.isEmpty();
201 } finally {
202 commit.close();
203 }
204 }
205
206 /**
207 * Handles a keySet commit.
208 *
Madan Jampanifc981772016-02-16 09:46:42 -0800209 * @param commit keySet commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800210 * @return set of keys in map
211 */
Madan Jampanifc981772016-02-16 09:46:42 -0800212 protected Set<String> keySet(Commit<? extends KeySet> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800213 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800214 return mapEntries.keySet().stream().collect(Collectors.toSet());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800215 } finally {
216 commit.close();
217 }
218 }
219
220 /**
221 * Handles a values commit.
222 *
Madan Jampanifc981772016-02-16 09:46:42 -0800223 * @param commit values commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800224 * @return collection of values in map
225 */
Madan Jampanifc981772016-02-16 09:46:42 -0800226 protected Collection<Versioned<byte[]>> values(Commit<? extends Values> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800227 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800228 return mapEntries.values().stream().map(this::toVersioned).collect(Collectors.toList());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800229 } finally {
230 commit.close();
231 }
232 }
233
234 /**
235 * Handles a entry set commit.
236 *
237 * @param commit
238 * entrySet commit
239 * @return set of map entries
240 */
Madan Jampanifc981772016-02-16 09:46:42 -0800241 protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(Commit<? extends EntrySet> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800242 try {
243 return mapEntries
244 .entrySet()
245 .stream()
246 .map(e -> Maps.immutableEntry(e.getKey(),
247 toVersioned(e.getValue())))
248 .collect(Collectors.toSet());
249 } finally {
250 commit.close();
251 }
252 }
253
254 /**
255 * Handles a update and get commit.
256 *
Madan Jampanifc981772016-02-16 09:46:42 -0800257 * @param commit updateAndGet commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800258 * @return update result
259 */
Madan Jampanifc981772016-02-16 09:46:42 -0800260 protected MapEntryUpdateResult<String, byte[]> updateAndGet(Commit<? extends UpdateAndGet> commit) {
Madan Jampaniea98f412016-06-22 09:05:40 -0700261 try {
262 MapEntryUpdateResult.Status updateStatus = validate(commit.operation());
263 String key = commit.operation().key();
264 MapEntryValue oldCommitValue = mapEntries.get(commit.operation().key());
265 Versioned<byte[]> oldMapValue = toVersioned(oldCommitValue);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800266
Madan Jampaniea98f412016-06-22 09:05:40 -0700267 if (updateStatus != MapEntryUpdateResult.Status.OK) {
268 commit.close();
269 return new MapEntryUpdateResult<>(updateStatus, "", key,
270 oldMapValue, oldMapValue);
271 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800272
Madan Jampaniea98f412016-06-22 09:05:40 -0700273 byte[] newValue = commit.operation().value();
274 long newVersion = versionCounter.incrementAndGet();
275 Versioned<byte[]> newMapValue = newValue == null ? null
276 : new Versioned<>(newValue, newVersion);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800277
Madan Jampaniea98f412016-06-22 09:05:40 -0700278 MapEvent.Type updateType = newValue == null ? REMOVE
279 : oldCommitValue == null ? INSERT : UPDATE;
280 if (updateType == REMOVE || updateType == UPDATE) {
281 mapEntries.remove(key);
282 oldCommitValue.discard();
283 }
284 if (updateType == INSERT || updateType == UPDATE) {
285 mapEntries.put(key, new NonTransactionalCommit(newVersion, commit));
286 } else {
287 commit.close();
288 }
289 publish(Lists.newArrayList(new MapEvent<>("", key, newMapValue, oldMapValue)));
290 return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue,
291 newMapValue);
292 } catch (Exception e) {
293 log.error("State machine operation failed", e);
294 throw Throwables.propagate(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800295 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800296 }
297
298 /**
299 * Handles a clear commit.
300 *
Madan Jampanifc981772016-02-16 09:46:42 -0800301 * @param commit clear commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800302 * @return clear result
303 */
Madan Jampanifc981772016-02-16 09:46:42 -0800304 protected MapEntryUpdateResult.Status clear(Commit<? extends Clear> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800305 try {
306 Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries
307 .entrySet().iterator();
308 while (iterator.hasNext()) {
309 Map.Entry<String, MapEntryValue> entry = iterator.next();
310 String key = entry.getKey();
311 MapEntryValue value = entry.getValue();
312 Versioned<byte[]> removedValue = new Versioned<>(value.value(),
313 value.version());
Madan Jampanifc981772016-02-16 09:46:42 -0800314 publish(Lists.newArrayList(new MapEvent<>("", key, null, removedValue)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800315 value.discard();
316 iterator.remove();
317 }
318 return MapEntryUpdateResult.Status.OK;
319 } finally {
320 commit.close();
321 }
322 }
323
324 /**
325 * Handles a listen commit.
326 *
Madan Jampanifc981772016-02-16 09:46:42 -0800327 * @param commit listen commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800328 */
Madan Jampanifc981772016-02-16 09:46:42 -0800329 protected void listen(Commit<? extends Listen> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800330 Long sessionId = commit.session().id();
Madan Jampanie6038872016-03-03 12:14:37 -0800331 if (listeners.putIfAbsent(sessionId, commit) != null) {
332 commit.close();
333 return;
334 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800335 commit.session()
336 .onStateChange(
337 state -> {
Madan Jampani3a9911c2016-02-21 11:25:45 -0800338 if (state == ServerSession.State.CLOSED
339 || state == ServerSession.State.EXPIRED) {
Madan Jampanifc981772016-02-16 09:46:42 -0800340 Commit<? extends Listen> listener = listeners.remove(sessionId);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800341 if (listener != null) {
342 listener.close();
343 }
344 }
345 });
346 }
347
348 /**
349 * Handles an unlisten commit.
350 *
Madan Jampanifc981772016-02-16 09:46:42 -0800351 * @param commit unlisten commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800352 */
Madan Jampani40f022e2016-03-02 21:35:14 -0800353 protected void unlisten(Commit<? extends Unlisten> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800354 try {
Madan Jampani40f022e2016-03-02 21:35:14 -0800355 Commit<? extends Listen> listener = listeners.remove(commit.session().id());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800356 if (listener != null) {
357 listener.close();
358 }
359 } finally {
360 commit.close();
361 }
362 }
363
364 /**
Madan Jampani542d9e22016-04-05 15:39:55 -0700365 * Handles an prepare and commit commit.
366 *
367 * @param commit transaction prepare and commit commit
368 * @return prepare result
369 */
370 protected PrepareResult prepareAndCommit(Commit<? extends TransactionPrepareAndCommit> commit) {
371 PrepareResult prepareResult = prepare(commit);
372 if (prepareResult == PrepareResult.OK) {
373 commitInternal(commit.operation().transaction().transactionId());
374 }
375 return prepareResult;
376 }
377
378 /**
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800379 * Handles an prepare commit.
380 *
Madan Jampanifc981772016-02-16 09:46:42 -0800381 * @param commit transaction prepare commit
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800382 * @return prepare result
383 */
Madan Jampanifc981772016-02-16 09:46:42 -0800384 protected PrepareResult prepare(Commit<? extends TransactionPrepare> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800385 boolean ok = false;
386 try {
Madan Jampani74da78b2016-02-09 21:18:36 -0800387 MapTransaction<String, byte[]> transaction = commit.operation().transaction();
Madan Jampanicadd70b2016-02-08 13:45:43 -0800388 for (MapUpdate<String, byte[]> update : transaction.updates()) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800389 String key = update.key();
390 if (preparedKeys.contains(key)) {
391 return PrepareResult.CONCURRENT_TRANSACTION;
392 }
393 MapEntryValue existingValue = mapEntries.get(key);
394 if (existingValue == null) {
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700395 if (update.type() != MapUpdate.Type.PUT_IF_ABSENT) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800396 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
397 }
398 } else {
399 if (existingValue.version() != update.currentVersion()) {
400 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
401 }
402 }
403 }
Jordan Halterman820e39f2017-02-05 22:15:36 -0800404 // No violations detected. Add to pendingTransactions and mark
Madan Jampanifc981772016-02-16 09:46:42 -0800405 // modified keys as locked for updates.
Madan Jampani74da78b2016-02-09 21:18:36 -0800406 pendingTransactions.put(transaction.transactionId(), commit);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800407 transaction.updates().forEach(u -> preparedKeys.add(u.key()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800408 ok = true;
409 return PrepareResult.OK;
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700410 } catch (Exception e) {
411 log.warn("Failure applying {}", commit, e);
412 throw Throwables.propagate(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800413 } finally {
414 if (!ok) {
415 commit.close();
416 }
417 }
418 }
419
420 /**
421 * Handles an commit commit (ha!).
422 *
423 * @param commit transaction commit commit
424 * @return commit result
425 */
Madan Jampanifc981772016-02-16 09:46:42 -0800426 protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800427 TransactionId transactionId = commit.operation().transactionId();
428 try {
Madan Jampani542d9e22016-04-05 15:39:55 -0700429 return commitInternal(transactionId);
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700430 } catch (Exception e) {
431 log.warn("Failure applying {}", commit, e);
432 throw Throwables.propagate(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800433 } finally {
434 commit.close();
435 }
436 }
437
Madan Jampani542d9e22016-04-05 15:39:55 -0700438 private CommitResult commitInternal(TransactionId transactionId) {
439 Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions
440 .remove(transactionId);
441 if (prepareCommit == null) {
442 return CommitResult.UNKNOWN_TRANSACTION_ID;
443 }
444 MapTransaction<String, byte[]> transaction = prepareCommit.operation().transaction();
445 long totalReferencesToCommit = transaction
446 .updates()
447 .stream()
448 .filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
449 .count();
450 CountDownCompleter<Commit<? extends TransactionPrepare>> completer =
451 new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
452 List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
453 for (MapUpdate<String, byte[]> update : transaction.updates()) {
454 String key = update.key();
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700455 checkState(preparedKeys.remove(key), "key is not prepared");
Madan Jampani542d9e22016-04-05 15:39:55 -0700456 MapEntryValue previousValue = mapEntries.remove(key);
457 MapEntryValue newValue = null;
Madan Jampani542d9e22016-04-05 15:39:55 -0700458 if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
HIGUCHI Yutacaad26b2016-04-16 16:06:11 -0700459 newValue = new TransactionalCommit(key, versionCounter.incrementAndGet(), completer);
Madan Jampani542d9e22016-04-05 15:39:55 -0700460 }
461 eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
462 if (newValue != null) {
463 mapEntries.put(key, newValue);
464 }
465 if (previousValue != null) {
466 previousValue.discard();
467 }
468 }
469 publish(eventsToPublish);
470 return CommitResult.OK;
471 }
472
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800473 /**
474 * Handles an rollback commit (ha!).
475 *
476 * @param commit transaction rollback commit
477 * @return rollback result
478 */
Madan Jampanifc981772016-02-16 09:46:42 -0800479 protected RollbackResult rollback(Commit<? extends TransactionRollback> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800480 TransactionId transactionId = commit.operation().transactionId();
481 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800482 Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions.remove(transactionId);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800483 if (prepareCommit == null) {
484 return RollbackResult.UNKNOWN_TRANSACTION_ID;
485 } else {
Madan Jampanicadd70b2016-02-08 13:45:43 -0800486 prepareCommit.operation()
487 .transaction()
488 .updates()
489 .forEach(u -> preparedKeys.remove(u.key()));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800490 prepareCommit.close();
491 return RollbackResult.OK;
492 }
493 } finally {
494 commit.close();
495 }
496 }
497
Madan Jampanifc981772016-02-16 09:46:42 -0800498 /**
499 * Computes the update status that would result if the specified update were to applied to
500 * the state machine.
501 *
502 * @param update update
503 * @return status
504 */
505 private MapEntryUpdateResult.Status validate(UpdateAndGet update) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800506 MapEntryValue existingValue = mapEntries.get(update.key());
507 if (existingValue == null && update.value() == null) {
508 return MapEntryUpdateResult.Status.NOOP;
509 }
510 if (preparedKeys.contains(update.key())) {
511 return MapEntryUpdateResult.Status.WRITE_LOCK;
512 }
513 byte[] existingRawValue = existingValue == null ? null : existingValue
514 .value();
515 Long existingVersion = existingValue == null ? null : existingValue
516 .version();
517 return update.valueMatch().matches(existingRawValue)
518 && update.versionMatch().matches(existingVersion) ? MapEntryUpdateResult.Status.OK
519 : MapEntryUpdateResult.Status.PRECONDITION_FAILED;
520 }
521
Madan Jampanifc981772016-02-16 09:46:42 -0800522 /**
523 * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
524 * @param value map entry value
525 * @return versioned instance
526 */
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800527 private Versioned<byte[]> toVersioned(MapEntryValue value) {
Madan Jampanifc981772016-02-16 09:46:42 -0800528 return value == null ? null : new Versioned<>(value.value(), value.version());
529 }
530
531 /**
532 * Publishes events to listeners.
533 *
534 * @param events list of map event to publish
535 */
536 private void publish(List<MapEvent<String, byte[]>> events) {
537 listeners.values().forEach(commit -> commit.session().publish(AtomixConsistentMap.CHANGE_SUBJECT, events));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800538 }
539
540 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800541 public void register(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800542 }
543
544 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800545 public void unregister(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800546 closeListener(session.id());
547 }
548
549 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800550 public void expire(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800551 closeListener(session.id());
552 }
553
554 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800555 public void close(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800556 closeListener(session.id());
557 }
558
559 private void closeListener(Long sessionId) {
Madan Jampanifc981772016-02-16 09:46:42 -0800560 Commit<? extends Listen> commit = listeners.remove(sessionId);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800561 if (commit != null) {
562 commit.close();
563 }
564 }
565
566 /**
567 * Interface implemented by map values.
568 */
569 private interface MapEntryValue {
570 /**
571 * Returns the raw {@code byte[]}.
572 *
573 * @return raw value
574 */
575 byte[] value();
576
577 /**
578 * Returns the version of the value.
579 *
580 * @return version
581 */
582 long version();
583
584 /**
585 * Discards the value by invoke appropriate clean up actions.
586 */
587 void discard();
588 }
589
590 /**
591 * A {@code MapEntryValue} that is derived from a non-transactional update
592 * i.e. via any standard map update operation.
593 */
594 private class NonTransactionalCommit implements MapEntryValue {
595 private final long version;
Madan Jampanifc981772016-02-16 09:46:42 -0800596 private final Commit<? extends UpdateAndGet> commit;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800597
Madan Jampanifc981772016-02-16 09:46:42 -0800598 public NonTransactionalCommit(long version, Commit<? extends UpdateAndGet> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800599 this.version = version;
600 this.commit = commit;
601 }
602
603 @Override
604 public byte[] value() {
605 return commit.operation().value();
606 }
607
608 @Override
609 public long version() {
610 return version;
611 }
612
613 @Override
614 public void discard() {
615 commit.close();
616 }
617 }
618
619 /**
620 * A {@code MapEntryValue} that is derived from updates submitted via a
621 * transaction.
622 */
623 private class TransactionalCommit implements MapEntryValue {
624 private final String key;
625 private final long version;
Madan Jampanifc981772016-02-16 09:46:42 -0800626 private final CountDownCompleter<Commit<? extends TransactionPrepare>> completer;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800627
628 public TransactionalCommit(
629 String key,
630 long version,
Madan Jampanifc981772016-02-16 09:46:42 -0800631 CountDownCompleter<Commit<? extends TransactionPrepare>> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800632 this.key = key;
633 this.version = version;
634 this.completer = commit;
635 }
636
637 @Override
638 public byte[] value() {
Madan Jampani74da78b2016-02-09 21:18:36 -0800639 MapTransaction<String, byte[]> transaction = completer.object().operation().transaction();
Madan Jampanicadd70b2016-02-08 13:45:43 -0800640 return valueForKey(key, transaction);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800641 }
642
643 @Override
644 public long version() {
645 return version;
646 }
647
648 @Override
649 public void discard() {
650 completer.countDown();
651 }
Madan Jampanicadd70b2016-02-08 13:45:43 -0800652
Madan Jampani74da78b2016-02-09 21:18:36 -0800653 private byte[] valueForKey(String key, MapTransaction<String, byte[]> transaction) {
Madan Jampanicadd70b2016-02-08 13:45:43 -0800654 MapUpdate<String, byte[]> update = transaction.updates()
655 .stream()
656 .filter(u -> u.key().equals(key))
657 .findFirst()
658 .orElse(null);
659 return update == null ? null : update.value();
660 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800661 }
662}