blob: 817f8d2b868929da7eb421e1ee50cfafe99ecc0d [file] [log] [blame]
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -07001/*
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -08002 * Copyright 2015-2016 Open Networking Laboratory
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.newresource.impl;
17
18import com.google.common.annotations.Beta;
Sho SHIMIZUe7db6142015-11-04 11:24:22 -080019import com.google.common.collect.ImmutableList;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080020import com.google.common.collect.Maps;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
25import org.apache.felix.scr.annotations.Service;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080026import org.onlab.util.GuavaCollectors;
Thomas Vachuska762a2d82016-01-04 10:25:20 -080027import org.onlab.util.Tools;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080028import org.onosproject.net.newresource.ResourceAllocation;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070029import org.onosproject.net.newresource.ResourceConsumer;
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080030import org.onosproject.net.newresource.ResourceEvent;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080031import org.onosproject.net.newresource.ResourceId;
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -080032import org.onosproject.net.newresource.Resource;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070033import org.onosproject.net.newresource.ResourceStore;
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080034import org.onosproject.net.newresource.ResourceStoreDelegate;
35import org.onosproject.store.AbstractStore;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070036import org.onosproject.store.serializers.KryoNamespaces;
37import org.onosproject.store.service.ConsistentMap;
Thomas Vachuska762a2d82016-01-04 10:25:20 -080038import org.onosproject.store.service.ConsistentMapException;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070039import org.onosproject.store.service.Serializer;
40import org.onosproject.store.service.StorageService;
41import org.onosproject.store.service.TransactionContext;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070042import org.onosproject.store.service.TransactionalMap;
43import org.onosproject.store.service.Versioned;
44import org.slf4j.Logger;
45import org.slf4j.LoggerFactory;
46
Sho SHIMIZUba41fc12015-08-12 15:43:22 -070047import java.util.Arrays;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070048import java.util.Collection;
49import java.util.Iterator;
Sho SHIMIZUba41fc12015-08-12 15:43:22 -070050import java.util.LinkedHashSet;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070051import java.util.List;
52import java.util.Map;
53import java.util.Optional;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080054import java.util.Set;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070055import java.util.stream.Collectors;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080056import java.util.stream.Stream;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070057
58import static com.google.common.base.Preconditions.checkArgument;
59import static com.google.common.base.Preconditions.checkNotNull;
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080060import static org.onosproject.net.newresource.ResourceEvent.Type.*;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070061
62/**
63 * Implementation of ResourceStore using TransactionalMap.
64 */
Sho SHIMIZU9a2b8292015-10-28 13:00:16 -070065@Component(immediate = true)
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070066@Service
67@Beta
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080068public class ConsistentResourceStore extends AbstractStore<ResourceEvent, ResourceStoreDelegate>
69 implements ResourceStore {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070070 private static final Logger log = LoggerFactory.getLogger(ConsistentResourceStore.class);
71
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080072 private static final String DISCRETE_CONSUMER_MAP = "onos-discrete-consumers";
73 private static final String CONTINUOUS_CONSUMER_MAP = "onos-continuous-consumers";
Sho SHIMIZUba41fc12015-08-12 15:43:22 -070074 private static final String CHILD_MAP = "onos-resource-children";
75 private static final Serializer SERIALIZER = Serializer.using(
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080076 Arrays.asList(KryoNamespaces.BASIC, KryoNamespaces.API),
77 ContinuousResourceAllocation.class);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070078
Thomas Vachuska762a2d82016-01-04 10:25:20 -080079 // TODO: We should provide centralized values for this
80 private static final int MAX_RETRIES = 5;
81 private static final int RETRY_DELAY = 1_000; // millis
82
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070083 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
84 protected StorageService service;
85
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -080086 private ConsistentMap<Resource.Discrete, ResourceConsumer> discreteConsumers;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080087 private ConsistentMap<ResourceId, ContinuousResourceAllocation> continuousConsumers;
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -080088 private ConsistentMap<Resource.Discrete, Set<Resource>> childMap;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070089
90 @Activate
91 public void activate() {
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -080092 discreteConsumers = service.<Resource.Discrete, ResourceConsumer>consistentMapBuilder()
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080093 .withName(DISCRETE_CONSUMER_MAP)
Sho SHIMIZUba41fc12015-08-12 15:43:22 -070094 .withSerializer(SERIALIZER)
95 .build();
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080096 continuousConsumers = service.<ResourceId, ContinuousResourceAllocation>consistentMapBuilder()
97 .withName(CONTINUOUS_CONSUMER_MAP)
98 .withSerializer(SERIALIZER)
99 .build();
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800100 childMap = service.<Resource.Discrete, Set<Resource>>consistentMapBuilder()
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700101 .withName(CHILD_MAP)
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700102 .withSerializer(SERIALIZER)
103 .build();
Sho SHIMIZUe7db6142015-11-04 11:24:22 -0800104
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800105 Tools.retryable(() -> childMap.put(Resource.ROOT, new LinkedHashSet<>()),
Thomas Vachuska762a2d82016-01-04 10:25:20 -0800106 ConsistentMapException.class, MAX_RETRIES, RETRY_DELAY);
Madan Jampanic7f49f92015-12-10 11:35:06 -0800107 log.info("Started");
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700108 }
109
110 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800111 public List<ResourceConsumer> getConsumers(Resource resource) {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700112 checkNotNull(resource);
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800113 checkArgument(resource instanceof Resource.Discrete || resource instanceof Resource.Continuous);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700114
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800115 if (resource instanceof Resource.Discrete) {
116 return getConsumer((Resource.Discrete) resource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800117 } else {
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800118 return getConsumer((Resource.Continuous) resource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800119 }
120 }
121
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800122 private List<ResourceConsumer> getConsumer(Resource.Discrete resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800123 Versioned<ResourceConsumer> consumer = discreteConsumers.get(resource);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700124 if (consumer == null) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800125 return ImmutableList.of();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700126 }
127
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800128 return ImmutableList.of(consumer.value());
129 }
130
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800131 private List<ResourceConsumer> getConsumer(Resource.Continuous resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800132 Versioned<ContinuousResourceAllocation> allocations = continuousConsumers.get(resource.id());
133 if (allocations == null) {
134 return ImmutableList.of();
135 }
136
137 return allocations.value().allocations().stream()
138 .filter(x -> x.resource().equals(resource))
139 .map(ResourceAllocation::consumer)
140 .collect(GuavaCollectors.toImmutableList());
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700141 }
142
143 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800144 public boolean register(List<Resource> resources) {
Sho SHIMIZU83e17a02015-08-20 14:07:05 -0700145 checkNotNull(resources);
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800146 if (log.isTraceEnabled()) {
147 resources.forEach(r -> log.trace("registering {}", r));
148 }
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700149
150 TransactionContext tx = service.transactionContextBuilder().build();
151 tx.begin();
152
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800153 TransactionalMap<Resource.Discrete, Set<Resource>> childTxMap =
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800154 tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700155
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800156 Map<Resource.Discrete, List<Resource>> resourceMap = resources.stream()
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800157 .filter(x -> x.parent().isPresent())
158 .collect(Collectors.groupingBy(x -> x.parent().get()));
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700159
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800160 for (Map.Entry<Resource.Discrete, List<Resource>> entry: resourceMap.entrySet()) {
161 Optional<Resource.Discrete> child = lookup(childTxMap, entry.getKey());
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800162 if (!child.isPresent()) {
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800163 return abortTransaction(tx);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700164 }
165
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800166 if (!appendValues(childTxMap, entry.getKey(), entry.getValue())) {
167 return abortTransaction(tx);
168 }
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700169 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800170
Sho SHIMIZUfa62b472015-11-02 17:35:46 -0800171 boolean success = tx.commit();
172 if (success) {
173 List<ResourceEvent> events = resources.stream()
174 .filter(x -> x.parent().isPresent())
175 .map(x -> new ResourceEvent(RESOURCE_ADDED, x))
176 .collect(Collectors.toList());
177 notifyDelegate(events);
178 }
179 return success;
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700180 }
181
182 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800183 public boolean unregister(List<Resource> resources) {
Sho SHIMIZU83e17a02015-08-20 14:07:05 -0700184 checkNotNull(resources);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700185
186 TransactionContext tx = service.transactionContextBuilder().build();
187 tx.begin();
188
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800189 TransactionalMap<Resource.Discrete, Set<Resource>> childTxMap =
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800190 tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800191 TransactionalMap<Resource.Discrete, ResourceConsumer> discreteConsumerTxMap =
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800192 tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
193 TransactionalMap<ResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
194 tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700195
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800196 // Extract Discrete instances from resources
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800197 Map<Resource.Discrete, List<Resource>> resourceMap = resources.stream()
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800198 .filter(x -> x.parent().isPresent())
199 .collect(Collectors.groupingBy(x -> x.parent().get()));
Sho SHIMIZU83e17a02015-08-20 14:07:05 -0700200
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800201 // even if one of the resources is allocated to a consumer,
202 // all unregistrations are regarded as failure
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800203 for (Map.Entry<Resource.Discrete, List<Resource>> entry: resourceMap.entrySet()) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800204 boolean allocated = entry.getValue().stream().anyMatch(x -> {
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800205 if (x instanceof Resource.Discrete) {
206 return discreteConsumerTxMap.get((Resource.Discrete) x) != null;
207 } else if (x instanceof Resource.Continuous) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800208 ContinuousResourceAllocation allocations = continuousConsumerTxMap.get(x.id());
209 return allocations != null && !allocations.allocations().isEmpty();
210 } else {
211 return false;
212 }
213 });
214 if (allocated) {
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800215 return abortTransaction(tx);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700216 }
217
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800218 if (!removeValues(childTxMap, entry.getKey(), entry.getValue())) {
219 return abortTransaction(tx);
220 }
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700221 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800222
Sho SHIMIZUfa62b472015-11-02 17:35:46 -0800223 boolean success = tx.commit();
224 if (success) {
225 List<ResourceEvent> events = resources.stream()
226 .filter(x -> x.parent().isPresent())
227 .map(x -> new ResourceEvent(RESOURCE_REMOVED, x))
228 .collect(Collectors.toList());
229 notifyDelegate(events);
230 }
231 return success;
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700232 }
233
234 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800235 public boolean allocate(List<Resource> resources, ResourceConsumer consumer) {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700236 checkNotNull(resources);
237 checkNotNull(consumer);
238
239 TransactionContext tx = service.transactionContextBuilder().build();
240 tx.begin();
241
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800242 TransactionalMap<Resource.Discrete, Set<Resource>> childTxMap =
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800243 tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800244 TransactionalMap<Resource.Discrete, ResourceConsumer> discreteConsumerTxMap =
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800245 tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
246 TransactionalMap<ResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
247 tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700248
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800249 for (Resource resource: resources) {
250 if (resource instanceof Resource.Discrete) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800251 if (!lookup(childTxMap, resource).isPresent()) {
252 return abortTransaction(tx);
253 }
Sho SHIMIZUd29847f2015-08-13 09:10:59 -0700254
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800255 ResourceConsumer oldValue = discreteConsumerTxMap.put((Resource.Discrete) resource, consumer);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800256 if (oldValue != null) {
257 return abortTransaction(tx);
258 }
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800259 } else if (resource instanceof Resource.Continuous) {
260 Optional<Resource.Continuous> continuous = lookup(childTxMap, (Resource.Continuous) resource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800261 if (!continuous.isPresent()) {
262 return abortTransaction(tx);
263 }
264
265 ContinuousResourceAllocation allocations = continuousConsumerTxMap.get(continuous.get().id());
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800266 if (!hasEnoughResource(continuous.get(), (Resource.Continuous) resource, allocations)) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800267 return abortTransaction(tx);
268 }
269
270 boolean success = appendValue(continuousConsumerTxMap,
271 continuous.get(), new ResourceAllocation(continuous.get(), consumer));
272 if (!success) {
273 return abortTransaction(tx);
274 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800275 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700276 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800277
278 return tx.commit();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700279 }
280
281 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800282 public boolean release(List<Resource> resources, List<ResourceConsumer> consumers) {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700283 checkNotNull(resources);
284 checkNotNull(consumers);
285 checkArgument(resources.size() == consumers.size());
286
287 TransactionContext tx = service.transactionContextBuilder().build();
288 tx.begin();
289
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800290 TransactionalMap<Resource.Discrete, ResourceConsumer> discreteConsumerTxMap =
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800291 tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
292 TransactionalMap<ResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
293 tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800294 Iterator<Resource> resourceIte = resources.iterator();
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800295 Iterator<ResourceConsumer> consumerIte = consumers.iterator();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700296
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800297 while (resourceIte.hasNext() && consumerIte.hasNext()) {
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800298 Resource resource = resourceIte.next();
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800299 ResourceConsumer consumer = consumerIte.next();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700300
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800301 if (resource instanceof Resource.Discrete) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800302 // if this single release fails (because the resource is allocated to another consumer,
303 // the whole release fails
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800304 if (!discreteConsumerTxMap.remove((Resource.Discrete) resource, consumer)) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800305 return abortTransaction(tx);
306 }
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800307 } else if (resource instanceof Resource.Continuous) {
308 Resource.Continuous continuous = (Resource.Continuous) resource;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800309 ContinuousResourceAllocation allocation = continuousConsumerTxMap.get(continuous.id());
310 ImmutableList<ResourceAllocation> newAllocations = allocation.allocations().stream()
311 .filter(x -> !(x.consumer().equals(consumer) &&
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800312 ((Resource.Continuous) x.resource()).value() == continuous.value()))
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800313 .collect(GuavaCollectors.toImmutableList());
314
315 if (!continuousConsumerTxMap.replace(continuous.id(), allocation,
316 new ContinuousResourceAllocation(allocation.original(), newAllocations))) {
317 return abortTransaction(tx);
318 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700319 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700320 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800321
322 return tx.commit();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700323 }
324
325 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800326 public boolean isAvailable(Resource resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800327 checkNotNull(resource);
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800328 checkArgument(resource instanceof Resource.Discrete || resource instanceof Resource.Continuous);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800329
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800330 // check if it's registered or not.
331 Versioned<Set<Resource>> v = childMap.get(resource.parent().get());
332 if (v == null || !v.value().contains(resource)) {
333 return false;
334 }
335
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800336 if (resource instanceof Resource.Discrete) {
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800337 // check if already consumed
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800338 return getConsumer((Resource.Discrete) resource).isEmpty();
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800339 } else {
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800340 Resource.Continuous requested = (Resource.Continuous) resource;
341 Resource.Continuous registered = v.value().stream()
342 .filter(c -> c.equals(resource))
343 .findFirst()
344 .map(c -> (Resource.Continuous) c)
345 .get();
346 if (registered.value() < requested.value()) {
347 // Capacity < requested, can never satisfy
348 return false;
349 }
350 // check if there's enough left
351 return isAvailable(requested);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800352 }
353 }
354
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800355 private boolean isAvailable(Resource.Continuous resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800356 Versioned<ContinuousResourceAllocation> allocation = continuousConsumers.get(resource.id());
357 if (allocation == null) {
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800358 // no allocation (=no consumer) full registered resources available
359 return true;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800360 }
361
362 return hasEnoughResource(allocation.value().original(), resource, allocation.value());
363 }
364
365 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800366 public Collection<Resource> getResources(ResourceConsumer consumer) {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700367 checkNotNull(consumer);
368
369 // NOTE: getting all entries may become performance bottleneck
370 // TODO: revisit for better backend data structure
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800371 Stream<Resource.Discrete> discreteStream = discreteConsumers.entrySet().stream()
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700372 .filter(x -> x.getValue().value().equals(consumer))
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800373 .map(Map.Entry::getKey);
374
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800375 Stream<Resource.Continuous> continuousStream = continuousConsumers.values().stream()
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800376 .flatMap(x -> x.value().allocations().stream()
377 .map(y -> Maps.immutableEntry(x.value().original(), y)))
378 .filter(x -> x.getValue().consumer().equals(consumer))
379 .map(x -> x.getKey());
380
381 return Stream.concat(discreteStream, continuousStream).collect(Collectors.toList());
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700382 }
383
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700384 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800385 public Collection<Resource> getChildResources(Resource parent) {
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700386 checkNotNull(parent);
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800387 if (!(parent instanceof Resource.Discrete)) {
388 // only Discrete resource can have child resource
389 return ImmutableList.of();
390 }
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700391
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800392 Versioned<Set<Resource>> children = childMap.get((Resource.Discrete) parent);
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700393 if (children == null) {
Sho SHIMIZU2c0ae122016-01-20 13:14:38 -0800394 return ImmutableList.of();
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700395 }
396
397 return children.value();
398 }
399
400 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800401 public <T> Collection<Resource> getAllocatedResources(Resource parent, Class<T> cls) {
Sho SHIMIZU1f5e5912015-08-10 17:00:00 -0700402 checkNotNull(parent);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700403 checkNotNull(cls);
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800404 checkArgument(parent instanceof Resource.Discrete);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700405
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800406 Versioned<Set<Resource>> children = childMap.get((Resource.Discrete) parent);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700407 if (children == null) {
Sho SHIMIZU2c0ae122016-01-20 13:14:38 -0800408 return ImmutableList.of();
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700409 }
410
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800411 Stream<Resource.Discrete> discrete = children.value().stream()
Sho SHIMIZUc9546a32015-11-10 11:22:28 -0800412 .filter(x -> x.last().getClass().equals(cls))
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800413 .filter(x -> x instanceof Resource.Discrete)
414 .map(x -> (Resource.Discrete) x)
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800415 .filter(discreteConsumers::containsKey);
416
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800417 Stream<Resource.Continuous> continuous = children.value().stream()
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800418 .filter(x -> x.last().getClass().equals(cls))
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800419 .filter(x -> x instanceof Resource.Continuous)
420 .map(x -> (Resource.Continuous) x)
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800421 .filter(x -> continuousConsumers.containsKey(x.id()))
422 .filter(x -> continuousConsumers.get(x.id()) != null)
423 .filter(x -> !continuousConsumers.get(x.id()).value().allocations().isEmpty());
424
425 return Stream.concat(discrete, continuous).collect(Collectors.toList());
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700426 }
Sho SHIMIZUd29847f2015-08-13 09:10:59 -0700427
428 /**
429 * Abort the transaction.
430 *
431 * @param tx transaction context
432 * @return always false
433 */
434 private boolean abortTransaction(TransactionContext tx) {
435 tx.abort();
436 return false;
437 }
438
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800439 // Appends the specified ResourceAllocation to the existing values stored in the map
440 private boolean appendValue(TransactionalMap<ResourceId, ContinuousResourceAllocation> map,
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800441 Resource.Continuous original, ResourceAllocation value) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800442 ContinuousResourceAllocation oldValue = map.putIfAbsent(original.id(),
443 new ContinuousResourceAllocation(original, ImmutableList.of(value)));
444 if (oldValue == null) {
445 return true;
446 }
447
448 if (oldValue.allocations().contains(value)) {
449 // don't write to map because all values are already stored
450 return true;
451 }
452
453 ContinuousResourceAllocation newValue = new ContinuousResourceAllocation(original,
454 ImmutableList.<ResourceAllocation>builder()
455 .addAll(oldValue.allocations())
456 .add(value)
457 .build());
458 return map.replace(original.id(), oldValue, newValue);
459 }
Sho SHIMIZUd29847f2015-08-13 09:10:59 -0700460 /**
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700461 * Appends the values to the existing values associated with the specified key.
Sho SHIMIZU4568c412015-08-21 16:39:07 -0700462 * If the map already has all the given values, appending will not happen.
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700463 *
464 * @param map map holding multiple values for a key
465 * @param key key specifying values
466 * @param values values to be appended
467 * @param <K> type of the key
468 * @param <V> type of the element of the list
469 * @return true if the operation succeeds, false otherwise.
470 */
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800471 private <K, V> boolean appendValues(TransactionalMap<K, Set<V>> map, K key, List<V> values) {
472 Set<V> oldValues = map.putIfAbsent(key, new LinkedHashSet<>(values));
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700473 if (oldValues == null) {
Sho SHIMIZU93a74b32015-11-09 11:48:23 -0800474 return true;
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700475 }
476
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800477 if (oldValues.containsAll(values)) {
Sho SHIMIZU4568c412015-08-21 16:39:07 -0700478 // don't write to map because all values are already stored
479 return true;
480 }
481
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800482 LinkedHashSet<V> newValues = new LinkedHashSet<>(oldValues);
483 newValues.addAll(values);
484 return map.replace(key, oldValues, newValues);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700485 }
486
487 /**
Sho SHIMIZUba1f83b2015-10-14 08:11:20 -0700488 * Removes the values from the existing values associated with the specified key.
Sho SHIMIZU5618ee52015-08-21 17:19:44 -0700489 * If the map doesn't contain the given values, removal will not happen.
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700490 *
491 * @param map map holding multiple values for a key
492 * @param key key specifying values
493 * @param values values to be removed
494 * @param <K> type of the key
495 * @param <V> type of the element of the list
496 * @return true if the operation succeeds, false otherwise
497 */
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800498 private <K, V> boolean removeValues(TransactionalMap<K, Set<V>> map, K key, List<? extends V> values) {
499 Set<V> oldValues = map.putIfAbsent(key, new LinkedHashSet<>());
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700500 if (oldValues == null) {
Sho SHIMIZU93a74b32015-11-09 11:48:23 -0800501 return true;
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700502 }
503
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800504 if (values.stream().allMatch(x -> !oldValues.contains(x))) {
Sho SHIMIZU5618ee52015-08-21 17:19:44 -0700505 // don't write map because none of the values are stored
506 return true;
507 }
508
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800509 LinkedHashSet<V> newValues = new LinkedHashSet<>(oldValues);
510 newValues.removeAll(values);
511 return map.replace(key, oldValues, newValues);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700512 }
513
514 /**
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800515 * Returns the resource which has the same key as the key of the specified resource
516 * in the list as a value of the map.
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700517 *
518 * @param map map storing parent - child relationship of resources
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800519 * @param resource resource to be checked for its key
520 * @return the resource which is regarded as the same as the specified resource
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700521 */
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800522 // Naive implementation, which traverses all elements in the list
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800523 private <T extends Resource> Optional<T> lookup(
524 TransactionalMap<Resource.Discrete, Set<Resource>> map, T resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800525 // if it is root, always returns itself
Sho SHIMIZUc9546a32015-11-10 11:22:28 -0800526 if (!resource.parent().isPresent()) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800527 return Optional.of(resource);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700528 }
529
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800530 Set<Resource> values = map.get(resource.parent().get());
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800531 if (values == null) {
532 return Optional.empty();
533 }
534
535 @SuppressWarnings("unchecked")
536 Optional<T> result = values.stream()
537 .filter(x -> x.id().equals(resource.id()))
538 .map(x -> (T) x)
539 .findFirst();
540 return result;
541 }
542
543 /**
544 * Checks if there is enough resource volume to allocated the requested resource
545 * against the specified resource.
546 *
547 * @param original original resource
548 * @param request requested resource
549 * @param allocation current allocation of the resource
550 * @return true if there is enough resource volume. Otherwise, false.
551 */
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800552 private boolean hasEnoughResource(Resource.Continuous original,
553 Resource.Continuous request,
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800554 ContinuousResourceAllocation allocation) {
555 if (allocation == null) {
556 return request.value() <= original.value();
557 }
558
559 double allocated = allocation.allocations().stream()
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800560 .filter(x -> x.resource() instanceof Resource.Continuous)
561 .map(x -> (Resource.Continuous) x.resource())
562 .mapToDouble(Resource.Continuous::value)
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800563 .sum();
564 double left = original.value() - allocated;
565 return request.value() <= left;
566 }
567
568 // internal use only
569 private static final class ContinuousResourceAllocation {
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800570 private final Resource.Continuous original;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800571 private final ImmutableList<ResourceAllocation> allocations;
572
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800573 private ContinuousResourceAllocation(Resource.Continuous original,
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800574 ImmutableList<ResourceAllocation> allocations) {
575 this.original = original;
576 this.allocations = allocations;
577 }
578
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800579 private Resource.Continuous original() {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800580 return original;
581 }
582
583 private ImmutableList<ResourceAllocation> allocations() {
584 return allocations;
585 }
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700586 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700587}