blob: db39a46881275abd06e070f1fb1c0d6a78e3c7fb [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.consistent.impl;
17
18import static com.google.common.base.Preconditions.checkNotNull;
Ayaka Koshibee114f042015-05-01 11:43:00 -070019
Madan Jampani50589ac2015-06-08 11:38:46 -070020import java.util.Arrays;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070021import java.util.Collection;
22import java.util.concurrent.CompletableFuture;
23import java.util.stream.Collectors;
24
Madan Jampanibff6d8f2015-03-31 16:53:47 -070025import org.onlab.util.KryoNamespace;
26import org.onosproject.store.serializers.KryoNamespaces;
27import org.onosproject.store.service.AsyncConsistentMap;
Madan Jampani50589ac2015-06-08 11:38:46 -070028import org.onosproject.store.service.ConsistentMapBuilder;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070029import org.onosproject.store.service.DatabaseUpdate;
30import org.onosproject.store.service.Serializer;
31import org.onosproject.store.service.Transaction;
32import org.onosproject.store.service.Versioned;
33import org.onosproject.store.service.Transaction.State;
34
35/**
36 * Agent that runs the two phase commit protocol.
37 */
38public class TransactionManager {
39
Madan Jampanib5d72d52015-04-03 16:53:50 -070040 private static final KryoNamespace KRYO_NAMESPACE = KryoNamespace.newBuilder()
41 .register(KryoNamespaces.BASIC)
42 .nextId(KryoNamespace.FLOATING_ID)
43 .register(Versioned.class)
44 .register(DatabaseUpdate.class)
45 .register(DatabaseUpdate.Type.class)
46 .register(DefaultTransaction.class)
47 .register(Transaction.State.class)
Madan Jampanib5d72d52015-04-03 16:53:50 -070048 .build();
49
Madan Jampani50589ac2015-06-08 11:38:46 -070050 private final Serializer serializer = Serializer.using(Arrays.asList(KRYO_NAMESPACE));
Madan Jampanibff6d8f2015-03-31 16:53:47 -070051 private final Database database;
52 private final AsyncConsistentMap<Long, Transaction> transactions;
53
Madan Jampanibff6d8f2015-03-31 16:53:47 -070054 /**
55 * Constructs a new TransactionManager for the specified database instance.
56 *
57 * @param database database
Madan Jampani0dbac7a2015-06-25 10:37:45 -070058 * @param mapBuilder builder for ConsistentMap instances
Madan Jampanibff6d8f2015-03-31 16:53:47 -070059 */
Madan Jampani50589ac2015-06-08 11:38:46 -070060 public TransactionManager(Database database, ConsistentMapBuilder<Long, Transaction> mapBuilder) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -070061 this.database = checkNotNull(database, "database cannot be null");
Madan Jampani50589ac2015-06-08 11:38:46 -070062 this.transactions = mapBuilder.withName("onos-transactions")
63 .withSerializer(serializer)
64 .buildAsyncMap();
Madan Jampanibff6d8f2015-03-31 16:53:47 -070065 }
66
67 /**
68 * Executes the specified transaction by employing a two phase commit protocol.
69 *
70 * @param transaction transaction to commit
71 * @return transaction result. Result value true indicates a successful commit, false
72 * indicates abort
73 */
74 public CompletableFuture<Boolean> execute(Transaction transaction) {
75 // clean up if this transaction in already in a terminal state.
76 if (transaction.state() == Transaction.State.COMMITTED ||
77 transaction.state() == Transaction.State.ROLLEDBACK) {
78 return transactions.remove(transaction.id()).thenApply(v -> true);
79 } else if (transaction.state() == Transaction.State.COMMITTING) {
80 return commit(transaction);
81 } else if (transaction.state() == Transaction.State.ROLLINGBACK) {
82 return rollback(transaction);
83 } else {
84 return prepare(transaction).thenCompose(v -> v ? commit(transaction) : rollback(transaction));
85 }
86 }
87
88
89 /**
90 * Returns all transactions in the system.
91 *
92 * @return future for a collection of transactions
93 */
94 public CompletableFuture<Collection<Transaction>> getTransactions() {
95 return transactions.values().thenApply(c -> {
96 Collection<Transaction> txns = c.stream().map(v -> v.value()).collect(Collectors.toList());
97 return txns;
98 });
99 }
100
101 private CompletableFuture<Boolean> prepare(Transaction transaction) {
102 return transactions.put(transaction.id(), transaction)
103 .thenCompose(v -> database.prepare(transaction))
104 .thenCompose(status -> transactions.put(
105 transaction.id(),
106 transaction.transition(status ? State.COMMITTING : State.ROLLINGBACK))
107 .thenApply(v -> status));
108 }
109
110 private CompletableFuture<Boolean> commit(Transaction transaction) {
111 return database.commit(transaction)
112 .thenCompose(v -> transactions.put(
113 transaction.id(),
114 transaction.transition(Transaction.State.COMMITTED)))
115 .thenApply(v -> true);
116 }
117
118 private CompletableFuture<Boolean> rollback(Transaction transaction) {
119 return database.rollback(transaction)
120 .thenCompose(v -> transactions.put(
121 transaction.id(),
122 transaction.transition(Transaction.State.ROLLEDBACK)))
123 .thenApply(v -> true);
124 }
Madan Jampani02b7fb82015-05-01 13:01:20 -0700125}