[SDFAB-21] Handle the VERIFY operation in the fabric pipeliner

Change-Id: I4e7ace7395f5bc8d7745b273e40f140b4da6d21d
diff --git a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/pipeliner/FabricPipeliner.java b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/pipeliner/FabricPipeliner.java
index d1e9de2..744cd9d 100644
--- a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/pipeliner/FabricPipeliner.java
+++ b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/pipeliner/FabricPipeliner.java
@@ -17,6 +17,9 @@
 package org.onosproject.pipelines.fabric.impl.behaviour.pipeliner;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.onlab.packet.Ethernet;
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.SharedExecutors;
@@ -37,6 +40,7 @@
 import org.onosproject.net.flow.TrafficTreatment;
 import org.onosproject.net.flow.criteria.Criteria;
 import org.onosproject.net.flow.criteria.PiCriterion;
+import org.onosproject.net.flowobjective.DefaultNextObjective;
 import org.onosproject.net.flowobjective.FilteringObjective;
 import org.onosproject.net.flowobjective.FlowObjectiveStore;
 import org.onosproject.net.flowobjective.ForwardingObjective;
@@ -45,7 +49,12 @@
 import org.onosproject.net.flowobjective.NextTreatment;
 import org.onosproject.net.flowobjective.Objective;
 import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.group.DefaultGroup;
+import org.onosproject.net.group.Group;
+import org.onosproject.net.group.GroupBucket;
+import org.onosproject.net.group.GroupBuckets;
 import org.onosproject.net.group.GroupDescription;
+import org.onosproject.net.group.GroupKey;
 import org.onosproject.net.group.GroupService;
 import org.onosproject.net.pi.runtime.PiAction;
 import org.onosproject.net.pi.runtime.PiActionParam;
@@ -57,8 +66,11 @@
 import org.slf4j.Logger;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
@@ -157,9 +169,22 @@
     @Override
     public void next(NextObjective obj) {
         if (obj.op() == Objective.Operation.VERIFY) {
-            // TODO: support VERIFY operation
-            log.debug("VERIFY operation not yet supported for NextObjective, will return success");
-            success(obj);
+            if (obj.type() != NextObjective.Type.HASHED) {
+                log.warn("VERIFY operation not yet supported for NextObjective {}, will return failure :(",
+                        obj.type());
+                fail(obj, ObjectiveError.UNSUPPORTED);
+                return;
+            }
+
+            if (log.isTraceEnabled()) {
+                log.trace("Verify NextObjective {} in dev {}", obj, deviceId);
+            }
+            ObjectiveError error = handleVerify(obj);
+            if (error == null) {
+                success(obj);
+            } else {
+                fail(obj, error);
+            }
             return;
         }
 
@@ -199,6 +224,7 @@
     }
 
     private void handleNextGroup(NextObjective obj) {
+        // FIXME SDFAB-250 ADD_TO and REMOVE_FROM should update the content
         switch (obj.op()) {
             case REMOVE:
                 removeNextGroup(obj);
@@ -381,11 +407,145 @@
                 .build();
     }
 
+    private ObjectiveError handleVerify(NextObjective nextObjective) {
+        Map<GroupBucket, FlowRule> bucketsToFlows = getBucketToFlowMapping(nextObjective);
+        if (bucketsToFlows.isEmpty() && !nextObjective.nextTreatments().isEmpty()) {
+            log.warn("VERIFY failed due to translation error, bucketsToFlows is empty");
+            return ObjectiveError.BADPARAMS;
+        }
+
+        if (log.isTraceEnabled()) {
+            log.trace("Mapping bucketsToFlows {} has been generated ", bucketsToFlows);
+        }
+
+        GroupKey groupKey = nextTranslator.getGroupKey(nextObjective);
+        if (groupKey == null) {
+            log.warn("VERIFY failed due to translation error, unable to determine group key");
+            return ObjectiveError.BADPARAMS;
+        }
+        Group groupFromStore = groupService.getGroup(deviceId, groupKey);
+        if (groupFromStore == null) {
+            log.warn("VERIFY failed due to missing group in the store");
+            return ObjectiveError.GROUPMISSING;
+        }
+
+        // Looking for duplicate buckets - remove them by using a set and comparing size after/before
+        Set<GroupBucket> bucketsFromStore = Sets.newHashSet(groupFromStore.buckets().buckets());
+        if (groupFromStore.buckets().buckets().size() > bucketsFromStore.size()) {
+            log.warn("Duplicated buckets detected in device:{}, nextId:{}, before-size" +
+                            ":{} after-size:{} .. correcting", deviceId,
+                    nextObjective.id(), groupFromStore.buckets().buckets().size(), bucketsFromStore.size());
+            final GroupBuckets bucketToSet = new GroupBuckets(Lists.newArrayList(bucketsFromStore));
+            groupService.setBucketsForGroup(deviceId, groupKey, bucketToSet, groupKey, nextObjective.appId());
+            // Forge temporary the group to avoid race condition with the store
+            groupFromStore = new DefaultGroup(groupFromStore.id(), deviceId, groupFromStore.type(), bucketToSet);
+        }
+
+        // Looking for buckets missing in the group but defined in the next
+        Map<GroupBucket, FlowRule> toAdd = Maps.newHashMap();
+        for (Map.Entry<GroupBucket, FlowRule> entry : bucketsToFlows.entrySet()) {
+            if (!groupFromStore.buckets().buckets().contains(entry.getKey())) {
+                toAdd.put(entry.getKey(), entry.getValue());
+            }
+        }
+
+        // Looking for buckets missing in the next but defined in the group
+        // FIXME SDFAB-250 we cannot remove associated egress flows
+        List<GroupBucket> toRemove = Lists.newArrayList();
+        groupFromStore.buckets().buckets().forEach(bucket -> {
+            if (!bucketsToFlows.containsKey(bucket)) {
+                toRemove.add(bucket);
+            }
+        });
+
+        if (!toAdd.isEmpty() || !toRemove.isEmpty()) {
+            log.warn("Mismatch detected in device:{}, nextId:{}, groupFromTranslation-size:{} " +
+                            "groupFromStore-size:{} toAdd-size:{} toRemove-size: {} .. correcting",
+                    deviceId, nextObjective.id(), bucketsToFlows.size(), groupFromStore.buckets().buckets().size(),
+                    toAdd.size(), toRemove.size());
+        }
+
+        if (!toAdd.isEmpty()) {
+            if (log.isTraceEnabled()) {
+                log.trace("Adding missing buckets {} and flows {}", toAdd.keySet(), toAdd.values());
+            }
+            final FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
+            final FlowRule dummyFlow = getDummyFlow(nextObjective);
+            toAdd.values().stream()
+                    .filter(flowRule -> !flowRule.equals(dummyFlow))
+                    .forEach(ops::add);
+            final GroupBuckets bucketsToAdd = new GroupBuckets(Lists.newArrayList(toAdd.keySet()));
+            groupService.addBucketsToGroup(deviceId, groupKey, bucketsToAdd, groupKey, nextObjective.appId());
+            flowRuleService.apply(ops.build());
+        }
+
+        if (!toRemove.isEmpty()) {
+            if (log.isTraceEnabled()) {
+                log.trace("Removing stale buckets {}", toRemove);
+            }
+            final GroupBuckets bucketsToRemove = new GroupBuckets(toRemove);
+            groupService.removeBucketsFromGroup(deviceId, groupKey, bucketsToRemove, groupKey,
+                    nextObjective.appId());
+        }
+
+        return null;
+    }
+
+    private Map<GroupBucket, FlowRule> getBucketToFlowMapping(NextObjective nextObjective) {
+        Map<GroupBucket, FlowRule> mapping = Maps.newHashMap();
+        NextObjective newNextObjective;
+        ObjectiveTranslation result;
+        FlowRule dummyFlow = getDummyFlow(nextObjective);
+        FlowRule egFlow;
+        GroupBucket groupBucket;
+        GroupDescription group;
+        for (NextTreatment nextTreatment : nextObjective.nextTreatments()) {
+            newNextObjective = DefaultNextObjective.builder()
+                    .withId(nextObjective.id())
+                    .withType(nextObjective.type())
+                    .fromApp(nextObjective.appId())
+                    .withMeta(nextObjective.meta())
+                    .addTreatment(nextTreatment)
+                    .verify();
+            result = nextTranslator.translate(newNextObjective);
+            if ((result.groups().isEmpty() && result.flowRules().isEmpty()) ||
+                    result.groups().size() > 1) {
+                return Collections.emptyMap();
+            }
+            group = result.groups().iterator().next();
+            egFlow = result.flowRules().stream()
+                    .filter(flowRule -> flowRule.table().equals(FabricConstants.FABRIC_EGRESS_EGRESS_NEXT_EGRESS_VLAN))
+                    .findFirst()
+                    .orElse(null);
+            if (group.buckets().buckets().isEmpty() || group.buckets().buckets().size() > 1) {
+                return Collections.emptyMap();
+            }
+            groupBucket = group.buckets().buckets().iterator().next();
+            if (egFlow == null) {
+                mapping.put(groupBucket, dummyFlow);
+            } else {
+                mapping.put(groupBucket, egFlow);
+            }
+        }
+        return mapping;
+    }
+
+    private FlowRule getDummyFlow(NextObjective nextObjective) {
+        return DefaultFlowRule.builder()
+                .forDevice(deviceId)
+                .forTable(0)
+                .fromApp(nextObjective.appId())
+                .withPriority(1)
+                .withSelector(DefaultTrafficSelector.emptySelector())
+                .makePermanent()
+                .build();
+    }
+
     /**
      * NextGroup implementation.
      */
     private static class FabricNextGroup implements NextGroup {
-
+        // FIXME SDFAB-250 they are not very useful nor technically correct
         private final NextObjective.Type type;
         private final List<String> nextMappings;
 
diff --git a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/pipeliner/NextObjectiveTranslator.java b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/pipeliner/NextObjectiveTranslator.java
index 07d86c3..8ecffb2 100644
--- a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/pipeliner/NextObjectiveTranslator.java
+++ b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/pipeliner/NextObjectiveTranslator.java
@@ -226,7 +226,7 @@
         // Updated result builder with hashed group.
         final int groupId = selectGroup(obj, resultBuilder);
 
-        if (isGroupModifyOp(obj)) {
+        if (isGroupModifyOp(obj) || obj.op() == Objective.Operation.VERIFY) {
             // No changes to flow rules.
             return;
         }
@@ -394,10 +394,7 @@
                 .collect(Collectors.toList());
 
         final int groupId = obj.id();
-        final PiGroupKey groupKey = new PiGroupKey(
-                hashedTableId,
-                FabricConstants.FABRIC_INGRESS_NEXT_HASHED_SELECTOR,
-                groupId);
+        final PiGroupKey groupKey = (PiGroupKey) getGroupKey(obj);
 
         resultBuilder.addGroup(new DefaultGroupDescription(
                 deviceId,
@@ -449,8 +446,7 @@
         final int groupId = obj.id();
         // Use DefaultGroupKey instead of PiGroupKey as we don't have any
         // action profile to apply to the groups of ALL type.
-        final GroupKey groupKey = new DefaultGroupKey(
-                FabricPipeliner.KRYO.serialize(groupId));
+        final GroupKey groupKey = getGroupKey(obj);
 
         resultBuilder.addGroup(
                 new DefaultGroupDescription(
@@ -501,4 +497,17 @@
     private boolean isXconnect(NextObjective obj) {
         return obj.appId().name().contains(XCONNECT);
     }
+
+    // Builds up the group key based on the next objective type
+    public GroupKey getGroupKey(NextObjective objective) {
+        if (objective.type() == NextObjective.Type.HASHED || objective.type() == NextObjective.Type.SIMPLE) {
+            return new PiGroupKey(FabricConstants.FABRIC_INGRESS_NEXT_HASHED,
+                    FabricConstants.FABRIC_INGRESS_NEXT_HASHED_SELECTOR,
+                    objective.id());
+        } else if (objective.type() == NextObjective.Type.BROADCAST) {
+            return new DefaultGroupKey(
+                    FabricPipeliner.KRYO.serialize(objective.id()));
+        }
+        return null;
+    }
 }