[ONOS-7598] Fix P4 fabric pipeliner performance issue
Change-Id: I2dc175a2e0036ac3b1af41873b29714db15900e3
diff --git a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
index e2d8048..4460111 100644
--- a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
+++ b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
@@ -16,15 +16,22 @@
package org.onosproject.pipelines.fabric.pipeliner;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalListener;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import org.onlab.util.KryoNamespace;
+import org.onosproject.core.GroupId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.PortNumber;
import org.onosproject.net.behaviour.NextGroup;
import org.onosproject.net.behaviour.Pipeliner;
import org.onosproject.net.behaviour.PipelinerContext;
import org.onosproject.net.driver.AbstractHandlerBehaviour;
+import org.onosproject.net.flow.FlowId;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleOperations;
import org.onosproject.net.flow.FlowRuleOperationsContext;
@@ -46,12 +53,12 @@
import java.util.Collection;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
import static org.slf4j.LoggerFactory.getLogger;
@@ -66,23 +73,43 @@
.register(FabricNextGroup.class)
.build("FabricPipeliner");
+ private static final Set<GroupEvent.Type> GROUP_FAILED_TYPES =
+ Sets.newHashSet(GroupEvent.Type.GROUP_ADD_FAILED,
+ GroupEvent.Type.GROUP_REMOVE_FAILED,
+ GroupEvent.Type.GROUP_UPDATE_FAILED);
+
// TODO: make this configurable
private static final long DEFAULT_INSTALLATION_TIME_OUT = 10;
protected DeviceId deviceId;
protected FlowRuleService flowRuleService;
protected GroupService groupService;
+ protected GroupListener groupListener = new InternalGroupListener();
protected FlowObjectiveStore flowObjectiveStore;
protected FabricFilteringPipeliner pipelinerFilter;
protected FabricForwardingPipeliner pipelinerForward;
protected FabricNextPipeliner pipelinerNext;
+ private Map<FlowId, PendingInstallObjective> pendingInstallObjectiveFlows = new ConcurrentHashMap<>();
+ private Map<GroupId, PendingInstallObjective> pendingInstallObjectiveGroups = new ConcurrentHashMap<>();
+ private Cache<Objective, PendingInstallObjective> pendingInstallObjectives = CacheBuilder.newBuilder()
+ .expireAfterWrite(DEFAULT_INSTALLATION_TIME_OUT, TimeUnit.SECONDS)
+ .removalListener((RemovalListener<Objective, PendingInstallObjective>) removalNotification -> {
+ RemovalCause cause = removalNotification.getCause();
+ PendingInstallObjective pio = removalNotification.getValue();
+ if (cause == RemovalCause.EXPIRED && pio != null) {
+ pio.failed(ObjectiveError.INSTALLATIONTIMEOUT);
+ }
+ })
+ .build();
+
@Override
public void init(DeviceId deviceId, PipelinerContext context) {
this.deviceId = deviceId;
this.flowRuleService = context.directory().get(FlowRuleService.class);
this.groupService = context.directory().get(GroupService.class);
+ this.groupService.addListener(groupListener);
this.flowObjectiveStore = context.directory().get(FlowObjectiveStore.class);
this.pipelinerFilter = new FabricFilteringPipeliner(deviceId);
this.pipelinerForward = new FabricForwardingPipeliner(deviceId);
@@ -169,69 +196,65 @@
Consumer<Boolean> callback) {
Collection<GroupDescription> groups = result.groups();
Collection<FlowRule> flowRules = result.flowRules();
- CompletableFuture.supplyAsync(() -> installGroups(objective, groups))
- .thenApplyAsync(groupSuccess -> groupSuccess && installFlows(objective, flowRules))
- .thenAcceptAsync(callback)
- .exceptionally((ex) -> {
- log.warn("Got unexpected exception while applying translation result {}",
- result);
- fail(objective, ObjectiveError.UNKNOWN);
- return null;
- });
+
+ Set<FlowId> flowIds = flowRules.stream().map(FlowRule::id).collect(Collectors.toSet());
+ Set<GroupId> groupIds = groups.stream().map(GroupDescription::givenGroupId)
+ .map(GroupId::new).collect(Collectors.toSet());
+
+ PendingInstallObjective pio =
+ new PendingInstallObjective(objective, flowIds, groupIds, callback);
+
+ flowIds.forEach(flowId -> {
+ pendingInstallObjectiveFlows.put(flowId, pio);
+ });
+
+ groupIds.forEach(groupId -> {
+ pendingInstallObjectiveGroups.put(groupId, pio);
+ });
+
+ pendingInstallObjectives.put(objective, pio);
+ installGroups(objective, groups);
+ installFlows(objective, flowRules);
}
- private boolean installFlows(Objective objective, Collection<FlowRule> flowRules) {
+ private void installFlows(Objective objective, Collection<FlowRule> flowRules) {
if (flowRules.isEmpty()) {
- return true;
+ return;
}
- CompletableFuture<Boolean> flowInstallFuture = new CompletableFuture<>();
+
FlowRuleOperationsContext ctx = new FlowRuleOperationsContext() {
@Override
public void onSuccess(FlowRuleOperations ops) {
- flowInstallFuture.complete(true);
+ ops.stages().forEach(stage -> {
+ stage.forEach(op -> {
+ FlowId flowId = op.rule().id();
+ PendingInstallObjective pio = pendingInstallObjectiveFlows.remove(flowId);
+
+ if (pio != null) {
+ pio.flowInstalled(flowId);
+ }
+ });
+ });
}
@Override
public void onError(FlowRuleOperations ops) {
log.warn("Failed to install flow rules: {}", flowRules);
- flowInstallFuture.complete(false);
+ PendingInstallObjective pio = pendingInstallObjectives.getIfPresent(objective);
+ if (pio != null) {
+ pio.failed(ObjectiveError.FLOWINSTALLATIONFAILED);
+ }
}
};
FlowRuleOperations ops = buildFlowRuleOps(objective, flowRules, ctx);
flowRuleService.apply(ops);
-
- try {
- return flowInstallFuture.get(DEFAULT_INSTALLATION_TIME_OUT, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- log.warn("Got exception while installing groups: {}", e);
- return false;
- }
}
- private boolean installGroups(Objective objective, Collection<GroupDescription> groups) {
+ private void installGroups(Objective objective, Collection<GroupDescription> groups) {
if (groups.isEmpty()) {
- return true;
+ return;
}
- int numGroupsToBeInstalled = groups.size();
- CompletableFuture<Boolean> groupInstallFuture = new CompletableFuture<>();
- AtomicInteger numGroupsInstalled = new AtomicInteger(0);
- GroupListener listener = new GroupListener() {
- @Override
- public void event(GroupEvent event) {
- int currentNumGroupInstalled = numGroupsInstalled.incrementAndGet();
- if (currentNumGroupInstalled == numGroupsToBeInstalled) {
- // install completed
- groupService.removeListener(this);
- groupInstallFuture.complete(true);
- }
- }
- @Override
- public boolean isRelevant(GroupEvent event) {
- return groups.contains(event.subject());
- }
- };
- groupService.addListener(listener);
switch (objective.op()) {
case ADD:
@@ -242,13 +265,6 @@
break;
default:
log.warn("Unsupported objective operation {}", objective.op());
- groupService.removeListener(listener);
- }
- try {
- return groupInstallFuture.get(DEFAULT_INSTALLATION_TIME_OUT, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- log.warn("Got exception while installing groups: {}", e);
- return false;
}
}
@@ -300,4 +316,63 @@
return KRYO.serialize(this);
}
}
+
+ class InternalGroupListener implements GroupListener {
+ @Override
+ public void event(GroupEvent event) {
+ GroupId groupId = event.subject().id();
+ PendingInstallObjective pio = pendingInstallObjectiveGroups.remove(groupId);
+ if (pio == null) {
+ return;
+ }
+ if (GROUP_FAILED_TYPES.contains(event.type())) {
+ pio.failed(ObjectiveError.GROUPINSTALLATIONFAILED);
+ }
+ pio.groupInstalled(groupId);
+ }
+
+ @Override
+ public boolean isRelevant(GroupEvent event) {
+ return pendingInstallObjectiveGroups.containsKey(event.subject().id());
+ }
+ }
+
+ class PendingInstallObjective {
+ Objective objective;
+ Collection<FlowId> flowIds;
+ Collection<GroupId> groupIds;
+ Consumer<Boolean> callback;
+
+ public PendingInstallObjective(Objective objective, Collection<FlowId> flowIds,
+ Collection<GroupId> groupIds, Consumer<Boolean> callback) {
+ this.objective = objective;
+ this.flowIds = flowIds;
+ this.groupIds = groupIds;
+ this.callback = callback;
+ }
+
+ void flowInstalled(FlowId flowId) {
+ flowIds.remove(flowId);
+ checkIfFinished();
+ }
+
+ void groupInstalled(GroupId groupId) {
+ groupIds.remove(groupId);
+ checkIfFinished();
+ }
+
+ private void checkIfFinished() {
+ if (flowIds.isEmpty() && groupIds.isEmpty()) {
+ pendingInstallObjectives.invalidate(objective);
+ callback.accept(true);
+ }
+ }
+
+ void failed(ObjectiveError error) {
+ flowIds.forEach(pendingInstallObjectiveFlows::remove);
+ groupIds.forEach(pendingInstallObjectiveGroups::remove);
+ pendingInstallObjectives.invalidate(objective);
+ fail(objective, error);
+ }
+ }
}
diff --git a/pipelines/fabric/src/test/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipelinerTest.java b/pipelines/fabric/src/test/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipelinerTest.java
index 30610ed..5cee6ad 100644
--- a/pipelines/fabric/src/test/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipelinerTest.java
+++ b/pipelines/fabric/src/test/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipelinerTest.java
@@ -17,6 +17,7 @@
package org.onosproject.pipelines.fabric.pipeliner;
import org.junit.Before;
+import org.onlab.junit.TestUtils;
import org.onlab.osgi.ServiceDirectory;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
@@ -27,7 +28,10 @@
import org.onosproject.net.DeviceId;
import org.onosproject.net.PortNumber;
import org.onosproject.net.behaviour.PipelinerContext;
+import org.onosproject.net.driver.Driver;
+import org.onosproject.net.driver.DriverHandler;
import org.onosproject.net.flow.criteria.PiCriterion;
+import org.onosproject.net.group.GroupService;
import org.onosproject.pipelines.fabric.FabricConstants;
import static org.easymock.EasyMock.createNiceMock;
@@ -76,10 +80,18 @@
public void setup() {
pipeliner = new FabricPipeliner();
+ GroupService mockGroupService = createNiceMock(GroupService.class);
ServiceDirectory serviceDirectory = createNiceMock(ServiceDirectory.class);
PipelinerContext pipelinerContext = createNiceMock(PipelinerContext.class);
+ DriverHandler driverHandler = createNiceMock(DriverHandler.class);
+ Driver mockDriver = createNiceMock(Driver.class);
+ expect(mockDriver.getProperty("supportTableCounters")).andReturn("true").anyTimes();
+ expect(mockDriver.getProperty("noHashedTable")).andReturn("false").anyTimes();
+ expect(driverHandler.driver()).andReturn(mockDriver).anyTimes();
expect(pipelinerContext.directory()).andReturn(serviceDirectory).anyTimes();
- replay(serviceDirectory, pipelinerContext);
+ expect(serviceDirectory.get(GroupService.class)).andReturn(mockGroupService).anyTimes();
+ replay(serviceDirectory, pipelinerContext, driverHandler, mockDriver);
+ TestUtils.setField(pipeliner, "handler", driverHandler);
pipeliner.init(DEVICE_ID, pipelinerContext);
}