Increase flow obj timeout and remove pendingForwards and pendingNexts when the next failed
Change-Id: I6a1fde50db9184d87af20e3c605ff5b697e082b6
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
index 3872702..7ded0f5 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
@@ -59,7 +59,7 @@
private final Logger log = LoggerFactory.getLogger(getClass());
// TODO Make queue timeout configurable
- static final int OBJ_TIMEOUT_MS = 5000;
+ static final int OBJ_TIMEOUT_MS = 15000;
private Cache<FilteringObjQueueKey, Objective> filtObjQueueHead;
private Cache<ForwardingObjQueueKey, Objective> fwdObjQueueHead;
@@ -165,13 +165,13 @@
@Override
public void onSuccess(Objective objective) {
log.trace("Flow objective onSuccess {}", objective);
- dequeue(deviceId, objective);
+ dequeue(deviceId, objective, null);
originalContext.ifPresent(c -> c.onSuccess(objective));
}
@Override
public void onError(Objective objective, ObjectiveError error) {
- log.warn("Flow objective onError {}. {}", objective, error);
- dequeue(deviceId, objective);
+ log.warn("Flow objective onError {}. Reason = {}", objective, error);
+ dequeue(deviceId, objective, error);
originalContext.ifPresent(c -> c.onError(objective, error));
}
};
@@ -308,8 +308,9 @@
*
* @param deviceId Device ID
* @param obj Flow objective
+ * @param error ObjectiveError that triggers this dequeue. Null if this is not triggered by an error.
*/
- private synchronized void dequeue(DeviceId deviceId, Objective obj) {
+ private synchronized void dequeue(DeviceId deviceId, Objective obj, ObjectiveError error) {
List<Objective> remaining;
int priority = obj.priority();
@@ -328,6 +329,22 @@
fwdObjQueue.remove(k, obj);
remaining = fwdObjQueue.get(k);
} else if (obj instanceof NextObjective) {
+ if (error != null) {
+ // Remove pendingForwards and pendingNexts if next objective failed
+ Set<PendingFlowObjective> removedForwards = pendingForwards.remove(obj.id());
+ List<PendingFlowObjective> removedNexts = pendingNexts.remove(obj.id());
+
+ if (removedForwards != null) {
+ removedForwards.stream().map(PendingFlowObjective::flowObjective)
+ .forEach(pendingObj -> pendingObj.context().ifPresent(c ->
+ c.onError(pendingObj, error)));
+ }
+ if (removedNexts != null) {
+ removedNexts.stream().map(PendingFlowObjective::flowObjective)
+ .forEach(pendingObj -> pendingObj.context().ifPresent(c ->
+ c.onError(pendingObj, error)));
+ }
+ }
NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
nextObjQueueHead.invalidate(k);
nextObjQueue.remove(k, obj);