blob: 7d8d4c44662d80b3be6ddfe6ca2fc1b8ec7efe9b [file] [log] [blame]
Jordan Halterman948d6592017-04-20 17:18:24 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Jordan Halterman948d6592017-04-20 17:18:24 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.impl;
17
18import java.util.Collection;
19import java.util.Collections;
20import java.util.HashMap;
21import java.util.List;
22import java.util.Map;
23import java.util.concurrent.CompletableFuture;
24import java.util.concurrent.ExecutionException;
25
26import com.google.common.cache.Cache;
27import com.google.common.cache.CacheBuilder;
28import com.google.common.collect.Lists;
29import com.google.common.collect.Maps;
30import com.google.common.hash.Hashing;
31import com.google.common.util.concurrent.Futures;
32import org.onosproject.cluster.PartitionId;
33import org.onosproject.store.primitives.MapUpdate;
34import org.onosproject.store.primitives.PartitionService;
35import org.onosproject.store.primitives.TransactionId;
36import org.onosproject.store.serializers.KryoNamespaces;
37import org.onosproject.store.service.AsyncConsistentMap;
38import org.onosproject.store.service.Serializer;
39import org.onosproject.store.service.StorageService;
40import org.onosproject.store.service.TransactionException;
41
42/**
43 * Transaction manager for managing state shared across multiple transactions.
44 */
45public class TransactionManager {
46 private static final int DEFAULT_CACHE_SIZE = 100;
47
48 private final PartitionService partitionService;
49 private final List<PartitionId> sortedPartitions;
50 private final AsyncConsistentMap<TransactionId, Transaction.State> transactions;
Ray Milkey3717e602018-02-01 13:49:47 -080051 private final long cacheSize;
Jordan Halterman93fd0122017-08-03 21:38:36 -070052 private final int buckets;
Jordan Halterman9fa43032017-07-28 11:00:26 -070053 private final Map<PartitionId, Cache<String, CachedMap>> partitionCache = Maps.newConcurrentMap();
Jordan Halterman948d6592017-04-20 17:18:24 -070054
Jordan Halterman93fd0122017-08-03 21:38:36 -070055 public TransactionManager(StorageService storageService, PartitionService partitionService, int buckets) {
56 this(storageService, partitionService, DEFAULT_CACHE_SIZE, buckets);
Jordan Halterman948d6592017-04-20 17:18:24 -070057 }
58
Jordan Halterman93fd0122017-08-03 21:38:36 -070059 public TransactionManager(
60 StorageService storageService,
61 PartitionService partitionService,
62 int cacheSize,
63 int buckets) {
Jordan Halterman948d6592017-04-20 17:18:24 -070064 this.partitionService = partitionService;
65 this.cacheSize = cacheSize;
Jordan Halterman93fd0122017-08-03 21:38:36 -070066 this.buckets = buckets;
Jordan Halterman948d6592017-04-20 17:18:24 -070067 this.transactions = storageService.<TransactionId, Transaction.State>consistentMapBuilder()
68 .withName("onos-transactions")
69 .withSerializer(Serializer.using(KryoNamespaces.API,
70 Transaction.class,
71 Transaction.State.class))
72 .buildAsyncMap();
73 this.sortedPartitions = Lists.newArrayList(partitionService.getAllPartitionIds());
74 Collections.sort(sortedPartitions);
75 }
76
77 /**
78 * Returns the collection of currently pending transactions.
79 *
80 * @return a collection of currently pending transactions
81 */
82 public Collection<TransactionId> getPendingTransactions() {
83 return Futures.getUnchecked(transactions.keySet());
84 }
85
86 /**
87 * Returns a partitioned transactional map for use within a transaction context.
88 * <p>
89 * The transaction coordinator will return a map that takes advantage of caching that's shared across transaction
90 * contexts.
91 *
92 * @param name the map name
93 * @param serializer the map serializer
94 * @param transactionCoordinator the transaction coordinator for which the map is being created
95 * @param <K> key type
96 * @param <V> value type
97 * @return a partitioned transactional map
98 */
99 <K, V> PartitionedTransactionalMap<K, V> getTransactionalMap(
100 String name,
101 Serializer serializer,
102 TransactionCoordinator transactionCoordinator) {
103 Map<PartitionId, TransactionalMapParticipant<K, V>> partitions = new HashMap<>();
104 for (PartitionId partitionId : partitionService.getAllPartitionIds()) {
105 partitions.put(partitionId, getTransactionalMapPartition(
106 name, partitionId, serializer, transactionCoordinator));
107 }
108
109 Hasher<K> hasher = key -> {
Jordan Halterman93fd0122017-08-03 21:38:36 -0700110 int bucket = Math.abs(Hashing.murmur3_32().hashBytes(serializer.encode(key)).asInt()) % buckets;
111 int partition = Hashing.consistentHash(bucket, sortedPartitions.size());
112 return sortedPartitions.get(partition);
Jordan Halterman948d6592017-04-20 17:18:24 -0700113 };
114 return new PartitionedTransactionalMap<>(partitions, hasher);
115 }
116
117 @SuppressWarnings("unchecked")
118 private <K, V> TransactionalMapParticipant<K, V> getTransactionalMapPartition(
119 String mapName,
120 PartitionId partitionId,
121 Serializer serializer,
122 TransactionCoordinator transactionCoordinator) {
Jordan Halterman9fa43032017-07-28 11:00:26 -0700123 Cache<String, CachedMap> mapCache = partitionCache.computeIfAbsent(partitionId, p ->
Jordan Halterman948d6592017-04-20 17:18:24 -0700124 CacheBuilder.newBuilder().maximumSize(cacheSize / partitionService.getNumberOfPartitions()).build());
125 try {
Jordan Halterman9fa43032017-07-28 11:00:26 -0700126 CachedMap<K, V> cachedMap = mapCache.get(mapName,
127 () -> new CachedMap<>(partitionService.getDistributedPrimitiveCreator(partitionId)
128 .newAsyncConsistentMap(mapName, serializer)));
Jordan Halterman948d6592017-04-20 17:18:24 -0700129
130 Transaction<MapUpdate<K, V>> transaction = new Transaction<>(
131 transactionCoordinator.transactionId,
Jordan Halterman9fa43032017-07-28 11:00:26 -0700132 cachedMap.baseMap);
133 return new DefaultTransactionalMapParticipant<>(cachedMap.cachedMap.asConsistentMap(), transaction);
Jordan Halterman948d6592017-04-20 17:18:24 -0700134 } catch (ExecutionException e) {
135 throw new TransactionException(e);
136 }
137 }
138
139 /**
140 * Updates the state of a transaction in the transaction registry.
141 *
142 * @param transactionId the transaction identifier
143 * @param state the state of the transaction
144 * @return a completable future to be completed once the transaction state has been updated in the registry
145 */
146 CompletableFuture<Void> updateState(TransactionId transactionId, Transaction.State state) {
147 return transactions.put(transactionId, state).thenApply(v -> null);
148 }
149
150 /**
151 * Removes the given transaction from the transaction registry.
152 *
153 * @param transactionId the transaction identifier
154 * @return a completable future to be completed once the transaction state has been removed from the registry
155 */
156 CompletableFuture<Void> remove(TransactionId transactionId) {
157 return transactions.remove(transactionId).thenApply(v -> null);
158 }
Jordan Halterman9fa43032017-07-28 11:00:26 -0700159
160 private static class CachedMap<K, V> {
161 private final AsyncConsistentMap<K, V> baseMap;
162 private final AsyncConsistentMap<K, V> cachedMap;
163
164 public CachedMap(AsyncConsistentMap<K, V> baseMap) {
165 this.baseMap = baseMap;
166 this.cachedMap = DistributedPrimitives.newCachingMap(baseMap);
167 }
168 }
Jordan Halterman948d6592017-04-20 17:18:24 -0700169}