blob: 707ce4b3468b7c2ccceaaa8e5bcde020535a8e34 [file] [log] [blame]
/*
 * Copyright 2015-2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.newresource.impl;
17
18import com.google.common.annotations.Beta;
Sho SHIMIZUe7db6142015-11-04 11:24:22 -080019import com.google.common.collect.ImmutableList;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080020import com.google.common.collect.Maps;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
25import org.apache.felix.scr.annotations.Service;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080026import org.onlab.util.GuavaCollectors;
Thomas Vachuska762a2d82016-01-04 10:25:20 -080027import org.onlab.util.Tools;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080028import org.onosproject.net.newresource.ResourceAllocation;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070029import org.onosproject.net.newresource.ResourceConsumer;
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080030import org.onosproject.net.newresource.ResourceEvent;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080031import org.onosproject.net.newresource.ResourceId;
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -080032import org.onosproject.net.newresource.Resource;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070033import org.onosproject.net.newresource.ResourceStore;
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080034import org.onosproject.net.newresource.ResourceStoreDelegate;
35import org.onosproject.store.AbstractStore;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070036import org.onosproject.store.serializers.KryoNamespaces;
37import org.onosproject.store.service.ConsistentMap;
Thomas Vachuska762a2d82016-01-04 10:25:20 -080038import org.onosproject.store.service.ConsistentMapException;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070039import org.onosproject.store.service.Serializer;
40import org.onosproject.store.service.StorageService;
41import org.onosproject.store.service.TransactionContext;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070042import org.onosproject.store.service.TransactionalMap;
43import org.onosproject.store.service.Versioned;
44import org.slf4j.Logger;
45import org.slf4j.LoggerFactory;
46
Sho SHIMIZUba41fc12015-08-12 15:43:22 -070047import java.util.Arrays;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070048import java.util.Collection;
49import java.util.Iterator;
Sho SHIMIZUba41fc12015-08-12 15:43:22 -070050import java.util.LinkedHashSet;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070051import java.util.List;
52import java.util.Map;
53import java.util.Optional;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080054import java.util.Set;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070055import java.util.stream.Collectors;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080056import java.util.stream.Stream;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070057
58import static com.google.common.base.Preconditions.checkArgument;
59import static com.google.common.base.Preconditions.checkNotNull;
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080060import static org.onosproject.net.newresource.ResourceEvent.Type.*;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070061
62/**
63 * Implementation of ResourceStore using TransactionalMap.
64 */
Sho SHIMIZU9a2b8292015-10-28 13:00:16 -070065@Component(immediate = true)
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070066@Service
67@Beta
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080068public class ConsistentResourceStore extends AbstractStore<ResourceEvent, ResourceStoreDelegate>
69 implements ResourceStore {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070070 private static final Logger log = LoggerFactory.getLogger(ConsistentResourceStore.class);
71
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080072 private static final String DISCRETE_CONSUMER_MAP = "onos-discrete-consumers";
73 private static final String CONTINUOUS_CONSUMER_MAP = "onos-continuous-consumers";
Sho SHIMIZUba41fc12015-08-12 15:43:22 -070074 private static final String CHILD_MAP = "onos-resource-children";
75 private static final Serializer SERIALIZER = Serializer.using(
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080076 Arrays.asList(KryoNamespaces.BASIC, KryoNamespaces.API),
77 ContinuousResourceAllocation.class);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070078
Thomas Vachuska762a2d82016-01-04 10:25:20 -080079 // TODO: We should provide centralized values for this
80 private static final int MAX_RETRIES = 5;
81 private static final int RETRY_DELAY = 1_000; // millis
82
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070083 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
84 protected StorageService service;
85
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -080086 private ConsistentMap<Resource.Discrete, ResourceConsumer> discreteConsumers;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080087 private ConsistentMap<ResourceId, ContinuousResourceAllocation> continuousConsumers;
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -080088 private ConsistentMap<Resource.Discrete, Set<Resource>> childMap;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070089
90 @Activate
91 public void activate() {
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -080092 discreteConsumers = service.<Resource.Discrete, ResourceConsumer>consistentMapBuilder()
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080093 .withName(DISCRETE_CONSUMER_MAP)
Sho SHIMIZUba41fc12015-08-12 15:43:22 -070094 .withSerializer(SERIALIZER)
95 .build();
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080096 continuousConsumers = service.<ResourceId, ContinuousResourceAllocation>consistentMapBuilder()
97 .withName(CONTINUOUS_CONSUMER_MAP)
98 .withSerializer(SERIALIZER)
99 .build();
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800100 childMap = service.<Resource.Discrete, Set<Resource>>consistentMapBuilder()
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700101 .withName(CHILD_MAP)
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700102 .withSerializer(SERIALIZER)
103 .build();
Sho SHIMIZUe7db6142015-11-04 11:24:22 -0800104
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800105 Tools.retryable(() -> childMap.put(Resource.ROOT, new LinkedHashSet<>()),
Thomas Vachuska762a2d82016-01-04 10:25:20 -0800106 ConsistentMapException.class, MAX_RETRIES, RETRY_DELAY);
Madan Jampanic7f49f92015-12-10 11:35:06 -0800107 log.info("Started");
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700108 }
109
110 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800111 public List<ResourceConsumer> getConsumers(Resource resource) {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700112 checkNotNull(resource);
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800113 checkArgument(resource instanceof Resource.Discrete || resource instanceof Resource.Continuous);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700114
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800115 if (resource instanceof Resource.Discrete) {
116 return getConsumer((Resource.Discrete) resource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800117 } else {
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800118 return getConsumer((Resource.Continuous) resource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800119 }
120 }
121
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800122 private List<ResourceConsumer> getConsumer(Resource.Discrete resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800123 Versioned<ResourceConsumer> consumer = discreteConsumers.get(resource);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700124 if (consumer == null) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800125 return ImmutableList.of();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700126 }
127
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800128 return ImmutableList.of(consumer.value());
129 }
130
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800131 private List<ResourceConsumer> getConsumer(Resource.Continuous resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800132 Versioned<ContinuousResourceAllocation> allocations = continuousConsumers.get(resource.id());
133 if (allocations == null) {
134 return ImmutableList.of();
135 }
136
137 return allocations.value().allocations().stream()
138 .filter(x -> x.resource().equals(resource))
139 .map(ResourceAllocation::consumer)
140 .collect(GuavaCollectors.toImmutableList());
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700141 }
142
143 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800144 public boolean register(List<Resource> resources) {
Sho SHIMIZU83e17a02015-08-20 14:07:05 -0700145 checkNotNull(resources);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700146
147 TransactionContext tx = service.transactionContextBuilder().build();
148 tx.begin();
149
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800150 TransactionalMap<Resource.Discrete, Set<Resource>> childTxMap =
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800151 tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700152
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800153 Map<Resource.Discrete, List<Resource>> resourceMap = resources.stream()
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800154 .filter(x -> x.parent().isPresent())
155 .collect(Collectors.groupingBy(x -> x.parent().get()));
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700156
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800157 for (Map.Entry<Resource.Discrete, List<Resource>> entry: resourceMap.entrySet()) {
158 Optional<Resource.Discrete> child = lookup(childTxMap, entry.getKey());
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800159 if (!child.isPresent()) {
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800160 return abortTransaction(tx);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700161 }
162
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800163 if (!appendValues(childTxMap, entry.getKey(), entry.getValue())) {
164 return abortTransaction(tx);
165 }
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700166 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800167
Sho SHIMIZUfa62b472015-11-02 17:35:46 -0800168 boolean success = tx.commit();
169 if (success) {
170 List<ResourceEvent> events = resources.stream()
171 .filter(x -> x.parent().isPresent())
172 .map(x -> new ResourceEvent(RESOURCE_ADDED, x))
173 .collect(Collectors.toList());
174 notifyDelegate(events);
175 }
176 return success;
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700177 }
178
179 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800180 public boolean unregister(List<Resource> resources) {
Sho SHIMIZU83e17a02015-08-20 14:07:05 -0700181 checkNotNull(resources);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700182
183 TransactionContext tx = service.transactionContextBuilder().build();
184 tx.begin();
185
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800186 TransactionalMap<Resource.Discrete, Set<Resource>> childTxMap =
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800187 tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800188 TransactionalMap<Resource.Discrete, ResourceConsumer> discreteConsumerTxMap =
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800189 tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
190 TransactionalMap<ResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
191 tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700192
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800193 // Extract Discrete instances from resources
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800194 Map<Resource.Discrete, List<Resource>> resourceMap = resources.stream()
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800195 .filter(x -> x.parent().isPresent())
196 .collect(Collectors.groupingBy(x -> x.parent().get()));
Sho SHIMIZU83e17a02015-08-20 14:07:05 -0700197
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800198 // even if one of the resources is allocated to a consumer,
199 // all unregistrations are regarded as failure
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800200 for (Map.Entry<Resource.Discrete, List<Resource>> entry: resourceMap.entrySet()) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800201 boolean allocated = entry.getValue().stream().anyMatch(x -> {
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800202 if (x instanceof Resource.Discrete) {
203 return discreteConsumerTxMap.get((Resource.Discrete) x) != null;
204 } else if (x instanceof Resource.Continuous) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800205 ContinuousResourceAllocation allocations = continuousConsumerTxMap.get(x.id());
206 return allocations != null && !allocations.allocations().isEmpty();
207 } else {
208 return false;
209 }
210 });
211 if (allocated) {
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800212 return abortTransaction(tx);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700213 }
214
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800215 if (!removeValues(childTxMap, entry.getKey(), entry.getValue())) {
216 return abortTransaction(tx);
217 }
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700218 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800219
Sho SHIMIZUfa62b472015-11-02 17:35:46 -0800220 boolean success = tx.commit();
221 if (success) {
222 List<ResourceEvent> events = resources.stream()
223 .filter(x -> x.parent().isPresent())
224 .map(x -> new ResourceEvent(RESOURCE_REMOVED, x))
225 .collect(Collectors.toList());
226 notifyDelegate(events);
227 }
228 return success;
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700229 }
230
231 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800232 public boolean allocate(List<Resource> resources, ResourceConsumer consumer) {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700233 checkNotNull(resources);
234 checkNotNull(consumer);
235
236 TransactionContext tx = service.transactionContextBuilder().build();
237 tx.begin();
238
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800239 TransactionalMap<Resource.Discrete, Set<Resource>> childTxMap =
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800240 tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800241 TransactionalMap<Resource.Discrete, ResourceConsumer> discreteConsumerTxMap =
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800242 tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
243 TransactionalMap<ResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
244 tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700245
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800246 for (Resource resource: resources) {
247 if (resource instanceof Resource.Discrete) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800248 if (!lookup(childTxMap, resource).isPresent()) {
249 return abortTransaction(tx);
250 }
Sho SHIMIZUd29847f2015-08-13 09:10:59 -0700251
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800252 ResourceConsumer oldValue = discreteConsumerTxMap.put((Resource.Discrete) resource, consumer);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800253 if (oldValue != null) {
254 return abortTransaction(tx);
255 }
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800256 } else if (resource instanceof Resource.Continuous) {
257 Optional<Resource.Continuous> continuous = lookup(childTxMap, (Resource.Continuous) resource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800258 if (!continuous.isPresent()) {
259 return abortTransaction(tx);
260 }
261
262 ContinuousResourceAllocation allocations = continuousConsumerTxMap.get(continuous.get().id());
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800263 if (!hasEnoughResource(continuous.get(), (Resource.Continuous) resource, allocations)) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800264 return abortTransaction(tx);
265 }
266
267 boolean success = appendValue(continuousConsumerTxMap,
268 continuous.get(), new ResourceAllocation(continuous.get(), consumer));
269 if (!success) {
270 return abortTransaction(tx);
271 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800272 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700273 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800274
275 return tx.commit();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700276 }
277
278 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800279 public boolean release(List<Resource> resources, List<ResourceConsumer> consumers) {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700280 checkNotNull(resources);
281 checkNotNull(consumers);
282 checkArgument(resources.size() == consumers.size());
283
284 TransactionContext tx = service.transactionContextBuilder().build();
285 tx.begin();
286
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800287 TransactionalMap<Resource.Discrete, ResourceConsumer> discreteConsumerTxMap =
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800288 tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
289 TransactionalMap<ResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
290 tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800291 Iterator<Resource> resourceIte = resources.iterator();
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800292 Iterator<ResourceConsumer> consumerIte = consumers.iterator();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700293
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800294 while (resourceIte.hasNext() && consumerIte.hasNext()) {
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800295 Resource resource = resourceIte.next();
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800296 ResourceConsumer consumer = consumerIte.next();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700297
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800298 if (resource instanceof Resource.Discrete) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800299 // if this single release fails (because the resource is allocated to another consumer,
300 // the whole release fails
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800301 if (!discreteConsumerTxMap.remove((Resource.Discrete) resource, consumer)) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800302 return abortTransaction(tx);
303 }
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800304 } else if (resource instanceof Resource.Continuous) {
305 Resource.Continuous continuous = (Resource.Continuous) resource;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800306 ContinuousResourceAllocation allocation = continuousConsumerTxMap.get(continuous.id());
307 ImmutableList<ResourceAllocation> newAllocations = allocation.allocations().stream()
308 .filter(x -> !(x.consumer().equals(consumer) &&
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800309 ((Resource.Continuous) x.resource()).value() == continuous.value()))
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800310 .collect(GuavaCollectors.toImmutableList());
311
312 if (!continuousConsumerTxMap.replace(continuous.id(), allocation,
313 new ContinuousResourceAllocation(allocation.original(), newAllocations))) {
314 return abortTransaction(tx);
315 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700316 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700317 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800318
319 return tx.commit();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700320 }
321
322 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800323 public boolean isAvailable(Resource resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800324 checkNotNull(resource);
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800325 checkArgument(resource instanceof Resource.Discrete || resource instanceof Resource.Continuous);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800326
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800327 if (resource instanceof Resource.Discrete) {
328 return getConsumer((Resource.Discrete) resource).isEmpty();
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800329 } else {
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800330 return isAvailable((Resource.Continuous) resource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800331 }
332 }
333
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800334 private boolean isAvailable(Resource.Continuous resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800335 Versioned<ContinuousResourceAllocation> allocation = continuousConsumers.get(resource.id());
336 if (allocation == null) {
337 return false;
338 }
339
340 return hasEnoughResource(allocation.value().original(), resource, allocation.value());
341 }
342
343 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800344 public Collection<Resource> getResources(ResourceConsumer consumer) {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700345 checkNotNull(consumer);
346
347 // NOTE: getting all entries may become performance bottleneck
348 // TODO: revisit for better backend data structure
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800349 Stream<Resource.Discrete> discreteStream = discreteConsumers.entrySet().stream()
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700350 .filter(x -> x.getValue().value().equals(consumer))
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800351 .map(Map.Entry::getKey);
352
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800353 Stream<Resource.Continuous> continuousStream = continuousConsumers.values().stream()
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800354 .flatMap(x -> x.value().allocations().stream()
355 .map(y -> Maps.immutableEntry(x.value().original(), y)))
356 .filter(x -> x.getValue().consumer().equals(consumer))
357 .map(x -> x.getKey());
358
359 return Stream.concat(discreteStream, continuousStream).collect(Collectors.toList());
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700360 }
361
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700362 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800363 public Collection<Resource> getChildResources(Resource parent) {
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700364 checkNotNull(parent);
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800365 checkArgument(parent instanceof Resource.Discrete);
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700366
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800367 Versioned<Set<Resource>> children = childMap.get((Resource.Discrete) parent);
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700368 if (children == null) {
Sho SHIMIZU2c0ae122016-01-20 13:14:38 -0800369 return ImmutableList.of();
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700370 }
371
372 return children.value();
373 }
374
375 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800376 public <T> Collection<Resource> getAllocatedResources(Resource parent, Class<T> cls) {
Sho SHIMIZU1f5e5912015-08-10 17:00:00 -0700377 checkNotNull(parent);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700378 checkNotNull(cls);
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800379 checkArgument(parent instanceof Resource.Discrete);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700380
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800381 Versioned<Set<Resource>> children = childMap.get((Resource.Discrete) parent);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700382 if (children == null) {
Sho SHIMIZU2c0ae122016-01-20 13:14:38 -0800383 return ImmutableList.of();
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700384 }
385
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800386 Stream<Resource.Discrete> discrete = children.value().stream()
Sho SHIMIZUc9546a32015-11-10 11:22:28 -0800387 .filter(x -> x.last().getClass().equals(cls))
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800388 .filter(x -> x instanceof Resource.Discrete)
389 .map(x -> (Resource.Discrete) x)
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800390 .filter(discreteConsumers::containsKey);
391
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800392 Stream<Resource.Continuous> continuous = children.value().stream()
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800393 .filter(x -> x.last().getClass().equals(cls))
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800394 .filter(x -> x instanceof Resource.Continuous)
395 .map(x -> (Resource.Continuous) x)
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800396 .filter(x -> continuousConsumers.containsKey(x.id()))
397 .filter(x -> continuousConsumers.get(x.id()) != null)
398 .filter(x -> !continuousConsumers.get(x.id()).value().allocations().isEmpty());
399
400 return Stream.concat(discrete, continuous).collect(Collectors.toList());
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700401 }
Sho SHIMIZUd29847f2015-08-13 09:10:59 -0700402
403 /**
404 * Abort the transaction.
405 *
406 * @param tx transaction context
407 * @return always false
408 */
409 private boolean abortTransaction(TransactionContext tx) {
410 tx.abort();
411 return false;
412 }
413
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800414 // Appends the specified ResourceAllocation to the existing values stored in the map
415 private boolean appendValue(TransactionalMap<ResourceId, ContinuousResourceAllocation> map,
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800416 Resource.Continuous original, ResourceAllocation value) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800417 ContinuousResourceAllocation oldValue = map.putIfAbsent(original.id(),
418 new ContinuousResourceAllocation(original, ImmutableList.of(value)));
419 if (oldValue == null) {
420 return true;
421 }
422
423 if (oldValue.allocations().contains(value)) {
424 // don't write to map because all values are already stored
425 return true;
426 }
427
428 ContinuousResourceAllocation newValue = new ContinuousResourceAllocation(original,
429 ImmutableList.<ResourceAllocation>builder()
430 .addAll(oldValue.allocations())
431 .add(value)
432 .build());
433 return map.replace(original.id(), oldValue, newValue);
434 }
Sho SHIMIZUd29847f2015-08-13 09:10:59 -0700435 /**
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700436 * Appends the values to the existing values associated with the specified key.
Sho SHIMIZU4568c412015-08-21 16:39:07 -0700437 * If the map already has all the given values, appending will not happen.
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700438 *
439 * @param map map holding multiple values for a key
440 * @param key key specifying values
441 * @param values values to be appended
442 * @param <K> type of the key
443 * @param <V> type of the element of the list
444 * @return true if the operation succeeds, false otherwise.
445 */
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800446 private <K, V> boolean appendValues(TransactionalMap<K, Set<V>> map, K key, List<V> values) {
447 Set<V> oldValues = map.putIfAbsent(key, new LinkedHashSet<>(values));
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700448 if (oldValues == null) {
Sho SHIMIZU93a74b32015-11-09 11:48:23 -0800449 return true;
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700450 }
451
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800452 if (oldValues.containsAll(values)) {
Sho SHIMIZU4568c412015-08-21 16:39:07 -0700453 // don't write to map because all values are already stored
454 return true;
455 }
456
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800457 LinkedHashSet<V> newValues = new LinkedHashSet<>(oldValues);
458 newValues.addAll(values);
459 return map.replace(key, oldValues, newValues);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700460 }
461
462 /**
Sho SHIMIZUba1f83b2015-10-14 08:11:20 -0700463 * Removes the values from the existing values associated with the specified key.
Sho SHIMIZU5618ee52015-08-21 17:19:44 -0700464 * If the map doesn't contain the given values, removal will not happen.
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700465 *
466 * @param map map holding multiple values for a key
467 * @param key key specifying values
468 * @param values values to be removed
469 * @param <K> type of the key
470 * @param <V> type of the element of the list
471 * @return true if the operation succeeds, false otherwise
472 */
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800473 private <K, V> boolean removeValues(TransactionalMap<K, Set<V>> map, K key, List<? extends V> values) {
474 Set<V> oldValues = map.putIfAbsent(key, new LinkedHashSet<>());
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700475 if (oldValues == null) {
Sho SHIMIZU93a74b32015-11-09 11:48:23 -0800476 return true;
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700477 }
478
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800479 if (values.stream().allMatch(x -> !oldValues.contains(x))) {
Sho SHIMIZU5618ee52015-08-21 17:19:44 -0700480 // don't write map because none of the values are stored
481 return true;
482 }
483
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800484 LinkedHashSet<V> newValues = new LinkedHashSet<>(oldValues);
485 newValues.removeAll(values);
486 return map.replace(key, oldValues, newValues);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700487 }
488
489 /**
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800490 * Returns the resource which has the same key as the key of the specified resource
491 * in the list as a value of the map.
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700492 *
493 * @param map map storing parent - child relationship of resources
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800494 * @param resource resource to be checked for its key
495 * @return the resource which is regarded as the same as the specified resource
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700496 */
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800497 // Naive implementation, which traverses all elements in the list
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800498 private <T extends Resource> Optional<T> lookup(
499 TransactionalMap<Resource.Discrete, Set<Resource>> map, T resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800500 // if it is root, always returns itself
Sho SHIMIZUc9546a32015-11-10 11:22:28 -0800501 if (!resource.parent().isPresent()) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800502 return Optional.of(resource);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700503 }
504
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800505 Set<Resource> values = map.get(resource.parent().get());
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800506 if (values == null) {
507 return Optional.empty();
508 }
509
510 @SuppressWarnings("unchecked")
511 Optional<T> result = values.stream()
512 .filter(x -> x.id().equals(resource.id()))
513 .map(x -> (T) x)
514 .findFirst();
515 return result;
516 }
517
518 /**
519 * Checks if there is enough resource volume to allocated the requested resource
520 * against the specified resource.
521 *
522 * @param original original resource
523 * @param request requested resource
524 * @param allocation current allocation of the resource
525 * @return true if there is enough resource volume. Otherwise, false.
526 */
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800527 private boolean hasEnoughResource(Resource.Continuous original,
528 Resource.Continuous request,
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800529 ContinuousResourceAllocation allocation) {
530 if (allocation == null) {
531 return request.value() <= original.value();
532 }
533
534 double allocated = allocation.allocations().stream()
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800535 .filter(x -> x.resource() instanceof Resource.Continuous)
536 .map(x -> (Resource.Continuous) x.resource())
537 .mapToDouble(Resource.Continuous::value)
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800538 .sum();
539 double left = original.value() - allocated;
540 return request.value() <= left;
541 }
542
543 // internal use only
544 private static final class ContinuousResourceAllocation {
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800545 private final Resource.Continuous original;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800546 private final ImmutableList<ResourceAllocation> allocations;
547
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800548 private ContinuousResourceAllocation(Resource.Continuous original,
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800549 ImmutableList<ResourceAllocation> allocations) {
550 this.original = original;
551 this.allocations = allocations;
552 }
553
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800554 private Resource.Continuous original() {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800555 return original;
556 }
557
558 private ImmutableList<ResourceAllocation> allocations() {
559 return allocations;
560 }
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700561 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700562}