Prevent objective context from being executed multiple times
Consider the following sequence:
1. Flow objective issued
2. Cache timeout since the objective is not processed in time.
onError()() will be triggered with ObjectiveError.INSTALLATIONTIMEOUT
3. While 2 is still being processed, the objective actually completed.
onSuccess() will be triggered and therefore the same objective will be dequeued and executed again
The unit test was also wrong.
One objective context should only be executed once no matter the objective succeeded or failed.
Change-Id: I2501903bebee6fd9eefb03a71042c4a06ba4c42f
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
index 48a4d35..e180d6e 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
@@ -53,6 +53,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
@@ -155,20 +156,7 @@
// Inject ObjectiveContext such that we can get notified when it is completed
Objective.Builder objBuilder = originalObjective.copy();
Optional<ObjectiveContext> originalContext = originalObjective.context();
- ObjectiveContext context = new ObjectiveContext() {
- @Override
- public void onSuccess(Objective objective) {
- log.trace("Flow objective onSuccess {}", objective);
- dequeue(deviceId, objective, null);
- originalContext.ifPresent(c -> c.onSuccess(objective));
- }
- @Override
- public void onError(Objective objective, ObjectiveError error) {
- log.warn("Flow objective onError {}. Reason = {}", objective, error);
- dequeue(deviceId, objective, error);
- originalContext.ifPresent(c -> c.onError(objective, error));
- }
- };
+ ObjectiveContext context = new InOrderObjectiveContext(deviceId, originalContext.orElse(null));
// Preserve Objective.Operation
Objective objective;
@@ -432,4 +420,42 @@
}
}
}
+
+ final class InOrderObjectiveContext implements ObjectiveContext {
+ private final DeviceId deviceId;
+ private final ObjectiveContext originalContext;
+ // Prevent context from being executed multiple times.
+ // E.g. when the context actually succeed after the cache timeout
+ private final AtomicBoolean done;
+
+ InOrderObjectiveContext(DeviceId deviceId, ObjectiveContext originalContext) {
+ this.deviceId = deviceId;
+ this.originalContext = originalContext;
+ this.done = new AtomicBoolean(false);
+ }
+
+ @Override
+ public void onSuccess(Objective objective) {
+ log.trace("Flow objective onSuccess {}", objective);
+
+ if (!done.getAndSet(true)) {
+ dequeue(deviceId, objective, null);
+ if (originalContext != null) {
+ originalContext.onSuccess(objective);
+ }
+ }
+
+ }
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ log.warn("Flow objective onError {}. Reason = {}", objective, error);
+
+ if (!done.getAndSet(true)) {
+ dequeue(deviceId, objective, error);
+ if (originalContext != null) {
+ originalContext.onError(objective, error);
+ }
+ }
+ }
+ }
}