ONOS-3296: Support continuous type resources
Change-Id: I155e41e7a7c1750ff45986a55bedab353485d3fa
diff --git a/core/store/dist/src/main/java/org/onosproject/store/newresource/impl/ConsistentResourceStore.java b/core/store/dist/src/main/java/org/onosproject/store/newresource/impl/ConsistentResourceStore.java
index c2a1a7c..7339b16 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/newresource/impl/ConsistentResourceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/newresource/impl/ConsistentResourceStore.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2015 Open Networking Laboratory
+ * Copyright 2015-2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,14 +17,18 @@
import com.google.common.annotations.Beta;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.GuavaCollectors;
import org.onlab.util.Tools;
+import org.onosproject.net.newresource.ResourceAllocation;
import org.onosproject.net.newresource.ResourceConsumer;
import org.onosproject.net.newresource.ResourceEvent;
+import org.onosproject.net.newresource.ResourceId;
import org.onosproject.net.newresource.ResourcePath;
import org.onosproject.net.newresource.ResourceStore;
import org.onosproject.net.newresource.ResourceStoreDelegate;
@@ -40,7 +44,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -49,7 +52,9 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -65,10 +70,12 @@
implements ResourceStore {
private static final Logger log = LoggerFactory.getLogger(ConsistentResourceStore.class);
- private static final String CONSUMER_MAP = "onos-resource-consumers";
+ private static final String DISCRETE_CONSUMER_MAP = "onos-discrete-consumers";
+ private static final String CONTINUOUS_CONSUMER_MAP = "onos-continuous-consumers";
private static final String CHILD_MAP = "onos-resource-children";
private static final Serializer SERIALIZER = Serializer.using(
- Arrays.asList(KryoNamespaces.BASIC, KryoNamespaces.API));
+ Arrays.asList(KryoNamespaces.BASIC, KryoNamespaces.API),
+ ContinuousResourceAllocation.class);
// TODO: We should provide centralized values for this
private static final int MAX_RETRIES = 5;
@@ -77,35 +84,61 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService service;
- private ConsistentMap<ResourcePath, ResourceConsumer> consumerMap;
- private ConsistentMap<ResourcePath, List<ResourcePath>> childMap;
+ private ConsistentMap<ResourcePath.Discrete, ResourceConsumer> discreteConsumers;
+ private ConsistentMap<ResourceId, ContinuousResourceAllocation> continuousConsumers;
+ private ConsistentMap<ResourcePath.Discrete, Set<ResourcePath>> childMap;
@Activate
public void activate() {
- consumerMap = service.<ResourcePath, ResourceConsumer>consistentMapBuilder()
- .withName(CONSUMER_MAP)
+ discreteConsumers = service.<ResourcePath.Discrete, ResourceConsumer>consistentMapBuilder()
+ .withName(DISCRETE_CONSUMER_MAP)
.withSerializer(SERIALIZER)
.build();
- childMap = service.<ResourcePath, List<ResourcePath>>consistentMapBuilder()
+ continuousConsumers = service.<ResourceId, ContinuousResourceAllocation>consistentMapBuilder()
+ .withName(CONTINUOUS_CONSUMER_MAP)
+ .withSerializer(SERIALIZER)
+ .build();
+ childMap = service.<ResourcePath.Discrete, Set<ResourcePath>>consistentMapBuilder()
.withName(CHILD_MAP)
.withSerializer(SERIALIZER)
.build();
- Tools.retryable(() -> childMap.put(ResourcePath.ROOT, ImmutableList.of()),
+ Tools.retryable(() -> childMap.put(ResourcePath.ROOT, new LinkedHashSet<>()),
ConsistentMapException.class, MAX_RETRIES, RETRY_DELAY);
log.info("Started");
}
@Override
- public Optional<ResourceConsumer> getConsumer(ResourcePath resource) {
+ public List<ResourceConsumer> getConsumers(ResourcePath resource) {
checkNotNull(resource);
+ checkArgument(resource instanceof ResourcePath.Discrete || resource instanceof ResourcePath.Continuous);
- Versioned<ResourceConsumer> consumer = consumerMap.get(resource);
+ if (resource instanceof ResourcePath.Discrete) {
+ return getConsumer((ResourcePath.Discrete) resource);
+ } else {
+ return getConsumer((ResourcePath.Continuous) resource);
+ }
+ }
+
+ private List<ResourceConsumer> getConsumer(ResourcePath.Discrete resource) {
+ Versioned<ResourceConsumer> consumer = discreteConsumers.get(resource);
if (consumer == null) {
- return Optional.empty();
+ return ImmutableList.of();
}
- return Optional.of(consumer.value());
+ return ImmutableList.of(consumer.value());
+ }
+
+ private List<ResourceConsumer> getConsumer(ResourcePath.Continuous resource) {
+ Versioned<ContinuousResourceAllocation> allocations = continuousConsumers.get(resource.id());
+ if (allocations == null) {
+ return ImmutableList.of();
+ }
+
+ return allocations.value().allocations().stream()
+ .filter(x -> x.resource().equals(resource))
+ .map(ResourceAllocation::consumer)
+ .collect(GuavaCollectors.toImmutableList());
}
@Override
@@ -115,15 +148,16 @@
TransactionContext tx = service.transactionContextBuilder().build();
tx.begin();
- TransactionalMap<ResourcePath, List<ResourcePath>> childTxMap =
+ TransactionalMap<ResourcePath.Discrete, Set<ResourcePath>> childTxMap =
tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
- Map<ResourcePath, List<ResourcePath>> resourceMap = resources.stream()
+ Map<ResourcePath.Discrete, List<ResourcePath>> resourceMap = resources.stream()
.filter(x -> x.parent().isPresent())
.collect(Collectors.groupingBy(x -> x.parent().get()));
- for (Map.Entry<ResourcePath, List<ResourcePath>> entry: resourceMap.entrySet()) {
- if (!isRegistered(childTxMap, entry.getKey())) {
+ for (Map.Entry<ResourcePath.Discrete, List<ResourcePath>> entry: resourceMap.entrySet()) {
+ Optional<ResourcePath.Discrete> child = lookup(childTxMap, entry.getKey());
+ if (!child.isPresent()) {
return abortTransaction(tx);
}
@@ -150,19 +184,32 @@
TransactionContext tx = service.transactionContextBuilder().build();
tx.begin();
- TransactionalMap<ResourcePath, List<ResourcePath>> childTxMap =
+ TransactionalMap<ResourcePath.Discrete, Set<ResourcePath>> childTxMap =
tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
- TransactionalMap<ResourcePath, ResourceConsumer> consumerTxMap =
- tx.getTransactionalMap(CONSUMER_MAP, SERIALIZER);
+ TransactionalMap<ResourcePath.Discrete, ResourceConsumer> discreteConsumerTxMap =
+ tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
+ TransactionalMap<ResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
+ tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
- Map<ResourcePath, List<ResourcePath>> resourceMap = resources.stream()
+ // Extract Discrete instances from resources
+ Map<ResourcePath.Discrete, List<ResourcePath>> resourceMap = resources.stream()
.filter(x -> x.parent().isPresent())
.collect(Collectors.groupingBy(x -> x.parent().get()));
// even if one of the resources is allocated to a consumer,
// all unregistrations are regarded as failure
- for (Map.Entry<ResourcePath, List<ResourcePath>> entry: resourceMap.entrySet()) {
- if (entry.getValue().stream().anyMatch(x -> consumerTxMap.get(x) != null)) {
+ for (Map.Entry<ResourcePath.Discrete, List<ResourcePath>> entry: resourceMap.entrySet()) {
+ boolean allocated = entry.getValue().stream().anyMatch(x -> {
+ if (x instanceof ResourcePath.Discrete) {
+ return discreteConsumerTxMap.get((ResourcePath.Discrete) x) != null;
+ } else if (x instanceof ResourcePath.Continuous) {
+ ContinuousResourceAllocation allocations = continuousConsumerTxMap.get(x.id());
+ return allocations != null && !allocations.allocations().isEmpty();
+ } else {
+ return false;
+ }
+ });
+ if (allocated) {
return abortTransaction(tx);
}
@@ -190,19 +237,39 @@
TransactionContext tx = service.transactionContextBuilder().build();
tx.begin();
- TransactionalMap<ResourcePath, List<ResourcePath>> childTxMap =
+ TransactionalMap<ResourcePath.Discrete, Set<ResourcePath>> childTxMap =
tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
- TransactionalMap<ResourcePath, ResourceConsumer> consumerTxMap =
- tx.getTransactionalMap(CONSUMER_MAP, SERIALIZER);
+ TransactionalMap<ResourcePath.Discrete, ResourceConsumer> discreteConsumerTxMap =
+ tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
+ TransactionalMap<ResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
+ tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
for (ResourcePath resource: resources) {
- if (!isRegistered(childTxMap, resource)) {
- return abortTransaction(tx);
- }
+ if (resource instanceof ResourcePath.Discrete) {
+ if (!lookup(childTxMap, resource).isPresent()) {
+ return abortTransaction(tx);
+ }
- ResourceConsumer oldValue = consumerTxMap.put(resource, consumer);
- if (oldValue != null) {
- return abortTransaction(tx);
+ ResourceConsumer oldValue = discreteConsumerTxMap.put((ResourcePath.Discrete) resource, consumer);
+ if (oldValue != null) {
+ return abortTransaction(tx);
+ }
+ } else if (resource instanceof ResourcePath.Continuous) {
+ Optional<ResourcePath.Continuous> continuous = lookup(childTxMap, (ResourcePath.Continuous) resource);
+ if (!continuous.isPresent()) {
+ return abortTransaction(tx);
+ }
+
+ ContinuousResourceAllocation allocations = continuousConsumerTxMap.get(continuous.get().id());
+ if (!hasEnoughResource(continuous.get(), (ResourcePath.Continuous) resource, allocations)) {
+ return abortTransaction(tx);
+ }
+
+ boolean success = appendValue(continuousConsumerTxMap,
+ continuous.get(), new ResourceAllocation(continuous.get(), consumer));
+ if (!success) {
+ return abortTransaction(tx);
+ }
}
}
@@ -218,8 +285,10 @@
TransactionContext tx = service.transactionContextBuilder().build();
tx.begin();
- TransactionalMap<ResourcePath, ResourceConsumer> consumerTxMap =
- tx.getTransactionalMap(CONSUMER_MAP, SERIALIZER);
+ TransactionalMap<ResourcePath.Discrete, ResourceConsumer> discreteConsumerTxMap =
+ tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
+ TransactionalMap<ResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
+ tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
Iterator<ResourcePath> resourceIte = resources.iterator();
Iterator<ResourceConsumer> consumerIte = consumers.iterator();
@@ -227,10 +296,24 @@
ResourcePath resource = resourceIte.next();
ResourceConsumer consumer = consumerIte.next();
- // if this single release fails (because the resource is allocated to another consumer,
- // the whole release fails
- if (!consumerTxMap.remove(resource, consumer)) {
- return abortTransaction(tx);
+ if (resource instanceof ResourcePath.Discrete) {
+ // if this single release fails (because the resource is allocated to another consumer,
+ // the whole release fails
+ if (!discreteConsumerTxMap.remove((ResourcePath.Discrete) resource, consumer)) {
+ return abortTransaction(tx);
+ }
+ } else if (resource instanceof ResourcePath.Continuous) {
+ ResourcePath.Continuous continuous = (ResourcePath.Continuous) resource;
+ ContinuousResourceAllocation allocation = continuousConsumerTxMap.get(continuous.id());
+ ImmutableList<ResourceAllocation> newAllocations = allocation.allocations().stream()
+ .filter(x -> !(x.consumer().equals(consumer) &&
+ ((ResourcePath.Continuous) x.resource()).value() == continuous.value()))
+ .collect(GuavaCollectors.toImmutableList());
+
+ if (!continuousConsumerTxMap.replace(continuous.id(), allocation,
+ new ContinuousResourceAllocation(allocation.original(), newAllocations))) {
+ return abortTransaction(tx);
+ }
}
}
@@ -238,22 +321,51 @@
}
@Override
+ public boolean isAvailable(ResourcePath resource) {
+ checkNotNull(resource);
+ checkArgument(resource instanceof ResourcePath.Discrete || resource instanceof ResourcePath.Continuous);
+
+ if (resource instanceof ResourcePath.Discrete) {
+ return getConsumer((ResourcePath.Discrete) resource).isEmpty();
+ } else {
+ return isAvailable((ResourcePath.Continuous) resource);
+ }
+ }
+
+ private boolean isAvailable(ResourcePath.Continuous resource) {
+ Versioned<ContinuousResourceAllocation> allocation = continuousConsumers.get(resource.id());
+ if (allocation == null) {
+ return false;
+ }
+
+ return hasEnoughResource(allocation.value().original(), resource, allocation.value());
+ }
+
+ @Override
public Collection<ResourcePath> getResources(ResourceConsumer consumer) {
checkNotNull(consumer);
// NOTE: getting all entries may become performance bottleneck
// TODO: revisit for better backend data structure
- return consumerMap.entrySet().stream()
+ Stream<ResourcePath.Discrete> discreteStream = discreteConsumers.entrySet().stream()
.filter(x -> x.getValue().value().equals(consumer))
- .map(Map.Entry::getKey)
- .collect(Collectors.toList());
+ .map(Map.Entry::getKey);
+
+ Stream<ResourcePath.Continuous> continuousStream = continuousConsumers.values().stream()
+ .flatMap(x -> x.value().allocations().stream()
+ .map(y -> Maps.immutableEntry(x.value().original(), y)))
+ .filter(x -> x.getValue().consumer().equals(consumer))
+ .map(x -> x.getKey());
+
+ return Stream.concat(discreteStream, continuousStream).collect(Collectors.toList());
}
@Override
public Collection<ResourcePath> getChildResources(ResourcePath parent) {
checkNotNull(parent);
+ checkArgument(parent instanceof ResourcePath.Discrete);
- Versioned<List<ResourcePath>> children = childMap.get(parent);
+ Versioned<Set<ResourcePath>> children = childMap.get((ResourcePath.Discrete) parent);
if (children == null) {
return Collections.emptyList();
}
@@ -265,16 +377,28 @@
public <T> Collection<ResourcePath> getAllocatedResources(ResourcePath parent, Class<T> cls) {
checkNotNull(parent);
checkNotNull(cls);
+ checkArgument(parent instanceof ResourcePath.Discrete);
- Versioned<List<ResourcePath>> children = childMap.get(parent);
+ Versioned<Set<ResourcePath>> children = childMap.get((ResourcePath.Discrete) parent);
if (children == null) {
return Collections.emptyList();
}
- return children.value().stream()
+ Stream<ResourcePath.Discrete> discrete = children.value().stream()
.filter(x -> x.last().getClass().equals(cls))
- .filter(consumerMap::containsKey)
- .collect(Collectors.toList());
+ .filter(x -> x instanceof ResourcePath.Discrete)
+ .map(x -> (ResourcePath.Discrete) x)
+ .filter(discreteConsumers::containsKey);
+
+ Stream<ResourcePath.Continuous> continuous = children.value().stream()
+ .filter(x -> x.last().getClass().equals(cls))
+ .filter(x -> x instanceof ResourcePath.Continuous)
+ .map(x -> (ResourcePath.Continuous) x)
+ .filter(x -> continuousConsumers.containsKey(x.id()))
+ .filter(x -> continuousConsumers.get(x.id()) != null)
+ .filter(x -> !continuousConsumers.get(x.id()).value().allocations().isEmpty());
+
+ return Stream.concat(discrete, continuous).collect(Collectors.toList());
}
/**
@@ -288,6 +412,27 @@
return false;
}
+ // Appends the specified ResourceAllocation to the existing values stored in the map
+ private boolean appendValue(TransactionalMap<ResourceId, ContinuousResourceAllocation> map,
+ ResourcePath.Continuous original, ResourceAllocation value) {
+ ContinuousResourceAllocation oldValue = map.putIfAbsent(original.id(),
+ new ContinuousResourceAllocation(original, ImmutableList.of(value)));
+ if (oldValue == null) {
+ return true;
+ }
+
+ if (oldValue.allocations().contains(value)) {
+ // don't write to map because all values are already stored
+ return true;
+ }
+
+ ContinuousResourceAllocation newValue = new ContinuousResourceAllocation(original,
+ ImmutableList.<ResourceAllocation>builder()
+ .addAll(oldValue.allocations())
+ .add(value)
+ .build());
+ return map.replace(original.id(), oldValue, newValue);
+ }
/**
* Appends the values to the existing values associated with the specified key.
* If the map already has all the given values, appending will not happen.
@@ -299,20 +444,20 @@
* @param <V> type of the element of the list
* @return true if the operation succeeds, false otherwise.
*/
- private <K, V> boolean appendValues(TransactionalMap<K, List<V>> map, K key, List<V> values) {
- List<V> oldValues = map.putIfAbsent(key, new ArrayList<>(values));
+ private <K, V> boolean appendValues(TransactionalMap<K, Set<V>> map, K key, List<V> values) {
+ Set<V> oldValues = map.putIfAbsent(key, new LinkedHashSet<>(values));
if (oldValues == null) {
return true;
}
- LinkedHashSet<V> oldSet = new LinkedHashSet<>(oldValues);
- if (oldSet.containsAll(values)) {
+ if (oldValues.containsAll(values)) {
// don't write to map because all values are already stored
return true;
}
- oldSet.addAll(values);
- return map.replace(key, oldValues, new ArrayList<>(oldSet));
+ LinkedHashSet<V> newValues = new LinkedHashSet<>(oldValues);
+ newValues.addAll(values);
+ return map.replace(key, oldValues, newValues);
}
/**
@@ -326,37 +471,93 @@
* @param <V> type of the element of the list
* @return true if the operation succeeds, false otherwise
*/
- private <K, V> boolean removeValues(TransactionalMap<K, List<V>> map, K key, List<V> values) {
- List<V> oldValues = map.get(key);
+ private <K, V> boolean removeValues(TransactionalMap<K, Set<V>> map, K key, List<? extends V> values) {
+ Set<V> oldValues = map.putIfAbsent(key, new LinkedHashSet<>());
if (oldValues == null) {
- map.put(key, new ArrayList<>());
return true;
}
- LinkedHashSet<V> oldSet = new LinkedHashSet<>(oldValues);
- if (values.stream().allMatch(x -> !oldSet.contains(x))) {
+ if (values.stream().allMatch(x -> !oldValues.contains(x))) {
// don't write map because none of the values are stored
return true;
}
- oldSet.removeAll(values);
- return map.replace(key, oldValues, new ArrayList<>(oldSet));
+ LinkedHashSet<V> newValues = new LinkedHashSet<>(oldValues);
+ newValues.removeAll(values);
+ return map.replace(key, oldValues, newValues);
}
/**
- * Checks if the specified resource is registered as a child of a resource in the map.
+ * Returns the resource which has the same key as the key of the specified resource
+ * in the list as a value of the map.
*
* @param map map storing parent - child relationship of resources
- * @param resource resource to be checked
- * @return true if the resource is registered, false otherwise.
+ * @param resource resource to be checked for its key
+ * @return the resource which is regarded as the same as the specified resource
*/
- private boolean isRegistered(TransactionalMap<ResourcePath, List<ResourcePath>> map, ResourcePath resource) {
- // root is always regarded to be registered
+ // Naive implementation, which traverses all elements in the list
+ private <T extends ResourcePath> Optional<T> lookup(
+ TransactionalMap<ResourcePath.Discrete, Set<ResourcePath>> map, T resource) {
+ // if it is root, always returns itself
if (!resource.parent().isPresent()) {
- return true;
+ return Optional.of(resource);
}
- List<ResourcePath> value = map.get(resource.parent().get());
- return value != null && value.contains(resource);
+ Set<ResourcePath> values = map.get(resource.parent().get());
+ if (values == null) {
+ return Optional.empty();
+ }
+
+ @SuppressWarnings("unchecked")
+ Optional<T> result = values.stream()
+ .filter(x -> x.id().equals(resource.id()))
+ .map(x -> (T) x)
+ .findFirst();
+ return result;
+ }
+
+ /**
+ * Checks if there is enough resource volume to allocated the requested resource
+ * against the specified resource.
+ *
+ * @param original original resource
+ * @param request requested resource
+ * @param allocation current allocation of the resource
+ * @return true if there is enough resource volume. Otherwise, false.
+ */
+ private boolean hasEnoughResource(ResourcePath.Continuous original,
+ ResourcePath.Continuous request,
+ ContinuousResourceAllocation allocation) {
+ if (allocation == null) {
+ return request.value() <= original.value();
+ }
+
+ double allocated = allocation.allocations().stream()
+ .filter(x -> x.resource() instanceof ResourcePath.Continuous)
+ .map(x -> (ResourcePath.Continuous) x.resource())
+ .mapToDouble(ResourcePath.Continuous::value)
+ .sum();
+ double left = original.value() - allocated;
+ return request.value() <= left;
+ }
+
+ // internal use only
+ private static final class ContinuousResourceAllocation {
+ private final ResourcePath.Continuous original;
+ private final ImmutableList<ResourceAllocation> allocations;
+
+ private ContinuousResourceAllocation(ResourcePath.Continuous original,
+ ImmutableList<ResourceAllocation> allocations) {
+ this.original = original;
+ this.allocations = allocations;
+ }
+
+ private ResourcePath.Continuous original() {
+ return original;
+ }
+
+ private ImmutableList<ResourceAllocation> allocations() {
+ return allocations;
+ }
}
}