blob: 16e1cdc1c8db3d996e34f5f49fec16ba12343ee3 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.consistent.impl;
17
18import static com.google.common.base.Preconditions.checkNotNull;
Ayaka Koshibee114f042015-05-01 11:43:00 -070019
Madan Jampani50589ac2015-06-08 11:38:46 -070020import java.util.Arrays;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070021import java.util.Collection;
22import java.util.concurrent.CompletableFuture;
23import java.util.stream.Collectors;
24
25import org.apache.commons.lang3.tuple.ImmutablePair;
26import org.apache.commons.lang3.tuple.Pair;
27import org.onlab.util.KryoNamespace;
28import org.onosproject.store.serializers.KryoNamespaces;
29import org.onosproject.store.service.AsyncConsistentMap;
Madan Jampani50589ac2015-06-08 11:38:46 -070030import org.onosproject.store.service.ConsistentMapBuilder;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070031import org.onosproject.store.service.DatabaseUpdate;
32import org.onosproject.store.service.Serializer;
33import org.onosproject.store.service.Transaction;
34import org.onosproject.store.service.Versioned;
35import org.onosproject.store.service.Transaction.State;
36
37/**
38 * Agent that runs the two phase commit protocol.
39 */
40public class TransactionManager {
41
Madan Jampanib5d72d52015-04-03 16:53:50 -070042 private static final KryoNamespace KRYO_NAMESPACE = KryoNamespace.newBuilder()
43 .register(KryoNamespaces.BASIC)
44 .nextId(KryoNamespace.FLOATING_ID)
45 .register(Versioned.class)
46 .register(DatabaseUpdate.class)
47 .register(DatabaseUpdate.Type.class)
48 .register(DefaultTransaction.class)
49 .register(Transaction.State.class)
50 .register(Pair.class)
51 .register(ImmutablePair.class)
52 .build();
53
Madan Jampani50589ac2015-06-08 11:38:46 -070054 private final Serializer serializer = Serializer.using(Arrays.asList(KRYO_NAMESPACE));
Madan Jampanibff6d8f2015-03-31 16:53:47 -070055 private final Database database;
56 private final AsyncConsistentMap<Long, Transaction> transactions;
57
Madan Jampanibff6d8f2015-03-31 16:53:47 -070058 /**
59 * Constructs a new TransactionManager for the specified database instance.
60 *
61 * @param database database
Madan Jampani0dbac7a2015-06-25 10:37:45 -070062 * @param mapBuilder builder for ConsistentMap instances
Madan Jampanibff6d8f2015-03-31 16:53:47 -070063 */
Madan Jampani50589ac2015-06-08 11:38:46 -070064 public TransactionManager(Database database, ConsistentMapBuilder<Long, Transaction> mapBuilder) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -070065 this.database = checkNotNull(database, "database cannot be null");
Madan Jampani50589ac2015-06-08 11:38:46 -070066 this.transactions = mapBuilder.withName("onos-transactions")
67 .withSerializer(serializer)
68 .buildAsyncMap();
Madan Jampanibff6d8f2015-03-31 16:53:47 -070069 }
70
71 /**
72 * Executes the specified transaction by employing a two phase commit protocol.
73 *
74 * @param transaction transaction to commit
75 * @return transaction result. Result value true indicates a successful commit, false
76 * indicates abort
77 */
78 public CompletableFuture<Boolean> execute(Transaction transaction) {
79 // clean up if this transaction in already in a terminal state.
80 if (transaction.state() == Transaction.State.COMMITTED ||
81 transaction.state() == Transaction.State.ROLLEDBACK) {
82 return transactions.remove(transaction.id()).thenApply(v -> true);
83 } else if (transaction.state() == Transaction.State.COMMITTING) {
84 return commit(transaction);
85 } else if (transaction.state() == Transaction.State.ROLLINGBACK) {
86 return rollback(transaction);
87 } else {
88 return prepare(transaction).thenCompose(v -> v ? commit(transaction) : rollback(transaction));
89 }
90 }
91
92
93 /**
94 * Returns all transactions in the system.
95 *
96 * @return future for a collection of transactions
97 */
98 public CompletableFuture<Collection<Transaction>> getTransactions() {
99 return transactions.values().thenApply(c -> {
100 Collection<Transaction> txns = c.stream().map(v -> v.value()).collect(Collectors.toList());
101 return txns;
102 });
103 }
104
105 private CompletableFuture<Boolean> prepare(Transaction transaction) {
106 return transactions.put(transaction.id(), transaction)
107 .thenCompose(v -> database.prepare(transaction))
108 .thenCompose(status -> transactions.put(
109 transaction.id(),
110 transaction.transition(status ? State.COMMITTING : State.ROLLINGBACK))
111 .thenApply(v -> status));
112 }
113
114 private CompletableFuture<Boolean> commit(Transaction transaction) {
115 return database.commit(transaction)
116 .thenCompose(v -> transactions.put(
117 transaction.id(),
118 transaction.transition(Transaction.State.COMMITTED)))
119 .thenApply(v -> true);
120 }
121
122 private CompletableFuture<Boolean> rollback(Transaction transaction) {
123 return database.rollback(transaction)
124 .thenCompose(v -> transactions.put(
125 transaction.id(),
126 transaction.transition(Transaction.State.ROLLEDBACK)))
127 .thenApply(v -> true);
128 }
Madan Jampani02b7fb82015-05-01 13:01:20 -0700129}