blob: 23d43991c50b10f099e50f13fc59e54102cd62f0 [file] [log] [blame]
Madan Jampani7e55c662016-02-15 21:13:53 -08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Madan Jampani7e55c662016-02-15 21:13:53 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.impl;
17
18import java.util.Set;
19import java.util.concurrent.CompletableFuture;
20import java.util.stream.Collectors;
21
Jordan Halterman948d6592017-04-20 17:18:24 -070022import com.google.common.collect.Sets;
Madan Jampani7e55c662016-02-15 21:13:53 -080023import org.onlab.util.Tools;
24import org.onosproject.store.primitives.TransactionId;
Madan Jampani3780d4b2016-04-04 18:18:24 -070025import org.onosproject.store.service.CommitStatus;
Jordan Halterman948d6592017-04-20 17:18:24 -070026import org.onosproject.store.service.Serializer;
27import org.onosproject.store.service.TransactionalMap;
Jordan Halterman03b83182017-05-09 12:11:22 -070028import org.slf4j.Logger;
29import org.slf4j.LoggerFactory;
Jordan Halterman948d6592017-04-20 17:18:24 -070030
31import static com.google.common.base.MoreObjects.toStringHelper;
Madan Jampani7e55c662016-02-15 21:13:53 -080032
33/**
Jordan Halterman948d6592017-04-20 17:18:24 -070034 * Transaction coordinator.
Madan Jampani7e55c662016-02-15 21:13:53 -080035 */
36public class TransactionCoordinator {
Jordan Halterman03b83182017-05-09 12:11:22 -070037 private final Logger log = LoggerFactory.getLogger(getClass());
Jordan Halterman948d6592017-04-20 17:18:24 -070038 protected final TransactionId transactionId;
39 protected final TransactionManager transactionManager;
40 protected final Set<TransactionParticipant> transactionParticipants = Sets.newConcurrentHashSet();
Madan Jampani7e55c662016-02-15 21:13:53 -080041
Jordan Halterman948d6592017-04-20 17:18:24 -070042 public TransactionCoordinator(TransactionId transactionId, TransactionManager transactionManager) {
43 this.transactionId = transactionId;
44 this.transactionManager = transactionManager;
Madan Jampani7e55c662016-02-15 21:13:53 -080045 }
46
47 /**
Jordan Halterman948d6592017-04-20 17:18:24 -070048 * Returns a transactional map for this transaction.
Thomas Vachuska708d3032016-02-18 11:11:46 -080049 *
Jordan Halterman948d6592017-04-20 17:18:24 -070050 * @param name the transactional map name
51 * @param serializer the serializer
52 * @param <K> key type
53 * @param <V> value type
54 * @return a transactional map for this transaction
Madan Jampani7e55c662016-02-15 21:13:53 -080055 */
Jordan Halterman948d6592017-04-20 17:18:24 -070056 public <K, V> TransactionalMap<K, V> getTransactionalMap(String name, Serializer serializer) {
57 PartitionedTransactionalMap<K, V> map = transactionManager.getTransactionalMap(name, serializer, this);
58 transactionParticipants.addAll(map.participants());
59 return map;
60 }
Madan Jampani7e55c662016-02-15 21:13:53 -080061
Jordan Halterman948d6592017-04-20 17:18:24 -070062 /**
63 * Commits the transaction.
64 *
65 * @return the transaction commit status
66 */
67 public CompletableFuture<CommitStatus> commit() {
68 long totalParticipants = transactionParticipants.stream()
69 .filter(TransactionParticipant::hasPendingUpdates)
70 .count();
71
72 if (totalParticipants == 0) {
Jordan Halterman03b83182017-05-09 12:11:22 -070073 log.debug("No transaction participants, skipping commit", totalParticipants);
Madan Jampani542d9e22016-04-05 15:39:55 -070074 return CompletableFuture.completedFuture(CommitStatus.SUCCESS);
Jordan Halterman948d6592017-04-20 17:18:24 -070075 } else if (totalParticipants == 1) {
Jordan Halterman03b83182017-05-09 12:11:22 -070076 log.debug("Committing transaction {} via 1 participant", transactionId);
Madan Jampani542d9e22016-04-05 15:39:55 -070077 return transactionParticipants.stream()
Jordan Halterman948d6592017-04-20 17:18:24 -070078 .filter(TransactionParticipant::hasPendingUpdates)
79 .findFirst()
80 .get()
81 .prepareAndCommit()
82 .thenApply(v -> v ? CommitStatus.SUCCESS : CommitStatus.FAILURE);
Madan Jampani542d9e22016-04-05 15:39:55 -070083 } else {
Jordan Halterman03b83182017-05-09 12:11:22 -070084 log.debug("Committing transaction {} via {} participants", transactionId, totalParticipants);
Jordan Halterman948d6592017-04-20 17:18:24 -070085 Set<TransactionParticipant> transactionParticipants = this.transactionParticipants.stream()
86 .filter(TransactionParticipant::hasPendingUpdates)
87 .collect(Collectors.toSet());
88
89 CompletableFuture<CommitStatus> status = transactionManager.updateState(
90 transactionId, Transaction.State.PREPARING)
91 .thenCompose(v -> prepare(transactionParticipants))
Madan Jampani7e55c662016-02-15 21:13:53 -080092 .thenCompose(result -> result
Jordan Halterman948d6592017-04-20 17:18:24 -070093 ? transactionManager.updateState(transactionId, Transaction.State.COMMITTING)
94 .thenCompose(v -> commit(transactionParticipants))
95 .thenApply(v -> CommitStatus.SUCCESS)
96 : transactionManager.updateState(transactionId, Transaction.State.ROLLING_BACK)
97 .thenCompose(v -> rollback(transactionParticipants))
98 .thenApply(v -> CommitStatus.FAILURE));
99 return status.thenCompose(v -> transactionManager.remove(transactionId).thenApply(u -> v));
Madan Jampani542d9e22016-04-05 15:39:55 -0700100 }
Madan Jampani7e55c662016-02-15 21:13:53 -0800101 }
102
Jordan Halterman948d6592017-04-20 17:18:24 -0700103 /**
104 * Performs the prepare phase of the two-phase commit protocol for the given transaction participants.
105 *
106 * @param transactionParticipants the transaction participants for which to prepare the transaction
107 * @return a completable future indicating whether <em>all</em> prepares succeeded
108 */
109 protected CompletableFuture<Boolean> prepare(Set<TransactionParticipant> transactionParticipants) {
Jordan Halterman03b83182017-05-09 12:11:22 -0700110 log.trace("Preparing transaction {} via {}", transactionId, transactionParticipants);
Madan Jampani3780d4b2016-04-04 18:18:24 -0700111 return Tools.allOf(transactionParticipants.stream()
Jordan Halterman948d6592017-04-20 17:18:24 -0700112 .map(TransactionParticipant::prepare)
113 .collect(Collectors.toList()))
114 .thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
Madan Jampani7e55c662016-02-15 21:13:53 -0800115 }
116
Jordan Halterman948d6592017-04-20 17:18:24 -0700117 /**
118 * Performs the commit phase of the two-phase commit protocol for the given transaction participants.
119 *
120 * @param transactionParticipants the transaction participants for which to commit the transaction
121 * @return a completable future to be completed once the commits are complete
122 */
123 protected CompletableFuture<Void> commit(Set<TransactionParticipant> transactionParticipants) {
Jordan Halterman03b83182017-05-09 12:11:22 -0700124 log.trace("Committing transaction {} via {}", transactionId, transactionParticipants);
Madan Jampani7e55c662016-02-15 21:13:53 -0800125 return CompletableFuture.allOf(transactionParticipants.stream()
Jordan Halterman948d6592017-04-20 17:18:24 -0700126 .map(TransactionParticipant::commit)
127 .toArray(CompletableFuture[]::new));
Madan Jampani7e55c662016-02-15 21:13:53 -0800128 }
129
Jordan Halterman948d6592017-04-20 17:18:24 -0700130 /**
131 * Rolls back transactions for the given participants.
132 *
133 * @param transactionParticipants the transaction participants for which to roll back the transaction
134 * @return a completable future to be completed once the rollbacks are complete
135 */
136 protected CompletableFuture<Void> rollback(Set<TransactionParticipant> transactionParticipants) {
Jordan Halterman03b83182017-05-09 12:11:22 -0700137 log.trace("Rolling back transaction {} via {}", transactionId, transactionParticipants);
Madan Jampani7e55c662016-02-15 21:13:53 -0800138 return CompletableFuture.allOf(transactionParticipants.stream()
Jordan Halterman948d6592017-04-20 17:18:24 -0700139 .map(TransactionParticipant::rollback)
140 .toArray(CompletableFuture[]::new));
141 }
142
143 @Override
144 public String toString() {
145 return toStringHelper(this)
146 .add("transactionId", transactionId)
147 .add("participants", transactionParticipants)
148 .toString();
Madan Jampani7e55c662016-02-15 21:13:53 -0800149 }
150}