Separate the child map into two child maps for each resource type

Now, we have a child map for discrete type resources and
a child map for continuous type resources.

This is a refactoring for ONOS-4281.

Change-Id: I1fc931d6b6599885573908606600a61686073bee
diff --git a/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentResourceStore.java b/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentResourceStore.java
index 89b73de..4c0137e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentResourceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentResourceStore.java
@@ -79,8 +79,9 @@
     private static final Logger log = LoggerFactory.getLogger(ConsistentResourceStore.class);
 
     private static final String DISCRETE_CONSUMER_MAP = "onos-discrete-consumers";
+    private static final String DISCRETE_CHILD_MAP = "onos-resource-discrete-children";
     private static final String CONTINUOUS_CONSUMER_MAP = "onos-continuous-consumers";
-    private static final String CHILD_MAP = "onos-resource-children";
+    private static final String CONTINUOUS_CHILD_MAP = "onos-resource-continuous-children";
     private static final Serializer SERIALIZER = Serializer.using(
             Arrays.asList(KryoNamespaces.API),
             ContinuousResourceAllocation.class);
@@ -93,8 +94,9 @@
     protected StorageService service;
 
     private ConsistentMap<DiscreteResourceId, ResourceConsumer> discreteConsumers;
+    private ConsistentMap<DiscreteResourceId, Set<DiscreteResource>> discreteChildMap;
     private ConsistentMap<ContinuousResourceId, ContinuousResourceAllocation> continuousConsumers;
-    private ConsistentMap<DiscreteResourceId, Set<Resource>> childMap;
+    private ConsistentMap<DiscreteResourceId, Set<ContinuousResource>> continuousChildMap;
 
     @Activate
     public void activate() {
@@ -102,17 +104,23 @@
                 .withName(DISCRETE_CONSUMER_MAP)
                 .withSerializer(SERIALIZER)
                 .build();
+        discreteChildMap = service.<DiscreteResourceId, Set<DiscreteResource>>consistentMapBuilder()
+                .withName(DISCRETE_CHILD_MAP)
+                .withSerializer(SERIALIZER)
+                .build();
         continuousConsumers = service.<ContinuousResourceId, ContinuousResourceAllocation>consistentMapBuilder()
                 .withName(CONTINUOUS_CONSUMER_MAP)
                 .withSerializer(SERIALIZER)
                 .build();
-        childMap = service.<DiscreteResourceId, Set<Resource>>consistentMapBuilder()
-                .withName(CHILD_MAP)
+        continuousChildMap = service.<DiscreteResourceId, Set<ContinuousResource>>consistentMapBuilder()
+                .withName(CONTINUOUS_CHILD_MAP)
                 .withSerializer(SERIALIZER)
                 .build();
 
-        Tools.retryable(() -> childMap.put(Resource.ROOT.id(), new LinkedHashSet<>()),
-                        ConsistentMapException.class, MAX_RETRIES, RETRY_DELAY);
+        Tools.retryable(() -> discreteChildMap.put(Resource.ROOT.id(), new LinkedHashSet<>()),
+                ConsistentMapException.class, MAX_RETRIES, RETRY_DELAY);
+        Tools.retryable(() -> continuousChildMap.put(Resource.ROOT.id(), new LinkedHashSet<>()),
+                ConsistentMapException.class, MAX_RETRIES, RETRY_DELAY);
         log.info("Started");
     }
 
@@ -162,8 +170,10 @@
         TransactionContext tx = service.transactionContextBuilder().build();
         tx.begin();
 
-        TransactionalMap<DiscreteResourceId, Set<Resource>> childTxMap =
-                tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
+        TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> discreteChildTxMap =
+                tx.getTransactionalMap(DISCRETE_CHILD_MAP, SERIALIZER);
+        TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> continuousChildTxMap =
+                tx.getTransactionalMap(CONTINUOUS_CHILD_MAP, SERIALIZER);
 
         // the order is preserved by LinkedHashMap
         Map<DiscreteResource, List<Resource>> resourceMap = resources.stream()
@@ -171,11 +181,11 @@
                 .collect(Collectors.groupingBy(x -> x.parent().get(), LinkedHashMap::new, Collectors.toList()));
 
         for (Map.Entry<DiscreteResource, List<Resource>> entry: resourceMap.entrySet()) {
-            if (!lookup(childTxMap, entry.getKey().id()).isPresent()) {
+            if (!lookup(discreteChildTxMap, continuousChildTxMap, entry.getKey().id()).isPresent()) {
                 return abortTransaction(tx);
             }
 
-            if (!appendValues(childTxMap, entry.getKey().id(), entry.getValue())) {
+            if (!appendValues(discreteChildTxMap, continuousChildTxMap, entry.getKey().id(), entry.getValue())) {
                 return abortTransaction(tx);
             }
         }
@@ -201,12 +211,14 @@
         TransactionContext tx = service.transactionContextBuilder().build();
         tx.begin();
 
-        TransactionalMap<DiscreteResourceId, Set<Resource>> childTxMap =
-                tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
         TransactionalMap<DiscreteResourceId, ResourceConsumer> discreteConsumerTxMap =
                 tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
+        TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> discreteChildTxMap =
+                tx.getTransactionalMap(DISCRETE_CHILD_MAP, SERIALIZER);
         TransactionalMap<ContinuousResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
                 tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
+        TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> continuousChildTxMap =
+                tx.getTransactionalMap(CONTINUOUS_CHILD_MAP, SERIALIZER);
 
         // Look up resources by resource IDs
         List<Resource> resources = ids.stream()
@@ -216,7 +228,7 @@
                     if (x instanceof DiscreteResourceId) {
                         return Optional.of(Resources.discrete((DiscreteResourceId) x).resource());
                     } else {
-                        return lookup(childTxMap, x);
+                        return lookup(continuousChildTxMap, (ContinuousResourceId) x);
                     }
                 })
                 .filter(Optional::isPresent)
@@ -245,7 +257,7 @@
                 return abortTransaction(tx);
             }
 
-            if (!removeValues(childTxMap, entry.getKey(), entry.getValue())) {
+            if (!removeValues(discreteChildTxMap, continuousChildTxMap, entry.getKey(), entry.getValue())) {
                 log.warn("Failed to unregister {}: Failed to remove {} values.",
                           entry.getKey(), entry.getValue().size());
                 log.debug("Failed to unregister {}: Failed to remove values: {}",
@@ -275,16 +287,18 @@
         TransactionContext tx = service.transactionContextBuilder().build();
         tx.begin();
 
-        TransactionalMap<DiscreteResourceId, Set<Resource>> childTxMap =
-                tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
         TransactionalMap<DiscreteResourceId, ResourceConsumer> discreteConsumerTxMap =
                 tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
+        TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> discreteChildTxMap =
+                tx.getTransactionalMap(DISCRETE_CHILD_MAP, SERIALIZER);
         TransactionalMap<ContinuousResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
                 tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
+        TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> continuousChildTxMap =
+                tx.getTransactionalMap(CONTINUOUS_CHILD_MAP, SERIALIZER);
 
         for (Resource resource: resources) {
             // if the resource is not registered, then abort
-            Optional<Resource> lookedUp = lookup(childTxMap, resource.id());
+            Optional<Resource> lookedUp = lookup(discreteChildTxMap, continuousChildTxMap, resource.id());
             if (!lookedUp.isPresent()) {
                 return abortTransaction(tx);
             }
@@ -372,7 +386,7 @@
     // computational complexity: O(n) where n is the number of existing allocations for the resource
     private boolean isAvailable(ContinuousResource resource) {
         // check if it's registered or not.
-        Versioned<Set<Resource>> children = childMap.get(resource.parent().get().id());
+        Versioned<Set<ContinuousResource>> children = continuousChildMap.get(resource.parent().get().id());
         if (children == null) {
             return false;
         }
@@ -380,7 +394,6 @@
         ContinuousResource registered = children.value().stream()
                 .filter(c -> c.id().equals(resource.id()))
                 .findFirst()
-                .map(c -> (ContinuousResource) c)
                 .get();
         if (registered.value() < resource.value()) {
             // Capacity < requested, can never satisfy
@@ -424,12 +437,21 @@
     public Set<Resource> getChildResources(DiscreteResourceId parent) {
         checkNotNull(parent);
 
-        Versioned<Set<Resource>> children = childMap.get(parent);
-        if (children == null) {
-            return ImmutableSet.of();
-        }
+        Versioned<Set<DiscreteResource>> discreteChildren = discreteChildMap.get(parent);
+        Versioned<Set<ContinuousResource>> continuousChildren = continuousChildMap.get(parent);
 
-        return children.value();
+        if (discreteChildren == null && continuousChildren == null) {
+            return ImmutableSet.of();
+        } else if (discreteChildren == null) {
+            return ImmutableSet.copyOf(continuousChildren.value());
+        } else if (continuousChildren == null) {
+            return ImmutableSet.copyOf(discreteChildren.value());
+        } else {
+            return ImmutableSet.<Resource>builder()
+                    .addAll(discreteChildren.value())
+                    .addAll(continuousChildren.value())
+                    .build();
+        }
     }
 
     // computational complexity: O(n) where n is the number of the children of the parent
@@ -438,18 +460,18 @@
         checkNotNull(parent);
         checkNotNull(cls);
 
-        Versioned<Set<Resource>> children = childMap.get(parent);
-        if (children == null) {
-            return ImmutableList.of();
+        Set<Resource> children = getChildResources(parent);
+        if (children.isEmpty()) {
+            return children;
         }
 
-        Stream<DiscreteResource> discrete = children.value().stream()
+        Stream<DiscreteResource> discrete = children.stream()
                 .filter(x -> x.isTypeOf(cls))
                 .filter(x -> x instanceof DiscreteResource)
                 .map(x -> ((DiscreteResource) x))
                 .filter(x -> discreteConsumers.containsKey(x.id()));
 
-        Stream<ContinuousResource> continuous = children.value().stream()
+        Stream<ContinuousResource> continuous = children.stream()
                 .filter(x -> x.id().equals(parent.child(cls)))
                 .filter(x -> x instanceof ContinuousResource)
                 .map(x -> (ContinuousResource) x)
@@ -505,29 +527,77 @@
      * Appends the values to the existing values associated with the specified key.
      * If the map already has all the given values, appending will not happen.
      *
-     * @param map map holding multiple values for a key
+     * @param discreteTxMap map holding multiple discrete resources for a key
+     * @param continuousTxMap map holding multiple continuous resources for a key
      * @param key key specifying values
      * @param values values to be appended
      * @return true if the operation succeeds, false otherwise.
      */
     // computational complexity: O(n) where n is the number of the specified value
-    private boolean appendValues(TransactionalMap<DiscreteResourceId, Set<Resource>> map,
+    private boolean appendValues(TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> discreteTxMap,
+                                 TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> continuousTxMap,
                                  DiscreteResourceId key, List<Resource> values) {
-        Set<Resource> requested = new LinkedHashSet<>(values);
-        Set<Resource> oldValues = map.putIfAbsent(key, requested);
+        // it's assumed that the passed "values" is non-empty
+
+        // This is 2-pass scan. Nicer to have 1-pass scan
+        List<DiscreteResource> discreteValues = values.stream()
+                .filter(x -> x instanceof DiscreteResource)
+                .map(x -> (DiscreteResource) x)
+                .collect(Collectors.toList());
+        List<ContinuousResource> continuousValues = values.stream()
+                .filter(x -> x instanceof ContinuousResource)
+                .map(x -> (ContinuousResource) x)
+                .collect(Collectors.toList());
+
+        // short-circuit decision avoiding unnecessary distributed map operations
+        if (continuousValues.isEmpty()) {
+            return appendValues(discreteTxMap, key, discreteValues, null);
+        }
+        if (discreteValues.isEmpty()) {
+            return appendValues(continuousTxMap, key, continuousValues, null);
+        }
+
+        return appendValues(discreteTxMap, key, discreteValues, null)
+                && appendValues(continuousTxMap, key, continuousValues, null);
+    }
+
+    private boolean appendValues(TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> map,
+                                 DiscreteResourceId key, List<DiscreteResource> values, DiscreteResource dummy) {
+        Set<DiscreteResource> requested = new LinkedHashSet<>(values);
+        Set<DiscreteResource> oldValues = map.putIfAbsent(key, requested);
         if (oldValues == null) {
             return true;
         }
 
-        Set<Resource> addedValues = Sets.difference(requested, oldValues);
+        Set<DiscreteResource> addedValues = Sets.difference(requested, oldValues);
         // no new value, then no-op
         if (addedValues.isEmpty()) {
             // don't write to map because all values are already stored
             return true;
         }
 
-        Set<ResourceId> addedIds = addedValues.stream()
-                .map(Resource::id)
+        Set<DiscreteResource> newValues = new LinkedHashSet<>(oldValues);
+        newValues.addAll(addedValues);
+        return map.replace(key, oldValues, newValues);
+    }
+
+    private boolean appendValues(TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> map,
+                                 DiscreteResourceId key, List<ContinuousResource> values, ContinuousResource dummy) {
+        Set<ContinuousResource> requested = new LinkedHashSet<>(values);
+        Set<ContinuousResource> oldValues = map.putIfAbsent(key, requested);
+        if (oldValues == null) {
+            return true;
+        }
+
+        Set<ContinuousResource> addedValues = Sets.difference(requested, oldValues);
+        // no new value, then no-op
+        if (addedValues.isEmpty()) {
+            // don't write to map because all values are already stored
+            return true;
+        }
+
+        Set<ContinuousResourceId> addedIds = addedValues.stream()
+                .map(ContinuousResource::id)
                 .collect(Collectors.toSet());
         // if the value is not found but the same ID is found
         // (this happens only when being continuous resource)
@@ -535,7 +605,7 @@
             // no-op, but indicating failure (reject the request)
             return false;
         }
-        Set<Resource> newValues = new LinkedHashSet<>(oldValues);
+        Set<ContinuousResource> newValues = new LinkedHashSet<>(oldValues);
         newValues.addAll(addedValues);
         return map.replace(key, oldValues, newValues);
     }
@@ -544,15 +614,41 @@
      * Removes the values from the existing values associated with the specified key.
      * If the map doesn't contain the given values, removal will not happen.
      *
-     * @param map map holding multiple values for a key
+     * @param discreteTxMap map holding multiple discrete resources for a key
+     * @param continuousTxMap map holding multiple continuous resources for a key
      * @param key key specifying values
      * @param values values to be removed
      * @return true if the operation succeeds, false otherwise
      */
-    // computational complexity: O(n) where n is the number of the specified values
-    private boolean removeValues(TransactionalMap<DiscreteResourceId, Set<Resource>> map,
+    private boolean removeValues(TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> discreteTxMap,
+                                 TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> continuousTxMap,
                                  DiscreteResourceId key, List<Resource> values) {
-        Set<Resource> oldValues = map.putIfAbsent(key, new LinkedHashSet<>());
+        // it's assumed that the passed "values" is non-empty
+
+        // This is 2-pass scan. Nicer to have 1-pass scan
+        List<DiscreteResource> discreteValues = values.stream()
+                .filter(x -> x instanceof DiscreteResource)
+                .map(x -> (DiscreteResource) x)
+                .collect(Collectors.toList());
+        List<ContinuousResource> continuousValues = values.stream()
+                .filter(x -> x instanceof ContinuousResource)
+                .map(x -> (ContinuousResource) x)
+                .collect(Collectors.toList());
+
+        // short-circuit decision avoiding unnecessary distributed map operations
+        if (continuousValues.isEmpty()) {
+            return removeValues(discreteTxMap, key, discreteValues);
+        }
+        if (discreteValues.isEmpty()) {
+            return removeValues(continuousTxMap, key, continuousValues);
+        }
+
+        return removeValues(discreteTxMap, key, discreteValues) && removeValues(continuousTxMap, key, continuousValues);
+    }
+
+    private <T extends Resource> boolean removeValues(TransactionalMap<DiscreteResourceId, Set<T>> map,
+                               DiscreteResourceId key, List<T> values) {
+        Set<T> oldValues = map.putIfAbsent(key, new LinkedHashSet<>());
         if (oldValues == null) {
             log.trace("No-Op removing values. key {} did not exist", key);
             return true;
@@ -564,47 +660,70 @@
             return true;
         }
 
-        LinkedHashSet<Resource> newValues = new LinkedHashSet<>(oldValues);
+        LinkedHashSet<T> newValues = new LinkedHashSet<>(oldValues);
         newValues.removeAll(values);
         return map.replace(key, oldValues, newValues);
-    }
 
+    }
     /**
      * Returns the resource which has the same key as the specified resource ID
      * in the set as a value of the map.
      *
-     * @param childTxMap map storing parent - child relationship of resources
+     * @param discreteTxMap map storing parent - child relationship of discrete resources
+     * @param continuousTxMap map storing parent -child relationship of continuous resources
      * @param id ID of resource to be checked
      * @return the resource which is regarded as the same as the specified resource
      */
     // Naive implementation, which traverses all elements in the set when continuous resource
     // computational complexity: O(1) when discrete resource. O(n) when continuous resource
     // where n is the number of elements in the associated set
-    private Optional<Resource> lookup(TransactionalMap<DiscreteResourceId, Set<Resource>> childTxMap, ResourceId id) {
+    private Optional<Resource> lookup(TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> discreteTxMap,
+                                      TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> continuousTxMap,
+                                      ResourceId id) {
+        if (id instanceof DiscreteResourceId) {
+            return lookup(discreteTxMap, (DiscreteResourceId) id);
+        } else if (id instanceof ContinuousResourceId) {
+            return lookup(continuousTxMap, (ContinuousResourceId) id);
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    // check the existence in the set: O(1) operation
+    private Optional<Resource> lookup(TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> discreteTxMap,
+                                      DiscreteResourceId id) {
         if (!id.parent().isPresent()) {
             return Optional.of(Resource.ROOT);
         }
 
-        Set<Resource> values = childTxMap.get(id.parent().get());
+        Set<DiscreteResource> values = discreteTxMap.get(id.parent().get());
         if (values == null) {
             return Optional.empty();
         }
 
-        // short-circuit if discrete resource
-        // check the existence in the set: O(1) operation
-        if (id instanceof DiscreteResourceId) {
-            DiscreteResource discrete = Resources.discrete((DiscreteResourceId) id).resource();
-            if (values.contains(discrete)) {
-                return Optional.of(discrete);
-            } else {
-                return Optional.empty();
-            }
+        DiscreteResource resource = Resources.discrete(id).resource();
+        if (values.contains(resource)) {
+            return Optional.of(resource);
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    // iterate over the values in the set: O(n) operation
+    private Optional<Resource> lookup(TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> continuousTxMap,
+                                      ContinuousResourceId id) {
+        if (!id.parent().isPresent()) {
+            return Optional.of(Resource.ROOT);
         }
 
-        // continuous resource case
-        // iterate over the values in the set: O(n) operation
+        Set<ContinuousResource> values = continuousTxMap.get(id.parent().get());
+        if (values == null) {
+            return Optional.empty();
+        }
+
         return values.stream()
                 .filter(x -> x.id().equals(id))
+                .map(x -> (Resource) x)
                 .findFirst();
     }