blob: defc955bd1752e57413bb0f8c45adbd811a462d9 [file] [log] [blame]
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -07001/*
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -08002 * Copyright 2015-2016 Open Networking Laboratory
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.newresource.impl;
17
18import com.google.common.annotations.Beta;
Sho SHIMIZUe7db6142015-11-04 11:24:22 -080019import com.google.common.collect.ImmutableList;
Sho SHIMIZU83258ae2016-01-29 17:39:07 -080020import com.google.common.collect.ImmutableSet;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080021import com.google.common.collect.Maps;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080027import org.onlab.util.GuavaCollectors;
Thomas Vachuska762a2d82016-01-04 10:25:20 -080028import org.onlab.util.Tools;
Sho SHIMIZUf33b8932016-01-25 18:43:32 -080029import org.onosproject.net.newresource.ContinuousResource;
30import org.onosproject.net.newresource.DiscreteResource;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080031import org.onosproject.net.newresource.ResourceAllocation;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070032import org.onosproject.net.newresource.ResourceConsumer;
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080033import org.onosproject.net.newresource.ResourceEvent;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080034import org.onosproject.net.newresource.ResourceId;
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -080035import org.onosproject.net.newresource.Resource;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070036import org.onosproject.net.newresource.ResourceStore;
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080037import org.onosproject.net.newresource.ResourceStoreDelegate;
38import org.onosproject.store.AbstractStore;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070039import org.onosproject.store.serializers.KryoNamespaces;
40import org.onosproject.store.service.ConsistentMap;
Thomas Vachuska762a2d82016-01-04 10:25:20 -080041import org.onosproject.store.service.ConsistentMapException;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070042import org.onosproject.store.service.Serializer;
43import org.onosproject.store.service.StorageService;
44import org.onosproject.store.service.TransactionContext;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070045import org.onosproject.store.service.TransactionalMap;
46import org.onosproject.store.service.Versioned;
47import org.slf4j.Logger;
48import org.slf4j.LoggerFactory;
49
Sho SHIMIZUba41fc12015-08-12 15:43:22 -070050import java.util.Arrays;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070051import java.util.Collection;
52import java.util.Iterator;
Sho SHIMIZUba41fc12015-08-12 15:43:22 -070053import java.util.LinkedHashSet;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070054import java.util.List;
55import java.util.Map;
56import java.util.Optional;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080057import java.util.Set;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070058import java.util.stream.Collectors;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080059import java.util.stream.Stream;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070060
61import static com.google.common.base.Preconditions.checkArgument;
62import static com.google.common.base.Preconditions.checkNotNull;
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080063import static org.onosproject.net.newresource.ResourceEvent.Type.*;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070064
65/**
66 * Implementation of ResourceStore using TransactionalMap.
67 */
Sho SHIMIZU9a2b8292015-10-28 13:00:16 -070068@Component(immediate = true)
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070069@Service
70@Beta
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080071public class ConsistentResourceStore extends AbstractStore<ResourceEvent, ResourceStoreDelegate>
72 implements ResourceStore {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070073 private static final Logger log = LoggerFactory.getLogger(ConsistentResourceStore.class);
74
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080075 private static final String DISCRETE_CONSUMER_MAP = "onos-discrete-consumers";
76 private static final String CONTINUOUS_CONSUMER_MAP = "onos-continuous-consumers";
Sho SHIMIZUba41fc12015-08-12 15:43:22 -070077 private static final String CHILD_MAP = "onos-resource-children";
78 private static final Serializer SERIALIZER = Serializer.using(
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080079 Arrays.asList(KryoNamespaces.BASIC, KryoNamespaces.API),
80 ContinuousResourceAllocation.class);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070081
Thomas Vachuska762a2d82016-01-04 10:25:20 -080082 // TODO: We should provide centralized values for this
83 private static final int MAX_RETRIES = 5;
84 private static final int RETRY_DELAY = 1_000; // millis
85
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070086 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
87 protected StorageService service;
88
Sho SHIMIZUf33b8932016-01-25 18:43:32 -080089 private ConsistentMap<DiscreteResource, ResourceConsumer> discreteConsumers;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080090 private ConsistentMap<ResourceId, ContinuousResourceAllocation> continuousConsumers;
Sho SHIMIZUf33b8932016-01-25 18:43:32 -080091 private ConsistentMap<DiscreteResource, Set<Resource>> childMap;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070092
93 @Activate
94 public void activate() {
Sho SHIMIZUf33b8932016-01-25 18:43:32 -080095 discreteConsumers = service.<DiscreteResource, ResourceConsumer>consistentMapBuilder()
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080096 .withName(DISCRETE_CONSUMER_MAP)
Sho SHIMIZUba41fc12015-08-12 15:43:22 -070097 .withSerializer(SERIALIZER)
98 .build();
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080099 continuousConsumers = service.<ResourceId, ContinuousResourceAllocation>consistentMapBuilder()
100 .withName(CONTINUOUS_CONSUMER_MAP)
101 .withSerializer(SERIALIZER)
102 .build();
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800103 childMap = service.<DiscreteResource, Set<Resource>>consistentMapBuilder()
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700104 .withName(CHILD_MAP)
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700105 .withSerializer(SERIALIZER)
106 .build();
Sho SHIMIZUe7db6142015-11-04 11:24:22 -0800107
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800108 Tools.retryable(() -> childMap.put(Resource.ROOT, new LinkedHashSet<>()),
Thomas Vachuska762a2d82016-01-04 10:25:20 -0800109 ConsistentMapException.class, MAX_RETRIES, RETRY_DELAY);
Madan Jampanic7f49f92015-12-10 11:35:06 -0800110 log.info("Started");
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700111 }
112
113 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800114 public List<ResourceConsumer> getConsumers(Resource resource) {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700115 checkNotNull(resource);
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800116 checkArgument(resource instanceof DiscreteResource || resource instanceof ContinuousResource);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700117
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800118 if (resource instanceof DiscreteResource) {
Sho SHIMIZU4a1e59f2016-01-26 15:27:13 -0800119 return getConsumers((DiscreteResource) resource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800120 } else {
Sho SHIMIZU4a1e59f2016-01-26 15:27:13 -0800121 return getConsumers((ContinuousResource) resource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800122 }
123 }
124
Sho SHIMIZU4a1e59f2016-01-26 15:27:13 -0800125 private List<ResourceConsumer> getConsumers(DiscreteResource resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800126 Versioned<ResourceConsumer> consumer = discreteConsumers.get(resource);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700127 if (consumer == null) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800128 return ImmutableList.of();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700129 }
130
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800131 return ImmutableList.of(consumer.value());
132 }
133
Sho SHIMIZU4a1e59f2016-01-26 15:27:13 -0800134 private List<ResourceConsumer> getConsumers(ContinuousResource resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800135 Versioned<ContinuousResourceAllocation> allocations = continuousConsumers.get(resource.id());
136 if (allocations == null) {
137 return ImmutableList.of();
138 }
139
140 return allocations.value().allocations().stream()
141 .filter(x -> x.resource().equals(resource))
142 .map(ResourceAllocation::consumer)
143 .collect(GuavaCollectors.toImmutableList());
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700144 }
145
146 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800147 public boolean register(List<Resource> resources) {
Sho SHIMIZU83e17a02015-08-20 14:07:05 -0700148 checkNotNull(resources);
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800149 if (log.isTraceEnabled()) {
150 resources.forEach(r -> log.trace("registering {}", r));
151 }
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700152
153 TransactionContext tx = service.transactionContextBuilder().build();
154 tx.begin();
155
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800156 TransactionalMap<DiscreteResource, Set<Resource>> childTxMap =
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800157 tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700158
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800159 Map<DiscreteResource, List<Resource>> resourceMap = resources.stream()
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800160 .filter(x -> x.parent().isPresent())
161 .collect(Collectors.groupingBy(x -> x.parent().get()));
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700162
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800163 for (Map.Entry<DiscreteResource, List<Resource>> entry: resourceMap.entrySet()) {
164 Optional<DiscreteResource> child = lookup(childTxMap, entry.getKey());
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800165 if (!child.isPresent()) {
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800166 return abortTransaction(tx);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700167 }
168
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800169 if (!appendValues(childTxMap, entry.getKey(), entry.getValue())) {
170 return abortTransaction(tx);
171 }
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700172 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800173
Sho SHIMIZUfa62b472015-11-02 17:35:46 -0800174 boolean success = tx.commit();
175 if (success) {
176 List<ResourceEvent> events = resources.stream()
177 .filter(x -> x.parent().isPresent())
178 .map(x -> new ResourceEvent(RESOURCE_ADDED, x))
179 .collect(Collectors.toList());
180 notifyDelegate(events);
181 }
182 return success;
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700183 }
184
185 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800186 public boolean unregister(List<Resource> resources) {
Sho SHIMIZU83e17a02015-08-20 14:07:05 -0700187 checkNotNull(resources);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700188
189 TransactionContext tx = service.transactionContextBuilder().build();
190 tx.begin();
191
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800192 TransactionalMap<DiscreteResource, Set<Resource>> childTxMap =
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800193 tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800194 TransactionalMap<DiscreteResource, ResourceConsumer> discreteConsumerTxMap =
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800195 tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
196 TransactionalMap<ResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
197 tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700198
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800199 // Extract Discrete instances from resources
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800200 Map<DiscreteResource, List<Resource>> resourceMap = resources.stream()
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800201 .filter(x -> x.parent().isPresent())
202 .collect(Collectors.groupingBy(x -> x.parent().get()));
Sho SHIMIZU83e17a02015-08-20 14:07:05 -0700203
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800204 // even if one of the resources is allocated to a consumer,
205 // all unregistrations are regarded as failure
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800206 for (Map.Entry<DiscreteResource, List<Resource>> entry: resourceMap.entrySet()) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800207 boolean allocated = entry.getValue().stream().anyMatch(x -> {
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800208 if (x instanceof DiscreteResource) {
209 return discreteConsumerTxMap.get((DiscreteResource) x) != null;
210 } else if (x instanceof ContinuousResource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800211 ContinuousResourceAllocation allocations = continuousConsumerTxMap.get(x.id());
212 return allocations != null && !allocations.allocations().isEmpty();
213 } else {
214 return false;
215 }
216 });
217 if (allocated) {
HIGUCHI Yuta5b6dfba2016-01-27 14:43:41 -0800218 log.warn("Failed to unregister {}: allocation exists", entry.getKey());
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800219 return abortTransaction(tx);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700220 }
221
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800222 if (!removeValues(childTxMap, entry.getKey(), entry.getValue())) {
HIGUCHI Yuta5b6dfba2016-01-27 14:43:41 -0800223 log.warn("Failed to unregister {}: Failed to remove values: {}",
224 entry.getKey(), entry.getValue());
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800225 return abortTransaction(tx);
226 }
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700227 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800228
Sho SHIMIZUfa62b472015-11-02 17:35:46 -0800229 boolean success = tx.commit();
230 if (success) {
231 List<ResourceEvent> events = resources.stream()
232 .filter(x -> x.parent().isPresent())
233 .map(x -> new ResourceEvent(RESOURCE_REMOVED, x))
234 .collect(Collectors.toList());
235 notifyDelegate(events);
HIGUCHI Yuta5b6dfba2016-01-27 14:43:41 -0800236 } else {
237 log.warn("Failed to unregister {}: Commit failed.", resources);
Sho SHIMIZUfa62b472015-11-02 17:35:46 -0800238 }
239 return success;
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700240 }
241
242 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800243 public boolean allocate(List<Resource> resources, ResourceConsumer consumer) {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700244 checkNotNull(resources);
245 checkNotNull(consumer);
246
247 TransactionContext tx = service.transactionContextBuilder().build();
248 tx.begin();
249
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800250 TransactionalMap<DiscreteResource, Set<Resource>> childTxMap =
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800251 tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800252 TransactionalMap<DiscreteResource, ResourceConsumer> discreteConsumerTxMap =
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800253 tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
254 TransactionalMap<ResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
255 tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700256
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800257 for (Resource resource: resources) {
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800258 if (resource instanceof DiscreteResource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800259 if (!lookup(childTxMap, resource).isPresent()) {
260 return abortTransaction(tx);
261 }
Sho SHIMIZUd29847f2015-08-13 09:10:59 -0700262
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800263 ResourceConsumer oldValue = discreteConsumerTxMap.put((DiscreteResource) resource, consumer);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800264 if (oldValue != null) {
265 return abortTransaction(tx);
266 }
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800267 } else if (resource instanceof ContinuousResource) {
268 Optional<ContinuousResource> continuous = lookup(childTxMap, (ContinuousResource) resource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800269 if (!continuous.isPresent()) {
270 return abortTransaction(tx);
271 }
272
273 ContinuousResourceAllocation allocations = continuousConsumerTxMap.get(continuous.get().id());
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800274 if (!hasEnoughResource(continuous.get(), (ContinuousResource) resource, allocations)) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800275 return abortTransaction(tx);
276 }
277
278 boolean success = appendValue(continuousConsumerTxMap,
279 continuous.get(), new ResourceAllocation(continuous.get(), consumer));
280 if (!success) {
281 return abortTransaction(tx);
282 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800283 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700284 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800285
286 return tx.commit();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700287 }
288
289 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800290 public boolean release(List<Resource> resources, List<ResourceConsumer> consumers) {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700291 checkNotNull(resources);
292 checkNotNull(consumers);
293 checkArgument(resources.size() == consumers.size());
294
295 TransactionContext tx = service.transactionContextBuilder().build();
296 tx.begin();
297
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800298 TransactionalMap<DiscreteResource, ResourceConsumer> discreteConsumerTxMap =
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800299 tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
300 TransactionalMap<ResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
301 tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800302 Iterator<Resource> resourceIte = resources.iterator();
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800303 Iterator<ResourceConsumer> consumerIte = consumers.iterator();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700304
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800305 while (resourceIte.hasNext() && consumerIte.hasNext()) {
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800306 Resource resource = resourceIte.next();
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800307 ResourceConsumer consumer = consumerIte.next();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700308
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800309 if (resource instanceof DiscreteResource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800310 // if this single release fails (because the resource is allocated to another consumer,
311 // the whole release fails
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800312 if (!discreteConsumerTxMap.remove((DiscreteResource) resource, consumer)) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800313 return abortTransaction(tx);
314 }
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800315 } else if (resource instanceof ContinuousResource) {
316 ContinuousResource continuous = (ContinuousResource) resource;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800317 ContinuousResourceAllocation allocation = continuousConsumerTxMap.get(continuous.id());
318 ImmutableList<ResourceAllocation> newAllocations = allocation.allocations().stream()
319 .filter(x -> !(x.consumer().equals(consumer) &&
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800320 ((ContinuousResource) x.resource()).value() == continuous.value()))
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800321 .collect(GuavaCollectors.toImmutableList());
322
323 if (!continuousConsumerTxMap.replace(continuous.id(), allocation,
324 new ContinuousResourceAllocation(allocation.original(), newAllocations))) {
325 return abortTransaction(tx);
326 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700327 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700328 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800329
330 return tx.commit();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700331 }
332
333 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800334 public boolean isAvailable(Resource resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800335 checkNotNull(resource);
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800336 checkArgument(resource instanceof DiscreteResource || resource instanceof ContinuousResource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800337
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800338 // check if it's registered or not.
339 Versioned<Set<Resource>> v = childMap.get(resource.parent().get());
340 if (v == null || !v.value().contains(resource)) {
341 return false;
342 }
343
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800344 if (resource instanceof DiscreteResource) {
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800345 // check if already consumed
Sho SHIMIZU4a1e59f2016-01-26 15:27:13 -0800346 return getConsumers((DiscreteResource) resource).isEmpty();
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800347 } else {
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800348 ContinuousResource requested = (ContinuousResource) resource;
349 ContinuousResource registered = v.value().stream()
Sho SHIMIZU2d310222016-01-22 11:45:11 -0800350 .filter(c -> c.id().equals(resource.id()))
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800351 .findFirst()
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800352 .map(c -> (ContinuousResource) c)
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800353 .get();
354 if (registered.value() < requested.value()) {
355 // Capacity < requested, can never satisfy
356 return false;
357 }
358 // check if there's enough left
359 return isAvailable(requested);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800360 }
361 }
362
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800363 private boolean isAvailable(ContinuousResource resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800364 Versioned<ContinuousResourceAllocation> allocation = continuousConsumers.get(resource.id());
365 if (allocation == null) {
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800366 // no allocation (=no consumer) full registered resources available
367 return true;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800368 }
369
370 return hasEnoughResource(allocation.value().original(), resource, allocation.value());
371 }
372
373 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800374 public Collection<Resource> getResources(ResourceConsumer consumer) {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700375 checkNotNull(consumer);
376
377 // NOTE: getting all entries may become performance bottleneck
378 // TODO: revisit for better backend data structure
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800379 Stream<DiscreteResource> discreteStream = discreteConsumers.entrySet().stream()
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700380 .filter(x -> x.getValue().value().equals(consumer))
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800381 .map(Map.Entry::getKey);
382
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800383 Stream<ContinuousResource> continuousStream = continuousConsumers.values().stream()
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800384 .flatMap(x -> x.value().allocations().stream()
385 .map(y -> Maps.immutableEntry(x.value().original(), y)))
386 .filter(x -> x.getValue().consumer().equals(consumer))
387 .map(x -> x.getKey());
388
389 return Stream.concat(discreteStream, continuousStream).collect(Collectors.toList());
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700390 }
391
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700392 @Override
Sho SHIMIZU83258ae2016-01-29 17:39:07 -0800393 public Set<Resource> getChildResources(Resource parent) {
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700394 checkNotNull(parent);
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800395 if (!(parent instanceof DiscreteResource)) {
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800396 // only Discrete resource can have child resource
Sho SHIMIZU83258ae2016-01-29 17:39:07 -0800397 return ImmutableSet.of();
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800398 }
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700399
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800400 Versioned<Set<Resource>> children = childMap.get((DiscreteResource) parent);
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700401 if (children == null) {
Sho SHIMIZU83258ae2016-01-29 17:39:07 -0800402 return ImmutableSet.of();
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700403 }
404
405 return children.value();
406 }
407
408 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800409 public <T> Collection<Resource> getAllocatedResources(Resource parent, Class<T> cls) {
Sho SHIMIZU1f5e5912015-08-10 17:00:00 -0700410 checkNotNull(parent);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700411 checkNotNull(cls);
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800412 checkArgument(parent instanceof DiscreteResource);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700413
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800414 Versioned<Set<Resource>> children = childMap.get((DiscreteResource) parent);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700415 if (children == null) {
Sho SHIMIZU2c0ae122016-01-20 13:14:38 -0800416 return ImmutableList.of();
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700417 }
418
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800419 Stream<DiscreteResource> discrete = children.value().stream()
Sho SHIMIZUc9546a32015-11-10 11:22:28 -0800420 .filter(x -> x.last().getClass().equals(cls))
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800421 .filter(x -> x instanceof DiscreteResource)
422 .map(x -> (DiscreteResource) x)
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800423 .filter(discreteConsumers::containsKey);
424
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800425 Stream<ContinuousResource> continuous = children.value().stream()
Sho SHIMIZU2d310222016-01-22 11:45:11 -0800426 .filter(x -> x.id().equals(parent.id().child(cls)))
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800427 .filter(x -> x instanceof ContinuousResource)
428 .map(x -> (ContinuousResource) x)
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800429 .filter(x -> continuousConsumers.containsKey(x.id()))
430 .filter(x -> continuousConsumers.get(x.id()) != null)
431 .filter(x -> !continuousConsumers.get(x.id()).value().allocations().isEmpty());
432
433 return Stream.concat(discrete, continuous).collect(Collectors.toList());
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700434 }
Sho SHIMIZUd29847f2015-08-13 09:10:59 -0700435
436 /**
437 * Abort the transaction.
438 *
439 * @param tx transaction context
440 * @return always false
441 */
442 private boolean abortTransaction(TransactionContext tx) {
443 tx.abort();
444 return false;
445 }
446
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800447 // Appends the specified ResourceAllocation to the existing values stored in the map
448 private boolean appendValue(TransactionalMap<ResourceId, ContinuousResourceAllocation> map,
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800449 ContinuousResource original, ResourceAllocation value) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800450 ContinuousResourceAllocation oldValue = map.putIfAbsent(original.id(),
451 new ContinuousResourceAllocation(original, ImmutableList.of(value)));
452 if (oldValue == null) {
453 return true;
454 }
455
456 if (oldValue.allocations().contains(value)) {
457 // don't write to map because all values are already stored
458 return true;
459 }
460
461 ContinuousResourceAllocation newValue = new ContinuousResourceAllocation(original,
462 ImmutableList.<ResourceAllocation>builder()
463 .addAll(oldValue.allocations())
464 .add(value)
465 .build());
466 return map.replace(original.id(), oldValue, newValue);
467 }
Sho SHIMIZUd29847f2015-08-13 09:10:59 -0700468 /**
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700469 * Appends the values to the existing values associated with the specified key.
Sho SHIMIZU4568c412015-08-21 16:39:07 -0700470 * If the map already has all the given values, appending will not happen.
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700471 *
472 * @param map map holding multiple values for a key
473 * @param key key specifying values
474 * @param values values to be appended
475 * @param <K> type of the key
476 * @param <V> type of the element of the list
477 * @return true if the operation succeeds, false otherwise.
478 */
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800479 private <K, V> boolean appendValues(TransactionalMap<K, Set<V>> map, K key, List<V> values) {
480 Set<V> oldValues = map.putIfAbsent(key, new LinkedHashSet<>(values));
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700481 if (oldValues == null) {
Sho SHIMIZU93a74b32015-11-09 11:48:23 -0800482 return true;
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700483 }
484
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800485 if (oldValues.containsAll(values)) {
Sho SHIMIZU4568c412015-08-21 16:39:07 -0700486 // don't write to map because all values are already stored
487 return true;
488 }
489
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800490 LinkedHashSet<V> newValues = new LinkedHashSet<>(oldValues);
491 newValues.addAll(values);
492 return map.replace(key, oldValues, newValues);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700493 }
494
495 /**
Sho SHIMIZUba1f83b2015-10-14 08:11:20 -0700496 * Removes the values from the existing values associated with the specified key.
Sho SHIMIZU5618ee52015-08-21 17:19:44 -0700497 * If the map doesn't contain the given values, removal will not happen.
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700498 *
499 * @param map map holding multiple values for a key
500 * @param key key specifying values
501 * @param values values to be removed
502 * @param <K> type of the key
503 * @param <V> type of the element of the list
504 * @return true if the operation succeeds, false otherwise
505 */
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800506 private <K, V> boolean removeValues(TransactionalMap<K, Set<V>> map, K key, List<? extends V> values) {
507 Set<V> oldValues = map.putIfAbsent(key, new LinkedHashSet<>());
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700508 if (oldValues == null) {
HIGUCHI Yutadc4394c2016-01-29 15:35:10 -0800509 log.trace("No-Op removing values. key {} did not exist", key);
Sho SHIMIZU93a74b32015-11-09 11:48:23 -0800510 return true;
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700511 }
512
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800513 if (values.stream().allMatch(x -> !oldValues.contains(x))) {
Sho SHIMIZU5618ee52015-08-21 17:19:44 -0700514 // don't write map because none of the values are stored
HIGUCHI Yutadc4394c2016-01-29 15:35:10 -0800515 log.trace("No-Op removing values. key {} did not contain {}", key, values);
Sho SHIMIZU5618ee52015-08-21 17:19:44 -0700516 return true;
517 }
518
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800519 LinkedHashSet<V> newValues = new LinkedHashSet<>(oldValues);
520 newValues.removeAll(values);
521 return map.replace(key, oldValues, newValues);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700522 }
523
524 /**
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800525 * Returns the resource which has the same key as the key of the specified resource
526 * in the list as a value of the map.
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700527 *
528 * @param map map storing parent - child relationship of resources
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800529 * @param resource resource to be checked for its key
530 * @return the resource which is regarded as the same as the specified resource
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700531 */
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800532 // Naive implementation, which traverses all elements in the list
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800533 private <T extends Resource> Optional<T> lookup(
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800534 TransactionalMap<DiscreteResource, Set<Resource>> map, T resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800535 // if it is root, always returns itself
Sho SHIMIZUc9546a32015-11-10 11:22:28 -0800536 if (!resource.parent().isPresent()) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800537 return Optional.of(resource);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700538 }
539
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800540 Set<Resource> values = map.get(resource.parent().get());
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800541 if (values == null) {
542 return Optional.empty();
543 }
544
545 @SuppressWarnings("unchecked")
546 Optional<T> result = values.stream()
547 .filter(x -> x.id().equals(resource.id()))
548 .map(x -> (T) x)
549 .findFirst();
550 return result;
551 }
552
553 /**
554 * Checks if there is enough resource volume to allocated the requested resource
555 * against the specified resource.
556 *
557 * @param original original resource
558 * @param request requested resource
559 * @param allocation current allocation of the resource
560 * @return true if there is enough resource volume. Otherwise, false.
561 */
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800562 private boolean hasEnoughResource(ContinuousResource original,
563 ContinuousResource request,
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800564 ContinuousResourceAllocation allocation) {
565 if (allocation == null) {
566 return request.value() <= original.value();
567 }
568
569 double allocated = allocation.allocations().stream()
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800570 .filter(x -> x.resource() instanceof ContinuousResource)
571 .map(x -> (ContinuousResource) x.resource())
572 .mapToDouble(ContinuousResource::value)
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800573 .sum();
574 double left = original.value() - allocated;
575 return request.value() <= left;
576 }
577
578 // internal use only
579 private static final class ContinuousResourceAllocation {
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800580 private final ContinuousResource original;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800581 private final ImmutableList<ResourceAllocation> allocations;
582
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800583 private ContinuousResourceAllocation(ContinuousResource original,
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800584 ImmutableList<ResourceAllocation> allocations) {
585 this.original = original;
586 this.allocations = allocations;
587 }
588
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800589 private ContinuousResource original() {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800590 return original;
591 }
592
593 private ImmutableList<ResourceAllocation> allocations() {
594 return allocations;
595 }
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700596 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700597}