ONOS-5808: Allocate BW from ConnectivityIntentCompiler and unit tests for partial failure

Change-Id: I2eb3c16efbce619db6d0d2ba415a35752a61ece4
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
index 6f2ed13..7b79f95 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
@@ -51,8 +51,9 @@
 import org.onosproject.net.intent.impl.phase.FinalIntentProcessPhase;
 import org.onosproject.net.intent.impl.phase.IntentProcessPhase;
 import org.onosproject.net.intent.impl.phase.Skipped;
-import org.osgi.service.component.ComponentContext;
+import org.onosproject.net.resource.ResourceConsumer;
 import org.onosproject.net.resource.ResourceService;
+import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
 import java.util.Collection;
@@ -344,9 +345,8 @@
             post(event);
             switch (event.type()) {
                 case WITHDRAWN:
-                    // release resources allocated to withdrawn intent
-                    if (!resourceService.release(event.subject().id())) {
-                        log.error("Failed to release resources allocated to {}", event.subject().id());
+                    if (!skipReleaseResourcesOnWithdrawal) {
+                        releaseResources(event.subject());
                     }
                     break;
                 default:
@@ -363,6 +363,41 @@
         public void onUpdate(IntentData intentData) {
             trackerService.trackIntent(intentData);
         }
+
+        private void releaseResources(Intent intent) {
+            // If a resource group is set on the intent, the resource consumer is
+            // set equal to it. Otherwise it's set to the intent key
+            ResourceConsumer resourceConsumer =
+                    intent.resourceGroup() != null ? intent.resourceGroup() : intent.key();
+
+            // By default the resource doesn't get released
+            boolean removeResource = false;
+
+            if (intent.resourceGroup() == null) {
+                // If the intent doesn't have a resource group, it means the
+                // resource was registered using the intent key, so it can be
+                // released
+                removeResource = true;
+            } else {
+                // When a resource group is set, we make sure there are no other
+                // intents using the same resource group, before deleting the
+                // related resources.
+                Long remainingIntents =
+                        Tools.stream(store.getIntents())
+                             .filter(i -> i.resourceGroup().equals(intent.resourceGroup()))
+                             .count();
+                if (remainingIntents == 0) {
+                    removeResource = true;
+                }
+            }
+
+            if (removeResource) {
+                // Release resources allocated to withdrawn intent
+                if (!resourceService.release(resourceConsumer)) {
+                    log.error("Failed to release resources allocated to {}", resourceConsumer);
+                }
+            }
+        }
     }
 
     // Store delegate enabled only when performing intent throughput tests
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/compiler/ConnectivityIntentCompiler.java b/core/net/src/main/java/org/onosproject/net/intent/impl/compiler/ConnectivityIntentCompiler.java
index 74e94b1..0489c85 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/compiler/ConnectivityIntentCompiler.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/compiler/ConnectivityIntentCompiler.java
@@ -17,32 +17,48 @@
 
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.onlab.graph.DefaultEdgeWeigher;
 import org.onlab.graph.ScalarWeight;
 import org.onlab.graph.Weight;
+import org.onlab.util.Bandwidth;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DeviceId;
 import org.onosproject.net.DisjointPath;
 import org.onosproject.net.ElementId;
 import org.onosproject.net.Path;
+import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.intent.ConnectivityIntent;
 import org.onosproject.net.intent.Constraint;
 import org.onosproject.net.intent.IntentCompiler;
 import org.onosproject.net.intent.IntentExtensionService;
+import org.onosproject.net.intent.constraint.BandwidthConstraint;
 import org.onosproject.net.intent.constraint.HashedPathSelectionConstraint;
 import org.onosproject.net.intent.impl.PathNotFoundException;
-import org.onosproject.net.resource.ResourceQueryService;
 import org.onosproject.net.provider.ProviderId;
+import org.onosproject.net.resource.Resource;
+import org.onosproject.net.resource.ResourceAllocation;
+import org.onosproject.net.resource.ResourceConsumer;
+import org.onosproject.net.resource.ResourceId;
+import org.onosproject.net.resource.ResourceService;
+import org.onosproject.net.resource.Resources;
 import org.onosproject.net.topology.LinkWeigher;
 import org.onosproject.net.topology.PathService;
 import org.onosproject.net.topology.TopologyEdge;
 import org.onosproject.net.topology.TopologyVertex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Base class for compilers of various
@@ -54,6 +70,11 @@
 
     private static final ProviderId PID = new ProviderId("core", "org.onosproject.core", true);
 
+    private static final Logger log = LoggerFactory.getLogger(ConnectivityIntentCompiler.class);
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceService deviceService;
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected IntentExtensionService intentManager;
 
@@ -61,7 +82,7 @@
     protected PathService pathService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ResourceQueryService resourceService;
+    protected ResourceService resourceService;
 
     /**
      * Returns an edge-weight capable of evaluating links on the basis of the
@@ -164,6 +185,155 @@
     }
 
     /**
+     * Allocates the bandwidth specified as intent constraint on each link
+     * composing the intent, if a bandwidth constraint is specified.
+     *
+     * @param intent the intent requesting bandwidth allocation
+     * @param connectPoints the connect points composing the intent path computed
+     */
+    protected void allocateBandwidth(ConnectivityIntent intent,
+                                     List<ConnectPoint> connectPoints) {
+        // Retrieve bandwidth constraint if exists
+        List<Constraint> constraints = intent.constraints();
+
+        if (constraints == null) {
+            return;
+        }
+
+        Optional<Constraint> constraint =
+                constraints.stream()
+                           .filter(c -> c instanceof BandwidthConstraint)
+                           .findAny();
+
+        // If there is no bandwidth constraint continue
+        if (!constraint.isPresent()) {
+            return;
+        }
+
+        BandwidthConstraint bwConstraint = (BandwidthConstraint) constraint.get();
+
+        double bw = bwConstraint.bandwidth().bps();
+
+        // If a resource group is set on the intent, the resource consumer is
+        // set equal to it. Otherwise it's set to the intent key
+        ResourceConsumer newResourceConsumer =
+                intent.resourceGroup() != null ? intent.resourceGroup() : intent.key();
+
+        // Get the list of current resource allocations
+        Collection<ResourceAllocation> resourceAllocations =
+                resourceService.getResourceAllocations(newResourceConsumer);
+
+        // Get the list of resources already allocated from resource allocations
+        List<Resource> resourcesAllocated =
+                resourcesFromAllocations(resourceAllocations);
+
+        // Get the list of resource ids for resources already allocated
+        List<ResourceId> idsResourcesAllocated = resourceIds(resourcesAllocated);
+
+        // Create the list of incoming resources requested. Exclude resources
+        // already allocated.
+        List<Resource> incomingResources =
+                resources(connectPoints, bw).stream()
+                                            .filter(r -> !resourcesAllocated.contains(r))
+                                            .collect(Collectors.toList());
+
+        if (incomingResources.isEmpty()) {
+            return;
+        }
+
+        // Create the list of resources to be added, meaning their key is not
+        // present in the resources already allocated
+        List<Resource> resourcesToAdd =
+                incomingResources.stream()
+                                 .filter(r -> !idsResourcesAllocated.contains(r.id()))
+                                 .collect(Collectors.toList());
+
+        // Resources to updated are all the new valid resources except the
+        // resources to be added
+        List<Resource> resourcesToUpdate = Lists.newArrayList(incomingResources);
+        resourcesToUpdate.removeAll(resourcesToAdd);
+
+        // If there are no resources to update skip update procedures
+        if (!resourcesToUpdate.isEmpty()) {
+            // Remove old resources that need to be updated
+            // TODO: use transaction updates when available in the resource service
+            List<ResourceAllocation> resourceAllocationsToUpdate =
+                    resourceAllocations.stream()
+                            .filter(rA -> resourceIds(resourcesToUpdate).contains(rA.resource().id()))
+                            .collect(Collectors.toList());
+            log.debug("Releasing bandwidth for intent {}: {} bps", newResourceConsumer, resourcesToUpdate);
+            resourceService.release(resourceAllocationsToUpdate);
+
+            // Update resourcesToAdd with the list of both the new resources and
+            // the resources to update
+            resourcesToAdd.addAll(resourcesToUpdate);
+        }
+
+        // Look also for resources allocated using the intent key and -if any-
+        // remove them
+        if (intent.resourceGroup() != null) {
+            // Get the list of current resource allocations made by intent key
+            Collection<ResourceAllocation> resourceAllocationsByKey =
+                    resourceService.getResourceAllocations(intent.key());
+
+            resourceService.release(Lists.newArrayList(resourceAllocationsByKey));
+        }
+
+        // Allocate resources
+        log.debug("Allocating bandwidth for intent {}: {} bps", newResourceConsumer, resourcesToAdd);
+        List<ResourceAllocation> allocations =
+                resourceService.allocate(newResourceConsumer, resourcesToAdd);
+
+        if (allocations.isEmpty()) {
+            log.debug("No resources allocated for intent {}", newResourceConsumer);
+        }
+
+        log.debug("Done allocating bandwidth for intent {}", newResourceConsumer);
+    }
+
+    /**
+     * Produces a list of resources from a list of resource allocations.
+     *
+     * @param rAs the list of resource allocations
+     * @return a list of resources retrieved from the resource allocations given
+     */
+    private static List<Resource> resourcesFromAllocations(Collection<ResourceAllocation> rAs) {
+        return rAs.stream()
+                  .map(ResourceAllocation::resource)
+                  .collect(Collectors.toList());
+    }
+
+    /**
+     * Creates a list of continuous bandwidth resources given a list of connect
+     * points and a bandwidth.
+     *
+     * @param cps the list of connect points
+     * @param bw the bandwidth expressed as a double
+     * @return the list of resources
+     */
+    private static List<Resource> resources(List<ConnectPoint> cps, double bw) {
+        return cps.stream()
+                  // Make sure the element id is a valid device id
+                  .filter(cp -> cp.elementId() instanceof DeviceId)
+                  // Create a continuous resource for each CP we're going through
+                  .map(cp -> Resources.continuous(cp.deviceId(), cp.port(),
+                                                  Bandwidth.class).resource(bw))
+                  .collect(Collectors.toList());
+    }
+
+    /**
+     * Returns a list of resource ids given a list of resources.
+     *
+     * @param resources the list of resources
+     * @return the list of resource ids retrieved from the resources given
+     */
+    private static List<ResourceId> resourceIds(List<Resource> resources) {
+        return resources.stream()
+                        .map(Resource::id)
+                        .collect(Collectors.toList());
+    }
+
+    /**
      * Edge-weight capable of evaluating link cost using a set of constraints.
      */
     protected class ConstraintBasedLinkWeigher extends DefaultEdgeWeigher<TopologyVertex, TopologyEdge>
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/compiler/HostToHostIntentCompiler.java b/core/net/src/main/java/org/onosproject/net/intent/impl/compiler/HostToHostIntentCompiler.java
index f2283a0..4d6270a 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/compiler/HostToHostIntentCompiler.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/compiler/HostToHostIntentCompiler.java
@@ -22,6 +22,7 @@
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DefaultLink;
 import org.onosproject.net.DefaultPath;
 import org.onosproject.net.DeviceId;
@@ -35,7 +36,6 @@
 import org.onosproject.net.intent.Intent;
 import org.onosproject.net.intent.IntentCompilationException;
 import org.onosproject.net.intent.LinkCollectionIntent;
-import org.onosproject.net.intent.PathIntent;
 import org.onosproject.net.intent.constraint.AsymmetricPathConstraint;
 import org.slf4j.Logger;
 
@@ -45,6 +45,7 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.onosproject.net.Link.Type.EDGE;
 import static org.onosproject.net.flow.DefaultTrafficSelector.builder;
@@ -115,22 +116,6 @@
                 .build();
     }
 
-    // Creates a path intent from the specified path and original connectivity intent.
-    private Intent createPathIntent(Path path, Host src, Host dst,
-                                    HostToHostIntent intent) {
-        TrafficSelector selector = builder(intent.selector())
-                .matchEthSrc(src.mac()).matchEthDst(dst.mac()).build();
-        return PathIntent.builder()
-                .appId(intent.appId())
-                .key(intent.key())
-                .selector(selector)
-                .treatment(intent.treatment())
-                .path(path)
-                .constraints(intent.constraints())
-                .priority(intent.priority())
-                .build();
-    }
-
     private FilteredConnectPoint getFilteredPointFromLink(Link link) {
         FilteredConnectPoint filteredConnectPoint;
         if (link.src().elementId() instanceof DeviceId) {
@@ -147,14 +132,13 @@
                                              Host src,
                                              Host dst,
                                              HostToHostIntent intent) {
-        /*
-         * The path contains also the edge links, these are not necessary
-         * for the LinkCollectionIntent.
-         */
-        Set<Link> coreLinks = path.links()
-                .stream()
-                .filter(link -> !link.type().equals(EDGE))
-                .collect(Collectors.toSet());
+        // Try to allocate bandwidth
+        List<ConnectPoint> pathCPs =
+                path.links().stream()
+                            .flatMap(l -> Stream.of(l.src(), l.dst()))
+                            .collect(Collectors.toList());
+
+        allocateBandwidth(intent, pathCPs);
 
         Link ingressLink = path.links().get(0);
         Link egressLink = path.links().get(path.links().size() - 1);
@@ -167,6 +151,15 @@
                 .matchEthDst(dst.mac())
                 .build();
 
+        /*
+         * The path contains also the edge links, these are not necessary
+         * for the LinkCollectionIntent.
+         */
+        Set<Link> coreLinks = path.links()
+                .stream()
+                .filter(link -> !link.type().equals(EDGE))
+                .collect(Collectors.toSet());
+
         return LinkCollectionIntent.builder()
                 .key(intent.key())
                 .appId(intent.appId())
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/compiler/MultiPointToSinglePointIntentCompiler.java b/core/net/src/main/java/org/onosproject/net/intent/impl/compiler/MultiPointToSinglePointIntentCompiler.java
index 3b4dca9..7e6cf3d 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/compiler/MultiPointToSinglePointIntentCompiler.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/compiler/MultiPointToSinglePointIntentCompiler.java
@@ -38,6 +38,8 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.onosproject.net.intent.constraint.PartialFailureConstraint.intentAllowsPartialFailure;
 
@@ -85,7 +87,7 @@
                 continue;
             }
 
-            Path path = getPath(intent, ingressPoint.deviceId(), intent.egressPoint().deviceId());
+            Path path = getPath(intent, ingressPoint.deviceId(), egressPoint.deviceId());
 
             if (path != null) {
                 hasPaths = true;
@@ -106,6 +108,23 @@
             }
         }
 
+        // Allocate bandwidth on existing paths if a bandwidth constraint is set
+        List<ConnectPoint> ingressCPs =
+                intent.filteredIngressPoints().stream()
+                                              .map(fcp -> fcp.connectPoint())
+                                              .collect(Collectors.toList());
+        ConnectPoint egressCP = intent.filteredEgressPoint().connectPoint();
+
+        List<ConnectPoint> pathCPs =
+                links.values().stream()
+                              .flatMap(l -> Stream.of(l.src(), l.dst()))
+                              .collect(Collectors.toList());
+
+        pathCPs.addAll(ingressCPs);
+        pathCPs.add(egressCP);
+
+        allocateBandwidth(intent, pathCPs);
+
         if (!hasPaths) {
             throw new IntentException("Cannot find any path between ingress and egress points.");
         } else if (!allowMissingPaths && missingSomePaths) {
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/compiler/PointToPointIntentCompiler.java b/core/net/src/main/java/org/onosproject/net/intent/impl/compiler/PointToPointIntentCompiler.java
index 94fe6ed..10e24eb 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/compiler/PointToPointIntentCompiler.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/compiler/PointToPointIntentCompiler.java
@@ -73,6 +73,8 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static java.util.Arrays.asList;
 import static org.onosproject.net.DefaultEdgeLink.createEdgeLink;
@@ -122,8 +124,8 @@
     @Override
     public List<Intent> compile(PointToPointIntent intent, List<Intent> installable) {
         log.trace("compiling {} {}", intent, installable);
-        ConnectPoint ingressPoint = intent.ingressPoint();
-        ConnectPoint egressPoint = intent.egressPoint();
+        ConnectPoint ingressPoint = intent.filteredIngressPoint().connectPoint();
+        ConnectPoint egressPoint = intent.filteredEgressPoint().connectPoint();
 
         if (ingressPoint.deviceId().equals(egressPoint.deviceId())) {
             return createZeroHopLinkCollectionIntent(intent);
@@ -157,6 +159,15 @@
                                        intent));
     }
 
+    /**
+     * Creates an unprotected intent.
+     * @param ingressPoint the ingress connect point
+     * @param egressPoint the egress connect point
+     * @param intent the original intent
+     * @return the compilation result
+     * @deprecated 1.10.0
+     */
+    @Deprecated
     private List<Intent> createUnprotectedIntent(ConnectPoint ingressPoint,
                                                  ConnectPoint egressPoint,
                                                  PointToPointIntent intent) {
@@ -177,6 +188,20 @@
         Path path = getPathOrException(intent, intent.filteredIngressPoint().connectPoint().deviceId(),
                                        intent.filteredEgressPoint().connectPoint().deviceId());
 
+        // Allocate bandwidth if a bandwidth constraint is set
+        ConnectPoint ingressCP = intent.filteredIngressPoint().connectPoint();
+        ConnectPoint egressCP = intent.filteredEgressPoint().connectPoint();
+
+        List<ConnectPoint> pathCPs =
+                path.links().stream()
+                            .flatMap(l -> Stream.of(l.src(), l.dst()))
+                            .collect(Collectors.toList());
+
+        pathCPs.add(ingressCP);
+        pathCPs.add(egressCP);
+
+        allocateBandwidth(intent, pathCPs);
+
         return asList(createLinkCollectionIntent(ImmutableSet.copyOf(path.links()),
                                                  path.cost(),
                                                  intent));
@@ -292,6 +317,21 @@
         if (reusableIntents != null && reusableIntents.size() > 1) {
             return reusableIntents;
         } else {
+            // Allocate bandwidth if a bandwidth constraint is set
+            ConnectPoint ingressCP = intent.filteredIngressPoint().connectPoint();
+            ConnectPoint egressCP = intent.filteredEgressPoint().connectPoint();
+
+            List<ConnectPoint> pathCPs =
+                    onlyPath.links().stream()
+                            .flatMap(l -> Stream.of(l.src(), l.dst()))
+                            .collect(Collectors.toList());
+
+            pathCPs.add(ingressCP);
+            pathCPs.add(egressCP);
+
+            // Allocate bandwidth if a bandwidth constraint is set
+            allocateBandwidth(intent, pathCPs);
+
             links.add(createEdgeLink(ingressPoint, true));
             links.addAll(onlyPath.links());
             links.add(createEdgeLink(egressPoint, false));
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/compiler/SinglePointToMultiPointIntentCompiler.java b/core/net/src/main/java/org/onosproject/net/intent/impl/compiler/SinglePointToMultiPointIntentCompiler.java
index 504e4f3..369e52b 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/compiler/SinglePointToMultiPointIntentCompiler.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/compiler/SinglePointToMultiPointIntentCompiler.java
@@ -34,6 +34,8 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.onosproject.net.intent.constraint.PartialFailureConstraint.intentAllowsPartialFailure;
 
@@ -76,6 +78,7 @@
             }
 
             Path path = getPath(intent, intent.ingressPoint().deviceId(), egressPoint.deviceId());
+
             if (path != null) {
                 hasPaths = true;
                 links.addAll(path.links());
@@ -84,6 +87,23 @@
             }
         }
 
+        // Allocate bandwidth if a bandwidth constraint is set
+        ConnectPoint ingressCP = intent.filteredIngressPoint().connectPoint();
+        List<ConnectPoint> egressCPs =
+                intent.filteredEgressPoints().stream()
+                        .map(fcp -> fcp.connectPoint())
+                        .collect(Collectors.toList());
+
+        List<ConnectPoint> pathCPs =
+                links.stream()
+                     .flatMap(l -> Stream.of(l.src(), l.dst()))
+                     .collect(Collectors.toList());
+
+        pathCPs.add(ingressCP);
+        pathCPs.addAll(egressCPs);
+
+        allocateBandwidth(intent, pathCPs);
+
         if (!hasPaths) {
             throw new IntentException("Cannot find any path between ingress and egress points.");
         } else if (!allowMissingPaths && missingSomePaths) {