blob: d8e993058222506221f81200b4c8334646bc726a [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.consistent.impl;
17
18import static com.google.common.base.Preconditions.checkNotNull;
Ayaka Koshibee114f042015-05-01 11:43:00 -070019
Madan Jampanibff6d8f2015-03-31 16:53:47 -070020import java.util.Collection;
21import java.util.concurrent.CompletableFuture;
22import java.util.stream.Collectors;
23
24import org.apache.commons.lang3.tuple.ImmutablePair;
25import org.apache.commons.lang3.tuple.Pair;
26import org.onlab.util.KryoNamespace;
27import org.onosproject.store.serializers.KryoNamespaces;
28import org.onosproject.store.service.AsyncConsistentMap;
29import org.onosproject.store.service.DatabaseUpdate;
30import org.onosproject.store.service.Serializer;
31import org.onosproject.store.service.Transaction;
32import org.onosproject.store.service.Versioned;
33import org.onosproject.store.service.Transaction.State;
34
35/**
36 * Agent that runs the two phase commit protocol.
37 */
38public class TransactionManager {
39
Madan Jampanib5d72d52015-04-03 16:53:50 -070040 private static final KryoNamespace KRYO_NAMESPACE = KryoNamespace.newBuilder()
41 .register(KryoNamespaces.BASIC)
42 .nextId(KryoNamespace.FLOATING_ID)
43 .register(Versioned.class)
44 .register(DatabaseUpdate.class)
45 .register(DatabaseUpdate.Type.class)
46 .register(DefaultTransaction.class)
47 .register(Transaction.State.class)
48 .register(Pair.class)
49 .register(ImmutablePair.class)
50 .build();
51
52 private final Serializer serializer = Serializer.using(KRYO_NAMESPACE);
Madan Jampanibff6d8f2015-03-31 16:53:47 -070053 private final Database database;
54 private final AsyncConsistentMap<Long, Transaction> transactions;
55
Madan Jampanibff6d8f2015-03-31 16:53:47 -070056 /**
57 * Constructs a new TransactionManager for the specified database instance.
58 *
59 * @param database database
60 */
61 public TransactionManager(Database database) {
62 this.database = checkNotNull(database, "database cannot be null");
Madan Jampani02b7fb82015-05-01 13:01:20 -070063 this.transactions = new DefaultAsyncConsistentMap<>("onos-transactions", this.database, serializer, false);
Madan Jampanibff6d8f2015-03-31 16:53:47 -070064 }
65
66 /**
67 * Executes the specified transaction by employing a two phase commit protocol.
68 *
69 * @param transaction transaction to commit
70 * @return transaction result. Result value true indicates a successful commit, false
71 * indicates abort
72 */
73 public CompletableFuture<Boolean> execute(Transaction transaction) {
74 // clean up if this transaction in already in a terminal state.
75 if (transaction.state() == Transaction.State.COMMITTED ||
76 transaction.state() == Transaction.State.ROLLEDBACK) {
77 return transactions.remove(transaction.id()).thenApply(v -> true);
78 } else if (transaction.state() == Transaction.State.COMMITTING) {
79 return commit(transaction);
80 } else if (transaction.state() == Transaction.State.ROLLINGBACK) {
81 return rollback(transaction);
82 } else {
83 return prepare(transaction).thenCompose(v -> v ? commit(transaction) : rollback(transaction));
84 }
85 }
86
87
88 /**
89 * Returns all transactions in the system.
90 *
91 * @return future for a collection of transactions
92 */
93 public CompletableFuture<Collection<Transaction>> getTransactions() {
94 return transactions.values().thenApply(c -> {
95 Collection<Transaction> txns = c.stream().map(v -> v.value()).collect(Collectors.toList());
96 return txns;
97 });
98 }
99
100 private CompletableFuture<Boolean> prepare(Transaction transaction) {
101 return transactions.put(transaction.id(), transaction)
102 .thenCompose(v -> database.prepare(transaction))
103 .thenCompose(status -> transactions.put(
104 transaction.id(),
105 transaction.transition(status ? State.COMMITTING : State.ROLLINGBACK))
106 .thenApply(v -> status));
107 }
108
109 private CompletableFuture<Boolean> commit(Transaction transaction) {
110 return database.commit(transaction)
111 .thenCompose(v -> transactions.put(
112 transaction.id(),
113 transaction.transition(Transaction.State.COMMITTED)))
114 .thenApply(v -> true);
115 }
116
117 private CompletableFuture<Boolean> rollback(Transaction transaction) {
118 return database.rollback(transaction)
119 .thenCompose(v -> transactions.put(
120 transaction.id(),
121 transaction.transition(Transaction.State.ROLLEDBACK)))
122 .thenApply(v -> true);
123 }
Madan Jampani02b7fb82015-05-01 13:01:20 -0700124}