blob: 98262663dd79d662c17fcb1db40e4de5df8261a6 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.impl;
17
18import java.util.Set;
19import java.util.concurrent.CompletableFuture;
20import java.util.stream.Collectors;
21
Jordan Halterman948d6592017-04-20 17:18:24 -070022import com.google.common.collect.Sets;
Madan Jampani7e55c662016-02-15 21:13:53 -080023import org.onlab.util.Tools;
24import org.onosproject.store.primitives.TransactionId;
Madan Jampani3780d4b2016-04-04 18:18:24 -070025import org.onosproject.store.service.CommitStatus;
Jordan Halterman948d6592017-04-20 17:18:24 -070026import org.onosproject.store.service.Serializer;
27import org.onosproject.store.service.TransactionalMap;
28
29import static com.google.common.base.MoreObjects.toStringHelper;
Madan Jampani7e55c662016-02-15 21:13:53 -080030
31/**
Jordan Halterman948d6592017-04-20 17:18:24 -070032 * Transaction coordinator.
Madan Jampani7e55c662016-02-15 21:13:53 -080033 */
34public class TransactionCoordinator {
Jordan Halterman948d6592017-04-20 17:18:24 -070035 protected final TransactionId transactionId;
36 protected final TransactionManager transactionManager;
37 protected final Set<TransactionParticipant> transactionParticipants = Sets.newConcurrentHashSet();
Madan Jampani7e55c662016-02-15 21:13:53 -080038
Jordan Halterman948d6592017-04-20 17:18:24 -070039 public TransactionCoordinator(TransactionId transactionId, TransactionManager transactionManager) {
40 this.transactionId = transactionId;
41 this.transactionManager = transactionManager;
Madan Jampani7e55c662016-02-15 21:13:53 -080042 }
43
44 /**
Jordan Halterman948d6592017-04-20 17:18:24 -070045 * Returns a transactional map for this transaction.
Thomas Vachuska708d3032016-02-18 11:11:46 -080046 *
Jordan Halterman948d6592017-04-20 17:18:24 -070047 * @param name the transactional map name
48 * @param serializer the serializer
49 * @param <K> key type
50 * @param <V> value type
51 * @return a transactional map for this transaction
Madan Jampani7e55c662016-02-15 21:13:53 -080052 */
Jordan Halterman948d6592017-04-20 17:18:24 -070053 public <K, V> TransactionalMap<K, V> getTransactionalMap(String name, Serializer serializer) {
54 PartitionedTransactionalMap<K, V> map = transactionManager.getTransactionalMap(name, serializer, this);
55 transactionParticipants.addAll(map.participants());
56 return map;
57 }
Madan Jampani7e55c662016-02-15 21:13:53 -080058
Jordan Halterman948d6592017-04-20 17:18:24 -070059 /**
60 * Commits the transaction.
61 *
62 * @return the transaction commit status
63 */
64 public CompletableFuture<CommitStatus> commit() {
65 long totalParticipants = transactionParticipants.stream()
66 .filter(TransactionParticipant::hasPendingUpdates)
67 .count();
68
69 if (totalParticipants == 0) {
Madan Jampani542d9e22016-04-05 15:39:55 -070070 return CompletableFuture.completedFuture(CommitStatus.SUCCESS);
Jordan Halterman948d6592017-04-20 17:18:24 -070071 } else if (totalParticipants == 1) {
Madan Jampani542d9e22016-04-05 15:39:55 -070072 return transactionParticipants.stream()
Jordan Halterman948d6592017-04-20 17:18:24 -070073 .filter(TransactionParticipant::hasPendingUpdates)
74 .findFirst()
75 .get()
76 .prepareAndCommit()
77 .thenApply(v -> v ? CommitStatus.SUCCESS : CommitStatus.FAILURE);
Madan Jampani542d9e22016-04-05 15:39:55 -070078 } else {
Jordan Halterman948d6592017-04-20 17:18:24 -070079 Set<TransactionParticipant> transactionParticipants = this.transactionParticipants.stream()
80 .filter(TransactionParticipant::hasPendingUpdates)
81 .collect(Collectors.toSet());
82
83 CompletableFuture<CommitStatus> status = transactionManager.updateState(
84 transactionId, Transaction.State.PREPARING)
85 .thenCompose(v -> prepare(transactionParticipants))
Madan Jampani7e55c662016-02-15 21:13:53 -080086 .thenCompose(result -> result
Jordan Halterman948d6592017-04-20 17:18:24 -070087 ? transactionManager.updateState(transactionId, Transaction.State.COMMITTING)
88 .thenCompose(v -> commit(transactionParticipants))
89 .thenApply(v -> CommitStatus.SUCCESS)
90 : transactionManager.updateState(transactionId, Transaction.State.ROLLING_BACK)
91 .thenCompose(v -> rollback(transactionParticipants))
92 .thenApply(v -> CommitStatus.FAILURE));
93 return status.thenCompose(v -> transactionManager.remove(transactionId).thenApply(u -> v));
Madan Jampani542d9e22016-04-05 15:39:55 -070094 }
Madan Jampani7e55c662016-02-15 21:13:53 -080095 }
96
Jordan Halterman948d6592017-04-20 17:18:24 -070097 /**
98 * Performs the prepare phase of the two-phase commit protocol for the given transaction participants.
99 *
100 * @param transactionParticipants the transaction participants for which to prepare the transaction
101 * @return a completable future indicating whether <em>all</em> prepares succeeded
102 */
103 protected CompletableFuture<Boolean> prepare(Set<TransactionParticipant> transactionParticipants) {
Madan Jampani3780d4b2016-04-04 18:18:24 -0700104 return Tools.allOf(transactionParticipants.stream()
Jordan Halterman948d6592017-04-20 17:18:24 -0700105 .map(TransactionParticipant::prepare)
106 .collect(Collectors.toList()))
107 .thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
Madan Jampani7e55c662016-02-15 21:13:53 -0800108 }
109
Jordan Halterman948d6592017-04-20 17:18:24 -0700110 /**
111 * Performs the commit phase of the two-phase commit protocol for the given transaction participants.
112 *
113 * @param transactionParticipants the transaction participants for which to commit the transaction
114 * @return a completable future to be completed once the commits are complete
115 */
116 protected CompletableFuture<Void> commit(Set<TransactionParticipant> transactionParticipants) {
Madan Jampani7e55c662016-02-15 21:13:53 -0800117 return CompletableFuture.allOf(transactionParticipants.stream()
Jordan Halterman948d6592017-04-20 17:18:24 -0700118 .map(TransactionParticipant::commit)
119 .toArray(CompletableFuture[]::new));
Madan Jampani7e55c662016-02-15 21:13:53 -0800120 }
121
Jordan Halterman948d6592017-04-20 17:18:24 -0700122 /**
123 * Rolls back transactions for the given participants.
124 *
125 * @param transactionParticipants the transaction participants for which to roll back the transaction
126 * @return a completable future to be completed once the rollbacks are complete
127 */
128 protected CompletableFuture<Void> rollback(Set<TransactionParticipant> transactionParticipants) {
Madan Jampani7e55c662016-02-15 21:13:53 -0800129 return CompletableFuture.allOf(transactionParticipants.stream()
Jordan Halterman948d6592017-04-20 17:18:24 -0700130 .map(TransactionParticipant::rollback)
131 .toArray(CompletableFuture[]::new));
132 }
133
134 @Override
135 public String toString() {
136 return toStringHelper(this)
137 .add("transactionId", transactionId)
138 .add("participants", transactionParticipants)
139 .toString();
Madan Jampani7e55c662016-02-15 21:13:53 -0800140 }
141}