blob: 4c0137e4ad110aed33c92b08f8b607810c373df0 [file] [log] [blame]
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -07001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Sho SHIMIZUe18cb122016-02-22 21:04:56 -080016package org.onosproject.store.resource.impl;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070017
18import com.google.common.annotations.Beta;
Sho SHIMIZUe7db6142015-11-04 11:24:22 -080019import com.google.common.collect.ImmutableList;
Sho SHIMIZU83258ae2016-01-29 17:39:07 -080020import com.google.common.collect.ImmutableSet;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080021import com.google.common.collect.Maps;
Sho SHIMIZU67c90102016-02-23 10:49:58 -080022import com.google.common.collect.Sets;
Madan Jampani3780d4b2016-04-04 18:18:24 -070023
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070024import org.apache.felix.scr.annotations.Activate;
25import org.apache.felix.scr.annotations.Component;
26import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
28import org.apache.felix.scr.annotations.Service;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080029import org.onlab.util.GuavaCollectors;
Thomas Vachuska762a2d82016-01-04 10:25:20 -080030import org.onlab.util.Tools;
Sho SHIMIZUe18cb122016-02-22 21:04:56 -080031import org.onosproject.net.resource.ContinuousResource;
32import org.onosproject.net.resource.ContinuousResourceId;
33import org.onosproject.net.resource.DiscreteResource;
34import org.onosproject.net.resource.DiscreteResourceId;
35import org.onosproject.net.resource.ResourceAllocation;
36import org.onosproject.net.resource.ResourceConsumer;
37import org.onosproject.net.resource.ResourceEvent;
38import org.onosproject.net.resource.ResourceId;
39import org.onosproject.net.resource.Resource;
40import org.onosproject.net.resource.ResourceStore;
41import org.onosproject.net.resource.ResourceStoreDelegate;
42import org.onosproject.net.resource.Resources;
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080043import org.onosproject.store.AbstractStore;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070044import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampani3780d4b2016-04-04 18:18:24 -070045import org.onosproject.store.service.CommitStatus;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070046import org.onosproject.store.service.ConsistentMap;
Thomas Vachuska762a2d82016-01-04 10:25:20 -080047import org.onosproject.store.service.ConsistentMapException;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070048import org.onosproject.store.service.Serializer;
49import org.onosproject.store.service.StorageService;
50import org.onosproject.store.service.TransactionContext;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070051import org.onosproject.store.service.TransactionalMap;
52import org.onosproject.store.service.Versioned;
53import org.slf4j.Logger;
54import org.slf4j.LoggerFactory;
55
Sho SHIMIZUba41fc12015-08-12 15:43:22 -070056import java.util.Arrays;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070057import java.util.Collection;
Sho SHIMIZU69420fe2016-02-09 15:01:07 -080058import java.util.LinkedHashMap;
Sho SHIMIZUba41fc12015-08-12 15:43:22 -070059import java.util.LinkedHashSet;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070060import java.util.List;
61import java.util.Map;
62import java.util.Optional;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080063import java.util.Set;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070064import java.util.stream.Collectors;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080065import java.util.stream.Stream;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070066
67import static com.google.common.base.Preconditions.checkArgument;
68import static com.google.common.base.Preconditions.checkNotNull;
Sho SHIMIZUe18cb122016-02-22 21:04:56 -080069import static org.onosproject.net.resource.ResourceEvent.Type.*;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070070
71/**
72 * Implementation of ResourceStore using TransactionalMap.
73 */
Sho SHIMIZU9a2b8292015-10-28 13:00:16 -070074@Component(immediate = true)
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070075@Service
76@Beta
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080077public class ConsistentResourceStore extends AbstractStore<ResourceEvent, ResourceStoreDelegate>
78 implements ResourceStore {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070079 private static final Logger log = LoggerFactory.getLogger(ConsistentResourceStore.class);
80
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080081 private static final String DISCRETE_CONSUMER_MAP = "onos-discrete-consumers";
Sho SHIMIZU03be2662016-05-04 09:38:45 -070082 private static final String DISCRETE_CHILD_MAP = "onos-resource-discrete-children";
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080083 private static final String CONTINUOUS_CONSUMER_MAP = "onos-continuous-consumers";
Sho SHIMIZU03be2662016-05-04 09:38:45 -070084 private static final String CONTINUOUS_CHILD_MAP = "onos-resource-continuous-children";
Sho SHIMIZUba41fc12015-08-12 15:43:22 -070085 private static final Serializer SERIALIZER = Serializer.using(
HIGUCHI Yuta6f584222016-05-06 11:15:38 -070086 Arrays.asList(KryoNamespaces.API),
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080087 ContinuousResourceAllocation.class);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070088
Thomas Vachuska762a2d82016-01-04 10:25:20 -080089 // TODO: We should provide centralized values for this
90 private static final int MAX_RETRIES = 5;
91 private static final int RETRY_DELAY = 1_000; // millis
92
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070093 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
94 protected StorageService service;
95
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -080096 private ConsistentMap<DiscreteResourceId, ResourceConsumer> discreteConsumers;
Sho SHIMIZU03be2662016-05-04 09:38:45 -070097 private ConsistentMap<DiscreteResourceId, Set<DiscreteResource>> discreteChildMap;
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -080098 private ConsistentMap<ContinuousResourceId, ContinuousResourceAllocation> continuousConsumers;
Sho SHIMIZU03be2662016-05-04 09:38:45 -070099 private ConsistentMap<DiscreteResourceId, Set<ContinuousResource>> continuousChildMap;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700100
101 @Activate
102 public void activate() {
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800103 discreteConsumers = service.<DiscreteResourceId, ResourceConsumer>consistentMapBuilder()
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800104 .withName(DISCRETE_CONSUMER_MAP)
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700105 .withSerializer(SERIALIZER)
106 .build();
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700107 discreteChildMap = service.<DiscreteResourceId, Set<DiscreteResource>>consistentMapBuilder()
108 .withName(DISCRETE_CHILD_MAP)
109 .withSerializer(SERIALIZER)
110 .build();
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800111 continuousConsumers = service.<ContinuousResourceId, ContinuousResourceAllocation>consistentMapBuilder()
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800112 .withName(CONTINUOUS_CONSUMER_MAP)
113 .withSerializer(SERIALIZER)
114 .build();
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700115 continuousChildMap = service.<DiscreteResourceId, Set<ContinuousResource>>consistentMapBuilder()
116 .withName(CONTINUOUS_CHILD_MAP)
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700117 .withSerializer(SERIALIZER)
118 .build();
Sho SHIMIZUe7db6142015-11-04 11:24:22 -0800119
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700120 Tools.retryable(() -> discreteChildMap.put(Resource.ROOT.id(), new LinkedHashSet<>()),
121 ConsistentMapException.class, MAX_RETRIES, RETRY_DELAY);
122 Tools.retryable(() -> continuousChildMap.put(Resource.ROOT.id(), new LinkedHashSet<>()),
123 ConsistentMapException.class, MAX_RETRIES, RETRY_DELAY);
Madan Jampanic7f49f92015-12-10 11:35:06 -0800124 log.info("Started");
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700125 }
126
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800127 // Computational complexity: O(1) if the resource is discrete type.
128 // O(n) if the resource is continuous type where n is the number of the existing allocations for the resource
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700129 @Override
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800130 public List<ResourceAllocation> getResourceAllocations(ResourceId id) {
131 checkNotNull(id);
132 checkArgument(id instanceof DiscreteResourceId || id instanceof ContinuousResourceId);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700133
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800134 if (id instanceof DiscreteResourceId) {
135 return getResourceAllocations((DiscreteResourceId) id);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800136 } else {
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800137 return getResourceAllocations((ContinuousResourceId) id);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800138 }
139 }
140
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800141 // computational complexity: O(1)
142 private List<ResourceAllocation> getResourceAllocations(DiscreteResourceId resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800143 Versioned<ResourceConsumer> consumer = discreteConsumers.get(resource);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700144 if (consumer == null) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800145 return ImmutableList.of();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700146 }
147
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800148 return ImmutableList.of(new ResourceAllocation(Resources.discrete(resource).resource(), consumer.value()));
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800149 }
150
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800151 // computational complexity: O(n) where n is the number of the existing allocations for the resource
152 private List<ResourceAllocation> getResourceAllocations(ContinuousResourceId resource) {
153 Versioned<ContinuousResourceAllocation> allocations = continuousConsumers.get(resource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800154 if (allocations == null) {
155 return ImmutableList.of();
156 }
157
158 return allocations.value().allocations().stream()
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800159 .filter(x -> x.resource().id().equals(resource))
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800160 .collect(GuavaCollectors.toImmutableList());
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700161 }
162
163 @Override
Jonathan Hart56151262016-02-11 09:48:50 -0800164 public boolean register(List<Resource> resources) {
Sho SHIMIZU83e17a02015-08-20 14:07:05 -0700165 checkNotNull(resources);
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800166 if (log.isTraceEnabled()) {
167 resources.forEach(r -> log.trace("registering {}", r));
168 }
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700169
170 TransactionContext tx = service.transactionContextBuilder().build();
171 tx.begin();
172
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700173 TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> discreteChildTxMap =
174 tx.getTransactionalMap(DISCRETE_CHILD_MAP, SERIALIZER);
175 TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> continuousChildTxMap =
176 tx.getTransactionalMap(CONTINUOUS_CHILD_MAP, SERIALIZER);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700177
Sho SHIMIZU69420fe2016-02-09 15:01:07 -0800178 // the order is preserved by LinkedHashMap
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800179 Map<DiscreteResource, List<Resource>> resourceMap = resources.stream()
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800180 .filter(x -> x.parent().isPresent())
Sho SHIMIZU69420fe2016-02-09 15:01:07 -0800181 .collect(Collectors.groupingBy(x -> x.parent().get(), LinkedHashMap::new, Collectors.toList()));
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700182
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800183 for (Map.Entry<DiscreteResource, List<Resource>> entry: resourceMap.entrySet()) {
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700184 if (!lookup(discreteChildTxMap, continuousChildTxMap, entry.getKey().id()).isPresent()) {
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800185 return abortTransaction(tx);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700186 }
187
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700188 if (!appendValues(discreteChildTxMap, continuousChildTxMap, entry.getKey().id(), entry.getValue())) {
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800189 return abortTransaction(tx);
190 }
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700191 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800192
Madan Jampani3780d4b2016-04-04 18:18:24 -0700193 return tx.commit().whenComplete((status, error) -> {
194 if (status == CommitStatus.SUCCESS) {
195 log.trace("Transaction commit succeeded on registration: resources={}", resources);
196 List<ResourceEvent> events = resources.stream()
197 .filter(x -> x.parent().isPresent())
198 .map(x -> new ResourceEvent(RESOURCE_ADDED, x))
199 .collect(Collectors.toList());
200 notifyDelegate(events);
201 } else {
202 log.warn("Transaction commit failed on registration", error);
203 }
204 }).join() == CommitStatus.SUCCESS;
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700205 }
206
207 @Override
Jonathan Hart56151262016-02-11 09:48:50 -0800208 public boolean unregister(List<ResourceId> ids) {
Sho SHIMIZU72f81b12016-02-09 09:26:17 -0800209 checkNotNull(ids);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700210
211 TransactionContext tx = service.transactionContextBuilder().build();
212 tx.begin();
213
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800214 TransactionalMap<DiscreteResourceId, ResourceConsumer> discreteConsumerTxMap =
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800215 tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700216 TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> discreteChildTxMap =
217 tx.getTransactionalMap(DISCRETE_CHILD_MAP, SERIALIZER);
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800218 TransactionalMap<ContinuousResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800219 tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700220 TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> continuousChildTxMap =
221 tx.getTransactionalMap(CONTINUOUS_CHILD_MAP, SERIALIZER);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700222
Sho SHIMIZU7d54d9c2016-02-17 13:58:46 -0800223 // Look up resources by resource IDs
Sho SHIMIZU72f81b12016-02-09 09:26:17 -0800224 List<Resource> resources = ids.stream()
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800225 .filter(x -> x.parent().isPresent())
Sho SHIMIZU7d54d9c2016-02-17 13:58:46 -0800226 .map(x -> {
227 // avoid access to consistent map in the case of discrete resource
228 if (x instanceof DiscreteResourceId) {
229 return Optional.of(Resources.discrete((DiscreteResourceId) x).resource());
230 } else {
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700231 return lookup(continuousChildTxMap, (ContinuousResourceId) x);
Sho SHIMIZU7d54d9c2016-02-17 13:58:46 -0800232 }
233 })
HIGUCHI Yuta315179a2016-02-18 14:01:22 -0800234 .filter(Optional::isPresent)
235 .map(Optional::get)
Sho SHIMIZU72f81b12016-02-09 09:26:17 -0800236 .collect(Collectors.toList());
Sho SHIMIZU69420fe2016-02-09 15:01:07 -0800237 // the order is preserved by LinkedHashMap
Sho SHIMIZU72f81b12016-02-09 09:26:17 -0800238 Map<DiscreteResourceId, List<Resource>> resourceMap = resources.stream()
Sho SHIMIZU69420fe2016-02-09 15:01:07 -0800239 .collect(Collectors.groupingBy(x -> x.parent().get().id(), LinkedHashMap::new, Collectors.toList()));
Sho SHIMIZU83e17a02015-08-20 14:07:05 -0700240
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800241 // even if one of the resources is allocated to a consumer,
242 // all unregistrations are regarded as failure
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800243 for (Map.Entry<DiscreteResourceId, List<Resource>> entry: resourceMap.entrySet()) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800244 boolean allocated = entry.getValue().stream().anyMatch(x -> {
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800245 if (x instanceof DiscreteResource) {
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800246 return discreteConsumerTxMap.get(((DiscreteResource) x).id()) != null;
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800247 } else if (x instanceof ContinuousResource) {
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800248 ContinuousResourceAllocation allocations =
249 continuousConsumerTxMap.get(((ContinuousResource) x).id());
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800250 return allocations != null && !allocations.allocations().isEmpty();
251 } else {
252 return false;
253 }
254 });
255 if (allocated) {
HIGUCHI Yuta5b6dfba2016-01-27 14:43:41 -0800256 log.warn("Failed to unregister {}: allocation exists", entry.getKey());
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800257 return abortTransaction(tx);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700258 }
259
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700260 if (!removeValues(discreteChildTxMap, continuousChildTxMap, entry.getKey(), entry.getValue())) {
HIGUCHI Yuta6acdfd02016-02-18 10:39:43 -0800261 log.warn("Failed to unregister {}: Failed to remove {} values.",
262 entry.getKey(), entry.getValue().size());
263 log.debug("Failed to unregister {}: Failed to remove values: {}",
HIGUCHI Yuta5b6dfba2016-01-27 14:43:41 -0800264 entry.getKey(), entry.getValue());
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800265 return abortTransaction(tx);
266 }
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700267 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800268
Madan Jampani3780d4b2016-04-04 18:18:24 -0700269 return tx.commit().whenComplete((status, error) -> {
270 if (status == CommitStatus.SUCCESS) {
271 List<ResourceEvent> events = resources.stream()
272 .filter(x -> x.parent().isPresent())
273 .map(x -> new ResourceEvent(RESOURCE_REMOVED, x))
274 .collect(Collectors.toList());
275 notifyDelegate(events);
276 } else {
277 log.warn("Failed to unregister {}: Commit failed.", ids, error);
278 }
279 }).join() == CommitStatus.SUCCESS;
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700280 }
281
282 @Override
Jonathan Hart56151262016-02-11 09:48:50 -0800283 public boolean allocate(List<Resource> resources, ResourceConsumer consumer) {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700284 checkNotNull(resources);
285 checkNotNull(consumer);
286
287 TransactionContext tx = service.transactionContextBuilder().build();
288 tx.begin();
289
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800290 TransactionalMap<DiscreteResourceId, ResourceConsumer> discreteConsumerTxMap =
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800291 tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700292 TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> discreteChildTxMap =
293 tx.getTransactionalMap(DISCRETE_CHILD_MAP, SERIALIZER);
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800294 TransactionalMap<ContinuousResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800295 tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700296 TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> continuousChildTxMap =
297 tx.getTransactionalMap(CONTINUOUS_CHILD_MAP, SERIALIZER);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700298
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800299 for (Resource resource: resources) {
Sho SHIMIZU171a9382016-02-15 13:56:34 -0800300 // if the resource is not registered, then abort
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700301 Optional<Resource> lookedUp = lookup(discreteChildTxMap, continuousChildTxMap, resource.id());
Sho SHIMIZU171a9382016-02-15 13:56:34 -0800302 if (!lookedUp.isPresent()) {
303 return abortTransaction(tx);
304 }
Sho SHIMIZUd29847f2015-08-13 09:10:59 -0700305
Sho SHIMIZU171a9382016-02-15 13:56:34 -0800306 if (resource instanceof DiscreteResource) {
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800307 ResourceConsumer oldValue = discreteConsumerTxMap.put(((DiscreteResource) resource).id(), consumer);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800308 if (oldValue != null) {
309 return abortTransaction(tx);
310 }
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800311 } else if (resource instanceof ContinuousResource) {
Sho SHIMIZU72f81b12016-02-09 09:26:17 -0800312 // Down cast: this must be safe as ContinuousResource is associated with ContinuousResourceId
313 ContinuousResource continuous = (ContinuousResource) lookedUp.get();
314 ContinuousResourceAllocation allocations = continuousConsumerTxMap.get(continuous.id());
315 if (!hasEnoughResource(continuous, (ContinuousResource) resource, allocations)) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800316 return abortTransaction(tx);
317 }
318
319 boolean success = appendValue(continuousConsumerTxMap,
Naoki Shiota3bac1c32016-03-31 17:46:45 -0700320 continuous, new ResourceAllocation(resource, consumer));
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800321 if (!success) {
322 return abortTransaction(tx);
323 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800324 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700325 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800326
Madan Jampani3780d4b2016-04-04 18:18:24 -0700327 return tx.commit().join() == CommitStatus.SUCCESS;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700328 }
329
330 @Override
Sho SHIMIZUfc64ffe2016-02-10 20:11:09 -0800331 public boolean release(List<ResourceAllocation> allocations) {
332 checkNotNull(allocations);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700333
334 TransactionContext tx = service.transactionContextBuilder().build();
335 tx.begin();
336
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800337 TransactionalMap<DiscreteResourceId, ResourceConsumer> discreteConsumerTxMap =
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800338 tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800339 TransactionalMap<ContinuousResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800340 tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700341
Sho SHIMIZUfc64ffe2016-02-10 20:11:09 -0800342 for (ResourceAllocation allocation : allocations) {
343 Resource resource = allocation.resource();
344 ResourceConsumer consumer = allocation.consumer();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700345
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800346 if (resource instanceof DiscreteResource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800347 // if this single release fails (because the resource is allocated to another consumer,
348 // the whole release fails
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800349 if (!discreteConsumerTxMap.remove(((DiscreteResource) resource).id(), consumer)) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800350 return abortTransaction(tx);
351 }
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800352 } else if (resource instanceof ContinuousResource) {
353 ContinuousResource continuous = (ContinuousResource) resource;
Sho SHIMIZUfc64ffe2016-02-10 20:11:09 -0800354 ContinuousResourceAllocation continuousAllocation = continuousConsumerTxMap.get(continuous.id());
355 ImmutableList<ResourceAllocation> newAllocations = continuousAllocation.allocations().stream()
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800356 .filter(x -> !(x.consumer().equals(consumer) &&
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800357 ((ContinuousResource) x.resource()).value() == continuous.value()))
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800358 .collect(GuavaCollectors.toImmutableList());
359
Sho SHIMIZUfc64ffe2016-02-10 20:11:09 -0800360 if (!continuousConsumerTxMap.replace(continuous.id(), continuousAllocation,
361 new ContinuousResourceAllocation(continuousAllocation.original(), newAllocations))) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800362 return abortTransaction(tx);
363 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700364 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700365 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800366
Madan Jampani3780d4b2016-04-04 18:18:24 -0700367 return tx.commit().join() == CommitStatus.SUCCESS;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700368 }
369
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800370 // computational complexity: O(1) if the resource is discrete type.
371 // O(n) if the resource is continuous type where n is the number of the children of
372 // the specified resource's parent
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700373 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800374 public boolean isAvailable(Resource resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800375 checkNotNull(resource);
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800376 checkArgument(resource instanceof DiscreteResource || resource instanceof ContinuousResource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800377
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800378 if (resource instanceof DiscreteResource) {
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800379 // check if already consumed
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800380 return getResourceAllocations(resource.id()).isEmpty();
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800381 } else {
Sho SHIMIZUf17ae282016-02-10 23:44:30 -0800382 return isAvailable((ContinuousResource) resource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800383 }
384 }
385
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800386 // computational complexity: O(n) where n is the number of existing allocations for the resource
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800387 private boolean isAvailable(ContinuousResource resource) {
Sho SHIMIZUf17ae282016-02-10 23:44:30 -0800388 // check if it's registered or not.
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700389 Versioned<Set<ContinuousResource>> children = continuousChildMap.get(resource.parent().get().id());
Sho SHIMIZUf17ae282016-02-10 23:44:30 -0800390 if (children == null) {
391 return false;
392 }
393
394 ContinuousResource registered = children.value().stream()
395 .filter(c -> c.id().equals(resource.id()))
396 .findFirst()
Sho SHIMIZUf17ae282016-02-10 23:44:30 -0800397 .get();
398 if (registered.value() < resource.value()) {
399 // Capacity < requested, can never satisfy
400 return false;
401 }
402
403 // check if there's enough left
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800404 Versioned<ContinuousResourceAllocation> allocation = continuousConsumers.get(resource.id());
405 if (allocation == null) {
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800406 // no allocation (=no consumer) full registered resources available
407 return true;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800408 }
409
410 return hasEnoughResource(allocation.value().original(), resource, allocation.value());
411 }
412
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800413 // computational complexity: O(n + m) where n is the number of entries in discreteConsumers
414 // and m is the number of allocations for all continuous resources
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800415 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800416 public Collection<Resource> getResources(ResourceConsumer consumer) {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700417 checkNotNull(consumer);
418
419 // NOTE: getting all entries may become performance bottleneck
420 // TODO: revisit for better backend data structure
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800421 Stream<DiscreteResource> discreteStream = discreteConsumers.entrySet().stream()
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700422 .filter(x -> x.getValue().value().equals(consumer))
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800423 .map(Map.Entry::getKey)
424 .map(x -> Resources.discrete(x).resource());
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800425
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800426 Stream<ContinuousResource> continuousStream = continuousConsumers.values().stream()
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800427 .flatMap(x -> x.value().allocations().stream()
428 .map(y -> Maps.immutableEntry(x.value().original(), y)))
429 .filter(x -> x.getValue().consumer().equals(consumer))
430 .map(x -> x.getKey());
431
432 return Stream.concat(discreteStream, continuousStream).collect(Collectors.toList());
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700433 }
434
Sho SHIMIZU82bfe992016-02-10 09:55:32 -0800435 // computational complexity: O(1)
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700436 @Override
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800437 public Set<Resource> getChildResources(DiscreteResourceId parent) {
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700438 checkNotNull(parent);
439
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700440 Versioned<Set<DiscreteResource>> discreteChildren = discreteChildMap.get(parent);
441 Versioned<Set<ContinuousResource>> continuousChildren = continuousChildMap.get(parent);
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700442
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700443 if (discreteChildren == null && continuousChildren == null) {
444 return ImmutableSet.of();
445 } else if (discreteChildren == null) {
446 return ImmutableSet.copyOf(continuousChildren.value());
447 } else if (continuousChildren == null) {
448 return ImmutableSet.copyOf(discreteChildren.value());
449 } else {
450 return ImmutableSet.<Resource>builder()
451 .addAll(discreteChildren.value())
452 .addAll(continuousChildren.value())
453 .build();
454 }
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700455 }
456
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800457 // computational complexity: O(n) where n is the number of the children of the parent
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700458 @Override
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800459 public <T> Collection<Resource> getAllocatedResources(DiscreteResourceId parent, Class<T> cls) {
Sho SHIMIZU1f5e5912015-08-10 17:00:00 -0700460 checkNotNull(parent);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700461 checkNotNull(cls);
462
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700463 Set<Resource> children = getChildResources(parent);
464 if (children.isEmpty()) {
465 return children;
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700466 }
467
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700468 Stream<DiscreteResource> discrete = children.stream()
Sho SHIMIZU003ed322016-02-11 12:58:42 -0800469 .filter(x -> x.isTypeOf(cls))
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800470 .filter(x -> x instanceof DiscreteResource)
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800471 .map(x -> ((DiscreteResource) x))
472 .filter(x -> discreteConsumers.containsKey(x.id()));
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800473
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700474 Stream<ContinuousResource> continuous = children.stream()
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800475 .filter(x -> x.id().equals(parent.child(cls)))
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800476 .filter(x -> x instanceof ContinuousResource)
477 .map(x -> (ContinuousResource) x)
Sho SHIMIZU90039242016-02-11 09:45:32 -0800478 // we don't use cascading simple predicates like follows to reduce accesses to consistent map
479 // .filter(x -> continuousConsumers.containsKey(x.id()))
480 // .filter(x -> continuousConsumers.get(x.id()) != null)
481 // .filter(x -> !continuousConsumers.get(x.id()).value().allocations().isEmpty());
482 .filter(resource -> {
483 Versioned<ContinuousResourceAllocation> allocation = continuousConsumers.get(resource.id());
484 if (allocation == null) {
485 return false;
486 }
487 return !allocation.value().allocations().isEmpty();
488 });
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800489
490 return Stream.concat(discrete, continuous).collect(Collectors.toList());
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700491 }
Sho SHIMIZUd29847f2015-08-13 09:10:59 -0700492
493 /**
494 * Abort the transaction.
495 *
496 * @param tx transaction context
497 * @return always false
498 */
499 private boolean abortTransaction(TransactionContext tx) {
500 tx.abort();
501 return false;
502 }
503
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800504 // Appends the specified ResourceAllocation to the existing values stored in the map
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800505 // computational complexity: O(n) where n is the number of the elements in the associated allocation
506 private boolean appendValue(TransactionalMap<ContinuousResourceId, ContinuousResourceAllocation> map,
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800507 ContinuousResource original, ResourceAllocation value) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800508 ContinuousResourceAllocation oldValue = map.putIfAbsent(original.id(),
509 new ContinuousResourceAllocation(original, ImmutableList.of(value)));
510 if (oldValue == null) {
511 return true;
512 }
513
514 if (oldValue.allocations().contains(value)) {
515 // don't write to map because all values are already stored
516 return true;
517 }
518
519 ContinuousResourceAllocation newValue = new ContinuousResourceAllocation(original,
520 ImmutableList.<ResourceAllocation>builder()
521 .addAll(oldValue.allocations())
522 .add(value)
523 .build());
524 return map.replace(original.id(), oldValue, newValue);
525 }
Sho SHIMIZUd29847f2015-08-13 09:10:59 -0700526 /**
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700527 * Appends the values to the existing values associated with the specified key.
Sho SHIMIZU4568c412015-08-21 16:39:07 -0700528 * If the map already has all the given values, appending will not happen.
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700529 *
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700530 * @param discreteTxMap map holding multiple discrete resources for a key
531 * @param continuousTxMap map holding multiple continuous resources for a key
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700532 * @param key key specifying values
533 * @param values values to be appended
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700534 * @return true if the operation succeeds, false otherwise.
535 */
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800536 // computational complexity: O(n) where n is the number of the specified value
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700537 private boolean appendValues(TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> discreteTxMap,
538 TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> continuousTxMap,
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800539 DiscreteResourceId key, List<Resource> values) {
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700540 // it's assumed that the passed "values" is non-empty
541
542 // This is 2-pass scan. Nicer to have 1-pass scan
543 List<DiscreteResource> discreteValues = values.stream()
544 .filter(x -> x instanceof DiscreteResource)
545 .map(x -> (DiscreteResource) x)
546 .collect(Collectors.toList());
547 List<ContinuousResource> continuousValues = values.stream()
548 .filter(x -> x instanceof ContinuousResource)
549 .map(x -> (ContinuousResource) x)
550 .collect(Collectors.toList());
551
552 // short-circuit decision avoiding unnecessary distributed map operations
553 if (continuousValues.isEmpty()) {
554 return appendValues(discreteTxMap, key, discreteValues, null);
555 }
556 if (discreteValues.isEmpty()) {
557 return appendValues(continuousTxMap, key, continuousValues, null);
558 }
559
560 return appendValues(discreteTxMap, key, discreteValues, null)
561 && appendValues(continuousTxMap, key, continuousValues, null);
562 }
563
564 private boolean appendValues(TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> map,
565 DiscreteResourceId key, List<DiscreteResource> values, DiscreteResource dummy) {
566 Set<DiscreteResource> requested = new LinkedHashSet<>(values);
567 Set<DiscreteResource> oldValues = map.putIfAbsent(key, requested);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700568 if (oldValues == null) {
Sho SHIMIZU93a74b32015-11-09 11:48:23 -0800569 return true;
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700570 }
571
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700572 Set<DiscreteResource> addedValues = Sets.difference(requested, oldValues);
Sho SHIMIZU67c90102016-02-23 10:49:58 -0800573 // no new value, then no-op
Sho SHIMIZU1992daf2016-02-17 17:38:55 -0800574 if (addedValues.isEmpty()) {
Sho SHIMIZU4568c412015-08-21 16:39:07 -0700575 // don't write to map because all values are already stored
576 return true;
577 }
578
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700579 Set<DiscreteResource> newValues = new LinkedHashSet<>(oldValues);
580 newValues.addAll(addedValues);
581 return map.replace(key, oldValues, newValues);
582 }
583
584 private boolean appendValues(TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> map,
585 DiscreteResourceId key, List<ContinuousResource> values, ContinuousResource dummy) {
586 Set<ContinuousResource> requested = new LinkedHashSet<>(values);
587 Set<ContinuousResource> oldValues = map.putIfAbsent(key, requested);
588 if (oldValues == null) {
589 return true;
590 }
591
592 Set<ContinuousResource> addedValues = Sets.difference(requested, oldValues);
593 // no new value, then no-op
594 if (addedValues.isEmpty()) {
595 // don't write to map because all values are already stored
596 return true;
597 }
598
599 Set<ContinuousResourceId> addedIds = addedValues.stream()
600 .map(ContinuousResource::id)
Sho SHIMIZU67c90102016-02-23 10:49:58 -0800601 .collect(Collectors.toSet());
602 // if the value is not found but the same ID is found
603 // (this happens only when being continuous resource)
604 if (oldValues.stream().anyMatch(x -> addedIds.contains(x.id()))) {
605 // no-op, but indicating failure (reject the request)
606 return false;
607 }
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700608 Set<ContinuousResource> newValues = new LinkedHashSet<>(oldValues);
Sho SHIMIZU1992daf2016-02-17 17:38:55 -0800609 newValues.addAll(addedValues);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800610 return map.replace(key, oldValues, newValues);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700611 }
612
613 /**
Sho SHIMIZUba1f83b2015-10-14 08:11:20 -0700614 * Removes the values from the existing values associated with the specified key.
Sho SHIMIZU5618ee52015-08-21 17:19:44 -0700615 * If the map doesn't contain the given values, removal will not happen.
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700616 *
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700617 * @param discreteTxMap map holding multiple discrete resources for a key
618 * @param continuousTxMap map holding multiple continuous resources for a key
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700619 * @param key key specifying values
620 * @param values values to be removed
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700621 * @return true if the operation succeeds, false otherwise
622 */
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700623 private boolean removeValues(TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> discreteTxMap,
624 TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> continuousTxMap,
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800625 DiscreteResourceId key, List<Resource> values) {
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700626 // it's assumed that the passed "values" is non-empty
627
628 // This is 2-pass scan. Nicer to have 1-pass scan
629 List<DiscreteResource> discreteValues = values.stream()
630 .filter(x -> x instanceof DiscreteResource)
631 .map(x -> (DiscreteResource) x)
632 .collect(Collectors.toList());
633 List<ContinuousResource> continuousValues = values.stream()
634 .filter(x -> x instanceof ContinuousResource)
635 .map(x -> (ContinuousResource) x)
636 .collect(Collectors.toList());
637
638 // short-circuit decision avoiding unnecessary distributed map operations
639 if (continuousValues.isEmpty()) {
640 return removeValues(discreteTxMap, key, discreteValues);
641 }
642 if (discreteValues.isEmpty()) {
643 return removeValues(continuousTxMap, key, continuousValues);
644 }
645
646 return removeValues(discreteTxMap, key, discreteValues) && removeValues(continuousTxMap, key, continuousValues);
647 }
648
649 private <T extends Resource> boolean removeValues(TransactionalMap<DiscreteResourceId, Set<T>> map,
650 DiscreteResourceId key, List<T> values) {
651 Set<T> oldValues = map.putIfAbsent(key, new LinkedHashSet<>());
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700652 if (oldValues == null) {
HIGUCHI Yutadc4394c2016-01-29 15:35:10 -0800653 log.trace("No-Op removing values. key {} did not exist", key);
Sho SHIMIZU93a74b32015-11-09 11:48:23 -0800654 return true;
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700655 }
656
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800657 if (values.stream().allMatch(x -> !oldValues.contains(x))) {
Sho SHIMIZU5618ee52015-08-21 17:19:44 -0700658 // don't write map because none of the values are stored
HIGUCHI Yutadc4394c2016-01-29 15:35:10 -0800659 log.trace("No-Op removing values. key {} did not contain {}", key, values);
Sho SHIMIZU5618ee52015-08-21 17:19:44 -0700660 return true;
661 }
662
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700663 LinkedHashSet<T> newValues = new LinkedHashSet<>(oldValues);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800664 newValues.removeAll(values);
665 return map.replace(key, oldValues, newValues);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700666
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700667 }
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700668 /**
Sho SHIMIZU72f81b12016-02-09 09:26:17 -0800669 * Returns the resource which has the same key as the specified resource ID
670 * in the set as a value of the map.
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700671 *
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700672 * @param discreteTxMap map storing parent - child relationship of discrete resources
673 * @param continuousTxMap map storing parent -child relationship of continuous resources
Sho SHIMIZU72f81b12016-02-09 09:26:17 -0800674 * @param id ID of resource to be checked
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800675 * @return the resource which is regarded as the same as the specified resource
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700676 */
Sho SHIMIZUa6a6fd32016-02-10 18:36:44 -0800677 // Naive implementation, which traverses all elements in the set when continuous resource
678 // computational complexity: O(1) when discrete resource. O(n) when continuous resource
679 // where n is the number of elements in the associated set
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700680 private Optional<Resource> lookup(TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> discreteTxMap,
681 TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> continuousTxMap,
682 ResourceId id) {
683 if (id instanceof DiscreteResourceId) {
684 return lookup(discreteTxMap, (DiscreteResourceId) id);
685 } else if (id instanceof ContinuousResourceId) {
686 return lookup(continuousTxMap, (ContinuousResourceId) id);
687 } else {
688 return Optional.empty();
689 }
690 }
691
692 // check the existence in the set: O(1) operation
693 private Optional<Resource> lookup(TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> discreteTxMap,
694 DiscreteResourceId id) {
Sho SHIMIZU72f81b12016-02-09 09:26:17 -0800695 if (!id.parent().isPresent()) {
696 return Optional.of(Resource.ROOT);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700697 }
698
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700699 Set<DiscreteResource> values = discreteTxMap.get(id.parent().get());
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800700 if (values == null) {
701 return Optional.empty();
702 }
703
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700704 DiscreteResource resource = Resources.discrete(id).resource();
705 if (values.contains(resource)) {
706 return Optional.of(resource);
707 } else {
708 return Optional.empty();
709 }
710 }
711
712 // iterate over the values in the set: O(n) operation
713 private Optional<Resource> lookup(TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> continuousTxMap,
714 ContinuousResourceId id) {
715 if (!id.parent().isPresent()) {
716 return Optional.of(Resource.ROOT);
Sho SHIMIZUa6a6fd32016-02-10 18:36:44 -0800717 }
718
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700719 Set<ContinuousResource> values = continuousTxMap.get(id.parent().get());
720 if (values == null) {
721 return Optional.empty();
722 }
723
Sho SHIMIZU72f81b12016-02-09 09:26:17 -0800724 return values.stream()
725 .filter(x -> x.id().equals(id))
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700726 .map(x -> (Resource) x)
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800727 .findFirst();
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800728 }
729
730 /**
731 * Checks if there is enough resource volume to allocated the requested resource
732 * against the specified resource.
733 *
734 * @param original original resource
735 * @param request requested resource
736 * @param allocation current allocation of the resource
737 * @return true if there is enough resource volume. Otherwise, false.
738 */
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800739 // computational complexity: O(n) where n is the number of allocations
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800740 private boolean hasEnoughResource(ContinuousResource original,
741 ContinuousResource request,
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800742 ContinuousResourceAllocation allocation) {
743 if (allocation == null) {
744 return request.value() <= original.value();
745 }
746
747 double allocated = allocation.allocations().stream()
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800748 .filter(x -> x.resource() instanceof ContinuousResource)
749 .map(x -> (ContinuousResource) x.resource())
750 .mapToDouble(ContinuousResource::value)
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800751 .sum();
752 double left = original.value() - allocated;
753 return request.value() <= left;
754 }
755
756 // internal use only
757 private static final class ContinuousResourceAllocation {
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800758 private final ContinuousResource original;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800759 private final ImmutableList<ResourceAllocation> allocations;
760
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800761 private ContinuousResourceAllocation(ContinuousResource original,
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800762 ImmutableList<ResourceAllocation> allocations) {
763 this.original = original;
764 this.allocations = allocations;
765 }
766
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800767 private ContinuousResource original() {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800768 return original;
769 }
770
771 private ImmutableList<ResourceAllocation> allocations() {
772 return allocations;
773 }
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700774 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700775}