blob: 6b1443d99fbeed6ecef9e8b8071665e9f02f1355 [file] [log] [blame]
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -07001/*
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -08002 * Copyright 2015-2016 Open Networking Laboratory
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.newresource.impl;
17
18import com.google.common.annotations.Beta;
Sho SHIMIZUe7db6142015-11-04 11:24:22 -080019import com.google.common.collect.ImmutableList;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080020import com.google.common.collect.Maps;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
25import org.apache.felix.scr.annotations.Service;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080026import org.onlab.util.GuavaCollectors;
Thomas Vachuska762a2d82016-01-04 10:25:20 -080027import org.onlab.util.Tools;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080028import org.onosproject.net.newresource.ResourceAllocation;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070029import org.onosproject.net.newresource.ResourceConsumer;
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080030import org.onosproject.net.newresource.ResourceEvent;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080031import org.onosproject.net.newresource.ResourceId;
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -080032import org.onosproject.net.newresource.Resource;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070033import org.onosproject.net.newresource.ResourceStore;
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080034import org.onosproject.net.newresource.ResourceStoreDelegate;
35import org.onosproject.store.AbstractStore;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070036import org.onosproject.store.serializers.KryoNamespaces;
37import org.onosproject.store.service.ConsistentMap;
Thomas Vachuska762a2d82016-01-04 10:25:20 -080038import org.onosproject.store.service.ConsistentMapException;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070039import org.onosproject.store.service.Serializer;
40import org.onosproject.store.service.StorageService;
41import org.onosproject.store.service.TransactionContext;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070042import org.onosproject.store.service.TransactionalMap;
43import org.onosproject.store.service.Versioned;
44import org.slf4j.Logger;
45import org.slf4j.LoggerFactory;
46
Sho SHIMIZUba41fc12015-08-12 15:43:22 -070047import java.util.Arrays;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070048import java.util.Collection;
Sho SHIMIZUba41fc12015-08-12 15:43:22 -070049import java.util.Collections;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070050import java.util.Iterator;
Sho SHIMIZUba41fc12015-08-12 15:43:22 -070051import java.util.LinkedHashSet;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070052import java.util.List;
53import java.util.Map;
54import java.util.Optional;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080055import java.util.Set;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070056import java.util.stream.Collectors;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080057import java.util.stream.Stream;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070058
59import static com.google.common.base.Preconditions.checkArgument;
60import static com.google.common.base.Preconditions.checkNotNull;
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080061import static org.onosproject.net.newresource.ResourceEvent.Type.*;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070062
63/**
64 * Implementation of ResourceStore using TransactionalMap.
65 */
Sho SHIMIZU9a2b8292015-10-28 13:00:16 -070066@Component(immediate = true)
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070067@Service
68@Beta
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080069public class ConsistentResourceStore extends AbstractStore<ResourceEvent, ResourceStoreDelegate>
70 implements ResourceStore {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070071 private static final Logger log = LoggerFactory.getLogger(ConsistentResourceStore.class);
72
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080073 private static final String DISCRETE_CONSUMER_MAP = "onos-discrete-consumers";
74 private static final String CONTINUOUS_CONSUMER_MAP = "onos-continuous-consumers";
Sho SHIMIZUba41fc12015-08-12 15:43:22 -070075 private static final String CHILD_MAP = "onos-resource-children";
76 private static final Serializer SERIALIZER = Serializer.using(
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080077 Arrays.asList(KryoNamespaces.BASIC, KryoNamespaces.API),
78 ContinuousResourceAllocation.class);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070079
Thomas Vachuska762a2d82016-01-04 10:25:20 -080080 // TODO: We should provide centralized values for this
81 private static final int MAX_RETRIES = 5;
82 private static final int RETRY_DELAY = 1_000; // millis
83
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070084 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85 protected StorageService service;
86
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -080087 private ConsistentMap<Resource.Discrete, ResourceConsumer> discreteConsumers;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080088 private ConsistentMap<ResourceId, ContinuousResourceAllocation> continuousConsumers;
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -080089 private ConsistentMap<Resource.Discrete, Set<Resource>> childMap;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070090
91 @Activate
92 public void activate() {
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -080093 discreteConsumers = service.<Resource.Discrete, ResourceConsumer>consistentMapBuilder()
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080094 .withName(DISCRETE_CONSUMER_MAP)
Sho SHIMIZUba41fc12015-08-12 15:43:22 -070095 .withSerializer(SERIALIZER)
96 .build();
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080097 continuousConsumers = service.<ResourceId, ContinuousResourceAllocation>consistentMapBuilder()
98 .withName(CONTINUOUS_CONSUMER_MAP)
99 .withSerializer(SERIALIZER)
100 .build();
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800101 childMap = service.<Resource.Discrete, Set<Resource>>consistentMapBuilder()
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700102 .withName(CHILD_MAP)
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700103 .withSerializer(SERIALIZER)
104 .build();
Sho SHIMIZUe7db6142015-11-04 11:24:22 -0800105
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800106 Tools.retryable(() -> childMap.put(Resource.ROOT, new LinkedHashSet<>()),
Thomas Vachuska762a2d82016-01-04 10:25:20 -0800107 ConsistentMapException.class, MAX_RETRIES, RETRY_DELAY);
Madan Jampanic7f49f92015-12-10 11:35:06 -0800108 log.info("Started");
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700109 }
110
111 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800112 public List<ResourceConsumer> getConsumers(Resource resource) {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700113 checkNotNull(resource);
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800114 checkArgument(resource instanceof Resource.Discrete || resource instanceof Resource.Continuous);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700115
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800116 if (resource instanceof Resource.Discrete) {
117 return getConsumer((Resource.Discrete) resource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800118 } else {
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800119 return getConsumer((Resource.Continuous) resource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800120 }
121 }
122
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800123 private List<ResourceConsumer> getConsumer(Resource.Discrete resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800124 Versioned<ResourceConsumer> consumer = discreteConsumers.get(resource);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700125 if (consumer == null) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800126 return ImmutableList.of();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700127 }
128
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800129 return ImmutableList.of(consumer.value());
130 }
131
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800132 private List<ResourceConsumer> getConsumer(Resource.Continuous resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800133 Versioned<ContinuousResourceAllocation> allocations = continuousConsumers.get(resource.id());
134 if (allocations == null) {
135 return ImmutableList.of();
136 }
137
138 return allocations.value().allocations().stream()
139 .filter(x -> x.resource().equals(resource))
140 .map(ResourceAllocation::consumer)
141 .collect(GuavaCollectors.toImmutableList());
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700142 }
143
144 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800145 public boolean register(List<Resource> resources) {
Sho SHIMIZU83e17a02015-08-20 14:07:05 -0700146 checkNotNull(resources);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700147
148 TransactionContext tx = service.transactionContextBuilder().build();
149 tx.begin();
150
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800151 TransactionalMap<Resource.Discrete, Set<Resource>> childTxMap =
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800152 tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700153
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800154 Map<Resource.Discrete, List<Resource>> resourceMap = resources.stream()
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800155 .filter(x -> x.parent().isPresent())
156 .collect(Collectors.groupingBy(x -> x.parent().get()));
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700157
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800158 for (Map.Entry<Resource.Discrete, List<Resource>> entry: resourceMap.entrySet()) {
159 Optional<Resource.Discrete> child = lookup(childTxMap, entry.getKey());
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800160 if (!child.isPresent()) {
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800161 return abortTransaction(tx);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700162 }
163
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800164 if (!appendValues(childTxMap, entry.getKey(), entry.getValue())) {
165 return abortTransaction(tx);
166 }
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700167 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800168
Sho SHIMIZUfa62b472015-11-02 17:35:46 -0800169 boolean success = tx.commit();
170 if (success) {
171 List<ResourceEvent> events = resources.stream()
172 .filter(x -> x.parent().isPresent())
173 .map(x -> new ResourceEvent(RESOURCE_ADDED, x))
174 .collect(Collectors.toList());
175 notifyDelegate(events);
176 }
177 return success;
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700178 }
179
180 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800181 public boolean unregister(List<Resource> resources) {
Sho SHIMIZU83e17a02015-08-20 14:07:05 -0700182 checkNotNull(resources);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700183
184 TransactionContext tx = service.transactionContextBuilder().build();
185 tx.begin();
186
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800187 TransactionalMap<Resource.Discrete, Set<Resource>> childTxMap =
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800188 tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800189 TransactionalMap<Resource.Discrete, ResourceConsumer> discreteConsumerTxMap =
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800190 tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
191 TransactionalMap<ResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
192 tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700193
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800194 // Extract Discrete instances from resources
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800195 Map<Resource.Discrete, List<Resource>> resourceMap = resources.stream()
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800196 .filter(x -> x.parent().isPresent())
197 .collect(Collectors.groupingBy(x -> x.parent().get()));
Sho SHIMIZU83e17a02015-08-20 14:07:05 -0700198
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800199 // even if one of the resources is allocated to a consumer,
200 // all unregistrations are regarded as failure
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800201 for (Map.Entry<Resource.Discrete, List<Resource>> entry: resourceMap.entrySet()) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800202 boolean allocated = entry.getValue().stream().anyMatch(x -> {
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800203 if (x instanceof Resource.Discrete) {
204 return discreteConsumerTxMap.get((Resource.Discrete) x) != null;
205 } else if (x instanceof Resource.Continuous) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800206 ContinuousResourceAllocation allocations = continuousConsumerTxMap.get(x.id());
207 return allocations != null && !allocations.allocations().isEmpty();
208 } else {
209 return false;
210 }
211 });
212 if (allocated) {
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800213 return abortTransaction(tx);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700214 }
215
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800216 if (!removeValues(childTxMap, entry.getKey(), entry.getValue())) {
217 return abortTransaction(tx);
218 }
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700219 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800220
Sho SHIMIZUfa62b472015-11-02 17:35:46 -0800221 boolean success = tx.commit();
222 if (success) {
223 List<ResourceEvent> events = resources.stream()
224 .filter(x -> x.parent().isPresent())
225 .map(x -> new ResourceEvent(RESOURCE_REMOVED, x))
226 .collect(Collectors.toList());
227 notifyDelegate(events);
228 }
229 return success;
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700230 }
231
232 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800233 public boolean allocate(List<Resource> resources, ResourceConsumer consumer) {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700234 checkNotNull(resources);
235 checkNotNull(consumer);
236
237 TransactionContext tx = service.transactionContextBuilder().build();
238 tx.begin();
239
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800240 TransactionalMap<Resource.Discrete, Set<Resource>> childTxMap =
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800241 tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800242 TransactionalMap<Resource.Discrete, ResourceConsumer> discreteConsumerTxMap =
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800243 tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
244 TransactionalMap<ResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
245 tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700246
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800247 for (Resource resource: resources) {
248 if (resource instanceof Resource.Discrete) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800249 if (!lookup(childTxMap, resource).isPresent()) {
250 return abortTransaction(tx);
251 }
Sho SHIMIZUd29847f2015-08-13 09:10:59 -0700252
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800253 ResourceConsumer oldValue = discreteConsumerTxMap.put((Resource.Discrete) resource, consumer);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800254 if (oldValue != null) {
255 return abortTransaction(tx);
256 }
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800257 } else if (resource instanceof Resource.Continuous) {
258 Optional<Resource.Continuous> continuous = lookup(childTxMap, (Resource.Continuous) resource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800259 if (!continuous.isPresent()) {
260 return abortTransaction(tx);
261 }
262
263 ContinuousResourceAllocation allocations = continuousConsumerTxMap.get(continuous.get().id());
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800264 if (!hasEnoughResource(continuous.get(), (Resource.Continuous) resource, allocations)) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800265 return abortTransaction(tx);
266 }
267
268 boolean success = appendValue(continuousConsumerTxMap,
269 continuous.get(), new ResourceAllocation(continuous.get(), consumer));
270 if (!success) {
271 return abortTransaction(tx);
272 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800273 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700274 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800275
276 return tx.commit();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700277 }
278
279 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800280 public boolean release(List<Resource> resources, List<ResourceConsumer> consumers) {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700281 checkNotNull(resources);
282 checkNotNull(consumers);
283 checkArgument(resources.size() == consumers.size());
284
285 TransactionContext tx = service.transactionContextBuilder().build();
286 tx.begin();
287
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800288 TransactionalMap<Resource.Discrete, ResourceConsumer> discreteConsumerTxMap =
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800289 tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
290 TransactionalMap<ResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
291 tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800292 Iterator<Resource> resourceIte = resources.iterator();
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800293 Iterator<ResourceConsumer> consumerIte = consumers.iterator();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700294
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800295 while (resourceIte.hasNext() && consumerIte.hasNext()) {
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800296 Resource resource = resourceIte.next();
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800297 ResourceConsumer consumer = consumerIte.next();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700298
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800299 if (resource instanceof Resource.Discrete) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800300 // if this single release fails (because the resource is allocated to another consumer,
301 // the whole release fails
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800302 if (!discreteConsumerTxMap.remove((Resource.Discrete) resource, consumer)) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800303 return abortTransaction(tx);
304 }
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800305 } else if (resource instanceof Resource.Continuous) {
306 Resource.Continuous continuous = (Resource.Continuous) resource;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800307 ContinuousResourceAllocation allocation = continuousConsumerTxMap.get(continuous.id());
308 ImmutableList<ResourceAllocation> newAllocations = allocation.allocations().stream()
309 .filter(x -> !(x.consumer().equals(consumer) &&
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800310 ((Resource.Continuous) x.resource()).value() == continuous.value()))
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800311 .collect(GuavaCollectors.toImmutableList());
312
313 if (!continuousConsumerTxMap.replace(continuous.id(), allocation,
314 new ContinuousResourceAllocation(allocation.original(), newAllocations))) {
315 return abortTransaction(tx);
316 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700317 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700318 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800319
320 return tx.commit();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700321 }
322
323 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800324 public boolean isAvailable(Resource resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800325 checkNotNull(resource);
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800326 checkArgument(resource instanceof Resource.Discrete || resource instanceof Resource.Continuous);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800327
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800328 if (resource instanceof Resource.Discrete) {
329 return getConsumer((Resource.Discrete) resource).isEmpty();
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800330 } else {
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800331 return isAvailable((Resource.Continuous) resource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800332 }
333 }
334
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800335 private boolean isAvailable(Resource.Continuous resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800336 Versioned<ContinuousResourceAllocation> allocation = continuousConsumers.get(resource.id());
337 if (allocation == null) {
338 return false;
339 }
340
341 return hasEnoughResource(allocation.value().original(), resource, allocation.value());
342 }
343
344 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800345 public Collection<Resource> getResources(ResourceConsumer consumer) {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700346 checkNotNull(consumer);
347
348 // NOTE: getting all entries may become performance bottleneck
349 // TODO: revisit for better backend data structure
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800350 Stream<Resource.Discrete> discreteStream = discreteConsumers.entrySet().stream()
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700351 .filter(x -> x.getValue().value().equals(consumer))
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800352 .map(Map.Entry::getKey);
353
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800354 Stream<Resource.Continuous> continuousStream = continuousConsumers.values().stream()
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800355 .flatMap(x -> x.value().allocations().stream()
356 .map(y -> Maps.immutableEntry(x.value().original(), y)))
357 .filter(x -> x.getValue().consumer().equals(consumer))
358 .map(x -> x.getKey());
359
360 return Stream.concat(discreteStream, continuousStream).collect(Collectors.toList());
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700361 }
362
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700363 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800364 public Collection<Resource> getChildResources(Resource parent) {
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700365 checkNotNull(parent);
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800366 checkArgument(parent instanceof Resource.Discrete);
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700367
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800368 Versioned<Set<Resource>> children = childMap.get((Resource.Discrete) parent);
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700369 if (children == null) {
370 return Collections.emptyList();
371 }
372
373 return children.value();
374 }
375
376 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800377 public <T> Collection<Resource> getAllocatedResources(Resource parent, Class<T> cls) {
Sho SHIMIZU1f5e5912015-08-10 17:00:00 -0700378 checkNotNull(parent);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700379 checkNotNull(cls);
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800380 checkArgument(parent instanceof Resource.Discrete);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700381
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800382 Versioned<Set<Resource>> children = childMap.get((Resource.Discrete) parent);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700383 if (children == null) {
384 return Collections.emptyList();
385 }
386
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800387 Stream<Resource.Discrete> discrete = children.value().stream()
Sho SHIMIZUc9546a32015-11-10 11:22:28 -0800388 .filter(x -> x.last().getClass().equals(cls))
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800389 .filter(x -> x instanceof Resource.Discrete)
390 .map(x -> (Resource.Discrete) x)
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800391 .filter(discreteConsumers::containsKey);
392
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800393 Stream<Resource.Continuous> continuous = children.value().stream()
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800394 .filter(x -> x.last().getClass().equals(cls))
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800395 .filter(x -> x instanceof Resource.Continuous)
396 .map(x -> (Resource.Continuous) x)
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800397 .filter(x -> continuousConsumers.containsKey(x.id()))
398 .filter(x -> continuousConsumers.get(x.id()) != null)
399 .filter(x -> !continuousConsumers.get(x.id()).value().allocations().isEmpty());
400
401 return Stream.concat(discrete, continuous).collect(Collectors.toList());
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700402 }
Sho SHIMIZUd29847f2015-08-13 09:10:59 -0700403
404 /**
405 * Abort the transaction.
406 *
407 * @param tx transaction context
408 * @return always false
409 */
410 private boolean abortTransaction(TransactionContext tx) {
411 tx.abort();
412 return false;
413 }
414
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800415 // Appends the specified ResourceAllocation to the existing values stored in the map
416 private boolean appendValue(TransactionalMap<ResourceId, ContinuousResourceAllocation> map,
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800417 Resource.Continuous original, ResourceAllocation value) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800418 ContinuousResourceAllocation oldValue = map.putIfAbsent(original.id(),
419 new ContinuousResourceAllocation(original, ImmutableList.of(value)));
420 if (oldValue == null) {
421 return true;
422 }
423
424 if (oldValue.allocations().contains(value)) {
425 // don't write to map because all values are already stored
426 return true;
427 }
428
429 ContinuousResourceAllocation newValue = new ContinuousResourceAllocation(original,
430 ImmutableList.<ResourceAllocation>builder()
431 .addAll(oldValue.allocations())
432 .add(value)
433 .build());
434 return map.replace(original.id(), oldValue, newValue);
435 }
Sho SHIMIZUd29847f2015-08-13 09:10:59 -0700436 /**
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700437 * Appends the values to the existing values associated with the specified key.
Sho SHIMIZU4568c412015-08-21 16:39:07 -0700438 * If the map already has all the given values, appending will not happen.
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700439 *
440 * @param map map holding multiple values for a key
441 * @param key key specifying values
442 * @param values values to be appended
443 * @param <K> type of the key
444 * @param <V> type of the element of the list
445 * @return true if the operation succeeds, false otherwise.
446 */
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800447 private <K, V> boolean appendValues(TransactionalMap<K, Set<V>> map, K key, List<V> values) {
448 Set<V> oldValues = map.putIfAbsent(key, new LinkedHashSet<>(values));
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700449 if (oldValues == null) {
Sho SHIMIZU93a74b32015-11-09 11:48:23 -0800450 return true;
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700451 }
452
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800453 if (oldValues.containsAll(values)) {
Sho SHIMIZU4568c412015-08-21 16:39:07 -0700454 // don't write to map because all values are already stored
455 return true;
456 }
457
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800458 LinkedHashSet<V> newValues = new LinkedHashSet<>(oldValues);
459 newValues.addAll(values);
460 return map.replace(key, oldValues, newValues);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700461 }
462
463 /**
Sho SHIMIZUba1f83b2015-10-14 08:11:20 -0700464 * Removes the values from the existing values associated with the specified key.
Sho SHIMIZU5618ee52015-08-21 17:19:44 -0700465 * If the map doesn't contain the given values, removal will not happen.
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700466 *
467 * @param map map holding multiple values for a key
468 * @param key key specifying values
469 * @param values values to be removed
470 * @param <K> type of the key
471 * @param <V> type of the element of the list
472 * @return true if the operation succeeds, false otherwise
473 */
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800474 private <K, V> boolean removeValues(TransactionalMap<K, Set<V>> map, K key, List<? extends V> values) {
475 Set<V> oldValues = map.putIfAbsent(key, new LinkedHashSet<>());
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700476 if (oldValues == null) {
Sho SHIMIZU93a74b32015-11-09 11:48:23 -0800477 return true;
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700478 }
479
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800480 if (values.stream().allMatch(x -> !oldValues.contains(x))) {
Sho SHIMIZU5618ee52015-08-21 17:19:44 -0700481 // don't write map because none of the values are stored
482 return true;
483 }
484
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800485 LinkedHashSet<V> newValues = new LinkedHashSet<>(oldValues);
486 newValues.removeAll(values);
487 return map.replace(key, oldValues, newValues);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700488 }
489
490 /**
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800491 * Returns the resource which has the same key as the key of the specified resource
492 * in the list as a value of the map.
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700493 *
494 * @param map map storing parent - child relationship of resources
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800495 * @param resource resource to be checked for its key
496 * @return the resource which is regarded as the same as the specified resource
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700497 */
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800498 // Naive implementation, which traverses all elements in the list
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800499 private <T extends Resource> Optional<T> lookup(
500 TransactionalMap<Resource.Discrete, Set<Resource>> map, T resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800501 // if it is root, always returns itself
Sho SHIMIZUc9546a32015-11-10 11:22:28 -0800502 if (!resource.parent().isPresent()) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800503 return Optional.of(resource);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700504 }
505
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800506 Set<Resource> values = map.get(resource.parent().get());
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800507 if (values == null) {
508 return Optional.empty();
509 }
510
511 @SuppressWarnings("unchecked")
512 Optional<T> result = values.stream()
513 .filter(x -> x.id().equals(resource.id()))
514 .map(x -> (T) x)
515 .findFirst();
516 return result;
517 }
518
519 /**
520 * Checks if there is enough resource volume to allocated the requested resource
521 * against the specified resource.
522 *
523 * @param original original resource
524 * @param request requested resource
525 * @param allocation current allocation of the resource
526 * @return true if there is enough resource volume. Otherwise, false.
527 */
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800528 private boolean hasEnoughResource(Resource.Continuous original,
529 Resource.Continuous request,
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800530 ContinuousResourceAllocation allocation) {
531 if (allocation == null) {
532 return request.value() <= original.value();
533 }
534
535 double allocated = allocation.allocations().stream()
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800536 .filter(x -> x.resource() instanceof Resource.Continuous)
537 .map(x -> (Resource.Continuous) x.resource())
538 .mapToDouble(Resource.Continuous::value)
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800539 .sum();
540 double left = original.value() - allocated;
541 return request.value() <= left;
542 }
543
544 // internal use only
545 private static final class ContinuousResourceAllocation {
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800546 private final Resource.Continuous original;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800547 private final ImmutableList<ResourceAllocation> allocations;
548
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800549 private ContinuousResourceAllocation(Resource.Continuous original,
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800550 ImmutableList<ResourceAllocation> allocations) {
551 this.original = original;
552 this.allocations = allocations;
553 }
554
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800555 private Resource.Continuous original() {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800556 return original;
557 }
558
559 private ImmutableList<ResourceAllocation> allocations() {
560 return allocations;
561 }
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700562 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700563}