blob: e5ed75bc343d7b1924400c460659a8fe7abd414d [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Sho SHIMIZUe18cb122016-02-22 21:04:56 -080016package org.onosproject.store.resource.impl;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070017
Jordan Halterman6359c832017-03-29 16:53:21 -070018import java.util.Collection;
19import java.util.LinkedHashMap;
20import java.util.LinkedHashSet;
21import java.util.List;
22import java.util.Map;
23import java.util.Optional;
24import java.util.Set;
Jordan Halterman7af71da2017-05-15 13:41:00 -070025import java.util.concurrent.ExecutionException;
26import java.util.concurrent.TimeUnit;
27import java.util.concurrent.TimeoutException;
Jordan Halterman6359c832017-03-29 16:53:21 -070028import java.util.function.Function;
29import java.util.stream.Collectors;
30import java.util.stream.Stream;
31
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070032import com.google.common.annotations.Beta;
Sho SHIMIZU83258ae2016-01-29 17:39:07 -080033import com.google.common.collect.ImmutableSet;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070034import org.apache.felix.scr.annotations.Activate;
35import org.apache.felix.scr.annotations.Component;
36import org.apache.felix.scr.annotations.Reference;
37import org.apache.felix.scr.annotations.ReferenceCardinality;
38import org.apache.felix.scr.annotations.Service;
Sho SHIMIZU65de9612016-05-19 12:24:20 -070039import org.onlab.util.KryoNamespace;
Sho SHIMIZU0b4e9dd2016-06-28 17:42:05 -070040import org.onlab.util.Tools;
Sho SHIMIZUe18cb122016-02-22 21:04:56 -080041import org.onosproject.net.resource.ContinuousResource;
42import org.onosproject.net.resource.ContinuousResourceId;
43import org.onosproject.net.resource.DiscreteResource;
44import org.onosproject.net.resource.DiscreteResourceId;
Sho SHIMIZU22fb2832016-05-06 11:44:03 -070045import org.onosproject.net.resource.Resource;
Sho SHIMIZUe18cb122016-02-22 21:04:56 -080046import org.onosproject.net.resource.ResourceAllocation;
47import org.onosproject.net.resource.ResourceConsumer;
Naoki Shiotabd1974c2016-04-29 18:44:17 -070048import org.onosproject.net.resource.ResourceConsumerId;
Sho SHIMIZUe18cb122016-02-22 21:04:56 -080049import org.onosproject.net.resource.ResourceEvent;
50import org.onosproject.net.resource.ResourceId;
Sho SHIMIZUe18cb122016-02-22 21:04:56 -080051import org.onosproject.net.resource.ResourceStore;
52import org.onosproject.net.resource.ResourceStoreDelegate;
53import org.onosproject.net.resource.Resources;
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080054import org.onosproject.store.AbstractStore;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070055import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampani3780d4b2016-04-04 18:18:24 -070056import org.onosproject.store.service.CommitStatus;
Jordan Halterman7af71da2017-05-15 13:41:00 -070057import org.onosproject.store.service.DistributedPrimitive;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070058import org.onosproject.store.service.Serializer;
59import org.onosproject.store.service.StorageService;
60import org.onosproject.store.service.TransactionContext;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070061import org.slf4j.Logger;
62import org.slf4j.LoggerFactory;
63
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070064import static com.google.common.base.Preconditions.checkArgument;
65import static com.google.common.base.Preconditions.checkNotNull;
Yuta HIGUCHI1edc36b2018-01-24 23:39:06 -080066import static java.util.stream.Collectors.groupingBy;
Sho SHIMIZU22fb2832016-05-06 11:44:03 -070067import static org.onosproject.net.resource.ResourceEvent.Type.RESOURCE_ADDED;
68import static org.onosproject.net.resource.ResourceEvent.Type.RESOURCE_REMOVED;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070069
70/**
71 * Implementation of ResourceStore using TransactionalMap.
72 */
Sho SHIMIZU9a2b8292015-10-28 13:00:16 -070073@Component(immediate = true)
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070074@Service
75@Beta
Sho SHIMIZUfa62b472015-11-02 17:35:46 -080076public class ConsistentResourceStore extends AbstractStore<ResourceEvent, ResourceStoreDelegate>
77 implements ResourceStore {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070078 private static final Logger log = LoggerFactory.getLogger(ConsistentResourceStore.class);
79
Sho SHIMIZU65de9612016-05-19 12:24:20 -070080 static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
81 .register(KryoNamespaces.API)
82 .register(UnifiedDiscreteResources.class)
83 .register(new EncodableDiscreteResourcesSerializer(), EncodableDiscreteResources.class)
Sho SHIMIZUf503a622016-05-25 11:12:23 -070084 .register(GenericDiscreteResources.class)
Sho SHIMIZU65de9612016-05-19 12:24:20 -070085 .register(EmptyDiscreteResources.class)
Sho SHIMIZU2795d632016-05-25 14:10:13 -070086 .register(new EncodedResourcesSerializer(), EncodedDiscreteResources.class)
Sho SHIMIZU65de9612016-05-19 12:24:20 -070087 .register(ContinuousResourceAllocation.class)
Sho SHIMIZU9db6da62016-06-01 12:31:21 -070088 .register(PortNumberCodec.class)
Sho SHIMIZU59512bf2016-06-01 11:30:58 -070089 .register(VlanIdCodec.class)
90 .register(MplsLabelCodec.class)
Sho SHIMIZU65de9612016-05-19 12:24:20 -070091 .build());
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070092
93 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
94 protected StorageService service;
95
Sho SHIMIZU7ecf5ea2016-05-13 15:28:59 -070096 private ConsistentDiscreteResourceSubStore discreteStore;
97 private ConsistentContinuousResourceSubStore continuousStore;
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -070098
99 @Activate
100 public void activate() {
Sho SHIMIZU7ecf5ea2016-05-13 15:28:59 -0700101 discreteStore = new ConsistentDiscreteResourceSubStore(service);
102 continuousStore = new ConsistentContinuousResourceSubStore(service);
Sho SHIMIZUe7db6142015-11-04 11:24:22 -0800103
Madan Jampanic7f49f92015-12-10 11:35:06 -0800104 log.info("Started");
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700105 }
106
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800107 // Computational complexity: O(1) if the resource is discrete type.
108 // O(n) if the resource is continuous type where n is the number of the existing allocations for the resource
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700109 @Override
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800110 public List<ResourceAllocation> getResourceAllocations(ResourceId id) {
111 checkNotNull(id);
112 checkArgument(id instanceof DiscreteResourceId || id instanceof ContinuousResourceId);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700113
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800114 if (id instanceof DiscreteResourceId) {
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700115 return discreteStore.getResourceAllocations((DiscreteResourceId) id);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800116 } else {
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700117 return continuousStore.getResourceAllocations((ContinuousResourceId) id);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800118 }
119 }
120
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700121 @Override
Sho SHIMIZUef835c92016-08-08 13:51:17 -0700122 public boolean register(List<? extends Resource> resources) {
Sho SHIMIZU83e17a02015-08-20 14:07:05 -0700123 checkNotNull(resources);
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800124 if (log.isTraceEnabled()) {
125 resources.forEach(r -> log.trace("registering {}", r));
126 }
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700127
Jordan Halterman7af71da2017-05-15 13:41:00 -0700128 // Retry the transaction until successful.
129 while (true) {
130 TransactionContext tx = service.transactionContextBuilder().build();
131 tx.begin();
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700132
Jordan Halterman7af71da2017-05-15 13:41:00 -0700133 // the order is preserved by LinkedHashMap
134 Map<DiscreteResource, List<Resource>> resourceMap = resources.stream()
135 .filter(x -> x.parent().isPresent())
Yuta HIGUCHI1edc36b2018-01-24 23:39:06 -0800136 .collect(groupingBy(x -> x.parent().get(), LinkedHashMap::new, Collectors.<Resource>toList()));
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700137
Jordan Halterman7af71da2017-05-15 13:41:00 -0700138 TransactionalDiscreteResourceSubStore discreteTxStore = discreteStore.transactional(tx);
139 TransactionalContinuousResourceSubStore continuousTxStore = continuousStore.transactional(tx);
140 for (Map.Entry<DiscreteResource, List<Resource>> entry : resourceMap.entrySet()) {
141 DiscreteResourceId parentId = entry.getKey().id();
142 if (!discreteTxStore.lookup(parentId).isPresent()) {
143 return abortTransaction(tx);
144 }
145
146 if (!register(discreteTxStore, continuousTxStore, parentId, entry.getValue())) {
147 return abortTransaction(tx);
148 }
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700149 }
150
Jordan Halterman7af71da2017-05-15 13:41:00 -0700151 try {
152 CommitStatus status = commitTransaction(tx);
153 if (status == CommitStatus.SUCCESS) {
154 log.trace("Transaction commit succeeded on registration: resources={}", resources);
155 List<ResourceEvent> events = resources.stream()
156 .filter(x -> x.parent().isPresent())
157 .map(x -> new ResourceEvent(RESOURCE_ADDED, x))
158 .collect(Collectors.toList());
159 notifyDelegate(events);
160 return true;
161 }
162 } catch (InterruptedException | ExecutionException | TimeoutException e) {
163 log.warn("Transaction commit failed on registration", e);
164 return false;
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800165 }
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700166 }
167 }
168
169 @Override
Sho SHIMIZUef835c92016-08-08 13:51:17 -0700170 public boolean unregister(List<? extends ResourceId> ids) {
Sho SHIMIZU72f81b12016-02-09 09:26:17 -0800171 checkNotNull(ids);
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700172
Jordan Halterman7af71da2017-05-15 13:41:00 -0700173 // Retry the transaction until successful.
174 while (true) {
175 TransactionContext tx = service.transactionContextBuilder().build();
176 tx.begin();
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700177
Jordan Halterman7af71da2017-05-15 13:41:00 -0700178 TransactionalDiscreteResourceSubStore discreteTxStore = discreteStore.transactional(tx);
179 TransactionalContinuousResourceSubStore continuousTxStore = continuousStore.transactional(tx);
180 // Look up resources by resource IDs
181 List<Resource> resources = ids.stream()
182 .filter(x -> x.parent().isPresent())
183 .map(x -> {
184 // avoid access to consistent map in the case of discrete resource
185 if (x instanceof DiscreteResourceId) {
186 return Optional.of(Resources.discrete((DiscreteResourceId) x).resource());
187 } else {
188 return continuousTxStore.lookup((ContinuousResourceId) x);
189 }
190 })
191 .flatMap(Tools::stream)
192 .collect(Collectors.toList());
193 // the order is preserved by LinkedHashMap
194 Map<DiscreteResourceId, List<Resource>> resourceMap = resources.stream().collect(
195 Collectors.groupingBy(x -> x.parent().get().id(), LinkedHashMap::new, Collectors.toList()));
Sho SHIMIZU83e17a02015-08-20 14:07:05 -0700196
Jordan Halterman7af71da2017-05-15 13:41:00 -0700197 for (Map.Entry<DiscreteResourceId, List<Resource>> entry : resourceMap.entrySet()) {
198 if (!unregister(discreteTxStore, continuousTxStore, entry.getKey(), entry.getValue())) {
199 log.warn("Failed to unregister {}: Failed to remove {} values.",
200 entry.getKey(), entry.getValue().size());
201 return abortTransaction(tx);
202 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800203 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800204
Jordan Halterman7af71da2017-05-15 13:41:00 -0700205 try {
206 CommitStatus status = commitTransaction(tx);
207 if (status == CommitStatus.SUCCESS) {
208 List<ResourceEvent> events = resources.stream()
209 .filter(x -> x.parent().isPresent())
210 .map(x -> new ResourceEvent(RESOURCE_REMOVED, x))
211 .collect(Collectors.toList());
212 notifyDelegate(events);
213 return true;
214 }
215 } catch (InterruptedException | ExecutionException | TimeoutException e) {
Sho SHIMIZU61936212017-01-09 17:50:56 -0800216 String message = resources.stream()
217 .map(Resource::simpleTypeName)
218 .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))
219 .entrySet().stream()
Jordan Halterman7af71da2017-05-15 13:41:00 -0700220 .map(entry -> String.format("%d %s type resources", entry.getValue(), entry.getKey()))
Sho SHIMIZU61936212017-01-09 17:50:56 -0800221 .collect(Collectors.joining(", "));
Jordan Halterman7af71da2017-05-15 13:41:00 -0700222 log.warn("Failed to unregister {}: {}", message, e);
223 return false;
Madan Jampani3780d4b2016-04-04 18:18:24 -0700224 }
Jordan Halterman7af71da2017-05-15 13:41:00 -0700225 }
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700226 }
227
228 @Override
Sho SHIMIZUef835c92016-08-08 13:51:17 -0700229 public boolean allocate(List<? extends Resource> resources, ResourceConsumer consumer) {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700230 checkNotNull(resources);
231 checkNotNull(consumer);
232
Jordan Halterman7af71da2017-05-15 13:41:00 -0700233 while (true) {
234 TransactionContext tx = service.transactionContextBuilder().build();
235 tx.begin();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700236
Jordan Halterman7af71da2017-05-15 13:41:00 -0700237 TransactionalDiscreteResourceSubStore discreteTxStore = discreteStore.transactional(tx);
238 TransactionalContinuousResourceSubStore continuousTxStore = continuousStore.transactional(tx);
239 for (Resource resource : resources) {
240 if (resource instanceof DiscreteResource) {
241 if (!discreteTxStore.allocate(consumer.consumerId(), (DiscreteResource) resource)) {
242 return abortTransaction(tx);
243 }
244 } else if (resource instanceof ContinuousResource) {
245 if (!continuousTxStore.allocate(consumer.consumerId(), (ContinuousResource) resource)) {
246 return abortTransaction(tx);
247 }
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800248 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800249 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800250
Jordan Halterman7af71da2017-05-15 13:41:00 -0700251 try {
252 if (commitTransaction(tx) == CommitStatus.SUCCESS) {
253 return true;
254 }
255 } catch (InterruptedException | ExecutionException | TimeoutException e) {
256 log.warn("Failed to allocate {}: {}", resources, e);
257 return false;
258 }
259 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700260 }
261
262 @Override
Sho SHIMIZUfc64ffe2016-02-10 20:11:09 -0800263 public boolean release(List<ResourceAllocation> allocations) {
264 checkNotNull(allocations);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700265
Jordan Halterman7af71da2017-05-15 13:41:00 -0700266 while (true) {
267 TransactionContext tx = service.transactionContextBuilder().build();
268 tx.begin();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700269
Jordan Halterman7af71da2017-05-15 13:41:00 -0700270 TransactionalDiscreteResourceSubStore discreteTxStore = discreteStore.transactional(tx);
271 TransactionalContinuousResourceSubStore continuousTxStore = continuousStore.transactional(tx);
272 for (ResourceAllocation allocation : allocations) {
273 Resource resource = allocation.resource();
274 ResourceConsumerId consumerId = allocation.consumerId();
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700275
Jordan Halterman7af71da2017-05-15 13:41:00 -0700276 if (resource instanceof DiscreteResource) {
277 if (!discreteTxStore.release(consumerId, (DiscreteResource) resource)) {
278 return abortTransaction(tx);
279 }
280 } else if (resource instanceof ContinuousResource) {
281 if (!continuousTxStore.release(consumerId, (ContinuousResource) resource)) {
282 return abortTransaction(tx);
283 }
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800284 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700285 }
Sho SHIMIZU1e0a34c2015-11-02 16:52:29 -0800286
Jordan Halterman7af71da2017-05-15 13:41:00 -0700287 try {
288 if (commitTransaction(tx) == CommitStatus.SUCCESS) {
289 return true;
290 }
291 } catch (InterruptedException | ExecutionException | TimeoutException e) {
292 log.warn("Failed to release {}: {}", allocations, e);
293 return false;
294 }
295 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700296 }
297
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800298 // computational complexity: O(1) if the resource is discrete type.
299 // O(n) if the resource is continuous type where n is the number of the children of
300 // the specified resource's parent
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700301 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800302 public boolean isAvailable(Resource resource) {
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800303 checkNotNull(resource);
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800304 checkArgument(resource instanceof DiscreteResource || resource instanceof ContinuousResource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800305
Sho SHIMIZUf33b8932016-01-25 18:43:32 -0800306 if (resource instanceof DiscreteResource) {
HIGUCHI Yuta6f828c32016-01-20 18:11:05 -0800307 // check if already consumed
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700308 return discreteStore.isAvailable((DiscreteResource) resource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800309 } else {
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700310 return continuousStore.isAvailable((ContinuousResource) resource);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800311 }
312 }
313
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800314 // computational complexity: O(n + m) where n is the number of entries in discreteConsumers
315 // and m is the number of allocations for all continuous resources
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800316 @Override
Sho SHIMIZU8fa670a2016-01-14 11:17:18 -0800317 public Collection<Resource> getResources(ResourceConsumer consumer) {
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700318 checkNotNull(consumer);
319
320 // NOTE: getting all entries may become performance bottleneck
321 // TODO: revisit for better backend data structure
Naoki Shiotabd1974c2016-04-29 18:44:17 -0700322 Stream<DiscreteResource> discrete = discreteStore.getResources(consumer.consumerId());
323 Stream<ContinuousResource> continuous = continuousStore.getResources(consumer.consumerId());
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800324
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700325 return Stream.concat(discrete, continuous).collect(Collectors.toList());
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700326 }
327
Sho SHIMIZU82bfe992016-02-10 09:55:32 -0800328 // computational complexity: O(1)
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700329 @Override
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800330 public Set<Resource> getChildResources(DiscreteResourceId parent) {
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700331 checkNotNull(parent);
332
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700333 return ImmutableSet.<Resource>builder()
334 .addAll(discreteStore.getChildResources(parent))
335 .addAll(continuousStore.getChildResources(parent))
336 .build();
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700337 }
338
Sho SHIMIZU9cc4a242016-05-26 12:55:35 -0700339 @Override
340 public <T> Set<Resource> getChildResources(DiscreteResourceId parent, Class<T> cls) {
341 checkNotNull(parent);
342 checkNotNull(cls);
343
344 return ImmutableSet.<Resource>builder()
345 .addAll(discreteStore.getChildResources(parent, cls))
346 .addAll(continuousStore.getChildResources(parent, cls))
347 .build();
348 }
349
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800350 // computational complexity: O(n) where n is the number of the children of the parent
Sho SHIMIZUe7f4f3f2015-10-13 16:27:25 -0700351 @Override
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800352 public <T> Collection<Resource> getAllocatedResources(DiscreteResourceId parent, Class<T> cls) {
Sho SHIMIZU1f5e5912015-08-10 17:00:00 -0700353 checkNotNull(parent);
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700354 checkNotNull(cls);
355
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700356 Stream<DiscreteResource> discrete = discreteStore.getAllocatedResources(parent, cls);
357 Stream<ContinuousResource> continuous = continuousStore.getAllocatedResources(parent, cls);
Sho SHIMIZU6c9e33a2016-01-07 18:45:27 -0800358
359 return Stream.concat(discrete, continuous).collect(Collectors.toList());
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700360 }
Sho SHIMIZUd29847f2015-08-13 09:10:59 -0700361
362 /**
Jordan Halterman7af71da2017-05-15 13:41:00 -0700363 * Commits a transaction.
364 *
365 * @param tx the transaction to commit
366 * @return the transaction status
367 */
368 private CommitStatus commitTransaction(TransactionContext tx)
369 throws InterruptedException, ExecutionException, TimeoutException {
Jordan Halterman6440b092017-05-24 17:48:08 -0700370 return tx.commit().get(DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
Jordan Halterman7af71da2017-05-15 13:41:00 -0700371 }
372
373 /**
Sho SHIMIZUd29847f2015-08-13 09:10:59 -0700374 * Abort the transaction.
375 *
376 * @param tx transaction context
377 * @return always false
378 */
379 private boolean abortTransaction(TransactionContext tx) {
380 tx.abort();
381 return false;
382 }
383
384 /**
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700385 * Appends the values to the existing values associated with the specified key.
Sho SHIMIZU4568c412015-08-21 16:39:07 -0700386 * If the map already has all the given values, appending will not happen.
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700387 *
Sho SHIMIZU0b4e9dd2016-06-28 17:42:05 -0700388 * @param parent resource ID of the parent under which the given resources are registered
389 * @param resources resources to be registered
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700390 * @return true if the operation succeeds, false otherwise.
391 */
Sho SHIMIZUdd3750c2016-02-01 11:37:04 -0800392 // computational complexity: O(n) where n is the number of the specified value
Sho SHIMIZU7ecf5ea2016-05-13 15:28:59 -0700393 private boolean register(TransactionalDiscreteResourceSubStore discreteTxStore,
394 TransactionalContinuousResourceSubStore continuousTxStore,
Sho SHIMIZU0b4e9dd2016-06-28 17:42:05 -0700395 DiscreteResourceId parent, List<Resource> resources) {
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700396 // it's assumed that the passed "values" is non-empty
397
398 // This is 2-pass scan. Nicer to have 1-pass scan
Sho SHIMIZU0b4e9dd2016-06-28 17:42:05 -0700399 Set<DiscreteResource> discreteResources = resources.stream()
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700400 .filter(x -> x instanceof DiscreteResource)
401 .map(x -> (DiscreteResource) x)
Sho SHIMIZU34847b72016-06-07 14:31:54 -0700402 .collect(Collectors.toCollection(LinkedHashSet::new));
Sho SHIMIZU0b4e9dd2016-06-28 17:42:05 -0700403 Set<ContinuousResource> continuousResources = resources.stream()
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700404 .filter(x -> x instanceof ContinuousResource)
405 .map(x -> (ContinuousResource) x)
Sho SHIMIZU34847b72016-06-07 14:31:54 -0700406 .collect(Collectors.toCollection(LinkedHashSet::new));
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700407
Sho SHIMIZU0b4e9dd2016-06-28 17:42:05 -0700408 return discreteTxStore.register(parent, discreteResources)
409 && continuousTxStore.register(parent, continuousResources);
Sho SHIMIZUba41fc12015-08-12 15:43:22 -0700410 }
411
412 /**
Sho SHIMIZUba1f83b2015-10-14 08:11:20 -0700413 * Removes the values from the existing values associated with the specified key.
Sho SHIMIZU5618ee52015-08-21 17:19:44 -0700414 * If the map doesn't contain the given values, removal will not happen.
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700415 *
Sho SHIMIZU22fb2832016-05-06 11:44:03 -0700416 * @param discreteTxStore map holding multiple discrete resources for a key
417 * @param continuousTxStore map holding multiple continuous resources for a key
Sho SHIMIZU0b4e9dd2016-06-28 17:42:05 -0700418 * @param parent resource ID of the parent under which the given resources are unregistered
419 * @param resources resources to be unregistered
Sho SHIMIZU2d8a13a2015-08-18 22:37:41 -0700420 * @return true if the operation succeeds, false otherwise
421 */
Sho SHIMIZU7ecf5ea2016-05-13 15:28:59 -0700422 private boolean unregister(TransactionalDiscreteResourceSubStore discreteTxStore,
423 TransactionalContinuousResourceSubStore continuousTxStore,
Sho SHIMIZU0b4e9dd2016-06-28 17:42:05 -0700424 DiscreteResourceId parent, List<Resource> resources) {
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700425 // it's assumed that the passed "values" is non-empty
426
427 // This is 2-pass scan. Nicer to have 1-pass scan
Sho SHIMIZU0b4e9dd2016-06-28 17:42:05 -0700428 Set<DiscreteResource> discreteResources = resources.stream()
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700429 .filter(x -> x instanceof DiscreteResource)
430 .map(x -> (DiscreteResource) x)
Sho SHIMIZU34847b72016-06-07 14:31:54 -0700431 .collect(Collectors.toCollection(LinkedHashSet::new));
Sho SHIMIZU0b4e9dd2016-06-28 17:42:05 -0700432 Set<ContinuousResource> continuousResources = resources.stream()
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700433 .filter(x -> x instanceof ContinuousResource)
434 .map(x -> (ContinuousResource) x)
Sho SHIMIZU34847b72016-06-07 14:31:54 -0700435 .collect(Collectors.toCollection(LinkedHashSet::new));
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700436
Sho SHIMIZU0b4e9dd2016-06-28 17:42:05 -0700437 return discreteTxStore.unregister(parent, discreteResources)
438 && continuousTxStore.unregister(parent, continuousResources);
Sho SHIMIZU03be2662016-05-04 09:38:45 -0700439 }
Sho SHIMIZU78ee25c2015-07-16 15:54:14 -0700440}