blob: 67c28d80476e66897c96d869df3d8bf7b5439d76 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Sho SHIMIZUe18cb122016-02-22 21:04:56 -080016package org.onosproject.store.resource.impl;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070017
18import com.google.common.annotations.Beta;
Sho SHIMIZUe7db6142015-11-04 11:24:22 -080019import com.google.common.collect.ImmutableList;
Sho SHIMIZU83258ae2016-01-29 17:39:07 -080020import com.google.common.collect.ImmutableSet;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
25import org.apache.felix.scr.annotations.Service;
Sho SHIMIZUe18cb122016-02-22 21:04:56 -080026import org.onosproject.net.resource.ContinuousResource;
27import org.onosproject.net.resource.ContinuousResourceId;
28import org.onosproject.net.resource.DiscreteResource;
29import org.onosproject.net.resource.DiscreteResourceId;
Sho SHIMIZU22fb2832016-05-06 11:44:03 -070030import org.onosproject.net.resource.Resource;
Sho SHIMIZUe18cb122016-02-22 21:04:56 -080031import org.onosproject.net.resource.ResourceAllocation;
32import org.onosproject.net.resource.ResourceConsumer;
33import org.onosproject.net.resource.ResourceEvent;
34import org.onosproject.net.resource.ResourceId;
Sho SHIMIZUe18cb122016-02-22 21:04:56 -080035import org.onosproject.net.resource.ResourceStore;
36import org.onosproject.net.resource.ResourceStoreDelegate;
37import org.onosproject.net.resource.Resources;
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080038import org.onosproject.store.AbstractStore;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070039import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampani3780d4b2016-04-04 18:18:24 -070040import org.onosproject.store.service.CommitStatus;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070041import org.onosproject.store.service.Serializer;
42import org.onosproject.store.service.StorageService;
43import org.onosproject.store.service.TransactionContext;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070044import org.slf4j.Logger;
45import org.slf4j.LoggerFactory;
46
Sho SHIMIZUba41fc12015-08-12 15:43:22 -070047import java.util.Arrays;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070048import java.util.Collection;
Sho SHIMIZU69420fe2016-02-09 15:01:07 -080049import java.util.LinkedHashMap;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070050import java.util.List;
51import java.util.Map;
52import java.util.Optional;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080053import java.util.Set;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070054import java.util.stream.Collectors;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080055import java.util.stream.Stream;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070056
57import static com.google.common.base.Preconditions.checkArgument;
58import static com.google.common.base.Preconditions.checkNotNull;
Sho SHIMIZU22fb2832016-05-06 11:44:03 -070059import static org.onosproject.net.resource.ResourceEvent.Type.RESOURCE_ADDED;
60import static org.onosproject.net.resource.ResourceEvent.Type.RESOURCE_REMOVED;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070061
/**
 * Implementation of ResourceStore using TransactionalMap.
 */
Sho SHIMIZU9a2b8292015-10-28 13:00:16 -070065@Component(immediate = true)
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070066@Service
67@Beta
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080068public class ConsistentResourceStore extends AbstractStore<ResourceEvent, ResourceStoreDelegate>
69 implements ResourceStore {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070070 private static final Logger log = LoggerFactory.getLogger(ConsistentResourceStore.class);
71
Sho SHIMIZU22fb2832016-05-06 11:44:03 -070072 static final Serializer SERIALIZER = Serializer.using(
HIGUCHI Yuta6f584222016-05-06 11:15:38 -070073 Arrays.asList(KryoNamespaces.API),
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080074 ContinuousResourceAllocation.class);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070075
Thomas Vachuska762a2d82016-01-04 10:25:20 -080076 // TODO: We should provide centralized values for this
Sho SHIMIZU22fb2832016-05-06 11:44:03 -070077 static final int MAX_RETRIES = 5;
78 static final int RETRY_DELAY = 1_000; // millis
Thomas Vachuska762a2d82016-01-04 10:25:20 -080079
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070080 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
81 protected StorageService service;
82
Sho SHIMIZU22fb2832016-05-06 11:44:03 -070083 private ConsistentDiscreteResourceStore discreteStore;
84 private ConsistentContinuousResourceStore continuousStore;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070085
86 @Activate
87 public void activate() {
Sho SHIMIZU22fb2832016-05-06 11:44:03 -070088 discreteStore = new ConsistentDiscreteResourceStore(service);
89 continuousStore = new ConsistentContinuousResourceStore(service);
Sho SHIMIZUe7db6142015-11-04 11:24:22 -080090
Madan Jampanic7f49f92015-12-10 11:35:06 -080091 log.info("Started");
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070092 }
93
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -080094 // Computational complexity: O(1) if the resource is discrete type.
95 // O(n) if the resource is continuous type where n is the number of the existing allocations for the resource
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070096 @Override
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -080097 public List<ResourceAllocation> getResourceAllocations(ResourceId id) {
98 checkNotNull(id);
99 checkArgument(id instanceof DiscreteResourceId || id instanceof ContinuousResourceId);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700100
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800101 if (id instanceof DiscreteResourceId) {
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700102 return discreteStore.getResourceAllocations((DiscreteResourceId) id);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800103 } else {
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700104 return continuousStore.getResourceAllocations((ContinuousResourceId) id);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800105 }
106 }
107
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700108 @Override
Jonathan Hart56151262016-02-11 09:48:50 -0800109 public boolean register(List<Resource> resources) {
Sho SHIMIZU83e17a02015-08-20 14:07:05 -0700110 checkNotNull(resources);
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800111 if (log.isTraceEnabled()) {
112 resources.forEach(r -> log.trace("registering {}", r));
113 }
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700114
115 TransactionContext tx = service.transactionContextBuilder().build();
116 tx.begin();
117
Sho SHIMIZU69420fe2016-02-09 15:01:07 -0800118 // the order is preserved by LinkedHashMap
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800119 Map<DiscreteResource, List<Resource>> resourceMap = resources.stream()
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800120 .filter(x -> x.parent().isPresent())
Sho SHIMIZU69420fe2016-02-09 15:01:07 -0800121 .collect(Collectors.groupingBy(x -> x.parent().get(), LinkedHashMap::new, Collectors.toList()));
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700122
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700123 TransactionalDiscreteResourceStore discreteTxStore = discreteStore.transactional(tx);
124 TransactionalContinuousResourceStore continuousTxStore = continuousStore.transactional(tx);
125 for (Map.Entry<DiscreteResource, List<Resource>> entry : resourceMap.entrySet()) {
126 if (!lookup(discreteTxStore, continuousTxStore, entry.getKey().id()).isPresent()) {
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800127 return abortTransaction(tx);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700128 }
129
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700130 if (!appendValues(discreteTxStore, continuousTxStore, entry.getKey().id(), entry.getValue())) {
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800131 return abortTransaction(tx);
132 }
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700133 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800134
Madan Jampani3780d4b2016-04-04 18:18:24 -0700135 return tx.commit().whenComplete((status, error) -> {
136 if (status == CommitStatus.SUCCESS) {
137 log.trace("Transaction commit succeeded on registration: resources={}", resources);
138 List<ResourceEvent> events = resources.stream()
139 .filter(x -> x.parent().isPresent())
140 .map(x -> new ResourceEvent(RESOURCE_ADDED, x))
141 .collect(Collectors.toList());
142 notifyDelegate(events);
143 } else {
144 log.warn("Transaction commit failed on registration", error);
145 }
146 }).join() == CommitStatus.SUCCESS;
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700147 }
148
149 @Override
Jonathan Hart56151262016-02-11 09:48:50 -0800150 public boolean unregister(List<ResourceId> ids) {
Sho SHIMIZU72f81b12016-02-09 09:26:17 -0800151 checkNotNull(ids);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700152
153 TransactionContext tx = service.transactionContextBuilder().build();
154 tx.begin();
155
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700156 TransactionalDiscreteResourceStore discreteTxStore = discreteStore.transactional(tx);
157 TransactionalContinuousResourceStore continuousTxStore = continuousStore.transactional(tx);
Sho SHIMIZU7d54d9c2016-02-17 13:58:46 -0800158 // Look up resources by resource IDs
Sho SHIMIZU72f81b12016-02-09 09:26:17 -0800159 List<Resource> resources = ids.stream()
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800160 .filter(x -> x.parent().isPresent())
Sho SHIMIZU7d54d9c2016-02-17 13:58:46 -0800161 .map(x -> {
162 // avoid access to consistent map in the case of discrete resource
163 if (x instanceof DiscreteResourceId) {
164 return Optional.of(Resources.discrete((DiscreteResourceId) x).resource());
165 } else {
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700166 return continuousTxStore.lookup((ContinuousResourceId) x);
Sho SHIMIZU7d54d9c2016-02-17 13:58:46 -0800167 }
168 })
HIGUCHI Yuta315179a2016-02-18 14:01:22 -0800169 .filter(Optional::isPresent)
170 .map(Optional::get)
Sho SHIMIZU72f81b12016-02-09 09:26:17 -0800171 .collect(Collectors.toList());
Sho SHIMIZU69420fe2016-02-09 15:01:07 -0800172 // the order is preserved by LinkedHashMap
Sho SHIMIZU72f81b12016-02-09 09:26:17 -0800173 Map<DiscreteResourceId, List<Resource>> resourceMap = resources.stream()
Sho SHIMIZU69420fe2016-02-09 15:01:07 -0800174 .collect(Collectors.groupingBy(x -> x.parent().get().id(), LinkedHashMap::new, Collectors.toList()));
Sho SHIMIZU83e17a02015-08-20 14:07:05 -0700175
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800176 // even if one of the resources is allocated to a consumer,
177 // all unregistrations are regarded as failure
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700178 for (Map.Entry<DiscreteResourceId, List<Resource>> entry : resourceMap.entrySet()) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800179 boolean allocated = entry.getValue().stream().anyMatch(x -> {
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800180 if (x instanceof DiscreteResource) {
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700181 return discreteTxStore.isAllocated(((DiscreteResource) x).id());
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800182 } else if (x instanceof ContinuousResource) {
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700183 return continuousTxStore.isAllocated(((ContinuousResource) x).id());
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800184 } else {
185 return false;
186 }
187 });
188 if (allocated) {
HIGUCHI Yuta5b6dfba2016-01-27 14:43:41 -0800189 log.warn("Failed to unregister {}: allocation exists", entry.getKey());
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800190 return abortTransaction(tx);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700191 }
192
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700193 if (!removeValues(discreteTxStore, continuousTxStore, entry.getKey(), entry.getValue())) {
HIGUCHI Yuta6acdfd02016-02-18 10:39:43 -0800194 log.warn("Failed to unregister {}: Failed to remove {} values.",
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700195 entry.getKey(), entry.getValue().size());
HIGUCHI Yuta6acdfd02016-02-18 10:39:43 -0800196 log.debug("Failed to unregister {}: Failed to remove values: {}",
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700197 entry.getKey(), entry.getValue());
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800198 return abortTransaction(tx);
199 }
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700200 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800201
Madan Jampani3780d4b2016-04-04 18:18:24 -0700202 return tx.commit().whenComplete((status, error) -> {
203 if (status == CommitStatus.SUCCESS) {
204 List<ResourceEvent> events = resources.stream()
205 .filter(x -> x.parent().isPresent())
206 .map(x -> new ResourceEvent(RESOURCE_REMOVED, x))
207 .collect(Collectors.toList());
208 notifyDelegate(events);
209 } else {
210 log.warn("Failed to unregister {}: Commit failed.", ids, error);
211 }
212 }).join() == CommitStatus.SUCCESS;
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700213 }
214
215 @Override
Jonathan Hart56151262016-02-11 09:48:50 -0800216 public boolean allocate(List<Resource> resources, ResourceConsumer consumer) {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700217 checkNotNull(resources);
218 checkNotNull(consumer);
219
220 TransactionContext tx = service.transactionContextBuilder().build();
221 tx.begin();
222
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700223 TransactionalDiscreteResourceStore discreteTxStore = discreteStore.transactional(tx);
224 TransactionalContinuousResourceStore continuousTxStore = continuousStore.transactional(tx);
225 for (Resource resource : resources) {
Sho SHIMIZU171a9382016-02-15 13:56:34 -0800226 if (resource instanceof DiscreteResource) {
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700227 if (!discreteTxStore.allocate(consumer, (DiscreteResource) resource)) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800228 return abortTransaction(tx);
229 }
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800230 } else if (resource instanceof ContinuousResource) {
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700231 if (!continuousTxStore.allocate(consumer, (ContinuousResource) resource)) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800232 return abortTransaction(tx);
233 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800234 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700235 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800236
Madan Jampani3780d4b2016-04-04 18:18:24 -0700237 return tx.commit().join() == CommitStatus.SUCCESS;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700238 }
239
240 @Override
Sho SHIMIZUfc64ffe2016-02-10 20:11:09 -0800241 public boolean release(List<ResourceAllocation> allocations) {
242 checkNotNull(allocations);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700243
244 TransactionContext tx = service.transactionContextBuilder().build();
245 tx.begin();
246
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700247 TransactionalDiscreteResourceStore discreteTxStore = discreteStore.transactional(tx);
248 TransactionalContinuousResourceStore continuousTxStore = continuousStore.transactional(tx);
Sho SHIMIZUfc64ffe2016-02-10 20:11:09 -0800249 for (ResourceAllocation allocation : allocations) {
250 Resource resource = allocation.resource();
251 ResourceConsumer consumer = allocation.consumer();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700252
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800253 if (resource instanceof DiscreteResource) {
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700254 if (!discreteTxStore.release((DiscreteResource) resource, consumer)) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800255 return abortTransaction(tx);
256 }
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800257 } else if (resource instanceof ContinuousResource) {
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700258 if (!continuousTxStore.release((ContinuousResource) resource, consumer)) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800259 return abortTransaction(tx);
260 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700261 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700262 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800263
Madan Jampani3780d4b2016-04-04 18:18:24 -0700264 return tx.commit().join() == CommitStatus.SUCCESS;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700265 }
266
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800267 // computational complexity: O(1) if the resource is discrete type.
268 // O(n) if the resource is continuous type where n is the number of the children of
269 // the specified resource's parent
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700270 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800271 public boolean isAvailable(Resource resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800272 checkNotNull(resource);
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800273 checkArgument(resource instanceof DiscreteResource || resource instanceof ContinuousResource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800274
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800275 if (resource instanceof DiscreteResource) {
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800276 // check if already consumed
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700277 return discreteStore.isAvailable((DiscreteResource) resource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800278 } else {
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700279 return continuousStore.isAvailable((ContinuousResource) resource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800280 }
281 }
282
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800283 // computational complexity: O(n + m) where n is the number of entries in discreteConsumers
284 // and m is the number of allocations for all continuous resources
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800285 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800286 public Collection<Resource> getResources(ResourceConsumer consumer) {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700287 checkNotNull(consumer);
288
289 // NOTE: getting all entries may become performance bottleneck
290 // TODO: revisit for better backend data structure
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700291 Stream<DiscreteResource> discrete = discreteStore.getResources(consumer);
292 Stream<ContinuousResource> continuous = continuousStore.getResources(consumer);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800293
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700294 return Stream.concat(discrete, continuous).collect(Collectors.toList());
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700295 }
296
Sho SHIMIZU82bfe992016-02-10 09:55:32 -0800297 // computational complexity: O(1)
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700298 @Override
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800299 public Set<Resource> getChildResources(DiscreteResourceId parent) {
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700300 checkNotNull(parent);
301
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700302 return ImmutableSet.<Resource>builder()
303 .addAll(discreteStore.getChildResources(parent))
304 .addAll(continuousStore.getChildResources(parent))
305 .build();
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700306 }
307
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800308 // computational complexity: O(n) where n is the number of the children of the parent
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700309 @Override
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800310 public <T> Collection<Resource> getAllocatedResources(DiscreteResourceId parent, Class<T> cls) {
Sho SHIMIZU1f5e5912015-08-10 17:00:00 -0700311 checkNotNull(parent);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700312 checkNotNull(cls);
313
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700314 Set<Resource> children = getChildResources(parent);
315 if (children.isEmpty()) {
316 return children;
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700317 }
318
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700319 Stream<DiscreteResource> discrete = discreteStore.getAllocatedResources(parent, cls);
320 Stream<ContinuousResource> continuous = continuousStore.getAllocatedResources(parent, cls);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800321
322 return Stream.concat(discrete, continuous).collect(Collectors.toList());
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700323 }
Sho SHIMIZUd29847f2015-08-13 09:10:59 -0700324
325 /**
326 * Abort the transaction.
327 *
328 * @param tx transaction context
329 * @return always false
330 */
331 private boolean abortTransaction(TransactionContext tx) {
332 tx.abort();
333 return false;
334 }
335
336 /**
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700337 * Appends the values to the existing values associated with the specified key.
Sho SHIMIZU4568c412015-08-21 16:39:07 -0700338 * If the map already has all the given values, appending will not happen.
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700339 *
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700340 * @param key key specifying values
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700341 * @param values values to be appended
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700342 * @return true if the operation succeeds, false otherwise.
343 */
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800344 // computational complexity: O(n) where n is the number of the specified value
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700345 private boolean appendValues(TransactionalDiscreteResourceStore discreteTxStore,
346 TransactionalContinuousResourceStore continuousTxStore,
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800347 DiscreteResourceId key, List<Resource> values) {
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700348 // it's assumed that the passed "values" is non-empty
349
350 // This is 2-pass scan. Nicer to have 1-pass scan
351 List<DiscreteResource> discreteValues = values.stream()
352 .filter(x -> x instanceof DiscreteResource)
353 .map(x -> (DiscreteResource) x)
354 .collect(Collectors.toList());
355 List<ContinuousResource> continuousValues = values.stream()
356 .filter(x -> x instanceof ContinuousResource)
357 .map(x -> (ContinuousResource) x)
358 .collect(Collectors.toList());
359
360 // short-circuit decision avoiding unnecessary distributed map operations
361 if (continuousValues.isEmpty()) {
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700362 return discreteTxStore.appendValues(key, discreteValues);
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700363 }
364 if (discreteValues.isEmpty()) {
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700365 return continuousTxStore.appendValues(key, continuousValues);
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700366 }
367
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700368 return discreteTxStore.appendValues(key, discreteValues)
369 && continuousTxStore.appendValues(key, continuousValues);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700370 }
371
372 /**
Sho SHIMIZUba1f83b2015-10-14 08:11:20 -0700373 * Removes the values from the existing values associated with the specified key.
Sho SHIMIZU5618ee52015-08-21 17:19:44 -0700374 * If the map doesn't contain the given values, removal will not happen.
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700375 *
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700376 * @param discreteTxStore map holding multiple discrete resources for a key
377 * @param continuousTxStore map holding multiple continuous resources for a key
378 * @param key key specifying values
379 * @param values values to be removed
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700380 * @return true if the operation succeeds, false otherwise
381 */
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700382 private boolean removeValues(TransactionalDiscreteResourceStore discreteTxStore,
383 TransactionalContinuousResourceStore continuousTxStore,
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800384 DiscreteResourceId key, List<Resource> values) {
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700385 // it's assumed that the passed "values" is non-empty
386
387 // This is 2-pass scan. Nicer to have 1-pass scan
388 List<DiscreteResource> discreteValues = values.stream()
389 .filter(x -> x instanceof DiscreteResource)
390 .map(x -> (DiscreteResource) x)
391 .collect(Collectors.toList());
392 List<ContinuousResource> continuousValues = values.stream()
393 .filter(x -> x instanceof ContinuousResource)
394 .map(x -> (ContinuousResource) x)
395 .collect(Collectors.toList());
396
397 // short-circuit decision avoiding unnecessary distributed map operations
398 if (continuousValues.isEmpty()) {
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700399 return discreteTxStore.removeValues(key, discreteValues);
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700400 }
401 if (discreteValues.isEmpty()) {
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700402 return continuousTxStore.removeValues(key, continuousValues);
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700403 }
404
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700405 return discreteTxStore.removeValues(key, discreteValues)
406 && continuousTxStore.removeValues(key, continuousValues);
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700407 }
408
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700409 /**
Sho SHIMIZU72f81b12016-02-09 09:26:17 -0800410 * Returns the resource which has the same key as the specified resource ID
411 * in the set as a value of the map.
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700412 *
Sho SHIMIZU72f81b12016-02-09 09:26:17 -0800413 * @param id ID of resource to be checked
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800414 * @return the resource which is regarded as the same as the specified resource
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700415 */
Sho SHIMIZUa6a6fd32016-02-10 18:36:44 -0800416 // Naive implementation, which traverses all elements in the set when continuous resource
417 // computational complexity: O(1) when discrete resource. O(n) when continuous resource
418 // where n is the number of elements in the associated set
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700419 private Optional<Resource> lookup(TransactionalDiscreteResourceStore discreteTxStore,
420 TransactionalContinuousResourceStore continuousTxStore,
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700421 ResourceId id) {
422 if (id instanceof DiscreteResourceId) {
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700423 return discreteTxStore.lookup((DiscreteResourceId) id);
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700424 } else if (id instanceof ContinuousResourceId) {
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700425 return continuousTxStore.lookup((ContinuousResourceId) id);
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700426 } else {
427 return Optional.empty();
428 }
429 }
430
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800431 // internal use only
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700432 static final class ContinuousResourceAllocation {
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800433 private final ContinuousResource original;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800434 private final ImmutableList<ResourceAllocation> allocations;
435
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700436 ContinuousResourceAllocation(ContinuousResource original,
437 ImmutableList<ResourceAllocation> allocations) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800438 this.original = original;
439 this.allocations = allocations;
440 }
441
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700442 ContinuousResource original() {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800443 return original;
444 }
445
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700446 ImmutableList<ResourceAllocation> allocations() {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800447 return allocations;
448 }
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700449 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700450}