| /* |
| * Copyright 2017-present Open Networking Foundation |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.onosproject.store.primitives.impl; |
| |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ExecutionException; |
| |
| import com.google.common.cache.Cache; |
| import com.google.common.cache.CacheBuilder; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.hash.Hashing; |
| import com.google.common.util.concurrent.Futures; |
| import org.onosproject.cluster.PartitionId; |
| import org.onosproject.store.primitives.MapUpdate; |
| import org.onosproject.store.primitives.PartitionService; |
| import org.onosproject.store.primitives.TransactionId; |
| import org.onosproject.store.serializers.KryoNamespaces; |
| import org.onosproject.store.service.AsyncConsistentMap; |
| import org.onosproject.store.service.Serializer; |
| import org.onosproject.store.service.StorageService; |
| import org.onosproject.store.service.TransactionException; |
| |
| /** |
| * Transaction manager for managing state shared across multiple transactions. |
| */ |
| public class TransactionManager { |
| private static final int DEFAULT_CACHE_SIZE = 100; |
| |
| private final PartitionService partitionService; |
| private final List<PartitionId> sortedPartitions; |
| private final AsyncConsistentMap<TransactionId, Transaction.State> transactions; |
| private final int cacheSize; |
| private final int buckets; |
| private final Map<PartitionId, Cache<String, CachedMap>> partitionCache = Maps.newConcurrentMap(); |
| |
| public TransactionManager(StorageService storageService, PartitionService partitionService, int buckets) { |
| this(storageService, partitionService, DEFAULT_CACHE_SIZE, buckets); |
| } |
| |
| public TransactionManager( |
| StorageService storageService, |
| PartitionService partitionService, |
| int cacheSize, |
| int buckets) { |
| this.partitionService = partitionService; |
| this.cacheSize = cacheSize; |
| this.buckets = buckets; |
| this.transactions = storageService.<TransactionId, Transaction.State>consistentMapBuilder() |
| .withName("onos-transactions") |
| .withSerializer(Serializer.using(KryoNamespaces.API, |
| Transaction.class, |
| Transaction.State.class)) |
| .buildAsyncMap(); |
| this.sortedPartitions = Lists.newArrayList(partitionService.getAllPartitionIds()); |
| Collections.sort(sortedPartitions); |
| } |
| |
| /** |
| * Returns the collection of currently pending transactions. |
| * |
| * @return a collection of currently pending transactions |
| */ |
| public Collection<TransactionId> getPendingTransactions() { |
| return Futures.getUnchecked(transactions.keySet()); |
| } |
| |
| /** |
| * Returns a partitioned transactional map for use within a transaction context. |
| * <p> |
| * The transaction coordinator will return a map that takes advantage of caching that's shared across transaction |
| * contexts. |
| * |
| * @param name the map name |
| * @param serializer the map serializer |
| * @param transactionCoordinator the transaction coordinator for which the map is being created |
| * @param <K> key type |
| * @param <V> value type |
| * @return a partitioned transactional map |
| */ |
| <K, V> PartitionedTransactionalMap<K, V> getTransactionalMap( |
| String name, |
| Serializer serializer, |
| TransactionCoordinator transactionCoordinator) { |
| Map<PartitionId, TransactionalMapParticipant<K, V>> partitions = new HashMap<>(); |
| for (PartitionId partitionId : partitionService.getAllPartitionIds()) { |
| partitions.put(partitionId, getTransactionalMapPartition( |
| name, partitionId, serializer, transactionCoordinator)); |
| } |
| |
| Hasher<K> hasher = key -> { |
| int bucket = Math.abs(Hashing.murmur3_32().hashBytes(serializer.encode(key)).asInt()) % buckets; |
| int partition = Hashing.consistentHash(bucket, sortedPartitions.size()); |
| return sortedPartitions.get(partition); |
| }; |
| return new PartitionedTransactionalMap<>(partitions, hasher); |
| } |
| |
| @SuppressWarnings("unchecked") |
| private <K, V> TransactionalMapParticipant<K, V> getTransactionalMapPartition( |
| String mapName, |
| PartitionId partitionId, |
| Serializer serializer, |
| TransactionCoordinator transactionCoordinator) { |
| Cache<String, CachedMap> mapCache = partitionCache.computeIfAbsent(partitionId, p -> |
| CacheBuilder.newBuilder().maximumSize(cacheSize / partitionService.getNumberOfPartitions()).build()); |
| try { |
| CachedMap<K, V> cachedMap = mapCache.get(mapName, |
| () -> new CachedMap<>(partitionService.getDistributedPrimitiveCreator(partitionId) |
| .newAsyncConsistentMap(mapName, serializer))); |
| |
| Transaction<MapUpdate<K, V>> transaction = new Transaction<>( |
| transactionCoordinator.transactionId, |
| cachedMap.baseMap); |
| return new DefaultTransactionalMapParticipant<>(cachedMap.cachedMap.asConsistentMap(), transaction); |
| } catch (ExecutionException e) { |
| throw new TransactionException(e); |
| } |
| } |
| |
| /** |
| * Updates the state of a transaction in the transaction registry. |
| * |
| * @param transactionId the transaction identifier |
| * @param state the state of the transaction |
| * @return a completable future to be completed once the transaction state has been updated in the registry |
| */ |
| CompletableFuture<Void> updateState(TransactionId transactionId, Transaction.State state) { |
| return transactions.put(transactionId, state).thenApply(v -> null); |
| } |
| |
| /** |
| * Removes the given transaction from the transaction registry. |
| * |
| * @param transactionId the transaction identifier |
| * @return a completable future to be completed once the transaction state has been removed from the registry |
| */ |
| CompletableFuture<Void> remove(TransactionId transactionId) { |
| return transactions.remove(transactionId).thenApply(v -> null); |
| } |
| |
| private static class CachedMap<K, V> { |
| private final AsyncConsistentMap<K, V> baseMap; |
| private final AsyncConsistentMap<K, V> cachedMap; |
| |
| public CachedMap(AsyncConsistentMap<K, V> baseMap) { |
| this.baseMap = baseMap; |
| this.cachedMap = DistributedPrimitives.newCachingMap(baseMap); |
| } |
| } |
| } |