[ONOS-7242] Support MPLS by fabric.p4 pipeliner

Change-Id: I56a8f266e6d0afe5ad6737b8d0e399758fb75378
diff --git a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricForwardingPipeliner.java b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricForwardingPipeliner.java
index dfd8d85..bceefdd 100644
--- a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricForwardingPipeliner.java
+++ b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricForwardingPipeliner.java
@@ -30,6 +30,7 @@
 import org.onosproject.net.flow.criteria.Criterion;
 import org.onosproject.net.flow.criteria.EthCriterion;
 import org.onosproject.net.flow.criteria.IPCriterion;
+import org.onosproject.net.flow.criteria.MplsCriterion;
 import org.onosproject.net.flow.criteria.VlanIdCriterion;
 import org.onosproject.net.flowobjective.ForwardingObjective;
 import org.onosproject.net.flowobjective.ObjectiveError;
@@ -97,6 +98,7 @@
         VlanIdCriterion vlanIdCriterion = null;
         EthCriterion ethDstCriterion = null;
         IPCriterion ipDstCriterion = null;
+        MplsCriterion mplsCriterion = null;
 
         for (Criterion criterion : criteria) {
             switch (criterion.type()) {
@@ -109,6 +111,13 @@
                 case IPV4_DST:
                     ipDstCriterion = (IPCriterion) criterion;
                     break;
+                case MPLS_LABEL:
+                    mplsCriterion = (MplsCriterion) criterion;
+                    break;
+                case ETH_TYPE:
+                case MPLS_BOS:
+                    // do nothing
+                    break;
                 default:
                     log.warn("Unsupported criterion {}", criterion);
                     break;
@@ -127,10 +136,12 @@
             case IPV4_UNICAST:
                 processIpv4UnicastRule(ipDstCriterion, fwd, resultBuilder);
                 break;
+            case MPLS:
+                processMplsRule(mplsCriterion, fwd, resultBuilder);
+                break;
             case IPV4_MULTICAST:
             case IPV6_UNICAST:
             case IPV6_MULTICAST:
-            case MPLS:
             default:
                 log.warn("Unsupported forwarding function type {}", criteria);
                 resultBuilder.setError(ObjectiveError.UNSUPPORTED);
@@ -228,6 +239,41 @@
         resultBuilder.addFlowRule(flowRule);
     }
 
+    private void processMplsRule(MplsCriterion mplsCriterion, ForwardingObjective fwd,
+                                 PipelinerTranslationResult.Builder resultBuilder) {
+        checkNotNull(mplsCriterion, "Mpls criterion should not be null");
+        if (fwd.nextId() == null) {
+            log.warn("Forwarding objective for MPLS should contains next id");
+            resultBuilder.setError(ObjectiveError.BADPARAMS);
+            return;
+        }
+        TrafficSelector selector = DefaultTrafficSelector.builder()
+                .add(mplsCriterion)
+                .build();
+
+        PiActionParam nextIdParam = new PiActionParam(FabricConstants.ACT_PRM_NEXT_ID_ID,
+                                                      ImmutableByteSequence.copyFrom(fwd.nextId().byteValue()));
+        PiAction nextIdAction = PiAction.builder()
+                .withId(FabricConstants.ACT_POP_MPLS_AND_NEXT_ID)
+                .withParameter(nextIdParam)
+                .build();
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .piTableAction(nextIdAction)
+                .build();
+
+        FlowRule flowRule = DefaultFlowRule.builder()
+                .withSelector(selector)
+                .withTreatment(treatment)
+                .fromApp(fwd.appId())
+                .withPriority(fwd.priority())
+                .makePermanent()
+                .forDevice(deviceId)
+                .forTable(FabricConstants.TBL_MPLS_ID)
+                .build();
+
+        resultBuilder.addFlowRule(flowRule);
+    }
+
     private static TrafficTreatment buildSetNextIdTreatment(Integer nextId) {
         PiActionParam nextIdParam = new PiActionParam(FabricConstants.ACT_PRM_NEXT_ID_ID,
                                                       ImmutableByteSequence.copyFrom(nextId.byteValue()));
@@ -240,5 +286,4 @@
                 .piTableAction(nextIdAction)
                 .build();
     }
-
 }
diff --git a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricNextPipeliner.java b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricNextPipeliner.java
index 7487fbe..324a4f5 100644
--- a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricNextPipeliner.java
+++ b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricNextPipeliner.java
@@ -17,12 +17,10 @@
 package org.onosproject.pipelines.fabric.pipeliner;
 
 import com.google.common.collect.ImmutableMap;
-import org.onlab.util.ImmutableByteSequence;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.flow.DefaultFlowRule;
 import org.onosproject.net.flow.DefaultTrafficSelector;
 import org.onosproject.net.flow.DefaultTrafficTreatment;
-import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.flow.TrafficTreatment;
 import org.onosproject.net.flow.criteria.PiCriterion;
@@ -30,14 +28,29 @@
 import org.onosproject.net.flow.instructions.Instructions.OutputInstruction;
 import org.onosproject.net.flowobjective.NextObjective;
 import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.group.DefaultGroupBucket;
+import org.onosproject.net.group.DefaultGroupDescription;
+import org.onosproject.net.group.GroupBucket;
+import org.onosproject.net.group.GroupBuckets;
+import org.onosproject.net.group.GroupDescription;
 import org.onosproject.net.pi.runtime.PiAction;
+import org.onosproject.net.pi.runtime.PiActionGroupId;
 import org.onosproject.net.pi.runtime.PiActionParam;
-import org.onosproject.pipelines.fabric.FabricConstants;
+import org.onosproject.net.pi.runtime.PiGroupKey;
 import org.slf4j.Logger;
 
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
-import static org.onosproject.pipelines.fabric.pipeliner.FabricPipeliner.fail;
+import static org.onlab.util.ImmutableByteSequence.copyFrom;
+import static org.onosproject.pipelines.fabric.FabricConstants.ACT_PRF_ECMP_SELECTOR_ID;
+import static org.onosproject.pipelines.fabric.FabricConstants.ACT_PRM_NEXT_TYPE_ID;
+import static org.onosproject.pipelines.fabric.FabricConstants.ACT_SET_NEXT_TYPE_ID;
+import static org.onosproject.pipelines.fabric.FabricConstants.HF_FABRIC_METADATA_NEXT_ID_ID;
+import static org.onosproject.pipelines.fabric.FabricConstants.TBL_HASHED_ID;
+import static org.onosproject.pipelines.fabric.FabricConstants.TBL_NEXT_ID_MAPPING_ID;
+import static org.onosproject.pipelines.fabric.FabricConstants.TBL_SIMPLE_ID;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -47,15 +60,15 @@
     private static final Logger log = getLogger(FabricNextPipeliner.class);
 
     // Next types
-    private static final byte TBL_SIMPLE = 0;
-    private static final byte TBL_HASHED = 1;
-    private static final byte TBL_BROADCAST = 2;
-    private static final byte PUNT = 3;
+    private static final byte NXT_TYPE_SIMPLE = 0;
+    private static final byte NXT_TYPE_HASHED = 1;
+    private static final byte NXT_TYPE_BROADCAST = 2;
+    private static final byte NXT_TYPE_PUNT = 3;
     private static final Map<NextObjective.Type, Byte> NEXT_TYPE_MAP =
             ImmutableMap.<NextObjective.Type, Byte>builder()
-                    .put(NextObjective.Type.SIMPLE, TBL_SIMPLE)
-                    .put(NextObjective.Type.HASHED, TBL_HASHED)
-                    .put(NextObjective.Type.BROADCAST, TBL_BROADCAST)
+                    .put(NextObjective.Type.SIMPLE, NXT_TYPE_SIMPLE)
+                    .put(NextObjective.Type.HASHED, NXT_TYPE_HASHED)
+                    .put(NextObjective.Type.BROADCAST, NXT_TYPE_BROADCAST)
                     .build();
 
     protected DeviceId deviceId;
@@ -66,11 +79,14 @@
 
     public PipelinerTranslationResult next(NextObjective nextObjective) {
         PipelinerTranslationResult.Builder resultBuilder = PipelinerTranslationResult.builder();
-        FlowRule nextIdMappingRule = processNextIdMapping(nextObjective);
-        FlowRule nextRule = null;
+        processNextIdMapping(nextObjective, resultBuilder);
+
         switch (nextObjective.type()) {
             case SIMPLE:
-                nextRule = processSimpleNext(nextObjective);
+                processSimpleNext(nextObjective, resultBuilder);
+                break;
+            case HASHED:
+                processHashedNext(nextObjective, resultBuilder);
                 break;
             default:
                 log.warn("Unsupported next type {}", nextObjective);
@@ -78,35 +94,33 @@
                 break;
         }
 
-        if (nextIdMappingRule != null && nextRule != null) {
-            resultBuilder.addFlowRule(nextIdMappingRule);
-            resultBuilder.addFlowRule(nextRule);
-        }
-
         return resultBuilder.build();
     }
 
-    private FlowRule processNextIdMapping(NextObjective next) {
+    private void processNextIdMapping(NextObjective next,
+                                      PipelinerTranslationResult.Builder resultBuilder) {
         // program the next id mapping table
         TrafficSelector nextIdSelector = buildNextIdSelector(next.id());
         TrafficTreatment setNextTypeTreatment = buildSetNextTypeTreatment(next.type());
 
-        return DefaultFlowRule.builder()
-                .withSelector(nextIdSelector)
-                .withTreatment(setNextTypeTreatment)
-                .forDevice(deviceId)
-                .forTable(FabricConstants.TBL_NEXT_ID_MAPPING_ID)
-                .makePermanent()
-                .withPriority(next.priority())
-                .fromApp(next.appId())
-                .build();
+        resultBuilder.addFlowRule(DefaultFlowRule.builder()
+                                          .withSelector(nextIdSelector)
+                                          .withTreatment(setNextTypeTreatment)
+                                          .forDevice(deviceId)
+                                          .forTable(TBL_NEXT_ID_MAPPING_ID)
+                                          .makePermanent()
+                                          .withPriority(next.priority())
+                                          .fromApp(next.appId())
+                                          .build());
     }
 
-    private FlowRule processSimpleNext(NextObjective next) {
+    private void processSimpleNext(NextObjective next,
+                                   PipelinerTranslationResult.Builder resultBuilder) {
+
         if (next.next().size() > 1) {
             log.warn("Only one treatment in simple next objective");
-            fail(next, ObjectiveError.BADPARAMS);
-            return null;
+            resultBuilder.setError(ObjectiveError.BADPARAMS);
+            return;
         }
 
         TrafficSelector selector = buildNextIdSelector(next.id());
@@ -120,36 +134,79 @@
 
         if (outputInst == null) {
             log.warn("At least one output instruction in simple next objective");
-            fail(next, ObjectiveError.BADPARAMS);
-            return null;
+            resultBuilder.setError(ObjectiveError.BADPARAMS);
+            return;
         }
-        return DefaultFlowRule.builder()
-                .withSelector(selector)
-                .withTreatment(treatment)
-                .forTable(FabricConstants.TBL_SIMPLE_ID)
-                .makePermanent()
-                .withPriority(next.priority())
-                .forDevice(deviceId)
-                .fromApp(next.appId())
+        resultBuilder.addFlowRule(DefaultFlowRule.builder()
+                                          .withSelector(selector)
+                                          .withTreatment(treatment)
+                                          .forTable(TBL_SIMPLE_ID)
+                                          .makePermanent()
+                                          .withPriority(next.priority())
+                                          .forDevice(deviceId)
+                                          .fromApp(next.appId())
+                                          .build());
+    }
+
+    private void processHashedNext(NextObjective nextObjective, PipelinerTranslationResult.Builder resultBuilder) {
+        // create hash groups
+        int groupId = nextObjective.id();
+        List<GroupBucket> bucketList = nextObjective.next().stream()
+                .map(DefaultGroupBucket::createSelectGroupBucket)
+                .collect(Collectors.toList());
+
+        if (bucketList.size() != nextObjective.next().size()) {
+            // some action not converted
+            // set error
+            log.warn("Expected bucket size {}, got {}", nextObjective.next().size(), bucketList.size());
+            resultBuilder.setError(ObjectiveError.BADPARAMS);
+            return;
+        }
+
+        GroupBuckets buckets = new GroupBuckets(bucketList);
+        PiGroupKey groupKey = new PiGroupKey(TBL_HASHED_ID,
+                                             ACT_PRF_ECMP_SELECTOR_ID,
+                                             groupId);
+
+        resultBuilder.addGroup(new DefaultGroupDescription(deviceId,
+                                                           GroupDescription.Type.SELECT,
+                                                           buckets,
+                                                           groupKey,
+                                                           groupId,
+                                                           nextObjective.appId()));
+
+        // flow
+        TrafficSelector selector = buildNextIdSelector(nextObjective.id());
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .piTableAction(PiActionGroupId.of(nextObjective.id()))
                 .build();
+
+        resultBuilder.addFlowRule(DefaultFlowRule.builder()
+                                          .withSelector(selector)
+                                          .withTreatment(treatment)
+                                          .forTable(TBL_HASHED_ID)
+                                          .makePermanent()
+                                          .withPriority(nextObjective.priority())
+                                          .forDevice(deviceId)
+                                          .fromApp(nextObjective.appId())
+                                          .build());
     }
 
     private TrafficSelector buildNextIdSelector(int nextId) {
-        byte[] nextIdVal = new byte[]{(byte) nextId};
-        PiCriterion nextIdCrriterion = PiCriterion.builder()
-                .matchExact(FabricConstants.HF_FABRIC_METADATA_NEXT_ID_ID, nextIdVal)
+        PiCriterion nextIdCriterion = PiCriterion.builder()
+                .matchExact(HF_FABRIC_METADATA_NEXT_ID_ID, nextId)
                 .build();
         return DefaultTrafficSelector.builder()
-                .matchPi(nextIdCrriterion)
+                .matchPi(nextIdCriterion)
                 .build();
     }
 
     private TrafficTreatment buildSetNextTypeTreatment(NextObjective.Type nextType) {
-        byte nextTypeVal = NEXT_TYPE_MAP.getOrDefault(nextType, PUNT);
-        PiActionParam nextTypeParam = new PiActionParam(FabricConstants.ACT_PRM_NEXT_TYPE_ID,
-                                                        ImmutableByteSequence.copyFrom(nextTypeVal));
+        byte nextTypeVal = NEXT_TYPE_MAP.getOrDefault(nextType, NXT_TYPE_PUNT);
+        PiActionParam nextTypeParam = new PiActionParam(ACT_PRM_NEXT_TYPE_ID,
+                                                        copyFrom(nextTypeVal));
         PiAction nextTypeAction = PiAction.builder()
-                .withId(FabricConstants.ACT_SET_NEXT_TYPE_ID)
+                .withId(ACT_SET_NEXT_TYPE_ID)
                 .withParameter(nextTypeParam)
                 .build();
         return DefaultTrafficTreatment.builder()
diff --git a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
index e2d8048..3fceb5d 100644
--- a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
+++ b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
@@ -17,6 +17,7 @@
 package org.onosproject.pipelines.fabric.pipeliner;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import org.onlab.util.KryoNamespace;
 import org.onosproject.net.DeviceId;
@@ -37,6 +38,7 @@
 import org.onosproject.net.flowobjective.NextObjective;
 import org.onosproject.net.flowobjective.Objective;
 import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.group.Group;
 import org.onosproject.net.group.GroupDescription;
 import org.onosproject.net.group.GroupEvent;
 import org.onosproject.net.group.GroupListener;
@@ -46,12 +48,14 @@
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -67,7 +71,14 @@
             .build("FabricPipeliner");
 
     // TODO: make this configurable
-    private static final long DEFAULT_INSTALLATION_TIME_OUT = 10;
+    private static final long DEFAULT_INSTALLATION_TIME_OUT = 40;
+    private static final Map<Objective.Operation, GroupEvent.Type> OBJ_OP_TO_GRP_EVENT_TYPE =
+            ImmutableMap.<Objective.Operation, GroupEvent.Type>builder()
+                    .put(Objective.Operation.ADD, GroupEvent.Type.GROUP_ADDED)
+                    .put(Objective.Operation.ADD_TO_EXISTING, GroupEvent.Type.GROUP_UPDATED)
+                    .put(Objective.Operation.REMOVE, GroupEvent.Type.GROUP_REMOVED)
+                    .put(Objective.Operation.REMOVE_FROM_EXISTING, GroupEvent.Type.GROUP_UPDATED)
+            .build();
 
     protected DeviceId deviceId;
     protected FlowRuleService flowRuleService;
@@ -132,6 +143,13 @@
             return;
         }
 
+        if (nextObjective.op() == Objective.Operation.VERIFY) {
+            // TODO: support VERIFY operation
+            log.debug("Currently we don't support VERIFY operation, return success directly to the context");
+            success(nextObjective);
+            return;
+        }
+
         applyTranslationResult(nextObjective, result, success -> {
             if (!success) {
                 fail(nextObjective, ObjectiveError.GROUPINSTALLATIONFAILED);
@@ -161,7 +179,13 @@
 
     @Override
     public List<String> getNextMappings(NextGroup nextGroup) {
-        return null;
+        FabricNextGroup fabricNextGroup = KRYO.deserialize(nextGroup.data());
+        NextObjective.Type type = fabricNextGroup.type();
+        Collection<PortNumber> outputPorts = fabricNextGroup.outputPorts();
+
+        return outputPorts.stream()
+                .map(port -> String.format("%s -> %s", type, port))
+                .collect(Collectors.toList());
     }
 
     private void applyTranslationResult(Objective objective,
@@ -204,7 +228,7 @@
         try {
             return flowInstallFuture.get(DEFAULT_INSTALLATION_TIME_OUT, TimeUnit.SECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
-            log.warn("Got exception while installing groups: {}", e);
+            log.warn("Got exception while installing flows:{}", e.getMessage());
             return false;
         }
     }
@@ -213,12 +237,18 @@
         if (groups.isEmpty()) {
             return true;
         }
+        Collection<Integer> groupIds = groups.stream()
+                .map(GroupDescription::givenGroupId)
+                .collect(Collectors.toSet());
+
         int numGroupsToBeInstalled = groups.size();
         CompletableFuture<Boolean> groupInstallFuture = new CompletableFuture<>();
         AtomicInteger numGroupsInstalled = new AtomicInteger(0);
+
         GroupListener listener = new GroupListener() {
             @Override
             public void event(GroupEvent event) {
+                log.debug("Receive group event for group {}", event.subject());
                 int currentNumGroupInstalled = numGroupsInstalled.incrementAndGet();
                 if (currentNumGroupInstalled == numGroupsToBeInstalled) {
                     // install completed
@@ -228,9 +258,11 @@
             }
             @Override
             public boolean isRelevant(GroupEvent event) {
-                return groups.contains(event.subject());
+                Group group = event.subject();
+                return groupIds.contains(group.givenGroupId());
             }
         };
+
         groupService.addListener(listener);
 
         switch (objective.op()) {
@@ -240,6 +272,22 @@
             case REMOVE:
                 groups.forEach(group -> groupService.removeGroup(deviceId, group.appCookie(), objective.appId()));
                 break;
+            case ADD_TO_EXISTING:
+                groups.forEach(group -> {
+                    groupService.addBucketsToGroup(deviceId, group.appCookie(),
+                                                   group.buckets(),
+                                                   group.appCookie(),
+                                                   group.appId());
+                });
+                break;
+            case REMOVE_FROM_EXISTING:
+                groups.forEach(group -> {
+                    groupService.removeBucketsFromGroup(deviceId, group.appCookie(),
+                                                        group.buckets(),
+                                                        group.appCookie(),
+                                                        group.appId());
+                });
+                break;
             default:
                 log.warn("Unsupported objective operation {}", objective.op());
                 groupService.removeListener(listener);
@@ -247,7 +295,8 @@
         try {
             return groupInstallFuture.get(DEFAULT_INSTALLATION_TIME_OUT, TimeUnit.SECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
-            log.warn("Got exception while installing groups: {}", e);
+            groupService.removeListener(listener);
+            log.warn("Got exception while installing groups: {}", e.getMessage());
             return false;
         }
     }
@@ -300,4 +349,5 @@
             return KRYO.serialize(this);
         }
     }
+
 }
diff --git a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/ForwardingFunctionType.java b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/ForwardingFunctionType.java
index 8d8de54..3cf015d 100644
--- a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/ForwardingFunctionType.java
+++ b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/ForwardingFunctionType.java
@@ -65,11 +65,16 @@
     IPV6_MULTICAST,
 
     /**
-     * MPLS, with EtherType, MPLS label and MPLS BOS criterion.
+     * MPLS, with EtherType, MPLS label and MPLS BOS(true) criterion.
      */
     MPLS,
 
     /**
+     * Pseudo-wire, with EtherType, MPLS label and MPLS BOS(false) criterion.
+     */
+    PW,
+
+    /**
      * Unsupported type.
      */
     UNSUPPORTED;