blob: a51baa71337ed4875de7c379212af6ec668f4643 [file] [log] [blame]
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -07001/*
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -08002 * Copyright 2015-2016 Open Networking Laboratory
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.newresource.impl;
17
18import com.google.common.annotations.Beta;
Sho SHIMIZUe7db6142015-11-04 11:24:22 -080019import com.google.common.collect.ImmutableList;
Sho SHIMIZU83258ae2016-01-29 17:39:07 -080020import com.google.common.collect.ImmutableSet;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080021import com.google.common.collect.Maps;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080027import org.onlab.util.GuavaCollectors;
Thomas Vachuska762a2d82016-01-04 10:25:20 -080028import org.onlab.util.Tools;
Sho SHIMIZUf33b8932016-01-25 18:43:32 -080029import org.onosproject.net.newresource.ContinuousResource;
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -080030import org.onosproject.net.newresource.ContinuousResourceId;
Sho SHIMIZUf33b8932016-01-25 18:43:32 -080031import org.onosproject.net.newresource.DiscreteResource;
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -080032import org.onosproject.net.newresource.DiscreteResourceId;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080033import org.onosproject.net.newresource.ResourceAllocation;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070034import org.onosproject.net.newresource.ResourceConsumer;
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080035import org.onosproject.net.newresource.ResourceEvent;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080036import org.onosproject.net.newresource.ResourceId;
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -080037import org.onosproject.net.newresource.Resource;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070038import org.onosproject.net.newresource.ResourceStore;
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080039import org.onosproject.net.newresource.ResourceStoreDelegate;
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -080040import org.onosproject.net.newresource.Resources;
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080041import org.onosproject.store.AbstractStore;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070042import org.onosproject.store.serializers.KryoNamespaces;
43import org.onosproject.store.service.ConsistentMap;
Thomas Vachuska762a2d82016-01-04 10:25:20 -080044import org.onosproject.store.service.ConsistentMapException;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070045import org.onosproject.store.service.Serializer;
46import org.onosproject.store.service.StorageService;
47import org.onosproject.store.service.TransactionContext;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070048import org.onosproject.store.service.TransactionalMap;
49import org.onosproject.store.service.Versioned;
50import org.slf4j.Logger;
51import org.slf4j.LoggerFactory;
52
Sho SHIMIZUba41fc12015-08-12 15:43:22 -070053import java.util.Arrays;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070054import java.util.Collection;
Sho SHIMIZU69420fe2016-02-09 15:01:07 -080055import java.util.LinkedHashMap;
Sho SHIMIZUba41fc12015-08-12 15:43:22 -070056import java.util.LinkedHashSet;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070057import java.util.List;
58import java.util.Map;
59import java.util.Optional;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080060import java.util.Set;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070061import java.util.stream.Collectors;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080062import java.util.stream.Stream;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070063
64import static com.google.common.base.Preconditions.checkArgument;
65import static com.google.common.base.Preconditions.checkNotNull;
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080066import static org.onosproject.net.newresource.ResourceEvent.Type.*;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070067
68/**
69 * Implementation of ResourceStore using TransactionalMap.
70 */
Sho SHIMIZU9a2b8292015-10-28 13:00:16 -070071@Component(immediate = true)
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070072@Service
73@Beta
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080074public class ConsistentResourceStore extends AbstractStore<ResourceEvent, ResourceStoreDelegate>
75 implements ResourceStore {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070076 private static final Logger log = LoggerFactory.getLogger(ConsistentResourceStore.class);
77
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080078 private static final String DISCRETE_CONSUMER_MAP = "onos-discrete-consumers";
79 private static final String CONTINUOUS_CONSUMER_MAP = "onos-continuous-consumers";
Sho SHIMIZUba41fc12015-08-12 15:43:22 -070080 private static final String CHILD_MAP = "onos-resource-children";
81 private static final Serializer SERIALIZER = Serializer.using(
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080082 Arrays.asList(KryoNamespaces.BASIC, KryoNamespaces.API),
83 ContinuousResourceAllocation.class);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070084
Thomas Vachuska762a2d82016-01-04 10:25:20 -080085 // TODO: We should provide centralized values for this
86 private static final int MAX_RETRIES = 5;
87 private static final int RETRY_DELAY = 1_000; // millis
88
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070089 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
90 protected StorageService service;
91
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -080092 private ConsistentMap<DiscreteResourceId, ResourceConsumer> discreteConsumers;
93 private ConsistentMap<ContinuousResourceId, ContinuousResourceAllocation> continuousConsumers;
94 private ConsistentMap<DiscreteResourceId, Set<Resource>> childMap;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070095
96 @Activate
97 public void activate() {
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -080098 discreteConsumers = service.<DiscreteResourceId, ResourceConsumer>consistentMapBuilder()
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -080099 .withName(DISCRETE_CONSUMER_MAP)
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700100 .withSerializer(SERIALIZER)
101 .build();
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800102 continuousConsumers = service.<ContinuousResourceId, ContinuousResourceAllocation>consistentMapBuilder()
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800103 .withName(CONTINUOUS_CONSUMER_MAP)
104 .withSerializer(SERIALIZER)
105 .build();
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800106 childMap = service.<DiscreteResourceId, Set<Resource>>consistentMapBuilder()
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700107 .withName(CHILD_MAP)
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700108 .withSerializer(SERIALIZER)
109 .build();
Sho SHIMIZUe7db6142015-11-04 11:24:22 -0800110
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800111 Tools.retryable(() -> childMap.put(Resource.ROOT.id(), new LinkedHashSet<>()),
Thomas Vachuska762a2d82016-01-04 10:25:20 -0800112 ConsistentMapException.class, MAX_RETRIES, RETRY_DELAY);
Madan Jampanic7f49f92015-12-10 11:35:06 -0800113 log.info("Started");
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700114 }
115
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800116 // Computational complexity: O(1) if the resource is discrete type.
117 // O(n) if the resource is continuous type where n is the number of the existing allocations for the resource
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700118 @Override
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800119 public List<ResourceAllocation> getResourceAllocations(ResourceId id) {
120 checkNotNull(id);
121 checkArgument(id instanceof DiscreteResourceId || id instanceof ContinuousResourceId);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700122
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800123 if (id instanceof DiscreteResourceId) {
124 return getResourceAllocations((DiscreteResourceId) id);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800125 } else {
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800126 return getResourceAllocations((ContinuousResourceId) id);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800127 }
128 }
129
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800130 // computational complexity: O(1)
131 private List<ResourceAllocation> getResourceAllocations(DiscreteResourceId resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800132 Versioned<ResourceConsumer> consumer = discreteConsumers.get(resource);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700133 if (consumer == null) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800134 return ImmutableList.of();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700135 }
136
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800137 return ImmutableList.of(new ResourceAllocation(Resources.discrete(resource).resource(), consumer.value()));
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800138 }
139
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800140 // computational complexity: O(n) where n is the number of the existing allocations for the resource
141 private List<ResourceAllocation> getResourceAllocations(ContinuousResourceId resource) {
142 Versioned<ContinuousResourceAllocation> allocations = continuousConsumers.get(resource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800143 if (allocations == null) {
144 return ImmutableList.of();
145 }
146
147 return allocations.value().allocations().stream()
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800148 .filter(x -> x.resource().id().equals(resource))
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800149 .collect(GuavaCollectors.toImmutableList());
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700150 }
151
152 @Override
Jonathan Hart56151262016-02-11 09:48:50 -0800153 public boolean register(List<Resource> resources) {
Sho SHIMIZU83e17a02015-08-20 14:07:05 -0700154 checkNotNull(resources);
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800155 if (log.isTraceEnabled()) {
156 resources.forEach(r -> log.trace("registering {}", r));
157 }
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700158
159 TransactionContext tx = service.transactionContextBuilder().build();
160 tx.begin();
161
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800162 TransactionalMap<DiscreteResourceId, Set<Resource>> childTxMap =
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800163 tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700164
Sho SHIMIZU69420fe2016-02-09 15:01:07 -0800165 // the order is preserved by LinkedHashMap
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800166 Map<DiscreteResource, List<Resource>> resourceMap = resources.stream()
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800167 .filter(x -> x.parent().isPresent())
Sho SHIMIZU69420fe2016-02-09 15:01:07 -0800168 .collect(Collectors.groupingBy(x -> x.parent().get(), LinkedHashMap::new, Collectors.toList()));
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700169
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800170 for (Map.Entry<DiscreteResource, List<Resource>> entry: resourceMap.entrySet()) {
Sho SHIMIZU72f81b12016-02-09 09:26:17 -0800171 if (!lookup(childTxMap, entry.getKey().id()).isPresent()) {
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800172 return abortTransaction(tx);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700173 }
174
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800175 if (!appendValues(childTxMap, entry.getKey().id(), entry.getValue())) {
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800176 return abortTransaction(tx);
177 }
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700178 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800179
Sho SHIMIZUfa62b472015-11-02 17:35:46 -0800180 boolean success = tx.commit();
181 if (success) {
182 List<ResourceEvent> events = resources.stream()
183 .filter(x -> x.parent().isPresent())
184 .map(x -> new ResourceEvent(RESOURCE_ADDED, x))
185 .collect(Collectors.toList());
186 notifyDelegate(events);
187 }
188 return success;
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700189 }
190
191 @Override
Jonathan Hart56151262016-02-11 09:48:50 -0800192 public boolean unregister(List<ResourceId> ids) {
Sho SHIMIZU72f81b12016-02-09 09:26:17 -0800193 checkNotNull(ids);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700194
195 TransactionContext tx = service.transactionContextBuilder().build();
196 tx.begin();
197
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800198 TransactionalMap<DiscreteResourceId, Set<Resource>> childTxMap =
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800199 tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800200 TransactionalMap<DiscreteResourceId, ResourceConsumer> discreteConsumerTxMap =
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800201 tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800202 TransactionalMap<ContinuousResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800203 tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700204
Sho SHIMIZU7d54d9c2016-02-17 13:58:46 -0800205 // Look up resources by resource IDs
Sho SHIMIZU72f81b12016-02-09 09:26:17 -0800206 List<Resource> resources = ids.stream()
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800207 .filter(x -> x.parent().isPresent())
Sho SHIMIZU7d54d9c2016-02-17 13:58:46 -0800208 .map(x -> {
209 // avoid access to consistent map in the case of discrete resource
210 if (x instanceof DiscreteResourceId) {
211 return Optional.of(Resources.discrete((DiscreteResourceId) x).resource());
212 } else {
213 return lookup(childTxMap, x);
214 }
215 })
216 .flatMap(Tools::stream)
Sho SHIMIZU72f81b12016-02-09 09:26:17 -0800217 .collect(Collectors.toList());
Sho SHIMIZU69420fe2016-02-09 15:01:07 -0800218 // the order is preserved by LinkedHashMap
Sho SHIMIZU72f81b12016-02-09 09:26:17 -0800219 Map<DiscreteResourceId, List<Resource>> resourceMap = resources.stream()
Sho SHIMIZU69420fe2016-02-09 15:01:07 -0800220 .collect(Collectors.groupingBy(x -> x.parent().get().id(), LinkedHashMap::new, Collectors.toList()));
Sho SHIMIZU83e17a02015-08-20 14:07:05 -0700221
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800222 // even if one of the resources is allocated to a consumer,
223 // all unregistrations are regarded as failure
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800224 for (Map.Entry<DiscreteResourceId, List<Resource>> entry: resourceMap.entrySet()) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800225 boolean allocated = entry.getValue().stream().anyMatch(x -> {
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800226 if (x instanceof DiscreteResource) {
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800227 return discreteConsumerTxMap.get(((DiscreteResource) x).id()) != null;
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800228 } else if (x instanceof ContinuousResource) {
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800229 ContinuousResourceAllocation allocations =
230 continuousConsumerTxMap.get(((ContinuousResource) x).id());
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800231 return allocations != null && !allocations.allocations().isEmpty();
232 } else {
233 return false;
234 }
235 });
236 if (allocated) {
HIGUCHI Yuta5b6dfba2016-01-27 14:43:41 -0800237 log.warn("Failed to unregister {}: allocation exists", entry.getKey());
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800238 return abortTransaction(tx);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700239 }
240
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800241 if (!removeValues(childTxMap, entry.getKey(), entry.getValue())) {
HIGUCHI Yuta6acdfd02016-02-18 10:39:43 -0800242 log.warn("Failed to unregister {}: Failed to remove {} values.",
243 entry.getKey(), entry.getValue().size());
244 log.debug("Failed to unregister {}: Failed to remove values: {}",
HIGUCHI Yuta5b6dfba2016-01-27 14:43:41 -0800245 entry.getKey(), entry.getValue());
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800246 return abortTransaction(tx);
247 }
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700248 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800249
Sho SHIMIZUfa62b472015-11-02 17:35:46 -0800250 boolean success = tx.commit();
251 if (success) {
252 List<ResourceEvent> events = resources.stream()
253 .filter(x -> x.parent().isPresent())
254 .map(x -> new ResourceEvent(RESOURCE_REMOVED, x))
255 .collect(Collectors.toList());
256 notifyDelegate(events);
HIGUCHI Yuta5b6dfba2016-01-27 14:43:41 -0800257 } else {
Sho SHIMIZU72f81b12016-02-09 09:26:17 -0800258 log.warn("Failed to unregister {}: Commit failed.", ids);
Sho SHIMIZUfa62b472015-11-02 17:35:46 -0800259 }
260 return success;
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700261 }
262
263 @Override
Jonathan Hart56151262016-02-11 09:48:50 -0800264 public boolean allocate(List<Resource> resources, ResourceConsumer consumer) {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700265 checkNotNull(resources);
266 checkNotNull(consumer);
267
268 TransactionContext tx = service.transactionContextBuilder().build();
269 tx.begin();
270
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800271 TransactionalMap<DiscreteResourceId, Set<Resource>> childTxMap =
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800272 tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800273 TransactionalMap<DiscreteResourceId, ResourceConsumer> discreteConsumerTxMap =
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800274 tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800275 TransactionalMap<ContinuousResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800276 tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700277
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800278 for (Resource resource: resources) {
Sho SHIMIZU171a9382016-02-15 13:56:34 -0800279 // if the resource is not registered, then abort
280 Optional<Resource> lookedUp = lookup(childTxMap, resource.id());
281 if (!lookedUp.isPresent()) {
282 return abortTransaction(tx);
283 }
Sho SHIMIZUd29847f2015-08-13 09:10:59 -0700284
Sho SHIMIZU171a9382016-02-15 13:56:34 -0800285 if (resource instanceof DiscreteResource) {
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800286 ResourceConsumer oldValue = discreteConsumerTxMap.put(((DiscreteResource) resource).id(), consumer);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800287 if (oldValue != null) {
288 return abortTransaction(tx);
289 }
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800290 } else if (resource instanceof ContinuousResource) {
Sho SHIMIZU72f81b12016-02-09 09:26:17 -0800291 // Down cast: this must be safe as ContinuousResource is associated with ContinuousResourceId
292 ContinuousResource continuous = (ContinuousResource) lookedUp.get();
293 ContinuousResourceAllocation allocations = continuousConsumerTxMap.get(continuous.id());
294 if (!hasEnoughResource(continuous, (ContinuousResource) resource, allocations)) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800295 return abortTransaction(tx);
296 }
297
298 boolean success = appendValue(continuousConsumerTxMap,
Sho SHIMIZU72f81b12016-02-09 09:26:17 -0800299 continuous, new ResourceAllocation(continuous, consumer));
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800300 if (!success) {
301 return abortTransaction(tx);
302 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800303 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700304 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800305
306 return tx.commit();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700307 }
308
309 @Override
Sho SHIMIZUfc64ffe2016-02-10 20:11:09 -0800310 public boolean release(List<ResourceAllocation> allocations) {
311 checkNotNull(allocations);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700312
313 TransactionContext tx = service.transactionContextBuilder().build();
314 tx.begin();
315
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800316 TransactionalMap<DiscreteResourceId, ResourceConsumer> discreteConsumerTxMap =
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800317 tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800318 TransactionalMap<ContinuousResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800319 tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700320
Sho SHIMIZUfc64ffe2016-02-10 20:11:09 -0800321 for (ResourceAllocation allocation : allocations) {
322 Resource resource = allocation.resource();
323 ResourceConsumer consumer = allocation.consumer();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700324
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800325 if (resource instanceof DiscreteResource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800326 // if this single release fails (because the resource is allocated to another consumer,
327 // the whole release fails
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800328 if (!discreteConsumerTxMap.remove(((DiscreteResource) resource).id(), consumer)) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800329 return abortTransaction(tx);
330 }
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800331 } else if (resource instanceof ContinuousResource) {
332 ContinuousResource continuous = (ContinuousResource) resource;
Sho SHIMIZUfc64ffe2016-02-10 20:11:09 -0800333 ContinuousResourceAllocation continuousAllocation = continuousConsumerTxMap.get(continuous.id());
334 ImmutableList<ResourceAllocation> newAllocations = continuousAllocation.allocations().stream()
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800335 .filter(x -> !(x.consumer().equals(consumer) &&
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800336 ((ContinuousResource) x.resource()).value() == continuous.value()))
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800337 .collect(GuavaCollectors.toImmutableList());
338
Sho SHIMIZUfc64ffe2016-02-10 20:11:09 -0800339 if (!continuousConsumerTxMap.replace(continuous.id(), continuousAllocation,
340 new ContinuousResourceAllocation(continuousAllocation.original(), newAllocations))) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800341 return abortTransaction(tx);
342 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700343 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700344 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800345
346 return tx.commit();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700347 }
348
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800349 // computational complexity: O(1) if the resource is discrete type.
350 // O(n) if the resource is continuous type where n is the number of the children of
351 // the specified resource's parent
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700352 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800353 public boolean isAvailable(Resource resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800354 checkNotNull(resource);
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800355 checkArgument(resource instanceof DiscreteResource || resource instanceof ContinuousResource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800356
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800357 if (resource instanceof DiscreteResource) {
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800358 // check if already consumed
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800359 return getResourceAllocations(resource.id()).isEmpty();
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800360 } else {
Sho SHIMIZUf17ae282016-02-10 23:44:30 -0800361 return isAvailable((ContinuousResource) resource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800362 }
363 }
364
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800365 // computational complexity: O(n) where n is the number of existing allocations for the resource
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800366 private boolean isAvailable(ContinuousResource resource) {
Sho SHIMIZUf17ae282016-02-10 23:44:30 -0800367 // check if it's registered or not.
368 Versioned<Set<Resource>> children = childMap.get(resource.parent().get().id());
369 if (children == null) {
370 return false;
371 }
372
373 ContinuousResource registered = children.value().stream()
374 .filter(c -> c.id().equals(resource.id()))
375 .findFirst()
376 .map(c -> (ContinuousResource) c)
377 .get();
378 if (registered.value() < resource.value()) {
379 // Capacity < requested, can never satisfy
380 return false;
381 }
382
383 // check if there's enough left
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800384 Versioned<ContinuousResourceAllocation> allocation = continuousConsumers.get(resource.id());
385 if (allocation == null) {
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800386 // no allocation (=no consumer) full registered resources available
387 return true;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800388 }
389
390 return hasEnoughResource(allocation.value().original(), resource, allocation.value());
391 }
392
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800393 // computational complexity: O(n + m) where n is the number of entries in discreteConsumers
394 // and m is the number of allocations for all continuous resources
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800395 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800396 public Collection<Resource> getResources(ResourceConsumer consumer) {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700397 checkNotNull(consumer);
398
399 // NOTE: getting all entries may become performance bottleneck
400 // TODO: revisit for better backend data structure
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800401 Stream<DiscreteResource> discreteStream = discreteConsumers.entrySet().stream()
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700402 .filter(x -> x.getValue().value().equals(consumer))
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800403 .map(Map.Entry::getKey)
404 .map(x -> Resources.discrete(x).resource());
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800405
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800406 Stream<ContinuousResource> continuousStream = continuousConsumers.values().stream()
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800407 .flatMap(x -> x.value().allocations().stream()
408 .map(y -> Maps.immutableEntry(x.value().original(), y)))
409 .filter(x -> x.getValue().consumer().equals(consumer))
410 .map(x -> x.getKey());
411
412 return Stream.concat(discreteStream, continuousStream).collect(Collectors.toList());
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700413 }
414
Sho SHIMIZU82bfe992016-02-10 09:55:32 -0800415 // computational complexity: O(1)
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700416 @Override
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800417 public Set<Resource> getChildResources(DiscreteResourceId parent) {
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700418 checkNotNull(parent);
419
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800420 Versioned<Set<Resource>> children = childMap.get(parent);
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700421 if (children == null) {
Sho SHIMIZU83258ae2016-01-29 17:39:07 -0800422 return ImmutableSet.of();
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700423 }
424
425 return children.value();
426 }
427
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800428 // computational complexity: O(n) where n is the number of the children of the parent
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700429 @Override
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800430 public <T> Collection<Resource> getAllocatedResources(DiscreteResourceId parent, Class<T> cls) {
Sho SHIMIZU1f5e5912015-08-10 17:00:00 -0700431 checkNotNull(parent);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700432 checkNotNull(cls);
433
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800434 Versioned<Set<Resource>> children = childMap.get(parent);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700435 if (children == null) {
Sho SHIMIZU2c0ae122016-01-20 13:14:38 -0800436 return ImmutableList.of();
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700437 }
438
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800439 Stream<DiscreteResource> discrete = children.value().stream()
Sho SHIMIZU003ed322016-02-11 12:58:42 -0800440 .filter(x -> x.isTypeOf(cls))
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800441 .filter(x -> x instanceof DiscreteResource)
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800442 .map(x -> ((DiscreteResource) x))
443 .filter(x -> discreteConsumers.containsKey(x.id()));
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800444
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800445 Stream<ContinuousResource> continuous = children.value().stream()
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800446 .filter(x -> x.id().equals(parent.child(cls)))
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800447 .filter(x -> x instanceof ContinuousResource)
448 .map(x -> (ContinuousResource) x)
Sho SHIMIZU90039242016-02-11 09:45:32 -0800449 // we don't use cascading simple predicates like follows to reduce accesses to consistent map
450 // .filter(x -> continuousConsumers.containsKey(x.id()))
451 // .filter(x -> continuousConsumers.get(x.id()) != null)
452 // .filter(x -> !continuousConsumers.get(x.id()).value().allocations().isEmpty());
453 .filter(resource -> {
454 Versioned<ContinuousResourceAllocation> allocation = continuousConsumers.get(resource.id());
455 if (allocation == null) {
456 return false;
457 }
458 return !allocation.value().allocations().isEmpty();
459 });
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800460
461 return Stream.concat(discrete, continuous).collect(Collectors.toList());
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700462 }
Sho SHIMIZUd29847f2015-08-13 09:10:59 -0700463
464 /**
465 * Abort the transaction.
466 *
467 * @param tx transaction context
468 * @return always false
469 */
470 private boolean abortTransaction(TransactionContext tx) {
471 tx.abort();
472 return false;
473 }
474
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800475 // Appends the specified ResourceAllocation to the existing values stored in the map
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800476 // computational complexity: O(n) where n is the number of the elements in the associated allocation
477 private boolean appendValue(TransactionalMap<ContinuousResourceId, ContinuousResourceAllocation> map,
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800478 ContinuousResource original, ResourceAllocation value) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800479 ContinuousResourceAllocation oldValue = map.putIfAbsent(original.id(),
480 new ContinuousResourceAllocation(original, ImmutableList.of(value)));
481 if (oldValue == null) {
482 return true;
483 }
484
485 if (oldValue.allocations().contains(value)) {
486 // don't write to map because all values are already stored
487 return true;
488 }
489
490 ContinuousResourceAllocation newValue = new ContinuousResourceAllocation(original,
491 ImmutableList.<ResourceAllocation>builder()
492 .addAll(oldValue.allocations())
493 .add(value)
494 .build());
495 return map.replace(original.id(), oldValue, newValue);
496 }
Sho SHIMIZUd29847f2015-08-13 09:10:59 -0700497 /**
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700498 * Appends the values to the existing values associated with the specified key.
Sho SHIMIZU4568c412015-08-21 16:39:07 -0700499 * If the map already has all the given values, appending will not happen.
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700500 *
501 * @param map map holding multiple values for a key
502 * @param key key specifying values
503 * @param values values to be appended
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700504 * @return true if the operation succeeds, false otherwise.
505 */
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800506 // computational complexity: O(n) where n is the number of the specified value
507 private boolean appendValues(TransactionalMap<DiscreteResourceId, Set<Resource>> map,
508 DiscreteResourceId key, List<Resource> values) {
Sho SHIMIZU07b7bc92016-01-29 18:27:58 -0800509 Set<Resource> oldValues = map.putIfAbsent(key, new LinkedHashSet<>(values));
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700510 if (oldValues == null) {
Sho SHIMIZU93a74b32015-11-09 11:48:23 -0800511 return true;
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700512 }
513
Sho SHIMIZU1992daf2016-02-17 17:38:55 -0800514 Set<ResourceId> oldIds = oldValues.stream()
515 .map(Resource::id)
516 .collect(Collectors.toSet());
517 // values whose IDs don't match any IDs of oldValues
518 Set<Resource> addedValues = values.stream()
519 .filter(x -> !oldIds.contains(x.id()))
520 .collect(Collectors.toCollection(LinkedHashSet::new));
521 // no new ID, then no-op
522 if (addedValues.isEmpty()) {
Sho SHIMIZU4568c412015-08-21 16:39:07 -0700523 // don't write to map because all values are already stored
524 return true;
525 }
526
Sho SHIMIZU07b7bc92016-01-29 18:27:58 -0800527 LinkedHashSet<Resource> newValues = new LinkedHashSet<>(oldValues);
Sho SHIMIZU1992daf2016-02-17 17:38:55 -0800528 newValues.addAll(addedValues);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800529 return map.replace(key, oldValues, newValues);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700530 }
531
532 /**
Sho SHIMIZUba1f83b2015-10-14 08:11:20 -0700533 * Removes the values from the existing values associated with the specified key.
Sho SHIMIZU5618ee52015-08-21 17:19:44 -0700534 * If the map doesn't contain the given values, removal will not happen.
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700535 *
536 * @param map map holding multiple values for a key
537 * @param key key specifying values
538 * @param values values to be removed
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700539 * @return true if the operation succeeds, false otherwise
540 */
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800541 // computational complexity: O(n) where n is the number of the specified values
542 private boolean removeValues(TransactionalMap<DiscreteResourceId, Set<Resource>> map,
543 DiscreteResourceId key, List<Resource> values) {
Sho SHIMIZU07b7bc92016-01-29 18:27:58 -0800544 Set<Resource> oldValues = map.putIfAbsent(key, new LinkedHashSet<>());
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700545 if (oldValues == null) {
HIGUCHI Yutadc4394c2016-01-29 15:35:10 -0800546 log.trace("No-Op removing values. key {} did not exist", key);
Sho SHIMIZU93a74b32015-11-09 11:48:23 -0800547 return true;
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700548 }
549
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800550 if (values.stream().allMatch(x -> !oldValues.contains(x))) {
Sho SHIMIZU5618ee52015-08-21 17:19:44 -0700551 // don't write map because none of the values are stored
HIGUCHI Yutadc4394c2016-01-29 15:35:10 -0800552 log.trace("No-Op removing values. key {} did not contain {}", key, values);
Sho SHIMIZU5618ee52015-08-21 17:19:44 -0700553 return true;
554 }
555
Sho SHIMIZU07b7bc92016-01-29 18:27:58 -0800556 LinkedHashSet<Resource> newValues = new LinkedHashSet<>(oldValues);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800557 newValues.removeAll(values);
558 return map.replace(key, oldValues, newValues);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700559 }
560
561 /**
Sho SHIMIZU72f81b12016-02-09 09:26:17 -0800562 * Returns the resource which has the same key as the specified resource ID
563 * in the set as a value of the map.
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700564 *
Sho SHIMIZU72f81b12016-02-09 09:26:17 -0800565 * @param childTxMap map storing parent - child relationship of resources
566 * @param id ID of resource to be checked
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800567 * @return the resource which is regarded as the same as the specified resource
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700568 */
Sho SHIMIZUa6a6fd32016-02-10 18:36:44 -0800569 // Naive implementation, which traverses all elements in the set when continuous resource
570 // computational complexity: O(1) when discrete resource. O(n) when continuous resource
571 // where n is the number of elements in the associated set
Sho SHIMIZU72f81b12016-02-09 09:26:17 -0800572 private Optional<Resource> lookup(TransactionalMap<DiscreteResourceId, Set<Resource>> childTxMap, ResourceId id) {
573 if (!id.parent().isPresent()) {
574 return Optional.of(Resource.ROOT);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700575 }
576
Sho SHIMIZU72f81b12016-02-09 09:26:17 -0800577 Set<Resource> values = childTxMap.get(id.parent().get());
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800578 if (values == null) {
579 return Optional.empty();
580 }
581
Sho SHIMIZUa6a6fd32016-02-10 18:36:44 -0800582 // short-circuit if discrete resource
583 // check the existence in the set: O(1) operation
584 if (id instanceof DiscreteResourceId) {
585 DiscreteResource discrete = Resources.discrete((DiscreteResourceId) id).resource();
586 if (values.contains(discrete)) {
587 return Optional.of(discrete);
588 } else {
589 return Optional.empty();
590 }
591 }
592
593 // continuous resource case
594 // iterate over the values in the set: O(n) operation
Sho SHIMIZU72f81b12016-02-09 09:26:17 -0800595 return values.stream()
596 .filter(x -> x.id().equals(id))
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800597 .findFirst();
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800598 }
599
600 /**
601 * Checks if there is enough resource volume to allocated the requested resource
602 * against the specified resource.
603 *
604 * @param original original resource
605 * @param request requested resource
606 * @param allocation current allocation of the resource
607 * @return true if there is enough resource volume. Otherwise, false.
608 */
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800609 // computational complexity: O(n) where n is the number of allocations
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800610 private boolean hasEnoughResource(ContinuousResource original,
611 ContinuousResource request,
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800612 ContinuousResourceAllocation allocation) {
613 if (allocation == null) {
614 return request.value() <= original.value();
615 }
616
617 double allocated = allocation.allocations().stream()
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800618 .filter(x -> x.resource() instanceof ContinuousResource)
619 .map(x -> (ContinuousResource) x.resource())
620 .mapToDouble(ContinuousResource::value)
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800621 .sum();
622 double left = original.value() - allocated;
623 return request.value() <= left;
624 }
625
626 // internal use only
627 private static final class ContinuousResourceAllocation {
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800628 private final ContinuousResource original;
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800629 private final ImmutableList<ResourceAllocation> allocations;
630
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800631 private ContinuousResourceAllocation(ContinuousResource original,
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800632 ImmutableList<ResourceAllocation> allocations) {
633 this.original = original;
634 this.allocations = allocations;
635 }
636
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800637 private ContinuousResource original() {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800638 return original;
639 }
640
641 private ImmutableList<ResourceAllocation> allocations() {
642 return allocations;
643 }
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700644 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700645}