Support multiple pending add buckets
Change-Id: Ia528a9b52ad9cb935b4a5d0bc41263baabbdb3d3
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2GroupHandler.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2GroupHandler.java
index fe605e9..8637e0b 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2GroupHandler.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2GroupHandler.java
@@ -43,8 +43,10 @@
import org.onosproject.net.flow.instructions.Instruction;
import org.onosproject.net.flow.instructions.Instructions;
import org.onosproject.net.flow.instructions.L2ModificationInstruction;
+import org.onosproject.net.flowobjective.DefaultNextObjective;
import org.onosproject.net.flowobjective.FlowObjectiveStore;
import org.onosproject.net.flowobjective.NextObjective;
+import org.onosproject.net.flowobjective.ObjectiveContext;
import org.onosproject.net.flowobjective.ObjectiveError;
import org.onosproject.net.group.DefaultGroupBucket;
import org.onosproject.net.group.DefaultGroupDescription;
@@ -140,9 +142,9 @@
// index number for group creation
private AtomicCounter nextIndex;
- // local store for pending bucketAdds - by design there can only be one
+ // local store for pending bucketAdds - by design there can be multiple
// pending bucket for a group
- protected ConcurrentHashMap<Integer, NextObjective> pendingBuckets =
+ protected ConcurrentHashMap<Integer, Set<NextObjective>> pendingBuckets =
new ConcurrentHashMap<>();
/**
@@ -1054,6 +1056,8 @@
Set<TrafficTreatment> duplicateBuckets = Sets.newHashSet();
List<Deque<GroupKey>> allActiveKeys = Ofdpa2Pipeline.appKryo.deserialize(next.data());
Set<PortNumber> existingPorts = getExistingOutputPorts(allActiveKeys);
+ Set<TrafficTreatment> nonDuplicateBuckets = Sets.newHashSet();
+ NextObjective objectiveToAdd;
nextObjective.next().forEach(trafficTreatment -> {
PortNumber portNumber = readOutPortFromTreatment(trafficTreatment);
@@ -1064,18 +1068,33 @@
if (existingPorts.contains(portNumber)) {
duplicateBuckets.add(trafficTreatment);
+ } else {
+ nonDuplicateBuckets.add(trafficTreatment);
}
});
if (!duplicateBuckets.isEmpty()) {
- log.warn("Some buckets {} already exists in next id {}, abort.",
+ log.debug("Some buckets {} already exists in next id {}, duplicate buckets will be ignored.",
duplicateBuckets, nextObjective.id());
+
+ // new next objective with new treatments
+ NextObjective.Builder builder = DefaultNextObjective.builder()
+ .withType(nextObjective.type())
+ .withId(nextObjective.id())
+ .withMeta(nextObjective.meta())
+ .fromApp(nextObjective.appId());
+ nonDuplicateBuckets.forEach(builder::addTreatment);
+
+ ObjectiveContext context = nextObjective.context().orElse(null);
+ objectiveToAdd = builder.addToExisting(context);
+ } else {
+ objectiveToAdd = nextObjective;
}
if (nextObjective.type() == NextObjective.Type.HASHED) {
- addBucketToHashGroup(nextObjective, allActiveKeys);
+ addBucketToHashGroup(objectiveToAdd, allActiveKeys);
} else if (nextObjective.type() == NextObjective.Type.BROADCAST) {
- addBucketToBroadcastGroup(nextObjective, allActiveKeys);
+ addBucketToBroadcastGroup(objectiveToAdd, allActiveKeys);
}
}
@@ -1703,12 +1722,14 @@
.givenGroupId()));
Ofdpa2Pipeline.pass(nextGrp.nextObjective());
flowObjectiveStore.putNextGroup(nextGrp.nextObjective().id(), nextGrp);
+
// check if addBuckets waiting for this completion
- NextObjective pendBkt = pendingBuckets
- .remove(nextGrp.nextObjective().id());
- if (pendBkt != null) {
- addBucketToGroup(pendBkt, nextGrp);
- }
+ pendingBuckets.compute(nextGrp.nextObjective().id(), (nextId, pendBkts) -> {
+ if (pendBkts != null) {
+ pendBkts.forEach(pendBkt -> addBucketToGroup(pendBkt, nextGrp));
+ }
+ return null;
+ });
});
}
}
@@ -1917,23 +1938,15 @@
* types.
*/
protected enum OfdpaMplsGroupSubType {
-
MPLS_INTF((short) 0),
-
L2_VPN((short) 1),
-
L3_VPN((short) 2),
-
MPLS_TUNNEL_LABEL_1((short) 3),
-
MPLS_TUNNEL_LABEL_2((short) 4),
-
MPLS_SWAP_LABEL((short) 5),
-
MPLS_ECMP((short) 8);
private short value;
-
public static final int OFDPA_GROUP_TYPE_SHIFT = 28;
public static final int OFDPA_MPLS_SUBTYPE_SHIFT = 24;
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2Pipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2Pipeline.java
index a698a81..4b2800c 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2Pipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2Pipeline.java
@@ -16,6 +16,7 @@
package org.onosproject.driver.pipeline;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
import org.onlab.osgi.ServiceDirectory;
import org.onlab.packet.Ethernet;
import org.onlab.packet.IpPrefix;
@@ -290,8 +291,15 @@
// it is possible that group-chain has not been fully created yet
log.debug("Waiting to add bucket to group for next-id:{} in dev:{}",
nextObjective.id(), deviceId);
- // by design only one pending bucket is allowed for the group
- groupHandler.pendingBuckets.put(nextObjective.id(), nextObjective);
+
+ // by design multiple pending bucket is allowed for the group
+ groupHandler.pendingBuckets.compute(nextObjective.id(), (nextId, pendBkts) -> {
+ if (pendBkts == null) {
+ pendBkts = Sets.newHashSet();
+ }
+ pendBkts.add(nextObjective);
+ return pendBkts;
+ });
}
break;
case REMOVE: