Enhancing GroupChecker for faster processing of group updates
Change-Id: I9344ccf0ffbc9cff206318a627f5d97df1091677
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/RoutingRulePopulator.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/RoutingRulePopulator.java
index bb9378e..06b650d 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/RoutingRulePopulator.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/RoutingRulePopulator.java
@@ -133,8 +133,8 @@
}
ObjectiveContext context = new DefaultObjectiveContext(
- (objective) -> log.debug("Brigding rule for {}/{} revoked", mac, vlanId),
- (objective, error) -> log.warn("Failed to revoke bridging rule for {}/{}: {}", mac, vlanId, error));
+ (objective) -> log.debug("Brigding rule for {}/{} populated", mac, vlanId),
+ (objective, error) -> log.warn("Failed to populate bridging rule for {}/{}: {}", mac, vlanId, error));
srManager.flowObjectiveService.forward(deviceId, fob.add(context));
}
@@ -155,8 +155,8 @@
}
ObjectiveContext context = new DefaultObjectiveContext(
- (objective) -> log.debug("Brigding rule for {}/{} populated", mac, vlanId),
- (objective, error) -> log.warn("Failed to populate bridging rule for {}/{}: {}", mac, vlanId, error));
+ (objective) -> log.debug("Brigding rule for {}/{} revoked", mac, vlanId),
+ (objective, error) -> log.warn("Failed to revoke bridging rule for {}/{}: {}", mac, vlanId, error));
srManager.flowObjectiveService.forward(deviceId, fob.remove(context));
}
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java
index 2c2f3ba..40aad0d 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java
@@ -106,10 +106,10 @@
private AtomicCounter nextIndex;
protected DeviceId deviceId;
- private Cache<GroupKey, List<OfdpaGroupHandlerUtility.OfdpaNextGroup>> pendingAddNextObjectives;
- private Cache<NextObjective, List<GroupKey>> pendingRemoveNextObjectives;
- private Cache<GroupKey, Set<OfdpaGroupHandlerUtility.GroupChainElem>> pendingGroups;
- private ConcurrentHashMap<GroupKey, Set<NextObjective>> pendingUpdateNextObjectives;
+ Cache<GroupKey, List<OfdpaGroupHandlerUtility.OfdpaNextGroup>> pendingAddNextObjectives;
+ Cache<NextObjective, List<GroupKey>> pendingRemoveNextObjectives;
+ Cache<GroupKey, Set<OfdpaGroupHandlerUtility.GroupChainElem>> pendingGroups;
+ ConcurrentHashMap<GroupKey, Set<NextObjective>> pendingUpdateNextObjectives;
// local store for pending bucketAdds - by design there can be multiple
// pending bucket for a group
@@ -119,14 +119,6 @@
private ScheduledExecutorService groupCheckerExecutor =
Executors.newScheduledThreadPool(2, groupedThreads("onos/pipeliner", "ofdpa-%d", log));
- public Cache<GroupKey, List<OfdpaNextGroup>> pendingAddNextObjectives() {
- return pendingAddNextObjectives;
- }
-
- public Cache<GroupKey, Set<GroupChainElem>> pendingGroups() {
- return pendingGroups;
- }
-
/**
* Determines whether this pipeline support copy ttl instructions or not.
*
@@ -1849,22 +1841,27 @@
});
}
- private void processPendingUpdateNextObjs(GroupKey groupKey) {
- pendingUpdateNextObjectives.compute(groupKey, (gKey, nextObjs) -> {
- if (nextObjs != null) {
-
- nextObjs.forEach(nextObj -> {
- log.debug("Group {} updated, update pending next objective {}.",
- groupKey, nextObj);
-
- pass(nextObj);
- });
- }
- return Sets.newHashSet();
- });
+ protected void addPendingRemoveNextObjective(NextObjective nextObjective,
+ List<GroupKey> groupKeys) {
+ pendingRemoveNextObjectives.put(nextObjective, groupKeys);
}
- private void processPendingRemoveNextObjs(GroupKey key) {
+ protected int getNextAvailableIndex() {
+ return (int) nextIndex.incrementAndGet();
+ }
+
+ protected void processPendingUpdateNextObjs(GroupKey groupKey) {
+ Set<NextObjective> nextObjs = pendingUpdateNextObjectives.remove(groupKey);
+ if (nextObjs != null) {
+ nextObjs.forEach(nextObj -> {
+ log.debug("Group {} updated, update pending next objective {}.",
+ groupKey, nextObj);
+ pass(nextObj);
+ });
+ }
+ }
+
+ protected void processPendingRemoveNextObjs(GroupKey key) {
pendingRemoveNextObjectives.asMap().forEach((nextObjective, groupKeys) -> {
if (groupKeys.isEmpty()) {
pendingRemoveNextObjectives.invalidate(nextObjective);
@@ -1875,10 +1872,6 @@
});
}
- protected int getNextAvailableIndex() {
- return (int) nextIndex.incrementAndGet();
- }
-
protected void processPendingAddGroupsOrNextObjs(GroupKey key, boolean added) {
//first check for group chain
Set<OfdpaGroupHandlerUtility.GroupChainElem> gceSet = pendingGroups.asMap().remove(key);
@@ -1951,11 +1944,6 @@
}
}
- protected void addPendingRemoveNextObjective(NextObjective nextObjective,
- List<GroupKey> groupKeys) {
- pendingRemoveNextObjectives.put(nextObjective, groupKeys);
- }
-
private void updateFlowObjectiveStore(Integer nextId, OfdpaNextGroup nextGrp) {
synchronized (flowObjectiveStore) {
// get fresh copy of what the store holds
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/OfdpaGroupHandlerUtility.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/OfdpaGroupHandlerUtility.java
index 229655d..016299c 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/OfdpaGroupHandlerUtility.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/OfdpaGroupHandlerUtility.java
@@ -263,7 +263,6 @@
return indices;
}
-
/**
* Get indices to remove comparing next group with next objective.
*
@@ -635,22 +634,53 @@
// GroupChecker execution needs to be protected
// from unhandled exceptions
try {
- if (groupHandler.pendingGroups().size() != 0) {
- log.debug("pending groups being checked: {}", groupHandler.pendingGroups().asMap().keySet());
+ if (groupHandler.pendingGroups.size() != 0) {
+ log.debug("pending groups being checked: {}",
+ groupHandler.pendingGroups.asMap().keySet());
}
- if (groupHandler.pendingAddNextObjectives().size() != 0) {
+ if (groupHandler.pendingAddNextObjectives.size() != 0) {
log.debug("pending add-next-obj being checked: {}",
- groupHandler.pendingAddNextObjectives().asMap().keySet());
+ groupHandler.pendingAddNextObjectives.asMap().keySet());
}
- Set<GroupKey> keys = groupHandler.pendingGroups().asMap().keySet().stream()
- .filter(key -> groupHandler.groupService.getGroup(groupHandler.deviceId, key) != null)
+ if (groupHandler.pendingRemoveNextObjectives.size() != 0) {
+ log.debug("pending remove-next-obj being checked: {}",
+ groupHandler.pendingRemoveNextObjectives.asMap().values());
+ }
+ if (groupHandler.pendingUpdateNextObjectives.size() != 0) {
+ log.debug("pending update-next-obj being checked: {}",
+ groupHandler.pendingUpdateNextObjectives.keySet());
+ }
+
+ Set<GroupKey> keys = groupHandler.pendingGroups.asMap().keySet()
+ .stream()
+ .filter(key -> groupHandler.groupService
+ .getGroup(groupHandler.deviceId, key) != null)
.collect(Collectors.toSet());
- Set<GroupKey> otherkeys = groupHandler.pendingAddNextObjectives().asMap().keySet().stream()
- .filter(otherkey -> groupHandler.groupService.getGroup(groupHandler.deviceId, otherkey) != null)
+ Set<GroupKey> otherkeys = groupHandler.pendingAddNextObjectives
+ .asMap().keySet().stream()
+ .filter(otherkey -> groupHandler.groupService
+ .getGroup(groupHandler.deviceId, otherkey) != null)
.collect(Collectors.toSet());
keys.addAll(otherkeys);
-
keys.forEach(key -> groupHandler.processPendingAddGroupsOrNextObjs(key, false));
+
+ keys = groupHandler.pendingUpdateNextObjectives.keySet()
+ .stream()
+ .filter(key -> groupHandler.groupService
+ .getGroup(groupHandler.deviceId, key) != null)
+ .collect(Collectors.toSet());
+ keys.forEach(key -> groupHandler.processPendingUpdateNextObjs(key));
+
+ Set<GroupKey> k = Sets.newHashSet();
+ groupHandler.pendingRemoveNextObjectives
+ .asMap().values().stream().forEach(keylist -> {
+ k.addAll(keylist.stream()
+ .filter(key -> groupHandler.groupService
+ .getGroup(groupHandler.deviceId, key) == null)
+ .collect(Collectors.toSet()));
+ });
+ k.forEach(key -> groupHandler.processPendingRemoveNextObjs(key));
+
} catch (Exception exception) {
// Just log. It is safe for now.
log.warn("Uncaught exception is detected: {}", exception.getMessage());