ONOS-3296: Support continuous type resources

Change-Id: I155e41e7a7c1750ff45986a55bedab353485d3fa
diff --git a/core/api/src/main/java/org/onosproject/net/newresource/ResourceService.java b/core/api/src/main/java/org/onosproject/net/newresource/ResourceService.java
index 966de50..79a7bba 100644
--- a/core/api/src/main/java/org/onosproject/net/newresource/ResourceService.java
+++ b/core/api/src/main/java/org/onosproject/net/newresource/ResourceService.java
@@ -126,13 +126,13 @@
     boolean release(ResourceConsumer consumer);
 
     /**
-     * Returns resource allocation of the specified resource.
+     * Returns resource allocations of the specified resource.
      *
      * @param resource resource to check the allocation
-     * @return allocation information enclosed by Optional.
-     * If the resource is not allocated, the return value is empty.
+     * @return list of allocation information.
+     * If the resource is not allocated, the return value is an empty list.
      */
-    Optional<ResourceAllocation> getResourceAllocation(ResourcePath resource);
+    List<ResourceAllocation> getResourceAllocation(ResourcePath resource);
 
     /**
      * Returns allocated resources being as children of the specified parent and being the specified resource type.
diff --git a/core/api/src/main/java/org/onosproject/net/newresource/ResourceStore.java b/core/api/src/main/java/org/onosproject/net/newresource/ResourceStore.java
index 7c67430..b0c24bd 100644
--- a/core/api/src/main/java/org/onosproject/net/newresource/ResourceStore.java
+++ b/core/api/src/main/java/org/onosproject/net/newresource/ResourceStore.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2015 Open Networking Laboratory
+ * Copyright 2015-2016 Open Networking Laboratory
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,7 +20,6 @@
 
 import java.util.Collection;
 import java.util.List;
-import java.util.Optional;
 
 /**
  * Service for storing resource and consumer information.
@@ -77,12 +76,23 @@
     boolean release(List<ResourcePath> resources, List<ResourceConsumer> consumers);
 
     /**
-     * Returns the resource consumer to whom the specified resource is allocated.
+     * Returns the resource consumers to whom the specified resource is allocated.
+     * The return value is a list having only one element when the given resource is discrete type.
+     * The return value may have multiple elements when the given resource is continuous type.
      *
      * @param resource resource whose allocated consumer to be returned
-     * @return resource consumer who are allocated the resource
+     * @return resource consumers who are allocated the resource.
+     * Returns empty list if there is no such consumer.
      */
-    Optional<ResourceConsumer> getConsumer(ResourcePath resource);
+    List<ResourceConsumer> getConsumers(ResourcePath resource);
+
+    /**
+     * Returns the availability of the specified resource.
+     *
+     * @param resource resource to check the availability
+     * @return true if available, otherwise false
+     */
+    boolean isAvailable(ResourcePath resource);
 
     /**
      * Returns a collection of the resources allocated to the specified consumer.
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/compiler/OpticalCircuitIntentCompiler.java b/core/net/src/main/java/org/onosproject/net/intent/impl/compiler/OpticalCircuitIntentCompiler.java
index ee04aab..2c547ea 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/compiler/OpticalCircuitIntentCompiler.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/compiler/OpticalCircuitIntentCompiler.java
@@ -313,9 +313,11 @@
             OchPort ochPort = (OchPort) deviceService.getPort(ochCP.deviceId(), ochCP.port());
             Optional<IntentId> intentId =
                     resourceService.getResourceAllocation(ResourcePath.discrete(ochCP.deviceId(), ochCP.port()))
+                            .stream()
                             .map(ResourceAllocation::consumer)
                             .filter(x -> x instanceof IntentId)
-                            .map(x -> (IntentId) x);
+                            .map(x -> (IntentId) x)
+                            .findAny();
 
             if (isAvailable(intentId.orElse(null))) {
                 return ochPort;
@@ -332,9 +334,12 @@
 
             Optional<IntentId> intentId =
                     resourceService.getResourceAllocation(ResourcePath.discrete(oduPort.deviceId(), port.number()))
+                            .stream()
                             .map(ResourceAllocation::consumer)
                             .filter(x -> x instanceof IntentId)
-                            .map(x -> (IntentId) x);
+                            .map(x -> (IntentId) x)
+                            .findAny();
+
             if (isAvailable(intentId.orElse(null))) {
                 return (OchPort) port;
             }
diff --git a/core/net/src/main/java/org/onosproject/net/newresource/impl/ResourceManager.java b/core/net/src/main/java/org/onosproject/net/newresource/impl/ResourceManager.java
index a5aae0f..eaf4b59 100644
--- a/core/net/src/main/java/org/onosproject/net/newresource/impl/ResourceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/newresource/impl/ResourceManager.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2015 Open Networking Laboratory
+ * Copyright 2015-2016 Open Networking Laboratory
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.GuavaCollectors;
 import org.onosproject.event.AbstractListenerManager;
 import org.onosproject.net.newresource.ResourceAdminService;
 import org.onosproject.net.newresource.ResourceAllocation;
@@ -34,10 +35,8 @@
 import org.onosproject.net.newresource.ResourceStore;
 import org.onosproject.net.newresource.ResourceStoreDelegate;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -107,11 +106,13 @@
     }
 
     @Override
-    public Optional<ResourceAllocation> getResourceAllocation(ResourcePath resource) {
+    public List<ResourceAllocation> getResourceAllocation(ResourcePath resource) {
         checkNotNull(resource);
 
-        Optional<ResourceConsumer> consumer = store.getConsumer(resource);
-        return consumer.map(x -> new ResourceAllocation(resource, x));
+        List<ResourceConsumer> consumers = store.getConsumers(resource);
+        return consumers.stream()
+                .map(x -> new ResourceAllocation(resource, x))
+                .collect(GuavaCollectors.toImmutableList());
     }
 
     @Override
@@ -119,17 +120,12 @@
         checkNotNull(parent);
         checkNotNull(cls);
 
+        // We access store twice in this method, then the store may be updated by others
         Collection<ResourcePath> resources = store.getAllocatedResources(parent, cls);
-        List<ResourceAllocation> allocations = new ArrayList<>(resources.size());
-        for (ResourcePath resource: resources) {
-            // We access store twice in this method, then the store may be updated by others
-            Optional<ResourceConsumer> consumer = store.getConsumer(resource);
-            if (consumer.isPresent()) {
-                allocations.add(new ResourceAllocation(resource, consumer.get()));
-            }
-        }
-
-        return allocations;
+        return resources.stream()
+                .flatMap(resource -> store.getConsumers(resource).stream()
+                        .map(consumer -> new ResourceAllocation(resource, consumer)))
+                .collect(GuavaCollectors.toImmutableList());
     }
 
     @Override
@@ -149,7 +145,7 @@
         Collection<ResourcePath> children = store.getChildResources(parent);
         return children.stream()
                 // We access store twice in this method, then the store may be updated by others
-                .filter(x -> !store.getConsumer(x).isPresent())
+                .filter(store::isAvailable)
                 .collect(Collectors.toList());
     }
 
@@ -157,8 +153,7 @@
     public boolean isAvailable(ResourcePath resource) {
         checkNotNull(resource);
 
-        Optional<ResourceConsumer> consumer = store.getConsumer(resource);
-        return !consumer.isPresent();
+        return store.isAvailable(resource);
     }
 
     @Override
diff --git a/core/net/src/test/java/org/onosproject/net/intent/impl/compiler/MockResourceService.java b/core/net/src/test/java/org/onosproject/net/intent/impl/compiler/MockResourceService.java
index f5d3d0f..2926346 100644
--- a/core/net/src/test/java/org/onosproject/net/intent/impl/compiler/MockResourceService.java
+++ b/core/net/src/test/java/org/onosproject/net/intent/impl/compiler/MockResourceService.java
@@ -66,9 +66,10 @@
     }
 
     @Override
-    public Optional<ResourceAllocation> getResourceAllocation(ResourcePath resource) {
+    public List<ResourceAllocation> getResourceAllocation(ResourcePath resource) {
         return Optional.ofNullable(assignment.get(resource))
-                .map(x -> new ResourceAllocation(resource, x));
+                .map(x -> ImmutableList.of(new ResourceAllocation(resource, x)))
+                .orElse(ImmutableList.of());
     }
 
     @Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/newresource/impl/ConsistentResourceStore.java b/core/store/dist/src/main/java/org/onosproject/store/newresource/impl/ConsistentResourceStore.java
index c2a1a7c..7339b16 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/newresource/impl/ConsistentResourceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/newresource/impl/ConsistentResourceStore.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2015 Open Networking Laboratory
+ * Copyright 2015-2016 Open Networking Laboratory
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -17,14 +17,18 @@
 
 import com.google.common.annotations.Beta;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.GuavaCollectors;
 import org.onlab.util.Tools;
+import org.onosproject.net.newresource.ResourceAllocation;
 import org.onosproject.net.newresource.ResourceConsumer;
 import org.onosproject.net.newresource.ResourceEvent;
+import org.onosproject.net.newresource.ResourceId;
 import org.onosproject.net.newresource.ResourcePath;
 import org.onosproject.net.newresource.ResourceStore;
 import org.onosproject.net.newresource.ResourceStoreDelegate;
@@ -40,7 +44,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -49,7 +52,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -65,10 +70,12 @@
         implements ResourceStore {
     private static final Logger log = LoggerFactory.getLogger(ConsistentResourceStore.class);
 
-    private static final String CONSUMER_MAP = "onos-resource-consumers";
+    private static final String DISCRETE_CONSUMER_MAP = "onos-discrete-consumers";
+    private static final String CONTINUOUS_CONSUMER_MAP = "onos-continuous-consumers";
     private static final String CHILD_MAP = "onos-resource-children";
     private static final Serializer SERIALIZER = Serializer.using(
-            Arrays.asList(KryoNamespaces.BASIC, KryoNamespaces.API));
+            Arrays.asList(KryoNamespaces.BASIC, KryoNamespaces.API),
+            ContinuousResourceAllocation.class);
 
     // TODO: We should provide centralized values for this
     private static final int MAX_RETRIES = 5;
@@ -77,35 +84,61 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected StorageService service;
 
-    private ConsistentMap<ResourcePath, ResourceConsumer> consumerMap;
-    private ConsistentMap<ResourcePath, List<ResourcePath>> childMap;
+    private ConsistentMap<ResourcePath.Discrete, ResourceConsumer> discreteConsumers;
+    private ConsistentMap<ResourceId, ContinuousResourceAllocation> continuousConsumers;
+    private ConsistentMap<ResourcePath.Discrete, Set<ResourcePath>> childMap;
 
     @Activate
     public void activate() {
-        consumerMap = service.<ResourcePath, ResourceConsumer>consistentMapBuilder()
-                .withName(CONSUMER_MAP)
+        discreteConsumers = service.<ResourcePath.Discrete, ResourceConsumer>consistentMapBuilder()
+                .withName(DISCRETE_CONSUMER_MAP)
                 .withSerializer(SERIALIZER)
                 .build();
-        childMap = service.<ResourcePath, List<ResourcePath>>consistentMapBuilder()
+        continuousConsumers = service.<ResourceId, ContinuousResourceAllocation>consistentMapBuilder()
+                .withName(CONTINUOUS_CONSUMER_MAP)
+                .withSerializer(SERIALIZER)
+                .build();
+        childMap = service.<ResourcePath.Discrete, Set<ResourcePath>>consistentMapBuilder()
                 .withName(CHILD_MAP)
                 .withSerializer(SERIALIZER)
                 .build();
 
-        Tools.retryable(() -> childMap.put(ResourcePath.ROOT, ImmutableList.of()),
+        Tools.retryable(() -> childMap.put(ResourcePath.ROOT, new LinkedHashSet<>()),
                         ConsistentMapException.class, MAX_RETRIES, RETRY_DELAY);
         log.info("Started");
     }
 
     @Override
-    public Optional<ResourceConsumer> getConsumer(ResourcePath resource) {
+    public List<ResourceConsumer> getConsumers(ResourcePath resource) {
         checkNotNull(resource);
+        checkArgument(resource instanceof ResourcePath.Discrete || resource instanceof ResourcePath.Continuous);
 
-        Versioned<ResourceConsumer> consumer = consumerMap.get(resource);
+        if (resource instanceof ResourcePath.Discrete) {
+            return getConsumer((ResourcePath.Discrete) resource);
+        } else {
+            return getConsumer((ResourcePath.Continuous) resource);
+        }
+    }
+
+    private List<ResourceConsumer> getConsumer(ResourcePath.Discrete resource) {
+        Versioned<ResourceConsumer> consumer = discreteConsumers.get(resource);
         if (consumer == null) {
-            return Optional.empty();
+            return ImmutableList.of();
         }
 
-        return Optional.of(consumer.value());
+        return ImmutableList.of(consumer.value());
+    }
+
+    private List<ResourceConsumer> getConsumer(ResourcePath.Continuous resource) {
+        Versioned<ContinuousResourceAllocation> allocations = continuousConsumers.get(resource.id());
+        if (allocations == null) {
+            return ImmutableList.of();
+        }
+
+        return allocations.value().allocations().stream()
+                .filter(x -> x.resource().equals(resource))
+                .map(ResourceAllocation::consumer)
+                .collect(GuavaCollectors.toImmutableList());
     }
 
     @Override
@@ -115,15 +148,16 @@
         TransactionContext tx = service.transactionContextBuilder().build();
         tx.begin();
 
-        TransactionalMap<ResourcePath, List<ResourcePath>> childTxMap =
+        TransactionalMap<ResourcePath.Discrete, Set<ResourcePath>> childTxMap =
                 tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
 
-        Map<ResourcePath, List<ResourcePath>> resourceMap = resources.stream()
+        Map<ResourcePath.Discrete, List<ResourcePath>> resourceMap = resources.stream()
                 .filter(x -> x.parent().isPresent())
                 .collect(Collectors.groupingBy(x -> x.parent().get()));
 
-        for (Map.Entry<ResourcePath, List<ResourcePath>> entry: resourceMap.entrySet()) {
-            if (!isRegistered(childTxMap, entry.getKey())) {
+        for (Map.Entry<ResourcePath.Discrete, List<ResourcePath>> entry: resourceMap.entrySet()) {
+            Optional<ResourcePath.Discrete> child = lookup(childTxMap, entry.getKey());
+            if (!child.isPresent()) {
                 return abortTransaction(tx);
             }
 
@@ -150,19 +184,32 @@
         TransactionContext tx = service.transactionContextBuilder().build();
         tx.begin();
 
-        TransactionalMap<ResourcePath, List<ResourcePath>> childTxMap =
+        TransactionalMap<ResourcePath.Discrete, Set<ResourcePath>> childTxMap =
                 tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
-        TransactionalMap<ResourcePath, ResourceConsumer> consumerTxMap =
-                tx.getTransactionalMap(CONSUMER_MAP, SERIALIZER);
+        TransactionalMap<ResourcePath.Discrete, ResourceConsumer> discreteConsumerTxMap =
+                tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
+        TransactionalMap<ResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
+                tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
 
-        Map<ResourcePath, List<ResourcePath>> resourceMap = resources.stream()
+        // Extract Discrete instances from resources
+        Map<ResourcePath.Discrete, List<ResourcePath>> resourceMap = resources.stream()
                 .filter(x -> x.parent().isPresent())
                 .collect(Collectors.groupingBy(x -> x.parent().get()));
 
         // even if one of the resources is allocated to a consumer,
         // all unregistrations are regarded as failure
-        for (Map.Entry<ResourcePath, List<ResourcePath>> entry: resourceMap.entrySet()) {
-            if (entry.getValue().stream().anyMatch(x -> consumerTxMap.get(x) != null)) {
+        for (Map.Entry<ResourcePath.Discrete, List<ResourcePath>> entry: resourceMap.entrySet()) {
+            boolean allocated = entry.getValue().stream().anyMatch(x -> {
+                if (x instanceof ResourcePath.Discrete) {
+                    return discreteConsumerTxMap.get((ResourcePath.Discrete) x) != null;
+                } else if (x instanceof ResourcePath.Continuous) {
+                    ContinuousResourceAllocation allocations = continuousConsumerTxMap.get(x.id());
+                    return allocations != null && !allocations.allocations().isEmpty();
+                } else {
+                    return false;
+                }
+            });
+            if (allocated) {
                 return abortTransaction(tx);
             }
 
@@ -190,19 +237,39 @@
         TransactionContext tx = service.transactionContextBuilder().build();
         tx.begin();
 
-        TransactionalMap<ResourcePath, List<ResourcePath>> childTxMap =
+        TransactionalMap<ResourcePath.Discrete, Set<ResourcePath>> childTxMap =
                 tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
-        TransactionalMap<ResourcePath, ResourceConsumer> consumerTxMap =
-                tx.getTransactionalMap(CONSUMER_MAP, SERIALIZER);
+        TransactionalMap<ResourcePath.Discrete, ResourceConsumer> discreteConsumerTxMap =
+                tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
+        TransactionalMap<ResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
+                tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
 
         for (ResourcePath resource: resources) {
-            if (!isRegistered(childTxMap, resource)) {
-                return abortTransaction(tx);
-            }
+            if (resource instanceof ResourcePath.Discrete) {
+                if (!lookup(childTxMap, resource).isPresent()) {
+                    return abortTransaction(tx);
+                }
 
-            ResourceConsumer oldValue = consumerTxMap.put(resource, consumer);
-            if (oldValue != null) {
-                return abortTransaction(tx);
+                ResourceConsumer oldValue = discreteConsumerTxMap.put((ResourcePath.Discrete) resource, consumer);
+                if (oldValue != null) {
+                    return abortTransaction(tx);
+                }
+            } else if (resource instanceof ResourcePath.Continuous) {
+                Optional<ResourcePath.Continuous> continuous = lookup(childTxMap, (ResourcePath.Continuous) resource);
+                if (!continuous.isPresent()) {
+                    return abortTransaction(tx);
+                }
+
+                ContinuousResourceAllocation allocations = continuousConsumerTxMap.get(continuous.get().id());
+                if (!hasEnoughResource(continuous.get(), (ResourcePath.Continuous) resource, allocations)) {
+                    return abortTransaction(tx);
+                }
+
+                boolean success = appendValue(continuousConsumerTxMap,
+                        continuous.get(), new ResourceAllocation(continuous.get(), consumer));
+                if (!success) {
+                    return abortTransaction(tx);
+                }
             }
         }
 
@@ -218,8 +285,10 @@
         TransactionContext tx = service.transactionContextBuilder().build();
         tx.begin();
 
-        TransactionalMap<ResourcePath, ResourceConsumer> consumerTxMap =
-                tx.getTransactionalMap(CONSUMER_MAP, SERIALIZER);
+        TransactionalMap<ResourcePath.Discrete, ResourceConsumer> discreteConsumerTxMap =
+                tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
+        TransactionalMap<ResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
+                tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
         Iterator<ResourcePath> resourceIte = resources.iterator();
         Iterator<ResourceConsumer> consumerIte = consumers.iterator();
 
@@ -227,10 +296,24 @@
             ResourcePath resource = resourceIte.next();
             ResourceConsumer consumer = consumerIte.next();
 
-            // if this single release fails (because the resource is allocated to another consumer,
-            // the whole release fails
-            if (!consumerTxMap.remove(resource, consumer)) {
-                return abortTransaction(tx);
+            if (resource instanceof ResourcePath.Discrete) {
+                // if this single release fails (because the resource is allocated to another consumer,
+                // the whole release fails
+                if (!discreteConsumerTxMap.remove((ResourcePath.Discrete) resource, consumer)) {
+                    return abortTransaction(tx);
+                }
+            } else if (resource instanceof ResourcePath.Continuous) {
+                ResourcePath.Continuous continuous = (ResourcePath.Continuous) resource;
+                ContinuousResourceAllocation allocation = continuousConsumerTxMap.get(continuous.id());
+                ImmutableList<ResourceAllocation> newAllocations = allocation.allocations().stream()
+                        .filter(x -> !(x.consumer().equals(consumer) &&
+                                ((ResourcePath.Continuous) x.resource()).value() == continuous.value()))
+                        .collect(GuavaCollectors.toImmutableList());
+
+                if (!continuousConsumerTxMap.replace(continuous.id(), allocation,
+                        new ContinuousResourceAllocation(allocation.original(), newAllocations))) {
+                    return abortTransaction(tx);
+                }
             }
         }
 
@@ -238,22 +321,51 @@
     }
 
     @Override
+    public boolean isAvailable(ResourcePath resource) {
+        checkNotNull(resource);
+        checkArgument(resource instanceof ResourcePath.Discrete || resource instanceof ResourcePath.Continuous);
+
+        if (resource instanceof ResourcePath.Discrete) {
+            return getConsumer((ResourcePath.Discrete) resource).isEmpty();
+        } else {
+            return isAvailable((ResourcePath.Continuous) resource);
+        }
+    }
+
+    private boolean isAvailable(ResourcePath.Continuous resource) {
+        Versioned<ContinuousResourceAllocation> allocation = continuousConsumers.get(resource.id());
+        if (allocation == null) {
+            return false;
+        }
+
+        return hasEnoughResource(allocation.value().original(), resource, allocation.value());
+    }
+
+    @Override
     public Collection<ResourcePath> getResources(ResourceConsumer consumer) {
         checkNotNull(consumer);
 
         // NOTE: getting all entries may become performance bottleneck
         // TODO: revisit for better backend data structure
-        return consumerMap.entrySet().stream()
+        Stream<ResourcePath.Discrete> discreteStream = discreteConsumers.entrySet().stream()
                 .filter(x -> x.getValue().value().equals(consumer))
-                .map(Map.Entry::getKey)
-                .collect(Collectors.toList());
+                .map(Map.Entry::getKey);
+
+        Stream<ResourcePath.Continuous> continuousStream = continuousConsumers.values().stream()
+                .flatMap(x -> x.value().allocations().stream()
+                        .map(y -> Maps.immutableEntry(x.value().original(), y)))
+                .filter(x -> x.getValue().consumer().equals(consumer))
+                .map(x -> x.getKey());
+
+        return Stream.concat(discreteStream, continuousStream).collect(Collectors.toList());
     }
 
     @Override
     public Collection<ResourcePath> getChildResources(ResourcePath parent) {
         checkNotNull(parent);
+        checkArgument(parent instanceof ResourcePath.Discrete);
 
-        Versioned<List<ResourcePath>> children = childMap.get(parent);
+        Versioned<Set<ResourcePath>> children = childMap.get((ResourcePath.Discrete) parent);
         if (children == null) {
             return Collections.emptyList();
         }
@@ -265,16 +377,28 @@
     public <T> Collection<ResourcePath> getAllocatedResources(ResourcePath parent, Class<T> cls) {
         checkNotNull(parent);
         checkNotNull(cls);
+        checkArgument(parent instanceof ResourcePath.Discrete);
 
-        Versioned<List<ResourcePath>> children = childMap.get(parent);
+        Versioned<Set<ResourcePath>> children = childMap.get((ResourcePath.Discrete) parent);
         if (children == null) {
             return Collections.emptyList();
         }
 
-        return children.value().stream()
+        Stream<ResourcePath.Discrete> discrete = children.value().stream()
                 .filter(x -> x.last().getClass().equals(cls))
-                .filter(consumerMap::containsKey)
-                .collect(Collectors.toList());
+                .filter(x -> x instanceof ResourcePath.Discrete)
+                .map(x -> (ResourcePath.Discrete) x)
+                .filter(discreteConsumers::containsKey);
+
+        Stream<ResourcePath.Continuous> continuous = children.value().stream()
+                .filter(x -> x.last().getClass().equals(cls))
+                .filter(x -> x instanceof ResourcePath.Continuous)
+                .map(x -> (ResourcePath.Continuous) x)
+                .filter(x -> continuousConsumers.containsKey(x.id()))
+                .filter(x -> continuousConsumers.get(x.id()) != null)
+                .filter(x -> !continuousConsumers.get(x.id()).value().allocations().isEmpty());
+
+        return Stream.concat(discrete, continuous).collect(Collectors.toList());
     }
 
     /**
@@ -288,6 +412,27 @@
         return false;
     }
 
+    // Appends the specified ResourceAllocation to the existing values stored in the map
+    private boolean appendValue(TransactionalMap<ResourceId, ContinuousResourceAllocation> map,
+                                ResourcePath.Continuous original, ResourceAllocation value) {
+        ContinuousResourceAllocation oldValue = map.putIfAbsent(original.id(),
+                new ContinuousResourceAllocation(original, ImmutableList.of(value)));
+        if (oldValue == null) {
+            return true;
+        }
+
+        if (oldValue.allocations().contains(value)) {
+            // don't write to map because all values are already stored
+            return true;
+        }
+
+        ContinuousResourceAllocation newValue = new ContinuousResourceAllocation(original,
+                ImmutableList.<ResourceAllocation>builder()
+                        .addAll(oldValue.allocations())
+                        .add(value)
+                        .build());
+        return map.replace(original.id(), oldValue, newValue);
+    }
     /**
      * Appends the values to the existing values associated with the specified key.
      * If the map already has all the given values, appending will not happen.
@@ -299,20 +444,20 @@
      * @param <V> type of the element of the list
      * @return true if the operation succeeds, false otherwise.
      */
-    private <K, V> boolean appendValues(TransactionalMap<K, List<V>> map, K key, List<V> values) {
-        List<V> oldValues = map.putIfAbsent(key, new ArrayList<>(values));
+    private <K, V> boolean appendValues(TransactionalMap<K, Set<V>> map, K key, List<V> values) {
+        Set<V> oldValues = map.putIfAbsent(key, new LinkedHashSet<>(values));
         if (oldValues == null) {
             return true;
         }
 
-        LinkedHashSet<V> oldSet = new LinkedHashSet<>(oldValues);
-        if (oldSet.containsAll(values)) {
+        if (oldValues.containsAll(values)) {
             // don't write to map because all values are already stored
             return true;
         }
 
-        oldSet.addAll(values);
-        return map.replace(key, oldValues, new ArrayList<>(oldSet));
+        LinkedHashSet<V> newValues = new LinkedHashSet<>(oldValues);
+        newValues.addAll(values);
+        return map.replace(key, oldValues, newValues);
     }
 
     /**
@@ -326,37 +471,93 @@
      * @param <V> type of the element of the list
      * @return true if the operation succeeds, false otherwise
      */
-    private <K, V> boolean removeValues(TransactionalMap<K, List<V>> map, K key, List<V> values) {
-        List<V> oldValues = map.get(key);
+    private <K, V> boolean removeValues(TransactionalMap<K, Set<V>> map, K key, List<? extends V> values) {
+        Set<V> oldValues = map.putIfAbsent(key, new LinkedHashSet<>());
         if (oldValues == null) {
-            map.put(key, new ArrayList<>());
             return true;
         }
 
-        LinkedHashSet<V> oldSet = new LinkedHashSet<>(oldValues);
-        if (values.stream().allMatch(x -> !oldSet.contains(x))) {
+        if (values.stream().allMatch(x -> !oldValues.contains(x))) {
             // don't write map because none of the values are stored
             return true;
         }
 
-        oldSet.removeAll(values);
-        return map.replace(key, oldValues, new ArrayList<>(oldSet));
+        LinkedHashSet<V> newValues = new LinkedHashSet<>(oldValues);
+        newValues.removeAll(values);
+        return map.replace(key, oldValues, newValues);
     }
 
     /**
-     * Checks if the specified resource is registered as a child of a resource in the map.
+     * Returns the resource which has the same key as the key of the specified resource
+     * in the list as a value of the map.
      *
      * @param map map storing parent - child relationship of resources
-     * @param resource resource to be checked
-     * @return true if the resource is registered, false otherwise.
+     * @param resource resource to be checked for its key
+     * @return the resource which is regarded as the same as the specified resource
      */
-    private boolean isRegistered(TransactionalMap<ResourcePath, List<ResourcePath>> map, ResourcePath resource) {
-        // root is always regarded to be registered
+    // Naive implementation, which traverses all elements in the list
+    private <T extends ResourcePath> Optional<T> lookup(
+            TransactionalMap<ResourcePath.Discrete, Set<ResourcePath>> map, T resource) {
+        // if it is root, always returns itself
         if (!resource.parent().isPresent()) {
-            return true;
+            return Optional.of(resource);
         }
 
-        List<ResourcePath> value = map.get(resource.parent().get());
-        return value != null && value.contains(resource);
+        Set<ResourcePath> values = map.get(resource.parent().get());
+        if (values == null) {
+            return Optional.empty();
+        }
+
+        @SuppressWarnings("unchecked")
+        Optional<T> result = values.stream()
+                .filter(x -> x.id().equals(resource.id()))
+                .map(x -> (T) x)
+                .findFirst();
+        return result;
+    }
+
+    /**
+     * Checks if there is enough resource volume to allocated the requested resource
+     * against the specified resource.
+     *
+     * @param original original resource
+     * @param request requested resource
+     * @param allocation current allocation of the resource
+     * @return true if there is enough resource volume. Otherwise, false.
+     */
+    private boolean hasEnoughResource(ResourcePath.Continuous original,
+                                      ResourcePath.Continuous request,
+                                      ContinuousResourceAllocation allocation) {
+        if (allocation == null) {
+            return request.value() <= original.value();
+        }
+
+        double allocated = allocation.allocations().stream()
+                .filter(x -> x.resource() instanceof ResourcePath.Continuous)
+                .map(x -> (ResourcePath.Continuous) x.resource())
+                .mapToDouble(ResourcePath.Continuous::value)
+                .sum();
+        double left = original.value() - allocated;
+        return request.value() <= left;
+    }
+
+    // internal use only
+    private static final class ContinuousResourceAllocation {
+        private final ResourcePath.Continuous original;
+        private final ImmutableList<ResourceAllocation> allocations;
+
+        private ContinuousResourceAllocation(ResourcePath.Continuous original,
+                                             ImmutableList<ResourceAllocation> allocations) {
+            this.original = original;
+            this.allocations = allocations;
+        }
+
+        private ResourcePath.Continuous original() {
+            return original;
+        }
+
+        private ImmutableList<ResourceAllocation> allocations() {
+            return allocations;
+        }
     }
 }
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
index 16a2a99..c7a236d 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
@@ -204,6 +204,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
@@ -239,7 +240,8 @@
             .register(CopyOnWriteArraySet.class)
             .register(ArrayList.class,
                       LinkedList.class,
-                      HashSet.class
+                      HashSet.class,
+                      LinkedHashSet.class
                       )
             .register(Maps.immutableEntry("a", "b").getClass())
             .register(new ArraysAsListSerializer(), Arrays.asList().getClass())