blob: b87f784243d1f8aeb667f15fc25d9be70371b1a7 [file] [log] [blame]
Jordan Halterman2bf177c2017-06-29 01:49:08 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Jordan Halterman2bf177c2017-06-29 01:49:08 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.primitives.resources.impl;
18
19import java.util.Arrays;
20import java.util.HashMap;
21import java.util.Iterator;
Sithara Punnassery61a80252017-08-07 11:16:08 -070022import java.util.LinkedHashMap;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070023import java.util.List;
24import java.util.Map;
25import java.util.Optional;
26import java.util.Queue;
Sithara Punnassery61a80252017-08-07 11:16:08 -070027import java.util.Set;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070028import java.util.TreeMap;
29import java.util.concurrent.atomic.AtomicLong;
30import java.util.stream.Collectors;
31
32import com.esotericsoftware.kryo.Kryo;
33import com.esotericsoftware.kryo.io.Input;
34import com.esotericsoftware.kryo.io.Output;
35import com.google.common.base.Throwables;
36import com.google.common.collect.Lists;
Sithara Punnassery61a80252017-08-07 11:16:08 -070037import com.google.common.collect.Maps;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070038import com.google.common.collect.Queues;
Sithara Punnassery61a80252017-08-07 11:16:08 -070039import com.google.common.collect.Sets;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070040import io.atomix.protocols.raft.event.EventType;
41import io.atomix.protocols.raft.service.AbstractRaftService;
42import io.atomix.protocols.raft.service.Commit;
43import io.atomix.protocols.raft.service.RaftServiceExecutor;
44import io.atomix.protocols.raft.session.RaftSession;
45import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
46import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
47import org.onlab.util.KryoNamespace;
48import org.onlab.util.Match;
Sithara Punnassery61a80252017-08-07 11:16:08 -070049import org.onosproject.store.primitives.NodeUpdate;
50import org.onosproject.store.primitives.TransactionId;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070051import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Get;
52import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GetChildren;
53import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Listen;
54import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Unlisten;
55import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Update;
Sithara Punnassery61a80252017-08-07 11:16:08 -070056import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionBegin;
57import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionPrepare;
58import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionPrepareAndCommit;
59import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionCommit;
60import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionRollback;
61
Jordan Haltermane853d032017-08-01 15:10:28 -070062import org.onosproject.store.primitives.resources.impl.DocumentTreeResult.Status;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070063import org.onosproject.store.serializers.KryoNamespaces;
64import org.onosproject.store.service.DocumentPath;
65import org.onosproject.store.service.DocumentTree;
66import org.onosproject.store.service.DocumentTreeEvent;
67import org.onosproject.store.service.DocumentTreeEvent.Type;
68import org.onosproject.store.service.IllegalDocumentModificationException;
69import org.onosproject.store.service.NoSuchDocumentPathException;
Jordan Haltermand0d80352017-08-10 15:08:27 -070070import org.onosproject.store.service.Ordering;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070071import org.onosproject.store.service.Serializer;
Sithara Punnassery61a80252017-08-07 11:16:08 -070072import org.onosproject.store.service.TransactionLog;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070073import org.onosproject.store.service.Versioned;
74
Sithara Punnassery61a80252017-08-07 11:16:08 -070075import static com.google.common.base.Preconditions.checkState;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070076import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeEvents.CHANGE;
Sithara Punnassery61a80252017-08-07 11:16:08 -070077import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.BEGIN;
78import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.PREPARE;
79import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.PREPARE_AND_COMMIT;
80import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.COMMIT;
81import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ROLLBACK;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070082import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.CLEAR;
83import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET;
84import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET_CHILDREN;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070085import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.UPDATE;
Sithara Punnassery61a80252017-08-07 11:16:08 -070086import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ADD_LISTENER;
87import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.REMOVE_LISTENER;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070088
89/**
90 * State Machine for {@link AtomixDocumentTree} resource.
91 */
92public class AtomixDocumentTreeService extends AbstractRaftService {
93 private final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
94 .register(KryoNamespaces.BASIC)
95 .register(AtomixDocumentTreeOperations.NAMESPACE)
96 .register(AtomixDocumentTreeEvents.NAMESPACE)
97 .register(new com.esotericsoftware.kryo.Serializer<Listener>() {
98 @Override
99 public void write(Kryo kryo, Output output, Listener listener) {
100 output.writeLong(listener.session.sessionId().id());
101 kryo.writeObject(output, listener.path);
102 }
103
104 @Override
105 public Listener read(Kryo kryo, Input input, Class<Listener> type) {
106 return new Listener(getSessions().getSession(input.readLong()),
107 kryo.readObjectOrNull(input, DocumentPath.class));
108 }
109 }, Listener.class)
110 .register(Versioned.class)
111 .register(DocumentPath.class)
Sithara Punnassery61a80252017-08-07 11:16:08 -0700112 .register(new LinkedHashMap().keySet().getClass())
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700113 .register(TreeMap.class)
Jordan Haltermand0d80352017-08-10 15:08:27 -0700114 .register(Ordering.class)
Sithara Punnassery61a80252017-08-07 11:16:08 -0700115 .register(TransactionScope.class)
116 .register(TransactionLog.class)
117 .register(TransactionId.class)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700118 .register(SessionListenCommits.class)
119 .register(new com.esotericsoftware.kryo.Serializer<DefaultDocumentTree>() {
120 @Override
121 public void write(Kryo kryo, Output output, DefaultDocumentTree object) {
122 kryo.writeObject(output, object.root);
123 }
124
125 @Override
126 @SuppressWarnings("unchecked")
127 public DefaultDocumentTree read(Kryo kryo, Input input, Class<DefaultDocumentTree> type) {
128 return new DefaultDocumentTree(versionCounter::incrementAndGet,
129 kryo.readObject(input, DefaultDocumentTreeNode.class));
130 }
131 }, DefaultDocumentTree.class)
132 .register(DefaultDocumentTreeNode.class)
133 .build());
134
135 private Map<Long, SessionListenCommits> listeners = new HashMap<>();
136 private AtomicLong versionCounter = new AtomicLong(0);
Jordan Haltermand0d80352017-08-10 15:08:27 -0700137 private DocumentTree<byte[]> docTree;
Sithara Punnassery61a80252017-08-07 11:16:08 -0700138 private Map<TransactionId, TransactionScope> activeTransactions = Maps.newHashMap();
139 private Set<DocumentPath> preparedKeys = Sets.newHashSet();
Jordan Haltermand0d80352017-08-10 15:08:27 -0700140
141 public AtomixDocumentTreeService(Ordering ordering) {
142 this.docTree = new DefaultDocumentTree<>(versionCounter::incrementAndGet, ordering);
143 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700144
145 @Override
146 public void snapshot(SnapshotWriter writer) {
147 writer.writeLong(versionCounter.get());
148 writer.writeObject(listeners, serializer::encode);
149 writer.writeObject(docTree, serializer::encode);
Sithara Punnassery61a80252017-08-07 11:16:08 -0700150 writer.writeObject(preparedKeys, serializer::encode);
151 writer.writeObject(activeTransactions, serializer::encode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700152 }
153
154 @Override
155 public void install(SnapshotReader reader) {
156 versionCounter = new AtomicLong(reader.readLong());
157 listeners = reader.readObject(serializer::decode);
158 docTree = reader.readObject(serializer::decode);
Sithara Punnassery61a80252017-08-07 11:16:08 -0700159 preparedKeys = reader.readObject(serializer::decode);
160 activeTransactions = reader.readObject(serializer::decode);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700161 }
162
163 @Override
164 protected void configure(RaftServiceExecutor executor) {
165 // Listeners
166 executor.register(ADD_LISTENER, serializer::decode, this::listen);
167 executor.register(REMOVE_LISTENER, serializer::decode, this::unlisten);
168 // queries
169 executor.register(GET, serializer::decode, this::get, serializer::encode);
170 executor.register(GET_CHILDREN, serializer::decode, this::getChildren, serializer::encode);
171 // commands
172 executor.register(UPDATE, serializer::decode, this::update, serializer::encode);
173 executor.register(CLEAR, this::clear);
Sithara Punnassery61a80252017-08-07 11:16:08 -0700174 executor.register(BEGIN, serializer::decode, this::begin, serializer::encode);
175 executor.register(PREPARE, serializer::decode, this::prepare, serializer::encode);
176 executor.register(PREPARE_AND_COMMIT, serializer::decode, this::prepareAndCommit, serializer::encode);
177 executor.register(COMMIT, serializer::decode, this::commit, serializer::encode);
178 executor.register(ROLLBACK, serializer::decode, this::rollback, serializer::encode);
179 }
180
181 /**
182 * Returns a boolean indicating whether the given path is currently locked by a transaction.
183 *
184 * @param path the path to check
185 * @return whether the given path is locked by a running transaction
186 */
187 private boolean isLocked(DocumentPath path) {
188 return preparedKeys.contains(path);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700189 }
190
191 protected void listen(Commit<? extends Listen> commit) {
192 Long sessionId = commit.session().sessionId().id();
193 listeners.computeIfAbsent(sessionId, k -> new SessionListenCommits())
194 .add(new Listener(commit.session(), commit.value().path()));
195 }
196
197 protected void unlisten(Commit<? extends Unlisten> commit) {
198 Long sessionId = commit.session().sessionId().id();
199 SessionListenCommits listenCommits = listeners.get(sessionId);
200 if (listenCommits != null) {
201 listenCommits.remove(commit);
202 }
203 }
204
205 protected Versioned<byte[]> get(Commit<? extends Get> commit) {
206 try {
207 Versioned<byte[]> value = docTree.get(commit.value().path());
Jordan Haltermane853d032017-08-01 15:10:28 -0700208 return value == null ? null : value.map(node -> node);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700209 } catch (IllegalStateException e) {
210 return null;
211 }
212 }
213
Jordan Haltermane853d032017-08-01 15:10:28 -0700214 protected DocumentTreeResult<Map<String, Versioned<byte[]>>> getChildren(Commit<? extends GetChildren> commit) {
215 try {
216 return DocumentTreeResult.ok(docTree.getChildren(commit.value().path()));
217 } catch (NoSuchDocumentPathException e) {
218 return DocumentTreeResult.invalidPath();
219 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700220 }
221
Jordan Haltermane853d032017-08-01 15:10:28 -0700222 protected DocumentTreeResult<Versioned<byte[]>> update(Commit<? extends Update> commit) {
223 DocumentTreeResult<Versioned<byte[]>> result = null;
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700224 DocumentPath path = commit.value().path();
Sithara Punnassery61a80252017-08-07 11:16:08 -0700225
226 // If the path is locked by a transaction, return a WRITE_LOCK error.
227 if (isLocked(path)) {
228 return DocumentTreeResult.writeLock();
229 }
230
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700231 Versioned<byte[]> currentValue = docTree.get(path);
232 try {
233 Match<Long> versionMatch = commit.value().versionMatch();
234 Match<byte[]> valueMatch = commit.value().valueMatch();
235
236 if (versionMatch.matches(currentValue == null ? null : currentValue.version())
237 && valueMatch.matches(currentValue == null ? null : currentValue.value())) {
238 if (commit.value().value() == null) {
Jordan Haltermane853d032017-08-01 15:10:28 -0700239 Versioned<byte[]> oldValue = docTree.removeNode(path);
240 result = new DocumentTreeResult<>(Status.OK, oldValue);
241 if (oldValue != null) {
242 notifyListeners(new DocumentTreeEvent<>(
243 path,
244 Type.DELETED,
245 Optional.empty(),
246 Optional.of(oldValue)));
247 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700248 } else {
Jordan Haltermane853d032017-08-01 15:10:28 -0700249 Versioned<byte[]> oldValue = docTree.set(path, commit.value().value().orElse(null));
250 Versioned<byte[]> newValue = docTree.get(path);
251 result = new DocumentTreeResult<>(Status.OK, newValue);
252 if (oldValue == null) {
253 notifyListeners(new DocumentTreeEvent<>(
254 path,
255 Type.CREATED,
256 Optional.of(newValue),
257 Optional.empty()));
258 } else {
259 notifyListeners(new DocumentTreeEvent<>(
260 path,
261 Type.UPDATED,
262 Optional.of(newValue),
263 Optional.of(oldValue)));
264 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700265 }
Jordan Haltermane853d032017-08-01 15:10:28 -0700266 } else {
267 result = new DocumentTreeResult<>(
268 commit.value().value() == null ? Status.INVALID_PATH : Status.NOOP,
269 currentValue);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700270 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700271 } catch (IllegalDocumentModificationException e) {
Jordan Haltermane853d032017-08-01 15:10:28 -0700272 result = DocumentTreeResult.illegalModification();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700273 } catch (NoSuchDocumentPathException e) {
Jordan Haltermane853d032017-08-01 15:10:28 -0700274 result = DocumentTreeResult.invalidPath();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700275 } catch (Exception e) {
276 getLogger().error("Failed to apply {} to state machine", commit.value(), e);
277 throw Throwables.propagate(e);
278 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700279 return result;
280 }
281
282 protected void clear(Commit<Void> commit) {
283 Queue<DocumentPath> toClearQueue = Queues.newArrayDeque();
284 Map<String, Versioned<byte[]>> topLevelChildren = docTree.getChildren(DocumentPath.from("root"));
285 toClearQueue.addAll(topLevelChildren.keySet()
286 .stream()
287 .map(name -> new DocumentPath(name, DocumentPath.from("root")))
288 .collect(Collectors.toList()));
289 while (!toClearQueue.isEmpty()) {
290 DocumentPath path = toClearQueue.remove();
291 Map<String, Versioned<byte[]>> children = docTree.getChildren(path);
292 if (children.size() == 0) {
293 docTree.removeNode(path);
294 } else {
295 children.keySet().forEach(name -> toClearQueue.add(new DocumentPath(name, path)));
296 toClearQueue.add(path);
297 }
298 }
299 }
300
Sithara Punnassery61a80252017-08-07 11:16:08 -0700301 /**
302 * Handles a begin commit.
303 *
304 * @param commit transaction begin commit
305 * @return transaction state version
306 */
307 protected long begin(Commit<? extends TransactionBegin> commit) {
308 long version = commit.index();
309 activeTransactions.put(commit.value().transactionId(), new TransactionScope(version));
310 return version;
311 }
312
313 /**
314 * Handles an prepare commit.
315 *
316 * @param commit transaction prepare commit
317 * @return prepare result
318 */
319 protected PrepareResult prepare(Commit<? extends TransactionPrepare> commit) {
320 try {
321 TransactionLog<NodeUpdate<byte[]>> transactionLog = commit.value().transactionLog();
322 // Iterate through records in the transaction log and perform isolation checks.
323 for (NodeUpdate<byte[]> record : transactionLog.records()) {
324 DocumentPath path = record.path();
325
326 // If the prepared keys already contains the key contained within the record, that indicates a
327 // conflict with a concurrent transaction.
328 if (preparedKeys.contains(path)) {
329 return PrepareResult.CONCURRENT_TRANSACTION;
330 }
331
332 // Read the existing value from the map.
333 Versioned<byte[]> existingValue = docTree.get(path);
334
335 // If the update is an UPDATE_NODE or DELETE_NODE, verify that versions match.
336 switch (record.type()) {
337 case UPDATE_NODE:
338 case DELETE_NODE:
339 if (existingValue == null || existingValue.version() != record.version()) {
340 return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
341 }
342 default:
343 break;
344 }
345 }
346
347 // No violations detected. Mark modified keys locked for transactions.
348 transactionLog.records().forEach(record -> {
349 preparedKeys.add(record.path());
350 });
351
352 // Update the transaction scope. If the transaction scope is not set on this node, that indicates the
353 // coordinator is communicating with another node. Transactions assume that the client is communicating
354 // with a single leader in order to limit the overhead of retaining tombstones.
355 TransactionScope transactionScope = activeTransactions.get(transactionLog.transactionId());
356 if (transactionScope == null) {
357 activeTransactions.put(
358 transactionLog.transactionId(),
359 new TransactionScope(transactionLog.version(), commit.value().transactionLog()));
360 return PrepareResult.PARTIAL_FAILURE;
361 } else {
362 activeTransactions.put(
363 transactionLog.transactionId(),
364 transactionScope.prepared(commit));
365 return PrepareResult.OK;
366 }
367 } catch (Exception e) {
368 getLogger().warn("Failure applying {}", commit, e);
369 throw Throwables.propagate(e);
370 }
371 }
372
373 /**
374 * Handles an prepare and commit commit.
375 *
376 * @param commit transaction prepare and commit commit
377 * @return prepare result
378 */
379 protected PrepareResult prepareAndCommit(Commit<? extends TransactionPrepareAndCommit> commit) {
380 TransactionId transactionId = commit.value().transactionLog().transactionId();
381 PrepareResult prepareResult = prepare(commit);
382 TransactionScope transactionScope = activeTransactions.remove(transactionId);
383 if (prepareResult == PrepareResult.OK) {
384 transactionScope = transactionScope.prepared(commit);
385 commitTransaction(transactionScope);
386 }
387 return prepareResult;
388 }
389
390 /**
391 * Applies committed operations to the state machine.
392 */
393 private CommitResult commitTransaction(TransactionScope transactionScope) {
394 TransactionLog<NodeUpdate<byte[]>> transactionLog = transactionScope.transactionLog();
395
396 List<DocumentTreeEvent<byte[]>> eventsToPublish = Lists.newArrayList();
397 DocumentTreeEvent<byte[]> start = new DocumentTreeEvent<>(
398 DocumentPath.from(transactionScope.transactionLog().transactionId().toString()),
399 Type.TRANSACTION_START,
400 Optional.empty(),
401 Optional.empty());
402 eventsToPublish.add(start);
403
404 for (NodeUpdate<byte[]> record : transactionLog.records()) {
405 DocumentPath path = record.path();
406 checkState(preparedKeys.remove(path), "path is not prepared");
407
408 Versioned<byte[]> previousValue = null;
409 try {
410 previousValue = docTree.removeNode(path);
411 } catch (NoSuchDocumentPathException e) {
412 getLogger().info("Value is being inserted first time");
413 }
414
415 if (record.value() != null) {
416 if (docTree.create(path, record.value())) {
417 Versioned<byte[]> newValue = docTree.get(path);
418 eventsToPublish.add(new DocumentTreeEvent<>(
419 path,
420 Optional.ofNullable(newValue),
421 Optional.ofNullable(previousValue)));
422 }
423 } else if (previousValue != null) {
424 eventsToPublish.add(new DocumentTreeEvent<>(
425 path,
426 Optional.empty(),
427 Optional.of(previousValue)));
428 }
429 }
430
431 DocumentTreeEvent<byte[]> end = new DocumentTreeEvent<byte[]>(
432 DocumentPath.from(transactionScope.transactionLog().transactionId().toString()),
433 Type.TRANSACTION_END,
434 Optional.empty(),
435 Optional.empty());
436 eventsToPublish.add(end);
437 publish(eventsToPublish);
438
439 return CommitResult.OK;
440 }
441
442 /**
443 * Handles an commit commit (ha!).
444 *
445 * @param commit transaction commit commit
446 * @return commit result
447 */
448 protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
449 TransactionId transactionId = commit.value().transactionId();
450 TransactionScope transactionScope = activeTransactions.remove(transactionId);
451 if (transactionScope == null) {
452 return CommitResult.UNKNOWN_TRANSACTION_ID;
453 }
454 try {
455 return commitTransaction(transactionScope);
456 } catch (Exception e) {
457 getLogger().warn("Failure applying {}", commit, e);
458 throw Throwables.propagate(e);
459 }
460 }
461
462 /**
463 * Handles an rollback commit (ha!).
464 *
465 * @param commit transaction rollback commit
466 * @return rollback result
467 */
468 protected RollbackResult rollback(Commit<? extends TransactionRollback> commit) {
469 TransactionId transactionId = commit.value().transactionId();
470 TransactionScope transactionScope = activeTransactions.remove(transactionId);
471 if (transactionScope == null) {
472 return RollbackResult.UNKNOWN_TRANSACTION_ID;
473 } else if (!transactionScope.isPrepared()) {
474 return RollbackResult.OK;
475 } else {
476 transactionScope.transactionLog().records()
477 .forEach(record -> {
478 preparedKeys.remove(record.path());
479 });
480 return RollbackResult.OK;
481 }
482
483 }
484
485 /**
486 * Map transaction scope.
487 */
488 private static final class TransactionScope {
489 private final long version;
490 private final TransactionLog<NodeUpdate<byte[]>> transactionLog;
491
492 private TransactionScope(long version) {
493 this(version, null);
494 }
495
496 private TransactionScope(long version, TransactionLog<NodeUpdate<byte[]>> transactionLog) {
497 this.version = version;
498 this.transactionLog = transactionLog;
499 }
500
501 /**
502 * Returns the transaction version.
503 *
504 * @return the transaction version
505 */
506 long version() {
507 return version;
508 }
509
510 /**
511 * Returns whether this is a prepared transaction scope.
512 *
513 * @return whether this is a prepared transaction scope
514 */
515 boolean isPrepared() {
516 return transactionLog != null;
517 }
518
519 /**
520 * Returns the transaction commit log.
521 *
522 * @return the transaction commit log
523 */
524 TransactionLog<NodeUpdate<byte[]>> transactionLog() {
525 checkState(isPrepared());
526 return transactionLog;
527 }
528
529 /**
530 * Returns a new transaction scope with a prepare commit.
531 *
532 * @param commit the prepare commit
533 * @return new transaction scope updated with the prepare commit
534 */
535 TransactionScope prepared(Commit<? extends TransactionPrepare> commit) {
536 return new TransactionScope(version, commit.value().transactionLog());
537 }
538 }
539
540 private void publish(List<DocumentTreeEvent<byte[]>> events) {
541 listeners.values().forEach(session -> {
542 session.publish(CHANGE, events);
543 });
544 }
545
Jordan Haltermane853d032017-08-01 15:10:28 -0700546 private void notifyListeners(DocumentTreeEvent<byte[]> event) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700547 listeners.values()
548 .stream()
549 .filter(l -> event.path().isDescendentOf(l.leastCommonAncestorPath()))
550 .forEach(listener -> listener.publish(CHANGE, Arrays.asList(event)));
551 }
552
553 @Override
554 public void onExpire(RaftSession session) {
555 closeListener(session.sessionId().id());
556 }
557
558 @Override
559 public void onClose(RaftSession session) {
560 closeListener(session.sessionId().id());
561 }
562
563 private void closeListener(Long sessionId) {
564 listeners.remove(sessionId);
565 }
566
567 private class SessionListenCommits {
568 private final List<Listener> listeners = Lists.newArrayList();
569 private DocumentPath leastCommonAncestorPath;
570
571 public void add(Listener listener) {
572 listeners.add(listener);
573 recomputeLeastCommonAncestor();
574 }
575
576 public void remove(Commit<? extends Unlisten> commit) {
577 // Remove the first listen commit with path matching path in unlisten commit
578 Iterator<Listener> iterator = listeners.iterator();
579 while (iterator.hasNext()) {
580 Listener listener = iterator.next();
581 if (listener.path().equals(commit.value().path())) {
582 iterator.remove();
583 }
584 }
585 recomputeLeastCommonAncestor();
586 }
587
588 public DocumentPath leastCommonAncestorPath() {
589 return leastCommonAncestorPath;
590 }
591
592 public <M> void publish(EventType topic, M message) {
593 listeners.stream().findAny().ifPresent(listener ->
594 listener.session().publish(topic, serializer::encode, message));
595 }
596
597 private void recomputeLeastCommonAncestor() {
598 this.leastCommonAncestorPath = DocumentPath.leastCommonAncestor(listeners.stream()
599 .map(Listener::path)
600 .collect(Collectors.toList()));
601 }
602 }
603
604 private static class Listener {
605 private final RaftSession session;
606 private final DocumentPath path;
607
608 public Listener(RaftSession session, DocumentPath path) {
609 this.session = session;
610 this.path = path;
611 }
612
613 public DocumentPath path() {
614 return path;
615 }
616
617 public RaftSession session() {
618 return session;
619 }
620 }
621}