/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.consistent.impl;
17
18import static com.google.common.base.Preconditions.checkNotNull;
Ayaka Koshibee114f042015-05-01 11:43:00 -070019
Madan Jampani50589ac2015-06-08 11:38:46 -070020import java.util.Arrays;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070021import java.util.Collection;
22import java.util.concurrent.CompletableFuture;
23import java.util.stream.Collectors;
24
Madan Jampanibff6d8f2015-03-31 16:53:47 -070025import org.onlab.util.KryoNamespace;
26import org.onosproject.store.serializers.KryoNamespaces;
27import org.onosproject.store.service.AsyncConsistentMap;
Madan Jampani50589ac2015-06-08 11:38:46 -070028import org.onosproject.store.service.ConsistentMapBuilder;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070029import org.onosproject.store.service.DatabaseUpdate;
30import org.onosproject.store.service.Serializer;
31import org.onosproject.store.service.Transaction;
32import org.onosproject.store.service.Versioned;
33import org.onosproject.store.service.Transaction.State;
34
Madan Jampanibab51a42015-08-10 13:53:35 -070035import com.google.common.collect.ImmutableList;
36
Madan Jampanibff6d8f2015-03-31 16:53:47 -070037/**
38 * Agent that runs the two phase commit protocol.
39 */
40public class TransactionManager {
41
Madan Jampanib5d72d52015-04-03 16:53:50 -070042 private static final KryoNamespace KRYO_NAMESPACE = KryoNamespace.newBuilder()
43 .register(KryoNamespaces.BASIC)
44 .nextId(KryoNamespace.FLOATING_ID)
45 .register(Versioned.class)
46 .register(DatabaseUpdate.class)
47 .register(DatabaseUpdate.Type.class)
48 .register(DefaultTransaction.class)
49 .register(Transaction.State.class)
Madan Jampanib5d72d52015-04-03 16:53:50 -070050 .build();
51
Madan Jampani50589ac2015-06-08 11:38:46 -070052 private final Serializer serializer = Serializer.using(Arrays.asList(KRYO_NAMESPACE));
Madan Jampanibff6d8f2015-03-31 16:53:47 -070053 private final Database database;
54 private final AsyncConsistentMap<Long, Transaction> transactions;
55
Madan Jampanibff6d8f2015-03-31 16:53:47 -070056 /**
57 * Constructs a new TransactionManager for the specified database instance.
58 *
59 * @param database database
Madan Jampani0dbac7a2015-06-25 10:37:45 -070060 * @param mapBuilder builder for ConsistentMap instances
Madan Jampanibff6d8f2015-03-31 16:53:47 -070061 */
Madan Jampani50589ac2015-06-08 11:38:46 -070062 public TransactionManager(Database database, ConsistentMapBuilder<Long, Transaction> mapBuilder) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -070063 this.database = checkNotNull(database, "database cannot be null");
Madan Jampani50589ac2015-06-08 11:38:46 -070064 this.transactions = mapBuilder.withName("onos-transactions")
65 .withSerializer(serializer)
66 .buildAsyncMap();
Madan Jampanibff6d8f2015-03-31 16:53:47 -070067 }
68
69 /**
70 * Executes the specified transaction by employing a two phase commit protocol.
71 *
72 * @param transaction transaction to commit
73 * @return transaction result. Result value true indicates a successful commit, false
74 * indicates abort
75 */
Madan Jampanibab51a42015-08-10 13:53:35 -070076 public CompletableFuture<CommitResponse> execute(Transaction transaction) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -070077 // clean up if this transaction in already in a terminal state.
78 if (transaction.state() == Transaction.State.COMMITTED ||
79 transaction.state() == Transaction.State.ROLLEDBACK) {
Madan Jampanibab51a42015-08-10 13:53:35 -070080 return transactions.remove(transaction.id()).thenApply(v -> CommitResponse.success(ImmutableList.of()));
Madan Jampanibff6d8f2015-03-31 16:53:47 -070081 } else if (transaction.state() == Transaction.State.COMMITTING) {
82 return commit(transaction);
83 } else if (transaction.state() == Transaction.State.ROLLINGBACK) {
Madan Jampanibab51a42015-08-10 13:53:35 -070084 return rollback(transaction).thenApply(v -> CommitResponse.success(ImmutableList.of()));
Madan Jampanibff6d8f2015-03-31 16:53:47 -070085 } else {
86 return prepare(transaction).thenCompose(v -> v ? commit(transaction) : rollback(transaction));
87 }
88 }
89
90
91 /**
92 * Returns all transactions in the system.
93 *
94 * @return future for a collection of transactions
95 */
96 public CompletableFuture<Collection<Transaction>> getTransactions() {
97 return transactions.values().thenApply(c -> {
98 Collection<Transaction> txns = c.stream().map(v -> v.value()).collect(Collectors.toList());
99 return txns;
100 });
101 }
102
103 private CompletableFuture<Boolean> prepare(Transaction transaction) {
104 return transactions.put(transaction.id(), transaction)
105 .thenCompose(v -> database.prepare(transaction))
106 .thenCompose(status -> transactions.put(
107 transaction.id(),
108 transaction.transition(status ? State.COMMITTING : State.ROLLINGBACK))
109 .thenApply(v -> status));
110 }
111
Madan Jampanibab51a42015-08-10 13:53:35 -0700112 private CompletableFuture<CommitResponse> commit(Transaction transaction) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700113 return database.commit(transaction)
Madan Jampanibab51a42015-08-10 13:53:35 -0700114 .whenComplete((r, e) -> transactions.put(
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700115 transaction.id(),
Madan Jampanibab51a42015-08-10 13:53:35 -0700116 transaction.transition(Transaction.State.COMMITTED)));
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700117 }
118
Madan Jampanibab51a42015-08-10 13:53:35 -0700119 private CompletableFuture<CommitResponse> rollback(Transaction transaction) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700120 return database.rollback(transaction)
121 .thenCompose(v -> transactions.put(
122 transaction.id(),
123 transaction.transition(Transaction.State.ROLLEDBACK)))
Madan Jampanibab51a42015-08-10 13:53:35 -0700124 .thenApply(v -> CommitResponse.failure());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700125 }
Madan Jampani02b7fb82015-05-01 13:01:20 -0700126}