blob: 0e992121022e7abf1ef491002e6e13f973ab260f [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.impl;
17
18import java.util.Collection;
19import java.util.Collections;
20import java.util.HashMap;
21import java.util.List;
22import java.util.Map;
23import java.util.concurrent.CompletableFuture;
24import java.util.concurrent.ExecutionException;
25
Jordan Halterman13d25a12017-08-01 18:08:38 -070026import com.google.common.base.Charsets;
Jordan Halterman948d6592017-04-20 17:18:24 -070027import com.google.common.cache.Cache;
28import com.google.common.cache.CacheBuilder;
29import com.google.common.collect.Lists;
30import com.google.common.collect.Maps;
31import com.google.common.hash.Hashing;
32import com.google.common.util.concurrent.Futures;
Jordan Halterman13d25a12017-08-01 18:08:38 -070033import org.onlab.util.HexString;
Jordan Halterman948d6592017-04-20 17:18:24 -070034import org.onosproject.cluster.PartitionId;
35import org.onosproject.store.primitives.MapUpdate;
36import org.onosproject.store.primitives.PartitionService;
37import org.onosproject.store.primitives.TransactionId;
38import org.onosproject.store.serializers.KryoNamespaces;
39import org.onosproject.store.service.AsyncConsistentMap;
40import org.onosproject.store.service.Serializer;
41import org.onosproject.store.service.StorageService;
42import org.onosproject.store.service.TransactionException;
43
44/**
45 * Transaction manager for managing state shared across multiple transactions.
46 */
47public class TransactionManager {
48 private static final int DEFAULT_CACHE_SIZE = 100;
49
50 private final PartitionService partitionService;
51 private final List<PartitionId> sortedPartitions;
52 private final AsyncConsistentMap<TransactionId, Transaction.State> transactions;
53 private final int cacheSize;
Jordan Halterman9fa43032017-07-28 11:00:26 -070054 private final Map<PartitionId, Cache<String, CachedMap>> partitionCache = Maps.newConcurrentMap();
Jordan Halterman948d6592017-04-20 17:18:24 -070055
56 public TransactionManager(StorageService storageService, PartitionService partitionService) {
57 this(storageService, partitionService, DEFAULT_CACHE_SIZE);
58 }
59
60 public TransactionManager(StorageService storageService, PartitionService partitionService, int cacheSize) {
61 this.partitionService = partitionService;
62 this.cacheSize = cacheSize;
63 this.transactions = storageService.<TransactionId, Transaction.State>consistentMapBuilder()
64 .withName("onos-transactions")
65 .withSerializer(Serializer.using(KryoNamespaces.API,
66 Transaction.class,
67 Transaction.State.class))
68 .buildAsyncMap();
69 this.sortedPartitions = Lists.newArrayList(partitionService.getAllPartitionIds());
70 Collections.sort(sortedPartitions);
71 }
72
73 /**
74 * Returns the collection of currently pending transactions.
75 *
76 * @return a collection of currently pending transactions
77 */
78 public Collection<TransactionId> getPendingTransactions() {
79 return Futures.getUnchecked(transactions.keySet());
80 }
81
82 /**
83 * Returns a partitioned transactional map for use within a transaction context.
84 * <p>
85 * The transaction coordinator will return a map that takes advantage of caching that's shared across transaction
86 * contexts.
87 *
88 * @param name the map name
89 * @param serializer the map serializer
90 * @param transactionCoordinator the transaction coordinator for which the map is being created
91 * @param <K> key type
92 * @param <V> value type
93 * @return a partitioned transactional map
94 */
95 <K, V> PartitionedTransactionalMap<K, V> getTransactionalMap(
96 String name,
97 Serializer serializer,
98 TransactionCoordinator transactionCoordinator) {
99 Map<PartitionId, TransactionalMapParticipant<K, V>> partitions = new HashMap<>();
100 for (PartitionId partitionId : partitionService.getAllPartitionIds()) {
101 partitions.put(partitionId, getTransactionalMapPartition(
102 name, partitionId, serializer, transactionCoordinator));
103 }
104
105 Hasher<K> hasher = key -> {
Jordan Halterman13d25a12017-08-01 18:08:38 -0700106 int hashCode = Hashing.sha256()
107 .hashString(HexString.toHexString(serializer.encode(key)), Charsets.UTF_8).asInt();
Jordan Halterman948d6592017-04-20 17:18:24 -0700108 return sortedPartitions.get(Math.abs(hashCode) % sortedPartitions.size());
109 };
110 return new PartitionedTransactionalMap<>(partitions, hasher);
111 }
112
113 @SuppressWarnings("unchecked")
114 private <K, V> TransactionalMapParticipant<K, V> getTransactionalMapPartition(
115 String mapName,
116 PartitionId partitionId,
117 Serializer serializer,
118 TransactionCoordinator transactionCoordinator) {
Jordan Halterman9fa43032017-07-28 11:00:26 -0700119 Cache<String, CachedMap> mapCache = partitionCache.computeIfAbsent(partitionId, p ->
Jordan Halterman948d6592017-04-20 17:18:24 -0700120 CacheBuilder.newBuilder().maximumSize(cacheSize / partitionService.getNumberOfPartitions()).build());
121 try {
Jordan Halterman9fa43032017-07-28 11:00:26 -0700122 CachedMap<K, V> cachedMap = mapCache.get(mapName,
123 () -> new CachedMap<>(partitionService.getDistributedPrimitiveCreator(partitionId)
124 .newAsyncConsistentMap(mapName, serializer)));
Jordan Halterman948d6592017-04-20 17:18:24 -0700125
126 Transaction<MapUpdate<K, V>> transaction = new Transaction<>(
127 transactionCoordinator.transactionId,
Jordan Halterman9fa43032017-07-28 11:00:26 -0700128 cachedMap.baseMap);
129 return new DefaultTransactionalMapParticipant<>(cachedMap.cachedMap.asConsistentMap(), transaction);
Jordan Halterman948d6592017-04-20 17:18:24 -0700130 } catch (ExecutionException e) {
131 throw new TransactionException(e);
132 }
133 }
134
135 /**
136 * Updates the state of a transaction in the transaction registry.
137 *
138 * @param transactionId the transaction identifier
139 * @param state the state of the transaction
140 * @return a completable future to be completed once the transaction state has been updated in the registry
141 */
142 CompletableFuture<Void> updateState(TransactionId transactionId, Transaction.State state) {
143 return transactions.put(transactionId, state).thenApply(v -> null);
144 }
145
146 /**
147 * Removes the given transaction from the transaction registry.
148 *
149 * @param transactionId the transaction identifier
150 * @return a completable future to be completed once the transaction state has been removed from the registry
151 */
152 CompletableFuture<Void> remove(TransactionId transactionId) {
153 return transactions.remove(transactionId).thenApply(v -> null);
154 }
Jordan Halterman9fa43032017-07-28 11:00:26 -0700155
156 private static class CachedMap<K, V> {
157 private final AsyncConsistentMap<K, V> baseMap;
158 private final AsyncConsistentMap<K, V> cachedMap;
159
160 public CachedMap(AsyncConsistentMap<K, V> baseMap) {
161 this.baseMap = baseMap;
162 this.cachedMap = DistributedPrimitives.newCachingMap(baseMap);
163 }
164 }
Jordan Halterman948d6592017-04-20 17:18:24 -0700165}