blob: 41a1a9f488bf3f7654c598516bdeb16f67f6f463 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.store.primitives.resources.impl;
18
19import java.util.Arrays;
20import java.util.HashMap;
21import java.util.Iterator;
Sithara Punnassery61a80252017-08-07 11:16:08 -070022import java.util.LinkedHashMap;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070023import java.util.List;
24import java.util.Map;
25import java.util.Optional;
26import java.util.Queue;
Sithara Punnassery61a80252017-08-07 11:16:08 -070027import java.util.Set;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070028import java.util.TreeMap;
29import java.util.concurrent.atomic.AtomicLong;
30import java.util.stream.Collectors;
31
32import com.esotericsoftware.kryo.Kryo;
33import com.esotericsoftware.kryo.io.Input;
34import com.esotericsoftware.kryo.io.Output;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070035import com.google.common.collect.Lists;
Sithara Punnassery61a80252017-08-07 11:16:08 -070036import com.google.common.collect.Maps;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070037import com.google.common.collect.Queues;
Sithara Punnassery61a80252017-08-07 11:16:08 -070038import com.google.common.collect.Sets;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070039import io.atomix.protocols.raft.event.EventType;
40import io.atomix.protocols.raft.service.AbstractRaftService;
41import io.atomix.protocols.raft.service.Commit;
42import io.atomix.protocols.raft.service.RaftServiceExecutor;
43import io.atomix.protocols.raft.session.RaftSession;
44import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
45import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
46import org.onlab.util.KryoNamespace;
47import org.onlab.util.Match;
Sithara Punnassery61a80252017-08-07 11:16:08 -070048import org.onosproject.store.primitives.NodeUpdate;
49import org.onosproject.store.primitives.TransactionId;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070050import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Get;
51import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GetChildren;
52import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Listen;
53import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Unlisten;
54import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Update;
Sithara Punnassery61a80252017-08-07 11:16:08 -070055import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionBegin;
56import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionPrepare;
57import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionPrepareAndCommit;
58import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionCommit;
59import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionRollback;
60
Jordan Haltermane853d032017-08-01 15:10:28 -070061import org.onosproject.store.primitives.resources.impl.DocumentTreeResult.Status;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070062import org.onosproject.store.serializers.KryoNamespaces;
63import org.onosproject.store.service.DocumentPath;
64import org.onosproject.store.service.DocumentTree;
65import org.onosproject.store.service.DocumentTreeEvent;
66import org.onosproject.store.service.DocumentTreeEvent.Type;
67import org.onosproject.store.service.IllegalDocumentModificationException;
68import org.onosproject.store.service.NoSuchDocumentPathException;
Jordan Haltermand0d80352017-08-10 15:08:27 -070069import org.onosproject.store.service.Ordering;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070070import org.onosproject.store.service.Serializer;
Sithara Punnassery61a80252017-08-07 11:16:08 -070071import org.onosproject.store.service.TransactionLog;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070072import org.onosproject.store.service.Versioned;
73
Sithara Punnassery61a80252017-08-07 11:16:08 -070074import static com.google.common.base.Preconditions.checkState;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070075import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeEvents.CHANGE;
Sithara Punnassery61a80252017-08-07 11:16:08 -070076import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.BEGIN;
77import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.PREPARE;
78import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.PREPARE_AND_COMMIT;
79import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.COMMIT;
80import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ROLLBACK;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070081import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.CLEAR;
82import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET;
83import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET_CHILDREN;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070084import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.UPDATE;
Sithara Punnassery61a80252017-08-07 11:16:08 -070085import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ADD_LISTENER;
86import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.REMOVE_LISTENER;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070087
/**
 * State machine for the {@link AtomixDocumentTree} resource.
 */
91public class AtomixDocumentTreeService extends AbstractRaftService {
92 private final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
93 .register(KryoNamespaces.BASIC)
94 .register(AtomixDocumentTreeOperations.NAMESPACE)
95 .register(AtomixDocumentTreeEvents.NAMESPACE)
96 .register(new com.esotericsoftware.kryo.Serializer<Listener>() {
97 @Override
98 public void write(Kryo kryo, Output output, Listener listener) {
99 output.writeLong(listener.session.sessionId().id());
100 kryo.writeObject(output, listener.path);
101 }
102
103 @Override
104 public Listener read(Kryo kryo, Input input, Class<Listener> type) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700105 return new Listener(sessions().getSession(input.readLong()),
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700106 kryo.readObjectOrNull(input, DocumentPath.class));
107 }
108 }, Listener.class)
109 .register(Versioned.class)
110 .register(DocumentPath.class)
Sithara Punnassery61a80252017-08-07 11:16:08 -0700111 .register(new LinkedHashMap().keySet().getClass())
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700112 .register(TreeMap.class)
Jordan Haltermand0d80352017-08-10 15:08:27 -0700113 .register(Ordering.class)
Sithara Punnassery61a80252017-08-07 11:16:08 -0700114 .register(TransactionScope.class)
115 .register(TransactionLog.class)
116 .register(TransactionId.class)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700117 .register(SessionListenCommits.class)
118 .register(new com.esotericsoftware.kryo.Serializer<DefaultDocumentTree>() {
119 @Override
120 public void write(Kryo kryo, Output output, DefaultDocumentTree object) {
121 kryo.writeObject(output, object.root);
122 }
123
124 @Override
125 @SuppressWarnings("unchecked")
126 public DefaultDocumentTree read(Kryo kryo, Input input, Class<DefaultDocumentTree> type) {
127 return new DefaultDocumentTree(versionCounter::incrementAndGet,
128 kryo.readObject(input, DefaultDocumentTreeNode.class));
129 }
130 }, DefaultDocumentTree.class)
131 .register(DefaultDocumentTreeNode.class)
132 .build());
133
134 private Map<Long, SessionListenCommits> listeners = new HashMap<>();
135 private AtomicLong versionCounter = new AtomicLong(0);
Jordan Haltermand0d80352017-08-10 15:08:27 -0700136 private DocumentTree<byte[]> docTree;
Sithara Punnassery61a80252017-08-07 11:16:08 -0700137 private Map<TransactionId, TransactionScope> activeTransactions = Maps.newHashMap();
138 private Set<DocumentPath> preparedKeys = Sets.newHashSet();
Jordan Haltermand0d80352017-08-10 15:08:27 -0700139
140 public AtomixDocumentTreeService(Ordering ordering) {
141 this.docTree = new DefaultDocumentTree<>(versionCounter::incrementAndGet, ordering);
142 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700143
144 @Override
145 public void snapshot(SnapshotWriter writer) {
146 writer.writeLong(versionCounter.get());
147 writer.writeObject(listeners, serializer::encode);
148 writer.writeObject(docTree, serializer::encode);
Sithara Punnassery61a80252017-08-07 11:16:08 -0700149 writer.writeObject(preparedKeys, serializer::encode);
150 writer.writeObject(activeTransactions, serializer::encode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700151 }
152
153 @Override
154 public void install(SnapshotReader reader) {
155 versionCounter = new AtomicLong(reader.readLong());
156 listeners = reader.readObject(serializer::decode);
157 docTree = reader.readObject(serializer::decode);
Sithara Punnassery61a80252017-08-07 11:16:08 -0700158 preparedKeys = reader.readObject(serializer::decode);
159 activeTransactions = reader.readObject(serializer::decode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700160 }
161
162 @Override
163 protected void configure(RaftServiceExecutor executor) {
164 // Listeners
165 executor.register(ADD_LISTENER, serializer::decode, this::listen);
166 executor.register(REMOVE_LISTENER, serializer::decode, this::unlisten);
167 // queries
168 executor.register(GET, serializer::decode, this::get, serializer::encode);
169 executor.register(GET_CHILDREN, serializer::decode, this::getChildren, serializer::encode);
170 // commands
171 executor.register(UPDATE, serializer::decode, this::update, serializer::encode);
172 executor.register(CLEAR, this::clear);
Sithara Punnassery61a80252017-08-07 11:16:08 -0700173 executor.register(BEGIN, serializer::decode, this::begin, serializer::encode);
174 executor.register(PREPARE, serializer::decode, this::prepare, serializer::encode);
175 executor.register(PREPARE_AND_COMMIT, serializer::decode, this::prepareAndCommit, serializer::encode);
176 executor.register(COMMIT, serializer::decode, this::commit, serializer::encode);
177 executor.register(ROLLBACK, serializer::decode, this::rollback, serializer::encode);
178 }
179
180 /**
181 * Returns a boolean indicating whether the given path is currently locked by a transaction.
182 *
183 * @param path the path to check
184 * @return whether the given path is locked by a running transaction
185 */
186 private boolean isLocked(DocumentPath path) {
187 return preparedKeys.contains(path);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700188 }
189
190 protected void listen(Commit<? extends Listen> commit) {
191 Long sessionId = commit.session().sessionId().id();
192 listeners.computeIfAbsent(sessionId, k -> new SessionListenCommits())
193 .add(new Listener(commit.session(), commit.value().path()));
194 }
195
196 protected void unlisten(Commit<? extends Unlisten> commit) {
197 Long sessionId = commit.session().sessionId().id();
198 SessionListenCommits listenCommits = listeners.get(sessionId);
199 if (listenCommits != null) {
200 listenCommits.remove(commit);
201 }
202 }
203
204 protected Versioned<byte[]> get(Commit<? extends Get> commit) {
205 try {
206 Versioned<byte[]> value = docTree.get(commit.value().path());
Jordan Haltermane853d032017-08-01 15:10:28 -0700207 return value == null ? null : value.map(node -> node);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700208 } catch (IllegalStateException e) {
209 return null;
210 }
211 }
212
Jordan Haltermane853d032017-08-01 15:10:28 -0700213 protected DocumentTreeResult<Map<String, Versioned<byte[]>>> getChildren(Commit<? extends GetChildren> commit) {
214 try {
215 return DocumentTreeResult.ok(docTree.getChildren(commit.value().path()));
216 } catch (NoSuchDocumentPathException e) {
217 return DocumentTreeResult.invalidPath();
218 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700219 }
220
Jordan Haltermane853d032017-08-01 15:10:28 -0700221 protected DocumentTreeResult<Versioned<byte[]>> update(Commit<? extends Update> commit) {
222 DocumentTreeResult<Versioned<byte[]>> result = null;
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700223 DocumentPath path = commit.value().path();
Sithara Punnassery61a80252017-08-07 11:16:08 -0700224
225 // If the path is locked by a transaction, return a WRITE_LOCK error.
226 if (isLocked(path)) {
227 return DocumentTreeResult.writeLock();
228 }
229
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700230 Versioned<byte[]> currentValue = docTree.get(path);
231 try {
232 Match<Long> versionMatch = commit.value().versionMatch();
233 Match<byte[]> valueMatch = commit.value().valueMatch();
234
235 if (versionMatch.matches(currentValue == null ? null : currentValue.version())
236 && valueMatch.matches(currentValue == null ? null : currentValue.value())) {
237 if (commit.value().value() == null) {
Jordan Haltermane853d032017-08-01 15:10:28 -0700238 Versioned<byte[]> oldValue = docTree.removeNode(path);
239 result = new DocumentTreeResult<>(Status.OK, oldValue);
240 if (oldValue != null) {
241 notifyListeners(new DocumentTreeEvent<>(
242 path,
243 Type.DELETED,
244 Optional.empty(),
245 Optional.of(oldValue)));
246 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700247 } else {
Jordan Haltermane853d032017-08-01 15:10:28 -0700248 Versioned<byte[]> oldValue = docTree.set(path, commit.value().value().orElse(null));
249 Versioned<byte[]> newValue = docTree.get(path);
250 result = new DocumentTreeResult<>(Status.OK, newValue);
251 if (oldValue == null) {
252 notifyListeners(new DocumentTreeEvent<>(
253 path,
254 Type.CREATED,
255 Optional.of(newValue),
256 Optional.empty()));
257 } else {
258 notifyListeners(new DocumentTreeEvent<>(
259 path,
260 Type.UPDATED,
261 Optional.of(newValue),
262 Optional.of(oldValue)));
263 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700264 }
Jordan Haltermane853d032017-08-01 15:10:28 -0700265 } else {
266 result = new DocumentTreeResult<>(
267 commit.value().value() == null ? Status.INVALID_PATH : Status.NOOP,
268 currentValue);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700269 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700270 } catch (IllegalDocumentModificationException e) {
Jordan Haltermane853d032017-08-01 15:10:28 -0700271 result = DocumentTreeResult.illegalModification();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700272 } catch (NoSuchDocumentPathException e) {
Jordan Haltermane853d032017-08-01 15:10:28 -0700273 result = DocumentTreeResult.invalidPath();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700274 } catch (Exception e) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700275 logger().error("Failed to apply {} to state machine", commit.value(), e);
Ray Milkey6a51cb92018-03-06 09:03:03 -0800276 throw new IllegalStateException(e);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700277 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700278 return result;
279 }
280
281 protected void clear(Commit<Void> commit) {
282 Queue<DocumentPath> toClearQueue = Queues.newArrayDeque();
283 Map<String, Versioned<byte[]>> topLevelChildren = docTree.getChildren(DocumentPath.from("root"));
284 toClearQueue.addAll(topLevelChildren.keySet()
285 .stream()
286 .map(name -> new DocumentPath(name, DocumentPath.from("root")))
287 .collect(Collectors.toList()));
288 while (!toClearQueue.isEmpty()) {
289 DocumentPath path = toClearQueue.remove();
290 Map<String, Versioned<byte[]>> children = docTree.getChildren(path);
291 if (children.size() == 0) {
292 docTree.removeNode(path);
293 } else {
294 children.keySet().forEach(name -> toClearQueue.add(new DocumentPath(name, path)));
295 toClearQueue.add(path);
296 }
297 }
298 }
299
Sithara Punnassery61a80252017-08-07 11:16:08 -0700300 /**
301 * Handles a begin commit.
302 *
303 * @param commit transaction begin commit
304 * @return transaction state version
305 */
306 protected long begin(Commit<? extends TransactionBegin> commit) {
307 long version = commit.index();
308 activeTransactions.put(commit.value().transactionId(), new TransactionScope(version));
309 return version;
310 }
311
312 /**
313 * Handles an prepare commit.
314 *
315 * @param commit transaction prepare commit
316 * @return prepare result
317 */
318 protected PrepareResult prepare(Commit<? extends TransactionPrepare> commit) {
319 try {
320 TransactionLog<NodeUpdate<byte[]>> transactionLog = commit.value().transactionLog();
321 // Iterate through records in the transaction log and perform isolation checks.
322 for (NodeUpdate<byte[]> record : transactionLog.records()) {
323 DocumentPath path = record.path();
324
325 // If the prepared keys already contains the key contained within the record, that indicates a
326 // conflict with a concurrent transaction.
327 if (preparedKeys.contains(path)) {
328 return PrepareResult.CONCURRENT_TRANSACTION;
329 }
330
331 // Read the existing value from the map.
332 Versioned<byte[]> existingValue = docTree.get(path);
333
334 // If the update is an UPDATE_NODE or DELETE_NODE, verify that versions match.
335 switch (record.type()) {
336 case UPDATE_NODE:
337 case DELETE_NODE:
338 if (existingValue == null || existingValue.version() != record.version()) {
339 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
340 }
341 default:
342 break;
343 }
344 }
345
346 // No violations detected. Mark modified keys locked for transactions.
347 transactionLog.records().forEach(record -> {
348 preparedKeys.add(record.path());
349 });
350
351 // Update the transaction scope. If the transaction scope is not set on this node, that indicates the
352 // coordinator is communicating with another node. Transactions assume that the client is communicating
353 // with a single leader in order to limit the overhead of retaining tombstones.
354 TransactionScope transactionScope = activeTransactions.get(transactionLog.transactionId());
355 if (transactionScope == null) {
356 activeTransactions.put(
357 transactionLog.transactionId(),
358 new TransactionScope(transactionLog.version(), commit.value().transactionLog()));
359 return PrepareResult.PARTIAL_FAILURE;
360 } else {
361 activeTransactions.put(
362 transactionLog.transactionId(),
363 transactionScope.prepared(commit));
364 return PrepareResult.OK;
365 }
366 } catch (Exception e) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700367 logger().warn("Failure applying {}", commit, e);
Ray Milkey6a51cb92018-03-06 09:03:03 -0800368 throw new IllegalStateException(e);
Sithara Punnassery61a80252017-08-07 11:16:08 -0700369 }
370 }
371
372 /**
373 * Handles an prepare and commit commit.
374 *
375 * @param commit transaction prepare and commit commit
376 * @return prepare result
377 */
378 protected PrepareResult prepareAndCommit(Commit<? extends TransactionPrepareAndCommit> commit) {
379 TransactionId transactionId = commit.value().transactionLog().transactionId();
380 PrepareResult prepareResult = prepare(commit);
381 TransactionScope transactionScope = activeTransactions.remove(transactionId);
382 if (prepareResult == PrepareResult.OK) {
383 transactionScope = transactionScope.prepared(commit);
384 commitTransaction(transactionScope);
385 }
386 return prepareResult;
387 }
388
389 /**
390 * Applies committed operations to the state machine.
391 */
392 private CommitResult commitTransaction(TransactionScope transactionScope) {
393 TransactionLog<NodeUpdate<byte[]>> transactionLog = transactionScope.transactionLog();
394
395 List<DocumentTreeEvent<byte[]>> eventsToPublish = Lists.newArrayList();
396 DocumentTreeEvent<byte[]> start = new DocumentTreeEvent<>(
397 DocumentPath.from(transactionScope.transactionLog().transactionId().toString()),
398 Type.TRANSACTION_START,
399 Optional.empty(),
400 Optional.empty());
401 eventsToPublish.add(start);
402
403 for (NodeUpdate<byte[]> record : transactionLog.records()) {
404 DocumentPath path = record.path();
405 checkState(preparedKeys.remove(path), "path is not prepared");
406
Yuta HIGUCHI47d96092017-11-17 14:05:26 -0800407 // FIXME revisit this block, it never respects NodeUpdate type
408
Sithara Punnassery61a80252017-08-07 11:16:08 -0700409 Versioned<byte[]> previousValue = null;
410 try {
411 previousValue = docTree.removeNode(path);
412 } catch (NoSuchDocumentPathException e) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700413 logger().info("Value is being inserted first time");
Sithara Punnassery61a80252017-08-07 11:16:08 -0700414 }
415
416 if (record.value() != null) {
417 if (docTree.create(path, record.value())) {
418 Versioned<byte[]> newValue = docTree.get(path);
419 eventsToPublish.add(new DocumentTreeEvent<>(
420 path,
421 Optional.ofNullable(newValue),
422 Optional.ofNullable(previousValue)));
423 }
424 } else if (previousValue != null) {
425 eventsToPublish.add(new DocumentTreeEvent<>(
426 path,
427 Optional.empty(),
428 Optional.of(previousValue)));
429 }
430 }
431
Yuta HIGUCHI47d96092017-11-17 14:05:26 -0800432 DocumentTreeEvent<byte[]> end = new DocumentTreeEvent<>(
Sithara Punnassery61a80252017-08-07 11:16:08 -0700433 DocumentPath.from(transactionScope.transactionLog().transactionId().toString()),
434 Type.TRANSACTION_END,
435 Optional.empty(),
436 Optional.empty());
437 eventsToPublish.add(end);
438 publish(eventsToPublish);
439
440 return CommitResult.OK;
441 }
442
443 /**
444 * Handles an commit commit (ha!).
445 *
446 * @param commit transaction commit commit
447 * @return commit result
448 */
449 protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
450 TransactionId transactionId = commit.value().transactionId();
451 TransactionScope transactionScope = activeTransactions.remove(transactionId);
452 if (transactionScope == null) {
453 return CommitResult.UNKNOWN_TRANSACTION_ID;
454 }
455 try {
456 return commitTransaction(transactionScope);
457 } catch (Exception e) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700458 logger().warn("Failure applying {}", commit, e);
Ray Milkey6a51cb92018-03-06 09:03:03 -0800459 throw new IllegalStateException(e);
Sithara Punnassery61a80252017-08-07 11:16:08 -0700460 }
461 }
462
463 /**
464 * Handles an rollback commit (ha!).
465 *
466 * @param commit transaction rollback commit
467 * @return rollback result
468 */
469 protected RollbackResult rollback(Commit<? extends TransactionRollback> commit) {
470 TransactionId transactionId = commit.value().transactionId();
471 TransactionScope transactionScope = activeTransactions.remove(transactionId);
472 if (transactionScope == null) {
473 return RollbackResult.UNKNOWN_TRANSACTION_ID;
474 } else if (!transactionScope.isPrepared()) {
475 return RollbackResult.OK;
476 } else {
477 transactionScope.transactionLog().records()
478 .forEach(record -> {
479 preparedKeys.remove(record.path());
480 });
481 return RollbackResult.OK;
482 }
483
484 }
485
486 /**
487 * Map transaction scope.
488 */
489 private static final class TransactionScope {
490 private final long version;
491 private final TransactionLog<NodeUpdate<byte[]>> transactionLog;
492
493 private TransactionScope(long version) {
494 this(version, null);
495 }
496
497 private TransactionScope(long version, TransactionLog<NodeUpdate<byte[]>> transactionLog) {
498 this.version = version;
499 this.transactionLog = transactionLog;
500 }
501
502 /**
503 * Returns the transaction version.
504 *
505 * @return the transaction version
506 */
507 long version() {
508 return version;
509 }
510
511 /**
512 * Returns whether this is a prepared transaction scope.
513 *
514 * @return whether this is a prepared transaction scope
515 */
516 boolean isPrepared() {
517 return transactionLog != null;
518 }
519
520 /**
521 * Returns the transaction commit log.
522 *
523 * @return the transaction commit log
524 */
525 TransactionLog<NodeUpdate<byte[]>> transactionLog() {
Yuta HIGUCHI47d96092017-11-17 14:05:26 -0800526 checkState(isPrepared(), "Transaction not prepared");
Sithara Punnassery61a80252017-08-07 11:16:08 -0700527 return transactionLog;
528 }
529
530 /**
531 * Returns a new transaction scope with a prepare commit.
532 *
533 * @param commit the prepare commit
534 * @return new transaction scope updated with the prepare commit
535 */
536 TransactionScope prepared(Commit<? extends TransactionPrepare> commit) {
537 return new TransactionScope(version, commit.value().transactionLog());
538 }
539 }
540
541 private void publish(List<DocumentTreeEvent<byte[]>> events) {
542 listeners.values().forEach(session -> {
543 session.publish(CHANGE, events);
544 });
545 }
546
Jordan Haltermane853d032017-08-01 15:10:28 -0700547 private void notifyListeners(DocumentTreeEvent<byte[]> event) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700548 listeners.values()
549 .stream()
550 .filter(l -> event.path().isDescendentOf(l.leastCommonAncestorPath()))
551 .forEach(listener -> listener.publish(CHANGE, Arrays.asList(event)));
552 }
553
554 @Override
555 public void onExpire(RaftSession session) {
556 closeListener(session.sessionId().id());
557 }
558
559 @Override
560 public void onClose(RaftSession session) {
561 closeListener(session.sessionId().id());
562 }
563
564 private void closeListener(Long sessionId) {
565 listeners.remove(sessionId);
566 }
567
568 private class SessionListenCommits {
569 private final List<Listener> listeners = Lists.newArrayList();
570 private DocumentPath leastCommonAncestorPath;
571
572 public void add(Listener listener) {
573 listeners.add(listener);
574 recomputeLeastCommonAncestor();
575 }
576
577 public void remove(Commit<? extends Unlisten> commit) {
578 // Remove the first listen commit with path matching path in unlisten commit
579 Iterator<Listener> iterator = listeners.iterator();
580 while (iterator.hasNext()) {
581 Listener listener = iterator.next();
582 if (listener.path().equals(commit.value().path())) {
583 iterator.remove();
584 }
585 }
586 recomputeLeastCommonAncestor();
587 }
588
589 public DocumentPath leastCommonAncestorPath() {
590 return leastCommonAncestorPath;
591 }
592
593 public <M> void publish(EventType topic, M message) {
594 listeners.stream().findAny().ifPresent(listener ->
595 listener.session().publish(topic, serializer::encode, message));
596 }
597
598 private void recomputeLeastCommonAncestor() {
599 this.leastCommonAncestorPath = DocumentPath.leastCommonAncestor(listeners.stream()
600 .map(Listener::path)
601 .collect(Collectors.toList()));
602 }
603 }
604
605 private static class Listener {
606 private final RaftSession session;
607 private final DocumentPath path;
608
609 public Listener(RaftSession session, DocumentPath path) {
610 this.session = session;
611 this.path = path;
612 }
613
614 public DocumentPath path() {
615 return path;
616 }
617
618 public RaftSession session() {
619 return session;
620 }
621 }
622}