[ONOS-7242] Support MPLS in the fabric.p4 pipeliner

Change-Id: I56a8f266e6d0afe5ad6737b8d0e399758fb75378
diff --git a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
index e2d8048..3fceb5d 100644
--- a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
+++ b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
@@ -17,6 +17,7 @@
 package org.onosproject.pipelines.fabric.pipeliner;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import org.onlab.util.KryoNamespace;
 import org.onosproject.net.DeviceId;
@@ -37,6 +38,7 @@
 import org.onosproject.net.flowobjective.NextObjective;
 import org.onosproject.net.flowobjective.Objective;
 import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.group.Group;
 import org.onosproject.net.group.GroupDescription;
 import org.onosproject.net.group.GroupEvent;
 import org.onosproject.net.group.GroupListener;
@@ -46,12 +48,14 @@
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -67,7 +71,14 @@
             .build("FabricPipeliner");
 
     // TODO: make this configurable
-    private static final long DEFAULT_INSTALLATION_TIME_OUT = 10;
+    private static final long DEFAULT_INSTALLATION_TIME_OUT = 40; // in seconds (paired with TimeUnit.SECONDS)
+    private static final Map<Objective.Operation, GroupEvent.Type> OBJ_OP_TO_GRP_EVENT_TYPE =
+            ImmutableMap.<Objective.Operation, GroupEvent.Type>builder()
+                    .put(Objective.Operation.ADD, GroupEvent.Type.GROUP_ADDED)
+                    .put(Objective.Operation.ADD_TO_EXISTING, GroupEvent.Type.GROUP_UPDATED)
+                    .put(Objective.Operation.REMOVE, GroupEvent.Type.GROUP_REMOVED)
+                    .put(Objective.Operation.REMOVE_FROM_EXISTING, GroupEvent.Type.GROUP_UPDATED)
+            .build(); // NOTE(review): presumably the group event expected for each objective op — confirm
 
     protected DeviceId deviceId;
     protected FlowRuleService flowRuleService;
@@ -132,6 +143,13 @@
             return;
         }
 
+        if (nextObjective.op() == Objective.Operation.VERIFY) {
+            // TODO: support VERIFY operation
+            log.debug("Currently we don't support VERIFY operation, return success directly to the context");
+            success(nextObjective);
+            return;
+        }
+
         applyTranslationResult(nextObjective, result, success -> {
             if (!success) {
                 fail(nextObjective, ObjectiveError.GROUPINSTALLATIONFAILED);
@@ -161,7 +179,13 @@
 
     @Override
     public List<String> getNextMappings(NextGroup nextGroup) {
-        return null;
+        FabricNextGroup decoded = KRYO.deserialize(nextGroup.data());
+        NextObjective.Type groupType = decoded.type();
+        List<String> mappings = Lists.newArrayList(); // one "<type> -> <port>" entry per output port
+        for (PortNumber outPort : decoded.outputPorts()) {
+            mappings.add(String.format("%s -> %s", groupType, outPort));
+        }
+        return mappings;
     }
 
     private void applyTranslationResult(Objective objective,
@@ -204,7 +228,7 @@
         try {
             return flowInstallFuture.get(DEFAULT_INSTALLATION_TIME_OUT, TimeUnit.SECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
-            log.warn("Got exception while installing groups: {}", e);
+            log.warn("Got exception while installing flows: {}", e.toString()); // getMessage() may be null
             return false;
         }
     }
@@ -213,12 +237,18 @@
         if (groups.isEmpty()) {
             return true;
         }
+        Collection<Integer> groupIds = groups.stream()
+                .map(GroupDescription::givenGroupId)
+                .collect(Collectors.toSet());
+
         int numGroupsToBeInstalled = groups.size();
         CompletableFuture<Boolean> groupInstallFuture = new CompletableFuture<>();
         AtomicInteger numGroupsInstalled = new AtomicInteger(0);
+
         GroupListener listener = new GroupListener() {
             @Override
             public void event(GroupEvent event) {
+                log.debug("Receive group event for group {}", event.subject());
                 int currentNumGroupInstalled = numGroupsInstalled.incrementAndGet();
                 if (currentNumGroupInstalled == numGroupsToBeInstalled) {
                     // install completed
@@ -228,9 +258,11 @@
             }
             @Override
             public boolean isRelevant(GroupEvent event) {
-                return groups.contains(event.subject());
+                return OBJ_OP_TO_GRP_EVENT_TYPE.get(objective.op()) == event.type() // ignore *_FAILED etc.
+                        && groupIds.contains(event.subject().givenGroupId());
             }
         };
+
         groupService.addListener(listener);
 
         switch (objective.op()) {
@@ -240,6 +272,22 @@
             case REMOVE:
                 groups.forEach(group -> groupService.removeGroup(deviceId, group.appCookie(), objective.appId()));
                 break;
+            case ADD_TO_EXISTING:
+                groups.forEach(group -> {
+                    groupService.addBucketsToGroup(deviceId, group.appCookie(),
+                                                   group.buckets(),
+                                                   group.appCookie(),
+                                                   group.appId());
+                });
+                break;
+            case REMOVE_FROM_EXISTING:
+                groups.forEach(group -> {
+                    groupService.removeBucketsFromGroup(deviceId, group.appCookie(),
+                                                        group.buckets(),
+                                                        group.appCookie(),
+                                                        group.appId());
+                });
+                break;
             default:
                 log.warn("Unsupported objective operation {}", objective.op());
                 groupService.removeListener(listener);
@@ -247,7 +295,8 @@
         try {
             return groupInstallFuture.get(DEFAULT_INSTALLATION_TIME_OUT, TimeUnit.SECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
-            log.warn("Got exception while installing groups: {}", e);
+            groupService.removeListener(listener);
+            log.warn("Got exception while installing groups: {}", e.toString()); // getMessage() may be null
             return false;
         }
     }
@@ -300,4 +349,5 @@
             return KRYO.serialize(this);
         }
     }
+
 }