[ONOS-7598] Fix P4 fabric pipeliner performance issue
Change-Id: I2dc175a2e0036ac3b1af41873b29714db15900e3
diff --git a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
index 93f53db..5c66474 100644
--- a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
+++ b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
@@ -16,10 +16,15 @@
package org.onosproject.pipelines.fabric.pipeliner;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalListener;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import org.onlab.util.KryoNamespace;
+import org.onosproject.core.GroupId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.PortNumber;
import org.onosproject.net.behaviour.NextGroup;
@@ -27,6 +32,7 @@
import org.onosproject.net.behaviour.PipelinerContext;
import org.onosproject.net.driver.AbstractHandlerBehaviour;
import org.onosproject.net.driver.Driver;
+import org.onosproject.net.flow.FlowId;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleOperations;
import org.onosproject.net.flow.FlowRuleOperationsContext;
@@ -39,7 +45,6 @@
import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.flowobjective.Objective;
import org.onosproject.net.flowobjective.ObjectiveError;
-import org.onosproject.net.group.Group;
import org.onosproject.net.group.GroupDescription;
import org.onosproject.net.group.GroupEvent;
import org.onosproject.net.group.GroupListener;
@@ -50,11 +55,9 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -71,24 +74,36 @@
.register(FabricNextGroup.class)
.build("FabricPipeliner");
+ private static final Set<GroupEvent.Type> GROUP_FAILED_TYPES =
+ Sets.newHashSet(GroupEvent.Type.GROUP_ADD_FAILED,
+ GroupEvent.Type.GROUP_REMOVE_FAILED,
+ GroupEvent.Type.GROUP_UPDATE_FAILED);
+
// TODO: make this configurable
private static final long DEFAULT_INSTALLATION_TIME_OUT = 40;
- private static final Map<Objective.Operation, GroupEvent.Type> OBJ_OP_TO_GRP_EVENT_TYPE =
- ImmutableMap.<Objective.Operation, GroupEvent.Type>builder()
- .put(Objective.Operation.ADD, GroupEvent.Type.GROUP_ADDED)
- .put(Objective.Operation.ADD_TO_EXISTING, GroupEvent.Type.GROUP_UPDATED)
- .put(Objective.Operation.REMOVE, GroupEvent.Type.GROUP_REMOVED)
- .put(Objective.Operation.REMOVE_FROM_EXISTING, GroupEvent.Type.GROUP_UPDATED)
- .build();
protected DeviceId deviceId;
protected FlowRuleService flowRuleService;
protected GroupService groupService;
+ protected GroupListener groupListener = new InternalGroupListener();
protected FlowObjectiveStore flowObjectiveStore;
protected FabricFilteringPipeliner pipelinerFilter;
protected FabricForwardingPipeliner pipelinerForward;
protected FabricNextPipeliner pipelinerNext;
+ private Map<FlowId, PendingInstallObjective> pendingInstallObjectiveFlows = new ConcurrentHashMap<>();
+ private Map<GroupId, PendingInstallObjective> pendingInstallObjectiveGroups = new ConcurrentHashMap<>();
+ private Cache<Objective, PendingInstallObjective> pendingInstallObjectives = CacheBuilder.newBuilder()
+ .expireAfterWrite(DEFAULT_INSTALLATION_TIME_OUT, TimeUnit.SECONDS)
+ .removalListener((RemovalListener<Objective, PendingInstallObjective>) removalNotification -> {
+ RemovalCause cause = removalNotification.getCause();
+ PendingInstallObjective pio = removalNotification.getValue();
+ if (cause == RemovalCause.EXPIRED && pio != null) {
+ pio.failed(ObjectiveError.INSTALLATIONTIMEOUT);
+ }
+ })
+ .build();
+
@Override
public void init(DeviceId deviceId, PipelinerContext context) {
@@ -96,6 +111,7 @@
this.deviceId = deviceId;
this.flowRuleService = context.directory().get(FlowRuleService.class);
this.groupService = context.directory().get(GroupService.class);
+ this.groupService.addListener(groupListener);
this.flowObjectiveStore = context.directory().get(FlowObjectiveStore.class);
this.pipelinerFilter = new FabricFilteringPipeliner(deviceId);
this.pipelinerForward = new FabricForwardingPipeliner(deviceId);
@@ -195,77 +211,65 @@
Consumer<Boolean> callback) {
Collection<GroupDescription> groups = result.groups();
Collection<FlowRule> flowRules = result.flowRules();
- CompletableFuture.supplyAsync(() -> installGroups(objective, groups))
- .thenApplyAsync(groupSuccess -> groupSuccess && installFlows(objective, flowRules))
- .thenAcceptAsync(callback)
- .exceptionally((ex) -> {
- log.warn("Got unexpected exception while applying translation result {}: {}",
- result, ex);
- fail(objective, ObjectiveError.UNKNOWN);
- return null;
- });
+
+ Set<FlowId> flowIds = flowRules.stream().map(FlowRule::id).collect(Collectors.toSet());
+ Set<GroupId> groupIds = groups.stream().map(GroupDescription::givenGroupId)
+ .map(GroupId::new).collect(Collectors.toSet());
+
+ PendingInstallObjective pio =
+ new PendingInstallObjective(objective, flowIds, groupIds, callback);
+
+ flowIds.forEach(flowId -> {
+ pendingInstallObjectiveFlows.put(flowId, pio);
+ });
+
+ groupIds.forEach(groupId -> {
+ pendingInstallObjectiveGroups.put(groupId, pio);
+ });
+
+ pendingInstallObjectives.put(objective, pio);
+ installGroups(objective, groups);
+ installFlows(objective, flowRules);
}
- private boolean installFlows(Objective objective, Collection<FlowRule> flowRules) {
+ private void installFlows(Objective objective, Collection<FlowRule> flowRules) {
if (flowRules.isEmpty()) {
- return true;
+ return;
}
- CompletableFuture<Boolean> flowInstallFuture = new CompletableFuture<>();
+
FlowRuleOperationsContext ctx = new FlowRuleOperationsContext() {
@Override
public void onSuccess(FlowRuleOperations ops) {
- flowInstallFuture.complete(true);
+ ops.stages().forEach(stage -> {
+ stage.forEach(op -> {
+ FlowId flowId = op.rule().id();
+ PendingInstallObjective pio = pendingInstallObjectiveFlows.remove(flowId);
+
+ if (pio != null) {
+ pio.flowInstalled(flowId);
+ }
+ });
+ });
}
@Override
public void onError(FlowRuleOperations ops) {
log.warn("Failed to install flow rules: {}", flowRules);
- flowInstallFuture.complete(false);
+ PendingInstallObjective pio = pendingInstallObjectives.getIfPresent(objective);
+ if (pio != null) {
+ pio.failed(ObjectiveError.FLOWINSTALLATIONFAILED);
+ }
}
};
FlowRuleOperations ops = buildFlowRuleOps(objective, flowRules, ctx);
flowRuleService.apply(ops);
-
- try {
- return flowInstallFuture.get(DEFAULT_INSTALLATION_TIME_OUT, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- log.warn("Got exception while installing flows:{}", e.getMessage());
- return false;
- }
}
- private boolean installGroups(Objective objective, Collection<GroupDescription> groups) {
+ private void installGroups(Objective objective, Collection<GroupDescription> groups) {
if (groups.isEmpty()) {
- return true;
+ return;
}
- Collection<Integer> groupIds = groups.stream()
- .map(GroupDescription::givenGroupId)
- .collect(Collectors.toSet());
-
- int numGroupsToBeInstalled = groups.size();
- CompletableFuture<Boolean> groupInstallFuture = new CompletableFuture<>();
- AtomicInteger numGroupsInstalled = new AtomicInteger(0);
-
- GroupListener listener = new GroupListener() {
- @Override
- public void event(GroupEvent event) {
- log.debug("Receive group event for group {}", event.subject());
- int currentNumGroupInstalled = numGroupsInstalled.incrementAndGet();
- if (currentNumGroupInstalled == numGroupsToBeInstalled) {
- // install completed
- groupService.removeListener(this);
- groupInstallFuture.complete(true);
- }
- }
- @Override
- public boolean isRelevant(GroupEvent event) {
- Group group = event.subject();
- return groupIds.contains(group.givenGroupId());
- }
- };
-
- groupService.addListener(listener);
switch (objective.op()) {
case ADD:
@@ -292,14 +296,6 @@
break;
default:
log.warn("Unsupported objective operation {}", objective.op());
- groupService.removeListener(listener);
- }
- try {
- return groupInstallFuture.get(DEFAULT_INSTALLATION_TIME_OUT, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- groupService.removeListener(listener);
- log.warn("Got exception while installing groups: {}", e.getMessage());
- return false;
}
}
@@ -352,4 +348,62 @@
}
}
+ class InternalGroupListener implements GroupListener {
+ @Override
+ public void event(GroupEvent event) {
+ GroupId groupId = event.subject().id();
+ PendingInstallObjective pio = pendingInstallObjectiveGroups.remove(groupId);
+ if (pio == null) {
+ return;
+ }
+ if (GROUP_FAILED_TYPES.contains(event.type())) {
+ pio.failed(ObjectiveError.GROUPINSTALLATIONFAILED);
+ }
+ pio.groupInstalled(groupId);
+ }
+
+ @Override
+ public boolean isRelevant(GroupEvent event) {
+ return pendingInstallObjectiveGroups.containsKey(event.subject().id());
+ }
+ }
+
+ class PendingInstallObjective {
+ Objective objective;
+ Collection<FlowId> flowIds;
+ Collection<GroupId> groupIds;
+ Consumer<Boolean> callback;
+
+ public PendingInstallObjective(Objective objective, Collection<FlowId> flowIds,
+ Collection<GroupId> groupIds, Consumer<Boolean> callback) {
+ this.objective = objective;
+ this.flowIds = flowIds;
+ this.groupIds = groupIds;
+ this.callback = callback;
+ }
+
+ void flowInstalled(FlowId flowId) {
+ flowIds.remove(flowId);
+ checkIfFinished();
+ }
+
+ void groupInstalled(GroupId groupId) {
+ groupIds.remove(groupId);
+ checkIfFinished();
+ }
+
+ private void checkIfFinished() {
+ if (flowIds.isEmpty() && groupIds.isEmpty()) {
+ pendingInstallObjectives.invalidate(objective);
+ callback.accept(true);
+ }
+ }
+
+ void failed(ObjectiveError error) {
+ flowIds.forEach(pendingInstallObjectiveFlows::remove);
+ groupIds.forEach(pendingInstallObjectiveGroups::remove);
+ pendingInstallObjectives.invalidate(objective);
+ fail(objective, error);
+ }
+ }
}
diff --git a/pipelines/fabric/src/test/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipelinerTest.java b/pipelines/fabric/src/test/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipelinerTest.java
index a7b6dce..5414998 100644
--- a/pipelines/fabric/src/test/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipelinerTest.java
+++ b/pipelines/fabric/src/test/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipelinerTest.java
@@ -31,6 +31,7 @@
import org.onosproject.net.driver.Driver;
import org.onosproject.net.driver.DriverHandler;
import org.onosproject.net.flow.criteria.PiCriterion;
+import org.onosproject.net.group.GroupService;
import org.onosproject.pipelines.fabric.FabricConstants;
import org.onosproject.pipelines.fabric.FabricInterpreter;
@@ -82,6 +83,7 @@
public void setup() {
pipeliner = new FabricPipeliner();
+ GroupService mockGroupService = createNiceMock(GroupService.class);
ServiceDirectory serviceDirectory = createNiceMock(ServiceDirectory.class);
PipelinerContext pipelinerContext = createNiceMock(PipelinerContext.class);
DriverHandler driverHandler = createNiceMock(DriverHandler.class);
@@ -90,6 +92,7 @@
expect(mockDriver.getProperty("noHashedTable")).andReturn("false").anyTimes();
expect(driverHandler.driver()).andReturn(mockDriver).anyTimes();
expect(pipelinerContext.directory()).andReturn(serviceDirectory).anyTimes();
+ expect(serviceDirectory.get(GroupService.class)).andReturn(mockGroupService).anyTimes();
replay(serviceDirectory, pipelinerContext, driverHandler, mockDriver);
TestUtils.setField(pipeliner, "handler", driverHandler);