blob: 758b933367702a9178dcf21c58b4fcfd0ce7955a [file] [log] [blame]
/*
 * Copyright 2015-2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.newresource.impl;
17
18import com.google.common.annotations.Beta;
Sho SHIMIZUe7db6142015-11-04 11:24:22 -080019import com.google.common.collect.ImmutableList;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080020import com.google.common.collect.Maps;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
25import org.apache.felix.scr.annotations.Service;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080026import org.onlab.util.GuavaCollectors;
Thomas Vachuska762a2d82016-01-04 10:25:20 -080027import org.onlab.util.Tools;
Sho SHIMIZUf33b8932016-01-25 18:43:32 -080028import org.onosproject.net.newresource.ContinuousResource;
29import org.onosproject.net.newresource.DiscreteResource;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080030import org.onosproject.net.newresource.ResourceAllocation;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070031import org.onosproject.net.newresource.ResourceConsumer;
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080032import org.onosproject.net.newresource.ResourceEvent;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080033import org.onosproject.net.newresource.ResourceId;
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -080034import org.onosproject.net.newresource.Resource;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070035import org.onosproject.net.newresource.ResourceStore;
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080036import org.onosproject.net.newresource.ResourceStoreDelegate;
37import org.onosproject.store.AbstractStore;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070038import org.onosproject.store.serializers.KryoNamespaces;
39import org.onosproject.store.service.ConsistentMap;
Thomas Vachuska762a2d82016-01-04 10:25:20 -080040import org.onosproject.store.service.ConsistentMapException;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070041import org.onosproject.store.service.Serializer;
42import org.onosproject.store.service.StorageService;
43import org.onosproject.store.service.TransactionContext;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070044import org.onosproject.store.service.TransactionalMap;
45import org.onosproject.store.service.Versioned;
46import org.slf4j.Logger;
47import org.slf4j.LoggerFactory;
48
Sho SHIMIZUba41fc12015-08-12 15:43:22 -070049import java.util.Arrays;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070050import java.util.Collection;
51import java.util.Iterator;
Sho SHIMIZUba41fc12015-08-12 15:43:22 -070052import java.util.LinkedHashSet;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070053import java.util.List;
54import java.util.Map;
55import java.util.Optional;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080056import java.util.Set;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070057import java.util.stream.Collectors;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080058import java.util.stream.Stream;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070059
60import static com.google.common.base.Preconditions.checkArgument;
61import static com.google.common.base.Preconditions.checkNotNull;
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080062import static org.onosproject.net.newresource.ResourceEvent.Type.*;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070063
64/**
65 * Implementation of ResourceStore using TransactionalMap.
66 */
Sho SHIMIZU9a2b8292015-10-28 13:00:16 -070067@Component(immediate = true)
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070068@Service
69@Beta
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080070public class ConsistentResourceStore extends AbstractStore<ResourceEvent, ResourceStoreDelegate>
71 implements ResourceStore {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070072 private static final Logger log = LoggerFactory.getLogger(ConsistentResourceStore.class);
73
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080074 private static final String DISCRETE_CONSUMER_MAP = "onos-discrete-consumers";
75 private static final String CONTINUOUS_CONSUMER_MAP = "onos-continuous-consumers";
Sho SHIMIZUba41fc12015-08-12 15:43:22 -070076 private static final String CHILD_MAP = "onos-resource-children";
77 private static final Serializer SERIALIZER = Serializer.using(
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080078 Arrays.asList(KryoNamespaces.BASIC, KryoNamespaces.API),
79 ContinuousResourceAllocation.class);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070080
Thomas Vachuska762a2d82016-01-04 10:25:20 -080081 // TODO: We should provide centralized values for this
82 private static final int MAX_RETRIES = 5;
83 private static final int RETRY_DELAY = 1_000; // millis
84
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070085 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
86 protected StorageService service;
87
Sho SHIMIZUf33b8932016-01-25 18:43:32 -080088 private ConsistentMap<DiscreteResource, ResourceConsumer> discreteConsumers;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080089 private ConsistentMap<ResourceId, ContinuousResourceAllocation> continuousConsumers;
Sho SHIMIZUf33b8932016-01-25 18:43:32 -080090 private ConsistentMap<DiscreteResource, Set<Resource>> childMap;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070091
92 @Activate
93 public void activate() {
Sho SHIMIZUf33b8932016-01-25 18:43:32 -080094 discreteConsumers = service.<DiscreteResource, ResourceConsumer>consistentMapBuilder()
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080095 .withName(DISCRETE_CONSUMER_MAP)
Sho SHIMIZUba41fc12015-08-12 15:43:22 -070096 .withSerializer(SERIALIZER)
97 .build();
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080098 continuousConsumers = service.<ResourceId, ContinuousResourceAllocation>consistentMapBuilder()
99 .withName(CONTINUOUS_CONSUMER_MAP)
100 .withSerializer(SERIALIZER)
101 .build();
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800102 childMap = service.<DiscreteResource, Set<Resource>>consistentMapBuilder()
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700103 .withName(CHILD_MAP)
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700104 .withSerializer(SERIALIZER)
105 .build();
Sho SHIMIZUe7db6142015-11-04 11:24:22 -0800106
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800107 Tools.retryable(() -> childMap.put(Resource.ROOT, new LinkedHashSet<>()),
Thomas Vachuska762a2d82016-01-04 10:25:20 -0800108 ConsistentMapException.class, MAX_RETRIES, RETRY_DELAY);
Madan Jampanic7f49f92015-12-10 11:35:06 -0800109 log.info("Started");
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700110 }
111
112 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800113 public List<ResourceConsumer> getConsumers(Resource resource) {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700114 checkNotNull(resource);
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800115 checkArgument(resource instanceof DiscreteResource || resource instanceof ContinuousResource);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700116
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800117 if (resource instanceof DiscreteResource) {
118 return getConsumer((DiscreteResource) resource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800119 } else {
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800120 return getConsumer((ContinuousResource) resource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800121 }
122 }
123
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800124 private List<ResourceConsumer> getConsumer(DiscreteResource resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800125 Versioned<ResourceConsumer> consumer = discreteConsumers.get(resource);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700126 if (consumer == null) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800127 return ImmutableList.of();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700128 }
129
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800130 return ImmutableList.of(consumer.value());
131 }
132
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800133 private List<ResourceConsumer> getConsumer(ContinuousResource resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800134 Versioned<ContinuousResourceAllocation> allocations = continuousConsumers.get(resource.id());
135 if (allocations == null) {
136 return ImmutableList.of();
137 }
138
139 return allocations.value().allocations().stream()
140 .filter(x -> x.resource().equals(resource))
141 .map(ResourceAllocation::consumer)
142 .collect(GuavaCollectors.toImmutableList());
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700143 }
144
145 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800146 public boolean register(List<Resource> resources) {
Sho SHIMIZU83e17a02015-08-20 14:07:05 -0700147 checkNotNull(resources);
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800148 if (log.isTraceEnabled()) {
149 resources.forEach(r -> log.trace("registering {}", r));
150 }
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700151
152 TransactionContext tx = service.transactionContextBuilder().build();
153 tx.begin();
154
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800155 TransactionalMap<DiscreteResource, Set<Resource>> childTxMap =
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800156 tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700157
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800158 Map<DiscreteResource, List<Resource>> resourceMap = resources.stream()
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800159 .filter(x -> x.parent().isPresent())
160 .collect(Collectors.groupingBy(x -> x.parent().get()));
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700161
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800162 for (Map.Entry<DiscreteResource, List<Resource>> entry: resourceMap.entrySet()) {
163 Optional<DiscreteResource> child = lookup(childTxMap, entry.getKey());
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800164 if (!child.isPresent()) {
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800165 return abortTransaction(tx);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700166 }
167
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800168 if (!appendValues(childTxMap, entry.getKey(), entry.getValue())) {
169 return abortTransaction(tx);
170 }
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700171 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800172
Sho SHIMIZUfa62b472015-11-02 17:35:46 -0800173 boolean success = tx.commit();
174 if (success) {
175 List<ResourceEvent> events = resources.stream()
176 .filter(x -> x.parent().isPresent())
177 .map(x -> new ResourceEvent(RESOURCE_ADDED, x))
178 .collect(Collectors.toList());
179 notifyDelegate(events);
180 }
181 return success;
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700182 }
183
184 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800185 public boolean unregister(List<Resource> resources) {
Sho SHIMIZU83e17a02015-08-20 14:07:05 -0700186 checkNotNull(resources);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700187
188 TransactionContext tx = service.transactionContextBuilder().build();
189 tx.begin();
190
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800191 TransactionalMap<DiscreteResource, Set<Resource>> childTxMap =
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800192 tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800193 TransactionalMap<DiscreteResource, ResourceConsumer> discreteConsumerTxMap =
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800194 tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
195 TransactionalMap<ResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
196 tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700197
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800198 // Extract Discrete instances from resources
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800199 Map<DiscreteResource, List<Resource>> resourceMap = resources.stream()
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800200 .filter(x -> x.parent().isPresent())
201 .collect(Collectors.groupingBy(x -> x.parent().get()));
Sho SHIMIZU83e17a02015-08-20 14:07:05 -0700202
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800203 // even if one of the resources is allocated to a consumer,
204 // all unregistrations are regarded as failure
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800205 for (Map.Entry<DiscreteResource, List<Resource>> entry: resourceMap.entrySet()) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800206 boolean allocated = entry.getValue().stream().anyMatch(x -> {
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800207 if (x instanceof DiscreteResource) {
208 return discreteConsumerTxMap.get((DiscreteResource) x) != null;
209 } else if (x instanceof ContinuousResource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800210 ContinuousResourceAllocation allocations = continuousConsumerTxMap.get(x.id());
211 return allocations != null && !allocations.allocations().isEmpty();
212 } else {
213 return false;
214 }
215 });
216 if (allocated) {
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800217 return abortTransaction(tx);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700218 }
219
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800220 if (!removeValues(childTxMap, entry.getKey(), entry.getValue())) {
221 return abortTransaction(tx);
222 }
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700223 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800224
Sho SHIMIZUfa62b472015-11-02 17:35:46 -0800225 boolean success = tx.commit();
226 if (success) {
227 List<ResourceEvent> events = resources.stream()
228 .filter(x -> x.parent().isPresent())
229 .map(x -> new ResourceEvent(RESOURCE_REMOVED, x))
230 .collect(Collectors.toList());
231 notifyDelegate(events);
232 }
233 return success;
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700234 }
235
236 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800237 public boolean allocate(List<Resource> resources, ResourceConsumer consumer) {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700238 checkNotNull(resources);
239 checkNotNull(consumer);
240
241 TransactionContext tx = service.transactionContextBuilder().build();
242 tx.begin();
243
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800244 TransactionalMap<DiscreteResource, Set<Resource>> childTxMap =
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800245 tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800246 TransactionalMap<DiscreteResource, ResourceConsumer> discreteConsumerTxMap =
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800247 tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
248 TransactionalMap<ResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
249 tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700250
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800251 for (Resource resource: resources) {
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800252 if (resource instanceof DiscreteResource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800253 if (!lookup(childTxMap, resource).isPresent()) {
254 return abortTransaction(tx);
255 }
Sho SHIMIZUd29847f2015-08-13 09:10:59 -0700256
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800257 ResourceConsumer oldValue = discreteConsumerTxMap.put((DiscreteResource) resource, consumer);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800258 if (oldValue != null) {
259 return abortTransaction(tx);
260 }
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800261 } else if (resource instanceof ContinuousResource) {
262 Optional<ContinuousResource> continuous = lookup(childTxMap, (ContinuousResource) resource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800263 if (!continuous.isPresent()) {
264 return abortTransaction(tx);
265 }
266
267 ContinuousResourceAllocation allocations = continuousConsumerTxMap.get(continuous.get().id());
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800268 if (!hasEnoughResource(continuous.get(), (ContinuousResource) resource, allocations)) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800269 return abortTransaction(tx);
270 }
271
272 boolean success = appendValue(continuousConsumerTxMap,
273 continuous.get(), new ResourceAllocation(continuous.get(), consumer));
274 if (!success) {
275 return abortTransaction(tx);
276 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800277 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700278 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800279
280 return tx.commit();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700281 }
282
283 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800284 public boolean release(List<Resource> resources, List<ResourceConsumer> consumers) {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700285 checkNotNull(resources);
286 checkNotNull(consumers);
287 checkArgument(resources.size() == consumers.size());
288
289 TransactionContext tx = service.transactionContextBuilder().build();
290 tx.begin();
291
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800292 TransactionalMap<DiscreteResource, ResourceConsumer> discreteConsumerTxMap =
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800293 tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
294 TransactionalMap<ResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
295 tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800296 Iterator<Resource> resourceIte = resources.iterator();
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800297 Iterator<ResourceConsumer> consumerIte = consumers.iterator();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700298
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800299 while (resourceIte.hasNext() && consumerIte.hasNext()) {
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800300 Resource resource = resourceIte.next();
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800301 ResourceConsumer consumer = consumerIte.next();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700302
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800303 if (resource instanceof DiscreteResource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800304 // if this single release fails (because the resource is allocated to another consumer,
305 // the whole release fails
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800306 if (!discreteConsumerTxMap.remove((DiscreteResource) resource, consumer)) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800307 return abortTransaction(tx);
308 }
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800309 } else if (resource instanceof ContinuousResource) {
310 ContinuousResource continuous = (ContinuousResource) resource;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800311 ContinuousResourceAllocation allocation = continuousConsumerTxMap.get(continuous.id());
312 ImmutableList<ResourceAllocation> newAllocations = allocation.allocations().stream()
313 .filter(x -> !(x.consumer().equals(consumer) &&
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800314 ((ContinuousResource) x.resource()).value() == continuous.value()))
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800315 .collect(GuavaCollectors.toImmutableList());
316
317 if (!continuousConsumerTxMap.replace(continuous.id(), allocation,
318 new ContinuousResourceAllocation(allocation.original(), newAllocations))) {
319 return abortTransaction(tx);
320 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700321 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700322 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800323
324 return tx.commit();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700325 }
326
327 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800328 public boolean isAvailable(Resource resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800329 checkNotNull(resource);
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800330 checkArgument(resource instanceof DiscreteResource || resource instanceof ContinuousResource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800331
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800332 // check if it's registered or not.
333 Versioned<Set<Resource>> v = childMap.get(resource.parent().get());
334 if (v == null || !v.value().contains(resource)) {
335 return false;
336 }
337
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800338 if (resource instanceof DiscreteResource) {
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800339 // check if already consumed
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800340 return getConsumer((DiscreteResource) resource).isEmpty();
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800341 } else {
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800342 ContinuousResource requested = (ContinuousResource) resource;
343 ContinuousResource registered = v.value().stream()
Sho SHIMIZU2d310222016-01-22 11:45:11 -0800344 .filter(c -> c.id().equals(resource.id()))
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800345 .findFirst()
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800346 .map(c -> (ContinuousResource) c)
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800347 .get();
348 if (registered.value() < requested.value()) {
349 // Capacity < requested, can never satisfy
350 return false;
351 }
352 // check if there's enough left
353 return isAvailable(requested);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800354 }
355 }
356
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800357 private boolean isAvailable(ContinuousResource resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800358 Versioned<ContinuousResourceAllocation> allocation = continuousConsumers.get(resource.id());
359 if (allocation == null) {
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800360 // no allocation (=no consumer) full registered resources available
361 return true;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800362 }
363
364 return hasEnoughResource(allocation.value().original(), resource, allocation.value());
365 }
366
367 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800368 public Collection<Resource> getResources(ResourceConsumer consumer) {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700369 checkNotNull(consumer);
370
371 // NOTE: getting all entries may become performance bottleneck
372 // TODO: revisit for better backend data structure
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800373 Stream<DiscreteResource> discreteStream = discreteConsumers.entrySet().stream()
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700374 .filter(x -> x.getValue().value().equals(consumer))
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800375 .map(Map.Entry::getKey);
376
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800377 Stream<ContinuousResource> continuousStream = continuousConsumers.values().stream()
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800378 .flatMap(x -> x.value().allocations().stream()
379 .map(y -> Maps.immutableEntry(x.value().original(), y)))
380 .filter(x -> x.getValue().consumer().equals(consumer))
381 .map(x -> x.getKey());
382
383 return Stream.concat(discreteStream, continuousStream).collect(Collectors.toList());
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700384 }
385
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700386 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800387 public Collection<Resource> getChildResources(Resource parent) {
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700388 checkNotNull(parent);
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800389 if (!(parent instanceof DiscreteResource)) {
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800390 // only Discrete resource can have child resource
391 return ImmutableList.of();
392 }
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700393
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800394 Versioned<Set<Resource>> children = childMap.get((DiscreteResource) parent);
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700395 if (children == null) {
Sho SHIMIZU2c0ae122016-01-20 13:14:38 -0800396 return ImmutableList.of();
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700397 }
398
399 return children.value();
400 }
401
402 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800403 public <T> Collection<Resource> getAllocatedResources(Resource parent, Class<T> cls) {
Sho SHIMIZU1f5e5912015-08-10 17:00:00 -0700404 checkNotNull(parent);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700405 checkNotNull(cls);
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800406 checkArgument(parent instanceof DiscreteResource);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700407
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800408 Versioned<Set<Resource>> children = childMap.get((DiscreteResource) parent);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700409 if (children == null) {
Sho SHIMIZU2c0ae122016-01-20 13:14:38 -0800410 return ImmutableList.of();
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700411 }
412
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800413 Stream<DiscreteResource> discrete = children.value().stream()
Sho SHIMIZUc9546a32015-11-10 11:22:28 -0800414 .filter(x -> x.last().getClass().equals(cls))
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800415 .filter(x -> x instanceof DiscreteResource)
416 .map(x -> (DiscreteResource) x)
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800417 .filter(discreteConsumers::containsKey);
418
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800419 Stream<ContinuousResource> continuous = children.value().stream()
Sho SHIMIZU2d310222016-01-22 11:45:11 -0800420 .filter(x -> x.id().equals(parent.id().child(cls)))
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800421 .filter(x -> x instanceof ContinuousResource)
422 .map(x -> (ContinuousResource) x)
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800423 .filter(x -> continuousConsumers.containsKey(x.id()))
424 .filter(x -> continuousConsumers.get(x.id()) != null)
425 .filter(x -> !continuousConsumers.get(x.id()).value().allocations().isEmpty());
426
427 return Stream.concat(discrete, continuous).collect(Collectors.toList());
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700428 }
Sho SHIMIZUd29847f2015-08-13 09:10:59 -0700429
430 /**
431 * Abort the transaction.
432 *
433 * @param tx transaction context
434 * @return always false
435 */
436 private boolean abortTransaction(TransactionContext tx) {
437 tx.abort();
438 return false;
439 }
440
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800441 // Appends the specified ResourceAllocation to the existing values stored in the map
442 private boolean appendValue(TransactionalMap<ResourceId, ContinuousResourceAllocation> map,
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800443 ContinuousResource original, ResourceAllocation value) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800444 ContinuousResourceAllocation oldValue = map.putIfAbsent(original.id(),
445 new ContinuousResourceAllocation(original, ImmutableList.of(value)));
446 if (oldValue == null) {
447 return true;
448 }
449
450 if (oldValue.allocations().contains(value)) {
451 // don't write to map because all values are already stored
452 return true;
453 }
454
455 ContinuousResourceAllocation newValue = new ContinuousResourceAllocation(original,
456 ImmutableList.<ResourceAllocation>builder()
457 .addAll(oldValue.allocations())
458 .add(value)
459 .build());
460 return map.replace(original.id(), oldValue, newValue);
461 }
Sho SHIMIZUd29847f2015-08-13 09:10:59 -0700462 /**
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700463 * Appends the values to the existing values associated with the specified key.
Sho SHIMIZU4568c412015-08-21 16:39:07 -0700464 * If the map already has all the given values, appending will not happen.
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700465 *
466 * @param map map holding multiple values for a key
467 * @param key key specifying values
468 * @param values values to be appended
469 * @param <K> type of the key
470 * @param <V> type of the element of the list
471 * @return true if the operation succeeds, false otherwise.
472 */
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800473 private <K, V> boolean appendValues(TransactionalMap<K, Set<V>> map, K key, List<V> values) {
474 Set<V> oldValues = map.putIfAbsent(key, new LinkedHashSet<>(values));
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700475 if (oldValues == null) {
Sho SHIMIZU93a74b32015-11-09 11:48:23 -0800476 return true;
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700477 }
478
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800479 if (oldValues.containsAll(values)) {
Sho SHIMIZU4568c412015-08-21 16:39:07 -0700480 // don't write to map because all values are already stored
481 return true;
482 }
483
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800484 LinkedHashSet<V> newValues = new LinkedHashSet<>(oldValues);
485 newValues.addAll(values);
486 return map.replace(key, oldValues, newValues);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700487 }
488
489 /**
Sho SHIMIZUba1f83b2015-10-14 08:11:20 -0700490 * Removes the values from the existing values associated with the specified key.
Sho SHIMIZU5618ee52015-08-21 17:19:44 -0700491 * If the map doesn't contain the given values, removal will not happen.
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700492 *
493 * @param map map holding multiple values for a key
494 * @param key key specifying values
495 * @param values values to be removed
496 * @param <K> type of the key
497 * @param <V> type of the element of the list
498 * @return true if the operation succeeds, false otherwise
499 */
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800500 private <K, V> boolean removeValues(TransactionalMap<K, Set<V>> map, K key, List<? extends V> values) {
501 Set<V> oldValues = map.putIfAbsent(key, new LinkedHashSet<>());
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700502 if (oldValues == null) {
Sho SHIMIZU93a74b32015-11-09 11:48:23 -0800503 return true;
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700504 }
505
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800506 if (values.stream().allMatch(x -> !oldValues.contains(x))) {
Sho SHIMIZU5618ee52015-08-21 17:19:44 -0700507 // don't write map because none of the values are stored
508 return true;
509 }
510
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800511 LinkedHashSet<V> newValues = new LinkedHashSet<>(oldValues);
512 newValues.removeAll(values);
513 return map.replace(key, oldValues, newValues);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700514 }
515
516 /**
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800517 * Returns the resource which has the same key as the key of the specified resource
518 * in the list as a value of the map.
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700519 *
520 * @param map map storing parent - child relationship of resources
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800521 * @param resource resource to be checked for its key
522 * @return the resource which is regarded as the same as the specified resource
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700523 */
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800524 // Naive implementation, which traverses all elements in the list
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800525 private <T extends Resource> Optional<T> lookup(
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800526 TransactionalMap<DiscreteResource, Set<Resource>> map, T resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800527 // if it is root, always returns itself
Sho SHIMIZUc9546a32015-11-10 11:22:28 -0800528 if (!resource.parent().isPresent()) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800529 return Optional.of(resource);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700530 }
531
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800532 Set<Resource> values = map.get(resource.parent().get());
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800533 if (values == null) {
534 return Optional.empty();
535 }
536
537 @SuppressWarnings("unchecked")
538 Optional<T> result = values.stream()
539 .filter(x -> x.id().equals(resource.id()))
540 .map(x -> (T) x)
541 .findFirst();
542 return result;
543 }
544
545 /**
546 * Checks if there is enough resource volume to allocated the requested resource
547 * against the specified resource.
548 *
549 * @param original original resource
550 * @param request requested resource
551 * @param allocation current allocation of the resource
552 * @return true if there is enough resource volume. Otherwise, false.
553 */
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800554 private boolean hasEnoughResource(ContinuousResource original,
555 ContinuousResource request,
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800556 ContinuousResourceAllocation allocation) {
557 if (allocation == null) {
558 return request.value() <= original.value();
559 }
560
561 double allocated = allocation.allocations().stream()
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800562 .filter(x -> x.resource() instanceof ContinuousResource)
563 .map(x -> (ContinuousResource) x.resource())
564 .mapToDouble(ContinuousResource::value)
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800565 .sum();
566 double left = original.value() - allocated;
567 return request.value() <= left;
568 }
569
570 // internal use only
571 private static final class ContinuousResourceAllocation {
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800572 private final ContinuousResource original;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800573 private final ImmutableList<ResourceAllocation> allocations;
574
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800575 private ContinuousResourceAllocation(ContinuousResource original,
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800576 ImmutableList<ResourceAllocation> allocations) {
577 this.original = original;
578 this.allocations = allocations;
579 }
580
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800581 private ContinuousResource original() {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800582 return original;
583 }
584
585 private ImmutableList<ResourceAllocation> allocations() {
586 return allocations;
587 }
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700588 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700589}