blob: 7d8d4c44662d80b3be6ddfe6ca2fc1b8ec7efe9b [file] [log] [blame]
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.Futures;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.PartitionService;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.TransactionException;
/**
* Transaction manager for managing state shared across multiple transactions.
*/
public class TransactionManager {
private static final int DEFAULT_CACHE_SIZE = 100;
private final PartitionService partitionService;
private final List<PartitionId> sortedPartitions;
private final AsyncConsistentMap<TransactionId, Transaction.State> transactions;
private final long cacheSize;
private final int buckets;
private final Map<PartitionId, Cache<String, CachedMap>> partitionCache = Maps.newConcurrentMap();
public TransactionManager(StorageService storageService, PartitionService partitionService, int buckets) {
this(storageService, partitionService, DEFAULT_CACHE_SIZE, buckets);
}
public TransactionManager(
StorageService storageService,
PartitionService partitionService,
int cacheSize,
int buckets) {
this.partitionService = partitionService;
this.cacheSize = cacheSize;
this.buckets = buckets;
this.transactions = storageService.<TransactionId, Transaction.State>consistentMapBuilder()
.withName("onos-transactions")
.withSerializer(Serializer.using(KryoNamespaces.API,
Transaction.class,
Transaction.State.class))
.buildAsyncMap();
this.sortedPartitions = Lists.newArrayList(partitionService.getAllPartitionIds());
Collections.sort(sortedPartitions);
}
/**
* Returns the collection of currently pending transactions.
*
* @return a collection of currently pending transactions
*/
public Collection<TransactionId> getPendingTransactions() {
return Futures.getUnchecked(transactions.keySet());
}
/**
* Returns a partitioned transactional map for use within a transaction context.
* <p>
* The transaction coordinator will return a map that takes advantage of caching that's shared across transaction
* contexts.
*
* @param name the map name
* @param serializer the map serializer
* @param transactionCoordinator the transaction coordinator for which the map is being created
* @param <K> key type
* @param <V> value type
* @return a partitioned transactional map
*/
<K, V> PartitionedTransactionalMap<K, V> getTransactionalMap(
String name,
Serializer serializer,
TransactionCoordinator transactionCoordinator) {
Map<PartitionId, TransactionalMapParticipant<K, V>> partitions = new HashMap<>();
for (PartitionId partitionId : partitionService.getAllPartitionIds()) {
partitions.put(partitionId, getTransactionalMapPartition(
name, partitionId, serializer, transactionCoordinator));
}
Hasher<K> hasher = key -> {
int bucket = Math.abs(Hashing.murmur3_32().hashBytes(serializer.encode(key)).asInt()) % buckets;
int partition = Hashing.consistentHash(bucket, sortedPartitions.size());
return sortedPartitions.get(partition);
};
return new PartitionedTransactionalMap<>(partitions, hasher);
}
@SuppressWarnings("unchecked")
private <K, V> TransactionalMapParticipant<K, V> getTransactionalMapPartition(
String mapName,
PartitionId partitionId,
Serializer serializer,
TransactionCoordinator transactionCoordinator) {
Cache<String, CachedMap> mapCache = partitionCache.computeIfAbsent(partitionId, p ->
CacheBuilder.newBuilder().maximumSize(cacheSize / partitionService.getNumberOfPartitions()).build());
try {
CachedMap<K, V> cachedMap = mapCache.get(mapName,
() -> new CachedMap<>(partitionService.getDistributedPrimitiveCreator(partitionId)
.newAsyncConsistentMap(mapName, serializer)));
Transaction<MapUpdate<K, V>> transaction = new Transaction<>(
transactionCoordinator.transactionId,
cachedMap.baseMap);
return new DefaultTransactionalMapParticipant<>(cachedMap.cachedMap.asConsistentMap(), transaction);
} catch (ExecutionException e) {
throw new TransactionException(e);
}
}
/**
* Updates the state of a transaction in the transaction registry.
*
* @param transactionId the transaction identifier
* @param state the state of the transaction
* @return a completable future to be completed once the transaction state has been updated in the registry
*/
CompletableFuture<Void> updateState(TransactionId transactionId, Transaction.State state) {
return transactions.put(transactionId, state).thenApply(v -> null);
}
/**
* Removes the given transaction from the transaction registry.
*
* @param transactionId the transaction identifier
* @return a completable future to be completed once the transaction state has been removed from the registry
*/
CompletableFuture<Void> remove(TransactionId transactionId) {
return transactions.remove(transactionId).thenApply(v -> null);
}
private static class CachedMap<K, V> {
private final AsyncConsistentMap<K, V> baseMap;
private final AsyncConsistentMap<K, V> cachedMap;
public CachedMap(AsyncConsistentMap<K, V> baseMap) {
this.baseMap = baseMap;
this.cachedMap = DistributedPrimitives.newCachingMap(baseMap);
}
}
}