CORD-354 OF-DPA support for link-failures.
Bug fix in flowObjectives store. Adding a removeNextGroup API to the store.
Change-Id: I5890411e5b4eabdc057402687ada26e539500f8f
diff --git a/drivers/src/main/java/org/onosproject/driver/pipeline/OFDPA2Pipeline.java b/drivers/src/main/java/org/onosproject/driver/pipeline/OFDPA2Pipeline.java
index c2b07cd..cb1a650 100644
--- a/drivers/src/main/java/org/onosproject/driver/pipeline/OFDPA2Pipeline.java
+++ b/drivers/src/main/java/org/onosproject/driver/pipeline/OFDPA2Pipeline.java
@@ -94,7 +94,6 @@
import org.onosproject.net.group.GroupKey;
import org.onosproject.net.group.GroupListener;
import org.onosproject.net.group.GroupService;
-import org.onosproject.net.packet.PacketService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.slf4j.Logger;
@@ -149,7 +148,6 @@
protected FlowObjectiveStore flowObjectiveStore;
protected DeviceId deviceId;
protected ApplicationId driverId;
- protected PacketService packetService;
protected DeviceService deviceService;
protected KryoNamespace appKryo = new KryoNamespace.Builder()
.register(KryoNamespaces.API)
@@ -174,6 +172,10 @@
Map<VlanId, Set<PortNumber>> vlan2Port = new ConcurrentHashMap<VlanId,
Set<PortNumber>>();
+ // local store for pending bucketAdds - by design there can only be one
+ // pending bucket for a group
+ ConcurrentHashMap<Integer, NextObjective> pendingBuckets = new ConcurrentHashMap<>();
+
// index number for group creation
AtomicInteger l3vpnindex = new AtomicInteger(0);
@@ -202,7 +204,6 @@
flowRuleService = serviceDirectory.get(FlowRuleService.class);
groupService = serviceDirectory.get(GroupService.class);
flowObjectiveStore = context.store();
- packetService = serviceDirectory.get(PacketService.class);
deviceService = serviceDirectory.get(DeviceService.class);
groupService.addListener(new InnerGroupListener());
@@ -293,10 +294,13 @@
if (nextGroup != null) {
log.debug("Processing NextObjective id{} in dev{} - add bucket",
nextObjective.id(), deviceId);
- addBucketToGroup(nextObjective);
+ addBucketToGroup(nextObjective, nextGroup);
} else {
// it is possible that group-chain has not been fully created yet
- waitToAddBucketToGroup(nextObjective);
+ log.debug("Waiting to add bucket to group for next-id:{} in dev:{}",
+ nextObjective.id(), deviceId);
+ // by design only one pending bucket is allowed for the group
+ pendingBuckets.put(nextObjective.id(), nextObjective);
}
break;
case REMOVE:
@@ -307,7 +311,7 @@
}
log.debug("Processing NextObjective id{} in dev{} - remove group",
nextObjective.id(), deviceId);
- removeGroup(nextObjective);
+ removeGroup(nextObjective, nextGroup);
break;
case REMOVE_FROM_EXISTING:
if (nextGroup == null) {
@@ -317,7 +321,7 @@
}
log.debug("Processing NextObjective id{} in dev{} - remove bucket",
nextObjective.id(), deviceId);
- removeBucketFromGroup(nextObjective);
+ removeBucketFromGroup(nextObjective, nextGroup);
break;
default:
log.warn("Unsupported operation {}", nextObjective.op());
@@ -791,17 +795,19 @@
return Collections.emptySet();
}
- NextGroup next = flowObjectiveStore.getNextGroup(fwd.nextId());
- List<Deque<GroupKey>> gkeys = appKryo.deserialize(next.data());
- // we only need the top level group's key to point the flow to it
- Group group = groupService.getGroup(deviceId, gkeys.get(0).peekFirst());
- if (group == null) {
- log.warn("Group with key:{} for next-id:{} not found in dev:{}",
- gkeys.get(0).peekFirst(), fwd.nextId(), deviceId);
- fail(fwd, ObjectiveError.GROUPMISSING);
- return Collections.emptySet();
+ NextGroup next = getGroupForNextObjective(fwd.nextId());
+ if (next != null) {
+ List<Deque<GroupKey>> gkeys = appKryo.deserialize(next.data());
+ // we only need the top level group's key to point the flow to it
+ Group group = groupService.getGroup(deviceId, gkeys.get(0).peekFirst());
+ if (group == null) {
+ log.warn("Group with key:{} for next-id:{} not found in dev:{}",
+ gkeys.get(0).peekFirst(), fwd.nextId(), deviceId);
+ fail(fwd, ObjectiveError.GROUPMISSING);
+ return Collections.emptySet();
+ }
+ tb.deferred().group(group.id());
}
- tb.deferred().group(group.id());
}
tb.transition(ACL_TABLE);
FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
@@ -868,7 +874,7 @@
TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
if (fwd.nextId() != null) {
- NextGroup next = flowObjectiveStore.getNextGroup(fwd.nextId());
+ NextGroup next = getGroupForNextObjective(fwd.nextId());
if (next != null) {
List<Deque<GroupKey>> gkeys = appKryo.deserialize(next.data());
// we only need the top level group's key to point the flow to it
@@ -903,6 +909,23 @@
return rules;
}
+ protected NextGroup getGroupForNextObjective(Integer nextId) {
+ NextGroup next = flowObjectiveStore.getNextGroup(nextId);
+ if (next != null) {
+ List<Deque<GroupKey>> gkeys = appKryo.deserialize(next.data());
+ if (gkeys != null && !gkeys.isEmpty()) {
+ return next;
+ } else {
+ log.warn("Empty next group found in FlowObjective store for "
+ + "next-id:{} in dev:{}", nextId, deviceId);
+ }
+ } else {
+ log.warn("next-id {} not found in Flow objective store for dev:{}",
+ nextId, deviceId);
+ }
+ return null;
+ }
+
private void pass(Objective obj) {
if (obj.context().isPresent()) {
obj.context().get().onSuccess(obj);
@@ -1013,6 +1036,16 @@
}
}
+ private void updatePendingGroups(GroupKey gkey, GroupChainElem gce) {
+ Set<GroupChainElem> gceSet = Collections.newSetFromMap(
+ new ConcurrentHashMap<GroupChainElem, Boolean>());
+ gceSet.add(gce);
+ Set<GroupChainElem> retval = pendingGroups.putIfAbsent(gkey, gceSet);
+ if (retval != null) {
+ retval.add(gce);
+ }
+ }
+
/**
* Creates a simple L2 Interface Group.
*
@@ -1242,14 +1275,8 @@
}
// store l2groupkey with the groupChainElem for the outer-group that depends on it
- GroupChainElem gce = new GroupChainElem(outerGrpDesc, 1);
- Set<GroupChainElem> gceSet = Collections.newSetFromMap(
- new ConcurrentHashMap<GroupChainElem, Boolean>());
- gceSet.add(gce);
- Set<GroupChainElem> retval = pendingGroups.putIfAbsent(l2groupkey, gceSet);
- if (retval != null) {
- retval.add(gce);
- }
+ GroupChainElem gce = new GroupChainElem(outerGrpDesc, 1, false);
+ updatePendingGroups(l2groupkey, gce);
// create group description for the inner l2interfacegroup
GroupBucket l2interfaceGroupBucket =
@@ -1376,7 +1403,8 @@
l2floodgroupId,
nextObj.appId());
GroupChainElem gce = new GroupChainElem(l2floodGroupDescription,
- l2interfaceGroupDescs.size());
+ l2interfaceGroupDescs.size(),
+ false);
log.debug("Trying L2-Flood: device:{} gid:{} gkey:{} nextid:{}",
deviceId, Integer.toHexString(l2floodgroupId),
l2floodgroupkey, nextObj.id());
@@ -1392,16 +1420,8 @@
for (GroupDescription l2intGrpDesc : l2interfaceGroupDescs) {
// store all l2groupkeys with the groupChainElem for the l2floodgroup
// that depends on it
- Set<GroupChainElem> gceSet = Collections.newSetFromMap(
- new ConcurrentHashMap<GroupChainElem, Boolean>());
- gceSet.add(gce);
- Set<GroupChainElem> retval = pendingGroups.putIfAbsent(
- l2intGrpDesc.appCookie(), gceSet);
- if (retval != null) {
- retval.add(gce);
- }
-
- // create and send groups for all l2 interface groups
+ updatePendingGroups(l2intGrpDesc.appCookie(), gce);
+ // send groups for all l2 interface groups
groupService.addGroup(l2intGrpDesc);
}
}
@@ -1430,17 +1450,81 @@
* <p>
* NOTE: We do not create MPLS ECMP groups as they are unimplemented in
* OF-DPA 2.0 (even though it is in the spec). Therefore we do not
- * check the nextObjective meta.
+ * check the nextObjective meta to see what is matching before being
+ * sent to this nextObjective.
*
* @param nextObj the nextObjective of type HASHED
*/
private void processHashedNextObjective(NextObjective nextObj) {
- // break up hashed next objective to multiple groups
- Collection<TrafficTreatment> buckets = nextObj.next();
-
// storage for all group keys in the chain of groups created
List<Deque<GroupKey>> allGroupKeys = new ArrayList<>();
List<GroupInfo> unsentGroups = new ArrayList<>();
+ createHashBucketChains(nextObj, allGroupKeys, unsentGroups);
+
+ // now we can create the outermost L3 ECMP group
+ List<GroupBucket> l3ecmpGroupBuckets = new ArrayList<>();
+ for (GroupInfo gi : unsentGroups) {
+ // create ECMP bucket to point to the outer group
+ TrafficTreatment.Builder ttb = DefaultTrafficTreatment.builder();
+ ttb.group(new DefaultGroupId(gi.outerGrpDesc.givenGroupId()));
+ GroupBucket sbucket = DefaultGroupBucket
+ .createSelectGroupBucket(ttb.build());
+ l3ecmpGroupBuckets.add(sbucket);
+ }
+ int l3ecmpGroupId = L3ECMPMASK | nextObj.id() << 12;
+ GroupKey l3ecmpGroupKey = new DefaultGroupKey(appKryo.serialize(l3ecmpGroupId));
+ GroupDescription l3ecmpGroupDesc =
+ new DefaultGroupDescription(
+ deviceId,
+ GroupDescription.Type.SELECT,
+ new GroupBuckets(l3ecmpGroupBuckets),
+ l3ecmpGroupKey,
+ l3ecmpGroupId,
+ nextObj.appId());
+ GroupChainElem l3ecmpGce = new GroupChainElem(l3ecmpGroupDesc,
+ l3ecmpGroupBuckets.size(),
+ false);
+
+ // create objects for local and distributed storage
+ allGroupKeys.forEach(gkeyChain -> gkeyChain.addFirst(l3ecmpGroupKey));
+ OfdpaNextGroup ofdpaGrp = new OfdpaNextGroup(allGroupKeys, nextObj);
+
+ // store l3ecmpGroupKey with the ofdpaGroupChain for the nextObjective
+ // that depends on it
+ updatePendingNextObjective(l3ecmpGroupKey, ofdpaGrp);
+
+ log.debug("Trying L3ECMP: device:{} gid:{} gkey:{} nextId:{}",
+ deviceId, Integer.toHexString(l3ecmpGroupId),
+ l3ecmpGroupKey, nextObj.id());
+ // finally we are ready to send the innermost groups
+ for (GroupInfo gi : unsentGroups) {
+ log.debug("Sending innermost group {} in group chain on device {} ",
+ Integer.toHexString(gi.innerGrpDesc.givenGroupId()), deviceId);
+ updatePendingGroups(gi.outerGrpDesc.appCookie(), l3ecmpGce);
+ groupService.addGroup(gi.innerGrpDesc);
+ }
+
+ }
+
+ /**
+ * Creates group chains for all buckets in a hashed group, and stores the
+ * GroupInfos and GroupKeys for all the groups in the lists passed in, which
+ * should be empty.
+ * <p>
+ * Does not create the top level ECMP group. Does not actually send the
+ * groups to the groupService.
+ *
+ * @param nextObj the Next Objective with buckets that need to be converted
+ * to group chains
+ * @param allGroupKeys a list to store groupKey for each bucket-group-chain
+ * @param unsentGroups a list to store GroupInfo for each bucket-group-chain
+ */
+ private void createHashBucketChains(NextObjective nextObj,
+ List<Deque<GroupKey>> allGroupKeys,
+ List<GroupInfo> unsentGroups) {
+ // break up hashed next objective to multiple groups
+ Collection<TrafficTreatment> buckets = nextObj.next();
+
for (TrafficTreatment bucket : buckets) {
//figure out how many labels are pushed in each bucket
int labelsPushed = 0;
@@ -1508,15 +1592,8 @@
l3vpngroupkey,
l3vpngroupId,
nextObj.appId());
- GroupChainElem l3vpnGce = new GroupChainElem(l3vpnGroupDesc, 1);
- Set<GroupChainElem> gceSet = Collections.newSetFromMap(
- new ConcurrentHashMap<GroupChainElem, Boolean>());
- gceSet.add(l3vpnGce);
- Set<GroupChainElem> retval = pendingGroups
- .putIfAbsent(onelabelGroupInfo.outerGrpDesc.appCookie(), gceSet);
- if (retval != null) {
- retval.add(l3vpnGce);
- }
+ GroupChainElem l3vpnGce = new GroupChainElem(l3vpnGroupDesc, 1, false);
+ updatePendingGroups(onelabelGroupInfo.outerGrpDesc.appCookie(), l3vpnGce);
gkeyChain.addFirst(onelabelGroupInfo.innerGrpDesc.appCookie());
gkeyChain.addFirst(onelabelGroupInfo.outerGrpDesc.appCookie());
@@ -1535,80 +1612,186 @@
} else {
log.warn("Driver currently does not handle more than 1 MPLS "
- + "labels. Not processing nextObjective {}", nextObj);
+ + "labels. Not processing nextObjective {}", nextObj.id());
return;
}
// all groups in this chain
allGroupKeys.add(gkeyChain);
}
+ }
- // now we can create the outermost L3 ECMP group
- List<GroupBucket> l3ecmpGroupBuckets = new ArrayList<>();
- for (GroupInfo gi : unsentGroups) {
- // create ECMP bucket to point to the outer group
- TrafficTreatment.Builder ttb = DefaultTrafficTreatment.builder();
- ttb.group(new DefaultGroupId(gi.outerGrpDesc.givenGroupId()));
- GroupBucket sbucket = DefaultGroupBucket
- .createSelectGroupBucket(ttb.build());
- l3ecmpGroupBuckets.add(sbucket);
+ /**
+ * Adds a bucket to the top level group of a group-chain, and creates the chain.
+ *
+ * @param nextObjective the next group to add a bucket to
+ * @param next the representation of the existing group-chain for this next objective
+ */
+ private void addBucketToGroup(NextObjective nextObjective, NextGroup next) {
+ if (nextObjective.type() != NextObjective.Type.HASHED) {
+ log.warn("AddBuckets not applied to nextType:{} in dev:{} for next:{}",
+ nextObjective.type(), deviceId, nextObjective.id());
+ return;
}
- int l3ecmpGroupId = L3ECMPMASK | nextObj.id() << 12;
+ if (nextObjective.next().size() > 1) {
+ log.warn("Only one bucket can be added at a time");
+ return;
+ }
+ // storage for all group keys in the chain of groups created
+ List<Deque<GroupKey>> allGroupKeys = new ArrayList<>();
+ List<GroupInfo> unsentGroups = new ArrayList<>();
+ createHashBucketChains(nextObjective, allGroupKeys, unsentGroups);
+
+ // now we can create the outermost L3 ECMP group bucket to add
+ GroupInfo gi = unsentGroups.get(0); // only one bucket, so only one group-chain
+ TrafficTreatment.Builder ttb = DefaultTrafficTreatment.builder();
+ ttb.group(new DefaultGroupId(gi.outerGrpDesc.givenGroupId()));
+ GroupBucket sbucket = DefaultGroupBucket.createSelectGroupBucket(ttb.build());
+
+ // recreate the original L3 ECMP group id and description
+ int l3ecmpGroupId = L3ECMPMASK | nextObjective.id() << 12;
GroupKey l3ecmpGroupKey = new DefaultGroupKey(appKryo.serialize(l3ecmpGroupId));
+
+ // Although GroupDescriptions are not necessary for adding buckets to
+ // existing groups, we use one in the GroupChainElem. When the latter is
+ // processed, the info will be extracted for the bucketAdd call to groupService
GroupDescription l3ecmpGroupDesc =
new DefaultGroupDescription(
deviceId,
GroupDescription.Type.SELECT,
- new GroupBuckets(l3ecmpGroupBuckets),
+ new GroupBuckets(Collections.singletonList(sbucket)),
l3ecmpGroupKey,
l3ecmpGroupId,
- nextObj.appId());
- GroupChainElem l3ecmpGce = new GroupChainElem(l3ecmpGroupDesc,
- l3ecmpGroupBuckets.size());
+ nextObjective.appId());
+ GroupChainElem l3ecmpGce = new GroupChainElem(l3ecmpGroupDesc, 1, true);
- // create objects for local and distributed storage
- allGroupKeys.forEach(gkeyChain -> gkeyChain.addFirst(l3ecmpGroupKey));
- OfdpaNextGroup ofdpaGrp = new OfdpaNextGroup(allGroupKeys, nextObj);
+ // update original NextGroup with new bucket-chain
+ // don't need to update pendingNextObjectives -- group already exists
+ Deque<GroupKey> newBucketChain = allGroupKeys.get(0);
+ newBucketChain.addFirst(l3ecmpGroupKey);
+ List<Deque<GroupKey>> allOriginalKeys = appKryo.deserialize(next.data());
+ allOriginalKeys.add(newBucketChain);
+ flowObjectiveStore.putNextGroup(nextObjective.id(),
+ new OfdpaNextGroup(allOriginalKeys, nextObjective));
- // store l3ecmpGroupKey with the ofdpaGroupChain for the nextObjective
- // that depends on it
- updatePendingNextObjective(l3ecmpGroupKey, ofdpaGrp);
-
- log.debug("Trying L3ECMP: device:{} gid:{} gkey:{} nextId:{}",
+ log.debug("Adding to L3ECMP: device:{} gid:{} gkey:{} nextId:{}",
deviceId, Integer.toHexString(l3ecmpGroupId),
- l3ecmpGroupKey, nextObj.id());
- // finally we are ready to send the innermost groups
- for (GroupInfo gi : unsentGroups) {
- log.debug("Sending innermost group {} in group chain on device {} ",
- Integer.toHexString(gi.innerGrpDesc.givenGroupId()), deviceId);
- Set<GroupChainElem> gceSet = Collections.newSetFromMap(
- new ConcurrentHashMap<GroupChainElem, Boolean>());
- gceSet.add(l3ecmpGce);
- Set<GroupChainElem> retval = pendingGroups
- .putIfAbsent(gi.outerGrpDesc.appCookie(), gceSet);
- if (retval != null) {
- retval.add(l3ecmpGce);
- }
+ l3ecmpGroupKey, nextObjective.id());
+ // send the innermost group
+ log.debug("Sending innermost group {} in group chain on device {} ",
+ Integer.toHexString(gi.innerGrpDesc.givenGroupId()), deviceId);
+ updatePendingGroups(gi.outerGrpDesc.appCookie(), l3ecmpGce);
+ groupService.addGroup(gi.innerGrpDesc);
- groupService.addGroup(gi.innerGrpDesc);
+ }
+
+ /**
+ * Removes the bucket in the top level group of a possible group-chain. Does
+ * not remove the groups in a group-chain pointed to by this bucket, as they
+ * may be in use (referenced by other groups) elsewhere.
+ *
+ * @param nextObjective the next group to remove a bucket from
+ * @param next the representation of the existing group-chain for this next objective
+ */
+ private void removeBucketFromGroup(NextObjective nextObjective, NextGroup next) {
+ if (nextObjective.type() != NextObjective.Type.HASHED) {
+ log.warn("RemoveBuckets not applied to nextType:{} in dev:{} for next:{}",
+ nextObjective.type(), deviceId, nextObjective.id());
+ return;
+ }
+ Collection<TrafficTreatment> treatments = nextObjective.next();
+ TrafficTreatment treatment = treatments.iterator().next();
+ // find the bucket to remove by noting the outport, and figuring out the
+ // top-level group in the group-chain that indirectly references the port
+ PortNumber outport = null;
+ for (Instruction ins : treatment.allInstructions()) {
+ if (ins instanceof OutputInstruction) {
+ outport = ((OutputInstruction) ins).port();
+ break;
+ }
+ }
+ if (outport == null) {
+ log.error("next objective {} has no outport", nextObjective.id());
+ return;
}
+ List<Deque<GroupKey>> allgkeys = appKryo.deserialize(next.data());
+ Deque<GroupKey> foundChain = null;
+ int index = 0;
+ for (Deque<GroupKey> gkeys : allgkeys) {
+ GroupKey groupWithPort = gkeys.peekLast();
+ Group group = groupService.getGroup(deviceId, groupWithPort);
+ if (group == null) {
+ log.warn("Inconsistent group chain");
+ continue;
+ }
+ // last group in group chain should have a single bucket pointing to port
+ List<Instruction> lastIns = group.buckets().buckets().iterator()
+ .next().treatment().allInstructions();
+ for (Instruction i : lastIns) {
+ if (i instanceof OutputInstruction) {
+ PortNumber lastport = ((OutputInstruction) i).port();
+ if (lastport.equals(outport)) {
+ foundChain = gkeys;
+ break;
+ }
+ }
+ }
+ if (foundChain != null) {
+ break;
+ }
+ index++;
+ }
+ if (foundChain != null) {
+ //first groupkey is the one we want to modify
+ GroupKey modGroupKey = foundChain.peekFirst();
+ Group modGroup = groupService.getGroup(deviceId, modGroupKey);
+ //second groupkey is the one we wish to remove the reference to
+ GroupKey pointedGroupKey = null;
+ int i = 0;
+ for (GroupKey gk : foundChain) {
+ if (i++ == 1) {
+ pointedGroupKey = gk;
+ break;
+ }
+ }
+ Group pointedGroup = groupService.getGroup(deviceId, pointedGroupKey);
+ GroupBucket bucket = DefaultGroupBucket.createSelectGroupBucket(
+ DefaultTrafficTreatment.builder()
+ .group(pointedGroup.id())
+ .build());
+ GroupBuckets removeBuckets = new GroupBuckets(Collections
+ .singletonList(bucket));
+ log.debug("Removing buckets from group id {} for next id {} in device {}",
+ modGroup.id(), nextObjective.id(), deviceId);
+ groupService.removeBucketsFromGroup(deviceId, modGroupKey,
+ removeBuckets, modGroupKey,
+ nextObjective.appId());
+ //update store
+ allgkeys.remove(index);
+ flowObjectiveStore.putNextGroup(nextObjective.id(),
+ new OfdpaNextGroup(allgkeys, nextObjective));
+ } else {
+ log.warn("Could not find appropriate group-chain for removing bucket"
+ + " for next id {} in dev:{}", nextObjective.id(), deviceId);
+ }
}
- private void addBucketToGroup(NextObjective nextObjective) {
- // TODO Auto-generated method stub
- }
-
- private void waitToAddBucketToGroup(NextObjective nextObjective) {
- // TODO Auto-generated method stub
- }
-
- private void removeBucketFromGroup(NextObjective nextObjective) {
- // TODO Auto-generated method stub
- }
-
- private void removeGroup(NextObjective nextObjective) {
- // TODO Auto-generated method stub
+ /**
+ * Removes all groups in multiple possible group-chains that represent the next
+ * objective.
+ *
+ * @param nextObjective the next objective to remove
+ * @param next the NextGroup that represents the existing group-chain for
+ * this next objective
+ */
+ private void removeGroup(NextObjective nextObjective, NextGroup next) {
+ List<Deque<GroupKey>> allgkeys = appKryo.deserialize(next.data());
+ allgkeys.forEach(groupChain -> {
+ groupChain.forEach(groupKey ->
+ groupService.removeGroup(deviceId, groupKey, nextObjective.appId()));
+ });
+ flowObjectiveStore.removeNextGroup(nextObjective.id());
}
/**
@@ -1617,7 +1800,7 @@
* and this driver has received notification for it. A second assumption is
* that if there is another group waiting for this group then the appropriate
* stores already have the information to act upon the notification for the
- * creating of this group.
+ * creation of this group.
* <p>
* The processing of the GroupChainElement depends on the number of groups
* this element is waiting on. For all group types other than SIMPLE, a
@@ -1632,7 +1815,15 @@
return;
}
log.debug("GCE: {} ready to be processed", gce);
- groupService.addGroup(gce.groupDescription);
+ if (gce.addBucketToGroup) {
+ groupService.addBucketsToGroup(gce.groupDescription.deviceId(),
+ gce.groupDescription.appCookie(),
+ gce.groupDescription.buckets(),
+ gce.groupDescription.appCookie(),
+ gce.groupDescription.appId());
+ } else {
+ groupService.addGroup(gce.groupDescription);
+ }
}
private class GroupChecker implements Runnable {
@@ -1646,33 +1837,45 @@
.collect(Collectors.toSet());
keys.addAll(otherkeys);
- keys.stream().forEach(key -> {
- //first check for group chain
- Set<GroupChainElem> gceSet = pendingGroups.remove(key);
- if (gceSet != null) {
- for (GroupChainElem gce : gceSet) {
- log.info("Group service processed group key {} in device {}. "
- + "Processing next group in group chain with group id {}",
- key, deviceId,
- Integer.toHexString(gce.groupDescription.givenGroupId()));
- processGroupChain(gce);
+ keys.stream().forEach(key ->
+ processPendingGroupsOrNextObjectives(key, false));
+ }
+ }
+
+ private void processPendingGroupsOrNextObjectives(GroupKey key, boolean added) {
+ //first check for group chain
+ Set<GroupChainElem> gceSet = pendingGroups.remove(key);
+ if (gceSet != null) {
+ for (GroupChainElem gce : gceSet) {
+ log.info("Group service {} group key {} in device {}. "
+ + "Processing next group in group chain with group id {}",
+ (added) ? "ADDED" : "processed",
+ key, deviceId,
+ Integer.toHexString(gce.groupDescription.givenGroupId()));
+ processGroupChain(gce);
+ }
+ } else {
+ // otherwise chain complete - check for waiting nextObjectives
+ List<OfdpaNextGroup> nextGrpList = pendingNextObjectives.getIfPresent(key);
+ if (nextGrpList != null) {
+ pendingNextObjectives.invalidate(key);
+ nextGrpList.forEach(nextGrp -> {
+ log.info("Group service {} group key {} in device:{}. "
+ + "Done implementing next objective: {} <<-->> gid:{}",
+ (added) ? "ADDED" : "processed",
+ key, deviceId, nextGrp.nextObjective().id(),
+ Integer.toHexString(groupService.getGroup(deviceId, key)
+ .givenGroupId()));
+ pass(nextGrp.nextObjective());
+ flowObjectiveStore.putNextGroup(nextGrp.nextObjective().id(), nextGrp);
+ // check if addBuckets waiting for this completion
+ NextObjective pendBkt = pendingBuckets
+ .remove(nextGrp.nextObjective().id());
+ if (pendBkt != null) {
+ addBucketToGroup(pendBkt, nextGrp);
}
- } else {
- List<OfdpaNextGroup> objList = pendingNextObjectives.getIfPresent(key);
- if (objList != null) {
- pendingNextObjectives.invalidate(key);
- objList.forEach(obj -> {
- log.info("Group service processed group key {} in device:{}. "
- + "Done implementing next objective: {} <<-->> gid:{}",
- key, deviceId, obj.nextObjective().id(),
- Integer.toHexString(groupService.getGroup(deviceId, key)
- .givenGroupId()));
- pass(obj.nextObjective());
- flowObjectiveStore.putNextGroup(obj.nextObjective().id(), obj);
- });
- }
- }
- });
+ });
+ }
}
}
@@ -1682,31 +1885,7 @@
log.trace("received group event of type {}", event.type());
if (event.type() == GroupEvent.Type.GROUP_ADDED) {
GroupKey key = event.subject().appCookie();
- // first check for group chain
- Set<GroupChainElem> gceSet = pendingGroups.remove(key);
- if (gceSet != null) {
- for (GroupChainElem gce : gceSet) {
- log.info("group ADDED with group key {} .. "
- + "Processing next group in group chain with group key {}",
- key,
- gce.groupDescription.appCookie());
- processGroupChain(gce);
- }
- } else {
- List<OfdpaNextGroup> objList = pendingNextObjectives.getIfPresent(key);
- if (objList != null) {
- pendingNextObjectives.invalidate(key);
- objList.forEach(obj -> {
- log.info("group ADDED with key {} in dev {}.. Done implementing next "
- + "objective: {} <<-->> gid:{}",
- key, deviceId, obj.nextObjective().id(),
- Integer.toHexString(groupService.getGroup(deviceId, key)
- .givenGroupId()));
- pass(obj.nextObjective());
- flowObjectiveStore.putNextGroup(obj.nextObjective().id(), obj);
- });
- }
- }
+ processPendingGroupsOrNextObjectives(key, true);
}
}
}
@@ -1714,7 +1893,7 @@
/**
* Represents an entire group-chain that implements a Next-Objective from
* the application. The objective is represented as a list of deques, where
- * each deque can is a separate chain of groups.
+ * each deque is a separate chain of groups.
* <p>
* For example, an ECMP group with 3 buckets, where each bucket points to
* a group chain of L3 Unicast and L2 interface groups will look like this:
@@ -1765,10 +1944,13 @@
private class GroupChainElem {
private GroupDescription groupDescription;
private AtomicInteger waitOnGroups;
+ private boolean addBucketToGroup;
- GroupChainElem(GroupDescription groupDescription, int waitOnGroups) {
+ GroupChainElem(GroupDescription groupDescription, int waitOnGroups,
+ boolean addBucketToGroup) {
this.groupDescription = groupDescription;
this.waitOnGroups = new AtomicInteger(waitOnGroups);
+ this.addBucketToGroup = addBucketToGroup;
}
/**
@@ -1788,6 +1970,7 @@
return (Integer.toHexString(groupDescription.givenGroupId()) +
" groupKey: " + groupDescription.appCookie() +
" waiting-on-groups: " + waitOnGroups.get() +
+ " addBucketToGroup: " + addBucketToGroup +
" device: " + deviceId);
}
}