Fix timeout problem of fabric pipeliner
Including a workaround for ONOS-7785
Change-Id: I867161f5edf63e82c42a731a4b107ea326d4675c
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index bd5c8c4..ed2a678 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -472,6 +472,11 @@
"staleLinkAge", "15000");
compCfgService.preSetProperty("org.onosproject.net.host.impl.HostManager",
"allowDuplicateIps", "false");
+ // For P4 switches
+ compCfgService.preSetProperty("org.onosproject.net.flow.impl.FlowRuleManager",
+ "fallbackFlowPollFrequency", "5");
+ compCfgService.preSetProperty("org.onosproject.net.group.impl.GroupManager",
+ "fallbackGroupPollFrequency", "5");
compCfgService.registerProperties(getClass());
modified(context);
diff --git a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java
index 15c1a0d..0dd5f21 100644
--- a/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java
+++ b/drivers/p4runtime/src/main/java/org/onosproject/drivers/p4runtime/P4RuntimeGroupProgrammable.java
@@ -19,12 +19,12 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Striped;
-import org.apache.commons.lang3.tuple.Pair;
import org.onosproject.drivers.p4runtime.mirror.P4RuntimeGroupMirror;
import org.onosproject.drivers.p4runtime.mirror.P4RuntimeMulticastGroupMirror;
import org.onosproject.drivers.p4runtime.mirror.TimedEntry;
import org.onosproject.net.DeviceId;
import org.onosproject.net.group.DefaultGroup;
+import org.onosproject.net.group.DefaultGroupDescription;
import org.onosproject.net.group.Group;
import org.onosproject.net.group.GroupDescription;
import org.onosproject.net.group.GroupOperation;
@@ -116,17 +116,23 @@
if (!setupBehaviour()) {
return;
}
- groupOps.operations().stream()
- // Get group type and operation type
- .map(op -> Pair.of(groupStore.getGroup(deviceId, op.groupId()),
- op.opType()))
- .forEach(pair -> {
- if (pair.getLeft().type().equals(GroupDescription.Type.ALL)) {
- processMcGroupOp(deviceId, pair.getLeft(), pair.getRight());
- } else {
- processGroupOp(deviceId, pair.getLeft(), pair.getRight());
- }
- });
+ groupOps.operations().forEach(op -> {
+ // ONOS-7785 We need app cookie (action profile id) from the group
+ Group groupOnStore = groupStore.getGroup(deviceId, op.groupId());
+ GroupDescription groupDesc = new DefaultGroupDescription(deviceId,
+ op.groupType(),
+ op.buckets(),
+ groupOnStore.appCookie(),
+ op.groupId().id(),
+ groupOnStore.appId());
+ DefaultGroup groupToApply = new DefaultGroup(op.groupId(), groupDesc);
+ if (op.groupType().equals(GroupDescription.Type.ALL)) {
+ processMcGroupOp(deviceId, groupToApply, op.opType());
+ } else {
+
+ processGroupOp(deviceId, groupToApply, op.opType());
+ }
+ });
}
@Override
diff --git a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
index 363a95c..e344404 100644
--- a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
+++ b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
@@ -16,6 +16,8 @@
package org.onosproject.pipelines.fabric.pipeliner;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
@@ -24,6 +26,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
import org.onosproject.core.GroupId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.PortNumber;
@@ -56,7 +59,10 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -81,6 +87,7 @@
// TODO: make this configurable
private static final long DEFAULT_INSTALLATION_TIME_OUT = 40;
+ private static final int NUM_CALLBACK_THREAD = 2;
protected DeviceId deviceId;
protected FlowRuleService flowRuleService;
@@ -92,7 +99,7 @@
protected FabricNextPipeliner pipelinerNext;
private Map<FlowId, PendingInstallObjective> pendingInstallObjectiveFlows = new ConcurrentHashMap<>();
- private Map<GroupId, PendingInstallObjective> pendingInstallObjectiveGroups = new ConcurrentHashMap<>();
+ private Map<PendingGroupKey, PendingInstallObjective> pendingInstallObjectiveGroups = new ConcurrentHashMap<>();
private Cache<Objective, PendingInstallObjective> pendingInstallObjectives = CacheBuilder.newBuilder()
.expireAfterWrite(DEFAULT_INSTALLATION_TIME_OUT, TimeUnit.SECONDS)
.removalListener((RemovalListener<Objective, PendingInstallObjective>) removalNotification -> {
@@ -103,6 +110,8 @@
}
})
.build();
+ private static ExecutorService flowObjCallbackExecutor =
+ Executors.newFixedThreadPool(NUM_CALLBACK_THREAD, Tools.groupedThreads("fabric-pipeliner", "cb-", log));
@Override
@@ -126,8 +135,8 @@
return;
}
- applyTranslationResult(filterObjective, result, success -> {
- if (success) {
+ applyTranslationResult(filterObjective, result, error -> {
+ if (error == null) {
success(filterObjective);
} else {
fail(filterObjective, ObjectiveError.FLOWINSTALLATIONFAILED);
@@ -143,8 +152,8 @@
return;
}
- applyTranslationResult(forwardObjective, result, success -> {
- if (success) {
+ applyTranslationResult(forwardObjective, result, error -> {
+ if (error == null) {
success(forwardObjective);
} else {
fail(forwardObjective, ObjectiveError.FLOWINSTALLATIONFAILED);
@@ -168,25 +177,22 @@
return;
}
- applyTranslationResult(nextObjective, result, success -> {
- if (!success) {
- fail(nextObjective, ObjectiveError.GROUPINSTALLATIONFAILED);
+ applyTranslationResult(nextObjective, result, error -> {
+ if (error != null) {
+ fail(nextObjective, error);
return;
}
// Success, put next group to objective store
List<PortNumber> portNumbers = Lists.newArrayList();
nextObjective.next().forEach(treatment -> {
- Instructions.OutputInstruction outputInst = treatment.allInstructions()
+ treatment.allInstructions()
.stream()
.filter(inst -> inst.type() == Instruction.Type.OUTPUT)
.map(inst -> (Instructions.OutputInstruction) inst)
.findFirst()
- .orElse(null);
-
- if (outputInst != null) {
- portNumbers.add(outputInst.port());
- }
+ .map(Instructions.OutputInstruction::port)
+ .ifPresent(portNumbers::add);
});
FabricNextGroup nextGroup = new FabricNextGroup(nextObjective.type(),
portNumbers);
@@ -208,23 +214,25 @@
private void applyTranslationResult(Objective objective,
PipelinerTranslationResult result,
- Consumer<Boolean> callback) {
+ Consumer<ObjectiveError> callback) {
Collection<GroupDescription> groups = result.groups();
Collection<FlowRule> flowRules = result.flowRules();
Set<FlowId> flowIds = flowRules.stream().map(FlowRule::id).collect(Collectors.toSet());
- Set<GroupId> groupIds = groups.stream().map(GroupDescription::givenGroupId)
- .map(GroupId::new).collect(Collectors.toSet());
+ Set<PendingGroupKey> pendingGroupKeys = groups.stream().map(GroupDescription::givenGroupId)
+ .map(GroupId::new)
+ .map(gid -> new PendingGroupKey(gid, objective.op()))
+ .collect(Collectors.toSet());
PendingInstallObjective pio =
- new PendingInstallObjective(objective, flowIds, groupIds, callback);
+ new PendingInstallObjective(objective, flowIds, pendingGroupKeys, callback);
flowIds.forEach(flowId -> {
pendingInstallObjectiveFlows.put(flowId, pio);
});
- groupIds.forEach(groupId -> {
- pendingInstallObjectiveGroups.put(groupId, pio);
+ pendingGroupKeys.forEach(pendingGroupKey -> {
+ pendingInstallObjectiveGroups.put(pendingGroupKey, pio);
});
pendingInstallObjectives.put(objective, pio);
@@ -305,11 +313,16 @@
}
static void fail(Objective objective, ObjectiveError error) {
- objective.context().ifPresent(ctx -> ctx.onError(objective, error));
+ CompletableFuture.runAsync(() -> {
+ objective.context().ifPresent(ctx -> ctx.onError(objective, error));
+ }, flowObjCallbackExecutor);
+
}
static void success(Objective objective) {
- objective.context().ifPresent(ctx -> ctx.onSuccess(objective));
+ CompletableFuture.runAsync(() -> {
+ objective.context().ifPresent(ctx -> ctx.onSuccess(objective));
+ }, flowObjCallbackExecutor);
}
static FlowRuleOperations buildFlowRuleOps(Objective objective, Collection<FlowRule> flowRules,
@@ -317,16 +330,13 @@
FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
switch (objective.op()) {
case ADD:
+ case ADD_TO_EXISTING: // For egress VLAN
flowRules.forEach(ops::add);
break;
case REMOVE:
+ case REMOVE_FROM_EXISTING: // For egress VLAN
flowRules.forEach(ops::remove);
break;
- case ADD_TO_EXISTING:
- case REMOVE_FROM_EXISTING:
- // Next objective may use ADD_TO_EXIST or REMOVE_FROM_EXIST op
- // No need to update FlowRuls for vlan_meta table.
- return null;
default:
log.warn("Unsupported op {} for {}", objective.op(), objective);
fail(objective, ObjectiveError.BADPARAMS);
@@ -362,33 +372,33 @@
@Override
public void event(GroupEvent event) {
GroupId groupId = event.subject().id();
- PendingInstallObjective pio = pendingInstallObjectiveGroups.remove(groupId);
- if (pio == null) {
- return;
- }
+ PendingGroupKey pendingGroupKey = new PendingGroupKey(groupId, event.type());
+ PendingInstallObjective pio = pendingInstallObjectiveGroups.remove(pendingGroupKey);
if (GROUP_FAILED_TYPES.contains(event.type())) {
pio.failed(ObjectiveError.GROUPINSTALLATIONFAILED);
}
- pio.groupInstalled(groupId);
+ pio.groupInstalled(pendingGroupKey);
}
@Override
public boolean isRelevant(GroupEvent event) {
- return pendingInstallObjectiveGroups.containsKey(event.subject().id());
+ PendingGroupKey pendingGroupKey = new PendingGroupKey(event.subject().id(), event.type());
+ return pendingInstallObjectiveGroups.containsKey(pendingGroupKey);
}
}
class PendingInstallObjective {
Objective objective;
Collection<FlowId> flowIds;
- Collection<GroupId> groupIds;
- Consumer<Boolean> callback;
+ Collection<PendingGroupKey> pendingGroupKeys;
+ Consumer<ObjectiveError> callback;
public PendingInstallObjective(Objective objective, Collection<FlowId> flowIds,
- Collection<GroupId> groupIds, Consumer<Boolean> callback) {
+ Collection<PendingGroupKey> pendingGroupKeys,
+ Consumer<ObjectiveError> callback) {
this.objective = objective;
this.flowIds = flowIds;
- this.groupIds = groupIds;
+ this.pendingGroupKeys = pendingGroupKeys;
this.callback = callback;
}
@@ -397,23 +407,79 @@
checkIfFinished();
}
- void groupInstalled(GroupId groupId) {
- groupIds.remove(groupId);
+ void groupInstalled(PendingGroupKey pendingGroupKey) {
+ pendingGroupKeys.remove(pendingGroupKey);
checkIfFinished();
}
private void checkIfFinished() {
- if (flowIds.isEmpty() && groupIds.isEmpty()) {
+ if (flowIds.isEmpty() && pendingGroupKeys.isEmpty()) {
pendingInstallObjectives.invalidate(objective);
- callback.accept(true);
+ callback.accept(null);
}
}
void failed(ObjectiveError error) {
flowIds.forEach(pendingInstallObjectiveFlows::remove);
- groupIds.forEach(pendingInstallObjectiveGroups::remove);
+ pendingGroupKeys.forEach(pendingInstallObjectiveGroups::remove);
pendingInstallObjectives.invalidate(objective);
- fail(objective, error);
+ callback.accept(error);
+ }
+ }
+
+ class PendingGroupKey {
+ private GroupId groupId;
+ private GroupEvent.Type expectedEventType;
+
+ PendingGroupKey(GroupId groupId, GroupEvent.Type expectedEventType) {
+ this.groupId = groupId;
+ this.expectedEventType = expectedEventType;
+ }
+
+ PendingGroupKey(GroupId groupId, NextObjective.Operation objOp) {
+ this.groupId = groupId;
+
+ switch (objOp) {
+ case ADD:
+ expectedEventType = GroupEvent.Type.GROUP_ADDED;
+ break;
+ case REMOVE:
+ expectedEventType = GroupEvent.Type.GROUP_REMOVED;
+ break;
+ case MODIFY:
+ case ADD_TO_EXISTING:
+ case REMOVE_FROM_EXISTING:
+ expectedEventType = GroupEvent.Type.GROUP_UPDATED;
+ break;
+ default:
+ expectedEventType = null;
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PendingGroupKey pendingGroupKey = (PendingGroupKey) o;
+ return Objects.equal(groupId, pendingGroupKey.groupId) &&
+ expectedEventType == pendingGroupKey.expectedEventType;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(groupId, expectedEventType);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("groupId", groupId)
+ .add("expectedEventType", expectedEventType)
+ .toString();
}
}
}